Add upload-status objects to track upload progress

This commit is contained in:
Brian Warner 2008-02-12 15:36:05 -07:00
parent 95f27bb8fb
commit d0ce8694c1
4 changed files with 232 additions and 13 deletions

View File

@ -10,7 +10,7 @@ from allmydata.util import mathutil, hashutil, idlib, log
from allmydata.util.assertutil import _assert, precondition
from allmydata.codec import CRSEncoder
from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
IEncryptedUploadable
IEncryptedUploadable, IUploadStatus
"""
The goal of the encoder is to turn the original file into a series of
@ -74,10 +74,13 @@ PiB=1024*TiB
class Encoder(object):
implements(IEncoder)
def __init__(self, log_parent=None):
def __init__(self, log_parent=None, upload_status=None):
object.__init__(self)
self.uri_extension_data = {}
self._codec = None
self._status = None
if upload_status:
self._status = IUploadStatus(upload_status)
precondition(log_parent is None or isinstance(log_parent, int),
log_parent)
self._log_number = log.msg("creating Encoder %s" % self,
@ -247,6 +250,18 @@ class Encoder(object):
d.addCallbacks(lambda res: self.done(), self.err)
return d
def set_status(self, status):
    """Relay a human-readable status string to the attached tracker.

    This is a no-op when no upload_status object was handed to the
    Encoder at construction time.
    """
    if not self._status:
        return
    self._status.set_status(status)
def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
    """Report encode-and-push progress (axis 2) to the status tracker.

    The final hash+close step is counted as one extra segment, hence the
    num_segments + 1 denominator. No-op when no tracker is attached.
    """
    if not self._status:
        return
    if sent_segments is None:
        # the final hash+close pass: count every real segment as sent
        sent_segments = self.num_segments
    fraction = float(sent_segments + extra) / (self.num_segments + 1)
    self._status.set_progress(2, fraction)
def abort(self):
self.log("aborting upload", level=log.UNUSUAL)
assert self._codec, "don't call abort before start"
@ -269,6 +284,7 @@ class Encoder(object):
def start_all_shareholders(self):
self.log("starting shareholders", level=log.NOISY)
self.set_status("Starting shareholders")
dl = []
for shareid in self.landlords:
d = self.landlords[shareid].start()
@ -409,6 +425,9 @@ class Encoder(object):
shareids=shareids, landlords=self.landlords)
start = time.time()
dl = []
self.set_status("Sending segment %d of %d" % (segnum+1,
self.num_segments))
self.set_encode_and_push_progress(segnum)
lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
for i in range(len(shares)):
subshare = shares[i]
@ -488,6 +507,8 @@ class Encoder(object):
def finish_hashing(self):
self._start_hashing_and_close_timestamp = time.time()
self.set_status("Finishing hashes")
self.set_encode_and_push_progress(extra=0.0)
crypttext_hash = self._crypttext_hasher.digest()
self.uri_extension_data["crypttext_hash"] = crypttext_hash
d = self._uploadable.get_plaintext_hash()
@ -509,6 +530,8 @@ class Encoder(object):
def send_plaintext_hash_tree_to_all_shareholders(self):
self.log("sending plaintext hash tree", level=log.NOISY)
self.set_status("Sending Plaintext Hash Tree")
self.set_encode_and_push_progress(extra=0.2)
dl = []
for shareid in self.landlords.keys():
d = self.send_plaintext_hash_tree(shareid,
@ -526,6 +549,8 @@ class Encoder(object):
def send_crypttext_hash_tree_to_all_shareholders(self):
self.log("sending crypttext hash tree", level=log.NOISY)
self.set_status("Sending Crypttext Hash Tree")
self.set_encode_and_push_progress(extra=0.3)
t = HashTree(self._crypttext_hashes)
all_hashes = list(t)
self.uri_extension_data["crypttext_root_hash"] = t[0]
@ -544,6 +569,8 @@ class Encoder(object):
def send_all_subshare_hash_trees(self):
self.log("sending subshare hash trees", level=log.NOISY)
self.set_status("Sending Subshare Hash Trees")
self.set_encode_and_push_progress(extra=0.4)
dl = []
for shareid,hashes in enumerate(self.subshare_hashes):
# hashes is a list of the hashes of all subshares that were sent
@ -571,6 +598,8 @@ class Encoder(object):
# not include the top-level hash root (which is stored securely in
# the URI instead).
self.log("sending all share hash trees", level=log.NOISY)
self.set_status("Sending Share Hash Trees")
self.set_encode_and_push_progress(extra=0.6)
dl = []
for h in self.share_root_hashes:
assert h
@ -597,6 +626,8 @@ class Encoder(object):
def send_uri_extension_to_all_shareholders(self):
lp = self.log("sending uri_extension", level=log.NOISY)
self.set_status("Sending URI Extensions")
self.set_encode_and_push_progress(extra=0.8)
for k in ('crypttext_root_hash', 'crypttext_hash',
'plaintext_root_hash', 'plaintext_hash',
):
@ -623,6 +654,8 @@ class Encoder(object):
def close_all_shareholders(self):
self.log("closing shareholders", level=log.NOISY)
self.set_status("Closing Shareholders")
self.set_encode_and_push_progress(extra=0.9)
dl = []
for shareid in self.landlords:
d = self.landlords[shareid].close()
@ -632,6 +665,8 @@ class Encoder(object):
def done(self):
self.log("upload done", level=log.OPERATIONAL)
self.set_status("Done")
self.set_encode_and_push_progress(extra=1.0) # done
now = time.time()
h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
self._times["hashes_and_close"] = h_and_c_elapsed
@ -645,6 +680,7 @@ class Encoder(object):
def err(self, f):
self.log("upload failed", failure=f, level=log.UNUSUAL)
self.set_status("Failed")
# we need to abort any remaining shareholders, so they'll delete the
# partial share, allowing someone else to upload it again.
self.log("aborting shareholders", level=log.UNUSUAL)

View File

@ -1104,6 +1104,13 @@ class IDownloader(Interface):
when the download is finished, or errbacks if something went wrong."""
class IEncryptedUploadable(Interface):
def set_upload_status(upload_status):
"""Provide an IUploadStatus object that should be filled with status
information. The IEncryptedUploadable is responsible for setting
key-determination progress ('chk'), size, storage_index, and
ciphertext-fetch progress. It may delegate some of this
responsibility to others, in particular to the IUploadable."""
def get_size():
"""This behaves just like IUploadable.get_size()."""
@ -1165,6 +1172,11 @@ class IEncryptedUploadable(Interface):
"""Just like IUploadable.close()."""
class IUploadable(Interface):
def set_upload_status(upload_status):
"""Provide an IUploadStatus object that should be filled with status
information. The IUploadable is responsible for setting
key-determination progress ('chk')."""
def set_default_encoding_parameters(params):
"""Set the default encoding parameters, which must be a dict mapping
strings to ints. The meaningful keys are 'k', 'happy', 'n', and
@ -1362,6 +1374,32 @@ class IClient(Interface):
IDirectoryNode-providing instances, like NewDirectoryNode.
"""
class IUploadStatus(Interface):
    """Read-only view of a single upload's progress, for status displays.

    Implementations (e.g. upload.UploadStatus) expose the storage index,
    the eventual file size, whether a Helper is in use, a human-readable
    status string, and three progress fractions.
    """
def get_storage_index():
"""Return a string with the (binary) storage index in use on this
upload. Returns None if the storage index has not yet been
calculated."""
def get_size():
"""Return an integer with the number of bytes that will eventually
be uploaded for this file. Returns None if the size is not yet known.
"""
def using_helper():
"""Return True if this upload is using a Helper, False if not."""
def get_status():
"""Return a string describing the current state of the upload
process."""
def get_progress():
"""Returns a tuple of floats, (chk, ciphertext, encode_and_push),
each from 0.0 to 1.0 . 'chk' describes how much progress has been
made towards hashing the file to determine a CHK encryption key: if
non-convergent encryption is in use, this will be trivial, otherwise
the whole file must be hashed. 'ciphertext' describes how much of the
ciphertext has been pushed to the helper, and is '1.0' for non-helper
uploads. 'encode_and_push' describes how much of the encode-and-push
process has finished: for helper uploads this is dependent upon the
helper providing progress reports. It might be reasonable to add all
three numbers and report the sum to the user."""
class NotCapableError(Exception):
"""You have tried to write to a read-only node."""

View File

@ -139,6 +139,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
upload_id = idlib.b2a(storage_index)[:6]
self._log_number = log_number
self._results = results
self._upload_status = upload.UploadStatus()
self._upload_status.set_helper(False)
self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
parent=log_number)
@ -416,6 +418,10 @@ class LocalCiphertextReader(AskUntilSuccessMixin):
self._upload_helper = upload_helper
self._storage_index = storage_index
self._encoding_file = encoding_file
self._status = None
def set_upload_status(self, upload_status):
self._status = interfaces.IUploadStatus(upload_status)
def start(self):
self._size = os.stat(self._encoding_file)[stat.ST_SIZE]

View File

@ -1,5 +1,5 @@
import os, time
import os, time, weakref
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
@ -16,7 +16,7 @@ from allmydata import encode, storage, hashtree, uri
from allmydata.util import idlib, mathutil
from allmydata.util.assertutil import precondition
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
from pycryptopp.cipher.aes import AES
from cStringIO import StringIO
@ -113,12 +113,13 @@ class PeerTracker:
class Tahoe2PeerSelector:
def __init__(self, upload_id, logparent=None):
def __init__(self, upload_id, logparent=None, upload_status=None):
self.upload_id = upload_id
self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
self.error_count = 0
self.num_peers_contacted = 0
self.last_failure_msg = None
self._status = IUploadStatus(upload_status)
self._log_parent = log.msg("%s starting" % self, parent=logparent)
def __repr__(self):
@ -132,6 +133,9 @@ class Tahoe2PeerSelector:
shares for us
"""
if self._status:
self._status.set_status("Contacting Peers..")
self.total_shares = total_shares
self.shares_of_happiness = shares_of_happiness
@ -204,6 +208,11 @@ class Tahoe2PeerSelector:
shares_to_ask = set([self.homeless_shares.pop(0)])
self.query_count += 1
self.num_peers_contacted += 1
if self._status:
self._status.set_status("Contacting Peers [%s] (first query),"
" %d shares left.."
% (idlib.shortnodeid_b2a(peer.peerid),
len(self.homeless_shares)))
d = peer.query(shares_to_ask)
d.addBoth(self._got_response, peer, shares_to_ask,
self.contacted_peers)
@ -220,6 +229,11 @@ class Tahoe2PeerSelector:
shares_to_ask = set(self.homeless_shares[:num_shares])
self.homeless_shares[:num_shares] = []
self.query_count += 1
if self._status:
self._status.set_status("Contacting Peers [%s] (second query),"
" %d shares left.."
% (idlib.shortnodeid_b2a(peer.peerid),
len(self.homeless_shares)))
d = peer.query(shares_to_ask)
d.addBoth(self._got_response, peer, shares_to_ask,
self.contacted_peers2)
@ -250,6 +264,8 @@ class Tahoe2PeerSelector:
raise encode.NotEnoughPeersError(msg)
else:
# we placed enough to be happy, so we're done
if self._status:
self._status.set_status("Placed all shares")
return self.use_peers
def _got_response(self, res, peer, shares_to_ask, put_peer_here):
@ -339,6 +355,12 @@ class EncryptAnUploadable:
self._plaintext_segment_hashes = []
self._encoding_parameters = None
self._file_size = None
self._ciphertext_bytes_read = 0
self._status = None
def set_upload_status(self, upload_status):
    """Attach an IUploadStatus tracker to this wrapper and to the wrapped
    uploadable (which reports its own chk key-computation progress)."""
    status = IUploadStatus(upload_status)
    self._status = status
    # propagate so the underlying IUploadable can fill in its axes too
    self.original.set_upload_status(upload_status)
def log(self, *args, **kwargs):
if "facility" not in kwargs:
@ -351,6 +373,8 @@ class EncryptAnUploadable:
d = self.original.get_size()
def _got_size(size):
self._file_size = size
if self._status:
self._status.set_size(size)
return size
d.addCallback(_got_size)
return d
@ -384,7 +408,8 @@ class EncryptAnUploadable:
# specify that it is truncated to the same 128 bits as the AES key.
assert len(storage_index) == 16 # SHA-256 truncated to 128b
self._storage_index = storage_index
if self._status:
self._status.set_storage_index(storage_index)
return e
d.addCallback(_got)
return d
@ -432,6 +457,8 @@ class EncryptAnUploadable:
def read_encrypted(self, length, hash_only):
# make sure our parameters have been set up first
d = self.get_all_encoding_parameters()
# and size
d.addCallback(lambda ignored: self.get_size())
d.addCallback(lambda ignored: self._get_encryptor())
# then fetch and encrypt the plaintext. The unusual structure here
# (passing a Deferred *into* a function) is needed to avoid
@ -481,10 +508,12 @@ class EncryptAnUploadable:
cryptdata = []
# we use data.pop(0) instead of 'for chunk in data' to save
# memory: each chunk is destroyed as soon as we're done with it.
bytes_processed = 0
while data:
chunk = data.pop(0)
log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk),
level=log.NOISY)
bytes_processed += len(chunk)
self._plaintext_hasher.update(chunk)
self._update_segment_hash(chunk)
# TODO: we have to encrypt the data (even if hash_only==True)
@ -499,6 +528,10 @@ class EncryptAnUploadable:
cryptdata.append(ciphertext)
del ciphertext
del chunk
self._ciphertext_bytes_read += bytes_processed
if self._status:
progress = float(self._ciphertext_bytes_read) / self._file_size
self._status.set_progress(1, progress)
return cryptdata
@ -526,6 +559,38 @@ class EncryptAnUploadable:
def close(self):
return self.original.close()
class UploadStatus:
    """Mutable record of a single upload's progress.

    Provides IUploadStatus: tracks the storage index, the file size,
    whether a Helper is in use, a human-readable status string, and three
    progress fractions ([0]=chk key computation, [1]=ciphertext push,
    [2]=encode+push).
    """
    implements(IUploadStatus)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        # one float per progress axis: chk, ciphertext, encode+push
        self.progress = [0.0, 0.0, 0.0]

    # --- read side (the IUploadStatus contract) ---
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)

    # --- write side, used by the uploaders and uploadables ---
    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # which: 0=chk, 1=ciphertext, 2=encode+push
        self.progress[which] = value
class CHKUploader:
peer_selector_class = Tahoe2PeerSelector
@ -535,6 +600,9 @@ class CHKUploader:
self._log_number = self._client.log("CHKUploader starting")
self._encoder = None
self._results = UploadResults()
self._storage_index = None
self._upload_status = UploadStatus()
self._upload_status.set_helper(False)
def log(self, *args, **kwargs):
if "parent" not in kwargs:
@ -554,6 +622,7 @@ class CHKUploader:
self.log("starting upload of %s" % uploadable)
eu = EncryptAnUploadable(uploadable)
eu.set_upload_status(self._upload_status)
d = self.start_encrypted(eu)
def _uploaded(res):
d1 = uploadable.get_encryption_key()
@ -575,7 +644,8 @@ class CHKUploader:
eu = IEncryptedUploadable(encrypted)
started = time.time()
self._encoder = e = encode.Encoder(self._log_number)
self._encoder = e = encode.Encoder(self._log_number,
self._upload_status)
d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders, started)
d.addCallback(self.set_shareholders, e)
@ -588,9 +658,11 @@ class CHKUploader:
peer_selection_started = now = time.time()
self._storage_index_elapsed = now - started
storage_index = encoder.get_param("storage_index")
self._storage_index = storage_index
upload_id = idlib.b2a(storage_index)[:6]
self.log("using storage index %s" % upload_id)
peer_selector = self.peer_selector_class(upload_id, self._log_number)
peer_selector = self.peer_selector_class(upload_id, self._log_number,
self._upload_status)
share_size = encoder.get_param("share_size")
block_size = encoder.get_param("block_size")
@ -657,6 +729,8 @@ class CHKUploader:
r.uri = u.to_string()
return r
def get_upload_status(self):
# Accessor for the UploadStatus tracker created in __init__; the Uploader
# service registers this object to make the upload visible in status
# displays.
return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
if size == 0:
@ -680,11 +754,17 @@ class LiteralUploader:
def __init__(self, client):
self._client = client
self._results = UploadResults()
self._status = s = UploadStatus()
s.set_storage_index(None)
s.set_helper(False)
s.set_progress(0, 1.0)
def start(self, uploadable):
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
def _got_size(size):
self._size = size
self._status.set_size(size)
self._results.file_size = size
return read_this_many_bytes(uploadable, size)
d.addCallback(_got_size)
@ -695,21 +775,41 @@ class LiteralUploader:
def _build_results(self, uri):
# Record the final LIT URI and mark the upload finished: literal files
# have no ciphertext-push or encode step, so both of those progress axes
# (1 and 2) jump straight to 1.0.
self._results.uri = uri
self._status.set_status("Done")
self._status.set_progress(1, 1.0)
self._status.set_progress(2, 1.0)
return self._results
def close(self):
pass
def get_upload_status(self):
# Return the UploadStatus tracker built in __init__ (its chk-progress
# axis is pre-set to 1.0, since literal files need no key computation).
return self._status
class RemoteEncryptedUploadable(Referenceable):
implements(RIEncryptedUploadable)
def __init__(self, encrypted_uploadable):
def __init__(self, encrypted_uploadable, upload_status):
self._eu = IEncryptedUploadable(encrypted_uploadable)
self._offset = 0
self._bytes_sent = 0
self._status = IUploadStatus(upload_status)
# we are responsible for updating the status string while we run, and
# for setting the ciphertext-fetch progress.
self._size = None
def get_size(self):
    """Return a Deferred firing with the ciphertext size.

    The first call asks the wrapped IEncryptedUploadable and caches the
    answer in self._size; later calls answer immediately from the cache.
    """
    if self._size is not None:
        return defer.succeed(self._size)
    d = self._eu.get_size()
    def _remember(size):
        self._size = size
        return size
    d.addCallback(_remember)
    return d
def remote_get_size(self):
return self._eu.get_size()
return self.get_size()
def remote_get_all_encoding_parameters(self):
return self._eu.get_all_encoding_parameters()
@ -771,6 +871,9 @@ class AssistedUploader:
def __init__(self, helper):
self._helper = helper
self._log_number = log.msg("AssistedUploader starting")
self._storage_index = None
self._upload_status = s = UploadStatus()
s.set_helper(True)
def log(self, msg, parent=None, **kwargs):
if parent is None:
@ -781,6 +884,7 @@ class AssistedUploader:
self._started = time.time()
u = IUploadable(uploadable)
eu = EncryptAnUploadable(u)
eu.set_upload_status(self._upload_status)
self._encuploadable = eu
d = eu.get_size()
d.addCallback(self._got_size)
@ -800,6 +904,7 @@ class AssistedUploader:
def _got_size(self, size):
self._size = size
self._upload_status.set_size(size)
def _got_all_encoding_parameters(self, params):
k, happy, n, segment_size = params
@ -819,6 +924,7 @@ class AssistedUploader:
now = self._time_contacting_helper_start = time.time()
self._storage_index_elapsed = now - self._started
self.log("contacting helper..")
self._upload_status.set_status("Contacting Helper")
d = self._helper.callRemote("upload_chk", self._storage_index)
d.addCallback(self._contacted_helper)
return d
@ -829,16 +935,23 @@ class AssistedUploader:
self._elapsed_time_contacting_helper = elapsed
if upload_helper:
self.log("helper says we need to upload")
self._upload_status.set_status("Uploading Ciphertext")
# we need to upload the file
reu = RemoteEncryptedUploadable(self._encuploadable)
d = upload_helper.callRemote("upload", reu)
reu = RemoteEncryptedUploadable(self._encuploadable,
self._upload_status)
# let it pre-compute the size for progress purposes
d = reu.get_size()
d.addCallback(lambda ignored:
upload_helper.callRemote("upload", reu))
# this Deferred will fire with the upload results
return d
self.log("helper says file is already uploaded")
self._upload_status.set_progress(1, 1.0)
return upload_results
def _build_readcap(self, upload_results):
self.log("upload finished, building readcap")
self._upload_status.set_status("Building Readcap")
r = upload_results
assert r.uri_extension_data["needed_shares"] == self._needed_shares
assert r.uri_extension_data["total_shares"] == self._total_shares
@ -858,8 +971,12 @@ class AssistedUploader:
if "total" in r.timings:
r.timings["helper_total"] = r.timings["total"]
r.timings["total"] = now - self._started
self._upload_status.set_status("Done")
return r
def get_upload_status(self):
# Accessor for the helper-flagged UploadStatus created in __init__, so
# the Uploader service can register this upload for status displays.
return self._upload_status
class BaseUploadable:
default_max_segment_size = 1*MiB # overridden by max_segment_size
default_encoding_param_k = 3 # overridden by encoding_parameters
@ -872,6 +989,10 @@ class BaseUploadable:
encoding_param_n = None
_all_encoding_parameters = None
_status = None
def set_upload_status(self, upload_status):
# Adapt/validate the tracker via the IUploadStatus interface; uploadables
# report CHK key-computation progress (axis 0) through this object.
self._status = IUploadStatus(upload_status)
def set_default_encoding_parameters(self, default_params):
assert isinstance(default_params, dict)
@ -915,25 +1036,38 @@ class FileHandle(BaseUploadable):
self._filehandle = filehandle
self._key = None
self._contenthashkey = contenthashkey
self._size = None
def _get_encryption_key_content_hash(self):
if self._key is not None:
return defer.succeed(self._key)
d = self.get_all_encoding_parameters()
d = self.get_size()
# that sets self._size as a side-effect
d.addCallback(lambda size: self.get_all_encoding_parameters())
def _got(params):
k, happy, n, segsize = params
f = self._filehandle
enckey_hasher = content_hash_key_hasher(k, n, segsize)
f.seek(0)
BLOCKSIZE = 64*1024
bytes_read = 0
while True:
data = f.read(BLOCKSIZE)
if not data:
break
enckey_hasher.update(data)
# TODO: setting progress in a non-yielding loop is kind of
# pointless, but I'm anticipating (perhaps prematurely) the
# day when we use a slowjob or twisted's CooperatorService to
# make this yield time to other jobs.
bytes_read += len(data)
if self._status:
self._status.set_progress(0, float(bytes_read)/self._size)
f.seek(0)
self._key = enckey_hasher.digest()
if self._status:
self._status.set_progress(0, 1.0)
assert len(self._key) == 16
return self._key
d.addCallback(_got)
@ -951,8 +1085,11 @@ class FileHandle(BaseUploadable):
return self._get_encryption_key_random()
def get_size(self):
    """Return a Deferred firing with the file's length in bytes.

    The length is measured once by seeking to EOF and cached in
    self._size (the content-hash-key path relies on this side effect);
    the handle is rewound to the start afterwards.
    """
    if self._size is None:
        fh = self._filehandle
        fh.seek(0, 2)   # jump to EOF to learn the length
        self._size = fh.tell()
        fh.seek(0)      # rewind for the subsequent read
    return defer.succeed(self._size)
@ -985,6 +1122,7 @@ class Uploader(service.MultiService):
def __init__(self, helper_furl=None):
self._helper_furl = helper_furl
self._helper = None
self._all_uploads = weakref.WeakKeyDictionary()
service.MultiService.__init__(self)
def startService(self):
@ -1021,6 +1159,7 @@ class Uploader(service.MultiService):
uploader = AssistedUploader(self._helper)
else:
uploader = self.uploader_class(self.parent)
self._all_uploads[uploader.get_upload_status()] = None
return uploader.start(uploadable)
d.addCallback(_got_size)
def _done(res):