2009-01-06 01:28:18 +00:00
|
|
|
from foolscap import DeadReferenceError
|
2009-02-18 02:32:43 +00:00
|
|
|
from twisted.internet import defer
|
2009-01-06 15:00:54 +00:00
|
|
|
from allmydata import hashtree
|
2009-01-10 01:00:52 +00:00
|
|
|
from allmydata.check_results import CheckResults
|
2008-07-16 20:14:39 +00:00
|
|
|
from allmydata.immutable import download
|
2009-01-06 01:28:18 +00:00
|
|
|
from allmydata.uri import CHKFileVerifierURI
|
2008-12-05 20:07:23 +00:00
|
|
|
from allmydata.util.assertutil import precondition
|
2009-01-12 18:00:22 +00:00
|
|
|
from allmydata.util import base32, deferredutil, dictutil, log, rrefutil
|
2009-02-18 02:32:43 +00:00
|
|
|
from allmydata.util.hashutil import file_renewal_secret_hash, \
|
|
|
|
file_cancel_secret_hash, bucket_renewal_secret_hash, \
|
|
|
|
bucket_cancel_secret_hash
|
2007-10-15 23:16:39 +00:00
|
|
|
|
2009-01-06 01:28:18 +00:00
|
|
|
from allmydata.immutable import layout
|
2007-10-15 23:16:39 +00:00
|
|
|
|
2009-01-06 01:28:18 +00:00
|
|
|
class Checker(log.PrefixingLogMixin):
    """I query all servers to see if M uniquely-numbered shares are
    available.

    If the verify flag was passed to my constructor, then for each share I
    download every data block and all metadata from each server and perform a
    cryptographic integrity check on all of it. If not, I just ask each
    server 'Which shares do you have?' and believe its answer.

    In either case, I wait until I have gotten responses from all servers.
    This fact -- that I wait -- means that an ill-behaved server which fails
    to answer my questions will make me wait indefinitely. If it is
    ill-behaved in a way that triggers the underlying foolscap timeouts, then
    I will wait only as long as those foolscap timeouts, but if it is
    ill-behaved in a way which placates the foolscap timeouts but still
    doesn't answer my question then I will wait indefinitely.

    The 'monitor' object that was passed into my constructor is meant to let
    this task be cancelled (through its raise_if_cancelled() method).
    NOTE(review): I store the monitor, but none of my methods currently
    invoke raise_if_cancelled() before sending a request -- confirm whether
    cancellation is expected to be handled here.
    """

    def __init__(self, client, verifycap, servers, verify, add_lease, monitor):
        """Set up a check (or verify) operation for one immutable file.

        client: provides get_renewal_secret() and get_cancel_secret(), from
                which the per-file lease secrets are derived.
        verifycap: a CHKFileVerifierURI identifying the file.
        servers: a set or frozenset of (serverid, serverrref) pairs, where
                 serverid is a str and serverrref is a
                 rrefutil.WrappedRemoteReference.
        verify: bool -- verify what the servers claim, or just believe them?
        add_lease: bool -- also ask each server to add a lease on the shares.
        monitor: stored as self._monitor (see the class docstring).
        """
        assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
        assert precondition(isinstance(servers, (set, frozenset)), servers)
        for (serverid, serverrref) in servers:
            assert precondition(isinstance(serverid, str))
            assert precondition(isinstance(serverrref, rrefutil.WrappedRemoteReference), serverrref)

        # Log messages are prefixed with the first 8 bytes of the storage
        # index, base32-encoded.
        prefix = "%s" % base32.b2a_l(verifycap.storage_index[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)

        self._client = client
        self._verifycap = verifycap

        self._monitor = monitor
        self._servers = servers
        self._verify = verify # bool: verify what the servers claim, or not?
        self._add_lease = add_lease

        self._share_hash_tree = None

        # Per-file lease secrets; the per-server (bucket) secrets are derived
        # from these on demand by _get_renewal_secret/_get_cancel_secret.
        frs = file_renewal_secret_hash(client.get_renewal_secret(),
                                       self._verifycap.storage_index)
        self.file_renewal_secret = frs
        fcs = file_cancel_secret_hash(client.get_cancel_secret(),
                                      self._verifycap.storage_index)
        self.file_cancel_secret = fcs

    def _get_renewal_secret(self, peerid):
        """Return the lease-renewal secret for this file on server peerid."""
        return bucket_renewal_secret_hash(self.file_renewal_secret, peerid)

    def _get_cancel_secret(self, peerid):
        """Return the lease-cancel secret for this file on server peerid."""
        return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)

    def _get_buckets(self, server, storageindex, serverid):
        """Return a deferred that eventually fires with ({sharenum: bucket},
        serverid, success). In case the server is disconnected or returns a
        Failure then it fires with ({}, serverid, False) (A server
        disconnecting or returning a Failure when we ask it for buckets is
        the same, for our purposes, as a server that says it has none, except
        that we want to track and report whether or not each server
        responded.)

        If add_lease was requested, an 'add_lease' call is sent alongside
        'get_buckets'; an add_lease failure other than IndexError is treated
        as a failure of the whole query."""

        d = server.callRemote("get_buckets", storageindex)
        if self._add_lease:
            renew_secret = self._get_renewal_secret(serverid)
            cancel_secret = self._get_cancel_secret(serverid)
            d2 = server.callRemote("add_lease", storageindex,
                                   renew_secret, cancel_secret)
            # consumeErrors=True: each failure is already delivered to _done
            # inside the result list (and re-propagated from there), so it
            # must not also linger in the constituent deferred, where it
            # would be logged as an unhandled error.
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            def _done(res):
                [(get_success, get_result),
                 (addlease_success, addlease_result)] = res
                if (not addlease_success and
                    not addlease_result.check(IndexError)):
                    # tahoe=1.3.0 raised IndexError on non-existent buckets,
                    # which we ignore. But report others, including the
                    # unfortunate internal KeyError bug that <1.3.0 had.
                    return addlease_result # propagate error
                # If get_buckets itself failed, get_result is a Failure and
                # returning it sends the chain down the errback path.
                return get_result
            dl.addCallback(_done)
            d = dl

        def _wrap_results(res):
            # Replace each bucket reference with a wrapped one, in place
            # (only values of existing keys change).
            for k in res:
                res[k] = rrefutil.WrappedRemoteReference(res[k])
            return (res, serverid, True)

        def _trap_errs(f):
            # A dropped connection is merely unusual; anything else is weird.
            level = log.WEIRD
            if f.check(DeadReferenceError):
                level = log.UNUSUAL
            self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="3uuBUQ")
            return ({}, serverid, False)

        d.addCallbacks(_wrap_results, _trap_errs)
        return d

    def _download_and_verify(self, serverid, sharenum, bucket):
        """Start an attempt to download and verify every block in this bucket
        and return a deferred that will eventually fire once the attempt
        completes.

        If you download and verify every block then fire with (True,
        sharenum, None), else if the share data couldn't be parsed because it
        was of an unknown version number fire with (False, sharenum,
        'incompatible'), else if any of the blocks were invalid, fire with
        (False, sharenum, 'corrupt'), else if the server disconnected (False,
        sharenum, 'disconnect'), else if the server returned a Failure during
        the process fire with (False, sharenum, 'failure').

        If there is an internal error such as an uncaught exception in this
        code, then the deferred will errback, but if there is a remote error
        such as the server failing or the returned data being incorrect then
        it will not errback -- it will fire normally with the indicated
        results."""

        b = layout.ReadBucketProxy(bucket, serverid, self._verifycap.storage_index)
        veup = download.ValidatedExtendedURIProxy(b, self._verifycap)
        d = veup.start()

        def _errb(f):
            # Okay, we didn't succeed at fetching and verifying all the
            # blocks of this share. Now we need to handle different reasons
            # for failure differently. If the failure isn't one of the
            # classes listed below then trap() re-raises it.
            f.trap(DeadReferenceError,
                   rrefutil.ServerFailure,
                   layout.LayoutInvalid,
                   layout.RidiculouslyLargeURIExtensionBlock,
                   layout.ShareVersionIncompatible,
                   download.BadOrMissingHash,
                   download.BadURIExtensionHashValue)

            if f.check(DeadReferenceError):
                return (False, sharenum, 'disconnect')
            elif f.check(rrefutil.ServerFailure):
                return (False, sharenum, 'failure')
            elif f.check(layout.ShareVersionIncompatible):
                return (False, sharenum, 'incompatible')
            else:
                # Every remaining trapped class means bad share data.
                return (False, sharenum, 'corrupt')

        def _discard_result(r):
            assert isinstance(r, str), r
            # to free up the RAM
            return None

        def _got_ueb(vup):
            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
            self._share_hash_tree.set_hashes({0: vup.share_root_hash})

            vrbp = download.ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, vup.num_segments, vup.block_size, vup.share_size)

            ds = []
            for blocknum in range(vup.num_segments):
                d2 = vrbp.get_block(blocknum)
                d2.addCallback(_discard_result)
                ds.append(d2)

            dl = deferredutil.gatherResults(ds)
            # dl will fire once every block of this share has been downloaded
            # and verified, or else it will errback.

            def _cb(result):
                return (True, sharenum, None)

            dl.addCallback(_cb)
            return dl

        d.addCallback(_got_ueb)
        d.addErrback(_errb)

        return d

    def _verify_server_shares(self, serverid, ss):
        """ Return a deferred which eventually fires with a tuple of
        (set(sharenum), serverid, set(corruptsharenum),
        set(incompatiblesharenum), success) showing all the shares verified
        to be served by this server, and all the corrupt shares served by the
        server, and all the incompatible shares served by the server. In case
        the server is disconnected or returns a Failure then it fires with
        the last element False.

        A server disconnecting or returning a failure when we ask it for
        shares is the same, for our purposes, as a server that says it has
        none or offers invalid ones, except that we want to track and report
        the server's behavior. Similarly, the presence of corrupt shares is
        mainly of use for diagnostics -- you can typically treat it as just
        like being no share at all by just observing its absence from the
        verified shares dict and ignoring its presence in the corrupt shares
        dict.

        The 'success' argument means whether the server responded to *any*
        queries during this process, so if it responded to some queries and
        then disconnected and ceased responding, or returned a failure, it is
        still marked with the True flag for 'success'.
        """
        d = self._get_buckets(ss, self._verifycap.storage_index, serverid)

        def _got_buckets(result):
            bucketdict, peerid, success = result

            # One verification attempt per share the server claims to hold.
            shareverds = []
            for (sharenum, bucket) in bucketdict.items():
                shareverds.append(self._download_and_verify(peerid, sharenum, bucket))

            dl = deferredutil.gatherResults(shareverds)

            def collect(results):
                verified = set()
                corrupt = set()
                incompatible = set()
                for succ, sharenum, whynot in results:
                    if succ:
                        verified.add(sharenum)
                    elif whynot == 'corrupt':
                        corrupt.add(sharenum)
                    elif whynot == 'incompatible':
                        incompatible.add(sharenum)
                    # 'disconnect' and 'failure' shares are simply left out.
                return (verified, peerid, corrupt, incompatible, success)

            dl.addCallback(collect)
            return dl

        def _err(f):
            f.trap(rrefutil.ServerFailure)
            return (set(), serverid, set(), set(), False)

        d.addCallbacks(_got_buckets, _err)
        return d

    def _check_server_shares(self, serverid, ss):
        """Return a deferred which eventually fires with a tuple of
        (set(sharenum), serverid, set(), set(), responded) showing all the
        shares claimed to be served by this server. In case the server is
        disconnected then it fires with (set(), serverid, set(), set(), False)
        (a server disconnecting when we ask it for buckets is the same, for
        our purposes, as a server that says it has none, except that we want
        to track and report whether or not each server responded.)"""
        def _curry_empty_corrupted(res):
            buckets, peerid, responded = res
            # No verification was done, so nothing can be reported as
            # corrupt or incompatible.
            return (set(buckets), peerid, set(), set(), responded)
        d = self._get_buckets(ss, self._verifycap.storage_index, serverid)
        d.addCallback(_curry_empty_corrupted)
        return d

    def _format_results(self, results):
        """Fold the per-server result tuples into a CheckResults object.

        results: an iterable of (set(sharenum), serverid,
        set(corruptsharenum), set(incompatiblesharenum), responded) tuples,
        as produced by _verify_server_shares or _check_server_shares.

        The file is reported healthy when the number of distinct good shares
        equals total_shares, and recoverable when it is at least
        needed_shares."""
        cr = CheckResults(self._verifycap, self._verifycap.storage_index)
        d = {}
        d['count-shares-needed'] = self._verifycap.needed_shares
        d['count-shares-expected'] = self._verifycap.total_shares

        verifiedshares = dictutil.DictOfSets() # {sharenum: set(serverid)}
        servers = {} # {serverid: set(sharenums)}
        servers_responding = set() # serverids whose 'responded' flag was true
        corruptsharelocators = [] # (serverid, storageindex, sharenum)
        incompatiblesharelocators = [] # (serverid, storageindex, sharenum)

        for (theseverifiedshares, thisserverid, thesecorruptshares,
             theseincompatibleshares, thisresponded) in results:
            servers.setdefault(thisserverid, set()).update(theseverifiedshares)
            for sharenum in theseverifiedshares:
                verifiedshares.setdefault(sharenum, set()).add(thisserverid)
            for sharenum in thesecorruptshares:
                corruptsharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))
            for sharenum in theseincompatibleshares:
                incompatiblesharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))
            # Only servers that actually answered count as responding; a
            # server that failed or disconnected still has an entry in
            # 'servers' (with no shares) but must not be listed.
            if thisresponded:
                servers_responding.add(thisserverid)

        d['count-shares-good'] = len(verifiedshares)
        d['count-good-share-hosts'] = len([s for s in servers if servers[s]])

        assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
        if len(verifiedshares) == self._verifycap.total_shares:
            cr.set_healthy(True)
            cr.set_summary("Healthy")
        else:
            cr.set_healthy(False)
            cr.set_summary("Not Healthy: %d shares (enc %d-of-%d)" %
                           (len(verifiedshares),
                            self._verifycap.needed_shares,
                            self._verifycap.total_shares))
        if len(verifiedshares) >= self._verifycap.needed_shares:
            cr.set_recoverable(True)
            d['count-recoverable-versions'] = 1
            d['count-unrecoverable-versions'] = 0
        else:
            cr.set_recoverable(False)
            d['count-recoverable-versions'] = 0
            d['count-unrecoverable-versions'] = 1

        d['servers-responding'] = list(servers_responding)
        d['sharemap'] = verifiedshares
        # no such thing as wrong shares of an immutable file
        d['count-wrong-shares'] = 0
        d['list-corrupt-shares'] = corruptsharelocators
        d['count-corrupt-shares'] = len(corruptsharelocators)
        d['list-incompatible-shares'] = incompatiblesharelocators
        d['count-incompatible-shares'] = len(incompatiblesharelocators)

        # The file needs rebalancing if the set of servers that have at least
        # one share is less than the number of uniquely-numbered shares
        # available.
        cr.set_needs_rebalancing(d['count-good-share-hosts'] < d['count-shares-good'])

        cr.set_data(d)

        return cr

    def start(self):
        """Query every server (verifying share contents if the verify flag
        was set) and return a deferred that fires with the CheckResults
        built by _format_results."""
        ds = []
        if self._verify:
            for (serverid, ss) in self._servers:
                ds.append(self._verify_server_shares(serverid, ss))
        else:
            for (serverid, ss) in self._servers:
                ds.append(self._check_server_shares(serverid, ss))

        return deferredutil.gatherResults(ds).addCallback(self._format_results)
|