Improvements to test_hung_server, and fix for status updates in download.py

This commit is contained in:
david-sarah 2010-01-29 22:43:03 -08:00
parent d62428c1e6
commit 37a242e01a
2 changed files with 121 additions and 81 deletions

View File

@ -903,8 +903,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
if self._status: if self._status:
self._status.set_status("Locating Shares (%d/%d)" % self._status.set_status("Locating Shares (%d/%d)" %
(len(self._share_buckets), (self._responses_received,
self._verifycap.needed_shares)) self._total_queries))
return wait_for_enough_buckets_d return wait_for_enough_buckets_d
def _check_got_all_responses(self, ignored=None): def _check_got_all_responses(self, ignored=None):
@ -914,6 +914,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
self._wait_for_enough_buckets_d = None self._wait_for_enough_buckets_d = None
def _got_response(self, buckets, peerid): def _got_response(self, buckets, peerid):
# Note that this can continue to receive responses after _wait_for_enough_buckets_d
# has fired.
self._responses_received += 1 self._responses_received += 1
self.log(format="got results from [%(peerid)s]: shnums %(shnums)s", self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
peerid=idlib.shortnodeid_b2a(peerid), peerid=idlib.shortnodeid_b2a(peerid),

View File

@ -2,10 +2,10 @@
import os, shutil import os, shutil
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.python import failure
from allmydata import uri from allmydata import uri
from allmydata.util.consumer import download_to_data from allmydata.util.consumer import download_to_data
from allmydata.immutable import upload from allmydata.immutable import upload
from allmydata.mutable.common import UnrecoverableFileError
from allmydata.storage.common import storage_index_to_dir from allmydata.storage.common import storage_index_to_dir
from allmydata.test.no_network import GridTestMixin from allmydata.test.no_network import GridTestMixin
from allmydata.test.common import ShouldFailMixin from allmydata.test.common import ShouldFailMixin
@ -31,26 +31,15 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
if i_serverid in serverids: if i_serverid in serverids:
os.unlink(i_sharefile) os.unlink(i_sharefile)
# untested
def _pick_a_share_from(self, server):
(id, ss) = server
for (i_shnum, i_serverid, i_sharefile) in self.shares:
if i_serverid == id:
return (i_shnum, i_sharefile)
raise AssertionError("server %r had no shares" % server)
# untested
def _copy_all_shares_from(self, from_servers, to_server): def _copy_all_shares_from(self, from_servers, to_server):
serverids = [id for (id, ss) in from_servers] serverids = [id for (id, ss) in from_servers]
for (i_shnum, i_serverid, i_sharefile) in self.shares: for (i_shnum, i_serverid, i_sharefile) in self.shares:
if i_serverid in serverids: if i_serverid in serverids:
self._copy_share((i_shnum, i_sharefile), to_server) self._copy_share((i_shnum, i_sharefile), to_server)
# untested
def _copy_share(self, share, to_server): def _copy_share(self, share, to_server):
(sharenum, sharefile) = share (sharenum, sharefile) = share
(id, ss) = to_server (id, ss) = to_server
# FIXME: this doesn't work because we only have a LocalWrapper
shares_dir = os.path.join(ss.original.storedir, "shares") shares_dir = os.path.join(ss.original.storedir, "shares")
si = uri.from_string(self.uri).get_storage_index() si = uri.from_string(self.uri).get_storage_index()
si_dir = os.path.join(shares_dir, storage_index_to_dir(si)) si_dir = os.path.join(shares_dir, storage_index_to_dir(si))
@ -63,95 +52,144 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
self.failUnless((sharenum, ss.original.my_nodeid, new_sharefile) self.failUnless((sharenum, ss.original.my_nodeid, new_sharefile)
in self.shares) in self.shares)
# untested def _set_up(self, mutable, testdir, num_clients=1, num_servers=10):
def _add_server(self, server_number, readonly=False): self.mutable = mutable
ss = self.g.make_server(server_number, readonly) if mutable:
self.g.add_server(server_number, ss) self.basedir = "hung_server/mutable_" + testdir
self.shares = self.find_shares(self.uri) else:
self.basedir = "hung_server/immutable_" + testdir
def _set_up(self, testdir, num_clients=1, num_servers=10):
self.basedir = "download/" + testdir
self.set_up_grid(num_clients=num_clients, num_servers=num_servers) self.set_up_grid(num_clients=num_clients, num_servers=num_servers)
self.c0 = self.g.clients[0] self.c0 = self.g.clients[0]
sb = self.c0.nodemaker.storage_broker nm = self.c0.nodemaker
self.servers = [(id, ss) for (id, ss) in sb.get_all_servers()] self.servers = [(id, ss) for (id, ss) in nm.storage_broker.get_all_servers()]
if mutable:
d = nm.create_mutable_file(mutable_plaintext)
def _uploaded_mutable(node):
self.uri = node.get_uri()
self.shares = self.find_shares(self.uri)
d.addCallback(_uploaded_mutable)
else:
data = upload.Data(immutable_plaintext, convergence="") data = upload.Data(immutable_plaintext, convergence="")
d = self.c0.upload(data) d = self.c0.upload(data)
def _uploaded(ur): def _uploaded_immutable(upload_res):
self.uri = ur.uri self.uri = upload_res.uri
self.shares = self.find_shares(self.uri) self.shares = self.find_shares(self.uri)
d.addCallback(_uploaded) d.addCallback(_uploaded_immutable)
return d return d
def _check_download(self):
n = self.c0.create_node_from_uri(self.uri)
if self.mutable:
d = n.download_best_version()
expected_plaintext = mutable_plaintext
else:
d = download_to_data(n)
expected_plaintext = immutable_plaintext
def _got_data(data):
self.failUnlessEqual(data, expected_plaintext)
d.addCallback(_got_data)
return d
def _should_fail_download(self):
if self.mutable:
return self.shouldFail(UnrecoverableFileError, self.basedir,
"no recoverable versions",
self._check_download)
else:
return self.shouldFail(NotEnoughSharesError, self.basedir,
"Failed to get enough shareholders",
self._check_download)
def test_10_good_sanity_check(self): def test_10_good_sanity_check(self):
d = self._set_up("test_10_good_sanity_check") d = defer.succeed(None)
d.addCallback(lambda ign: self.download_immutable()) for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_sanity_check"))
d.addCallback(lambda ign: self._check_download())
return d return d
def test_3_good_7_hung(self): def test_10_good_copied_share(self):
d = self._set_up("test_3_good_7_hung") d = defer.succeed(None)
d.addCallback(lambda ign: self._hang(self.servers[3:])) for mutable in [False, True]:
d.addCallback(lambda ign: self.download_immutable()) d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_copied_share"))
d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
d.addCallback(lambda ign: self._check_download())
return d return d
def test_3_good_7_noshares(self): def test_3_good_7_noshares(self):
d = self._set_up("test_3_good_7_noshares") d = defer.succeed(None)
for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_noshares"))
d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[3:])) d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[3:]))
d.addCallback(lambda ign: self.download_immutable()) d.addCallback(lambda ign: self._check_download())
return d return d
def test_2_good_8_broken_fail(self): def test_2_good_8_broken_fail(self):
d = self._set_up("test_2_good_8_broken_fail") d = defer.succeed(None)
for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_broken_fail"))
d.addCallback(lambda ign: self._break(self.servers[2:])) d.addCallback(lambda ign: self._break(self.servers[2:]))
d.addCallback(lambda ign: d.addCallback(lambda ign: self._should_fail_download())
self.shouldFail(NotEnoughSharesError, "test_2_good_8_broken_fail",
"Failed to get enough shareholders: have 2, need 3",
self.download_immutable))
return d return d
def test_2_good_8_noshares_fail(self): def test_2_good_8_noshares_fail(self):
d = self._set_up("test_2_good_8_noshares_fail") d = defer.succeed(None)
for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_noshares_fail"))
d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[2:])) d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[2:]))
d.addCallback(lambda ign: d.addCallback(lambda ign: self._should_fail_download())
self.shouldFail(NotEnoughSharesError, "test_2_good_8_noshares_fail", return d
"Failed to get enough shareholders: have 2, need 3",
self.download_immutable)) def test_2_good_8_broken_copied_share(self):
d = defer.succeed(None)
for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_broken_copied_share"))
d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
d.addCallback(lambda ign: self._break(self.servers[2:]))
d.addCallback(lambda ign: self._check_download())
return d
def test_2_good_8_broken_duplicate_share_fail(self):
d = defer.succeed(None)
for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_broken_duplicate_share_fail"))
d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[1:2], self.servers[0]))
d.addCallback(lambda ign: self._break(self.servers[2:]))
d.addCallback(lambda ign: self._should_fail_download())
return d
# The tests below do not currently pass for mutable files.
def test_3_good_7_hung(self):
d = defer.succeed(None)
for mutable in [False]:
d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_hung"))
d.addCallback(lambda ign: self._hang(self.servers[3:]))
d.addCallback(lambda ign: self._check_download())
return d return d
def test_2_good_8_hung_then_1_recovers(self): def test_2_good_8_hung_then_1_recovers(self):
d = defer.succeed(None)
for mutable in [False]:
recovered = defer.Deferred() recovered = defer.Deferred()
d = self._set_up("test_2_good_8_hung_then_1_recovers") d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers"))
d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered)) d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
d.addCallback(lambda ign: self._hang(self.servers[3:])) d.addCallback(lambda ign: self._hang(self.servers[3:]))
d.addCallback(lambda ign: self.download_immutable()) d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None))
reactor.callLater(5, recovered.callback, None) d.addCallback(lambda ign: self._check_download())
return d return d
def test_2_good_8_hung_then_1_recovers_with_2_shares(self): def test_2_good_8_hung_then_1_recovers_with_2_shares(self):
d = defer.succeed(None)
for mutable in [False]:
recovered = defer.Deferred() recovered = defer.Deferred()
d = self._set_up("test_2_good_8_hung_then_1_recovers_with_2_shares") d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers_with_2_shares"))
d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2])) d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered)) d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
d.addCallback(lambda ign: self._hang(self.servers[3:])) d.addCallback(lambda ign: self._hang(self.servers[3:]))
d.addCallback(lambda ign: self.download_immutable()) d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None))
reactor.callLater(5, recovered.callback, None) d.addCallback(lambda ign: self._check_download())
return d
def download_immutable(self):
n = self.c0.create_node_from_uri(self.uri)
d = download_to_data(n)
def _got_data(data):
self.failUnlessEqual(data, immutable_plaintext)
d.addCallback(_got_data)
return d
# unused
def download_mutable(self):
n = self.c0.create_node_from_uri(self.uri)
d = n.download_best_version()
def _got_data(data):
self.failUnlessEqual(data, mutable_plaintext)
d.addCallback(_got_data)
return d return d