mutable: fix control flow to allow good+bad shares from a peer. Fixes #211.
commit c2765bd8c6
parent 2fc5247996
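
Before this change, _got_results validated every share from a peer inline, so the first CorruptShareError propagated out of the loop into the errback and any remaining good shares from the same response were discarded. The fix wraps the per-share work in try/except so a bad share is logged and skipped while the rest are still processed. A minimal sketch of that pattern (stand-in names, not tahoe-lafs code; note the diff itself uses Python 2's `except X, e` spelling):

    class CorruptShareError(Exception):
        pass

    def validate(shnum, data):
        # stand-in for the pubkey/signature checks in _got_results_one_share
        if data is None:
            raise CorruptShareError("share #%d is corrupt" % shnum)
        return data

    def process_shares(datavs):
        good, bad = {}, {}
        for shnum, data in datavs.items():
            try:
                good[shnum] = validate(shnum, data)
            except CorruptShareError as e:
                bad[shnum] = e  # log it, keep going: good+bad can coexist
        return good, bad

    # one peer returns one good share and one corrupt share; the good
    # share still gets accumulated
    print(process_shares({0: "data0", 1: None}))
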
@@ -342,9 +342,9 @@ class Retrieve:
         d.addCallback(_got_storageserver)
         d.addCallback(lambda ss: ss.callRemote("slot_readv", storage_index,
                                                [], [(0, readsize)]))
-        d.addCallback(self._got_results, peerid, readsize)
-        d.addErrback(self._query_failed, peerid, (conn, storage_index,
-                                                  peer_storage_servers))
+        d.addCallback(self._got_results, peerid, readsize,
+                      (conn, storage_index, peer_storage_servers))
+        d.addErrback(self._query_failed, peerid)
         # errors that aren't handled by _query_failed (and errors caused by
         # _query_failed) get logged, but we still want to check for doneness.
         d.addErrback(log.err)
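
This hunk moves the (conn, storage_index, peer_storage_servers) tuple from the errback to the callback: NeedMoreDataError is now caught inside _got_results, so it is the success path that needs enough context to re-issue the query. Extra positional arguments to addCallback/addErrback are standard Twisted behavior; they are passed after the result (or Failure). A self-contained sketch with hypothetical values:

    from twisted.internet import defer

    def got_results(datavs, peerid, readsize, stuff):
        # Twisted appends the extra addCallback() arguments after the result
        print(peerid, readsize, stuff, datavs)

    # stand-in for a slot_readv response: {shnum: [data]}
    d = defer.succeed({0: ["share-data"]})
    d.addCallback(got_results, "peer1", 2000, ("conn", "si", {}))
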
@@ -355,7 +355,7 @@ class Retrieve:
         verifier = rsa.create_verifying_key_from_string(pubkey_s)
         return verifier
 
-    def _got_results(self, datavs, peerid, readsize):
+    def _got_results(self, datavs, peerid, readsize, stuff):
        self._queries_outstanding.discard(peerid)
        self._used_peers.add(peerid)
        if not self._running:
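
The extra `stuff` argument exists so _got_results can retry with a larger read when a share's header says more bytes are needed. A rough standalone sketch of that grow-and-refetch loop (synchronous, hypothetical names; the real code re-issues _do_query through the Deferred machinery):

    class NeedMoreDataError(Exception):
        def __init__(self, needed_bytes):
            Exception.__init__(self, needed_bytes)
            self.needed_bytes = needed_bytes

    def unpack_prefix(data, header_size):
        # stand-in for unpack_prefix_and_signature()
        if len(data) < header_size:
            raise NeedMoreDataError(header_size)
        return data[:header_size]

    def fetch_prefix(share, read_size, header_size):
        # assumes the share really is at least header_size bytes long
        while True:
            data = share[:read_size]  # stand-in for one slot_readv round trip
            try:
                return unpack_prefix(data, header_size)
            except NeedMoreDataError as e:
                read_size = max(read_size, e.needed_bytes)  # then re-query

    print(len(fetch_prefix(b"x" * 5000, 1000, 4000)))  # -> 4000
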
@@ -363,77 +363,88 @@ class Retrieve:
 
         for shnum,datav in datavs.items():
             data = datav[0]
-            self.log("_got_results: got shnum #%d from peerid %s"
-                     % (shnum, idlib.shortnodeid_b2a(peerid)))
-            (seqnum, root_hash, IV, k, N, segsize, datalength,
-             pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
+            try:
+                self._got_results_one_share(shnum, data, peerid)
+            except NeedMoreDataError, e:
+                # ah, just re-send the query then.
+                self._read_size = max(self._read_size, e.needed_bytes)
+                # TODO: for MDMF, sanity-check self._read_size: don't let one
+                # server cause us to try to read gigabytes of data from all
+                # other servers.
+                (conn, storage_index, peer_storage_servers) = stuff
+                self._do_query(conn, peerid, storage_index, self._read_size,
+                               peer_storage_servers)
+                return
+            except CorruptShareError, e:
+                # log it and give the other shares a chance to be processed
+                f = failure.Failure()
+                self.log("WEIRD: bad share: %s %s" % (f, f.value))
+                self._bad_peerids.add(peerid)
+                self._last_failure = f
+                pass
+        # all done!
 
-            if not self._pubkey:
-                fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
-                if fingerprint != self._node._fingerprint:
-                    # bad share
-                    raise CorruptShareError(peerid, shnum,
-                                            "pubkey doesn't match fingerprint")
-                self._pubkey = self._deserialize_pubkey(pubkey_s)
-                self._node._populate_pubkey(self._pubkey)
+    def _got_results_one_share(self, shnum, data, peerid):
+        self.log("_got_results: got shnum #%d from peerid %s"
+                 % (shnum, idlib.shortnodeid_b2a(peerid)))
+        (seqnum, root_hash, IV, k, N, segsize, datalength,
+         # this might raise NeedMoreDataError, in which case the rest of
+         # the shares are probably short too. _query_failed() will take
+         # responsibility for re-issuing the queries with a new length.
+         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
 
-            verinfo = (seqnum, root_hash, IV, segsize, datalength)
-            if verinfo not in self._valid_versions:
-                # it's a new pair. Verify the signature.
-                valid = self._pubkey.verify(prefix, signature)
-                if not valid:
-                    raise CorruptShareError(peerid, shnum,
-                                            "signature is invalid")
-                # ok, it's a valid verinfo. Add it to the list of validated
-                # versions.
-                self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
-                         % (seqnum, idlib.b2a(root_hash)[:4],
-                            idlib.shortnodeid_b2a(peerid), shnum,
-                            k, N, segsize, datalength))
-                self._valid_versions[verinfo] = (prefix, DictOfSets())
+        if not self._pubkey:
+            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
+            if fingerprint != self._node._fingerprint:
+                raise CorruptShareError(peerid, shnum,
+                                        "pubkey doesn't match fingerprint")
+            self._pubkey = self._deserialize_pubkey(pubkey_s)
+            self._node._populate_pubkey(self._pubkey)
 
-                # and make a note of the other parameters we've just learned
-                if self._required_shares is None:
-                    self._required_shares = k
-                    self._node._populate_required_shares(k)
-                if self._total_shares is None:
-                    self._total_shares = N
-                    self._node._populate_total_shares(N)
+        verinfo = (seqnum, root_hash, IV, segsize, datalength)
+        if verinfo not in self._valid_versions:
+            # it's a new pair. Verify the signature.
+            valid = self._pubkey.verify(prefix, signature)
+            if not valid:
+                raise CorruptShareError(peerid, shnum,
+                                        "signature is invalid")
+            # ok, it's a valid verinfo. Add it to the list of validated
+            # versions.
+            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
+                     % (seqnum, idlib.b2a(root_hash)[:4],
+                        idlib.shortnodeid_b2a(peerid), shnum,
+                        k, N, segsize, datalength))
+            self._valid_versions[verinfo] = (prefix, DictOfSets())
 
-            # we've already seen this pair, and checked the signature so we
-            # know it's a valid candidate. Accumulate the share info, if
-            # there's enough data present. If not, raise NeedMoreDataError,
-            # which will trigger a re-fetch.
-            _ignored = unpack_share(data)
-            self.log(" found enough data to add share contents")
-            self._valid_versions[verinfo][1].add(shnum, (peerid, data))
+            # and make a note of the other parameters we've just learned
+            if self._required_shares is None:
+                self._required_shares = k
+                self._node._populate_required_shares(k)
+            if self._total_shares is None:
+                self._total_shares = N
+                self._node._populate_total_shares(N)
 
+        # we've already seen this pair, and checked the signature so we
+        # know it's a valid candidate. Accumulate the share info, if
+        # there's enough data present. If not, raise NeedMoreDataError,
+        # which will trigger a re-fetch.
+        _ignored = unpack_share(data)
+        self.log(" found enough data to add share contents")
+        self._valid_versions[verinfo][1].add(shnum, (peerid, data))
+
 
-    def _query_failed(self, f, peerid, stuff):
-        self._queries_outstanding.discard(peerid)
-        self._used_peers.add(peerid)
+    def _query_failed(self, f, peerid):
         if not self._running:
             return
-        if f.check(NeedMoreDataError):
-            # ah, just re-send the query then.
-            self._read_size = max(self._read_size, f.value.needed_bytes)
-            (conn, storage_index, peer_storage_servers) = stuff
-            self._do_query(conn, peerid, storage_index, self._read_size,
-                           peer_storage_servers)
-            return
+        self._queries_outstanding.discard(peerid)
+        self._used_peers.add(peerid)
         self._last_failure = f
         self._bad_peerids.add(peerid)
-        self.log("WEIRD: error during query: %s %s" % (f, f.value))
+        short_sid = idlib.b2a(self._storage_index)[:6]
+        if f.check(CorruptShareError):
+            self.log("WEIRD: bad share for %s: %s %s" % (short_sid, f,
+                                                         f.value))
+        else:
+            self.log("WEIRD: other error for %s: %s %s" % (short_sid, f,
+                                                           f.value))
 
     def _check_for_done(self, res):
         if not self._running:
-            self.log("UNUSUAL: _check_for_done but we're not running")
+            self.log("ODD: _check_for_done but we're not running")
             return
         share_prefixes = {}
         versionmap = DictOfSets()
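
DictOfSets is defined elsewhere in the tahoe-lafs tree and not shown in this diff; from its use here, `_valid_versions[verinfo][1].add(shnum, (peerid, data))`, it maps each share number to the set of (peerid, data) pairs holding that share. A minimal stand-in consistent with that usage:

    class DictOfSets(dict):
        """Map each key to a set of values, creating the set on first add()."""
        def add(self, key, value):
            if key in self:
                self[key].add(value)
            else:
                self[key] = set([value])

    shares = DictOfSets()
    shares.add(0, ("peer1", "data"))
    shares.add(0, ("peer2", "data"))  # two peers holding the same shnum
    print(sorted(shares[0]))
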
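Two Twisted idioms carry the error handling above: failure.Failure() with no arguments wraps the exception currently being handled (how the CorruptShareError clause builds f), and f.check(...) asks a Failure whether it wraps one of the given exception types (how _query_failed picks its log message). Both are standard twisted.python.failure API:

    from twisted.python import failure

    class CorruptShareError(Exception):
        pass

    def capture():
        try:
            raise CorruptShareError("pubkey doesn't match fingerprint")
        except CorruptShareError:
            return failure.Failure()  # wraps the active exception

    f = capture()
    # check() returns the matching class (or None), like isinstance for Failures
    if f.check(CorruptShareError):
        print("bad share: %s %s" % (f, f.value))
    else:
        print("other error: %s %s" % (f, f.value))
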
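Finally, the pubkey bootstrap in _got_results_one_share: the first share seen carries the public key, which is trusted only if its hash matches the fingerprint already held by the node. A sketch of that pin-then-deserialize shape, using plain SHA-256 as a stand-in for hashutil.ssk_pubkey_fingerprint_hash (tahoe-lafs uses a tagged hash):

    import hashlib

    class CorruptShareError(Exception):
        pass

    def check_pubkey(pubkey_s, expected_fingerprint):
        # accept the embedded pubkey only if it hashes to the pinned value
        if hashlib.sha256(pubkey_s).digest() != expected_fingerprint:
            raise CorruptShareError("pubkey doesn't match fingerprint")
        return pubkey_s  # the real code deserializes a verifying key here

    pubkey = b"spki-encoded-public-key"
    fingerprint = hashlib.sha256(pubkey).digest()
    print(check_pubkey(pubkey, fingerprint) == pubkey)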