immutable: add a monitor API to CiphertextDownloader with which to tell it to stop its work

This commit is contained in:
Zooko O'Whielacronx 2009-01-08 14:42:15 -07:00
parent 157e365d2b
commit ade6a4fa74
2 changed files with 14 additions and 5 deletions

View File

@ -13,6 +13,7 @@ from allmydata import codec, hashtree, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \ from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError
from allmydata.immutable import layout from allmydata.immutable import layout
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES from pycryptopp.cipher.aes import AES
class IntegrityCheckReject(Exception): class IntegrityCheckReject(Exception):
@ -605,11 +606,14 @@ class DownloadStatus:
class CiphertextDownloader(log.PrefixingLogMixin): class CiphertextDownloader(log.PrefixingLogMixin):
""" I download shares, check their integrity, then decode them, check the integrity of the """ I download shares, check their integrity, then decode them, check the integrity of the
resulting ciphertext, then and write it to my target. """ resulting ciphertext, then and write it to my target. Before I send any new request to a
server, I always ask the "monitor" object that was passed into my constructor whether this
task has been cancelled (by invoking its raise_if_cancelled() method). """
implements(IPushProducer) implements(IPushProducer)
_status = None _status = None
def __init__(self, client, v, target): def __init__(self, client, v, target, monitor):
precondition(IVerifierURI.providedBy(v), v) precondition(IVerifierURI.providedBy(v), v)
precondition(IDownloadTarget.providedBy(target), target) precondition(IDownloadTarget.providedBy(target), target)
@ -645,6 +649,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
if IConsumer.providedBy(target): if IConsumer.providedBy(target):
target.registerProducer(self, True) target.registerProducer(self, True)
self._target = target self._target = target
self._monitor = monitor
self._opened = False self._opened = False
self.active_buckets = {} # k: shnum, v: bucket self.active_buckets = {} # k: shnum, v: bucket
@ -908,6 +913,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
return d return d
if self._stopped: if self._stopped:
raise DownloadStopped("our Consumer called stopProducing()") raise DownloadStopped("our Consumer called stopProducing()")
self._monitor.raise_if_cancelled()
return res return res
def _download_segment(self, res, segnum): def _download_segment(self, res, segnum):
@ -1122,7 +1128,7 @@ class Downloader(service.MultiService):
self._all_download_statuses = weakref.WeakKeyDictionary() self._all_download_statuses = weakref.WeakKeyDictionary()
self._recent_download_statuses = [] self._recent_download_statuses = []
def download(self, u, t, _log_msg_id=None): def download(self, u, t, _log_msg_id=None, monitor=None):
assert self.parent assert self.parent
assert self.running assert self.running
u = IFileURI(u) u = IFileURI(u)
@ -1138,7 +1144,9 @@ class Downloader(service.MultiService):
self.stats_provider.count('downloader.bytes_downloaded', u.get_size()) self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id) target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target) if not monitor:
monitor=Monitor()
dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target, monitor=monitor)
self._add_download(dl) self._add_download(dl)
d = dl.start() d = dl.start()
return d return d

View File

@ -9,6 +9,7 @@ from allmydata.immutable import encode, upload, download
from allmydata.util import hashutil from allmydata.util import hashutil
from allmydata.util.assertutil import _assert from allmydata.util.assertutil import _assert
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError
from allmydata.monitor import Monitor
import common_util as testutil import common_util as testutil
class LostPeerError(Exception): class LostPeerError(Exception):
@ -494,7 +495,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
if not target: if not target:
target = download.Data() target = download.Data()
target = download.DecryptingTarget(target, u.key) target = download.DecryptingTarget(target, u.key)
fd = download.CiphertextDownloader(client, u.get_verify_cap(), target) fd = download.CiphertextDownloader(client, u.get_verify_cap(), target, monitor=Monitor())
# we manually cycle the CiphertextDownloader through a number of steps that # we manually cycle the CiphertextDownloader through a number of steps that
# would normally be sequenced by a Deferred chain in # would normally be sequenced by a Deferred chain in