Merge branch '3658.end-progress' into 3657.util-python-3

This commit is contained in:
Itamar Turner-Trauring 2021-03-31 10:43:39 -04:00
commit d3a8094630
14 changed files with 39 additions and 146 deletions

0
newsfragments/3658.minor Normal file
View File

View File

@ -142,7 +142,7 @@ class ProhibitedNode(object):
def get_best_readable_version(self):
raise FileProhibited(self.reason)
def download_best_version(self, progress=None):
def download_best_version(self):
raise FileProhibited(self.reason)
def get_best_mutable_version(self):

View File

@ -646,7 +646,7 @@ class DirectoryNode(object):
return d
def add_file(self, namex, uploadable, metadata=None, overwrite=True, progress=None):
def add_file(self, namex, uploadable, metadata=None, overwrite=True):
"""I upload a file (using the given IUploadable), then attach the
resulting FileNode to the directory at the given name. I return a
Deferred that fires (with the IFileNode of the uploaded file) when
@ -657,7 +657,7 @@ class DirectoryNode(object):
d = DeferredContext(defer.fail(NotWriteableError()))
else:
# XXX should pass reactor arg
d = DeferredContext(self._uploader.upload(uploadable, progress=progress))
d = DeferredContext(self._uploader.upload(uploadable))
d.addCallback(lambda results:
self._create_and_validate_node(results.get_uri(), None,
name))

View File

@ -90,7 +90,7 @@ PiB=1024*TiB
@implementer(IEncoder)
class Encoder(object):
def __init__(self, log_parent=None, upload_status=None, progress=None):
def __init__(self, log_parent=None, upload_status=None):
object.__init__(self)
self.uri_extension_data = {}
self._codec = None
@ -102,7 +102,6 @@ class Encoder(object):
self._log_number = log.msg("creating Encoder %s" % self,
facility="tahoe.encoder", parent=log_parent)
self._aborted = False
self._progress = progress
def __repr__(self):
if hasattr(self, "_storage_index"):
@ -123,8 +122,6 @@ class Encoder(object):
def _got_size(size):
self.log(format="file size: %(size)d", size=size)
self.file_size = size
if self._progress:
self._progress.set_progress_total(self.file_size)
d.addCallback(_got_size)
d.addCallback(lambda res: eu.get_all_encoding_parameters())
d.addCallback(self._got_all_encoding_parameters)
@ -462,13 +459,6 @@ class Encoder(object):
dl = self._gather_responses(dl)
def do_progress(ign):
done = self.segment_size * (segnum + 1)
if self._progress:
self._progress.set_progress(done)
return ign
dl.addCallback(do_progress)
def _logit(res):
self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
(self,

View File

@ -337,13 +337,13 @@ class ImmutableFileNode(object):
"""
return defer.succeed(self)
def download_best_version(self, progress=None):
def download_best_version(self):
"""
Download the best version of this file, returning its contents
as a bytestring. Since there is only one version of an immutable
file, we download and return the contents of this file.
"""
d = consumer.download_to_data(self, progress=progress)
d = consumer.download_to_data(self)
return d
# for an immutable file, download_to_data (specified in IReadable)

View File

@ -113,10 +113,7 @@ class LiteralFileNode(_ImmutableFileNodeBase):
return defer.succeed(self)
def download_best_version(self, progress=None):
if progress is not None:
progress.set_progress_total(len(self.u.data))
progress.set_progress(len(self.u.data))
def download_best_version(self):
return defer.succeed(self.u.data)

View File

@ -154,8 +154,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): # type: ignore # warn
def __init__(self, storage_index,
helper, storage_broker, secret_holder,
incoming_file, encoding_file,
log_number, progress=None):
upload.CHKUploader.__init__(self, storage_broker, secret_holder, progress=progress)
log_number):
upload.CHKUploader.__init__(self, storage_broker, secret_holder)
self._storage_index = storage_index
self._helper = helper
self._incoming_file = incoming_file

View File

@ -48,7 +48,7 @@ from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError, \
DEFAULT_MAX_SEGMENT_SIZE, IProgress, IPeerSelector
DEFAULT_MAX_SEGMENT_SIZE, IPeerSelector
from allmydata.immutable import layout
from io import BytesIO
@ -945,7 +945,7 @@ class EncryptAnUploadable(object):
IEncryptedUploadable."""
CHUNKSIZE = 50*1024
def __init__(self, original, log_parent=None, progress=None, chunk_size=None):
def __init__(self, original, log_parent=None, chunk_size=None):
"""
:param chunk_size: The number of bytes to read from the uploadable at a
time, or None for some default.
@ -962,7 +962,6 @@ class EncryptAnUploadable(object):
self._file_size = None
self._ciphertext_bytes_read = 0
self._status = None
self._progress = progress
if chunk_size is not None:
self.CHUNKSIZE = chunk_size
@ -985,8 +984,6 @@ class EncryptAnUploadable(object):
self._file_size = size
if self._status:
self._status.set_size(size)
if self._progress:
self._progress.set_progress_total(size)
return size
d.addCallback(_got_size)
return d
@ -1229,7 +1226,7 @@ class UploadStatus(object):
class CHKUploader(object):
def __init__(self, storage_broker, secret_holder, progress=None, reactor=None):
def __init__(self, storage_broker, secret_holder, reactor=None):
# server_selector needs storage_broker and secret_holder
self._storage_broker = storage_broker
self._secret_holder = secret_holder
@ -1239,7 +1236,6 @@ class CHKUploader(object):
self._upload_status = UploadStatus()
self._upload_status.set_helper(False)
self._upload_status.set_active(True)
self._progress = progress
self._reactor = reactor
# locate_all_shareholders() will create the following attribute:
@ -1294,7 +1290,6 @@ class CHKUploader(object):
self._encoder = encode.Encoder(
self._log_number,
self._upload_status,
progress=self._progress,
)
# this just returns itself
yield self._encoder.set_encrypted_uploadable(eu)
@ -1428,13 +1423,12 @@ def read_this_many_bytes(uploadable, size, prepend_data=[]):
class LiteralUploader(object):
def __init__(self, progress=None):
def __init__(self):
self._status = s = UploadStatus()
s.set_storage_index(None)
s.set_helper(False)
s.set_progress(0, 1.0)
s.set_active(False)
self._progress = progress
def start(self, uploadable):
uploadable = IUploadable(uploadable)
@ -1442,8 +1436,6 @@ class LiteralUploader(object):
def _got_size(size):
self._size = size
self._status.set_size(size)
if self._progress:
self._progress.set_progress_total(size)
return read_this_many_bytes(uploadable, size)
d.addCallback(_got_size)
d.addCallback(lambda data: uri.LiteralFileURI(b"".join(data)))
@ -1467,8 +1459,6 @@ class LiteralUploader(object):
self._status.set_progress(1, 1.0)
self._status.set_progress(2, 1.0)
self._status.set_results(ur)
if self._progress:
self._progress.set_progress(self._size)
return ur
def close(self):
@ -1867,13 +1857,12 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
name = "uploader"
URI_LIT_SIZE_THRESHOLD = 55
def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None):
def __init__(self, helper_furl=None, stats_provider=None, history=None):
self._helper_furl = helper_furl
self.stats_provider = stats_provider
self._history = history
self._helper = None
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
self._progress = progress
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
service.MultiService.__init__(self)
@ -1907,13 +1896,12 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
return (self._helper_furl, bool(self._helper))
def upload(self, uploadable, progress=None, reactor=None):
def upload(self, uploadable, reactor=None):
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
assert self.parent
assert self.running
assert progress is None or IProgress.providedBy(progress)
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
@ -1922,15 +1910,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
precondition(isinstance(default_params, dict), default_params)
precondition("max_segment_size" in default_params, default_params)
uploadable.set_default_encoding_parameters(default_params)
if progress:
progress.set_progress_total(size)
if self.stats_provider:
self.stats_provider.count('uploader.files_uploaded', 1)
self.stats_provider.count('uploader.bytes_uploaded', size)
if size <= self.URI_LIT_SIZE_THRESHOLD:
uploader = LiteralUploader(progress=progress)
uploader = LiteralUploader()
return uploader.start(uploadable)
else:
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
@ -1943,7 +1929,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
else:
storage_broker = self.parent.get_storage_broker()
secret_holder = self.parent._secret_holder
uploader = CHKUploader(storage_broker, secret_holder, progress=progress, reactor=reactor)
uploader = CHKUploader(storage_broker, secret_holder, reactor=reactor)
d2.addCallback(lambda x: uploader.start(eu))
self._all_uploads[uploader] = None

View File

@ -733,38 +733,6 @@ class MustNotBeUnknownRWError(CapConstraintError):
"""Cannot add an unknown child cap specified in a rw_uri field."""
class IProgress(Interface):
"""
Remembers progress measured in arbitrary units. Users of these
instances must call ``set_progress_total`` at least once before
progress can be valid, and must use the same units for both
``set_progress_total`` and ``set_progress calls``.
See also:
:class:`allmydata.util.progress.PercentProgress`
"""
progress = Attribute(
"Current amount of progress (in percentage)"
)
def set_progress(value):
"""
Sets the current amount of progress.
Arbitrary units, but must match units used for
set_progress_total.
"""
def set_progress_total(value):
"""
Sets the total amount of expected progress
Arbitrary units, but must be same units as used when calling
set_progress() on this instance)..
"""
class IReadable(Interface):
"""I represent a readable object -- either an immutable file, or a
specific version of a mutable file.
@ -794,11 +762,9 @@ class IReadable(Interface):
def get_size():
"""Return the length (in bytes) of this readable object."""
def download_to_data(progress=None):
def download_to_data():
"""Download all of the file contents. I return a Deferred that fires
with the contents as a byte string.
:param progress: None or IProgress implementer
"""
def read(consumer, offset=0, size=None):
@ -1106,13 +1072,11 @@ class IFileNode(IFilesystemNode):
the Deferred will errback with an UnrecoverableFileError.
"""
def download_best_version(progress=None):
def download_best_version():
"""Download the contents of the version that would be returned
by get_best_readable_version(). This is equivalent to calling
download_to_data() on the IReadable given by that method.
progress is anything that implements IProgress
I return a Deferred that fires with a byte string when the file
has been fully downloaded. To support streaming download, use
the 'read' method of IReadable. If no version is recoverable,
@ -1258,7 +1222,7 @@ class IMutableFileNode(IFileNode):
everything) to get increased visibility.
"""
def upload(new_contents, servermap, progress=None):
def upload(new_contents, servermap):
"""Replace the contents of the file with new ones. This requires a
servermap that was previously updated with MODE_WRITE.
@ -1279,8 +1243,6 @@ class IMutableFileNode(IFileNode):
operation. If I do not signal UncoordinatedWriteError, then I was
able to write the new version without incident.
``progress`` is either None or an IProgress provider
I return a Deferred that fires (with a PublishStatus object) when the
publish has completed. I will update the servermap in-place with the
location of all new shares.
@ -1471,14 +1433,12 @@ class IDirectoryNode(IFilesystemNode):
equivalent to calling set_node() multiple times, but is much more
efficient."""
def add_file(name, uploadable, metadata=None, overwrite=True, progress=None):
def add_file(name, uploadable, metadata=None, overwrite=True):
"""I upload a file (using the given IUploadable), then attach the
resulting ImmutableFileNode to the directory at the given name. I set
metadata the same way as set_uri and set_node. The child name must be
a unicode string.
``progress`` either provides IProgress or is None
I return a Deferred that fires (with the IFileNode of the uploaded
file) when the operation completes."""

View File

@ -418,21 +418,21 @@ class MutableFileNode(object):
return d.addCallback(_get_version, version)
def download_best_version(self, progress=None):
def download_best_version(self):
"""
I return a Deferred that fires with the contents of the best
version of this mutable file.
"""
return self._do_serialized(self._download_best_version, progress=progress)
return self._do_serialized(self._download_best_version)
def _download_best_version(self, progress=None):
def _download_best_version(self):
"""
I am the serialized sibling of download_best_version.
"""
d = self.get_best_readable_version()
d.addCallback(self._record_size)
d.addCallback(lambda version: version.download_to_data(progress=progress))
d.addCallback(lambda version: version.download_to_data())
# It is possible that the download will fail because there
# aren't enough shares to be had. If so, we will try again after
@ -447,7 +447,7 @@ class MutableFileNode(object):
d = self.get_best_mutable_version()
d.addCallback(self._record_size)
d.addCallback(lambda version: version.download_to_data(progress=progress))
d.addCallback(lambda version: version.download_to_data())
return d
d.addErrback(_maybe_retry)
@ -564,7 +564,7 @@ class MutableFileNode(object):
return d
def upload(self, new_contents, servermap, progress=None):
def upload(self, new_contents, servermap):
"""
I overwrite the contents of the best recoverable version of this
mutable file with new_contents, using servermap instead of
@ -951,13 +951,13 @@ class MutableFileVersion(object):
return self._servermap.size_of_version(self._version)
def download_to_data(self, fetch_privkey=False, progress=None): # type: ignore # fixme
def download_to_data(self, fetch_privkey=False): # type: ignore # fixme
"""
I return a Deferred that fires with the contents of this
readable object as a byte string.
"""
c = consumer.MemoryConsumer(progress=progress)
c = consumer.MemoryConsumer()
d = self.read(c, fetch_privkey=fetch_privkey)
d.addCallback(lambda mc: b"".join(mc.chunks))
return d

View File

@ -539,8 +539,8 @@ class FakeCHKFileNode(object): # type: ignore # incomplete implementation
return defer.succeed(self)
def download_to_data(self, progress=None):
return download_to_data(self, progress=progress)
def download_to_data(self):
return download_to_data(self)
download_best_version = download_to_data
@ -717,11 +717,11 @@ class FakeMutableFileNode(object): # type: ignore # incomplete implementation
d.addCallback(_done)
return d
def download_best_version(self, progress=None):
return defer.succeed(self._download_best_version(progress=progress))
def download_best_version(self):
return defer.succeed(self._download_best_version())
def _download_best_version(self, ignored=None, progress=None):
def _download_best_version(self, ignored=None):
if isinstance(self.my_uri, uri.LiteralFileURI):
return self.my_uri.data
if self.storage_index not in self.all_contents:

View File

@ -1592,7 +1592,7 @@ class FakeMutableFile(object): # type: ignore # incomplete implementation
def get_write_uri(self):
return self.uri.to_string()
def download_best_version(self, progress=None):
def download_best_version(self):
return defer.succeed(self.data)
def get_writekey(self):

View File

@ -9,10 +9,9 @@ from twisted.internet.interfaces import IConsumer
@implementer(IConsumer)
class MemoryConsumer(object):
def __init__(self, progress=None):
def __init__(self):
self.chunks = []
self.done = False
self._progress = progress
def registerProducer(self, p, streaming):
self.producer = p
@ -25,16 +24,14 @@ class MemoryConsumer(object):
def write(self, data):
self.chunks.append(data)
if self._progress is not None:
self._progress.set_progress(sum([len(c) for c in self.chunks]))
def unregisterProducer(self):
self.done = True
def download_to_data(n, offset=0, size=None, progress=None):
def download_to_data(n, offset=0, size=None):
"""
:param progress: None or an IProgress implementer
Return Deferred that fires with results of reading from the given filenode.
"""
d = n.read(MemoryConsumer(progress=progress), offset, size)
d = n.read(MemoryConsumer(), offset, size)
d.addCallback(lambda mc: b"".join(mc.chunks))
return d

View File

@ -1,37 +0,0 @@
"""
Utilities relating to computing progress information.
Ties in with the "consumer" module also
"""
from allmydata.interfaces import IProgress
from zope.interface import implementer
@implementer(IProgress)
class PercentProgress(object):
"""
Represents progress as a percentage, from 0.0 to 100.0
"""
def __init__(self, total_size=None):
self._value = 0.0
self.set_progress_total(total_size)
def set_progress(self, value):
"IProgress API"
self._value = value
def set_progress_total(self, size):
"IProgress API"
if size is not None:
size = float(size)
self._total_size = size
@property
def progress(self):
if self._total_size is None:
return 0 # or 1.0?
if self._total_size <= 0.0:
return 0
return (self._value / self._total_size) * 100.0