mutable: simplify Retrieve._process_segment() to use a gatherDeferred

This commit is contained in:
Brian Warner 2012-01-07 14:28:57 -08:00
parent 7f0bc64325
commit f1752f54c0

View File

@ -8,7 +8,7 @@ from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
DownloadStopped, MDMF_VERSION, SDMF_VERSION
from allmydata.util import hashutil, log, mathutil
from allmydata.util import hashutil, log, mathutil, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
@ -323,10 +323,10 @@ class Retrieve:
self._block_hash_trees = None
self._setup_encoding_parameters()
# _decode_blocks() expects the output of a DeferredList that contains
# the outputs of _validate_block() (each of which is a dict mapping
# shnum to (block,salt) bytestrings).
d = self._decode_blocks([(True, blocks_and_salts)], segnum)
# _decode_blocks() expects the output of a gatherResults that
# contains the outputs of _validate_block() (each of which is a dict
# mapping shnum to (block,salt) bytestrings).
d = self._decode_blocks([blocks_and_salts], segnum)
d.addCallback(self._decrypt_segment)
return d
@ -636,7 +636,11 @@ class Retrieve:
dl.addCallback(self._validate_block, segnum, reader, reader.server, started)
dl.addErrback(self._validation_or_decoding_failed, [reader])
ds.append(dl)
dl = defer.DeferredList(ds)
# _validation_or_decoding_failed is supposed to eat any recoverable
# errors (like corrupt shares), returning a None when that happens.
# If it raises an exception itself, or if it can't handle the error,
# the download should fail. So we can use gatherResults() here.
dl = deferredutil.gatherResults(ds)
if self._verify:
dl.addCallback(lambda ignored: "")
dl.addCallback(self._set_segment)
@ -645,35 +649,36 @@ class Retrieve:
return dl
def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
def _maybe_decode_and_decrypt_segment(self, results, segnum):
"""
I take the results of fetching and validating the blocks from a
callback chain in another method. If the results are such that
they tell me that validation and fetching succeeded without
incident, I will proceed with decoding and decryption.
Otherwise, I will do nothing.
I take the results of fetching and validating the blocks from
_process_segment. If validation and fetching succeeded without
incident, I will proceed with decoding and decryption. Otherwise, I
will do nothing.
"""
self.log("trying to decode and decrypt segment %d" % segnum)
failures = False
for block_and_salt in blocks_and_salts:
if not block_and_salt[0] or block_and_salt[1] == None:
self.log("some validation operations failed; not proceeding")
failures = True
break
if not failures:
self.log("everything looks ok, building segment %d" % segnum)
d = self._decode_blocks(blocks_and_salts, segnum)
d.addCallback(self._decrypt_segment)
d.addErrback(self._validation_or_decoding_failed,
self._active_readers)
# check to see whether we've been paused before writing
# anything.
d.addCallback(self._check_for_paused)
d.addCallback(self._check_for_stopped)
d.addCallback(self._set_segment)
return d
else:
# 'results' is the output of a gatherResults set up in
# _process_segment(). Each component Deferred will either contain the
# non-Failure output of _validate_block() for a single block (i.e.
# {segnum:(block,salt)}), or None if _validate_block threw an
# exception and _validation_or_decoding_failed handled it (by
# dropping that server).
if None in results:
self.log("some validation operations failed; not proceeding")
return defer.succeed(None)
self.log("everything looks ok, building segment %d" % segnum)
d = self._decode_blocks(results, segnum)
d.addCallback(self._decrypt_segment)
d.addErrback(self._validation_or_decoding_failed,
self._active_readers)
# check to see whether we've been paused before writing
# anything.
d.addCallback(self._check_for_paused)
d.addCallback(self._check_for_stopped)
d.addCallback(self._set_segment)
return d
def _set_segment(self, segment):
@ -853,19 +858,16 @@ class Retrieve:
return dl
def _decode_blocks(self, blocks_and_salts, segnum):
def _decode_blocks(self, results, segnum):
"""
I take a list of k blocks and salts, and decode that into a
single encrypted segment.
"""
d = {}
# We want to merge our dictionaries to the form
# {shnum: blocks_and_salts}
#
# The dictionaries come from validate block that way, so we just
# need to merge them.
for block_and_salt in blocks_and_salts:
d.update(block_and_salt[1])
# 'results' is one or more dicts (each {shnum:(block,salt)}), and we
# want to merge them all
blocks_and_salts = {}
for d in results:
blocks_and_salts.update(d)
# All of these blocks should have the same salt; in SDMF, it is
# the file-wide IV, while in MDMF it is the per-segment salt. In
@ -874,10 +876,10 @@ class Retrieve:
# d.items()[0] is like (shnum, (block, salt))
# d.items()[0][1] is like (block, salt)
# d.items()[0][1][1] is the salt.
salt = d.items()[0][1][1]
salt = blocks_and_salts.items()[0][1][1]
# Next, extract just the blocks from the dict. We'll use the
# salt in the next step.
share_and_shareids = [(k, v[0]) for k, v in d.items()]
share_and_shareids = [(k, v[0]) for k, v in blocks_and_salts.items()]
d2 = dict(share_and_shareids)
shareids = []
shares = []