DownloadNode: fix lost-progress in fetch_failed, tolerate cancel when no segment-fetch is active. Fixes #1154.

The lost-progress bug occurred when two simultaneous read() calls fetched
different segments, and the first one failed (due to corruption, or the other
bugs in #1154): the second read() would never complete. While in this state,
cancelling the second read (by having its consumer call stopProducing) would
trigger the cancel-intolerance bug. Finally, in downloader.node.Cancel,
prevent late cancels by adding an 'active' flag.
This commit is contained in:
Brian Warner 2010-08-05 11:45:49 -07:00
parent 43c5032105
commit f6f9a97627
2 changed files with 153 additions and 7 deletions

View File

@ -20,10 +20,10 @@ from common import BadCiphertextHashError
class Cancel:
def __init__(self, f):
self._f = f
self.cancelled = False
self.active = True
def cancel(self):
if not self.cancelled:
self.cancelled = True
if self.active:
self.active = False
self._f(self)
class DownloadNode:
@ -360,10 +360,11 @@ class DownloadNode:
def fetch_failed(self, sf, f):
assert sf is self._active_segment
self._active_segment = None
# deliver error upwards
for (d,c) in self._extract_requests(sf.segnum):
eventually(self._deliver, d, c, f)
self._active_segment = None
self._start_new_segment()
def process_blocks(self, segnum, blocks):
d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
@ -449,7 +450,8 @@ class DownloadNode:
def _deliver(self, d, c, result):
# this method exists to handle cancel() that occurs between
# _got_segment and _deliver
if not c.cancelled:
if c.active:
c.active = False # it is now too late to cancel
d.callback(result) # might actually be an errback
def _extract_requests(self, segnum):
@ -465,7 +467,9 @@ class DownloadNode:
self._segment_requests = [t for t in self._segment_requests
if t[2] != c]
segnums = [segnum for (segnum,d,c) in self._segment_requests]
if self._active_segment.segnum not in segnums:
# self._active_segment might be None in rare circumstances, so make
# sure we tolerate it
if self._active_segment and self._active_segment.segnum not in segnums:
self._active_segment.stop()
self._active_segment = None
self._start_new_segment()

View File

@ -486,6 +486,113 @@ class DownloadTest(_Base, unittest.TestCase):
d.addCallback(_done)
return d
def test_simultaneous_onefails_onecancelled(self):
# This exercises an mplayer behavior in ticket #1154. I believe that
# mplayer made two simultaneous webapi GET requests: first one for an
# index region at the end of the (mp3/video) file, then one for the
# first block of the file (the order doesn't really matter). All GETs
# failed (NoSharesError) because of the type(__len__)==long bug. Each
# GET submitted a DownloadNode.get_segment() request, which was
# queued by the DN (DN._segment_requests), so the second one was
# blocked waiting on the first one. When the first one failed,
# DN.fetch_failed() was invoked, which errbacks the first GET, but
# left the other one hanging (the lost-progress bug mentioned in
# #1154 comment 10)
#
# Then mplayer sees that the index region GET failed, so it cancels
# the first-block GET (by closing the HTTP request), triggering
# stopProducing. The second GET was waiting in the Deferred (between
# n.get_segment() and self._request_retired), so its
# _cancel_segment_request was active, so was invoked. However,
# DN._active_segment was None since it was not working on any segment
# at that time, hence the error in #1154.
self.basedir = self.mktemp()
self.set_up_grid()
self.c0 = self.g.clients[0]
# upload a file with multiple segments, so we can catch the download
# in the middle. Tell the downloader, so it can guess correctly.
u = upload.Data(plaintext, None)
u.max_segment_size = 70 # 5 segs
d = self.c0.upload(u)
def _uploaded(ur):
# corrupt all the shares so the download will fail
def _corruptor(s, debug=False):
which = 48 # first byte of block0
return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
self.corrupt_all_shares(ur.uri, _corruptor)
n = self.c0.create_node_from_uri(ur.uri)
n._cnode._maybe_create_download_node()
n._cnode._node._build_guessed_tables(u.max_segment_size)
con1 = MemoryConsumer()
con2 = MemoryConsumer()
d = n.read(con1, 0L, 20)
d2 = n.read(con2, 140L, 20)
# con2 will be cancelled, so d2 should fail with DownloadStopped
def _con2_should_not_succeed(res):
self.fail("the second read should not have succeeded")
def _con2_failed(f):
self.failUnless(f.check(DownloadStopped))
d2.addCallbacks(_con2_should_not_succeed, _con2_failed)
def _con1_should_not_succeed(res):
self.fail("the first read should not have succeeded")
def _con1_failed(f):
self.failUnless(f.check(NotEnoughSharesError))
con2.producer.stopProducing()
return d2
d.addCallbacks(_con1_should_not_succeed, _con1_failed)
return d
d.addCallback(_uploaded)
return d
def test_simultaneous_onefails(self):
self.basedir = self.mktemp()
self.set_up_grid()
self.c0 = self.g.clients[0]
# upload a file with multiple segments, so we can catch the download
# in the middle. Tell the downloader, so it can guess correctly.
u = upload.Data(plaintext, None)
u.max_segment_size = 70 # 5 segs
d = self.c0.upload(u)
def _uploaded(ur):
# corrupt all the shares so the download will fail
def _corruptor(s, debug=False):
which = 48 # first byte of block0
return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
self.corrupt_all_shares(ur.uri, _corruptor)
n = self.c0.create_node_from_uri(ur.uri)
n._cnode._maybe_create_download_node()
n._cnode._node._build_guessed_tables(u.max_segment_size)
con1 = MemoryConsumer()
con2 = MemoryConsumer()
d = n.read(con1, 0L, 20)
d2 = n.read(con2, 140L, 20)
# con2 should wait for con1 to fail and then con2 should succeed.
# In particular, we should not lose progress. If this test fails,
# it will fail with a timeout error.
def _con2_should_succeed(res):
# this should succeed because we only corrupted the first
# segment of each share. The segment that holds [140:160] is
# fine, as are the hash chains and UEB.
self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
d2.addCallback(_con2_should_succeed)
def _con1_should_not_succeed(res):
self.fail("the first read should not have succeeded")
def _con1_failed(f):
self.failUnless(f.check(NotEnoughSharesError))
# we *don't* cancel the second one here: this exercises a
# lost-progress bug from #1154. We just wait for it to
# succeed.
return d2
d.addCallbacks(_con1_should_not_succeed, _con1_failed)
return d
d.addCallback(_uploaded)
return d
def test_download_no_overrun(self):
self.basedir = self.mktemp()
self.set_up_grid()
@ -599,7 +706,7 @@ class DownloadTest(_Base, unittest.TestCase):
return d
def test_stop(self):
# use a download targetthat does an immediate stop (ticket #473)
# use a download target that stops after the first segment (#473)
self.basedir = self.mktemp()
self.set_up_grid()
self.c0 = self.g.clients[0]
@ -611,6 +718,36 @@ class DownloadTest(_Base, unittest.TestCase):
n.read, c)
return d
def test_stop_immediately(self):
# and a target that stops right after registerProducer (maybe #1154)
self.basedir = self.mktemp()
self.set_up_grid()
self.c0 = self.g.clients[0]
self.load_shares()
n = self.c0.create_node_from_uri(immutable_uri)
c = ImmediatelyStoppingConsumer() # stops after registerProducer
d = self.shouldFail(DownloadStopped, "test_stop_immediately",
"our Consumer called stopProducing()",
n.read, c)
return d
def test_stop_immediately2(self):
# like test_stop_immediately, but with a plain MemoryConsumer: the test
# itself calls stopProducing() right after n.read() returns (maybe #1154)
self.basedir = self.mktemp()
self.set_up_grid()
self.c0 = self.g.clients[0]
self.load_shares()
n = self.c0.create_node_from_uri(immutable_uri)
c = MemoryConsumer()
d0 = n.read(c)
c.producer.stopProducing()
d = self.shouldFail(DownloadStopped, "test_stop_immediately",
"our Consumer called stopProducing()",
lambda: d0)
return d
def test_download_segment_bad_ciphertext_hash(self):
# The crypttext_hash_tree asserts the integrity of the decoded
# ciphertext, and exists to detect two sorts of problems. The first
@ -776,6 +913,11 @@ class StoppingConsumer(PausingConsumer):
def write(self, data):
self.producer.stopProducing()
class ImmediatelyStoppingConsumer(MemoryConsumer):
def registerProducer(self, p, streaming):
MemoryConsumer.registerProducer(self, p, streaming)
self.producer.stopProducing()
class StallingConsumer(MemoryConsumer):
def __init__(self, halfway_cb):
MemoryConsumer.__init__(self)