Fix mutable publish/retrieve timing status displays. Fixes #1505.

publish:
* encrypt and encode times are cumulative, not just current-segment

retrieve:
* same for decrypt and decode times
* update "current status" to include segment number
* set status to Finished/Failed when download is complete
* set progress to 1.0 when complete

More improvements to consider:
* progress is currently 0% or 100%: should calculate how many segments are
  involved (remembering retrieve can be less than the whole file) and set it
  to a fraction
* "fetch" time is fuzzy: what we want is to know how much of the delay is not
  our own fault, but since we do decode/decrypt work while waiting for more
  shares, it's not straightforward
This commit is contained in:
Brian Warner 2011-08-28 16:22:21 -07:00
parent d575ccba28
commit 9756146d61
2 changed files with 28 additions and 11 deletions

View File

@ -33,6 +33,8 @@ class PublishStatus:
def __init__(self):
self.timings = {}
self.timings["send_per_server"] = {}
self.timings["encrypt"] = 0.0
self.timings["encode"] = 0.0
self.servermap = None
self.problems = {}
self.active = True
@ -49,6 +51,10 @@ class PublishStatus:
if peerid not in self.timings["send_per_server"]:
self.timings["send_per_server"][peerid] = []
self.timings["send_per_server"][peerid].append(elapsed)
def accumulate_encode_time(self, elapsed):
    """Fold *elapsed* seconds into the cumulative "encode" timing bucket."""
    timings = self.timings
    timings["encode"] = timings["encode"] + elapsed
def accumulate_encrypt_time(self, elapsed):
    """Fold *elapsed* seconds into the cumulative "encrypt" timing bucket."""
    timings = self.timings
    timings["encrypt"] = timings["encrypt"] + elapsed
def get_started(self):
    """Return the recorded start time of this operation.

    Simply exposes the ``self.started`` attribute set when the
    operation began.
    """
    return self.started
@ -711,7 +717,7 @@ class Publish:
assert len(crypttext) == len(data)
now = time.time()
self._status.timings["encrypt"] = now - started
self._status.accumulate_encrypt_time(now - started)
started = now
# now apply FEC
@ -732,7 +738,7 @@ class Publish:
d = fec.encode(crypttext_pieces)
def _done_encoding(res):
elapsed = time.time() - started
self._status.timings["encode"] = elapsed
self._status.accumulate_encode_time(elapsed)
return (res, salt)
d.addCallback(_done_encoding)
return d

View File

@ -24,6 +24,8 @@ class RetrieveStatus:
def __init__(self):
self.timings = {}
self.timings["fetch_per_server"] = {}
self.timings["decode"] = 0.0
self.timings["decrypt"] = 0.0
self.timings["cumulative_verify"] = 0.0
self.problems = {}
self.active = True
@ -59,6 +61,10 @@ class RetrieveStatus:
if peerid not in self.timings["fetch_per_server"]:
self.timings["fetch_per_server"][peerid] = []
self.timings["fetch_per_server"][peerid].append(elapsed)
def accumulate_decode_time(self, elapsed):
    """Fold *elapsed* seconds into the cumulative "decode" timing bucket."""
    timings = self.timings
    timings["decode"] = timings["decode"] + elapsed
def accumulate_decrypt_time(self, elapsed):
    """Fold *elapsed* seconds into the cumulative "decrypt" timing bucket."""
    timings = self.timings
    timings["decrypt"] = timings["decrypt"] + elapsed
def set_storage_index(self, si):
    """Remember *si* as the storage index this operation is working on."""
    self.storage_index = si
def set_helper(self, helper):
@ -153,6 +159,9 @@ class Retrieve:
kwargs["facility"] = "tahoe.mutable.retrieve"
return log.msg(*args, **kwargs)
def _set_current_status(self, state):
seg = "%d/%d" % (self._current_segment, self._last_segment)
self._status.set_status("segment %s (%s)" % (seg, state))
###################
# IPushProducer
@ -168,7 +177,7 @@ class Retrieve:
# fired when the download is unpaused.
self._old_status = self._status.get_status()
self._status.set_status("Paused")
self._set_current_status("paused")
self._pause_deferred = defer.Deferred()
@ -806,8 +815,9 @@ class Retrieve:
# validate this block, then generate the block hash root.
self.log("validating share %d for segment %d" % (reader.shnum,
segnum))
self._status.add_fetch_timing(reader.peerid, started)
self._status.set_status("Valdiating blocks for segment %d" % segnum)
elapsed = time.time() - started
self._status.add_fetch_timing(reader.peerid, elapsed)
self._set_current_status("validating blocks")
# Did we fail to fetch either of the things that we were
# supposed to? Fail if so.
if not results[0][0] and results[1][0]:
@ -946,7 +956,7 @@ class Retrieve:
shareids.append(shareid)
shares.append(share)
self._status.set_status("Decoding")
self._set_current_status("decoding")
started = time.time()
assert len(shareids) >= self._required_shares, len(shareids)
# zfec really doesn't want extra shares
@ -971,8 +981,7 @@ class Retrieve:
size_to_use = self._segment_size
segment = segment[:size_to_use]
self.log(" segment len=%d" % len(segment))
self._status.timings.setdefault("decode", 0)
self._status.timings['decode'] = time.time() - started
self._status.accumulate_decode_time(time.time() - started)
return segment, salt
d.addCallback(_process)
return d
@ -984,14 +993,13 @@ class Retrieve:
the plaintext of the segment that is in my argument.
"""
segment, salt = segment_and_salt
self._status.set_status("decrypting")
self._set_current_status("decrypting")
self.log("decrypting segment %d" % self._current_segment)
started = time.time()
key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
decryptor = AES(key)
plaintext = decryptor.process(segment)
self._status.timings.setdefault("decrypt", 0)
self._status.timings['decrypt'] = time.time() - started
self._status.accumulate_decrypt_time(time.time() - started)
return plaintext
@ -1079,6 +1087,8 @@ class Retrieve:
now = time.time()
self._status.timings['total'] = now - self._started
self._status.timings['fetch'] = now - self._started_fetching
self._status.set_status("Finished")
self._status.set_progress(1.0)
# remember the encoding parameters, use them again next time
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
@ -1109,6 +1119,7 @@ class Retrieve:
now = time.time()
self._status.timings['total'] = now - self._started
self._status.timings['fetch'] = now - self._started_fetching
self._status.set_status("Failed")
if self._verify:
ret = list(self._bad_shares)