immutable repairer

This implements an immutable repairer by marrying a CiphertextDownloader to a CHKUploader.  It extends the IDownloadTarget interface so that the downloader can provide some metadata that the uploader requires.
The processing is incremental -- it uploads the first segments before it finishes downloading the whole file.  This is necessary so that you can repair large files without running out of RAM or using a temporary file on the repairer.
It requires only a verifycap, not a readcap.  That is: it doesn't need or use the decryption key, only the integrity check codes.
There are several tests marked TODO and several instances of XXX in the source code.  I intend to open tickets to document further improvements to functionality and testing, but the current version is probably good enough for Tahoe-1.3.0.
This commit is contained in:
Zooko O'Whielacronx 2009-01-12 11:00:22 -07:00
parent e449052a17
commit 25063688b4
9 changed files with 824 additions and 155 deletions

View File

@ -4,7 +4,7 @@ from allmydata.check_results import CheckResults
from allmydata.immutable import download
from allmydata.uri import CHKFileVerifierURI
from allmydata.util.assertutil import precondition
from allmydata.util import base32, deferredutil, log, rrefutil
from allmydata.util import base32, deferredutil, dictutil, log, rrefutil
from allmydata.immutable import layout
@ -207,7 +207,7 @@ class Checker(log.PrefixingLogMixin):
d['count-shares-needed'] = self._verifycap.needed_shares
d['count-shares-expected'] = self._verifycap.total_shares
verifiedshares = {} # {sharenum: set(serverid)}
verifiedshares = dictutil.DictOfSets() # {sharenum: set(serverid)}
servers = {} # {serverid: set(sharenums)}
corruptsharelocators = [] # (serverid, storageindex, sharenum)
incompatiblesharelocators = [] # (serverid, storageindex, sharenum)

View File

@ -6,7 +6,7 @@ from twisted.application import service
from foolscap import DeadReferenceError
from foolscap.eventual import eventually
from allmydata.util import base32, deferredutil, mathutil, hashutil, log
from allmydata.util import base32, deferredutil, hashutil, log, mathutil, observer
from allmydata.util.assertutil import _assert, precondition
from allmydata.util.rrefutil import ServerFailure
from allmydata import codec, hashtree, uri
@ -51,6 +51,7 @@ class DecryptingTarget(log.PrefixingLogMixin):
self._decryptor = AES(key)
prefix = str(target)
log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
# methods to satisfy the IConsumer interface
def registerProducer(self, producer, streaming):
if IConsumer.providedBy(self.target):
self.target.registerProducer(producer, streaming)
@ -66,6 +67,13 @@ class DecryptingTarget(log.PrefixingLogMixin):
self.target.close()
def finish(self):
return self.target.finish()
# The following methods are just to pass through to the next target, and just because that
# target might be a repairer.DownUpConnector, and just because the current CHKUpload object
# expects to find the storage index in its Uploadable.
def set_storageindex(self, storageindex):
self.target.set_storageindex(storageindex)
def set_encodingparams(self, encodingparams):
self.target.set_encodingparams(encodingparams)
class ValidatedThingObtainer:
def __init__(self, validatedthingproxies, debugname, log_id):
@ -617,14 +625,13 @@ class CiphertextDownloader(log.PrefixingLogMixin):
precondition(IVerifierURI.providedBy(v), v)
precondition(IDownloadTarget.providedBy(target), target)
prefix=base32.b2a_l(v.get_storage_index()[:8], 60)
prefix=base32.b2a_l(v.storage_index[:8], 60)
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
self._client = client
self._verifycap = v
self._storage_index = v.get_storage_index()
self._storage_index = v.storage_index
self._uri_extension_hash = v.uri_extension_hash
self._vup = None # ValidatedExtendedURIProxy
self._started = time.time()
self._status = s = DownloadStatus()
@ -649,6 +656,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
if IConsumer.providedBy(target):
target.registerProducer(self, True)
self._target = target
self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
self._monitor = monitor
self._opened = False
@ -667,6 +675,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
# self._crypttext_hash_tree
# self._share_hash_tree
# self._current_segnum = 0
# self._vup # ValidatedExtendedURIProxy
def pauseProducing(self):
if self._paused:
@ -834,6 +843,14 @@ class CiphertextDownloader(log.PrefixingLogMixin):
self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
# Repairer (uploader) needs the encodingparams.
self._target.set_encodingparams((
self._verifycap.needed_shares,
self._verifycap.total_shares, # I don't think the target actually cares about "happy".
self._verifycap.total_shares,
self._vup.segment_size
))
d.addCallback(_got_uri_extension)
return d
@ -1045,6 +1062,13 @@ class FileName:
pass # we won't use it
def finish(self):
pass
# The following methods are just because the target might be a repairer.DownUpConnector,
# and just because the current CHKUpload object expects to find the storage index and
# encoding parameters in its Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
pass
class Data:
implements(IDownloadTarget)
@ -1063,6 +1087,13 @@ class Data:
pass # we won't use it
def finish(self):
return self.data
# The following methods are just because the target might be a repairer.DownUpConnector,
# and just because the current CHKUpload object expects to find the storage index and
# encoding parameters in its Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
pass
class FileHandle:
"""Use me to download data to a pre-defined filehandle-like object. I
@ -1086,6 +1117,13 @@ class FileHandle:
pass
def finish(self):
return self._filehandle
# The following methods are just because the target might be a repairer.DownUpConnector,
# and just because the current CHKUpload object expects to find the storage index and
# encoding parameters in its Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
pass
class ConsumerAdapter:
implements(IDownloadTarget, IConsumer)
@ -1110,6 +1148,13 @@ class ConsumerAdapter:
pass
def finish(self):
return self._consumer
# The following methods are just because the target might be a repairer.DownUpConnector,
# and just because the current CHKUpload object expects to find the storage index and
# encoding parameters in its Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
pass
class Downloader(service.MultiService):

View File

@ -1,5 +1,4 @@
import os.path, stat
import copy, os.path, stat
from cStringIO import StringIO
from zope.interface import implements
from twisted.internet import defer
@ -7,12 +6,12 @@ from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.protocols import basic
from foolscap.eventual import eventually
from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
IDownloadTarget
from allmydata.util import log, base32
IDownloadTarget, IUploadResults
from allmydata.util import dictutil, log, base32
from allmydata.util.assertutil import precondition
from allmydata import uri as urimodule
from allmydata.immutable.checker import Checker
from allmydata.check_results import CheckAndRepairResults
from allmydata.check_results import CheckResults, CheckAndRepairResults
from allmydata.immutable.repairer import Repairer
from allmydata.immutable import download
@ -167,7 +166,13 @@ class DownloadCache:
pass
def finish(self):
return None
# The following methods are just because the target might be a repairer.DownUpConnector,
# and just because the current CHKUpload object expects to find the storage index and
# encoding parameters in its Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
pass
class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
@ -203,10 +208,26 @@ class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
crr.post_repair_results = cr
return defer.succeed(crr)
else:
def _gather_repair_results(rr):
crr.post_repair_results = rr
def _gather_repair_results(ur):
assert IUploadResults.providedBy(ur), ur
# clone the cr (check results) to form the basis of the prr (post-repair results)
prr = CheckResults(cr.uri, cr.storage_index)
prr.data = copy.deepcopy(cr.data)
sm = prr.data['sharemap']
assert isinstance(sm, dictutil.DictOfSets), sm
sm.update(ur.sharemap)
servers_responding = set(prr.data['servers-responding'])
servers_responding.union(ur.sharemap.iterkeys())
prr.data['servers-responding'] = list(servers_responding)
prr.data['count-shares-good'] = len(sm)
prr.data['count-good-share-hosts'] = len(sm)
prr.set_healthy(len(sm) >= self.u.total_shares)
prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)
crr.post_repair_results = prr
return crr
r = Repairer(client=self._client, verifycap=verifycap, servers=servers, monitor=monitor)
r = Repairer(client=self._client, verifycap=verifycap, monitor=monitor)
d = r.start()
d.addCallback(_gather_repair_results)
return d

View File

@ -1,43 +1,23 @@
from zope.interface import implements
from twisted.internet import defer
from allmydata import storage
from allmydata.check_results import CheckResults, CheckAndRepairResults
from allmydata.immutable import download
from allmydata.util import nummedobj
from allmydata.util import log, observer
from allmydata.util.assertutil import precondition
from allmydata.uri import CHKFileVerifierURI
from allmydata.interfaces import IEncryptedUploadable, IDownloadTarget
from twisted.internet.interfaces import IConsumer
from allmydata.immutable import layout
from allmydata.immutable import download, layout, upload
import sha, time
import collections
def _permute_servers(servers, key):
return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
class LogMixin(nummedobj.NummedObj):
def __init__(self, client, verifycap):
nummedobj.NummedObj.__init__(self)
self._client = client
self._verifycap = verifycap
self._storageindex = self._verifycap.storage_index
self._log_prefix = prefix = storage.si_b2a(self._storageindex)[:5]
self._parentmsgid = self._client.log("%s(%s): starting" % (self.__repr__(), self._log_prefix))
def log(self, msg, parent=None, *args, **kwargs):
if parent is None:
parent = self._parentmsgid
return self._client.log("%s(%s): %s" % (self.__repr__(), self._log_prefix, msg), parent=parent, *args, **kwargs)
class Repairer(LogMixin):
class Repairer(log.PrefixingLogMixin):
""" I generate any shares which were not available and upload them to servers.
Which servers? Well, I take the list of servers and if I used the Checker in verify mode
then I exclude any servers which claimed to have a share but then either failed to serve it
up or served up a corrupted one when I asked for it. (If I didn't use verify mode, then I
won't exclude any servers, not even servers which, when I subsequently attempt to download
the file during repair, claim to have a share but then fail to produce it or then produce a
corrupted share.) Then I perform the normal server-selection process of permuting the order
of the servers with the storage index, and choosing the next server which doesn't already
have more shares than others.
Which servers? Well, I just use the normal upload process, so any servers that will take
shares. In fact, I even believe servers if they say that they already have shares even if
attempts to download those shares would fail because the shares are corrupted.
My process of uploading replacement shares proceeds in a segment-wise fashion -- first I ask
servers if they can hold the new shares, and wait until enough have agreed then I download
@ -47,120 +27,179 @@ class Repairer(LogMixin):
way in order to minimize the amount of downloading I have to do and the amount of memory I
have to use at any one time.)
If any of the servers to which I am uploading replacement shares fails to accept the blocks
during this process, then I just stop using that server, abandon any share-uploads that were
going to that server, and proceed to finish uploading the remaining shares to their
respective servers. At the end of my work, I produce an object which satisfies the
ICheckAndRepairResults interface (by firing the deferred that I returned from start() and
If any of the servers to which I am uploading replacement shares fails to accept the blocks
during this process, then I just stop using that server, abandon any share-uploads that were
going to that server, and proceed to finish uploading the remaining shares to their
respective servers. At the end of my work, I produce an object which satisfies the
ICheckAndRepairResults interface (by firing the deferred that I returned from start() and
passing that check-and-repair-results object).
Before I send any new request to a server, I always ask the "monitor" object that was passed
into my constructor whether this task has been cancelled (by invoking its
raise_if_cancelled() method).
"""
def __init__(self, client, verifycap, servers, monitor):
def __init__(self, client, verifycap, monitor):
assert precondition(isinstance(verifycap, CHKFileVerifierURI))
assert precondition(isinstance(servers, (set, frozenset)))
for (serverid, serverrref) in servers:
assert precondition(isinstance(serverid, str))
LogMixin.__init__(self, client, verifycap)
logprefix = storage.si_b2a(verifycap.storage_index)[:5]
log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer", prefix=logprefix)
self._client = client
self._verifycap = verifycap
self._monitor = monitor
self._servers = servers
def start(self):
self.log("starting download")
d = defer.succeed(_permute_servers(self._servers, self._storageindex))
d.addCallback(self._check_phase)
d.addCallback(self._repair_phase)
self.log("starting repair")
duc = DownUpConnector()
dl = download.CiphertextDownloader(self._client, self._verifycap, target=duc, monitor=self._monitor)
ul = upload.CHKUploader(self._client)
d = defer.Deferred()
# If the upload or the download fails or is stopped, then the repair failed.
def _errb(f):
d.errback(f)
return None
# If the upload succeeds, then the repair has succeeded.
def _cb(res):
d.callback(res)
ul.start(duc).addCallbacks(_cb, _errb)
# If the download fails or is stopped, then the repair failed.
d2 = dl.start()
d2.addErrback(_errb)
# We ignore the callback from d2. Is this right? Ugh.
return d
def _check_phase(self, unused=None):
return unused
class DownUpConnector(log.PrefixingLogMixin):
implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
""" I act like an "encrypted uploadable" -- something that a local uploader can read
ciphertext from in order to upload the ciphertext. However, unbeknownst to the uploader,
I actually download the ciphertext from a CiphertextDownloader instance as it is needed.
def _repair_phase(self, unused=None):
bogusresults = CheckAndRepairResults(self._storageindex) # XXX THIS REPAIRER NOT HERE YET
bogusresults.pre_repair_results = CheckResults(self._verifycap, self._storageindex)
bogusresults.pre_repair_results.set_healthy(True)
bogusresults.pre_repair_results.set_needs_rebalancing(False)
bogusresults.post_repair_results = CheckResults(self._verifycap, self._storageindex)
bogusresults.post_repair_results.set_healthy(True)
bogusresults.post_repair_results.set_needs_rebalancing(False)
bogusdata = {}
bogusdata['count-shares-good'] = "this repairer not here yet"
bogusdata['count-shares-needed'] = "this repairer not here yet"
bogusdata['count-shares-expected'] = "this repairer not here yet"
bogusdata['count-good-share-hosts'] = "this repairer not here yet"
bogusdata['count-corrupt-shares'] = "this repairer not here yet"
bogusdata['count-list-corrupt-shares'] = [] # XXX THIS REPAIRER NOT HERE YET
bogusdata['servers-responding'] = [] # XXX THIS REPAIRER NOT HERE YET
bogusdata['sharemap'] = {} # XXX THIS REPAIRER NOT HERE YET
bogusdata['count-wrong-shares'] = "this repairer not here yet"
bogusdata['count-recoverable-versions'] = "this repairer not here yet"
bogusdata['count-unrecoverable-versions'] = "this repairer not here yet"
bogusresults.pre_repair_results.data.update(bogusdata)
bogusresults.post_repair_results.data.update(bogusdata)
return bogusresults
On the other hand, I act like a "download target" -- something that a local downloader can
write ciphertext to as it downloads the ciphertext. That downloader doesn't realize, of
course, that I'm just turning around and giving the ciphertext to the uploader. """
def _get_all_shareholders(self, ignored=None):
dl = []
for (peerid,ss) in self._client.get_permuted_peers("storage",
self._storageindex):
d = ss.callRemote("get_buckets", self._storageindex)
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(peerid,))
dl.append(d)
self._responses_received = 0
self._queries_sent = len(dl)
if self._status:
self._status.set_status("Locating Shares (%d/%d)" %
(self._responses_received,
self._queries_sent))
return defer.DeferredList(dl)
# The theory behind this class is nice: just satisfy two separate interfaces. The
# implementation is slightly horrible, because of "impedance mismatch" -- the downloader
# expects to be able to synchronously push data in, and the uploader expects to be able to
# read data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred. The two
# interfaces have different APIs for pausing/unpausing. The uploader requests metadata like
# size and encodingparams which the downloader provides either eventually or not at all
# (okay I just now extended the downloader to provide encodingparams). Most of this
# slightly horrible code would disappear if CiphertextDownloader just used this object as an
# IConsumer (plus maybe a couple of other methods) and if the Uploader simply expected to be
# treated as an IConsumer (plus maybe a couple of other things).
def _got_response(self, buckets, peerid):
self._responses_received += 1
if self._results:
elapsed = time.time() - self._started
self._results.timings["servers_peer_selection"][peerid] = elapsed
if self._status:
self._status.set_status("Locating Shares (%d/%d)" %
(self._responses_received,
self._queries_sent))
for sharenum, bucket in buckets.iteritems():
b = layout.ReadBucketProxy(bucket, peerid, self._si_s)
self.add_share_bucket(sharenum, b)
self._uri_extension_sources.append(b)
if self._results:
if peerid not in self._results.servermap:
self._results.servermap[peerid] = set()
self._results.servermap[peerid].add(sharenum)
def __init__(self, buflim=2**19):
""" If we're already holding at least buflim bytes, then tell the downloader to pause
until we have less than buflim bytes."""
log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
self.buflim = buflim
self.bufs = collections.deque() # list of strings
self.bufsiz = 0 # how many bytes total in bufs
def _got_all_shareholders(self, res):
if self._results:
now = time.time()
self._results.timings["peer_selection"] = now - self._started
self.next_read_ds = collections.deque() # list of deferreds which will fire with the requested ciphertext
self.next_read_lens = collections.deque() # how many bytes of ciphertext were requested by each deferred
if len(self._share_buckets) < self._num_needed_shares:
raise download.NotEnoughSharesError
self._size_osol = observer.OneShotObserverList()
self._encodingparams_osol = observer.OneShotObserverList()
self._storageindex_osol = observer.OneShotObserverList()
self._closed_to_pusher = False
def _verify_done(self, ignored):
# TODO: The following results are just stubs, and need to be replaced
# with actual values. These exist to make things like deep-check not
# fail. XXX
self._check_results.set_needs_rebalancing(False)
N = self._total_shares
data = {
"count-shares-good": N,
"count-good-share-hosts": N,
"count-corrupt-shares": 0,
"list-corrupt-shares": [],
"servers-responding": [],
"sharemap": {},
"count-wrong-shares": 0,
"count-recoverable-versions": 1,
"count-unrecoverable-versions": 0,
}
self._check_results.set_data(data)
return self._check_results
# once seg size is available, the following attribute will be created to hold it:
# self.encodingparams # (provided by the object which is pushing data into me, required
# by the object which is pulling data out of me)
# open() will create the following attribute:
# self.size # size of the whole file (provided by the object which is pushing data into
# me, required by the object which is pulling data out of me)
# set_upload_status() will create the following attribute:
# self.upload_status # XXX do we need to actually update this? Is anybody watching the
# results during a repair?
def _satisfy_reads_if_possible(self):
assert bool(self.next_read_ds) == bool(self.next_read_lens)
while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0]) or self._closed_to_pusher):
nrd = self.next_read_ds.popleft()
nrl = self.next_read_lens.popleft()
# Pick out the requested number of bytes from self.bufs, turn it into a string, and
# callback the deferred with that.
res = []
ressize = 0
while ressize < nrl and self.bufs:
nextbuf = self.bufs.popleft()
res.append(nextbuf)
ressize += len(nextbuf)
if ressize > nrl:
leftover = ressize - nrl
self.bufs.appendleft(nextbuf[leftover:])
res[-1] = nextbuf[:leftover]
self.bufsiz -= nrl
if self.bufsiz < self.buflim and self.producer:
self.producer.resumeProducing()
nrd.callback(res)
# methods to satisfy the IConsumer and IDownloadTarget interfaces
# (From the perspective of a downloader I am an IDownloadTarget and an IConsumer.)
def registerProducer(self, producer, streaming):
assert streaming # We know how to handle only streaming producers.
self.producer = producer # the downloader
def unregisterProducer(self):
self.producer = None
def open(self, size):
self.size = size
self._size_osol.fire(self.size)
def set_encodingparams(self, encodingparams):
self.encodingparams = encodingparams
self._encodingparams_osol.fire(self.encodingparams)
def set_storageindex(self, storageindex):
self.storageindex = storageindex
self._storageindex_osol.fire(self.storageindex)
def write(self, data):
self.bufs.append(data)
self.bufsiz += len(data)
self._satisfy_reads_if_possible()
if self.bufsiz >= self.buflim and self.producer:
self.producer.pauseProducing()
def finish(self):
pass
def close(self):
self._closed_to_pusher = True
# methods to satisfy the IEncryptedUploader interface
# (From the perspective of an uploader I am an IEncryptedUploadable.)
def set_upload_status(self, upload_status):
self.upload_status = upload_status
def get_size(self):
if hasattr(self, 'size'): # attribute created by self.open()
return defer.succeed(self.size)
else:
return self._size_osol.when_fired()
def get_all_encoding_parameters(self):
# We have to learn the encoding params from pusher.
if hasattr(self, 'encodingparams'): # attribute created by self.set_encodingparams()
return defer.succeed(self.encodingparams)
else:
return self._encodingparams_osol.when_fired()
def read_encrypted(self, length, hash_only):
""" Returns a deferred which eventually fired with the requested ciphertext. """
d = defer.Deferred()
self.next_read_ds.append(d)
self.next_read_lens.append(length)
self._satisfy_reads_if_possible()
return d
def get_storage_index(self):
# We have to learn the storage index from pusher.
if hasattr(self, 'storageindex'): # attribute created by self.set_storageindex()
return defer.succeed(self.storageindex)
else:
return self._storageindex.when_fired()

View File

@ -12,7 +12,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \
storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import storage, hashtree, uri
from allmydata.immutable import encode
from allmydata.util import base32, idlib, log, mathutil
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import get_versioned_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
@ -48,8 +48,8 @@ class UploadResults(Copyable, RemoteCopy):
def __init__(self):
self.timings = {} # dict of name to number of seconds
self.sharemap = {} # k: shnum, v: set(serverid)
self.servermap = {} # k: serverid, v: set(shnum)
self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
self.file_size = None
self.ciphertext_fetched = None # how much the helper fetched
self.uri = None
@ -758,8 +758,8 @@ class CHKUploader:
peer_tracker = self._peer_trackers[shnum]
peerid = peer_tracker.peerid
peerid_s = idlib.shortnodeid_b2a(peerid)
r.sharemap.setdefault(shnum, set()).add(peerid)
r.servermap.setdefault(peerid, set()).add(shnum)
r.sharemap.add(shnum, peerid)
r.servermap.add(peerid, shnum)
r.pushed_shares = len(self._encoder.get_shares_placed())
now = time.time()
r.file_size = self._encoder.file_size

View File

@ -1282,6 +1282,13 @@ class IDownloadTarget(Interface):
called. Whatever it returns will be returned to the invoker of
Downloader.download.
"""
# The following methods are just because that target might be a repairer.DownUpConnector,
# and just because the current CHKUpload object expects to find the storage index and
# encoding parameters in its Uploadable.
def set_storageindex(storageindex):
""" Set the storage index. """
def set_encodingparams(encodingparams):
""" Set the encoding parameters. """
class IDownloader(Interface):
def download(uri, target):

View File

@ -994,6 +994,13 @@ class ShareManglingMixin(SystemTestMixin):
sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
return sum_of_allocate_counts
def _count_writes(self):
sum_of_write_counts = 0
for thisclient in self.clients:
counters = thisclient.stats_provider.get_stats()['counters']
sum_of_write_counts += counters.get('storage_server.write', 0)
return sum_of_write_counts
def _download_and_check_plaintext(self, unused=None):
self.downloader = self.clients[1].getServiceNamed("downloader")
d = self.downloader.download_to_data(self.uri)

View File

@ -32,7 +32,7 @@ class Test(common.ShareManglingMixin, unittest.TestCase):
def _then_download(unused=None):
self.downloader = self.clients[1].getServiceNamed("downloader")
d = self.downloader.download_to_data(self.uri)
d2 = self.downloader.download_to_data(self.uri)
def _after_download_callb(result):
self.fail() # should have gotten an errback instead
@ -40,7 +40,8 @@ class Test(common.ShareManglingMixin, unittest.TestCase):
def _after_download_errb(failure):
failure.trap(NotEnoughSharesError)
return None # success!
d.addCallbacks(_after_download_callb, _after_download_errb)
d2.addCallbacks(_after_download_callb, _after_download_errb)
return d2
d.addCallback(_then_download)
return d
@ -99,14 +100,14 @@ class Test(common.ShareManglingMixin, unittest.TestCase):
before_download_reads = self._count_reads()
def _attempt_to_download(unused=None):
downloader = self.clients[1].getServiceNamed("downloader")
d = downloader.download_to_data(self.uri)
d2 = downloader.download_to_data(self.uri)
def _callb(res):
self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
def _errb(f):
self.failUnless(f.check(NotEnoughSharesError))
d.addCallbacks(_callb, _errb)
return d
d2.addCallbacks(_callb, _errb)
return d2
d.addCallback(_attempt_to_download)
@ -133,14 +134,14 @@ class Test(common.ShareManglingMixin, unittest.TestCase):
before_download_reads = self._count_reads()
def _attempt_to_download(unused=None):
downloader = self.clients[1].getServiceNamed("downloader")
d = downloader.download_to_data(self.uri)
d2 = downloader.download_to_data(self.uri)
def _callb(res):
self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
def _errb(f):
self.failUnless(f.check(NotEnoughSharesError))
d.addCallbacks(_callb, _errb)
return d
d2.addCallbacks(_callb, _errb)
return d2
d.addCallback(_attempt_to_download)

View File

@ -0,0 +1,549 @@
from allmydata.test import common
from allmydata.monitor import Monitor
from allmydata import check_results
from allmydata.interfaces import IURI, NotEnoughSharesError
from allmydata.immutable import upload
from allmydata.util import hashutil, log
from twisted.internet import defer
from twisted.trial import unittest
import random, struct
import common_util as testutil
READ_LEEWAY = 18 # We'll allow you to pass this test even if you trigger eighteen times as many disk reads and block fetches as would be optimal.
DELTA_READS = 10 * READ_LEEWAY # N = 10
class Verifier(common.ShareManglingMixin, unittest.TestCase):
def test_check_without_verify(self):
""" Check says the file is healthy when none of the shares have been touched. It says
that the file is unhealthy when all of them have been removed. It doesn't use any reads.
"""
d = defer.succeed(self.filenode)
def _check1(filenode):
before_check_reads = self._count_reads()
d2 = filenode.check(Monitor(), verify=False)
def _after_check(checkresults):
after_check_reads = self._count_reads()
self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
self.failUnless(checkresults.is_healthy())
d2.addCallback(_after_check)
return d2
d.addCallback(_check1)
d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
def _check2(ignored):
before_check_reads = self._count_reads()
d2 = self.filenode.check(Monitor(), verify=False)
def _after_check(checkresults):
after_check_reads = self._count_reads()
self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
self.failIf(checkresults.is_healthy())
d2.addCallback(_after_check)
return d2
d.addCallback(_check2)
return d
def _help_test_verify(self, corruptor_funcs, judgement_func):
d = defer.succeed(None)
d.addCallback(self.find_shares)
stash = [None]
def _stash_it(res):
stash[0] = res
return res
d.addCallback(_stash_it)
def _put_it_all_back(ignored):
self.replace_shares(stash[0], storage_index=self.uri.storage_index)
return ignored
def _verify_after_corruption(corruptor_func):
before_check_reads = self._count_reads()
d2 = self.filenode.check(Monitor(), verify=True)
def _after_check(checkresults):
after_check_reads = self._count_reads()
self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads))
try:
return judgement_func(checkresults)
except Exception, le:
le.args = tuple(le.args + ("corruptor_func: " + corruptor_func.__name__,))
raise
d2.addCallback(_after_check)
return d2
for corruptor_func in corruptor_funcs:
d.addCallback(self._corrupt_a_random_share, corruptor_func)
d.addCallback(_verify_after_corruption)
d.addCallback(_put_it_all_back)
return d
def test_verify_no_problem(self):
""" Verify says the file is healthy when none of the shares have been touched in a way
that matters. It doesn't use more than seven times as many reads as it needs."""
def judge(checkresults):
self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
self.failUnless(data['count-shares-good'] == 10, data)
self.failUnless(len(data['sharemap']) == 10, data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
self.failUnless(data['count-good-share-hosts'] == 5, data)
self.failUnless(len(data['servers-responding']) == 5, data)
self.failUnless(len(data['list-corrupt-shares']) == 0, data)
return self._help_test_verify([
common._corrupt_nothing,
common._corrupt_size_of_file_data,
common._corrupt_size_of_sharedata,
common._corrupt_segment_size, ], judge)
def test_verify_server_visible_corruption(self):
""" Corruption which is detected by the server means that the server will send you back
a Failure in response to get_bucket instead of giving you the share data. Test that
verifier handles these answers correctly. It doesn't use more than seven times as many
reads as it needs."""
def judge(checkresults):
self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
# The server might fail to serve up its other share as well as the corrupted
# one, so count-shares-good could be 8 or 9.
self.failUnless(data['count-shares-good'] in (8, 9), data)
self.failUnless(len(data['sharemap']) in (8, 9,), data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
# The server may have served up the non-corrupted share, or it may not have, so
# the checker could have detected either 4 or 5 good servers.
self.failUnless(data['count-good-share-hosts'] in (4, 5), data)
self.failUnless(len(data['servers-responding']) in (4, 5), data)
# If the server served up the other share, then the checker should consider it good, else it should
# not.
self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data)
self.failUnless(len(data['list-corrupt-shares']) == 0, data)
return self._help_test_verify([
common._corrupt_file_version_number,
], judge)
def test_verify_share_incompatibility(self):
def judge(checkresults):
self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
self.failUnless(data['count-shares-good'] == 9, data)
self.failUnless(len(data['sharemap']) == 9, data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
self.failUnless(data['count-good-share-hosts'] == 5, data)
self.failUnless(len(data['servers-responding']) == 5, data)
self.failUnless(len(data['list-corrupt-shares']) == 1, data)
self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == 0, data)
return self._help_test_verify([
common._corrupt_sharedata_version_number,
], judge)
def test_verify_server_invisible_corruption(self):
def judge(checkresults):
self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
self.failUnless(data['count-shares-good'] == 9, data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
self.failUnless(data['count-good-share-hosts'] == 5, data)
self.failUnless(data['count-corrupt-shares'] == 1, (data,))
self.failUnless(len(data['list-corrupt-shares']) == 1, data)
self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == 0, data)
self.failUnless(len(data['servers-responding']) == 5, data)
self.failUnless(len(data['sharemap']) == 9, data)
return self._help_test_verify([
common._corrupt_offset_of_sharedata,
common._corrupt_offset_of_uri_extension,
common._corrupt_offset_of_uri_extension_to_force_short_read,
common._corrupt_share_data,
common._corrupt_length_of_uri_extension,
common._corrupt_uri_extension,
], judge)
def test_verify_server_invisible_corruption_offset_of_block_hashtree_to_truncate_crypttext_hashtree_TODO(self):
def judge(checkresults):
self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
self.failUnless(data['count-shares-good'] == 9, data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
self.failUnless(data['count-good-share-hosts'] == 5, data)
self.failUnless(data['count-corrupt-shares'] == 1, (data,))
self.failUnless(len(data['list-corrupt-shares']) == 1, data)
self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == 0, data)
self.failUnless(len(data['servers-responding']) == 5, data)
self.failUnless(len(data['sharemap']) == 9, data)
return self._help_test_verify([
common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
], judge)
test_verify_server_invisible_corruption_offset_of_block_hashtree_to_truncate_crypttext_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
def test_verify_server_invisible_corruption_offset_of_block_hashtree_TODO(self):
def judge(checkresults):
self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
self.failUnless(data['count-shares-good'] == 9, data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
self.failUnless(data['count-good-share-hosts'] == 5, data)
self.failUnless(data['count-corrupt-shares'] == 1, (data,))
self.failUnless(len(data['list-corrupt-shares']) == 1, data)
self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == 0, data)
self.failUnless(len(data['servers-responding']) == 5, data)
self.failUnless(len(data['sharemap']) == 9, data)
return self._help_test_verify([
common._corrupt_offset_of_block_hashes,
], judge)
test_verify_server_invisible_corruption_offset_of_block_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
def test_verify_server_invisible_corruption_sharedata_plausible_version(self):
def judge(checkresults):
self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
self.failUnless(data['count-shares-good'] == 9, data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
self.failUnless(data['count-good-share-hosts'] == 5, data)
self.failUnless(data['count-corrupt-shares'] == 1, (data,))
self.failUnless(len(data['list-corrupt-shares']) == 1, data)
self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == 0, data)
self.failUnless(len(data['servers-responding']) == 5, data)
self.failUnless(len(data['sharemap']) == 9, data)
return self._help_test_verify([
common._corrupt_sharedata_version_number_to_plausible_version,
], judge)
def test_verify_server_invisible_corruption_offset_of_share_hashtree_TODO(self):
def judge(checkresults):
self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
self.failUnless(data['count-shares-good'] == 9, data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
self.failUnless(data['count-good-share-hosts'] == 5, data)
self.failUnless(data['count-corrupt-shares'] == 1, (data,))
self.failUnless(len(data['list-corrupt-shares']) == 1, data)
self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == 0, data)
self.failUnless(len(data['servers-responding']) == 5, data)
self.failUnless(len(data['sharemap']) == 9, data)
return self._help_test_verify([
common._corrupt_offset_of_share_hashes,
], judge)
test_verify_server_invisible_corruption_offset_of_share_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
def test_verify_server_invisible_corruption_offset_of_ciphertext_hashtree_TODO(self):
def judge(checkresults):
self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
self.failUnless(data['count-shares-good'] == 9, data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
self.failUnless(data['count-good-share-hosts'] == 5, data)
self.failUnless(data['count-corrupt-shares'] == 1, (data,))
self.failUnless(len(data['list-corrupt-shares']) == 1, data)
self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == 0, data)
self.failUnless(len(data['servers-responding']) == 5, data)
self.failUnless(len(data['sharemap']) == 9, data)
return self._help_test_verify([
common._corrupt_offset_of_ciphertext_hash_tree,
], judge)
test_verify_server_invisible_corruption_offset_of_ciphertext_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
def test_verify_server_invisible_corruption_cryptext_hash_tree_TODO(self):
def judge(checkresults):
self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
self.failUnless(data['count-shares-good'] == 9, data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
self.failUnless(data['count-good-share-hosts'] == 5, data)
self.failUnless(data['count-corrupt-shares'] == 1, (data,))
self.failUnless(len(data['list-corrupt-shares']) == 1, data)
self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == 0, data)
self.failUnless(len(data['servers-responding']) == 5, data)
self.failUnless(len(data['sharemap']) == 9, data)
return self._help_test_verify([
common._corrupt_crypttext_hash_tree,
], judge)
test_verify_server_invisible_corruption_cryptext_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
def test_verify_server_invisible_corruption_block_hash_tree_TODO(self):
def judge(checkresults):
self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
self.failUnless(data['count-shares-good'] == 9, data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
self.failUnless(data['count-good-share-hosts'] == 5, data)
self.failUnless(data['count-corrupt-shares'] == 1, (data,))
self.failUnless(len(data['list-corrupt-shares']) == 1, data)
self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == 0, data)
self.failUnless(len(data['servers-responding']) == 5, data)
self.failUnless(len(data['sharemap']) == 9, data)
return self._help_test_verify([
common._corrupt_block_hashes,
], judge)
test_verify_server_invisible_corruption_block_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
def test_verify_server_invisible_corruption_share_hash_tree_TODO(self):
def judge(checkresults):
self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
data = checkresults.get_data()
self.failUnless(data['count-shares-good'] == 9, data)
self.failUnless(data['count-shares-needed'] == 3, data)
self.failUnless(data['count-shares-expected'] == 10, data)
self.failUnless(data['count-good-share-hosts'] == 5, data)
self.failUnless(data['count-corrupt-shares'] == 1, (data,))
self.failUnless(len(data['list-corrupt-shares']) == 1, data)
self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
self.failUnless(len(data['list-incompatible-shares']) == 0, data)
self.failUnless(len(data['servers-responding']) == 5, data)
self.failUnless(len(data['sharemap']) == 9, data)
return self._help_test_verify([
common._corrupt_share_hashes,
], judge)
test_verify_server_invisible_corruption_share_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
# We'll allow you to pass this test even if you trigger ten times as many
# block sends and disk writes as would be optimal.
WRITE_LEEWAY = 10
# Optimally, you could repair one of these (small) files in a single write.
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
class Repairer(common.ShareManglingMixin, unittest.TestCase):
    def test_test_code(self):
        """ Sanity-check the test fixtures themselves: stash/replace/find
        shares round-trips, and with only 2 of 10 shares left neither
        download nor repair can succeed (both should errback with
        NotEnoughSharesError). """
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = defer.succeed(None)
        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)
        def _compare(res):
            oldshares = stash[0]
            self.failUnless(isinstance(oldshares, dict), oldshares)
            self.failUnlessEqual(oldshares, res)
        d.addCallback(self.find_shares)
        d.addCallback(_compare)
        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        d.addCallback(self.find_shares)
        d.addCallback(lambda x: self.failUnlessEqual(x, {}))
        # The following process of deleting 8 of the shares and asserting that you can't
        # download it is more to test this test code than to test the Tahoe code...
        def _then_delete_8(unused=None):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)
        def _then_download(unused=None):
            self.downloader = self.clients[1].getServiceNamed("downloader")
            d2 = self.downloader.download_to_data(self.uri)
            def _after_download_callb(result):
                self.fail() # should have gotten an errback instead
                return result
            def _after_download_errb(failure):
                failure.trap(NotEnoughSharesError)
                return None # success!
            d2.addCallbacks(_after_download_callb, _after_download_errb)
            # BUGFIX: return the download Deferred so the outer chain waits
            # for the download attempt to finish (and for its expected
            # NotEnoughSharesError to be trapped) before moving on. The
            # original version dropped this Deferred, so subsequent steps
            # could race the in-flight download and the failure could be
            # left unhandled. Compare _then_repair below, which correctly
            # returns its Deferred.
            return d2
        d.addCallback(_then_download)
        # The following process of deleting 8 of the shares and asserting that you can't repair
        # it is more to test this test code than to test the Tahoe code...
        # (Renamed from a second "_then_delete_8" which shadowed the first
        # definition above; behavior is unchanged since addCallback had
        # already captured the first function object.)
        def _then_delete_8_again(unused=None):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8_again)
        def _then_repair(unused=None):
            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair_callb(result):
                self.fail() # should have gotten an errback instead
                return result
            def _after_repair_errb(f):
                f.trap(NotEnoughSharesError)
                return None # success!
            d2.addCallbacks(_after_repair_callb, _after_repair_errb)
            return d2
        d.addCallback(_then_repair)
        return d
def test_repair_from_deletion_of_1(self):
""" Repair replaces a share that got deleted. """
d = defer.succeed(None)
d.addCallback(self._delete_a_share, sharenum=2)
def _repair_from_deletion_of_1(unused):
before_repair_reads = self._count_reads()
before_repair_allocates = self._count_writes()
d2 = self.filenode.check_and_repair(Monitor(), verify=False)
def _after_repair(checkandrepairresults):
assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults
prerepairres = checkandrepairresults.get_pre_repair_results()
assert isinstance(prerepairres, check_results.CheckResults), prerepairres
postrepairres = checkandrepairresults.get_post_repair_results()
assert isinstance(postrepairres, check_results.CheckResults), postrepairres
after_repair_reads = self._count_reads()
after_repair_allocates = self._count_writes()
# print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
self.failIf(after_repair_allocates - before_repair_allocates > DELTA_WRITES_PER_SHARE, (after_repair_allocates, before_repair_allocates))
self.failIf(prerepairres.is_healthy())
self.failUnless(postrepairres.is_healthy())
# Now we inspect the filesystem to make sure that it has 10 shares.
shares = self.find_shares()
self.failIf(len(shares) < 10)
# Now we delete seven of the other shares, then try to download the file and
# assert that it succeeds at downloading and has the right contents. This can't
# work unless it has already repaired the previously-deleted share #2.
for sharenum in range(3, 10):
self._delete_a_share(sharenum=sharenum)
return self._download_and_check_plaintext()
d2.addCallback(_after_repair)
return d2
d.addCallback(_repair_from_deletion_of_1)
return d
def test_repair_from_deletion_of_7(self):
""" Repair replaces seven shares that got deleted. """
shares = self.find_shares()
self.failIf(len(shares) != 10)
d = defer.succeed(None)
def _delete_7(unused=None):
shnums = range(10)
random.shuffle(shnums)
for sharenum in shnums[:7]:
self._delete_a_share(sharenum=sharenum)
d.addCallback(_delete_7)
def _repair_from_deletion_of_7(unused):
before_repair_reads = self._count_reads()
before_repair_allocates = self._count_writes()
d2 = self.filenode.check_and_repair(Monitor(), verify=False)
def _after_repair(checkandrepairresults):
assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults
prerepairres = checkandrepairresults.get_pre_repair_results()
assert isinstance(prerepairres, check_results.CheckResults), prerepairres
postrepairres = checkandrepairresults.get_post_repair_results()
assert isinstance(postrepairres, check_results.CheckResults), postrepairres
after_repair_reads = self._count_reads()
after_repair_allocates = self._count_writes()
# print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 7), (after_repair_allocates, before_repair_allocates))
self.failIf(prerepairres.is_healthy())
self.failUnless(postrepairres.is_healthy(), postrepairres.data)
# Now we inspect the filesystem to make sure that it has 10 shares.
shares = self.find_shares()
self.failIf(len(shares) < 10)
# Now we delete seven random shares, then try to download the file and
# assert that it succeeds at downloading and has the right contents.
for i in range(7):
self._delete_a_share()
return self._download_and_check_plaintext()
d2.addCallback(_after_repair)
return d2
d.addCallback(_repair_from_deletion_of_7)
return d
    def test_repair_from_corruption_of_1(self):
        """ For each of several kinds of share corruption in turn: corrupt a
        random share, run check-and-repair with verify=True, and require
        that the pre-repair results are unhealthy, the post-repair results
        are healthy, the read/write budgets are respected, and the plaintext
        still downloads correctly. (Marked .todo below.) """
        d = defer.succeed(None)
        def _repair_from_corruption(unused, corruptor_func):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_writes()
            d2 = self.filenode.check_and_repair(Monitor(), verify=True)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_writes()
                # The "* 2" in reads is because you might read a whole share before figuring out that it is corrupted. It might be possible to make this delta reads number a little tighter.
                self.failIf(after_repair_reads - before_repair_reads > (DELTA_READS * 2), (after_repair_reads, before_repair_reads))
                # The "* 2" in writes is because each server has two shares, and it is reasonable for repairer to conclude that there are two shares that it should upload, if the server fails to serve the first share.
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
                self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
                self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))
                return self._download_and_check_plaintext()
            d2.addCallback(_after_repair)
            return d2
        # Chain one corrupt-then-repair round per corruptor onto the same
        # Deferred; corruptor_func is passed explicitly to addCallback, so
        # there is no late-binding closure problem here.
        for corruptor_func in (
            common._corrupt_file_version_number,
            common._corrupt_sharedata_version_number,
            common._corrupt_offset_of_sharedata,
            common._corrupt_offset_of_uri_extension,
            common._corrupt_offset_of_uri_extension_to_force_short_read,
            common._corrupt_share_data,
            common._corrupt_length_of_uri_extension,
            common._corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption, corruptor_func)
        return d
    test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."
# XXX extend these tests to show that the checker detects which specific share on which specific server is broken -- this is necessary so that the checker results can be passed to the repairer and the repairer can go ahead and upload fixes without first doing what is effectively a check (/verify) run
# XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example
# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit
# XXX test corruption that truncates other hash trees than just the crypttext hash tree
# XXX test the notify-someone-about-corruption feature (also implement that feature)
# XXX test whether repairer (downloader) correctly downloads a file even if to do so it has to acquire shares from a server that has already tried to serve it a corrupted share. (I don't think the current downloader would pass this test, depending on the kind of corruption.)