New tests for #928

This commit is contained in:
david-sarah 2010-01-29 04:38:45 -08:00
parent 2bd9dfa5bd
commit baa11a0ad4
3 changed files with 206 additions and 33 deletions

View File

@ -1,6 +1,6 @@
import random, weakref, itertools, time import random, weakref, itertools, time
from zope.interface import implements from zope.interface import implements
from twisted.internet import defer from twisted.internet import defer, reactor
from twisted.internet.interfaces import IPushProducer, IConsumer from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import DeadReferenceError, RemoteException, eventually from foolscap.api import DeadReferenceError, RemoteException, eventually
@ -835,7 +835,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
# first step: who should we download from? # first step: who should we download from?
d = defer.maybeDeferred(self._get_all_shareholders) d = defer.maybeDeferred(self._get_all_shareholders)
d.addCallback(self._got_all_shareholders) d.addBoth(self._got_all_shareholders)
# now get the uri_extension block from somebody and integrity check # now get the uri_extension block from somebody and integrity check
# it and parse and validate its contents # it and parse and validate its contents
d.addCallback(self._obtain_uri_extension) d.addCallback(self._obtain_uri_extension)
@ -872,46 +872,55 @@ class CiphertextDownloader(log.PrefixingLogMixin):
""" Once the number of buckets that I know about is >= K then I """ Once the number of buckets that I know about is >= K then I
callback the Deferred that I return. callback the Deferred that I return.
If all of the get_buckets deferreds have fired (whether callback or If all of the get_buckets deferreds have fired (whether callback
errback) and I still don't have enough buckets then I'll callback the or errback) and I still don't have enough buckets then I'll also
Deferred that I return. callback -- not errback -- the Deferred that I return.
""" """
self._wait_for_enough_buckets_d = defer.Deferred() wait_for_enough_buckets_d = defer.Deferred()
self._wait_for_enough_buckets_d = wait_for_enough_buckets_d
self._queries_sent = 0
self._responses_received = 0
self._queries_failed = 0
sb = self._storage_broker sb = self._storage_broker
servers = sb.get_servers_for_index(self._storage_index) servers = sb.get_servers_for_index(self._storage_index)
if not servers: if not servers:
raise NoServersError("broker gave us no servers!") raise NoServersError("broker gave us no servers!")
self._total_queries = len(servers)
self._responses_received = 0
self._queries_failed = 0
for (peerid,ss) in servers: for (peerid,ss) in servers:
self.log(format="sending DYHB to [%(peerid)s]", self.log(format="sending DYHB to [%(peerid)s]",
peerid=idlib.shortnodeid_b2a(peerid), peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, umid="rT03hg") level=log.NOISY, umid="rT03hg")
self._queries_sent += 1
d = ss.callRemote("get_buckets", self._storage_index) d = ss.callRemote("get_buckets", self._storage_index)
d.addCallbacks(self._got_response, self._got_error, d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(peerid,)) callbackArgs=(peerid,))
d.addBoth(self._check_got_all_responses)
if self._status: if self._status:
self._status.set_status("Locating Shares (%d/%d)" % self._status.set_status("Locating Shares (%d/%d)" %
(self._responses_received, (len(self._share_buckets),
self._queries_sent)) self._verifycap.needed_shares))
return self._wait_for_enough_buckets_d return wait_for_enough_buckets_d
def _check_got_all_responses(self, ignored=None):
assert (self._responses_received+self._queries_failed) <= self._total_queries
if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._total_queries:
reactor.callLater(0, self._wait_for_enough_buckets_d.callback, False)
self._wait_for_enough_buckets_d = None
def _got_response(self, buckets, peerid): def _got_response(self, buckets, peerid):
self._responses_received += 1
self.log(format="got results from [%(peerid)s]: shnums %(shnums)s", self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
peerid=idlib.shortnodeid_b2a(peerid), peerid=idlib.shortnodeid_b2a(peerid),
shnums=sorted(buckets.keys()), shnums=sorted(buckets.keys()),
level=log.NOISY, umid="o4uwFg") level=log.NOISY, umid="o4uwFg")
self._responses_received += 1
if self._results: if self._results:
elapsed = time.time() - self._started elapsed = time.time() - self._started
self._results.timings["servers_peer_selection"][peerid] = elapsed self._results.timings["servers_peer_selection"][peerid] = elapsed
if self._status: if self._status:
self._status.set_status("Locating Shares (%d/%d)" % self._status.set_status("Locating Shares (%d/%d)" %
(self._responses_received, (self._responses_received,
self._queries_sent)) self._total_queries))
for sharenum, bucket in buckets.iteritems(): for sharenum, bucket in buckets.iteritems():
b = layout.ReadBucketProxy(bucket, peerid, self._storage_index) b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
self.add_share_bucket(sharenum, b) self.add_share_bucket(sharenum, b)
@ -919,14 +928,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
# deferred. Then remove it from self so that we don't fire it # deferred. Then remove it from self so that we don't fire it
# again. # again.
if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares: if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares:
self._wait_for_enough_buckets_d.callback(True) reactor.callLater(0, self._wait_for_enough_buckets_d.callback, True)
self._wait_for_enough_buckets_d = None
# Else, if we ran out of outstanding requests then fire it and
# remove it from self.
assert (self._responses_received+self._queries_failed) <= self._queries_sent
if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._queries_sent:
self._wait_for_enough_buckets_d.callback(False)
self._wait_for_enough_buckets_d = None self._wait_for_enough_buckets_d = None
if self._results: if self._results:
@ -939,18 +941,12 @@ class CiphertextDownloader(log.PrefixingLogMixin):
self._share_buckets.setdefault(sharenum, []).append(bucket) self._share_buckets.setdefault(sharenum, []).append(bucket)
def _got_error(self, f): def _got_error(self, f):
self._queries_failed += 1
level = log.WEIRD level = log.WEIRD
if f.check(DeadReferenceError): if f.check(DeadReferenceError):
level = log.UNUSUAL level = log.UNUSUAL
self.log("Error during get_buckets", failure=f, level=level, self.log("Error during get_buckets", failure=f, level=level,
umid="3uuBUQ") umid="3uuBUQ")
# If we ran out of outstanding requests then errback it and remove it
# from self.
self._queries_failed += 1
assert (self._responses_received+self._queries_failed) <= self._queries_sent
if self._wait_for_enough_buckets_d and self._responses_received == self._queries_sent:
self._wait_for_enough_buckets_d.errback()
self._wait_for_enough_buckets_d = None
def bucket_failed(self, vbucket): def bucket_failed(self, vbucket):
shnum = vbucket.sharenum shnum = vbucket.sharenum

View File

@ -16,7 +16,7 @@
import os.path import os.path
from zope.interface import implements from zope.interface import implements
from twisted.application import service from twisted.application import service
from twisted.internet import reactor from twisted.internet import defer, reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
from foolscap.api import Referenceable, fireEventually, RemoteException from foolscap.api import Referenceable, fireEventually, RemoteException
from base64 import b32encode from base64 import b32encode
@ -38,6 +38,7 @@ class LocalWrapper:
def __init__(self, original): def __init__(self, original):
self.original = original self.original = original
self.broken = False self.broken = False
self.hung_until = None
self.post_call_notifier = None self.post_call_notifier = None
self.disconnectors = {} self.disconnectors = {}
@ -57,11 +58,25 @@ class LocalWrapper:
return a return a
args = tuple([wrap(a) for a in args]) args = tuple([wrap(a) for a in args])
kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs]) kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
def _really_call():
meth = getattr(self.original, "remote_" + methname)
return meth(*args, **kwargs)
def _call(): def _call():
if self.broken: if self.broken:
raise IntentionalError("I was asked to break") raise IntentionalError("I was asked to break")
meth = getattr(self.original, "remote_" + methname) if self.hung_until:
return meth(*args, **kwargs) d2 = defer.Deferred()
self.hung_until.addCallback(lambda ign: _really_call())
self.hung_until.addCallback(lambda res: d2.callback(res))
def _err(res):
d2.errback(res)
return res
self.hung_until.addErrback(_err)
return d2
return _really_call()
d = fireEventually() d = fireEventually()
d.addCallback(lambda res: _call()) d.addCallback(lambda res: _call())
def _wrap_exception(f): def _wrap_exception(f):
@ -240,6 +255,11 @@ class NoNetworkGrid(service.MultiService):
# asked to hold a share # asked to hold a share
self.servers_by_id[serverid].broken = True self.servers_by_id[serverid].broken = True
def hang_server(self, serverid, until=defer.Deferred()):
# hang the given server until 'until' fires
self.servers_by_id[serverid].hung_until = until
class GridTestMixin: class GridTestMixin:
def setUp(self): def setUp(self):
self.s = service.MultiService() self.s = service.MultiService()

View File

@ -0,0 +1,157 @@
import os, shutil
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python import failure
from allmydata import uri
from allmydata.util.consumer import download_to_data
from allmydata.immutable import upload
from allmydata.storage.common import storage_index_to_dir
from allmydata.test.no_network import GridTestMixin
from allmydata.test.common import ShouldFailMixin
from allmydata.interfaces import NotEnoughSharesError
# Fixed plaintext fixtures shared by the test cases below: 40,000 bytes
# each, built by repeating a 4-byte tag so failures are easy to eyeball.
immutable_plaintext = 10000 * "data"
mutable_plaintext = 10000 * "muta"
class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
    """Tests for ticket #928: immutable download in the presence of servers
    that hang (never answer), are broken (raise on every call), or have had
    their shares deleted.

    Each test uploads ``immutable_plaintext`` to a fresh no-network grid of
    10 servers, then degrades some subset of ``self.servers`` before
    attempting the download.  With the default 3-of-10 encoding, 3 healthy
    servers should suffice and 2 should fail with NotEnoughSharesError.
    """

    # trial-level cap (seconds) on each test method; the recovery tests
    # below deliberately wait 5 wall-clock seconds before un-hanging a
    # server, so this must stay comfortably above that.
    timeout = 30

    def _break(self, servers):
        # Mark each (serverid, server) pair as broken: subsequent remote
        # calls to it will errback (see NoNetworkGrid.break_server).
        for (id, ss) in servers:
            self.g.break_server(id)

    def _hang(self, servers, **kwargs):
        # Hang each given server; kwargs are forwarded to
        # NoNetworkGrid.hang_server (notably until=<Deferred> to schedule
        # a later recovery).
        for (id, ss) in servers:
            self.g.hang_server(id, **kwargs)

    def _delete_all_shares_from(self, servers):
        # Remove every share file held by the given servers, so DYHB
        # queries succeed but report no shares.
        serverids = [id for (id, ss) in servers]
        for (i_shnum, i_serverid, i_sharefile) in self.shares:
            if i_serverid in serverids:
                os.unlink(i_sharefile)

    # untested
    def _pick_a_share_from(self, server):
        # Return (shnum, sharefile) for any one share held by ``server``.
        # Raises AssertionError if the server holds none.
        (id, ss) = server
        for (i_shnum, i_serverid, i_sharefile) in self.shares:
            if i_serverid == id:
                return (i_shnum, i_sharefile)
        raise AssertionError("server %r had no shares" % server)

    # untested
    def _copy_all_shares_from(self, from_servers, to_server):
        # Duplicate every share held by ``from_servers`` onto ``to_server``
        # (a single (serverid, server) pair), via _copy_share below.
        serverids = [id for (id, ss) in from_servers]
        for (i_shnum, i_serverid, i_sharefile) in self.shares:
            if i_serverid in serverids:
                self._copy_share((i_shnum, i_sharefile), to_server)

    # untested
    def _copy_share(self, share, to_server):
        # Copy one (sharenum, sharefile) onto to_server's on-disk share
        # directory, then re-scan self.shares and assert the copy landed.
        (sharenum, sharefile) = share
        (id, ss) = to_server
        # FIXME: this doesn't work because we only have a LocalWrapper
        shares_dir = os.path.join(ss.storedir, "shares")
        si = uri.from_string(self.uri).get_storage_index()
        si_dir = os.path.join(shares_dir, storage_index_to_dir(si))
        if not os.path.exists(si_dir):
            os.makedirs(si_dir)
        new_sharefile = os.path.join(si_dir, str(sharenum))
        shutil.copy(sharefile, new_sharefile)
        self.shares = self.find_shares(self.uri)
        # Make sure that the storage server has the share.
        self.failUnless((sharenum, ss.my_nodeid, new_sharefile)
                        in self.shares)

    # untested
    def _add_server(self, server_number, readonly=False):
        # Bring up one additional (possibly read-only) server on the grid
        # and refresh the cached share list.
        ss = self.g.make_server(server_number, readonly)
        self.g.add_server(server_number, ss)
        self.shares = self.find_shares(self.uri)

    def _set_up(self, testdir, num_clients=1, num_servers=10):
        """Create the grid, upload the immutable fixture from client 0, and
        cache self.uri / self.shares / self.servers for the test body.
        Returns a Deferred that fires when the upload is complete."""
        self.basedir = "download/" + testdir
        self.set_up_grid(num_clients=num_clients, num_servers=num_servers)
        self.c0 = self.g.clients[0]
        sb = self.c0.nodemaker.storage_broker
        # (serverid, server) pairs; list order is whatever get_all_servers
        # yields -- the tests slice this list to pick victims, so the
        # "good" servers are simply the first few in that order.
        self.servers = [(id, ss) for (id, ss) in sb.get_all_servers()]

        data = upload.Data(immutable_plaintext, convergence="")
        d = self.c0.upload(data)
        def _uploaded(ur):
            self.uri = ur.uri
            self.shares = self.find_shares(self.uri)
        d.addCallback(_uploaded)
        return d

    def test_10_good_sanity_check(self):
        # Baseline: all 10 servers healthy -> download must succeed.
        d = self._set_up("test_10_good_sanity_check")
        d.addCallback(lambda ign: self.download_immutable())
        return d

    def test_3_good_7_hung(self):
        # 3 healthy servers (= needed_shares) and 7 hung: download should
        # still complete without waiting on the hung servers forever.
        d = self._set_up("test_3_good_7_hung")
        d.addCallback(lambda ign: self._hang(self.servers[3:]))
        d.addCallback(lambda ign: self.download_immutable())
        return d

    def test_3_good_7_noshares(self):
        # 7 servers answer but hold no shares; the 3 that do are enough.
        d = self._set_up("test_3_good_7_noshares")
        d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[3:]))
        d.addCallback(lambda ign: self.download_immutable())
        return d

    def test_2_good_8_broken_fail(self):
        # Only 2 healthy servers < needed_shares=3: expect a clean
        # NotEnoughSharesError rather than a hang.
        d = self._set_up("test_2_good_8_broken_fail")
        d.addCallback(lambda ign: self._break(self.servers[2:]))
        d.addCallback(lambda ign:
                      self.shouldFail(NotEnoughSharesError, "test_2_good_8_broken_fail",
                                      "Failed to get enough shareholders: have 2, need 3",
                                      self.download_immutable))
        return d

    def test_2_good_8_noshares_fail(self):
        # Same shortfall as above, but via deleted shares instead of
        # broken servers.
        d = self._set_up("test_2_good_8_noshares_fail")
        d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[2:]))
        d.addCallback(lambda ign:
                      self.shouldFail(NotEnoughSharesError, "test_2_good_8_noshares_fail",
                                      "Failed to get enough shareholders: have 2, need 3",
                                      self.download_immutable))
        return d

    def test_2_good_8_hung_then_1_recovers(self):
        # 2 healthy + 8 hung; after 5 seconds one hung server (holding a
        # third distinct share) recovers, so the download should complete.
        recovered = defer.Deferred()
        d = self._set_up("test_2_good_8_hung_then_1_recovers")
        d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
        d.addCallback(lambda ign: self._hang(self.servers[3:]))
        d.addCallback(lambda ign: self.download_immutable())
        # NOTE(review): fixed 5s wall-clock delay makes this test slow by
        # construction; `timeout = 30` above leaves headroom for it.
        reactor.callLater(5, recovered.callback, None)
        return d

    def test_2_good_8_hung_then_1_recovers_with_2_shares(self):
        # As above, but the recovering server was pre-loaded with a copy of
        # server 0's shares, so it contributes 2 shares on recovery.
        # (Relies on _copy_all_shares_from, which is marked "untested" and
        # FIXME'd above -- presumably this test documents intended behavior.)
        recovered = defer.Deferred()
        d = self._set_up("test_2_good_8_hung_then_1_recovers_with_2_shares")
        d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
        d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
        d.addCallback(lambda ign: self._hang(self.servers[3:]))
        d.addCallback(lambda ign: self.download_immutable())
        reactor.callLater(5, recovered.callback, None)
        return d

    def download_immutable(self):
        # Download self.uri via client 0 and verify the plaintext matches
        # the uploaded fixture byte-for-byte.
        n = self.c0.create_node_from_uri(self.uri)
        d = download_to_data(n)
        def _got_data(data):
            self.failUnlessEqual(data, immutable_plaintext)
        d.addCallback(_got_data)
        return d

    # unused
    def download_mutable(self):
        # Mutable-file counterpart of download_immutable; no test above
        # uploads mutable_plaintext yet, hence "unused".
        n = self.c0.create_node_from_uri(self.uri)
        d = n.download_best_version()
        def _got_data(data):
            self.failUnlessEqual(data, mutable_plaintext)
        d.addCallback(_got_data)
        return d