mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-03-26 21:38:35 +00:00
naming: Rename a few things which I touched or changed in the recent patch to download-without-decrypting.
Rename "downloadable" to "target". Rename "u" to "v" in FileDownloader.__init__(). Rename "_uri" to "_verifycap" in FileDownloader. Rename "_downloadable" to "_target" in FileDownloader. Rename "FileDownloader" to "CiphertextDownloader".
This commit is contained in:
parent
600196f571
commit
157e365d2b
src/allmydata
@ -44,27 +44,27 @@ class DownloadResults:
|
||||
|
||||
class DecryptingTarget(log.PrefixingLogMixin):
|
||||
implements(IDownloadTarget, IConsumer)
|
||||
def __init__(self, downloadable, key, _log_msg_id=None):
|
||||
precondition(IDownloadTarget.providedBy(downloadable), downloadable)
|
||||
self.downloadable = downloadable
|
||||
def __init__(self, target, key, _log_msg_id=None):
|
||||
precondition(IDownloadTarget.providedBy(target), target)
|
||||
self.target = target
|
||||
self._decryptor = AES(key)
|
||||
prefix = str(downloadable)
|
||||
prefix = str(target)
|
||||
log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
|
||||
def registerProducer(self, producer, streaming):
|
||||
if IConsumer.providedBy(self.downloadable):
|
||||
self.downloadable.registerProducer(producer, streaming)
|
||||
if IConsumer.providedBy(self.target):
|
||||
self.target.registerProducer(producer, streaming)
|
||||
def unregisterProducer(self):
|
||||
if IConsumer.providedBy(self.downloadable):
|
||||
self.downloadable.unregisterProducer()
|
||||
if IConsumer.providedBy(self.target):
|
||||
self.target.unregisterProducer()
|
||||
def write(self, ciphertext):
|
||||
plaintext = self._decryptor.process(ciphertext)
|
||||
self.downloadable.write(plaintext)
|
||||
self.target.write(plaintext)
|
||||
def open(self, size):
|
||||
self.downloadable.open(size)
|
||||
self.target.open(size)
|
||||
def close(self):
|
||||
self.downloadable.close()
|
||||
self.target.close()
|
||||
def finish(self):
|
||||
return self.downloadable.finish()
|
||||
return self.target.finish()
|
||||
|
||||
class ValidatedThingObtainer:
|
||||
def __init__(self, validatedthingproxies, debugname, log_id):
|
||||
@ -482,7 +482,7 @@ class SegmentDownloader:
|
||||
"""I am responsible for downloading all the blocks for a single segment
|
||||
of data.
|
||||
|
||||
I am a child of the FileDownloader.
|
||||
I am a child of the CiphertextDownloader.
|
||||
"""
|
||||
|
||||
def __init__(self, parent, segmentnumber, needed_shares, results):
|
||||
@ -603,36 +603,36 @@ class DownloadStatus:
|
||||
def set_results(self, value):
|
||||
self.results = value
|
||||
|
||||
class FileDownloader(log.PrefixingLogMixin):
|
||||
class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
""" I download shares, check their integrity, then decode them, check the integrity of the
|
||||
resulting ciphertext, then and write it to my target. """
|
||||
implements(IPushProducer)
|
||||
_status = None
|
||||
|
||||
def __init__(self, client, u, downloadable):
|
||||
precondition(IVerifierURI.providedBy(u), u)
|
||||
precondition(IDownloadTarget.providedBy(downloadable), downloadable)
|
||||
def __init__(self, client, v, target):
|
||||
precondition(IVerifierURI.providedBy(v), v)
|
||||
precondition(IDownloadTarget.providedBy(target), target)
|
||||
|
||||
prefix=base32.b2a_l(u.get_storage_index()[:8], 60)
|
||||
prefix=base32.b2a_l(v.get_storage_index()[:8], 60)
|
||||
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
|
||||
self._client = client
|
||||
|
||||
self._uri = u
|
||||
self._storage_index = u.get_storage_index()
|
||||
self._uri_extension_hash = u.uri_extension_hash
|
||||
self._verifycap = v
|
||||
self._storage_index = v.get_storage_index()
|
||||
self._uri_extension_hash = v.uri_extension_hash
|
||||
self._vup = None # ValidatedExtendedURIProxy
|
||||
|
||||
self._started = time.time()
|
||||
self._status = s = DownloadStatus()
|
||||
s.set_status("Starting")
|
||||
s.set_storage_index(self._storage_index)
|
||||
s.set_size(self._uri.size)
|
||||
s.set_size(self._verifycap.size)
|
||||
s.set_helper(False)
|
||||
s.set_active(True)
|
||||
|
||||
self._results = DownloadResults()
|
||||
s.set_results(self._results)
|
||||
self._results.file_size = self._uri.size
|
||||
self._results.file_size = self._verifycap.size
|
||||
self._results.timings["servers_peer_selection"] = {}
|
||||
self._results.timings["fetch_per_server"] = {}
|
||||
self._results.timings["cumulative_fetch"] = 0.0
|
||||
@ -642,9 +642,9 @@ class FileDownloader(log.PrefixingLogMixin):
|
||||
|
||||
self._paused = False
|
||||
self._stopped = False
|
||||
if IConsumer.providedBy(downloadable):
|
||||
downloadable.registerProducer(self, True)
|
||||
self._downloadable = downloadable
|
||||
if IConsumer.providedBy(target):
|
||||
target.registerProducer(self, True)
|
||||
self._target = target
|
||||
self._opened = False
|
||||
|
||||
self.active_buckets = {} # k: shnum, v: bucket
|
||||
@ -656,7 +656,7 @@ class FileDownloader(log.PrefixingLogMixin):
|
||||
self._ciphertext_hasher = hashutil.crypttext_hasher()
|
||||
|
||||
self._bytes_done = 0
|
||||
self._status.set_progress(float(self._bytes_done)/self._uri.size)
|
||||
self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
|
||||
|
||||
# _got_uri_extension() will create the following:
|
||||
# self._crypttext_hash_tree
|
||||
@ -705,8 +705,8 @@ class FileDownloader(log.PrefixingLogMixin):
|
||||
self._status.set_status("Finished")
|
||||
self._status.set_active(False)
|
||||
self._status.set_paused(False)
|
||||
if IConsumer.providedBy(self._downloadable):
|
||||
self._downloadable.unregisterProducer()
|
||||
if IConsumer.providedBy(self._target):
|
||||
self._target.unregisterProducer()
|
||||
return res
|
||||
d.addBoth(_finished)
|
||||
def _failed(why):
|
||||
@ -786,8 +786,8 @@ class FileDownloader(log.PrefixingLogMixin):
|
||||
now = time.time()
|
||||
self._results.timings["peer_selection"] = now - self._started
|
||||
|
||||
if len(self._share_buckets) < self._uri.needed_shares:
|
||||
raise NotEnoughSharesError(len(self._share_buckets), self._uri.needed_shares)
|
||||
if len(self._share_buckets) < self._verifycap.needed_shares:
|
||||
raise NotEnoughSharesError(len(self._share_buckets), self._verifycap.needed_shares)
|
||||
|
||||
#for s in self._share_vbuckets.values():
|
||||
# for vb in s:
|
||||
@ -806,7 +806,7 @@ class FileDownloader(log.PrefixingLogMixin):
|
||||
|
||||
vups = []
|
||||
for sharenum, bucket in self._share_buckets:
|
||||
vups.append(ValidatedExtendedURIProxy(bucket, self._uri, self._fetch_failures))
|
||||
vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
|
||||
vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
|
||||
d = vto.start()
|
||||
|
||||
@ -818,13 +818,13 @@ class FileDownloader(log.PrefixingLogMixin):
|
||||
|
||||
self._vup = vup
|
||||
self._codec = codec.CRSDecoder()
|
||||
self._codec.set_params(self._vup.segment_size, self._uri.needed_shares, self._uri.total_shares)
|
||||
self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
|
||||
self._tail_codec = codec.CRSDecoder()
|
||||
self._tail_codec.set_params(self._vup.tail_segment_size, self._uri.needed_shares, self._uri.total_shares)
|
||||
self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
|
||||
|
||||
self._current_segnum = 0
|
||||
|
||||
self._share_hash_tree = hashtree.IncompleteHashTree(self._uri.total_shares)
|
||||
self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
|
||||
self._share_hash_tree.set_hashes({0: vup.share_root_hash})
|
||||
|
||||
self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
|
||||
@ -858,17 +858,17 @@ class FileDownloader(log.PrefixingLogMixin):
|
||||
"""either return a mapping from shnum to a ValidatedReadBucketProxy that can
|
||||
provide data for that share, or raise NotEnoughSharesError"""
|
||||
|
||||
while len(self.active_buckets) < self._uri.needed_shares:
|
||||
while len(self.active_buckets) < self._verifycap.needed_shares:
|
||||
# need some more
|
||||
handled_shnums = set(self.active_buckets.keys())
|
||||
available_shnums = set(self._share_vbuckets.keys())
|
||||
potential_shnums = list(available_shnums - handled_shnums)
|
||||
if len(potential_shnums) < (self._uri.needed_shares - len(self.active_buckets)):
|
||||
if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
|
||||
raise NotEnoughSharesError
|
||||
# For the next share, choose a primary share if available, else a randomly chosen
|
||||
# secondary share.
|
||||
potential_shnums.sort()
|
||||
if potential_shnums[0] < self._uri.needed_shares:
|
||||
if potential_shnums[0] < self._verifycap.needed_shares:
|
||||
shnum = potential_shnums[0]
|
||||
else:
|
||||
shnum = random.choice(potential_shnums)
|
||||
@ -919,7 +919,7 @@ class FileDownloader(log.PrefixingLogMixin):
|
||||
100.0 * segnum / self._vup.num_segments))
|
||||
# memory footprint: when the SegmentDownloader finishes pulling down
|
||||
# all shares, we have 1*segment_size of usage.
|
||||
segmentdler = SegmentDownloader(self, segnum, self._uri.needed_shares,
|
||||
segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
|
||||
self._results)
|
||||
started = time.time()
|
||||
d = segmentdler.start()
|
||||
@ -960,13 +960,13 @@ class FileDownloader(log.PrefixingLogMixin):
|
||||
def _got_segment(self, buffers):
|
||||
precondition(self._crypttext_hash_tree)
|
||||
started_decrypt = time.time()
|
||||
self._status.set_progress(float(self._current_segnum)/self._uri.size)
|
||||
self._status.set_progress(float(self._current_segnum)/self._verifycap.size)
|
||||
|
||||
if self._current_segnum + 1 == self._vup.num_segments:
|
||||
# This is the last segment.
|
||||
# Trim off any padding added by the upload side. We never send empty segments. If
|
||||
# the data was an exact multiple of the segment size, the last segment will be full.
|
||||
tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._uri.needed_shares)
|
||||
tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
|
||||
num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
|
||||
# Remove buffers which don't contain any part of the tail.
|
||||
del buffers[num_buffers_used:]
|
||||
@ -986,13 +986,13 @@ class FileDownloader(log.PrefixingLogMixin):
|
||||
# Then write this segment to the target.
|
||||
if not self._opened:
|
||||
self._opened = True
|
||||
self._downloadable.open(self._uri.size)
|
||||
self._target.open(self._verifycap.size)
|
||||
|
||||
for buffer in buffers:
|
||||
self._downloadable.write(buffer)
|
||||
self._target.write(buffer)
|
||||
self._bytes_done += len(buffer)
|
||||
|
||||
self._status.set_progress(float(self._bytes_done)/self._uri.size)
|
||||
self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
|
||||
self._current_segnum += 1
|
||||
|
||||
if self._results:
|
||||
@ -1010,10 +1010,10 @@ class FileDownloader(log.PrefixingLogMixin):
|
||||
"bad crypttext_hash: computed=%s, expected=%s" %
|
||||
(base32.b2a(self._ciphertext_hasher.digest()),
|
||||
base32.b2a(self._vup.crypttext_hash)))
|
||||
_assert(self._bytes_done == self._uri.size, self._bytes_done, self._uri.size)
|
||||
_assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
|
||||
self._status.set_progress(1)
|
||||
self._downloadable.close()
|
||||
return self._downloadable.finish()
|
||||
self._target.close()
|
||||
return self._target.finish()
|
||||
def get_download_status(self):
|
||||
return self._status
|
||||
|
||||
@ -1138,7 +1138,7 @@ class Downloader(service.MultiService):
|
||||
self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
|
||||
|
||||
target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
|
||||
dl = FileDownloader(self.parent, u.get_verify_cap(), target)
|
||||
dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target)
|
||||
self._add_download(dl)
|
||||
d = dl.start()
|
||||
return d
|
||||
|
@ -1274,11 +1274,11 @@ class IDownloadTarget(Interface):
|
||||
is a Failure object indicating what went wrong. No further methods
|
||||
will be invoked on the IDownloadTarget after fail()."""
|
||||
def register_canceller(cb):
|
||||
"""The FileDownloader uses this to register a no-argument function
|
||||
"""The CiphertextDownloader uses this to register a no-argument function
|
||||
that the target can call to cancel the download. Once this canceller
|
||||
is invoked, no further calls to write() or close() will be made."""
|
||||
def finish():
|
||||
"""When the FileDownloader is done, this finish() function will be
|
||||
"""When the CiphertextDownloader is done, this finish() function will be
|
||||
called. Whatever it returns will be returned to the invoker of
|
||||
Downloader.download.
|
||||
"""
|
||||
|
@ -494,11 +494,11 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
|
||||
if not target:
|
||||
target = download.Data()
|
||||
target = download.DecryptingTarget(target, u.key)
|
||||
fd = download.FileDownloader(client, u.get_verify_cap(), target)
|
||||
fd = download.CiphertextDownloader(client, u.get_verify_cap(), target)
|
||||
|
||||
# we manually cycle the FileDownloader through a number of steps that
|
||||
# we manually cycle the CiphertextDownloader through a number of steps that
|
||||
# would normally be sequenced by a Deferred chain in
|
||||
# FileDownloader.start(), to give us more control over the process.
|
||||
# CiphertextDownloader.start(), to give us more control over the process.
|
||||
# In particular, by bypassing _get_all_shareholders, we skip
|
||||
# permuted-peerlist selection.
|
||||
for shnum, bucket in shareholders.items():
|
||||
@ -515,7 +515,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
|
||||
|
||||
d = defer.succeed(None)
|
||||
|
||||
# have the FileDownloader retrieve a copy of uri_extension itself
|
||||
# have the CiphertextDownloader retrieve a copy of uri_extension itself
|
||||
d.addCallback(fd._obtain_uri_extension)
|
||||
|
||||
if "corrupt_crypttext_hashes" in recover_mode:
|
||||
@ -535,7 +535,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
|
||||
return fd._vup
|
||||
d.addCallback(_corrupt_crypttext_hashes)
|
||||
|
||||
# also have the FileDownloader ask for hash trees
|
||||
# also have the CiphertextDownloader ask for hash trees
|
||||
d.addCallback(fd._get_crypttext_hash_tree)
|
||||
|
||||
d.addCallback(fd._download_all_segments)
|
||||
|
Loading…
x
Reference in New Issue
Block a user