mutable: tolerate multiple encodings, using whichever version is recoverable first. Closes #312

This commit is contained in:
Brian Warner 2008-03-11 00:26:00 -07:00
parent 791482cf8d
commit c727348d85
2 changed files with 41 additions and 75 deletions

View File

@ -387,22 +387,19 @@ class Retrieve:
initial_query_count = 5
# we might not know how many shares we need yet.
self._required_shares = self._node.get_required_shares()
self._total_shares = self._node.get_total_shares()
# self._valid_versions is a dictionary in which the keys are
# 'verinfo' tuples (seqnum, root_hash, IV). Every time we hear about
# a new potential version of the file, we check its signature, and
# the valid ones are added to this dictionary. The values of the
# dictionary are (prefix, sharemap) tuples, where 'prefix' is just
# the first part of the share (containing the serialized verinfo),
# for easier comparison. 'sharemap' is a DictOfSets, in which the
# keys are sharenumbers, and the values are sets of (peerid, data)
# tuples. There is a (peerid, data) tuple for every instance of a
# given share that we've seen. The 'data' in this tuple is a full
# copy of the SDMF share, starting with the \x00 version byte and
# continuing through the last byte of sharedata.
# 'verinfo' tuples (seqnum, root_hash, IV, segsize, datalength, k,
# N). Every time we hear about a new potential version of the file,
# we check its signature, and the valid ones are added to this
# dictionary. The values of the dictionary are (prefix, sharemap)
# tuples, where 'prefix' is just the first part of the share
# (containing the serialized verinfo), for easier comparison.
# 'sharemap' is a DictOfSets, in which the keys are sharenumbers, and
# the values are sets of (peerid, data) tuples. There is a (peerid,
# data) tuple for every instance of a given share that we've seen.
# The 'data' in this tuple is a full copy of the SDMF share, starting
# with the \x00 version byte and continuing through the last byte of
# sharedata.
self._valid_versions = {}
# self._valid_shares is a dict mapping (peerid,data) tuples to
@ -544,7 +541,7 @@ class Retrieve:
self._pubkey = self._deserialize_pubkey(pubkey_s)
self._node._populate_pubkey(self._pubkey)
verinfo = (seqnum, root_hash, IV, segsize, datalength) #, k, N)
verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N)
self._status.sharemap[peerid].add(verinfo)
if verinfo not in self._valid_versions:
@ -569,42 +566,9 @@ class Retrieve:
k, N, segsize, datalength))
self._valid_versions[verinfo] = (prefix, DictOfSets())
# and make a note of the other parameters we've just learned
# NOTE: Retrieve needs to be refactored to put k,N in the verinfo
# along with seqnum/etc, to make sure we don't co-mingle shares
# from differently-encoded versions of the same file.
if self._required_shares is None:
self._required_shares = k
self._node._populate_required_shares(k)
if self._total_shares is None:
self._total_shares = N
self._node._populate_total_shares(N)
# reject shares that don't match our narrow-minded ideas of what
# encoding we're going to use. This addresses the immediate needs of
# ticket #312, by turning the data corruption into unavailability. To
# get back the availability (i.e. make sure that one weird-encoding
# share that happens to come back first doesn't make us ignore the
# rest of the shares), we need to implement the refactoring mentioned
# above.
if k != self._required_shares:
self._status.problems[peerid] = "sh#%d: k=%d, we want %d" \
% (shnum, k, self._required_shares)
raise CorruptShareError(peerid, shnum,
"share has k=%d, we want k=%d" %
(k, self._required_shares))
if N != self._total_shares:
self._status.problems[peerid] = "sh#%d: N=%d, we want %d" \
% (shnum, N, self._total_shares)
raise CorruptShareError(peerid, shnum,
"share has N=%d, we want N=%d" %
(N, self._total_shares))
# we've already seen this pair, and checked the signature so we
# know it's a valid candidate. Accumulate the share info, if
# there's enough data present. If not, raise NeedMoreDataError,
# which will trigger a re-fetch.
# We now know that this is a valid candidate verinfo. Accumulate the
# share info, if there's enough data present. If not, raise
# NeedMoreDataError, which will trigger a re-fetch.
_ignored = unpack_share_data(data)
self.log(" found enough data to add share contents")
self._valid_versions[verinfo][1].add(shnum, (peerid, data))
@ -625,11 +589,14 @@ class Retrieve:
return
share_prefixes = {}
versionmap = DictOfSets()
max_N = 0
for verinfo, (prefix, sharemap) in self._valid_versions.items():
# sharemap is a dict that maps shnums to sets of (peerid,data).
# len(sharemap) is the number of distinct shares that appear to
# be available.
if len(sharemap) >= self._required_shares:
(seqnum, root_hash, IV, segsize, datalength, k, N) = verinfo
max_N = max(max_N, N)
if len(sharemap) >= k:
# this one looks retrievable. TODO: our policy of decoding
# the first version that we can get is a bit troublesome: in
# a small grid with a large expansion factor, a single
@ -674,11 +641,11 @@ class Retrieve:
# no more queries are outstanding. Can we send out more? First,
# should we be looking at more peers?
self.log("need more peers: N=%s, peerlist=%d peerlist_limit=%d" %
(self._total_shares, len(self._peerlist),
self.log("need more peers: max_N=%s, peerlist=%d peerlist_limit=%d" %
(max_N, len(self._peerlist),
self._peerlist_limit), level=log.UNUSUAL)
if self._total_shares is not None:
search_distance = self._total_shares * 2
if max_N:
search_distance = max_N * 2
else:
search_distance = 20
self.log("search_distance=%d" % search_distance, level=log.UNUSUAL)
@ -721,9 +688,9 @@ class Retrieve:
def _attempt_decode(self, verinfo, sharemap):
# sharemap is a dict which maps shnum to [(peerid,data)..] sets.
(seqnum, root_hash, IV, segsize, datalength) = verinfo
(seqnum, root_hash, IV, segsize, datalength, k, N) = verinfo
assert len(sharemap) >= self._required_shares, len(sharemap)
assert len(sharemap) >= k, len(sharemap)
shares_s = []
for shnum in sorted(sharemap.keys()):
@ -784,8 +751,8 @@ class Retrieve:
# it's now down to doing FEC and decrypt.
elapsed = time.time() - self._started
self._status.timings["fetch"] = elapsed
assert len(shares) >= self._required_shares, len(shares)
d = defer.maybeDeferred(self._decode, shares, segsize, datalength)
assert len(shares) >= k, len(shares)
d = defer.maybeDeferred(self._decode, shares, segsize, datalength, k, N)
d.addCallback(self._decrypt, IV, seqnum, root_hash)
return d
@ -818,10 +785,7 @@ class Retrieve:
self.log(" data valid! len=%d" % len(share_data))
return share_data
def _decode(self, shares_dict, segsize, datalength):
# we ought to know these values by now
assert self._required_shares is not None
assert self._total_shares is not None
def _decode(self, shares_dict, segsize, datalength, k, N):
# shares_dict is a dict mapping shnum to share data, but the codec
# wants two lists.
@ -830,14 +794,13 @@ class Retrieve:
shareids.append(shareid)
shares.append(share)
assert len(shareids) >= self._required_shares, len(shareids)
assert len(shareids) >= k, len(shareids)
# zfec really doesn't want extra shares
shareids = shareids[:self._required_shares]
shares = shares[:self._required_shares]
shareids = shareids[:k]
shares = shares[:k]
fec = codec.CRSDecoder()
params = "%d-%d-%d" % (segsize,
self._required_shares, self._total_shares)
params = "%d-%d-%d" % (segsize, k, N)
fec.set_serialized_params(params)
self.log("params %s, we have %d shares" % (params, len(shares)))
@ -847,7 +810,12 @@ class Retrieve:
def _done(buffers):
elapsed = time.time() - started
self._status.timings["decode"] = elapsed
self._status.set_encoding(self._required_shares, self._total_shares)
self._status.set_encoding(k, N)
# stash these in the MutableFileNode to speed up the next pass
self._node._populate_required_shares(k)
self._node._populate_total_shares(N)
self.log(" decode done, %d buffers" % len(buffers))
segment = "".join(buffers)
self.log(" joined length %d, datalength %d" %

View File

@ -830,10 +830,8 @@ class Roundtrip(unittest.TestCase):
return r3.retrieve()
d.addCallback(_retrieve)
def _retrieved(new_contents):
## the current specified behavior is "first version recoverable"
#self.failUnlessEqual(new_contents, contents1)
# the current behavior is "first version seen is sticky"
self.failUnlessEqual(new_contents, contents2)
# the current specified behavior is "first version recoverable"
self.failUnlessEqual(new_contents, contents1)
d.addCallback(_retrieved)
return d