webish: add more mutable-retrieve timing status

Brian Warner 2008-03-04 21:04:36 -07:00
parent ca1a1762e2
commit ec23da28a6
3 changed files with 185 additions and 60 deletions
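The whole diff applies one pattern to each phase of a mutable-file retrieve: capture time.time() when the phase starts, store the elapsed delta under a named key in RetrieveStatus.timings when it finishes, and append per-server fetch times to a list so repeated queries to one server are all recorded. A minimal synchronous sketch of the pattern (StatusSketch and timed_phase are illustrative names, not code from this commit; the real code threads the start time through Deferred callbacks instead):

import time

class StatusSketch:
    def __init__(self):
        self.timings = {}                      # phase name -> elapsed seconds
        self.timings["fetch_per_server"] = {}  # peerid -> [elapsed, ...]

def timed_phase(status, name, fn, *args):
    # Run one phase and record how long it took under `name`.
    started = time.time()
    result = fn(*args)
    status.timings[name] = time.time() - started
    return result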


@@ -203,10 +203,14 @@ class RetrieveStatus:
statusid_counter = count(0)
def __init__(self):
self.timings = {}
- self.sharemap = None
+ self.timings["fetch_per_server"] = {}
+ self.sharemap = {}
self.problems = {}
self.active = True
self.storage_index = None
self.helper = False
self.encoding = ("?","?")
self.search_distance = None
self.size = None
self.status = "Not started"
self.progress = 0.0
@@ -217,6 +221,10 @@ class RetrieveStatus:
return self.started
def get_storage_index(self):
return self.storage_index
def get_encoding(self):
return self.encoding
def get_search_distance(self):
return self.search_distance
def using_helper(self):
return self.helper
def get_size(self):
@@ -234,6 +242,10 @@ class RetrieveStatus:
self.storage_index = si
def set_helper(self, helper):
self.helper = helper
def set_encoding(self, k, n):
self.encoding = (k, n)
def set_search_distance(self, value):
self.search_distance = value
def set_size(self, size):
self.size = size
def set_status(self, status):
@@ -346,6 +358,7 @@ class Retrieve:
# the hashes over and over again.
self._valid_shares = {}
self._started = time.time()
self._done_deferred = defer.Deferred()
d = defer.succeed(initial_query_count)
@@ -359,6 +372,7 @@ class Retrieve:
def _choose_initial_peers(self, numqueries):
n = self._node
started = time.time()
full_peerlist = n._client.get_permuted_peers("storage",
self._storage_index)
@@ -373,9 +387,13 @@ class Retrieve:
# we later increase this limit, it may be useful to re-scan the
# permuted list.
self._peerlist_limit = numqueries
self._status.set_search_distance(len(self._peerlist))
elapsed = time.time() - started
self._status.timings["peer_selection"] = elapsed
return self._peerlist
def _send_initial_requests(self, peerlist):
self._first_query_sent = time.time()
self._bad_peerids = set()
self._running = True
self._queries_outstanding = set()
@@ -392,9 +410,11 @@ class Retrieve:
return None
def _do_query(self, ss, peerid, storage_index, readsize):
started = time.time()
self._queries_outstanding.add(peerid)
d = ss.callRemote("slot_readv", storage_index, [], [(0, readsize)])
d.addCallback(self._got_results, peerid, readsize, (ss, storage_index))
d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
started)
d.addErrback(self._query_failed, peerid)
# errors that aren't handled by _query_failed (and errors caused by
# _query_failed) get logged, but we still want to check for doneness.
@@ -406,12 +426,19 @@ class Retrieve:
verifier = rsa.create_verifying_key_from_string(pubkey_s)
return verifier
- def _got_results(self, datavs, peerid, readsize, stuff):
+ def _got_results(self, datavs, peerid, readsize, stuff, started):
elapsed = time.time() - started
if peerid not in self._status.timings["fetch_per_server"]:
self._status.timings["fetch_per_server"][peerid] = []
self._status.timings["fetch_per_server"][peerid].append(elapsed)
self._queries_outstanding.discard(peerid)
self._used_peers.add(peerid)
if not self._running:
return
if peerid not in self._status.sharemap:
self._status.sharemap[peerid] = set()
for shnum,datav in datavs.items():
data = datav[0]
try:
@@ -447,16 +474,20 @@ class Retrieve:
fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
assert len(fingerprint) == 32
if fingerprint != self._node._fingerprint:
self._status.problems[peerid] = "sh#%d: pubkey doesn't match fingerprint" % shnum
raise CorruptShareError(peerid, shnum,
"pubkey doesn't match fingerprint")
self._pubkey = self._deserialize_pubkey(pubkey_s)
self._node._populate_pubkey(self._pubkey)
verinfo = (seqnum, root_hash, IV, segsize, datalength)
self._status.sharemap[peerid].add(verinfo)
if verinfo not in self._valid_versions:
# it's a new pair. Verify the signature.
valid = self._pubkey.verify(prefix, signature)
if not valid:
self._status.problems[peerid] = "sh#%d: invalid signature" % shnum
raise CorruptShareError(peerid, shnum,
"signature is invalid")
# ok, it's a valid verinfo. Add it to the list of validated
@@ -486,11 +517,15 @@ class Retrieve:
# rest of the shares), we need to implement the refactoring mentioned
# above.
if k != self._required_shares:
self._status.problems[peerid] = "sh#%d: k=%d, we want %d" \
% (shnum, k, self._required_shares)
raise CorruptShareError(peerid, shnum,
"share has k=%d, we want k=%d" %
(k, self._required_shares))
if N != self._total_shares:
self._status.problems[peerid] = "sh#%d: N=%d, we want %d" \
% (shnum, N, self._total_shares)
raise CorruptShareError(peerid, shnum,
"share has N=%d, we want N=%d" %
(N, self._total_shares))
@@ -587,14 +622,19 @@ class Retrieve:
level=log.UNUSUAL)
# are there any peers on the list that we haven't used?
new_query_peers = []
- for (peerid, ss) in self._peerlist:
+ peer_indicies = []
+ for i, (peerid, ss) in enumerate(self._peerlist):
if peerid not in self._used_peers:
new_query_peers.append( (peerid, ss) )
peer_indicies.append(i)
if len(new_query_peers) > 5:
# only query in batches of 5. TODO: this is pretty
# arbitrary, really I want this to be something like
# k - max(known_version_sharecounts) + some extra
break
new_search_distance = max(max(peer_indicies),
self._status.get_search_distance())
self._status.set_search_distance(new_search_distance)
if new_query_peers:
self.log("sending %d new queries (read %d bytes)" %
(len(new_query_peers), self._read_size), level=log.UNUSUAL)
@@ -671,6 +711,8 @@ class Retrieve:
# now that the big loop is done, all shares in the sharemap are
# valid, and they're all for the same seqnum+root_hash version, so
# it's now down to doing FEC and decrypt.
elapsed = time.time() - self._started
self._status.timings["fetch"] = elapsed
assert len(shares) >= self._required_shares, len(shares)
d = defer.maybeDeferred(self._decode, shares, segsize, datalength)
d.addCallback(self._decrypt, IV, seqnum, root_hash)
@@ -728,8 +770,12 @@ class Retrieve:
self.log("params %s, we have %d shares" % (params, len(shares)))
self.log("about to decode, shareids=%s" % (shareids,))
started = time.time()
d = defer.maybeDeferred(fec.decode, shares, shareids)
def _done(buffers):
elapsed = time.time() - started
self._status.timings["decode"] = elapsed
self._status.set_encoding(self._required_shares, self._total_shares)
self.log(" decode done, %d buffers" % len(buffers))
segment = "".join(buffers)
self.log(" joined length %d, datalength %d" %
@@ -745,9 +791,12 @@ class Retrieve:
return d
def _decrypt(self, crypttext, IV, seqnum, root_hash):
started = time.time()
key = hashutil.ssk_readkey_data_hash(IV, self._readkey)
decryptor = AES(key)
plaintext = decryptor.process(crypttext)
elapsed = time.time() - started
self._status.timings["decrypt"] = elapsed
# it worked, so record the seqnum and root_hash for next time
self._node._populate_seqnum(seqnum)
self._node._populate_root_hash(root_hash)
@@ -760,6 +809,8 @@ class Retrieve:
self._status.set_status("Done")
self._status.set_progress(1.0)
self._status.set_size(len(contents))
elapsed = time.time() - self._started
self._status.timings["total"] = elapsed
eventually(self._done_deferred.callback, contents)
def get_status(self):
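Because slot_readv responses arrive asynchronously, the start time is threaded through the Deferred chain: _do_query captures started and passes it to _got_results as an extra addCallback argument, and _got_results appends the elapsed time to a per-peer list. A condensed sketch of that accumulation (record_fetch_time is a hypothetical helper; the diff spells out the same logic with explicit membership tests):

import time

def record_fetch_time(timings, peerid, started):
    # Append rather than overwrite, so every response a server returns
    # during a single retrieve is kept, not just the latest one.
    elapsed = time.time() - started
    per_server = timings.setdefault("fetch_per_server", {})
    per_server.setdefault(peerid, []).append(elapsed)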


@@ -19,4 +19,28 @@
<li>Status: <span n:render="status"/></li>
</ul>
<h2>Retrieve Results</h2>
<ul>
<li n:render="encoding" />
<li n:render="search_distance" />
<li n:render="problems" />
<li>Timings:</li>
<ul>
<li>Total: <span n:render="time" n:data="time_total" />
(<span n:render="rate" n:data="rate_total" />)</li>
<ul>
<li>Initial Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
<li>Fetching: <span n:render="time" n:data="time_fetch" />
(<span n:render="rate" n:data="rate_fetch" />)</li>
<li>Decoding: <span n:render="time" n:data="time_decode" />
(<span n:render="rate" n:data="rate_decode" />)</li>
<li>Decrypting: <span n:render="time" n:data="time_decrypt" />
(<span n:render="rate" n:data="rate_decrypt" />)</li>
</ul>
<li n:render="server_timings" />
</ul>
</ul>
<div>Return to the <a href="/">Welcome Page</a></div>
</body></html>
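Nevow resolves each n:data and n:render attribute in this template to a correspondingly named data_* or render_* method on the page class, so every <span> above is filled in by a method pair on RetrieveStatusPage (defined in the next file). A schematic fragment of that wiring (ExamplePage is illustrative):

from nevow import rend

class ExamplePage(rend.Page):
    # n:data="time_total" looks up data_time_total; its return value is
    # handed to the renderer named by n:render="time".
    def data_time_total(self, ctx, data):
        return self.retrieve_status.timings.get("total")
    def render_time(self, ctx, data):
        if data is None:
            return ""
        return "%.2fs" % float(data)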


@@ -1373,12 +1373,42 @@ class UnlinkedPUTCreateDirectory(rend.Page):
# XXX add redirect_to_result
return d
- def plural(sequence):
- if len(sequence) == 1:
+ def plural(sequence_or_length):
+ if isinstance(sequence_or_length, int):
+ length = sequence_or_length
+ else:
+ length = len(sequence_or_length)
+ if length == 1:
return ""
return "s"
- class UploadResultsRendererMixin:
+ class RateAndTimeMixin:
def render_time(self, ctx, data):
# 1.23s, 790ms, 132us
if data is None:
return ""
s = float(data)
if s >= 1.0:
return "%.2fs" % s
if s >= 0.01:
return "%dms" % (1000*s)
if s >= 0.001:
return "%.1fms" % (1000*s)
return "%dus" % (1000000*s)
def render_rate(self, ctx, data):
# 21.8kBps, 554.4kBps 4.37MBps
if data is None:
return ""
r = float(data)
if r > 1000000:
return "%1.2fMBps" % (r/1000000)
if r > 1000:
return "%.1fkBps" % (r/1000)
return "%dBps" % r
+ class UploadResultsRendererMixin(RateAndTimeMixin):
# this requires a method named 'upload_results'
def render_sharemap(self, ctx, data):
@@ -1417,30 +1447,6 @@ class UploadResultsRendererMixin:
d.addCallback(lambda res: res.file_size)
return d
- def render_time(self, ctx, data):
- # 1.23s, 790ms, 132us
- if data is None:
- return ""
- s = float(data)
- if s >= 1.0:
- return "%.2fs" % s
- if s >= 0.01:
- return "%dms" % (1000*s)
- if s >= 0.001:
- return "%.1fms" % (1000*s)
- return "%dus" % (1000000*s)
- def render_rate(self, ctx, data):
- # 21.8kBps, 554.4kBps 4.37MBps
- if data is None:
- return ""
- r = float(data)
- if r > 1000000:
- return "%1.2fMBps" % (r/1000000)
- if r > 1000:
- return "%.1fkBps" % (r/1000)
- return "%dBps" % r
def _get_time(self, name):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get(name))
@@ -1678,7 +1684,7 @@ class UploadStatusPage(UploadResultsRendererMixin, rend.Page):
def render_status(self, ctx, data):
return data.get_status()
- class DownloadResultsRendererMixin:
+ class DownloadResultsRendererMixin(RateAndTimeMixin):
# this requires a method named 'download_results'
def render_servermap(self, ctx, data):
@@ -1730,30 +1736,6 @@ class DownloadResultsRendererMixin:
d.addCallback(lambda res: res.file_size)
return d
- def render_time(self, ctx, data):
- # 1.23s, 790ms, 132us
- if data is None:
- return ""
- s = float(data)
- if s >= 1.0:
- return "%.2fs" % s
- if s >= 0.01:
- return "%dms" % (1000*s)
- if s >= 0.001:
- return "%.1fms" % (1000*s)
- return "%dus" % (1000000*s)
- def render_rate(self, ctx, data):
- # 21.8kBps, 554.4kBps 4.37MBps
- if data is None:
- return ""
- r = float(data)
- if r > 1000000:
- return "%1.2fMBps" % (r/1000000)
- if r > 1000:
- return "%.1fkBps" % (r/1000)
- return "%dBps" % r
def _get_time(self, name):
d = self.download_results()
d.addCallback(lambda res: res.timings.get(name))
@@ -1877,9 +1859,13 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
def render_status(self, ctx, data):
return data.get_status()
- class RetrieveStatusPage(rend.Page):
+ class RetrieveStatusPage(rend.Page, RateAndTimeMixin):
docFactory = getxmlfile("retrieve-status.xhtml")
def __init__(self, data):
rend.Page.__init__(self, data)
self.retrieve_status = data
def render_started(self, ctx, data):
TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
started_s = time.strftime(TIME_FORMAT,
@@ -1910,6 +1896,70 @@ class RetrieveStatusPage(rend.Page):
def render_status(self, ctx, data):
return data.get_status()
def render_encoding(self, ctx, data):
k, n = data.get_encoding()
return ctx.tag["Encoding: %s of %s" % (k, n)]
def render_search_distance(self, ctx, data):
d = data.get_search_distance()
return ctx.tag["Search Distance: %s peer%s" % (d, plural(d))]
def render_problems(self, ctx, data):
problems = data.problems
if not problems:
return ""
l = T.ul()
for peerid in sorted(problems.keys()):
peerid_s = idlib.shortnodeid_b2a(peerid)
l[T.li["[%s]: %s" % (peerid_s, problems[peerid])]]
return ctx.tag["Server Problems:", l]
def _get_rate(self, data, name):
file_size = self.retrieve_status.get_size()
time = self.retrieve_status.timings.get(name)
if time is None:
return None
try:
return 1.0 * file_size / time
except ZeroDivisionError:
return None
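_get_rate divides the whole file size by one phase's elapsed time, returning None (rendered as an empty string) when the timing is missing or zero. For example (numbers illustrative):

file_size, elapsed = 1000000, 0.25   # a 1 MB file decoded in 0.25s
rate = 1.0 * file_size / elapsed     # 4000000.0 Bps
# render_rate(None, rate) -> "4.00MBps"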
def data_time_total(self, ctx, data):
return self.retrieve_status.timings.get("total")
def data_rate_total(self, ctx, data):
return self._get_rate(data, "total")
def data_time_peer_selection(self, ctx, data):
return self.retrieve_status.timings.get("peer_selection")
def data_time_fetch(self, ctx, data):
return self.retrieve_status.timings.get("fetch")
def data_rate_fetch(self, ctx, data):
return self._get_rate(data, "fetch")
def data_time_decode(self, ctx, data):
return self.retrieve_status.timings.get("decode")
def data_rate_decode(self, ctx, data):
return self._get_rate(data, "decode")
def data_time_decrypt(self, ctx, data):
return self.retrieve_status.timings.get("decrypt")
def data_rate_decrypt(self, ctx, data):
return self._get_rate(data, "decrypt")
def render_server_timings(self, ctx, data):
per_server = self.retrieve_status.timings.get("fetch_per_server")
if not per_server:
return ""
l = T.ul()
for peerid in sorted(per_server.keys()):
peerid_s = idlib.shortnodeid_b2a(peerid)
times_s = ", ".join([self.render_time(None, t)
for t in per_server[peerid]])
l[T.li["[%s]: %s" % (peerid_s, times_s)]]
return T.li["Per-Server Fetch Response Times: ", l]
class PublishStatusPage(rend.Page):
docFactory = getxmlfile("publish-status.xhtml")
@@ -1956,9 +2006,9 @@ class Status(rend.Page):
def data_recent_operations(self, ctx, data):
recent = [o for o in (IClient(ctx).list_recent_uploads() +
- IClient(ctx).list_recent_downloads() +
- IClient(ctx).list_recent_publish() +
- IClient(ctx).list_recent_retrieve())
+ IClient(ctx).list_recent_downloads() +
+ IClient(ctx).list_recent_publish() +
+ IClient(ctx).list_recent_retrieve())
if not o.get_active()]
recent.sort(lambda a,b: cmp(a.get_started(), b.get_started()))
recent.reverse()
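
With the two duplicate copies deleted, UploadResultsRendererMixin, DownloadResultsRendererMixin, and RetrieveStatusPage all inherit one implementation of the formatters. A self-contained check of the thresholds (method bodies copied from the mixin above; the assertions themselves are illustrative):

class RateAndTimeMixin:
    def render_time(self, ctx, data):
        # 1.23s, 790ms, 132us
        if data is None:
            return ""
        s = float(data)
        if s >= 1.0:
            return "%.2fs" % s
        if s >= 0.01:
            return "%dms" % (1000*s)
        if s >= 0.001:
            return "%.1fms" % (1000*s)
        return "%dus" % (1000000*s)

    def render_rate(self, ctx, data):
        # 21.8kBps, 554.4kBps, 4.37MBps
        if data is None:
            return ""
        r = float(data)
        if r > 1000000:
            return "%1.2fMBps" % (r/1000000)
        if r > 1000:
            return "%.1fkBps" % (r/1000)
        return "%dBps" % r

fmt = RateAndTimeMixin()
assert fmt.render_time(None, 1.23) == "1.23s"
assert fmt.render_time(None, 0.79) == "790ms"
assert fmt.render_time(None, 0.000132) == "132us"
assert fmt.render_rate(None, 4.37e6) == "4.37MBps"
assert fmt.render_rate(None, 21800.0) == "21.8kBps"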