mutable WIP: if corrupt shares cause a retrieve to fail, restart it once, ignoring those shares and using different ones

Brian Warner 2008-04-15 15:58:02 -07:00
parent cf84bae850
commit ca14b99397
4 changed files with 88 additions and 15 deletions
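For orientation, here is a minimal, self-contained sketch of the retry-once flow this commit adds: a retrieve that trips over corrupt shares marks them in the servermap, raises a NotEnoughPeersError tagged worth_retrying, and download_to_data retries exactly once against that same servermap. FakeServermap, fake_retrieve, and the synchronous control flow are illustrative stand-ins, not the real Twisted-based classes.

    # Hypothetical stand-ins; only the control flow mirrors the diff below.
    class NotEnoughPeersError(Exception):
        worth_retrying = False   # set by the retriever when a retry might help
        servermap = None         # the map that remembers which shares were bad

    class FakeServermap:
        def __init__(self):
            self.bad_shares = set()      # (peerid, shnum) pairs to ignore next time
        def mark_bad_share(self, peerid, shnum):
            self.bad_shares.add((peerid, shnum))

    def fake_retrieve(servermap, shares):
        """Pretend to fetch shares; corrupt ones get marked bad and trigger a
        retryable NotEnoughPeersError."""
        usable = [s for s in shares if s not in servermap.bad_shares]
        corrupt = [s for s in usable if s[1] == 0]   # pretend shnum 0 is corrupt
        if corrupt:
            for peerid, shnum in corrupt:
                servermap.mark_bad_share(peerid, shnum)
            err = NotEnoughPeersError("ran out of peers, found %d bad shares"
                                      % len(corrupt))
            err.worth_retrying = True
            err.servermap = servermap
            raise err
        return "reconstructed plaintext from %d shares" % len(usable)

    def download_to_data(shares):
        """Mirror of the new MutableFileNode.download_to_data: try once, and if
        the failure is marked worth_retrying, retry once with the old servermap."""
        servermap = FakeServermap()
        try:
            return fake_retrieve(servermap, shares)
        except NotEnoughPeersError as e:
            if not e.worth_retrying:
                raise
            # reuse the old servermap: it remembers the bad shares
            return fake_retrieve(e.servermap, shares)

    if __name__ == "__main__":
        shares = [("peerA", 0), ("peerB", 1), ("peerC", 2), ("peerD", 3)]
        print(download_to_data(shares))   # first pass fails, the retry succeeds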

View File

@@ -61,6 +61,8 @@ hash tree is put into the URI.
 """

 class NotEnoughPeersError(Exception):
+    worth_retrying = False
+    servermap = None
     pass

 class UploadAborted(Exception):
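The two new attributes are class-level defaults, so every existing raise site for NotEnoughPeersError keeps working unchanged and callers can read the flags unconditionally; only the retriever sets them on a specific instance. A small illustration (the assert lines are just a demonstration, not code from the tree):

    class NotEnoughPeersError(Exception):
        worth_retrying = False
        servermap = None

    err = NotEnoughPeersError("not enough shares")
    assert err.worth_retrying is False                  # safe to read, even on old raisers

    err.worth_retrying = True                           # the retriever opts in explicitly
    err.servermap = {"placeholder": "servermap"}        # hypothetical stand-in object
    assert NotEnoughPeersError.worth_retrying is False  # class default is untouched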

View File

@@ -7,6 +7,7 @@ from twisted.internet import defer
 from allmydata.interfaces import IMutableFileNode, IMutableFileURI
 from allmydata.util import hashutil
 from allmydata.uri import WriteableSSKFileURI
+from allmydata.encode import NotEnoughPeersError
 from pycryptopp.publickey import rsa
 from pycryptopp.cipher.aes import AES
@@ -279,15 +280,30 @@ class MutableFileNode:
         d.addCallback(_done)
         return d

-    def download_to_data(self):
-        d = self.obtain_lock()
-        d.addCallback(lambda res: self.update_servermap(mode=MODE_ENOUGH))
+    def _update_and_retrieve_best(self, old_map=None):
+        d = self.update_servermap(old_map=old_map, mode=MODE_ENOUGH)
         def _updated(smap):
             goal = smap.best_recoverable_version()
             if not goal:
                 raise UnrecoverableFileError("no recoverable versions")
             return self.download_version(smap, goal)
         d.addCallback(_updated)
+        return d
+
+    def download_to_data(self):
+        d = self.obtain_lock()
+        d.addCallback(lambda res: self._update_and_retrieve_best())
+        def _maybe_retry(f):
+            f.trap(NotEnoughPeersError)
+            e = f.value
+            if not e.worth_retrying:
+                return f
+            # the download is worth retrying once. Make sure to use the old
+            # servermap, since it is what remembers the bad shares. TODO:
+            # consider allowing this to retry multiple times.. this approach
+            # will let us tolerate about 8 bad shares, I think.
+            return self._update_and_retrieve_best(e.servermap)
+        d.addErrback(_maybe_retry)
         d.addBoth(self.release_lock)
         return d
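If Twisted is available, the errback idiom used by the new download_to_data can be exercised on its own, as in the sketch below; _fetch is a hypothetical stand-in for _update_and_retrieve_best, and the string 'old-servermap' stands in for a real ServerMap object.

    from twisted.internet import defer

    class NotEnoughPeersError(Exception):
        worth_retrying = False
        servermap = None

    attempts = []

    def _fetch(old_map=None):
        # stand-in for _update_and_retrieve_best: fail on the first pass,
        # succeed when handed the old servermap
        attempts.append(old_map)
        if old_map is None:
            err = NotEnoughPeersError("first pass hit corrupt shares")
            err.worth_retrying = True
            err.servermap = "old-servermap"
            return defer.fail(err)
        return defer.succeed("contents")

    def download_to_data():
        d = _fetch()
        def _maybe_retry(f):
            f.trap(NotEnoughPeersError)      # any other failure propagates
            e = f.value
            if not e.worth_retrying:
                return f                     # returning the Failure keeps the error propagating
            # retry exactly once, reusing the servermap that remembers bad shares
            return _fetch(e.servermap)
        d.addErrback(_maybe_retry)
        return d

    results = []
    download_to_data().addCallback(results.append)
    assert results == ["contents"] and len(attempts) == 2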

View File

@@ -93,6 +93,7 @@ class Retrieve:
         self._outstanding_queries = {} # maps (peerid,shnum) to start_time
         self._running = True
         self._decoding = False
+        self._bad_shares = set()

         self.servermap = servermap
         assert self._node._pubkey
@@ -238,6 +239,8 @@ class Retrieve:
                 f = failure.Failure()
                 self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD)
                 self.remove_peer(peerid)
+                self.servermap.mark_bad_share(peerid, shnum)
+                self._bad_shares.add( (peerid, shnum) )
                 self._last_failure = f
                 pass
        # all done!
@ -374,19 +377,26 @@ class Retrieve:
format = ("ran out of peers: " format = ("ran out of peers: "
"have %(have)d shares (k=%(k)d), " "have %(have)d shares (k=%(k)d), "
"%(outstanding)d queries in flight, " "%(outstanding)d queries in flight, "
"need %(need)d more") "need %(need)d more, "
"found %(bad)d bad shares")
args = {"have": len(self.shares),
"k": k,
"outstanding": len(self._outstanding_queries),
"need": needed,
"bad": len(self._bad_shares),
}
self.log(format=format, self.log(format=format,
have=len(self.shares), k=k, level=log.WEIRD, **args)
outstanding=len(self._outstanding_queries), err = NotEnoughPeersError("%s, last failure: %s" %
need=needed, (format % args, self._last_failure))
level=log.WEIRD) if self._bad_shares:
msg2 = format % {"have": len(self.shares), self.log("We found some bad shares this pass. You should "
"k": k, "update the servermap and try again to check "
"outstanding": len(self._outstanding_queries), "more peers",
"need": needed, level=log.WEIRD)
} err.worth_retrying = True
raise NotEnoughPeersError("%s, last failure: %s" % err.servermap = self.servermap
(msg2, self._last_failure)) raise err
return return
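The Retrieve changes amount to two pieces of bookkeeping: every share that fails validation is reported to the servermap and remembered in self._bad_shares, and the ran-out-of-peers error is tagged worth_retrying (and handed the servermap) only when this pass actually saw bad shares, since a retry cannot help otherwise. A rough sketch of that bookkeeping, with hypothetical MiniRetrieve, FakeServerMap, and BadShareError stand-ins for the real hash-validating, network-facing code:

    class NotEnoughPeersError(Exception):
        worth_retrying = False
        servermap = None

    class BadShareError(Exception):
        pass

    class FakeServerMap:
        def __init__(self, shares):
            self.shares = dict(shares)            # (peerid, shnum) -> share data
        def mark_bad_share(self, peerid, shnum):
            self.shares.pop((peerid, shnum), None)

    class MiniRetrieve:
        def __init__(self, servermap, k):
            self.servermap = servermap
            self._k = k
            self._bad_shares = set()              # what this pass found to be bad
            self.shares = {}                      # shnum -> validated data

        def _validate(self, data):
            if data == "corrupt":
                raise BadShareError("hash check failed")
            return data

        def run(self):
            for (peerid, shnum), data in sorted(self.servermap.shares.items()):
                try:
                    self.shares[shnum] = self._validate(data)
                except BadShareError:
                    # roughly mirrors the error path above: tell the servermap
                    # and remember the bad share locally
                    self.servermap.mark_bad_share(peerid, shnum)
                    self._bad_shares.add((peerid, shnum))
            if len(self.shares) < self._k:
                err = NotEnoughPeersError("have %d shares (k=%d), found %d bad"
                                          % (len(self.shares), self._k,
                                             len(self._bad_shares)))
                if self._bad_shares:
                    err.worth_retrying = True     # a fresh mapupdate may find more shares
                    err.servermap = self.servermap
                raise err
            return self.shares

    smap = FakeServerMap({("peerA", 0): "corrupt",
                          ("peerB", 1): "good",
                          ("peerC", 2): "good"})
    try:
        MiniRetrieve(smap, k=3).run()
    except NotEnoughPeersError as e:
        assert e.worth_retrying and ("peerA", 0) not in e.servermap.shares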

View File

@@ -25,6 +25,14 @@ class ServerMap:
     operations, which means 'publish this new version, but only if nothing
     has changed since I last retrieved this data'. This reduces the chances
     of clobbering a simultaneous (uncoordinated) write.
+
+    @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing shares
+                      that I should ignore (because a previous user of the
+                      servermap determined that they were invalid). The
+                      updater only locates a certain number of shares: if
+                      some of these turn out to have integrity problems and
+                      are unusable, the caller will need to mark those shares
+                      as bad, then re-update the servermap, then try again.
     """

     def __init__(self):
@@ -35,9 +43,36 @@
         self.connections = {} # maps peerid to a RemoteReference
         self.unreachable_peers = set() # peerids that didn't respond to queries
         self.problems = [] # mostly for debugging
+        self.bad_shares = set()
         self.last_update_mode = None
         self.last_update_time = 0

+    def mark_bad_share(self, peerid, shnum):
+        """This share was found to be bad, not in the checkstring or
+        signature, but deeper in the share, detected at retrieve time. Remove
+        it from our list of useful shares, and remember that it is bad so we
+        don't add it back again later.
+        """
+        self.bad_shares.add( (peerid, shnum) )
+        self._remove_share(peerid, shnum)
+
+    def _remove_share(self, peerid, shnum):
+        #(s_shnum, s_verinfo, s_timestamp) = share
+        to_remove = [share
+                     for share in self.servermap[peerid]
+                     if share[0] == shnum]
+        for share in to_remove:
+            self.servermap[peerid].discard(share)
+        if not self.servermap[peerid]:
+            del self.servermap[peerid]
+
+    def add_new_share(self, peerid, shnum, verinfo, timestamp):
+        """We've written a new share out, replacing any that was there
+        before."""
+        self.bad_shares.discard( (peerid, shnum) )
+        self._remove_share(peerid, shnum)
+        self.servermap.add(peerid, (shnum, verinfo, timestamp) )
+
     def dump(self, out=sys.stdout):
         print >>out, "servermap:"
         for (peerid, shares) in self.servermap.items():
@@ -148,6 +183,11 @@ class ServerMap:

 class ServermapUpdater:
     def __init__(self, filenode, servermap, mode=MODE_ENOUGH):
+        """I update a servermap, locating a sufficient number of useful
+        shares and remembering where they are located.
+        """
+
         self._node = filenode
         self._servermap = servermap
         self.mode = mode
@@ -415,6 +455,11 @@ class ServermapUpdater:
         self._valid_versions.add(verinfo)
         # We now know that this is a valid candidate verinfo.

+        if (peerid, shnum, verinfo) in self._servermap.bad_shares:
+            # we've been told that the rest of the data in this share is
+            # unusable, so don't add it to the servermap.
+            return verinfo
+
         # Add the info to our servermap.
         timestamp = time.time()
         self._servermap.servermap.add(peerid, (shnum, verinfo, timestamp))
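In the real ServerMap, self.servermap maps peerid to a set of (shnum, verinfo, timestamp) tuples, and bad shares are keyed by (peerid, shnum) as mark_bad_share records them. The sketch below re-implements just enough of that shape to show what mark_bad_share, _remove_share, and add_new_share do; MiniServerMap and its _add helper are illustrative, not the real DictOfSets-backed class.

    class MiniServerMap:
        def __init__(self):
            self.servermap = {}       # peerid -> set of (shnum, verinfo, timestamp)
            self.bad_shares = set()   # (peerid, shnum) pairs to ignore

        def _add(self, peerid, share):
            self.servermap.setdefault(peerid, set()).add(share)

        def mark_bad_share(self, peerid, shnum):
            # remember the share is bad so a later mapupdate won't re-add it,
            # and drop it from the set of currently useful shares
            self.bad_shares.add((peerid, shnum))
            self._remove_share(peerid, shnum)

        def _remove_share(self, peerid, shnum):
            shares = self.servermap.get(peerid, set())
            for share in [s for s in shares if s[0] == shnum]:
                shares.discard(share)
            if peerid in self.servermap and not self.servermap[peerid]:
                del self.servermap[peerid]

        def add_new_share(self, peerid, shnum, verinfo, timestamp):
            # a freshly published share supersedes both the old copy and any
            # bad-share marker for that slot
            self.bad_shares.discard((peerid, shnum))
            self._remove_share(peerid, shnum)
            self._add(peerid, (shnum, verinfo, timestamp))

    m = MiniServerMap()
    m._add("peerA", (0, "v1", 1000))
    m._add("peerA", (1, "v1", 1000))
    m.mark_bad_share("peerA", 0)
    assert m.servermap == {"peerA": {(1, "v1", 1000)}}
    assert ("peerA", 0) in m.bad_shares
    m.add_new_share("peerA", 0, "v2", 2000)
    assert ("peerA", 0) not in m.bad_shares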