first pass at a mutable repairer. not tested at all yet, but of course all existing tests pass

This commit is contained in:
Brian Warner 2008-07-17 21:09:23 -07:00
parent 3e95681bad
commit 879fefe5f3
9 changed files with 170 additions and 23 deletions

View File

@ -1541,11 +1541,20 @@ class IDeepCheckResults(Interface):
was not fully healthy.""" was not fully healthy."""
class IRepairable(Interface): class IRepairable(Interface):
def repair(): def repair(checker_results):
"""Attempt to repair the given object. Returns a Deferred that fires """Attempt to repair the given object. Returns a Deferred that fires
with a IRepairResults object. with a IRepairResults object.
I must be called with an object that implements ICheckerResults, as
proof that you have actually discovered a problem with this file. I
will use the data in the checker results to guide the repair process,
such as which servers provided bad data and should therefore be
avoided.
""" """
class IRepairResults(Interface):
"""I contain the results of a repair operation."""
class IClient(Interface): class IClient(Interface):
def upload(uploadable): def upload(uploadable):

View File

@ -1,5 +1,4 @@
import struct
from zope.interface import implements from zope.interface import implements
from twisted.internet import defer from twisted.internet import defer
from twisted.python import failure from twisted.python import failure
@ -9,7 +8,7 @@ from allmydata.interfaces import ICheckerResults
from common import MODE_CHECK, CorruptShareError from common import MODE_CHECK, CorruptShareError
from servermap import ServerMap, ServermapUpdater from servermap import ServerMap, ServermapUpdater
from layout import unpack_share, SIGNED_PREFIX from layout import unpack_share, SIGNED_PREFIX_LENGTH
class MutableChecker: class MutableChecker:
@ -18,9 +17,11 @@ class MutableChecker:
self.healthy = True self.healthy = True
self.problems = [] self.problems = []
self._storage_index = self._node.get_storage_index() self._storage_index = self._node.get_storage_index()
self.results = Results(self._storage_index)
def check(self, verify=False, repair=False): def check(self, verify=False, repair=False):
servermap = ServerMap() servermap = ServerMap()
self.results.servermap = servermap
self.do_verify = verify self.do_verify = verify
self.do_repair = repair self.do_repair = repair
u = ServermapUpdater(self._node, servermap, MODE_CHECK) u = ServermapUpdater(self._node, servermap, MODE_CHECK)
@ -85,11 +86,13 @@ class MutableChecker:
except CorruptShareError: except CorruptShareError:
f = failure.Failure() f = failure.Failure()
self.add_problem(shnum, peerid, f) self.add_problem(shnum, peerid, f)
prefix = data[:SIGNED_PREFIX_LENGTH]
self.results.servermap.mark_bad_share(peerid, shnum, prefix)
def check_prefix(self, peerid, shnum, data): def check_prefix(self, peerid, shnum, data):
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix, (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = self.best_version offsets_tuple) = self.best_version
got_prefix = data[:struct.calcsize(SIGNED_PREFIX)] got_prefix = data[:SIGNED_PREFIX_LENGTH]
if got_prefix != prefix: if got_prefix != prefix:
raise CorruptShareError(peerid, shnum, raise CorruptShareError(peerid, shnum,
"prefix mismatch: share changed while we were reading it") "prefix mismatch: share changed while we were reading it")
@ -135,25 +138,37 @@ class MutableChecker:
return return
if not self.do_repair: if not self.do_repair:
return return
pass self.results.repair_attempted = True
d = self._node.repair(self)
def _repair_finished(repair_results):
self.results.repair_succeeded = True
self.results.repair_results = repair_results
def _repair_error(f):
# I'm not sure if I want to pass through a failure or not.
self.results.repair_succeeded = False
self.results.repair_failure = f
return f
d.addCallbacks(_repair_finished, _repair_error)
return d
def _return_results(self, res): def _return_results(self, res):
r = Results(self._storage_index) self.results.healthy = self.healthy
r.healthy = self.healthy self.results.problems = self.problems
r.problems = self.problems return self.results
return r
def add_problem(self, shnum, peerid, what): def add_problem(self, shnum, peerid, what):
self.healthy = False self.healthy = False
self.problems.append( (peerid, self._storage_index, shnum, what) ) self.problems.append( (peerid, self._storage_index, shnum, what) )
class Results: class Results:
implements(ICheckerResults) implements(ICheckerResults)
def __init__(self, storage_index): def __init__(self, storage_index):
self.storage_index = storage_index self.storage_index = storage_index
self.storage_index_s = base32.b2a(storage_index)[:6] self.storage_index_s = base32.b2a(storage_index)[:6]
self.repair_attempted = False
def is_healthy(self): def is_healthy(self):
return self.healthy return self.healthy

View File

@ -4,6 +4,7 @@ from common import NeedMoreDataError
PREFIX = ">BQ32s16s" # each version has a different prefix PREFIX = ">BQ32s16s" # each version has a different prefix
SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature
SIGNED_PREFIX_LENGTH = struct.calcsize(SIGNED_PREFIX)
HEADER = ">BQ32s16s BBQQ LLLLQQ" # includes offsets HEADER = ">BQ32s16s BBQQ LLLLQQ" # includes offsets
HEADER_LENGTH = struct.calcsize(HEADER) HEADER_LENGTH = struct.calcsize(HEADER)
@ -24,7 +25,7 @@ def unpack_header(data):
def unpack_prefix_and_signature(data): def unpack_prefix_and_signature(data):
assert len(data) >= HEADER_LENGTH, len(data) assert len(data) >= HEADER_LENGTH, len(data)
prefix = data[:struct.calcsize(SIGNED_PREFIX)] prefix = data[:SIGNED_PREFIX_LENGTH]
(version, (version,
seqnum, seqnum,

View File

@ -6,7 +6,8 @@ from zope.interface import implements
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.python import log from twisted.python import log
from foolscap.eventual import eventually from foolscap.eventual import eventually
from allmydata.interfaces import IMutableFileNode, IMutableFileURI, ICheckable from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
ICheckable, ICheckerResults
from allmydata.util import hashutil from allmydata.util import hashutil
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.uri import WriteableSSKFileURI from allmydata.uri import WriteableSSKFileURI
@ -21,6 +22,7 @@ from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
from servermap import ServerMap, ServermapUpdater from servermap import ServerMap, ServermapUpdater
from retrieve import Retrieve from retrieve import Retrieve
from checker import MutableChecker from checker import MutableChecker
from repair import Repairer
class BackoffAgent: class BackoffAgent:
@ -186,6 +188,8 @@ class MutableFileNode:
def get_total_shares(self): def get_total_shares(self):
return self._total_shares return self._total_shares
####################################
# IFilesystemNode
def get_uri(self): def get_uri(self):
return self._uri.to_string() return self._uri.to_string()
@ -237,6 +241,7 @@ class MutableFileNode:
return d return d
################################# #################################
# ICheckable
def check(self, verify=False, repair=False): def check(self, verify=False, repair=False):
checker = self.checker_class(self) checker = self.checker_class(self)
@ -251,6 +256,19 @@ class MutableFileNode:
d.addCallback(_done) d.addCallback(_done)
return d return d
#################################
# IRepairable
def repair(self, checker_results):
assert ICheckerResults(checker_results)
r = Repairer(self, checker_results)
d = r.start()
return d
#################################
# IMutableFileNode
# allow the use of IDownloadTarget # allow the use of IDownloadTarget
def download(self, target): def download(self, target):
# fake it. TODO: make this cleaner. # fake it. TODO: make this cleaner.

View File

@ -225,11 +225,19 @@ class Publish:
# use later. # use later.
self.connections = {} self.connections = {}
self.bad_share_checkstrings = {}
# we use the servermap to populate the initial goal: this way we will # we use the servermap to populate the initial goal: this way we will
# try to update each existing share in place. # try to update each existing share in place.
for (peerid, shnum) in self._servermap.servermap: for (peerid, shnum) in self._servermap.servermap:
self.goal.add( (peerid, shnum) ) self.goal.add( (peerid, shnum) )
self.connections[peerid] = self._servermap.connections[peerid] self.connections[peerid] = self._servermap.connections[peerid]
# then we add in all the shares that were bad (corrupted, bad
# signatures, etc). We want to replace these.
for (peerid, shnum, old_checkstring) in self._servermap.bad_shares:
self.goal.add( (peerid, shnum) )
self.bad_share_checkstrings[ (peerid, shnum) ] = old_checkstring
self.connections[peerid] = self._servermap.connections[peerid]
# create the shares. We'll discard these as they are delivered. SMDF: # create the shares. We'll discard these as they are delivered. SMDF:
# we're allowed to hold everything in memory. # we're allowed to hold everything in memory.
@ -560,6 +568,10 @@ class Publish:
old_salt) old_salt)
testv = (0, len(old_checkstring), "eq", old_checkstring) testv = (0, len(old_checkstring), "eq", old_checkstring)
elif key in self.bad_share_checkstrings:
old_checkstring = self.bad_share_checkstrings[key]
testv = (0, len(old_checkstring), "eq", old_checkstring)
else: else:
# add a testv that requires the share not exist # add a testv that requires the share not exist
#testv = (0, 1, 'eq', "") #testv = (0, 1, 'eq', "")

View File

@ -0,0 +1,86 @@
from zope.interface import implements
from allmydata.interfaces import IRepairResults
class RepairResults:
    """I contain the results of a repair operation.

    Currently an empty marker object: the interesting state (whether the
    repair succeeded, and any failure) is recorded on the checker's Results
    object by the caller, not here.
    """
    # zope.interface declaration that instances provide IRepairResults.
    implements(IRepairResults)
class MustForceRepairError(Exception):
    """Raised by Repairer.start() when proceeding would silently discard
    data: either an unrecoverable newer version exists, or multiple
    recoverable versions need application-level merging. The caller must
    pass force=True to acknowledge the data loss and proceed anyway."""
    pass
class Repairer:
    """I repair a mutable file by downloading its best recoverable version
    and re-publishing it, replacing any shares that the checker found to be
    missing, stale, or corrupt.

    I am constructed with the file node to repair and the ICheckerResults
    that prove (and describe) the damage; the checker results' servermap
    guides the repair (e.g. which servers held bad shares).
    """

    def __init__(self, node, checker_results):
        self.node = node
        self.checker_results = checker_results
        # Sanity-check that these checker results actually describe this
        # file, not some other storage index.
        assert checker_results.storage_index == self.node.get_storage_index()

    def start(self, force=False):
        """Start the repair and return a Deferred that fires with a
        RepairResults instance when the re-publish completes.

        :param force: must be True to proceed when repair would lose data
            (see below); otherwise MustForceRepairError is raised.
        """
        # download, then re-publish. If a server had a bad share, try to
        # replace it with a good one of the same shnum.

        # The normal repair operation should not be used to replace
        # application-specific merging of alternate versions: i.e. if there
        # are multiple highest seqnums with different roothashes. In this
        # case, the application must use node.upload() (referencing the
        # servermap that indicates the multiple-heads condition), or
        # node.overwrite(). The repair() operation will refuse to run in
        # these conditions unless a force=True argument is provided. If
        # force=True is used, then the highest root hash will be reinforced.

        # Likewise, the presence of an unrecoverable latest version is an
        # unusual event, and should ideally be handled by retrying a couple
        # times (spaced out over hours or days) and hoping that new shares
        # will become available. If repair(force=True) is called, data will
        # be lost: a new seqnum will be generated with the same contents as
        # the most recent recoverable version, skipping over the lost
        # version. repair(force=False) will refuse to run in a situation like
        # this.

        # Repair is designed to fix the following injuries:
        #  missing shares: add new ones to get at least N distinct ones
        #  old shares: replace old shares with the latest version
        #  bogus shares (bad sigs): replace the bad one with a good one

        smap = self.checker_results.servermap

        if smap.unrecoverable_newer_versions():
            if not force:
                raise MustForceRepairError("There were unrecoverable newer "
                                           "versions, so force=True must be "
                                           "passed to the repair() operation")
            # continuing on means that node.upload() will pick a seqnum that
            # is higher than everything visible in the servermap, effectively
            # discarding the unrecoverable versions.
        if smap.needs_merge():
            if not force:
                raise MustForceRepairError("There were multiple recoverable "
                                           "versions with identical seqnums, "
                                           "so force=True must be passed to "
                                           "the repair() operation")
            # continuing on means that smap.best_recoverable_version() will
            # pick the one with the highest roothash, and then node.upload()
            # will replace all shares with its contents

        # missing shares are handled during upload, which tries to find a
        # home for every share

        # old shares are handled during upload, which will replace any share
        # that was present in the servermap

        # bogus shares need to be managed here. We might notice a bogus share
        # during mapupdate (whether done for a filecheck or just before a
        # download) by virtue of it having an invalid signature. We might
        # also notice a bad hash in the share during verify or download. In
        # either case, the problem will be noted in the servermap, and the
        # bad share (along with its checkstring) will be recorded in
        # servermap.bad_shares. Publish knows that it should try to replace
        # these.

        best_version = smap.best_recoverable_version()
        # Download the best recoverable version, then re-upload it against
        # the same servermap so bad/old shares are overwritten in place.
        d = self.node.download_version(smap, best_version)
        d.addCallback(self.node.upload, smap)
        d.addCallback(self.get_results)
        return d

    def get_results(self, res):
        """Convert the upload's result into a RepairResults marker.

        The upload result *res* is discarded — presumably the caller only
        needs success/failure, which travels via the Deferred; TODO confirm.
        """
        return RepairResults()

View File

@ -249,7 +249,7 @@ class Retrieve:
f = failure.Failure() f = failure.Failure()
self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD) self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD)
self.remove_peer(peerid) self.remove_peer(peerid)
self.servermap.mark_bad_share(peerid, shnum) self.servermap.mark_bad_share(peerid, shnum, prefix)
self._bad_shares.add( (peerid, shnum) ) self._bad_shares.add( (peerid, shnum) )
self._status.problems[peerid] = f self._status.problems[peerid] = f
self._last_failure = f self._last_failure = f

View File

@ -12,7 +12,8 @@ from pycryptopp.publickey import rsa
from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \ from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
DictOfSets, CorruptShareError, NeedMoreDataError DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
SIGNED_PREFIX_LENGTH
class UpdateStatus: class UpdateStatus:
implements(IServermapUpdaterStatus) implements(IServermapUpdaterStatus)
@ -114,25 +115,28 @@ class ServerMap:
self.connections = {} self.connections = {}
self.unreachable_peers = set() # peerids that didn't respond to queries self.unreachable_peers = set() # peerids that didn't respond to queries
self.problems = [] # mostly for debugging self.problems = [] # mostly for debugging
self.bad_shares = set() self.bad_shares = {} # maps (peerid,shnum) to old checkstring
self.last_update_mode = None self.last_update_mode = None
self.last_update_time = 0 self.last_update_time = 0
def mark_bad_share(self, peerid, shnum): def mark_bad_share(self, peerid, shnum, checkstring):
"""This share was found to be bad, not in the checkstring or """This share was found to be bad, either in the checkstring or
signature, but deeper in the share, detected at retrieve time. Remove signature (detected during mapupdate), or deeper in the share
it from our list of useful shares, and remember that it is bad so we (detected at retrieve time). Remove it from our list of useful
don't add it back again later. shares, and remember that it is bad so we don't add it back again
later. We record the share's old checkstring (which might be
corrupted or badly signed) so that a repair operation can do the
test-and-set using it as a reference.
""" """
key = (peerid, shnum) key = (peerid, shnum) # record checkstring
self.bad_shares.add(key) self.bad_shares[key] = checkstring
self.servermap.pop(key, None) self.servermap.pop(key, None)
def add_new_share(self, peerid, shnum, verinfo, timestamp): def add_new_share(self, peerid, shnum, verinfo, timestamp):
"""We've written a new share out, replacing any that was there """We've written a new share out, replacing any that was there
before.""" before."""
key = (peerid, shnum) key = (peerid, shnum)
self.bad_shares.discard(key) self.bad_shares.pop(key, None)
self.servermap[key] = (verinfo, timestamp) self.servermap[key] = (verinfo, timestamp)
def dump(self, out=sys.stdout): def dump(self, out=sys.stdout):
@ -532,6 +536,8 @@ class ServermapUpdater:
parent=lp, level=log.WEIRD) parent=lp, level=log.WEIRD)
self._bad_peers.add(peerid) self._bad_peers.add(peerid)
self._last_failure = f self._last_failure = f
checkstring = data[:SIGNED_PREFIX_LENGTH]
self._servermap.mark_bad_share(peerid, shnum, checkstring)
self._servermap.problems.append(f) self._servermap.problems.append(f)
pass pass

View File

@ -723,7 +723,7 @@ class Servermap(unittest.TestCase):
for (shnum, peerid, timestamp) in shares: for (shnum, peerid, timestamp) in shares:
if shnum < 5: if shnum < 5:
self._corrupted.add( (peerid, shnum) ) self._corrupted.add( (peerid, shnum) )
sm.mark_bad_share(peerid, shnum) sm.mark_bad_share(peerid, shnum, "")
return self.update_servermap(sm, MODE_WRITE) return self.update_servermap(sm, MODE_WRITE)
d.addCallback(_made_map) d.addCallback(_made_map)
def _check_map(sm): def _check_map(sm):