immutable: downloader accepts notifications of buckets even if those notifications arrive after he has begun downloading shares.

This can be useful if one of the shares that the downloader has already begun downloading fails. See #287 for discussion. This fixes the part of #287 that was a regression caused by #928: it restores fail-over in case a share is corrupted (or the server returns an error or disconnects). This does not fix the related issue mentioned in #287, in which a server hangs and doesn't reply to requests for blocks.
This commit is contained in:
Zooko O'Whielacronx 2010-01-31 22:16:10 -08:00
parent e4e2599017
commit 3e4342ecb3
3 changed files with 106 additions and 28 deletions

View File

@ -789,7 +789,10 @@ class CiphertextDownloader(log.PrefixingLogMixin):
self.active_buckets = {} # k: shnum, v: bucket
self._share_buckets = {} # k: sharenum, v: list of buckets
self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
# _download_all_segments() will set this to:
# self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
self._share_vbuckets = None
self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
@ -809,6 +812,10 @@ class CiphertextDownloader(log.PrefixingLogMixin):
# self._responses_received = 0
# self._queries_failed = 0
# This is solely for the use of unit tests. It will be triggered when
# we start downloading shares.
self._stage_4_d = defer.Deferred()
def pauseProducing(self):
if self._paused:
return
@ -938,6 +945,10 @@ class CiphertextDownloader(log.PrefixingLogMixin):
reactor.callLater(0, self._wait_for_enough_buckets_d.callback, True)
self._wait_for_enough_buckets_d = None
if self._share_vbuckets is not None:
vbucket = ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
if self._results:
if peerid not in self._results.servermap:
self._results.servermap[peerid] = set()
@ -1088,6 +1099,13 @@ class CiphertextDownloader(log.PrefixingLogMixin):
def _download_all_segments(self, res):
# From now on if new buckets are received then I will notice that
# self._share_vbuckets is not None and generate a vbucket for that new
# bucket and add it in to _share_vbuckets. (We had to wait because we
# didn't have self._vup and self._share_hash_tree earlier. We didn't
# need validated buckets until now -- now that we are ready to download
# shares.)
self._share_vbuckets = {}
for sharenum, buckets in self._share_buckets.iteritems():
for bucket in buckets:
vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
@ -1109,6 +1127,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
# this pause, at the end of write, prevents pre-fetch from
# happening until the consumer is ready for more data.
d.addCallback(self._check_for_pause)
self._stage_4_d.callback(None)
return d
def _check_for_pause(self, res):

View File

@ -252,12 +252,21 @@ class NoNetworkGrid(service.MultiService):
def break_server(self, serverid):
    # Mark the given server as broken, so it will throw exceptions when
    # asked to hold a share or serve a share. This method only sets the
    # 'broken' flag on the server object looked up in self.servers_by_id.
    self.servers_by_id[serverid].broken = True
def hang_server(self, serverid, until=None):
    # Hang the given server until 'until' fires.
    #
    # 'until' is stored as the server's 'hung_until' attribute. When the
    # caller does not supply one, a fresh Deferred is created for this call.
    # (A Deferred written as the default value in the signature would be
    # created once, when the function is defined, and then shared by every
    # call that omits 'until'.)
    if until is None:
        until = defer.Deferred()
    self.servers_by_id[serverid].hung_until = until
def hang_server(self, serverid):
    # Hang the given server by storing a new, unfired Deferred in its
    # 'hung_until' attribute.
    server = self.servers_by_id[serverid]
    # the server must not already carry a 'hung_until' Deferred
    assert server.hung_until is None
    server.hung_until = defer.Deferred()
def unhang_server(self, serverid):
    # Unhang the given server: fire the object stored in its 'hung_until'
    # attribute with None, then reset that attribute to None.
    ss = self.servers_by_id[serverid]
    # the server must currently have something stored in 'hung_until'
    assert ss.hung_until is not None
    # NOTE(review): 'hung_until' is still set while the callbacks run;
    # presumably no callback tries to hang this server again -- confirm.
    ss.hung_until.callback(None)
    ss.hung_until = None
class GridTestMixin:

View File

@ -1,14 +1,14 @@
import os, shutil
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.internet import defer
from allmydata import uri
from allmydata.util.consumer import download_to_data
from allmydata.immutable import upload
from allmydata.mutable.common import UnrecoverableFileError
from allmydata.storage.common import storage_index_to_dir
from allmydata.test.no_network import GridTestMixin
from allmydata.test.common import ShouldFailMixin
from allmydata.test.common import ShouldFailMixin, _corrupt_share_data
from allmydata.interfaces import NotEnoughSharesError
immutable_plaintext = "data" * 10000
@ -25,12 +25,22 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
for (id, ss) in servers:
self.g.hang_server(id, **kwargs)
def _unhang(self, servers, **kwargs):
    # Unhang every server in 'servers', a sequence of (serverid, server)
    # pairs. Only the serverid is used; any extra keyword arguments are
    # passed straight through to self.g.unhang_server().
    for (serverid, _ss) in servers:
        self.g.unhang_server(serverid, **kwargs)
def _delete_all_shares_from(self, servers):
    # Delete from disk every share file listed in self.shares that belongs
    # to one of 'servers', a sequence of (serverid, server) pairs.
    # self.shares is iterated as (shnum, serverid, sharefile) tuples.
    #
    # The ids are collected into a set so that the membership test below
    # does not rescan a list for every share.
    serverids = set([serverid for (serverid, _ss) in servers])
    for (_shnum, i_serverid, i_sharefile) in self.shares:
        if i_serverid in serverids:
            os.unlink(i_sharefile)
def _corrupt_all_shares_in(self, servers, corruptor_func):
    # Run self._corrupt_share() with 'corruptor_func' on every share listed
    # in self.shares that belongs to one of 'servers', a sequence of
    # (serverid, server) pairs. self.shares is iterated as
    # (shnum, serverid, sharefile) tuples.
    #
    # The ids are collected into a set so that the membership test below
    # does not rescan a list for every share.
    serverids = set([serverid for (serverid, _ss) in servers])
    for (i_shnum, i_serverid, i_sharefile) in self.shares:
        if i_serverid in serverids:
            self._corrupt_share((i_shnum, i_sharefile), corruptor_func)
def _copy_all_shares_from(self, from_servers, to_server):
serverids = [id for (id, ss) in from_servers]
for (i_shnum, i_serverid, i_sharefile) in self.shares:
@ -52,6 +62,15 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
self.failUnless((sharenum, ss.original.my_nodeid, new_sharefile)
in self.shares)
def _corrupt_share(self, share, corruptor_func):
    # Rewrite one share file with corrupted contents.
    #
    # 'share' is a (sharenum, sharefile) pair; only the file path is used.
    # 'corruptor_func' is called with the file's current bytes and must
    # return the bytes to store in their place.
    (_sharenum, sharefile) = share
    rf = open(sharefile, "rb")
    try:
        data = rf.read()
    finally:
        # close the read handle even if the read fails
        rf.close()
    newdata = corruptor_func(data)
    # the old file is removed first, so the new data is written to a
    # freshly created file rather than over the existing one
    os.unlink(sharefile)
    wf = open(sharefile, "wb")
    try:
        wf.write(newdata)
    finally:
        # close the write handle even if the write fails
        wf.close()
def _set_up(self, mutable, testdir, num_clients=1, num_servers=10):
self.mutable = mutable
if mutable:
@ -80,35 +99,50 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
d.addCallback(_uploaded_immutable)
return d
def _check_download(self):
def _start_download(self):
n = self.c0.create_node_from_uri(self.uri)
if self.mutable:
d = n.download_best_version()
expected_plaintext = mutable_plaintext
stage_4_d = None # currently we aren't doing any tests which require this for mutable files
else:
d = download_to_data(n)
expected_plaintext = immutable_plaintext
def _got_data(data):
self.failUnlessEqual(data, expected_plaintext)
d.addCallback(_got_data)
stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME
return (d, stage_4_d,)
def _wait_for_data(self, n):
    # Start a download of node 'n' and return whatever the download call
    # returns (presumably a Deferred -- confirm against the callers).
    # Immutable files go through download_to_data(); mutable files use the
    # node's own download_best_version().
    if not self.mutable:
        return download_to_data(n)
    return n.download_best_version()
def _check(self, resultingdata):
    # Assert that 'resultingdata' equals the plaintext expected for the
    # kind of file under test (mutable or immutable, per self.mutable).
    if self.mutable:
        expected = mutable_plaintext
    else:
        expected = immutable_plaintext
    self.failUnlessEqual(resultingdata, expected)
def _download_and_check(self):
    # Start a download, attach self._check as a callback to the first
    # value returned by self._start_download(), and return that value.
    # The second value returned by _start_download() is not used here.
    (data_d, _stage_4_d) = self._start_download()
    data_d.addCallback(self._check)
    return data_d
def _should_fail_download(self):
if self.mutable:
return self.shouldFail(UnrecoverableFileError, self.basedir,
"no recoverable versions",
self._check_download)
self._download_and_check)
else:
return self.shouldFail(NotEnoughSharesError, self.basedir,
"Failed to get enough shareholders",
self._check_download)
self._download_and_check)
def test_10_good_sanity_check(self):
d = defer.succeed(None)
for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_sanity_check"))
d.addCallback(lambda ign: self._check_download())
d.addCallback(lambda ign: self._download_and_check())
return d
def test_10_good_copied_share(self):
@ -116,7 +150,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_copied_share"))
d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
d.addCallback(lambda ign: self._check_download())
d.addCallback(lambda ign: self._download_and_check())
return d
def test_3_good_7_noshares(self):
@ -124,7 +158,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_noshares"))
d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[3:]))
d.addCallback(lambda ign: self._check_download())
d.addCallback(lambda ign: self._download_and_check())
return d
def test_2_good_8_broken_fail(self):
@ -149,7 +183,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_broken_copied_share"))
d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
d.addCallback(lambda ign: self._break(self.servers[2:]))
d.addCallback(lambda ign: self._check_download())
d.addCallback(lambda ign: self._download_and_check())
return d
def test_2_good_8_broken_duplicate_share_fail(self):
@ -168,28 +202,43 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
for mutable in [False]:
d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_hung"))
d.addCallback(lambda ign: self._hang(self.servers[3:]))
d.addCallback(lambda ign: self._check_download())
d.addCallback(lambda ign: self._download_and_check())
return d
def test_2_good_8_hung_then_1_recovers(self):
d = defer.succeed(None)
for mutable in [False]:
recovered = defer.Deferred()
d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers"))
d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
d.addCallback(lambda ign: self._hang(self.servers[2:3]))
d.addCallback(lambda ign: self._hang(self.servers[3:]))
d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None))
d.addCallback(lambda ign: self._check_download())
d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
d.addCallback(lambda ign: self._download_and_check())
return d
def test_2_good_8_hung_then_1_recovers_with_2_shares(self):
d = defer.succeed(None)
for mutable in [False]:
recovered = defer.Deferred()
d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers_with_2_shares"))
d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
d.addCallback(lambda ign: self._hang(self.servers[2:3]))
d.addCallback(lambda ign: self._hang(self.servers[3:]))
d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None))
d.addCallback(lambda ign: self._check_download())
d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
d.addCallback(lambda ign: self._download_and_check())
return d
def test_failover_during_stage_4(self):
    # See #287
    # Outline: set up the grid, corrupt the shares held by servers[2:3],
    # hang servers[3:], and start a download. When the stage-4 Deferred
    # returned by _start_download() fires, servers[3:4] is unhung; the
    # downloaded data is then verified with self._check.
    d = defer.succeed(None)
    # NOTE(review): the lambdas below read 'mutable' when they run, not when
    # they are created; that is harmless while the list has one element.
    for mutable in [False]:
        d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4"))
        d.addCallback(lambda ign: self._corrupt_all_shares_in(self.servers[2:3], _corrupt_share_data))
        # NOTE(review): _set_up is called a second time here, with the same
        # arguments, after the corruption step -- confirm this is intended
        # and does not replace the shares that were just corrupted.
        d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4"))
        d.addCallback(lambda ign: self._hang(self.servers[3:]))
        d.addCallback(lambda ign: self._start_download())
        def _after_starting_download((doned, started4d)):
            # 'doned' and 'started4d' are the two values returned by
            # _start_download(): the download result and the stage-4 Deferred.
            started4d.addCallback(lambda ign: self._unhang(self.servers[3:4]))
            doned.addCallback(self._check)
            # returning doned makes the outer chain wait for the download
            return doned
        d.addCallback(_after_starting_download)
    return d