mutable/retrieve: don't write() after we've been pauseProducer'ed

This fixes a test failure found against current Twisted trunk in
test_mutable.Filenode.test_retrieve_producer_mdmf (when it uses
PausingAndStoppingConsumer). There must be some sort of race: I could
make it fail against Twisted-11.0 if I just increased the 0.5s delay in
test_download.PausingAndStoppingConsumer to about 0.6s, and could make
Twisted-trunk pass by reducing it to about 0.3s .

I fixed the test (as opposed to the bug) by replacing the delay with a
simple reliable eventually(), and adding extra asserts to fail the test
if the consumer's write() method is called while the producer is
supposed to be paused

The bug itself was that mutable.retrieve.Retrieve wasn't checking the
"stopped" flag after resuming from a pause, and thus delivered one
segment to a consumer that wasn't expecting it. I split out
stopped-flag-checking to separate function, which is now called
immediately after _check_for_paused(). I also cleaned up some Deferred
usage and whitespace.
This commit is contained in:
Brian Warner 2011-10-16 17:03:47 -07:00
parent fff237be9a
commit 633641174a
2 changed files with 14 additions and 6 deletions

View File

@ -176,7 +176,7 @@ class Retrieve:
if self._pause_deferred is not None:
return
# fired when the download is unpaused.
# fired when the download is unpaused.
self._old_status = self._status.get_status()
self._set_current_status("paused")
@ -210,13 +210,16 @@ class Retrieve:
the Deferred fires immediately. Otherwise, the Deferred fires
when the downloader is unpaused.
"""
if self._stopped:
raise DownloadStopped("our Consumer called stopProducing()")
if self._pause_deferred is not None:
d = defer.Deferred()
self._pause_deferred.addCallback(lambda ignored: d.callback(res))
return d
return defer.succeed(res)
return res
def _check_for_stopped(self, res):
if self._stopped:
raise DownloadStopped("our Consumer called stopProducing()")
return res
def download(self, consumer=None, offset=0, size=None):
@ -665,6 +668,7 @@ class Retrieve:
# check to see whether we've been paused before writing
# anything.
d.addCallback(self._check_for_paused)
d.addCallback(self._check_for_stopped)
d.addCallback(self._set_segment)
return d
else:

View File

@ -20,7 +20,7 @@ from allmydata.immutable.downloader.common import BadSegmentNumberError, \
from allmydata.immutable.downloader.status import DownloadStatus
from allmydata.immutable.downloader.fetcher import SegmentFetcher
from allmydata.codec import CRSDecoder
from foolscap.eventual import fireEventually, flushEventualQueue
from foolscap.eventual import eventually, fireEventually, flushEventualQueue
plaintext = "This is a moderate-sized file.\n" * 10
mutable_plaintext = "This is a moderate-sized mutable file.\n" * 10
@ -898,10 +898,14 @@ class PausingConsumer(MemoryConsumer):
self.producer.resumeProducing()
class PausingAndStoppingConsumer(PausingConsumer):
debug_stopped = False
def write(self, data):
if self.debug_stopped:
raise Exception("I'm stopped, don't write to me")
self.producer.pauseProducing()
reactor.callLater(0.5, self._stop)
eventually(self._stop)
def _stop(self):
self.debug_stopped = True
self.producer.stopProducing()
class StoppingConsumer(PausingConsumer):