Mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
mutable: train checker and repairer to work with MDMF mutable files
parent 893a9028f0
commit f80a7fdf18
--- a/src/allmydata/mutable/checker.py
+++ b/src/allmydata/mutable/checker.py
@@ -1,14 +1,11 @@
 
-from twisted.internet import defer
-from twisted.python import failure
-from allmydata import hashtree
 from allmydata.uri import from_string
-from allmydata.util import hashutil, base32, idlib, log
+from allmydata.util import base32, idlib, log
 from allmydata.check_results import CheckAndRepairResults, CheckResults
 
 from allmydata.mutable.common import MODE_CHECK, CorruptShareError
 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
-from allmydata.mutable.layout import unpack_share, SIGNED_PREFIX_LENGTH
 from allmydata.mutable.retrieve import Retrieve # for verifying
 
 class MutableChecker:
+
@@ -25,6 +22,9 @@ class MutableChecker:
 
     def check(self, verify=False, add_lease=False):
         servermap = ServerMap()
+        # Updating the servermap in MODE_CHECK will stand a good chance
+        # of finding all of the shares, and getting a good idea of
+        # recoverability, etc, without verifying.
         u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                              servermap, MODE_CHECK, add_lease=add_lease)
         if self._history:
@@ -48,10 +48,14 @@ class MutableChecker:
         if num_recoverable:
             self.best_version = servermap.best_recoverable_version()
 
+        # The file is unhealthy and needs to be repaired if:
+        # - There are unrecoverable versions.
         if servermap.unrecoverable_versions():
             self.need_repair = True
+        # - There isn't a recoverable version.
         if num_recoverable != 1:
             self.need_repair = True
+        # - The best recoverable version is missing some shares.
         if self.best_version:
             available_shares = servermap.shares_available()
             (num_distinct_shares, k, N) = available_shares[self.best_version]
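The four comment lines added above spell out when the checker decides a file needs repair. As a minimal, self-contained illustration (hypothetical names, not the checker's real attributes), the predicate is roughly:

    # Illustrative sketch only -- not Tahoe-LAFS code.
    def needs_repair(num_unrecoverable, num_recoverable, num_distinct_shares, total_N):
        if num_unrecoverable:              # some version cannot be recovered at all
            return True
        if num_recoverable != 1:           # zero recoverable versions, or several competing ones
            return True
        if num_distinct_shares < total_N:  # the best version is recoverable but missing shares
            return True
        return False

    assert needs_repair(0, 1, 10, 10) is False  # one fully-populated version: healthy
    assert needs_repair(0, 1, 7, 10) is True    # recoverable, but three shares short of N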
@@ -62,89 +66,42 @@
 
     def _verify_all_shares(self, servermap):
         # read every byte of each share
+        #
+        # This logic is going to be very nearly the same as the
+        # downloader. I bet we could pass the downloader a flag that
+        # makes it do this, and piggyback onto that instead of
+        # duplicating a bunch of code.
+        #
+        # Like:
+        #  r = Retrieve(blah, blah, blah, verify=True)
+        #  d = r.download()
+        #  (wait, wait, wait, d.callback)
+        #
+        #  Then, when it has finished, we can check the servermap (which
+        #  we provided to Retrieve) to figure out which shares are bad,
+        #  since the Retrieve process will have updated the servermap as
+        #  it went along.
+        #
+        #  By passing the verify=True flag to the constructor, we are
+        #  telling the downloader a few things.
+        #
+        # 1. It needs to download all N shares, not just K shares.
+        # 2. It doesn't need to decrypt or decode the shares, only
+        #    verify them.
         if not self.best_version:
             return
-        versionmap = servermap.make_versionmap()
-        shares = versionmap[self.best_version]
-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
-         offsets_tuple) = self.best_version
-        offsets = dict(offsets_tuple)
-        readv = [ (0, offsets["EOF"]) ]
-        dl = []
-        for (shnum, peerid, timestamp) in shares:
-            ss = servermap.connections[peerid]
-            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
-            d.addCallback(self._got_answer, peerid, servermap)
-            dl.append(d)
-        return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
-
-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
-        # isolate the callRemote to a separate method, so tests can subclass
-        # Publish and override it
-        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
+        r = Retrieve(self._node, servermap, self.best_version, verify=True)
+        d = r.download()
+        d.addCallback(self._process_bad_shares)
         return d
-
-    def _got_answer(self, datavs, peerid, servermap):
-        for shnum,datav in datavs.items():
-            data = datav[0]
-            try:
-                self._got_results_one_share(shnum, peerid, data)
-            except CorruptShareError:
-                f = failure.Failure()
-                self.need_repair = True
-                self.bad_shares.append( (peerid, shnum, f) )
-                prefix = data[:SIGNED_PREFIX_LENGTH]
-                servermap.mark_bad_share(peerid, shnum, prefix)
-                ss = servermap.connections[peerid]
-                self.notify_server_corruption(ss, shnum, str(f.value))
-
-    def check_prefix(self, peerid, shnum, data):
-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
-         offsets_tuple) = self.best_version
-        got_prefix = data[:SIGNED_PREFIX_LENGTH]
-        if got_prefix != prefix:
-            raise CorruptShareError(peerid, shnum,
-                                    "prefix mismatch: share changed while we were reading it")
+    def _process_bad_shares(self, bad_shares):
+        if bad_shares:
+            self.need_repair = True
+        self.bad_shares = bad_shares
-
-    def _got_results_one_share(self, shnum, peerid, data):
-        self.check_prefix(peerid, shnum, data)
-
-        # the [seqnum:signature] pieces are validated by _compare_prefix,
-        # which checks their signature against the pubkey known to be
-        # associated with this file.
-
-        (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature,
-         share_hash_chain, block_hash_tree, share_data,
-         enc_privkey) = unpack_share(data)
-
-        # validate [share_hash_chain,block_hash_tree,share_data]
-
-        leaves = [hashutil.block_hash(share_data)]
-        t = hashtree.HashTree(leaves)
-        if list(t) != block_hash_tree:
-            raise CorruptShareError(peerid, shnum, "block hash tree failure")
-        share_hash_leaf = t[0]
-        t2 = hashtree.IncompleteHashTree(N)
-        # root_hash was checked by the signature
-        t2.set_hashes({0: root_hash})
-        try:
-            t2.set_hashes(hashes=share_hash_chain,
-                          leaves={shnum: share_hash_leaf})
-        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
-                IndexError), e:
-            msg = "corrupt hashes: %s" % (e,)
-            raise CorruptShareError(peerid, shnum, msg)
-
-        # validate enc_privkey: only possible if we have a write-cap
-        if not self._node.is_readonly():
-            alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
-            alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
-            if alleged_writekey != self._node.get_writekey():
-                raise CorruptShareError(peerid, shnum, "invalid privkey")
 
     def notify_server_corruption(self, ss, shnum, reason):
         ss.callRemoteOnly("advise_corrupt_share",
                           "mutable", self._storage_index, shnum, reason)
 
     def _count_shares(self, smap, version):
         available_shares = smap.shares_available()
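That is the end of the checker.py hunks. The net effect on the verify path: instead of reading and hashing every share itself, the checker now builds a Retrieve in verify mode, lets it fetch and check all N shares, and records whatever bad shares it reports. A rough runnable sketch of that Deferred chain, with stand-in classes (the Fake* names are hypothetical, not Tahoe-LAFS APIs):

    from twisted.internet import defer

    class FakeVerifyingDownload:
        # Stand-in for Retrieve(..., verify=True): visits every share and
        # fires with the list of bad ones instead of returning plaintext.
        def __init__(self, bad_shares):
            self._bad_shares = bad_shares
        def download(self):
            return defer.succeed(self._bad_shares)

    class FakeChecker:
        def __init__(self):
            self.need_repair = False
            self.bad_shares = []
        def verify(self, downloader):
            d = downloader.download()
            d.addCallback(self._process_bad_shares)
            return d
        def _process_bad_shares(self, bad_shares):
            if bad_shares:
                self.need_repair = True
            self.bad_shares = bad_shares

    checker = FakeChecker()
    checker.verify(FakeVerifyingDownload([("server1", 3, "corrupt hashes")]))
    assert checker.need_repair and len(checker.bad_shares) == 1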
--- a/src/allmydata/mutable/repairer.py
+++ b/src/allmydata/mutable/repairer.py
@@ -2,6 +2,7 @@
 from zope.interface import implements
 from twisted.internet import defer
 from allmydata.interfaces import IRepairResults, ICheckResults
+from allmydata.mutable.publish import MutableData
 
 class RepairResults:
     implements(IRepairResults)
@@ -104,6 +105,8 @@
             raise RepairRequiresWritecapError("Sorry, repair currently requires a writecap, to set the write-enabler properly.")
 
         d = self.node.download_version(smap, best_version, fetch_privkey=True)
+        d.addCallback(lambda data:
+            MutableData(data))
         d.addCallback(self.node.upload, smap)
         d.addCallback(self.get_results, smap)
         return d
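The repairer side is the matching change: the contents of the best recoverable version come back from download_version() as a plain byte string and are now wrapped in MutableData before being handed to upload(), so the publisher receives an uploadable object rather than a raw string. A rough sketch of that pipeline with stand-in classes (not the real Node or MutableData APIs):

    from twisted.internet import defer

    class FakeMutableData:
        # Stand-in for allmydata.mutable.publish.MutableData: wraps a byte
        # string in the uploadable-style interface the publisher consumes.
        def __init__(self, data):
            self._data = data
        def get_size(self):
            return len(self._data)

    class FakeNode:
        def download_version(self, smap, version, fetch_privkey=False):
            return defer.succeed(b"contents of the best recoverable version")
        def upload(self, uploadable, servermap):
            assert isinstance(uploadable, FakeMutableData)  # no longer a bare string
            return defer.succeed("repair results")

    def repair(node, smap, best_version):
        d = node.download_version(smap, best_version, fetch_privkey=True)
        d.addCallback(lambda data: FakeMutableData(data))
        d.addCallback(node.upload, smap)
        return d

    repair(FakeNode(), smap=None, best_version=None)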