download: fix stopProducing failure ('self._paused_at not defined'), add tests

This commit is contained in:
Brian Warner 2008-07-14 15:25:21 -07:00
parent 5b8caf26a7
commit fd465b4aaf
3 changed files with 72 additions and 8 deletions

View File

@ -496,6 +496,8 @@ class FileDownloader:
def resumeProducing(self): def resumeProducing(self):
if self._paused: if self._paused:
paused_for = time.time() - self._paused_at
self._results.timings['paused'] += paused_for
p = self._paused p = self._paused
self._paused = None self._paused = None
eventually(p.callback, None) eventually(p.callback, None)
@ -505,8 +507,7 @@ class FileDownloader:
def stopProducing(self): def stopProducing(self):
self.log("Download.stopProducing") self.log("Download.stopProducing")
self._stopped = True self._stopped = True
paused_for = time.time() - self._paused_at self.resumeProducing()
self._results.timings['paused'] += paused_for
if self._status: if self._status:
self._status.set_stopped(True) self._status.set_stopped(True)
self._status.set_active(False) self._status.set_active(False)

View File

@ -1175,6 +1175,9 @@ class IDecoder(Interface):
""" """
class IDownloadTarget(Interface): class IDownloadTarget(Interface):
# Note that if the IDownloadTarget is also an IConsumable, the downloader
# will register itself as a producer. This allows the target to invoke
# downloader.pauseProducing, resumeProducing, and stopProducing.
def open(size): def open(size):
"""Called before any calls to write() or close(). If an error """Called before any calls to write() or close(). If an error
occurs before any data is available, fail() may be called without occurs before any data is available, fail() may be called without

View File

@ -1,11 +1,12 @@
from zope.interface import implements from zope.interface import implements
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer from twisted.internet import defer, reactor
from twisted.internet.interfaces import IConsumer
from twisted.python.failure import Failure from twisted.python.failure import Failure
from foolscap import eventual from foolscap import eventual
from allmydata import encode, upload, download, hashtree, uri from allmydata import encode, upload, download, hashtree, uri
from allmydata.util import hashutil from allmydata.util import hashutil, testutil
from allmydata.util.assertutil import _assert from allmydata.util.assertutil import _assert
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader
@ -274,13 +275,47 @@ class Encode(unittest.TestCase):
# 5 segments: 25, 25, 25, 25, 1 # 5 segments: 25, 25, 25, 25, 1
return self.do_encode(25, 101, 100, 5, 15, 8) return self.do_encode(25, 101, 100, 5, 15, 8)
class Roundtrip(unittest.TestCase): class PausingTarget(download.Data):
implements(IConsumer)
def __init__(self):
download.Data.__init__(self)
self.size = 0
self.writes = 0
def write(self, data):
self.size += len(data)
self.writes += 1
if self.writes <= 2:
# we happen to use 4 segments, and want to avoid pausing on the
# last one (since then the _unpause timer will still be running)
self.producer.pauseProducing()
reactor.callLater(0.1, self._unpause)
return download.Data.write(self, data)
def _unpause(self):
self.producer.resumeProducing()
def registerProducer(self, producer, streaming):
self.producer = producer
def unregisterProducer(self):
self.producer = None
class PausingAndStoppingTarget(PausingTarget):
def write(self, data):
self.producer.pauseProducing()
reactor.callLater(0.5, self._stop)
def _stop(self):
self.producer.stopProducing()
class StoppingTarget(PausingTarget):
def write(self, data):
self.producer.stopProducing()
class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
def send_and_recover(self, k_and_happy_and_n=(25,75,100), def send_and_recover(self, k_and_happy_and_n=(25,75,100),
AVAILABLE_SHARES=None, AVAILABLE_SHARES=None,
datalen=76, datalen=76,
max_segment_size=25, max_segment_size=25,
bucket_modes={}, bucket_modes={},
recover_mode="recover", recover_mode="recover",
target=None,
): ):
if AVAILABLE_SHARES is None: if AVAILABLE_SHARES is None:
AVAILABLE_SHARES = k_and_happy_and_n[2] AVAILABLE_SHARES = k_and_happy_and_n[2]
@ -288,7 +323,8 @@ class Roundtrip(unittest.TestCase):
d = self.send(k_and_happy_and_n, AVAILABLE_SHARES, d = self.send(k_and_happy_and_n, AVAILABLE_SHARES,
max_segment_size, bucket_modes, data) max_segment_size, bucket_modes, data)
# that fires with (uri_extension_hash, e, shareholders) # that fires with (uri_extension_hash, e, shareholders)
d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode) d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode,
target=target)
# that fires with newdata # that fires with newdata
def _downloaded((newdata, fd)): def _downloaded((newdata, fd)):
self.failUnless(newdata == data) self.failUnless(newdata == data)
@ -332,7 +368,7 @@ class Roundtrip(unittest.TestCase):
return d return d
def recover(self, (res, key, shareholders), AVAILABLE_SHARES, def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
recover_mode): recover_mode, target=None):
(uri_extension_hash, required_shares, num_shares, file_size) = res (uri_extension_hash, required_shares, num_shares, file_size) = res
if "corrupt_key" in recover_mode: if "corrupt_key" in recover_mode:
@ -350,6 +386,7 @@ class Roundtrip(unittest.TestCase):
URI = u.to_string() URI = u.to_string()
client = FakeClient() client = FakeClient()
if not target:
target = download.Data() target = download.Data()
fd = download.FileDownloader(client, URI, target) fd = download.FileDownloader(client, URI, target)
@ -436,6 +473,29 @@ class Roundtrip(unittest.TestCase):
def test_101(self): def test_101(self):
return self.send_and_recover(datalen=101) return self.send_and_recover(datalen=101)
def test_pause(self):
# use a DownloadTarget that does pauseProducing/resumeProducing a few
# times, then finishes
t = PausingTarget()
d = self.send_and_recover(target=t)
return d
def test_pause_then_stop(self):
# use a DownloadTarget that pauses, then stops.
t = PausingAndStoppingTarget()
d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop",
"our Consumer called stopProducing()",
self.send_and_recover, target=t)
return d
def test_stop(self):
# use a DownloadTarget that does an immediate stop (ticket #473)
t = StoppingTarget()
d = self.shouldFail(download.DownloadStopped, "test_stop",
"our Consumer called stopProducing()",
self.send_and_recover, target=t)
return d
# the following tests all use 4-out-of-10 encoding # the following tests all use 4-out-of-10 encoding
def test_bad_blocks(self): def test_bad_blocks(self):