Merge PR #255 from meejah/2774.status-api-only.0-part1

(rebased to current master, added a few fixups)
This commit is contained in:
Brian Warner 2016-04-12 01:00:59 -07:00
commit 47b921855a
15 changed files with 170 additions and 42 deletions

View File

@ -130,7 +130,7 @@ class ProhibitedNode:
def get_best_readable_version(self):
raise FileProhibited(self.reason)
def download_best_version(self):
def download_best_version(self, progress=None):
raise FileProhibited(self.reason)
def get_best_mutable_version(self):

View File

@ -588,7 +588,7 @@ class DirectoryNode:
return d
def add_file(self, namex, uploadable, metadata=None, overwrite=True):
def add_file(self, namex, uploadable, metadata=None, overwrite=True, progress=None):
"""I upload a file (using the given IUploadable), then attach the
resulting FileNode to the directory at the given name. I return a
Deferred that fires (with the IFileNode of the uploaded file) when
@ -596,7 +596,7 @@ class DirectoryNode:
name = normalize(namex)
if self.is_readonly():
return defer.fail(NotWriteableError())
d = self._uploader.upload(uploadable)
d = self._uploader.upload(uploadable, progress=progress)
d.addCallback(lambda results:
self._create_and_validate_node(results.get_uri(), None,
name))

View File

@ -74,7 +74,7 @@ PiB=1024*TiB
class Encoder(object):
implements(IEncoder)
def __init__(self, log_parent=None, upload_status=None):
def __init__(self, log_parent=None, upload_status=None, progress=None):
object.__init__(self)
self.uri_extension_data = {}
self._codec = None
@ -86,6 +86,7 @@ class Encoder(object):
self._log_number = log.msg("creating Encoder %s" % self,
facility="tahoe.encoder", parent=log_parent)
self._aborted = False
self._progress = progress
def __repr__(self):
if hasattr(self, "_storage_index"):
@ -105,6 +106,8 @@ class Encoder(object):
def _got_size(size):
self.log(format="file size: %(size)d", size=size)
self.file_size = size
if self._progress:
self._progress.set_progress_total(self.file_size)
d.addCallback(_got_size)
d.addCallback(lambda res: eu.get_all_encoding_parameters())
d.addCallback(self._got_all_encoding_parameters)
@ -436,6 +439,7 @@ class Encoder(object):
shareid = shareids[i]
d = self.send_block(shareid, segnum, block, lognum)
dl.append(d)
block_hash = hashutil.block_hash(block)
#from allmydata.util import base32
#log.msg("creating block (shareid=%d, blocknum=%d) "
@ -445,6 +449,14 @@ class Encoder(object):
self.block_hashes[shareid].append(block_hash)
dl = self._gather_responses(dl)
def do_progress(ign):
done = self.segment_size * (segnum + 1)
if self._progress:
self._progress.set_progress(done)
return ign
dl.addCallback(do_progress)
def _logit(res):
self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
(self,

View File

@ -245,11 +245,13 @@ class ImmutableFileNode:
# we keep it here, we should also put this on CiphertextFileNode
def __hash__(self):
return self.u.__hash__()
def __eq__(self, other):
if isinstance(other, ImmutableFileNode):
return self.u.__eq__(other.u)
else:
return False
def __ne__(self, other):
if isinstance(other, ImmutableFileNode):
return self.u.__eq__(other.u)
@ -273,12 +275,16 @@ class ImmutableFileNode:
def get_uri(self):
return self.u.to_string()
def get_cap(self):
return self.u
def get_readcap(self):
return self.u.get_readonly()
def get_verify_cap(self):
return self.u.get_verify_cap()
def get_repair_cap(self):
# CHK files can be repaired with just the verifycap
return self.u.get_verify_cap()
@ -288,6 +294,7 @@ class ImmutableFileNode:
def get_size(self):
return self.u.get_size()
def get_current_size(self):
return defer.succeed(self.get_size())
@ -305,6 +312,7 @@ class ImmutableFileNode:
def check_and_repair(self, monitor, verify=False, add_lease=False):
return self._cnode.check_and_repair(monitor, verify, add_lease)
def check(self, monitor, verify=False, add_lease=False):
return self._cnode.check(monitor, verify, add_lease)
@ -316,14 +324,13 @@ class ImmutableFileNode:
"""
return defer.succeed(self)
def download_best_version(self):
def download_best_version(self, progress=None):
"""
Download the best version of this file, returning its contents
as a bytestring. Since there is only one version of an immutable
file, we download and return the contents of this file.
"""
d = consumer.download_to_data(self)
d = consumer.download_to_data(self, progress=progress)
return d
# for an immutable file, download_to_data (specified in IReadable)

View File

@ -113,7 +113,10 @@ class LiteralFileNode(_ImmutableFileNodeBase):
return defer.succeed(self)
def download_best_version(self):
def download_best_version(self, progress=None):
if progress is not None:
progress.set_progress_total(len(self.u.data))
progress.set_progress(len(self.u.data))
return defer.succeed(self.u.data)

View File

@ -137,7 +137,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
def __init__(self, storage_index,
helper, storage_broker, secret_holder,
incoming_file, encoding_file,
log_number):
log_number, progress=None):
upload.CHKUploader.__init__(self, storage_broker, secret_holder, progress=progress)
self._storage_index = storage_index
self._helper = helper
self._incoming_file = incoming_file

View File

@ -21,7 +21,7 @@ from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError, \
DEFAULT_MAX_SEGMENT_SIZE
DEFAULT_MAX_SEGMENT_SIZE, IProgress
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES
@ -623,7 +623,7 @@ class EncryptAnUploadable:
implements(IEncryptedUploadable)
CHUNKSIZE = 50*1024
def __init__(self, original, log_parent=None):
def __init__(self, original, log_parent=None, progress=None):
precondition(original.default_params_set,
"set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
self.original = IUploadable(original)
@ -636,6 +636,7 @@ class EncryptAnUploadable:
self._file_size = None
self._ciphertext_bytes_read = 0
self._status = None
self._progress = progress
def set_upload_status(self, upload_status):
self._status = IUploadStatus(upload_status)
@ -656,6 +657,8 @@ class EncryptAnUploadable:
self._file_size = size
if self._status:
self._status.set_size(size)
if self._progress:
self._progress.set_progress_total(size)
return size
d.addCallback(_got_size)
return d
@ -894,7 +897,7 @@ class UploadStatus:
class CHKUploader:
server_selector_class = Tahoe2ServerSelector
def __init__(self, storage_broker, secret_holder):
def __init__(self, storage_broker, secret_holder, progress=None):
# server_selector needs storage_broker and secret_holder
self._storage_broker = storage_broker
self._secret_holder = secret_holder
@ -904,6 +907,7 @@ class CHKUploader:
self._upload_status = UploadStatus()
self._upload_status.set_helper(False)
self._upload_status.set_active(True)
self._progress = progress
# locate_all_shareholders() will create the following attribute:
# self._server_trackers = {} # k: shnum, v: instance of ServerTracker
@ -947,8 +951,11 @@ class CHKUploader:
eu = IEncryptedUploadable(encrypted)
started = time.time()
self._encoder = e = encode.Encoder(self._log_number,
self._upload_status)
self._encoder = e = encode.Encoder(
self._log_number,
self._upload_status,
progress=self._progress,
)
d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders, started)
d.addCallback(self.set_shareholders, e)
@ -1073,12 +1080,13 @@ def read_this_many_bytes(uploadable, size, prepend_data=[]):
class LiteralUploader:
def __init__(self):
def __init__(self, progress=None):
self._status = s = UploadStatus()
s.set_storage_index(None)
s.set_helper(False)
s.set_progress(0, 1.0)
s.set_active(False)
self._progress = progress
def start(self, uploadable):
uploadable = IUploadable(uploadable)
@ -1086,6 +1094,8 @@ class LiteralUploader:
def _got_size(size):
self._size = size
self._status.set_size(size)
if self._progress:
self._progress.set_progress_total(size)
return read_this_many_bytes(uploadable, size)
d.addCallback(_got_size)
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
@ -1109,6 +1119,8 @@ class LiteralUploader:
self._status.set_progress(1, 1.0)
self._status.set_progress(2, 1.0)
self._status.set_results(ur)
if self._progress:
self._progress.set_progress(self._size)
return ur
def close(self):
@ -1503,12 +1515,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
name = "uploader"
URI_LIT_SIZE_THRESHOLD = 55
def __init__(self, helper_furl=None, stats_provider=None, history=None):
def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None):
self._helper_furl = helper_furl
self.stats_provider = stats_provider
self._history = history
self._helper = None
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
self._progress = progress
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
service.MultiService.__init__(self)
@ -1542,12 +1555,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
return (self._helper_furl, bool(self._helper))
def upload(self, uploadable):
def upload(self, uploadable, progress=None):
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
assert self.parent
assert self.running
assert progress is None or IProgress.providedBy(progress)
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
@ -1556,13 +1570,15 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
precondition(isinstance(default_params, dict), default_params)
precondition("max_segment_size" in default_params, default_params)
uploadable.set_default_encoding_parameters(default_params)
if progress:
progress.set_progress_total(size)
if self.stats_provider:
self.stats_provider.count('uploader.files_uploaded', 1)
self.stats_provider.count('uploader.bytes_uploaded', size)
if size <= self.URI_LIT_SIZE_THRESHOLD:
uploader = LiteralUploader()
uploader = LiteralUploader(progress=progress)
return uploader.start(uploadable)
else:
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
@ -1575,7 +1591,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
else:
storage_broker = self.parent.get_storage_broker()
secret_holder = self.parent._secret_holder
uploader = CHKUploader(storage_broker, secret_holder)
uploader = CHKUploader(storage_broker, secret_holder, progress=progress)
d2.addCallback(lambda x: uploader.start(eu))
self._all_uploads[uploader] = None

View File

@ -1,5 +1,5 @@
from zope.interface import Interface
from zope.interface import Interface, Attribute
from foolscap.api import StringConstraint, ListOf, TupleOf, SetOf, DictOf, \
ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable
@ -624,6 +624,38 @@ class MustNotBeUnknownRWError(CapConstraintError):
"""Cannot add an unknown child cap specified in a rw_uri field."""
class IProgress(Interface):
"""
Remembers progress measured in arbitrary units. Users of these
instances must call ``set_progress_total`` at least once before
progress can be valid, and must use the same units for both
``set_progress_total`` and ``set_progress calls``.
See also:
:class:`allmydata.util.progress.PercentProgress`
"""
progress = Attribute(
"Current amount of progress (in percentage)"
)
def set_progress(self, value):
"""
Sets the current amount of progress.
Arbitrary units, but must match units used for
set_progress_total.
"""
def set_progress_total(self, value):
"""
Sets the total amount of expected progress
Arbitrary units, but must be same units as used when calling
set_progress() on this instance)..
"""
class IReadable(Interface):
"""I represent a readable object -- either an immutable file, or a
specific version of a mutable file.
@ -653,9 +685,12 @@ class IReadable(Interface):
def get_size():
"""Return the length (in bytes) of this readable object."""
def download_to_data():
def download_to_data(progress=None):
"""Download all of the file contents. I return a Deferred that fires
with the contents as a byte string."""
with the contents as a byte string.
:param progress: None or IProgress implementer
"""
def read(consumer, offset=0, size=None):
"""Download a portion (possibly all) of the file's contents, making
@ -915,11 +950,13 @@ class IFileNode(IFilesystemNode):
the Deferred will errback with an UnrecoverableFileError.
"""
def download_best_version():
def download_best_version(progress=None):
"""Download the contents of the version that would be returned
by get_best_readable_version(). This is equivalent to calling
download_to_data() on the IReadable given by that method.
progress is anything that implements IProgress
I return a Deferred that fires with a byte string when the file
has been fully downloaded. To support streaming download, use
the 'read' method of IReadable. If no version is recoverable,
@ -1065,7 +1102,7 @@ class IMutableFileNode(IFileNode):
everything) to get increased visibility.
"""
def upload(new_contents, servermap):
def upload(new_contents, servermap, progress=None):
"""Replace the contents of the file with new ones. This requires a
servermap that was previously updated with MODE_WRITE.
@ -1086,6 +1123,8 @@ class IMutableFileNode(IFileNode):
operation. If I do not signal UncoordinatedWriteError, then I was
able to write the new version without incident.
``progress`` is either None or an IProgress provider
I return a Deferred that fires (with a PublishStatus object) when the
publish has completed. I will update the servermap in-place with the
location of all new shares.
@ -1276,12 +1315,14 @@ class IDirectoryNode(IFilesystemNode):
equivalent to calling set_node() multiple times, but is much more
efficient."""
def add_file(name, uploadable, metadata=None, overwrite=True):
def add_file(name, uploadable, metadata=None, overwrite=True, progress=None):
"""I upload a file (using the given IUploadable), then attach the
resulting ImmutableFileNode to the directory at the given name. I set
metadata the same way as set_uri and set_node. The child name must be
a unicode string.
``progress`` either provides IProgress or is None
I return a Deferred that fires (with the IFileNode of the uploaded
file) when the operation completes."""

View File

@ -403,21 +403,21 @@ class MutableFileNode:
return d.addCallback(_get_version, version)
def download_best_version(self):
def download_best_version(self, progress=None):
"""
I return a Deferred that fires with the contents of the best
version of this mutable file.
"""
return self._do_serialized(self._download_best_version)
return self._do_serialized(self._download_best_version, progress=progress)
def _download_best_version(self):
def _download_best_version(self, progress=None):
"""
I am the serialized sibling of download_best_version.
"""
d = self.get_best_readable_version()
d.addCallback(self._record_size)
d.addCallback(lambda version: version.download_to_data())
d.addCallback(lambda version: version.download_to_data(progress=progress))
# It is possible that the download will fail because there
# aren't enough shares to be had. If so, we will try again after
@ -432,7 +432,7 @@ class MutableFileNode:
d = self.get_best_mutable_version()
d.addCallback(self._record_size)
d.addCallback(lambda version: version.download_to_data())
d.addCallback(lambda version: version.download_to_data(progress=progress))
return d
d.addErrback(_maybe_retry)
@ -935,13 +935,13 @@ class MutableFileVersion:
return self._servermap.size_of_version(self._version)
def download_to_data(self, fetch_privkey=False):
def download_to_data(self, fetch_privkey=False, progress=None):
"""
I return a Deferred that fires with the contents of this
readable object as a byte string.
"""
c = consumer.MemoryConsumer()
c = consumer.MemoryConsumer(progress=progress)
d = self.read(c, fetch_privkey=fetch_privkey)
d.addCallback(lambda mc: "".join(mc.chunks))
return d

View File

@ -31,6 +31,7 @@ class BadResponse(object):
def __init__(self, url, err):
self.status = -1
self.reason = "Error trying to connect to %s: %s" % (url, err)
self.error = err
def read(self):
return ""

View File

@ -151,8 +151,8 @@ class FakeCHKFileNode:
return defer.succeed(self)
def download_to_data(self):
return download_to_data(self)
def download_to_data(self, progress=None):
return download_to_data(self, progress=progress)
download_best_version = download_to_data
@ -329,11 +329,11 @@ class FakeMutableFileNode:
d.addCallback(_done)
return d
def download_best_version(self):
return defer.succeed(self._download_best_version())
def download_best_version(self, progress=None):
return defer.succeed(self._download_best_version(progress=progress))
def _download_best_version(self, ignored=None):
def _download_best_version(self, ignored=None, progress=None):
if isinstance(self.my_uri, uri.LiteralFileURI):
return self.my_uri.data
if self.storage_index not in self.all_contents:

View File

@ -1519,7 +1519,7 @@ class FakeMutableFile:
def get_write_uri(self):
return self.uri.to_string()
def download_best_version(self):
def download_best_version(self, progress=None):
return defer.succeed(self.data)
def get_writekey(self):

View File

@ -83,7 +83,7 @@ class FakeUploader(service.Service):
helper_furl = None
helper_connected = False
def upload(self, uploadable):
def upload(self, uploadable, **kw):
d = uploadable.get_size()
d.addCallback(lambda size: uploadable.read(size))
def _got_data(datav):

View File

@ -8,9 +8,12 @@ from twisted.internet.interfaces import IConsumer
class MemoryConsumer:
implements(IConsumer)
def __init__(self):
def __init__(self, progress=None):
self.chunks = []
self.done = False
self._progress = progress
def registerProducer(self, p, streaming):
self.producer = p
if streaming:
@ -19,12 +22,19 @@ class MemoryConsumer:
else:
while not self.done:
p.resumeProducing()
def write(self, data):
self.chunks.append(data)
if self._progress is not None:
self._progress.set_progress(sum([len(c) for c in self.chunks]))
def unregisterProducer(self):
self.done = True
def download_to_data(n, offset=0, size=None):
d = n.read(MemoryConsumer(), offset, size)
def download_to_data(n, offset=0, size=None, progress=None):
"""
:param progress: None or an IProgress implementer
"""
d = n.read(MemoryConsumer(progress=progress), offset, size)
d.addCallback(lambda mc: "".join(mc.chunks))
return d

View File

@ -0,0 +1,37 @@
"""
Utilities relating to computing progress information.
Ties in with the "consumer" module also
"""
from allmydata.interfaces import IProgress
from zope.interface import implementer
@implementer(IProgress)
class PercentProgress(object):
"""
Represents progress as a percentage, from 0.0 to 100.0
"""
def __init__(self, total_size=None):
self._value = 0.0
self.set_progress_total(total_size)
def set_progress(self, value):
"IProgress API"
self._value = value
def set_progress_total(self, size):
"IProgress API"
if size is not None:
size = float(size)
self._total_size = size
@property
def progress(self):
if self._total_size is None:
return 0 # or 1.0?
if self._total_size <= 0.0:
return 0
return (self._value / self._total_size) * 100.0