# Source: tahoe-lafs/src/allmydata/mutable/checker.py
# (web-viewer header residue: 313 lines, 13 KiB, Python;
#  "Raw / Normal View / History")

from twisted.internet import defer
from twisted.python import failure
from allmydata import hashtree
from allmydata.uri import from_string
from allmydata.util import hashutil, base32, idlib, log
from allmydata.check_results import CheckAndRepairResults, CheckerResults
from common import MODE_CHECK, CorruptShareError
from servermap import ServerMap, ServermapUpdater
from layout import unpack_share, SIGNED_PREFIX_LENGTH
class MutableChecker:
    """Check, and optionally verify, the shares of one mutable file.

    check() builds a fresh ServerMap in MODE_CHECK, decides whether the file
    needs repair, optionally fetches and validates every share of the best
    recoverable version, and then fills in self.results.
    """

    def __init__(self, node, monitor):
        """Remember the node and monitor, and create an empty results object.

        The storage index and URI are read from `node`; `monitor` is polled
        (raise_if_cancelled) at the start of each processing step.
        """
        self._node = node
        self._monitor = monitor
        self.bad_shares = [] # list of (nodeid,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.results = CheckerResults(from_string(node.get_uri()),
                                      self._storage_index)
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids

    def check(self, verify=False):
        """Start the check; return a Deferred that fires with self.results.

        If `verify` is true, each share of the best recoverable version is
        also downloaded and validated before the results are filled in.
        """
        servermap = ServerMap()
        u = ServermapUpdater(self._node, self._monitor, servermap, MODE_CHECK)
        self._node._client.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        # _verify_all_shares does not return the servermap, so put it back
        # into the callback chain before filling in the results
        d.addCallback(lambda res: servermap)
        d.addCallback(self._fill_checker_results, self.results)
        d.addCallback(lambda res: self.results)
        return d

    def _got_mapupdate_results(self, servermap):
        """Pick the best version and decide whether repair is needed.

        Sets self.best_version (None if nothing is recoverable) and may set
        self.need_repair to True. Returns the servermap unchanged so it can
        be passed down the callback chain.
        """
        # the file is healthy if there is exactly one recoverable version, it
        # has at least N distinct shares, and there are no unrecoverable
        # versions: all existing shares will be for the same version.
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        if servermap.unrecoverable_versions():
            self.need_repair = True
        if num_recoverable != 1:
            self.need_repair = True
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.need_repair = True

        return servermap

    def _verify_all_shares(self, servermap):
        """Read and validate every share of the best recoverable version.

        Returns None when there is no recoverable version; otherwise returns
        a DeferredList covering one read per share.
        """
        # read every byte of each share
        if not self.best_version:
            return
        versionmap = servermap.make_versionmap()
        shares = versionmap[self.best_version]
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.best_version
        offsets = dict(offsets_tuple)
        # a single read vector covering the share from byte 0 to "EOF"
        readv = [ (0, offsets["EOF"]) ]
        dl = []
        for (shnum, peerid, timestamp) in shares:
            ss = servermap.connections[peerid]
            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
            d.addCallback(self._got_answer, peerid, servermap)
            dl.append(d)
        return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        """Issue the remote "slot_readv" call and return its Deferred."""
        # isolate the callRemote to a separate method, so tests can subclass
        # this class and override it
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def _got_answer(self, datavs, peerid, servermap):
        """Validate each share returned by one server.

        `datavs` maps shnum to a list of read results; only the first entry
        is used because a single read vector was requested. A share that
        fails validation is recorded in self.bad_shares, marked bad in the
        servermap, and reported back to the server that holds it.
        """
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                self._got_results_one_share(shnum, peerid, data)
            except CorruptShareError:
                # capture the active exception as a Failure for the report
                f = failure.Failure()
                self.need_repair = True
                self.bad_shares.append( (peerid, shnum, f) )
                prefix = data[:SIGNED_PREFIX_LENGTH]
                servermap.mark_bad_share(peerid, shnum, prefix)
                ss = servermap.connections[peerid]
                self.notify_server_corruption(ss, shnum, str(f.value))

    def check_prefix(self, peerid, shnum, data):
        """Raise CorruptShareError unless `data` starts with the prefix
        recorded in self.best_version."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.best_version
        got_prefix = data[:SIGNED_PREFIX_LENGTH]
        if got_prefix != prefix:
            raise CorruptShareError(peerid, shnum,
                                    "prefix mismatch: share changed while we were reading it")

    def _got_results_one_share(self, shnum, peerid, data):
        """Validate one complete share; raise CorruptShareError if it is bad.

        Checks, in order: the signed prefix, the block hash tree, the share
        hash chain, and (only when the node is not read-only) the encrypted
        private key.
        """
        self.check_prefix(peerid, shnum, data)

        # the [seqnum:signature] pieces are covered by check_prefix(), which
        # compares them against the prefix stored in self.best_version.
        # NOTE(review): presumably that prefix's signature was checked
        # against the file's pubkey when the servermap was built -- confirm.

        (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature,
         share_hash_chain, block_hash_tree, share_data,
         enc_privkey) = unpack_share(data)

        # validate [share_hash_chain,block_hash_tree,share_data]

        leaves = [hashutil.block_hash(share_data)]
        t = hashtree.HashTree(leaves)
        if list(t) != block_hash_tree:
            raise CorruptShareError(peerid, shnum, "block hash tree failure")
        share_hash_leaf = t[0]
        t2 = hashtree.IncompleteHashTree(N)
        # root_hash was checked by the signature
        t2.set_hashes({0: root_hash})
        try:
            t2.set_hashes(hashes=share_hash_chain,
                          leaves={shnum: share_hash_leaf})
        # "except ... as" is accepted by Python 2.6+ and 3.x; the older
        # comma form is a syntax error on Python 3
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
                IndexError) as e:
            msg = "corrupt hashes: %s" % (e,)
            raise CorruptShareError(peerid, shnum, msg)

        # validate enc_privkey: only possible if we have a write-cap
        if not self._node.is_readonly():
            alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
            alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
            if alleged_writekey != self._node.get_writekey():
                raise CorruptShareError(peerid, shnum, "invalid privkey")

    def notify_server_corruption(self, ss, shnum, reason):
        """Tell server `ss` that its share `shnum` of this file is corrupt."""
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _count_shares(self, smap, version):
        """Return a dict of share counters for `version`.

        Keys: count-shares-good, count-shares-needed, count-shares-expected,
        count-good-share-hosts, and count-wrong-shares (shares that belong
        to any other version in the servermap).
        """
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_peers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        counters["count-wrong-shares"] = sum([len(shares)
                                              for verinfo,shares in vmap.items()
                                              if verinfo != version])
        return counters

    def _fill_checker_results(self, smap, r):
        """Summarize servermap `smap` into the results object `r`.

        Sets the servermap copy, healthy/recoverable/needs-rebalancing flags,
        the data dictionary, the one-line summary and the detailed report on
        `r`. Corrupt shares from self.bad_shares are listed, appended to
        r.problems, and logged.
        """
        self._monitor.raise_if_cancelled()
        r.set_servermap(smap.copy())
        healthy = True
        data = {}
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()
        data["count-recoverable-versions"] = len(recoverable)
        data["count-unrecoverable-versions"] = len(unrecoverable)

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
        # reuse the set fetched above instead of asking the servermap again
        if unrecoverable:
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if len(recoverable) == 0:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")

        needs_rebalancing = False
        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            data.update(counters)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
            hosts = smap.all_peers_for_version(best_version)
            needs_rebalancing = bool( len(hosts) < N )
        elif unrecoverable:
            healthy = False
            # find a k and N from somewhere
            first = list(unrecoverable)[0]
            # not exactly the best version, but that doesn't matter too much
            data.update(self._count_shares(smap, first))
            # leave needs_rebalancing=False: the file being unrecoverable is
            # the bigger problem
        else:
            # couldn't find anything at all
            data["count-shares-good"] = 0
            data["count-shares-needed"] = 3 # arbitrary defaults
            data["count-shares-expected"] = 10
            data["count-good-share-hosts"] = 0
            data["count-wrong-shares"] = 0

        # NOTE(review): corrupt shares are reported below but do not by
        # themselves clear `healthy` in this method -- confirm that is
        # intended.
        if self.bad_shares:
            data["count-corrupt-shares"] = len(self.bad_shares)
            data["list-corrupt-shares"] = locators = []
            report.append("Corrupt Shares:")
            summary.append("Corrupt Shares:")
            # order by (peerid, shnum) only, so that Failure objects are
            # never compared with each other
            for (peerid, shnum, f) in sorted(self.bad_shares,
                                             key=lambda bad: bad[:2]):
                locators.append( (peerid, self._storage_index, shnum) )
                share_label = "%s-sh%d" % (idlib.shortnodeid_b2a(peerid), shnum)
                if f.check(CorruptShareError):
                    ft = f.value.reason
                else:
                    ft = str(f)
                report.append(" %s: %s" % (share_label, ft))
                summary.append(share_label)
                p = (peerid, self._storage_index, shnum, f)
                r.problems.append(p)
                msg = ("CorruptShareError during mutable verify, "
                       "peerid=%(peerid)s, si=%(si)s, shnum=%(shnum)d, "
                       "where=%(where)s")
                log.msg(format=msg, peerid=idlib.nodeid_b2a(peerid),
                        si=base32.b2a(self._storage_index),
                        shnum=shnum,
                        where=ft,
                        level=log.WEIRD, umid="EkK8QA")
        else:
            data["count-corrupt-shares"] = 0
            data["list-corrupt-shares"] = []

        # map "<version summary>-sh<shnum>" to the peers holding that share
        sharemap = {}
        for verinfo in vmap:
            for (shnum, peerid, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                sharemap.setdefault(shareid, []).append(peerid)
        data["sharemap"] = sharemap
        data["servers-responding"] = list(smap.reachable_peers)

        r.set_healthy(healthy)
        r.set_recoverable(bool(recoverable))
        r.set_needs_rebalancing(needs_rebalancing)
        r.set_data(data)
        if healthy:
            r.set_summary("Healthy")
        else:
            r.set_summary("Unhealthy: " + " ".join(summary))
        r.set_report(report)
class MutableCheckAndRepairer(MutableChecker):
    """A MutableChecker that also asks the node to repair the file.

    check() fires with self.cr_results, a CheckAndRepairResults that carries
    the pre-repair results and, when a repair is attempted, its outcome.
    """

    def __init__(self, node, monitor):
        """Set up the base checker and the combined check-and-repair results."""
        MutableChecker.__init__(self, node, monitor)
        combined = CheckAndRepairResults(self._storage_index)
        combined.pre_repair_results = self.results
        self.cr_results = combined
        self.need_repair = False

    def check(self, verify=False):
        """Run the base check, then repair if needed.

        Returns a Deferred that fires with self.cr_results.
        """
        chain = MutableChecker.check(self, verify)
        chain.addCallback(self._maybe_repair)

        def _deliver_combined(ignored):
            return self.cr_results
        chain.addCallback(_deliver_combined)
        return chain

    def _maybe_repair(self, res):
        """Start a repair when the check flagged one; otherwise do nothing.

        With no repair needed, the pre-repair results double as the
        post-repair results and None is returned. Otherwise the Deferred
        from the node's repair() is returned.
        """
        self._monitor.raise_if_cancelled()
        if self.need_repair:
            self.cr_results.repair_attempted = True
            repairing = self._node.repair(self.results)

            def _on_repaired(repair_results):
                # build fresh results from the servermap the repair produced
                self.cr_results.repair_successful = True
                fresh = CheckerResults(from_string(self._node.get_uri()),
                                       self._storage_index)
                self.cr_results.post_repair_results = fresh
                self._fill_checker_results(repair_results.servermap, fresh)
                self.cr_results.repair_results = repair_results # TODO?

            def _on_repair_failed(f):
                # I'm not sure if I want to pass through a failure or not.
                self.cr_results.repair_successful = False
                self.cr_results.repair_failure = f # TODO?
                # post_repair_results is left unset on this path
                return f

            repairing.addCallbacks(_on_repaired, _on_repair_failed)
            return repairing
        else:
            self.cr_results.post_repair_results = self.results
            return None