immutable/download.py: wrap to 80cols, no functional changes

Brian Warner 2009-10-05 12:25:42 -07:00
parent 7ef99c5e33
commit 19d336513c


@@ -67,9 +67,10 @@ class DecryptingTarget(log.PrefixingLogMixin):
self.target.close()
def finish(self):
return self.target.finish()
# The following methods is just to pass through to the next target, and just because that
# target might be a repairer.DownUpConnector, and just because the current CHKUpload object
# expects to find the storage index in its Uploadable.
# The following methods are just to pass through to the next target, and
# just because that target might be a repairer.DownUpConnector, and just
# because the current CHKUpload object expects to find the storage index
# in its Uploadable.
def set_storageindex(self, storageindex):
self.target.set_storageindex(storageindex)
def set_encodingparams(self, encodingparams):
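The pass-through shape these methods share, reduced to a minimal standalone sketch (the class and attribute names here are illustrative, not Tahoe's exact hierarchy):

    # Minimal sketch of a chainable download target. Illustrative only.
    class PassThroughTarget:
        def __init__(self, next_target):
            self._next = next_target
        def write(self, crypttextdata):
            self._next.write(crypttextdata)
        def close(self):
            self._next.close()
        def set_storageindex(self, storageindex):
            # Forwarded only because the next target might be a
            # repairer.DownUpConnector whose CHKUpload expects to find
            # the storage index in its Uploadable.
            self._next.set_storageindex(storageindex)
        def set_encodingparams(self, encodingparams):
            self._next.set_encodingparams(encodingparams)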
@@ -104,7 +105,9 @@ class ValidatedThingObtainer:
def _try_the_next_one(self):
vtp = self._validatedthingproxies.pop(0)
d = vtp.start() # start() obtains, validates, and callsback-with the thing or else errbacks
# start() obtains, validates, and callsback-with the thing or else
# errbacks
d = vtp.start()
d.addErrback(self._bad, vtp)
return d
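The failover pattern used here, as a standalone sketch (the real ValidatedThingObtainer also tracks outstanding proxies and distinguishes validation failures from network failures; _bad below is simplified to "retry until the list is empty"):

    from twisted.internet import defer

    # Simplified failover sketch: try each proxy in turn, advancing to
    # the next one only when the previous start() errbacks.
    class Obtainer:
        def __init__(self, validatedthingproxies):
            self._proxies = list(validatedthingproxies)
        def start(self):
            return self._try_the_next_one()
        def _try_the_next_one(self):
            vtp = self._proxies.pop(0)
            d = vtp.start()  # fires with the validated thing, or errbacks
            d.addErrback(self._bad)
            return d
        def _bad(self, failure):
            if not self._proxies:
                return failure  # out of candidates: propagate the error
            return self._try_the_next_one()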
@@ -113,10 +116,12 @@ class ValidatedThingObtainer:
class ValidatedCrypttextHashTreeProxy:
implements(IValidatedThingProxy)
""" I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy -- I use
its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e., I have a
start() method that fires with self once I get a valid one). """
def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, fetch_failures=None):
""" I am a front-end for a remote crypttext hash tree using a local
ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the
Validated Thing protocol (i.e., I have a start() method that fires with
self once I get a valid one)."""
def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments,
fetch_failures=None):
# fetch_failures is for debugging -- see test_encode.py
self._readbucketproxy = readbucketproxy
self._num_segments = num_segments
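The "Validated Thing" protocol named in the docstring amounts to a single method; a minimal conforming proxy looks like this (sketch, using Twisted's defer module):

    from twisted.internet import defer

    # Minimal sketch of the Validated Thing protocol: start() returns a
    # Deferred that fires with the proxy itself once validation passes,
    # or errbacks if it does not.
    class AlwaysValidProxy:
        def start(self):
            return defer.succeed(self)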
@@ -131,8 +136,10 @@ class ValidatedCrypttextHashTreeProxy:
if self._fetch_failures is not None:
self._fetch_failures["crypttext_hash_tree"] += 1
raise BadOrMissingHash(le)
# If we now have enough of the crypttext hash tree to integrity-check *any* segment of ciphertext, then we are done.
# TODO: It would have better alacrity if we downloaded only part of the crypttext hash tree at a time.
# If we now have enough of the crypttext hash tree to integrity-check
# *any* segment of ciphertext, then we are done. TODO: It would have
# better alacrity if we downloaded only part of the crypttext hash
# tree at a time.
for segnum in range(self._num_segments):
if self._crypttext_hash_tree.needed_hashes(segnum):
raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
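In other words, fetching is done as soon as every segment's hash path to the already-pinned root is complete. Restated as a standalone predicate (needed_hashes() per allmydata.hashtree; the exact signature is an assumption based on this era of the codebase):

    # Sketch: the loop above as a predicate. needed_hashes(segnum)
    # returns the (possibly empty) set of hash-tree nodes still missing
    # on segment segnum's path to the root.
    def have_enough_hashes(crypttext_hash_tree, num_segments):
        return not any(crypttext_hash_tree.needed_hashes(segnum)
                       for segnum in range(num_segments))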
@@ -145,8 +152,8 @@ class ValidatedCrypttextHashTreeProxy:
class ValidatedExtendedURIProxy:
implements(IValidatedThingProxy)
""" I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for
retrieving and validating the elements from the UEB. """
""" I am a front-end for a remote UEB (using a local ReadBucketProxy),
responsible for retrieving and validating the elements from the UEB."""
def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
# fetch_failures is for debugging -- see test_encode.py
@@ -177,7 +184,9 @@ class ValidatedExtendedURIProxy:
h = hashutil.uri_extension_hash(data)
if h != self._verifycap.uri_extension_hash:
msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
(self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
(self._readbucketproxy,
base32.b2a(self._verifycap.uri_extension_hash),
base32.b2a(h)))
if self._fetch_failures is not None:
self._fetch_failures["uri_extension"] += 1
raise BadURIExtensionHashValue(msg)
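The check itself is a straight comparison against the hash pinned in the verify-cap. A generic stand-in (Tahoe's hashutil.uri_extension_hash is a tagged SHA-256d, which this sketch deliberately does not reproduce):

    import hashlib

    # Stand-in for the check above. Plain sha256 substitutes for
    # hashutil.uri_extension_hash, which is really a tagged SHA-256d.
    def check_uri_extension(data, expected_hash):
        if hashlib.sha256(data).digest() != expected_hash:
            raise ValueError("bad uri_extension hash")
        return data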
@@ -185,38 +194,46 @@ class ValidatedExtendedURIProxy:
return data
def _parse_and_validate(self, data):
self.share_size = mathutil.div_ceil(self._verifycap.size, self._verifycap.needed_shares)
self.share_size = mathutil.div_ceil(self._verifycap.size,
self._verifycap.needed_shares)
d = uri.unpack_extension(data)
# There are several kinds of things that can be found in a UEB. First, things that we
# really need to learn from the UEB in order to do this download. Next: things which are
# optional but not redundant -- if they are present in the UEB they will get used. Next,
# things that are optional and redundant. These things are required to be consistent:
# they don't have to be in the UEB, but if they are in the UEB then they will be checked
# for consistency with the already-known facts, and if they are inconsistent then an
# exception will be raised. These things aren't actually used -- they are just tested
# for consistency and ignored. Finally: things which are deprecated -- they ought not be
# in the UEB at all, and if they are present then a warning will be logged but they are
# otherwise ignored.
# There are several kinds of things that can be found in a UEB.
# First, things that we really need to learn from the UEB in order to
# do this download. Next: things which are optional but not redundant
# -- if they are present in the UEB they will get used. Next, things
# that are optional and redundant. These things are required to be
# consistent: they don't have to be in the UEB, but if they are in
# the UEB then they will be checked for consistency with the
# already-known facts, and if they are inconsistent then an exception
# will be raised. These things aren't actually used -- they are just
# tested for consistency and ignored. Finally: things which are
# deprecated -- they ought not be in the UEB at all, and if they are
# present then a warning will be logged but they are otherwise
# ignored.
# First, things that we really need to learn from the UEB: segment_size,
# crypttext_root_hash, and share_root_hash.
# First, things that we really need to learn from the UEB:
# segment_size, crypttext_root_hash, and share_root_hash.
self.segment_size = d['segment_size']
self.block_size = mathutil.div_ceil(self.segment_size, self._verifycap.needed_shares)
self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)
self.block_size = mathutil.div_ceil(self.segment_size,
self._verifycap.needed_shares)
self.num_segments = mathutil.div_ceil(self._verifycap.size,
self.segment_size)
self.tail_data_size = self._verifycap.size % self.segment_size
if not self.tail_data_size:
self.tail_data_size = self.segment_size
# padding for erasure code
self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, self._verifycap.needed_shares)
self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
self._verifycap.needed_shares)
# Ciphertext hash tree root is mandatory, so that there is at most one ciphertext that
# matches this read-cap or verify-cap. The integrity check on the shares is not
# sufficient to prevent the original encoder from creating some shares of file A and
# other shares of file B.
# Ciphertext hash tree root is mandatory, so that there is at most
# one ciphertext that matches this read-cap or verify-cap. The
# integrity check on the shares is not sufficient to prevent the
# original encoder from creating some shares of file A and other
# shares of file B.
self.crypttext_root_hash = d['crypttext_root_hash']
self.share_root_hash = d['share_root_hash']
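A worked example of this geometry, with div_ceil and next_multiple written out (the file size and segment size are hypothetical; the helpers mirror allmydata.util.mathutil):

    # Hypothetical numbers for the UEB segment geometry above.
    def div_ceil(n, d):
        return (n + d - 1) // d

    def next_multiple(n, k):
        return div_ceil(n, k) * k

    size = 1000000          # verifycap.size (hypothetical)
    needed_shares = 3       # k
    segment_size = 131072   # from the UEB

    share_size = div_ceil(size, needed_shares)          # 333334
    block_size = div_ceil(segment_size, needed_shares)  # 43691
    num_segments = div_ceil(size, segment_size)         # 8
    tail_data_size = size % segment_size or segment_size  # 82496
    # padded up to a multiple of k for the erasure code:
    tail_segment_size = next_multiple(tail_data_size, needed_shares)  # 82497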
@@ -229,8 +246,9 @@ class ValidatedExtendedURIProxy:
raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
# Next: things that are optional, redundant, and required to be consistent: codec_name,
# codec_params, tail_codec_params, num_segments, size, needed_shares, total_shares
# Next: things that are optional, redundant, and required to be
# consistent: codec_name, codec_params, tail_codec_params,
# num_segments, size, needed_shares, total_shares
if d.has_key('codec_name'):
if d['codec_name'] != "crs":
raise UnsupportedErasureCodec(d['codec_name'])
@@ -238,16 +256,17 @@ class ValidatedExtendedURIProxy:
if d.has_key('codec_params'):
ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
if ucpss != self.segment_size:
raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
"self.segment_size: %s" % (ucpss, self.segment_size))
raise BadURIExtension("inconsistent erasure code params: "
"ucpss: %s != self.segment_size: %s" %
(ucpss, self.segment_size))
if ucpns != self._verifycap.needed_shares:
raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
"self._verifycap.needed_shares: %s" % (ucpns,
self._verifycap.needed_shares))
"self._verifycap.needed_shares: %s" %
(ucpns, self._verifycap.needed_shares))
if ucpts != self._verifycap.total_shares:
raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
"self._verifycap.total_shares: %s" % (ucpts,
self._verifycap.total_shares))
"self._verifycap.total_shares: %s" %
(ucpts, self._verifycap.total_shares))
if d.has_key('tail_codec_params'):
utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
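codec.parse_params unpacks a serialized "segmentsize-k-n" string (the format is an assumption inferred from its use here); the consistency check then compares each field against what is already known:

    # Sketch, assuming parse_params splits "segmentsize-k-n" strings.
    def parse_params(serialized):
        ss, k, n = serialized.split("-")
        return int(ss), int(k), int(n)

    ucpss, ucpns, ucpts = parse_params("131072-3-10")
    assert (ucpss, ucpns, ucpts) == (131072, 3, 10)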
@@ -291,7 +310,8 @@ class ValidatedExtendedURIProxy:
"total shares: %s" % (self._verifycap.total_shares,
d['total_shares']))
# Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash
# Finally, things that are deprecated and ignored: plaintext_hash,
# plaintext_root_hash
if d.get('plaintext_hash'):
log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
"and is no longer used. Ignoring. %s" % (self,))
@@ -302,27 +322,33 @@ class ValidatedExtendedURIProxy:
return self
def start(self):
""" Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
it. Returns a deferred which is called back with self once the fetch is successful, or
is erred back if it fails. """
"""Fetch the UEB from bucket, compare its hash to the hash from
verifycap, then parse it. Returns a deferred which is called back
with self once the fetch is successful, or is erred back if it
fails."""
d = self._readbucketproxy.get_uri_extension()
d.addCallback(self._check_integrity)
d.addCallback(self._parse_and_validate)
return d
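This chain needs no explicit error plumbing: a raise inside any callback skips the remaining callbacks and flows to the caller's errback. A self-contained demonstration of that behavior (synchronous Deferreds for simplicity; names are illustrative):

    from twisted.internet import defer

    def fetch():
        return defer.succeed("corrupt")   # stands in for get_uri_extension

    def check(data):
        if data != "good":
            raise ValueError("bad hash")  # like BadURIExtensionHashValue
        return data

    results = []
    d = fetch()
    d.addCallback(check)
    d.addCallback(lambda data: results.append(data))  # skipped on failure
    d.addErrback(lambda f: results.append(f.getErrorMessage()))
    assert results == ["bad hash"]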
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
"""I am a front-end for a remote storage bucket, responsible for retrieving and validating
data from that bucket.
"""I am a front-end for a remote storage bucket, responsible for
retrieving and validating data from that bucket.
My get_block() method is used by BlockDownloaders.
"""
def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
""" share_hash_tree is required to have already been initialized with the root hash
(the number-0 hash), using the share_root_hash from the UEB """
def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
block_size, share_size):
""" share_hash_tree is required to have already been initialized with
the root hash (the number-0 hash), using the share_root_hash from the
UEB"""
precondition(share_hash_tree[0] is not None, share_hash_tree)
prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
prefix = "%d-%s-%s" % (sharenum, bucket,
base32.b2a_l(share_hash_tree[0][:8], 60))
log.PrefixingLogMixin.__init__(self,
facility="tahoe.immutable.download",
prefix=prefix)
self.sharenum = sharenum
self.bucket = bucket
self.share_hash_tree = share_hash_tree
@@ -343,7 +369,8 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin):
# We might need to grab some elements of our block hash tree, to
# validate the requested block up to the share hash.
blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
# We don't need the root of the block hash tree, as that comes in the share tree.
# We don't need the root of the block hash tree, as that comes in the
# share tree.
blockhashesneeded.discard(0)
d2 = self.bucket.get_block_hashes(blockhashesneeded)
@@ -353,14 +380,16 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin):
thisblocksize = self.share_size % self.block_size
if thisblocksize == 0:
thisblocksize = self.block_size
d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)
d3 = self.bucket.get_block_data(blocknum,
self.block_size, thisblocksize)
dl = deferredutil.gatherResults([d1, d2, d3])
dl.addCallback(self._got_data, blocknum)
return dl
def _got_data(self, results, blocknum):
precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
precondition(blocknum < self.num_blocks,
self, blocknum, self.num_blocks)
sharehashes, blockhashes, blockdata = results
try:
sharehashes = dict(sharehashes)
@@ -374,22 +403,25 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin):
try:
if self.share_hash_tree.needed_hashes(self.sharenum):
# This will raise exception if the values being passed do not match the root
# node of self.share_hash_tree.
# This will raise an exception if the values being passed do not
# match the root node of self.share_hash_tree.
try:
self.share_hash_tree.set_hashes(sharehashes)
except IndexError, le:
# Weird -- sharehashes contained index numbers outside of the range that fit
# into this hash tree.
# Weird -- sharehashes contained index numbers outside of
# the range that fit into this hash tree.
raise BadOrMissingHash(le)
# To validate a block we need the root of the block hash tree, which is also one of
# the leafs of the share hash tree, and is called "the share hash".
# To validate a block we need the root of the block hash tree,
# which is also one of the leaves of the share hash tree, and is
# called "the share hash".
if not self.block_hash_tree[0]: # empty -- no root node yet
# Get the share hash from the share hash tree.
share_hash = self.share_hash_tree.get_leaf(self.sharenum)
if not share_hash:
raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
# No root node in block_hash_tree and also the share hash
# wasn't sent by the server.
raise hashtree.NotEnoughHashesError
self.block_hash_tree.set_hashes({0: share_hash})
if self.block_hash_tree.needed_hashes(blocknum):
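The structure being validated is two stacked Merkle trees: each leaf of the share hash tree is simultaneously the root (node 0) of one share's block hash tree. A toy illustration with untagged hashes (Tahoe's real trees use tagged SHA-256d):

    import hashlib

    def h(b):
        return hashlib.sha256(b).digest()

    # Toy two-level structure: block hashes -> per-share root ("the
    # share hash") -> share hash tree root (pinned by the UEB).
    block_leaves = [h(b"block0"), h(b"block1")]
    share_hash = h(block_leaves[0] + block_leaves[1])
    share_root = h(share_hash + h(b"some-other-share-hash"))

    # To validate block0 we need its sibling block hash, plus enough of
    # the share tree to tie share_hash to share_root -- exactly what
    # needed_hashes() reports for each tree.
    assert h(h(b"block0") + block_leaves[1]) == share_hash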
@@ -662,7 +694,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
if IConsumer.providedBy(target):
target.registerProducer(self, True)
self._target = target
self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
# Repairer (uploader) needs the storageindex.
self._target.set_storageindex(self._storage_index)
self._monitor = monitor
self._opened = False
@@ -715,7 +748,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
# first step: who should we download from?
d = defer.maybeDeferred(self._get_all_shareholders)
d.addCallback(self._got_all_shareholders)
# now get the uri_extension block from somebody and integrity check it and parse and validate its contents
# now get the uri_extension block from somebody and integrity check
# it and parse and validate its contents
d.addCallback(self._obtain_uri_extension)
d.addCallback(self._get_crypttext_hash_tree)
# once we know that, we can download blocks from everybody
@@ -734,11 +768,13 @@ class CiphertextDownloader(log.PrefixingLogMixin):
self._status.set_status("Failed")
self._status.set_active(False)
if why.check(DownloadStopped):
# DownloadStopped just means the consumer aborted the download; not so scary.
# DownloadStopped just means the consumer aborted the
# download; not so scary.
self.log("download stopped", level=log.UNUSUAL)
else:
# This is really unusual, and deserves maximum forensics.
self.log("download failed!", failure=why, level=log.SCARY, umid="lp1vaQ")
self.log("download failed!", failure=why, level=log.SCARY,
umid="lp1vaQ")
return why
d.addErrback(_failed)
d.addCallback(self._done)
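why.check() is Twisted's type test on a Failure: it returns the matching exception class when the wrapped exception is an instance of it, and None otherwise, which is what lets this errback separate an expected stop from a genuine failure:

    from twisted.python.failure import Failure

    class DownloadStopped(Exception):
        pass

    assert Failure(DownloadStopped()).check(DownloadStopped) is DownloadStopped
    assert Failure(ValueError("boom")).check(DownloadStopped) is None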
@@ -885,12 +921,13 @@ class CiphertextDownloader(log.PrefixingLogMixin):
if self._status:
self._status.set_status("Retrieving crypttext hash tree")
vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._parentmsgid)
vto = ValidatedThingObtainer(vchtps, debugname="vchtps",
log_id=self._parentmsgid)
d = vto.start()
def _got_crypttext_hash_tree(res):
# Good -- the self._crypttext_hash_tree that we passed to vchtp is now populated
# with hashes.
# Good -- the self._crypttext_hash_tree that we passed to vchtp
# is now populated with hashes.
if self._results:
elapsed = time.time() - _get_crypttext_hash_tree_started
self._results.timings["hashtrees"] = elapsed
@@ -898,15 +935,16 @@ class CiphertextDownloader(log.PrefixingLogMixin):
return d
def _activate_enough_buckets(self):
"""either return a mapping from shnum to a ValidatedReadBucketProxy that can
provide data for that share, or raise NotEnoughSharesError"""
"""either return a mapping from shnum to a ValidatedReadBucketProxy
that can provide data for that share, or raise NotEnoughSharesError"""
while len(self.active_buckets) < self._verifycap.needed_shares:
# need some more
handled_shnums = set(self.active_buckets.keys())
available_shnums = set(self._share_vbuckets.keys())
potential_shnums = list(available_shnums - handled_shnums)
if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
if len(potential_shnums) < (self._verifycap.needed_shares
- len(self.active_buckets)):
have = len(potential_shnums) + len(self.active_buckets)
msg = "Unable to activate enough shares: have %d, need %d" \
% (have, self._verifycap.needed_shares)
@@ -914,8 +952,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
raise NotEnoughSharesError(msg)
else:
raise NoSharesError(msg)
# For the next share, choose a primary share if available, else a randomly chosen
# secondary share.
# For the next share, choose a primary share if available, else a
# randomly chosen secondary share.
potential_shnums.sort()
if potential_shnums[0] < self._verifycap.needed_shares:
shnum = potential_shnums[0]
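The selection policy, restated as a standalone sketch: share numbers below k are the systematic ("primary") shares, which the erasure decoder can use most cheaply, so one of those is taken first; otherwise a random secondary share is chosen:

    import random

    # Sketch of the policy above: prefer a primary share (shnum < k),
    # else pick a random secondary share.
    def choose_share(handled_shnums, available_shnums, k):
        potential = sorted(available_shnums - handled_shnums)
        if not potential:
            raise LookupError("no unhandled shares available")
        if potential[0] < k:
            return potential[0]
        return random.choice(potential)

    assert choose_share({0, 1}, {0, 1, 2, 7}, k=3) == 2
    assert choose_share({0, 1, 2}, {0, 1, 2, 5, 7}, k=3) in (5, 7)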
@@ -969,7 +1007,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
100.0 * segnum / self._vup.num_segments))
# memory footprint: when the SegmentDownloader finishes pulling down
# all shares, we have 1*segment_size of usage.
segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
segmentdler = SegmentDownloader(self, segnum,
self._verifycap.needed_shares,
self._results)
started = time.time()
d = segmentdler.start()
@@ -1014,8 +1053,9 @@ class CiphertextDownloader(log.PrefixingLogMixin):
if self._current_segnum + 1 == self._vup.num_segments:
# This is the last segment.
# Trim off any padding added by the upload side. We never send empty segments. If
# the data was an exact multiple of the segment size, the last segment will be full.
# Trim off any padding added by the upload side. We never send
# empty segments. If the data was an exact multiple of the
# segment size, the last segment will be full.
tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
# Remove buffers which don't contain any part of the tail.
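Worked numbers for the trim, choosing a tiny tail so that whole buffers really are dropped (with the usual near-full tail, all k buffers survive and only the last is truncated):

    def div_ceil(n, d):
        return (n + d - 1) // d

    # Hypothetical 1-byte tail with k == 3:
    tail_data_size = 1     # real ciphertext bytes in the last segment
    tail_segment_size = 3  # next_multiple(1, 3): two bytes of padding
    needed_shares = 3

    tail_buf_size = div_ceil(tail_segment_size, needed_shares)   # 1
    num_buffers_used = div_ceil(tail_data_size, tail_buf_size)   # 1
    # Buffers 2 and 3 of the decoder output hold only padding, so they
    # are removed before the tail is written to the target.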
@@ -1089,9 +1129,10 @@ class FileName:
pass # we won't use it
def finish(self):
pass
# The following methods are just because the target might be a repairer.DownUpConnector,
# and just because the current CHKUpload object expects to find the storage index and
# encoding parameters in its Uploadable.
# The following methods are just because the target might be a
# repairer.DownUpConnector, and just because the current CHKUpload object
# expects to find the storage index and encoding parameters in its
# Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
@@ -1114,9 +1155,10 @@ class Data:
pass # we won't use it
def finish(self):
return self.data
# The following methods are just because the target might be a repairer.DownUpConnector,
# and just because the current CHKUpload object expects to find the storage index and
# encoding parameters in its Uploadable.
# The following methods are just because the target might be a
# repairer.DownUpConnector, and just because the current CHKUpload object
# expects to find the storage index and encoding parameters in its
# Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
@@ -1144,9 +1186,10 @@ class FileHandle:
pass
def finish(self):
return self._filehandle
# The following methods are just because the target might be a repairer.DownUpConnector,
# and just because the current CHKUpload object expects to find the storage index and
# encoding parameters in its Uploadable.
# The following methods are just because the target might be a
# repairer.DownUpConnector, and just because the current CHKUpload object
# expects to find the storage index and encoding parameters in its
# Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
@@ -1175,9 +1218,10 @@ class ConsumerAdapter:
pass
def finish(self):
return self._consumer
# The following methods are just because the target might be a repairer.DownUpConnector,
# and just because the current CHKUpload object expects to find the storage index and
# encoding parameters in its Uploadable.
# The following methods are just because the target might be a
# repairer.DownUpConnector, and just because the current CHKUpload object
# expects to find the storage index and encoding parameters in its
# Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):