mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
encode: handle uploads of the same file multiple times. Unfortunately we have to do almost as much work the second time around, to compute the full URI
parent 54d737b0e7
commit 2d0e240466
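The commit message points at the core constraint: the URI embeds the root of a hash tree computed over every share, so even a repeated upload must re-run encoding and hashing before the URI is known. A minimal sketch of that dependency (plain sha256 stands in for the project's tagged hashes, and the helper name is hypothetical):

    import hashlib

    def roothash_of_shares(shares):
        # The URI needs the root of a hash tree built over *all* shares, so
        # every share must be generated even if no peer ends up storing it.
        layer = [hashlib.sha256(s).digest() for s in shares]
        while len(layer) > 1:
            if len(layer) % 2:
                layer.append(layer[-1])   # pad odd layers (one common convention)
            layer = [hashlib.sha256(layer[i] + layer[i + 1]).digest()
                     for i in range(0, len(layer), 2)]
        return layer[0]

    # Re-uploading identical data: the put_block traffic can be skipped for
    # shares a peer already holds, but this encode-and-hash pass cannot be.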
src/allmydata/encode.py

@@ -210,7 +210,12 @@ class Encoder(object):
         return d
 
     def _encoded_segment(self, (shares, shareids), segnum):
-        _assert(set(shareids) == set(self.landlords.keys()),
+        # To generate the URI, we must generate the roothash, so we must
+        # generate all shares, even if we aren't actually giving them to
+        # anybody. This means that the set of shares we create will be equal
+        # to or larger than the set of landlords. If we have any landlord who
+        # *doesn't* have a share, that's an error.
+        _assert(set(self.landlords.keys()).issubset(set(shareids)),
                 shareids=shareids, landlords=self.landlords)
         dl = []
         for i in range(len(shares)):
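The relaxed assertion is the heart of the change: share generation is now decoupled from share placement, and the landlords only need to cover a subset of the generated shares. A toy illustration of the invariant (the peer names are hypothetical):

    # Every share 0..9 is generated so the roothash (and thus the URI) can
    # be computed, but only three peers (landlords) agreed to hold one.
    shareids = set(range(10))
    landlords = {0: "peerA", 3: "peerB", 7: "peerC"}

    # Old check (fails here): set(shareids) == set(landlords.keys())
    # New check (passes): every landlord's share is among those generated.
    assert set(landlords.keys()).issubset(shareids)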
@@ -228,6 +233,8 @@ class Encoder(object):
         return dl
 
     def send_subshare(self, shareid, segment_num, subshare):
+        if shareid not in self.landlords:
+            return defer.succeed(None)
         sh = self.landlords[shareid]
         return sh.callRemote("put_block", segment_num, subshare)
 
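Each send_* method now returns a pre-fired Deferred for shares that have no landlord, so callers can keep aggregating one Deferred per share without special-casing the gaps. A sketch of the calling pattern this enables (assuming Twisted; the function name is illustrative):

    from twisted.internet import defer

    def send_all_subshares(landlords, segment_num, subshares):
        # One Deferred per generated share: real network traffic for shares
        # with a landlord, defer.succeed(None) for the rest, so the
        # DeferredList stays aligned with the full set of shares.
        dl = []
        for shareid, subshare in enumerate(subshares):
            if shareid not in landlords:
                dl.append(defer.succeed(None))   # generated but not stored
            else:
                dl.append(landlords[shareid].callRemote(
                    "put_block", segment_num, subshare))
        return defer.DeferredList(dl)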
@@ -247,6 +254,8 @@ class Encoder(object):
         # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
         # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
         self.share_root_hashes[shareid] = t[0]
+        if shareid not in self.landlords:
+            return defer.succeed(None)
         sh = self.landlords[shareid]
         return sh.callRemote("put_block_hashes", all_hashes)
 
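The comments above describe the flat-array binary tree used for block hashes: node n's children live at 2n+1 and 2n+2, with the leaves at the end of the array. A tiny worked example with four blocks (plain sha256 stands in for the project's tagged hashes):

    import hashlib

    def h(x):
        return hashlib.sha256(x).digest()

    # Four leaf (block) hashes occupy all_hashes[3..6]; interior nodes are
    # filled bottom-up via all_hashes[n] = h(all_hashes[2n+1] + all_hashes[2n+2]).
    leaves = [h(b"block%d" % i) for i in range(4)]
    all_hashes = [None, None, None] + leaves
    for n in (2, 1, 0):
        all_hashes[n] = h(all_hashes[2*n + 1] + all_hashes[2*n + 2])

    assert all_hashes[1] == h(all_hashes[3] + all_hashes[4])   # left child
    roothash = all_hashes[0]    # the per-share root recorded above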
@@ -273,13 +282,15 @@ class Encoder(object):
         return defer.DeferredList(dl)
 
     def send_one_share_hash_tree(self, shareid, needed_hashes):
+        if shareid not in self.landlords:
+            return defer.succeed(None)
         sh = self.landlords[shareid]
         return sh.callRemote("put_share_hashes", needed_hashes)
 
     def close_all_shareholders(self):
         log.msg("%s: closing shareholders" % self)
         dl = []
-        for shareid in range(self.num_shares):
+        for shareid in self.landlords:
            dl.append(self.landlords[shareid].callRemote("close"))
         return defer.DeferredList(dl)
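The loop change in close_all_shareholders follows directly from the new invariant: self.landlords no longer covers every shareid, so indexing it with range(self.num_shares) would raise KeyError for any share nobody holds. Iterating the dict's own keys closes exactly the connections that exist:

    # Hypothetical numbers: ten shares generated, three peers holding them.
    num_shares = 10
    landlords = {0: "peerA", 3: "peerB", 7: "peerC"}

    # Old loop: for shareid in range(num_shares) would evaluate
    # landlords[1] and raise KeyError, since share 1 was never placed.
    for shareid in landlords:
        peer = landlords[shareid]   # always present by construction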
src/allmydata/test/test_system.py

@@ -3,10 +3,11 @@ import os
 from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from twisted.application import service
-from allmydata import client, queen
+from allmydata import client, queen, uri, download
 from allmydata.util import idlib, fileutil
 from foolscap.eventual import flushEventualQueue
 from twisted.python import log
+from twisted.python.failure import Failure
 from twisted.web.client import getPage
 
 def flush_but_dont_ignore(res):
@@ -129,6 +130,7 @@ class SystemTest(unittest.TestCase):
         def _do_upload(res):
             log.msg("UPLOADING")
             u = self.clients[0].getServiceNamed("uploader")
+            self.uploader = u
             # we crank the max segsize down to 1024b for the duration of this
             # test, so we can exercise multiple segments. It is important
             # that this is not a multiple of the segment size, so that the
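The comment's requirement (a data length that is not a multiple of the segment size) guarantees a short tail segment, which exercises the encoder's padding path. The arithmetic, with an assumed data length:

    # max_segment_size is pinned to 1024 by the test; the data length here
    # is an assumed example, chosen (like the test's) to not divide evenly.
    max_segment_size = 1024
    data_len = 2500
    full, tail = divmod(data_len, max_segment_size)
    num_segments = full + (1 if tail else 0)
    assert (num_segments, tail) == (3, 452)   # two full segments + short tail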
@@ -144,13 +146,26 @@ class SystemTest(unittest.TestCase):
             self.uri = uri
             dl = self.clients[1].getServiceNamed("downloader")
             self.downloader = dl
             d1 = dl.download_to_data(uri)
             return d1
         d.addCallback(_upload_done)
-        def _download_done(data):
+
+        def _upload_again(res):
+            # upload again. This ought to be short-circuited, however with
+            # the way we currently generate URIs (i.e. because they include
+            # the roothash), we have to do all of the encoding work, and only
+            # get to save on the upload part.
+            log.msg("UPLOADING AGAIN")
+            options = {"max_segment_size": 1024}
+            d1 = self.uploader.upload_data(DATA, options)
+        d.addCallback(_upload_again)
+
+        def _download_to_data(res):
+            log.msg("DOWNLOADING")
+            return self.downloader.download_to_data(self.uri)
+        d.addCallback(_download_to_data)
+        def _download_to_data_done(data):
             log.msg("download finished")
             self.failUnlessEqual(data, DATA)
-        d.addCallback(_download_done)
+        d.addCallback(_download_to_data_done)
 
         target_filename = os.path.join(self.basedir, "download.target")
         def _download_to_filename(res):
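_upload_again documents the current cost model: with roothash-bearing URIs, the second upload repeats the encoding but can skip sending blocks that peers already hold. A natural follow-on check, not asserted by this diff, is that both uploads yield the same URI; a hedged sketch of such a check (helper name hypothetical):

    def check_upload_idempotent(uploader, data, options):
        # Expectation (not asserted in this commit): uploading identical
        # bytes twice yields identical URIs, since the URI is derived
        # entirely from the data's hashes.
        d = uploader.upload_data(data, options)
        def _again(uri1):
            d2 = uploader.upload_data(data, options)
            d2.addCallback(lambda uri2: uri1 == uri2)   # expected: True
            return d2
        d.addCallback(_again)
        return d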
@@ -173,9 +188,31 @@ class SystemTest(unittest.TestCase):
             self.failUnlessEqual(newdata, DATA)
         d.addCallback(_download_to_filehandle_done)
 
+        def _download_nonexistent_uri(res):
+            baduri = self.mangle_uri(self.uri)
+            d1 = self.downloader.download_to_data(baduri)
+            def _baduri_should_fail(res):
+                self.failUnless(isinstance(res, Failure))
+                self.failUnless(res.check(download.NotEnoughPeersError))
+                # TODO: files that have zero peers should get a special kind
+                # of NotEnoughPeersError, which can be used to suggest that
+                # the URI might be wrong or that they've never uploaded the
+                # file in the first place.
+            d1.addBoth(_baduri_should_fail)
+            return d1
+        d.addCallback(_download_nonexistent_uri)
         return d
     test_upload_and_download.timeout = 600
 
+    def flip_bit(self, good):
+        return good[:-1] + chr(ord(good[-1]) ^ 0x01)
+
+    def mangle_uri(self, gooduri):
+        pieces = list(uri.unpack_uri(gooduri))
+        # [4] is the verifierid
+        pieces[4] = self.flip_bit(pieces[4])
+        return uri.pack_uri(*pieces)
+
     def test_vdrive(self):
         self.basedir = "test_system/SystemTest/test_vdrive"
         self.data = DATA = "Some data to publish to the virtual drive\n"
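mangle_uri corrupts only the verifierid field, so the URI stays structurally valid but no longer names any uploaded file; flip_bit does this by XORing the low bit of the last character. The original is Python 2 string code; an equivalent over bytes (an adaptation, not from the source) behaves like this:

    def flip_bit(good):
        # XOR the low bit of the final byte; the rest is untouched.
        return good[:-1] + bytes([good[-1] ^ 0x01])

    assert flip_bit(b"\x00\xab\xcd") == b"\x00\xab\xcc"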