mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-20 17:52:50 +00:00
add download-status objects, to track download progress
This commit is contained in:
parent
d0ce8694c1
commit
94097affc3
@ -1,5 +1,5 @@
|
||||
|
||||
import os, random
|
||||
import os, random, weakref
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.interfaces import IPushProducer, IConsumer
|
||||
@ -9,7 +9,8 @@ from foolscap.eventual import eventually
|
||||
from allmydata.util import idlib, mathutil, hashutil, log
|
||||
from allmydata.util.assertutil import _assert
|
||||
from allmydata import codec, hashtree, storage, uri
|
||||
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI
|
||||
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
|
||||
IDownloadStatus
|
||||
from allmydata.encode import NotEnoughPeersError
|
||||
from pycryptopp.cipher.aes import AES
|
||||
|
||||
@ -41,6 +42,9 @@ class Output:
|
||||
self._opened = False
|
||||
self._log_parent = log_parent
|
||||
|
||||
def get_progress(self):
|
||||
return float(self.length) / self.total_length
|
||||
|
||||
def log(self, *args, **kwargs):
|
||||
if "parent" not in kwargs:
|
||||
kwargs["parent"] = self._log_parent
|
||||
@ -321,7 +325,7 @@ class SegmentDownloader:
|
||||
self.parent.bucket_failed(vbucket)
|
||||
|
||||
class FileDownloader:
|
||||
implements(IPushProducer)
|
||||
implements(IPushProducer, IDownloadStatus)
|
||||
check_crypttext_hash = True
|
||||
check_plaintext_hash = True
|
||||
|
||||
@ -337,6 +341,8 @@ class FileDownloader:
|
||||
|
||||
self.init_logging()
|
||||
|
||||
self._status = "Starting"
|
||||
|
||||
if IConsumer.providedBy(downloadable):
|
||||
downloadable.registerProducer(self, True)
|
||||
self._downloadable = downloadable
|
||||
@ -400,11 +406,13 @@ class FileDownloader:
|
||||
# once we know that, we can download blocks from everybody
|
||||
d.addCallback(self._download_all_segments)
|
||||
def _finished(res):
|
||||
self._status = "Finished"
|
||||
if IConsumer.providedBy(self._downloadable):
|
||||
self._downloadable.unregisterProducer()
|
||||
return res
|
||||
d.addBoth(_finished)
|
||||
def _failed(why):
|
||||
self._status = "Failed"
|
||||
self._output.fail(why)
|
||||
return why
|
||||
d.addErrback(_failed)
|
||||
@ -418,9 +426,16 @@ class FileDownloader:
|
||||
d = ss.callRemote("get_buckets", self._storage_index)
|
||||
d.addCallbacks(self._got_response, self._got_error)
|
||||
dl.append(d)
|
||||
self._responses_received = 0
|
||||
self._queries_sent = len(dl)
|
||||
self._status = "Locating Shares (%d/%d)" % (self._responses_received,
|
||||
self._queries_sent)
|
||||
return defer.DeferredList(dl)
|
||||
|
||||
def _got_response(self, buckets):
|
||||
self._responses_received += 1
|
||||
self._status = "Locating Shares (%d/%d)" % (self._responses_received,
|
||||
self._queries_sent)
|
||||
for sharenum, bucket in buckets.iteritems():
|
||||
b = storage.ReadBucketProxy(bucket)
|
||||
self.add_share_bucket(sharenum, b)
|
||||
@ -448,6 +463,7 @@ class FileDownloader:
|
||||
def _got_all_shareholders(self, res):
|
||||
if len(self._share_buckets) < self._num_needed_shares:
|
||||
raise NotEnoughPeersError
|
||||
|
||||
#for s in self._share_vbuckets.values():
|
||||
# for vb in s:
|
||||
# assert isinstance(vb, ValidatedBucket), \
|
||||
@ -461,6 +477,8 @@ class FileDownloader:
|
||||
# all are supposed to be identical. We compute the hash of the data
|
||||
# that comes back, and compare it against the version in our URI. If
|
||||
# they don't match, ignore their data and try someone else.
|
||||
self._status = "Obtaining URI Extension"
|
||||
|
||||
def _validate(proposal, bucket):
|
||||
h = hashutil.uri_extension_hash(proposal)
|
||||
if h != self._uri_extension_hash:
|
||||
@ -519,6 +537,7 @@ class FileDownloader:
|
||||
self._share_hashtree.set_hashes({0: self._roothash})
|
||||
|
||||
def _get_hashtrees(self, res):
|
||||
self._status = "Retrieving Hash Trees"
|
||||
d = self._get_plaintext_hashtrees()
|
||||
d.addCallback(self._get_crypttext_hashtrees)
|
||||
d.addCallback(self._setup_hashtrees)
|
||||
@ -634,6 +653,8 @@ class FileDownloader:
|
||||
return res
|
||||
|
||||
def _download_segment(self, res, segnum):
|
||||
self._status = "Downloading segment %d of %d" % (segnum,
|
||||
self._total_segments)
|
||||
self.log("downloading seg#%d of %d (%d%%)"
|
||||
% (segnum, self._total_segments,
|
||||
100.0 * segnum / self._total_segments))
|
||||
@ -709,7 +730,25 @@ class FileDownloader:
|
||||
got=self._output.length, expected=self._size)
|
||||
return self._output.finish()
|
||||
|
||||
def get_storage_index(self):
|
||||
return self._storage_index
|
||||
def get_size(self):
|
||||
return self._size
|
||||
def using_helper(self):
|
||||
return False
|
||||
def get_status(self):
|
||||
status = self._status
|
||||
if self._paused:
|
||||
status += " (output paused)"
|
||||
if self._stopped:
|
||||
status += " (output stopped)"
|
||||
return status
|
||||
def get_progress(self):
|
||||
return self._output.get_progress()
|
||||
|
||||
class LiteralDownloader:
|
||||
implements(IDownloadStatus)
|
||||
|
||||
def __init__(self, client, u, downloadable):
|
||||
self._uri = IFileURI(u)
|
||||
assert isinstance(self._uri, uri.LiteralFileURI)
|
||||
@ -722,6 +761,16 @@ class LiteralDownloader:
|
||||
self._downloadable.close()
|
||||
return defer.maybeDeferred(self._downloadable.finish)
|
||||
|
||||
def get_storage_index(self):
|
||||
return None
|
||||
def get_size(self):
|
||||
return len(self._uri.data)
|
||||
def using_helper(self):
|
||||
return False
|
||||
def get_status(self):
|
||||
return "Done"
|
||||
def get_progress(self):
|
||||
return 1.0
|
||||
|
||||
class FileName:
|
||||
implements(IDownloadTarget)
|
||||
@ -792,6 +841,10 @@ class Downloader(service.MultiService):
|
||||
implements(IDownloader)
|
||||
name = "downloader"
|
||||
|
||||
def __init__(self):
|
||||
service.MultiService.__init__(self)
|
||||
self._all_downloads = weakref.WeakKeyDictionary()
|
||||
|
||||
def download(self, u, t):
|
||||
assert self.parent
|
||||
assert self.running
|
||||
@ -805,6 +858,7 @@ class Downloader(service.MultiService):
|
||||
dl = FileDownloader(self.parent, u, t)
|
||||
else:
|
||||
raise RuntimeError("I don't know how to download a %s" % u)
|
||||
self._all_downloads[dl] = None
|
||||
d = dl.start()
|
||||
return d
|
||||
|
||||
|
@ -1400,6 +1400,25 @@ class IUploadStatus(Interface):
|
||||
helper providing progress reports. It might be reasonable to add all
|
||||
three numbers and report the sum to the user."""
|
||||
|
||||
class IDownloadStatus(Interface):
|
||||
def get_storage_index():
|
||||
"""Return a string with the (binary) storage index in use on this
|
||||
download. This may be None if there is no storage index (i.e. LIT
|
||||
files)."""
|
||||
def get_size():
|
||||
"""Return an integer with the number of bytes that will eventually be
|
||||
retrieved for this file. Returns None if the size is not yet known.
|
||||
"""
|
||||
def using_helper():
|
||||
"""Return True if this download is using a Helper, False if not."""
|
||||
def get_status():
|
||||
"""Return a string describing the current state of the download
|
||||
process."""
|
||||
def get_progress():
|
||||
"""Returns a float (from 0.0 to 1.0) describing the amount of the
|
||||
download that has completed. This value will remain at 0.0 until the
|
||||
first byte of plaintext is pushed to the download target."""
|
||||
|
||||
|
||||
class NotCapableError(Exception):
|
||||
"""You have tried to write to a read-only node."""
|
||||
|
Loading…
x
Reference in New Issue
Block a user