mutable: make mutable-repair work for non-verifier runs, add tests

Brian Warner 2008-08-26 16:34:54 -07:00
parent f7b4c45d46
commit 1668401c16
7 changed files with 175 additions and 38 deletions
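In outline, the change reorders the checker's Deferred chain so that the repair step runs before the results report is generated. Repair therefore works on a plain verify=False check, and the report can describe the repair outcome. A minimal sketch of the new ordering (the callback names are taken from the diff below; the bodies and the mapupdate step are stubs):

from twisted.internet import defer

class CheckerFlowSketch:
    # Sketch of MutableChecker.check() after this change. Only the chain
    # ordering is the point; every callback body is a stub.
    def _got_mapupdate_results(self, res): return res
    def _verify_all_shares(self, res): return res
    def _maybe_do_repair(self, res): return res
    def _generate_results(self, res): return res
    def _return_results(self, res): return res

    def check(self, verify=False, repair=False):
        d = defer.succeed(None)  # stands in for the MODE_CHECK mapupdate
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        if repair:
            d.addCallback(self._maybe_do_repair)
        # _generate_results now runs after the repair attempt, so the
        # report can include repair_attempted / repair_succeeded
        d.addCallback(self._generate_results)
        d.addCallback(self._return_results)
        return d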


@@ -27,9 +27,9 @@ class MutableChecker:
d.addCallback(self._got_mapupdate_results)
if verify:
d.addCallback(self._verify_all_shares)
d.addCallback(self._generate_results)
if repair:
d.addCallback(self._maybe_do_repair)
d.addCallback(self._generate_results)
d.addCallback(self._return_results)
return d
@@ -134,22 +134,6 @@ class MutableChecker:
if alleged_writekey != self._node.get_writekey():
raise CorruptShareError(peerid, shnum, "invalid privkey")
def _maybe_do_repair(self, res):
if not self.need_repair:
return
self.results.repair_attempted = True
d = self._node.repair(self)
def _repair_finished(repair_results):
self.results.repair_succeeded = True
self.results.repair_results = repair_results
def _repair_error(f):
# I'm not sure if I want to pass through a failure or not.
self.results.repair_succeeded = False
self.results.repair_failure = f
return f
d.addCallbacks(_repair_finished, _repair_error)
return d
def _generate_results(self, res):
self.results.healthy = True
smap = self.results.servermap
@@ -207,6 +191,22 @@ class MutableChecker:
self.results.status_report = "\n".join(report) + "\n"
def _maybe_do_repair(self, res):
if not self.need_repair:
return
self.results.repair_attempted = True
d = self._node.repair(self.results)
def _repair_finished(repair_results):
self.results.repair_succeeded = True
self.results.repair_results = repair_results
def _repair_error(f):
# I'm not sure if I want to pass through a failure or not.
self.results.repair_succeeded = False
self.results.repair_failure = f
return f
d.addCallbacks(_repair_finished, _repair_error)
return d
def _return_results(self, res):
return self.results
@@ -219,6 +219,7 @@ class Results:
self.storage_index_s = base32.b2a(storage_index)[:6]
self.repair_attempted = False
self.status_report = "[not generated yet]" # string
self.repair_report = None
self.problems = [] # list of (peerid, storage_index, shnum, failure)
def is_healthy(self):
@@ -241,5 +242,14 @@ class Results:
s += "\n"
s += self.status_report
s += "\n"
if self.repair_attempted:
s += "Repair attempted "
if self.repair_succeeded:
s += "and successful\n"
else:
s += "and failed\n"
s += "\n"
s += self.repair_results.to_string()
s += "\n"
return s
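A hypothetical caller-side view of the new Results fields (the attribute names are the ones added above; the node object and its check() signature are assumed context):

def check_and_report(node):
    # run a non-verifying check with repair enabled; on success the report
    # now ends with the "Repair attempted and successful" block built above
    d = node.check(verify=False, repair=True)
    def _show(results):
        print results.to_string()
        return results
    d.addCallback(_show)
    return d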


@@ -408,11 +408,12 @@ class MutableFileNode:
self._client.notify_mapupdate(u.get_status())
return u.update()
def download_version(self, servermap, version):
def download_version(self, servermap, version, fetch_privkey=False):
return self._do_serialized(self._try_once_to_download_version,
servermap, version)
def _try_once_to_download_version(self, servermap, version):
r = Retrieve(self, servermap, version)
servermap, version, fetch_privkey)
def _try_once_to_download_version(self, servermap, version,
fetch_privkey=False):
r = Retrieve(self, servermap, version, fetch_privkey)
self._client.notify_retrieve(r.get_status())
return r.download()


@@ -236,9 +236,10 @@ class Publish:
self.connections[peerid] = self._servermap.connections[peerid]
# then we add in all the shares that were bad (corrupted, bad
# signatures, etc). We want to replace these.
for (peerid, shnum, old_checkstring) in self._servermap.bad_shares:
self.goal.add( (peerid, shnum) )
self.bad_share_checkstrings[ (peerid, shnum) ] = old_checkstring
for key, old_checkstring in self._servermap.bad_shares.items():
(peerid, shnum) = key
self.goal.add(key)
self.bad_share_checkstrings[key] = old_checkstring
self.connections[peerid] = self._servermap.connections[peerid]
# create the shares. We'll discard these as they are delivered. SDMF:
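The reshaped bad_shares structure is easiest to see in isolation; a toy illustration with placeholder values:

# bad_shares now maps (peerid, shnum) to the old checkstring, so Publish
# can both target a bad share and remember which on-disk version it is
# allowed to overwrite. All values below are placeholders.
bad_shares = {("peerid-A", 0): "checkstring-seq3",
              ("peerid-B", 7): "checkstring-seq3"}
goal = set()
bad_share_checkstrings = {}
for key, old_checkstring in bad_shares.items():
    (peerid, shnum) = key
    goal.add(key)
    bad_share_checkstrings[key] = old_checkstring
assert goal == set([("peerid-A", 0), ("peerid-B", 7)])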


@@ -5,6 +5,9 @@ from allmydata.interfaces import IRepairResults
class RepairResults:
implements(IRepairResults)
def to_string(self):
return ""
class MustForceRepairError(Exception):
pass
@@ -76,8 +79,14 @@ class Repairer:
# servermap.bad_shares . Publish knows that it should try and replace
# these.
# I chose to use the retrieve phase to ensure that the privkey is
# available, to avoid the extra roundtrip that would occur if we,
# say, added an smap.get_privkey() method.
assert self.node.get_writekey() # repair currently requires a writecap
best_version = smap.best_recoverable_version()
d = self.node.download_version(smap, best_version)
d = self.node.download_version(smap, best_version, fetch_privkey=True)
d.addCallback(self.node.upload, smap)
d.addCallback(self.get_results)
return d
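Flattened into one free function, the repair pipeline reads as below (this assumes a node offering the download_version() and upload() methods shown elsewhere in this diff):

def repair_sketch(node, smap):
    # repair currently requires a writecap: without the writekey we could
    # not sign and publish a replacement version
    assert node.get_writekey()
    best_version = smap.best_recoverable_version()
    # fetch_privkey=True piggybacks the encrypted privkey on the reads the
    # retrieve phase performs anyway, avoiding the extra roundtrip that an
    # smap.get_privkey()-style method would have cost
    d = node.download_version(smap, best_version, fetch_privkey=True)
    d.addCallback(node.upload, smap)  # re-publish, replacing the bad shares
    return d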


@@ -11,6 +11,7 @@ from allmydata.util import hashutil, idlib, log
from allmydata import hashtree, codec, storage
from allmydata.immutable.encode import NotEnoughSharesError
from pycryptopp.cipher.aes import AES
from pycryptopp.publickey import rsa
from common import DictOfSets, CorruptShareError, UncoordinatedWriteError
from layout import SIGNED_PREFIX, unpack_share_data
@@ -81,7 +82,7 @@ class Retrieve:
# Retrieve object will remain tied to a specific version of the file, and
# will use a single ServerMap instance.
def __init__(self, filenode, servermap, verinfo):
def __init__(self, filenode, servermap, verinfo, fetch_privkey=False):
self._node = filenode
assert self._node._pubkey
self._storage_index = filenode.get_storage_index()
@@ -97,6 +98,12 @@
self.servermap = servermap
assert self._node._pubkey
self.verinfo = verinfo
# during repair, we may be called upon to grab the private key, since
# it wasn't picked up during a verify=False checker run, and we'll
# need it for repair to generate a new version.
self._need_privkey = fetch_privkey
if self._node._privkey:
self._need_privkey = False
self._status = RetrieveStatus()
self._status.set_storage_index(self._storage_index)
@@ -166,16 +173,24 @@
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = self.verinfo
offsets = dict(offsets_tuple)
# we read the checkstring, to make sure that the data we grab is from
# the right version. We also read the data, and the hashes necessary
# to validate them (share_hash_chain, block_hash_tree, share_data).
# We don't read the signature or the pubkey, since that was handled
# during the servermap phase, and we'll be comparing the share hash
# chain against the roothash that was validated back then.
readv = [ (0, struct.calcsize(SIGNED_PREFIX)),
(offsets['share_hash_chain'],
offsets['enc_privkey'] - offsets['share_hash_chain']),
]
# the right version.
readv = [ (0, struct.calcsize(SIGNED_PREFIX)) ]
# We also read the data, and the hashes necessary to validate them
# (share_hash_chain, block_hash_tree, share_data). We don't read the
# signature or the pubkey, since that was handled during the
# servermap phase, and we'll be comparing the share hash chain
# against the roothash that was validated back then.
readv.append( (offsets['share_hash_chain'],
offsets['enc_privkey'] - offsets['share_hash_chain'] ) )
# if we need the private key (for repair), we also fetch that
if self._need_privkey:
readv.append( (offsets['enc_privkey'],
offsets['EOF'] - offsets['enc_privkey']) )
m = Marker()
self._outstanding_queries[m] = (peerid, shnum, started)
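The read vector assembled above is a plain list of (offset, length) pairs. Pulled out into a standalone helper it looks like this (the format string standing in for SIGNED_PREFIX and the example offsets are assumptions):

import struct

PREFIX_FORMAT = ">BQ32s16s"  # stand-in for layout.SIGNED_PREFIX

def build_readv(offsets, need_privkey):
    # always read the prefix, to confirm the share holds the expected
    # version before trusting anything else in it
    readv = [(0, struct.calcsize(PREFIX_FORMAT))]
    # hashes and share data: everything from share_hash_chain up to
    # enc_privkey
    readv.append((offsets['share_hash_chain'],
                  offsets['enc_privkey'] - offsets['share_hash_chain']))
    if need_privkey:
        # repair path: also fetch the encrypted privkey, which occupies
        # the span from enc_privkey to EOF
        readv.append((offsets['enc_privkey'],
                      offsets['EOF'] - offsets['enc_privkey']))
    return readv

# build_readv({'share_hash_chain': 107, 'enc_privkey': 935, 'EOF': 1403},
#             need_privkey=True) -> [(0, 57), (107, 828), (935, 468)]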
@@ -243,7 +258,7 @@
# shares if we get them; that seems better than an assert().
for shnum,datav in datavs.items():
(prefix, hash_and_data) = datav
(prefix, hash_and_data) = datav[:2]
try:
self._got_results_one_share(shnum, peerid,
prefix, hash_and_data)
@@ -259,6 +274,9 @@
self._status.problems[peerid] = f
self._last_failure = f
pass
if self._need_privkey and len(datav) > 2:
lp = None
self._try_to_validate_privkey(datav[2], peerid, shnum, lp)
# all done!
def _got_results_one_share(self, shnum, peerid,
@@ -296,6 +314,25 @@
# self.shares
self.shares[shnum] = share_data
def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
if alleged_writekey != self._node.get_writekey():
self.log("invalid privkey from %s shnum %d" %
(idlib.nodeid_b2a(peerid)[:8], shnum),
parent=lp, level=log.WEIRD, umid="YIw4tA")
return
# it's good
self.log("got valid privkey from shnum %d on peerid %s" %
(shnum, idlib.shortnodeid_b2a(peerid)),
parent=lp)
privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
self._node._populate_encprivkey(enc_privkey)
self._node._populate_privkey(privkey)
self._need_privkey = False
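The validation above works because the writekey is derived by hashing the privkey, so a node holding a writecap can authenticate a fetched privkey without any extra state. A toy model of that check (the real derivation is hashutil.ssk_writekey_hash; the tag and truncation here are invented for illustration):

from hashlib import sha256

def toy_writekey_hash(privkey_s):
    # stand-in for hashutil.ssk_writekey_hash: a tagged, truncated hash
    return sha256(b"sketch-writekey-tag:" + privkey_s).digest()[:16]

def privkey_matches(alleged_privkey_s, known_writekey):
    # the server is only trusted to deliver bytes; the writekey we
    # already hold is the authenticator
    return toy_writekey_hash(alleged_privkey_s) == known_writekey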
def _query_failed(self, f, marker, peerid):
self.log(format="query to [%(peerid)s] failed",
peerid=idlib.shortnodeid_b2a(peerid),
@@ -332,6 +369,15 @@
if len(self.shares) < k:
# we don't have enough shares yet
return self._maybe_send_more_queries(k)
if self._need_privkey:
# we got k shares, but none of them had a valid privkey. TODO:
# look further. Adding code to do this is a bit complicated, and
# I want to avoid that complication, and this should be pretty
# rare (k shares with bitflips in the enc_privkey but not in the
# data blocks). If we actually do get here, the subsequent repair
# will fail for lack of a privkey.
self.log("got k shares but still need_privkey, bummer",
level=log.WEIRD, umid="MdRHPA")
# we have enough to finish. All the shares have had their hashes
# checked, so if something fails at this point, we don't know how


@@ -101,13 +101,14 @@ class ServerMap:
@ivar connections: maps peerid to a RemoteReference
@ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
@ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
shares that I should ignore (because a previous user of
the servermap determined that they were invalid). The
updater only locates a certain number of shares: if
some of these turn out to have integrity problems and
are unusable, the caller will need to mark those shares
as bad, then re-update the servermap, then try again.
The dict maps (peerid, shnum) tuple to old checkstring.
"""
def __init__(self):
@@ -349,6 +350,9 @@ class ServermapUpdater:
self._need_privkey = False
if mode == MODE_WRITE and not self._node._privkey:
self._need_privkey = True
# check+repair: repair requires the privkey, so if we didn't happen
# to ask for it during the check, we'll have problems doing the
# publish.
prefix = storage.si_b2a(self._storage_index)[:5]
self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",


@@ -1915,7 +1915,73 @@ class MutableChecker(SystemTestMixin, unittest.TestCase):
shid_re = (r"Corrupt Shares:\s+%s: block hash tree failure" %
self.corrupt_shareid)
self.failUnless(re.search(shid_re, out), out)
d.addCallback(_got_results)
# now make sure the webapi repairer can fix it
def _do_repair(res):
url = (self.webish_url +
"uri/%s" % urllib.quote(self.node.get_uri()) +
"?t=check&verify=true&repair=true")
return getPage(url, method="POST")
d.addCallback(_do_repair)
def _got_repair_results(out):
self.failUnless("Repair attempted and successful" in out)
d.addCallback(_got_repair_results)
d.addCallback(_do_check)
def _got_postrepair_results(out):
self.failIf("Not Healthy!" in out, out)
self.failUnless("Recoverable Versions: 10*seq" in out)
d.addCallback(_got_postrepair_results)
return d
def test_delete_share(self):
self.basedir = self.mktemp()
d = self.set_up_nodes()
CONTENTS = "a little bit of data"
d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
def _created(node):
self.node = node
si = self.node.get_storage_index()
out = self._run_cli(["debug", "find-shares", base32.b2a(si),
self.clients[1].basedir])
files = out.split("\n")
# delete one of them, after noting which share it was
f = files[0]
shnum = os.path.basename(f)
nodeid = self.clients[1].nodeid
nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
os.unlink(files[0])
d.addCallback(_created)
# now make sure the webapi checker notices it
def _do_check(res):
url = (self.webish_url +
"uri/%s" % urllib.quote(self.node.get_uri()) +
"?t=check&verify=false")
return getPage(url, method="POST")
d.addCallback(_do_check)
def _got_results(out):
self.failUnless("Not Healthy!" in out, out)
self.failUnless("Unhealthy: best recoverable version has only 9 shares (encoding is 3-of-10)" in out, out)
self.failIf("Corrupt Shares" in out, out)
d.addCallback(_got_results)
# now make sure the webapi repairer can fix it
def _do_repair(res):
url = (self.webish_url +
"uri/%s" % urllib.quote(self.node.get_uri()) +
"?t=check&verify=false&repair=true")
return getPage(url, method="POST")
d.addCallback(_do_repair)
def _got_repair_results(out):
self.failUnless("Repair attempted and successful" in out)
d.addCallback(_got_repair_results)
d.addCallback(_do_check)
def _got_postrepair_results(out):
self.failIf("Not Healthy!" in out, out)
self.failUnless("Recoverable Versions: 10*seq" in out)
d.addCallback(_got_postrepair_results)
return d
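For reference, the webapi contract both tests exercise, as a standalone snippet (the node URL and cap are placeholders; getPage is the Twisted HTTP client already used above):

import urllib

base_url = "http://127.0.0.1:3456/"  # placeholder client webport
cap = "URI:SSK:xxxx:yyyy"            # placeholder mutable writecap
check_url = (base_url + "uri/%s" % urllib.quote(cap) +
             "?t=check&verify=false&repair=true")
# getPage(check_url, method="POST") returns the HTML report; a healthy
# post-repair run contains "Repair attempted and successful" and no
# "Not Healthy!" marker.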