add upload timings and rates to the POST /uri?t=upload results page

Brian Warner 2008-02-06 00:41:51 -07:00
parent 1e4504463c
commit 93d45abb02
4 changed files with 220 additions and 12 deletions

View File

@@ -1,5 +1,6 @@
# -*- test-case-name: allmydata.test.test_encode -*-
import time
from zope.interface import implements
from twisted.internet import defer
from foolscap import eventual
@@ -207,6 +208,14 @@ class Encoder(object):
# that we sent to that landlord.
self.share_root_hashes = [None] * self.num_shares
self._times = {
"cumulative_encoding": 0.0,
"cumulative_sending": 0.0,
"hashes_and_close": 0.0,
"total_encode_and_push": 0.0,
}
self._start_total_timestamp = time.time()
d = eventual.fireEventually()
d.addCallback(lambda res: self.start_all_shareholders())
@@ -269,6 +278,7 @@ class Encoder(object):
def _encode_segment(self, segnum):
codec = self._codec
start = time.time()
# the ICodecEncoder API wants to receive a total of self.segment_size
# bytes on each encode() call, broken up into a number of
@@ -297,17 +307,23 @@ class Encoder(object):
d = self._gather_data(self.required_shares, input_piece_size,
crypttext_segment_hasher)
def _done(chunks):
def _done_gathering(chunks):
for c in chunks:
assert len(c) == input_piece_size
self._crypttext_hashes.append(crypttext_segment_hasher.digest())
# during this call, we hit 5*segsize memory
return codec.encode(chunks)
d.addCallback(_done_gathering)
def _done(res):
elapsed = time.time() - start
self._times["cumulative_encoding"] += elapsed
return res
d.addCallback(_done)
return d
def _encode_tail_segment(self, segnum):
start = time.time()
codec = self._tail_codec
input_piece_size = codec.get_block_size()
@@ -316,13 +332,18 @@ class Encoder(object):
d = self._gather_data(self.required_shares, input_piece_size,
crypttext_segment_hasher,
allow_short=True)
def _done(chunks):
def _done_gathering(chunks):
for c in chunks:
# a short trailing chunk will have been padded by
# _gather_data
assert len(c) == input_piece_size
self._crypttext_hashes.append(crypttext_segment_hasher.digest())
return codec.encode(chunks)
d.addCallback(_done_gathering)
def _done(res):
elapsed = time.time() - start
self._times["cumulative_encoding"] += elapsed
return res
d.addCallback(_done)
return d
@@ -386,6 +407,7 @@ class Encoder(object):
# *doesn't* have a share, that's an error.
_assert(set(self.landlords.keys()).issubset(set(shareids)),
shareids=shareids, landlords=self.landlords)
start = time.time()
dl = []
lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
for i in range(len(shares)):
@@ -410,6 +432,8 @@ class Encoder(object):
100 * (segnum+1) / self.num_segments,
),
level=log.OPERATIONAL)
elapsed = time.time() - start
self._times["cumulative_sending"] += elapsed
return res
dl.addCallback(_logit)
return dl
@@ -463,6 +487,7 @@ class Encoder(object):
return d
def finish_hashing(self):
self._start_hashing_and_close_timestamp = time.time()
crypttext_hash = self._crypttext_hasher.digest()
self.uri_extension_data["crypttext_hash"] = crypttext_hash
d = self._uploadable.get_plaintext_hash()
@@ -607,6 +632,14 @@ class Encoder(object):
def done(self):
self.log("upload done", level=log.OPERATIONAL)
now = time.time()
h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
self._times["hashes_and_close"] = h_and_c_elapsed
total_elapsed = now - self._start_total_timestamp
self._times["total_encode_and_push"] = total_elapsed
# update our sharemap
self._shares_placed = set(self.landlords.keys())
return (self.uri_extension_hash, self.required_shares,
self.num_shares, self.file_size)
@@ -628,3 +661,18 @@ class Encoder(object):
return f
d.addCallback(_done)
return d
def get_shares_placed(self):
# return a set of share numbers that were successfully placed.
return self._shares_placed
def get_times(self):
# return a dictionary of encode+push timings
return self._times
def get_rates(self):
# return a dictionary of encode+push speeds
rates = {
"encode": self.file_size / self._times["cumulative_encoding"],
"push": self.file_size / self._times["cumulative_sending"],
}
return rates
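
The two new accessors above let the uploader fold the Encoder's measurements into its results object: get_times() returns the raw per-phase seconds, and get_rates() divides file_size by the cumulative encoding and sending times (so those timers are assumed to be nonzero). A minimal consumption sketch, where "encoder" stands for a finished Encoder instance and the helper itself is not part of this commit:

# Sketch only: pretty-print the numbers exposed by the new accessors.
# 'encoder' is assumed to be an Encoder whose upload has already
# completed, so get_times()/get_rates() are fully populated.
def print_encoder_stats(encoder):
    for name, elapsed in sorted(encoder.get_times().items()):
        print "%-24s %.3fs" % (name, elapsed)
    for name, rate in sorted(encoder.get_rates().items()):
        print "%-24s %.1f kBps" % (name, rate / 1000.0)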

View File

@@ -1,5 +1,5 @@
import os
import os, time
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
@@ -38,6 +38,12 @@ class TooFullError(Exception):
class UploadResults:
implements(IUploadResults)
uri = None
sharemap = None # dict of shnum to placement string
servermap = None # dict of peerid to set(shnums)
def __init__(self):
self.timings = {} # dict of name to number of seconds
self.rates = {} # dict of name to rates (in bytes per second)
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
@@ -551,6 +557,7 @@ class CHKUploader:
self._default_encoding_parameters = default_encoding_parameters
self._log_number = self._client.log("CHKUploader starting")
self._encoder = None
self._results = UploadResults()
def log(self, *args, **kwargs):
if "parent" not in kwargs:
@@ -565,6 +572,7 @@ class CHKUploader:
This method returns a Deferred that will fire with the URI (a
string)."""
self._started = time.time()
uploadable = IUploadable(uploadable)
self.log("starting upload of %s" % uploadable)
@@ -608,9 +616,14 @@ class CHKUploader:
num_segments = encoder.get_param("num_segments")
k,desired,n = encoder.get_param("share_counts")
self._peer_selection_started = time.time()
d = peer_selector.get_shareholders(self._client, storage_index,
share_size, block_size,
num_segments, n, desired)
def _done(res):
self._peer_selection_finished = time.time()
return res
d.addCallback(_done)
return d
def set_shareholders(self, used_peers, encoder):
@@ -618,11 +631,14 @@ class CHKUploader:
@param used_peers: a sequence of PeerTracker objects
"""
self.log("_send_shares, used_peers is %s" % (used_peers,))
self._sharemap = {}
for peer in used_peers:
assert isinstance(peer, PeerTracker)
buckets = {}
for peer in used_peers:
buckets.update(peer.buckets)
for shnum in peer.buckets:
self._sharemap[shnum] = peer
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
encoder.set_shareholders(buckets)
@@ -635,9 +651,27 @@ class CHKUploader:
total_shares=total_shares,
size=size,
)
results = UploadResults()
results.uri = u.to_string()
return results
r = self._results
r.uri = u.to_string()
r.sharemap = {}
r.servermap = {}
for shnum in self._encoder.get_shares_placed():
peer_tracker = self._sharemap[shnum]
peerid = peer_tracker.peerid
peerid_s = idlib.shortnodeid_b2a(peerid)
r.sharemap[shnum] = "Placed on [%s]" % peerid_s
if peerid not in r.servermap:
r.servermap[peerid] = set()
r.servermap[peerid].add(shnum)
peer_selection_time = (self._peer_selection_finished
- self._peer_selection_started)
now = time.time()
r.timings["total"] = now - self._started
r.rates["total"] = 1.0 * self._encoder.file_size / r.timings["total"]
r.timings["peer_selection"] = peer_selection_time
r.timings.update(self._encoder.get_times())
r.rates.update(self._encoder.get_rates())
return r
def read_this_many_bytes(uploadable, size, prepend_data=[]):
@@ -661,6 +695,7 @@ class LiteralUploader:
def __init__(self, client):
self._client = client
self._results = UploadResults()
def set_params(self, encoding_parameters):
pass
@@ -675,9 +710,8 @@ class LiteralUploader:
return d
def _build_results(self, uri):
results = UploadResults()
results.uri = uri
return results
self._results.uri = uri
return self._results
def close(self):
pass
@@ -760,6 +794,7 @@ class AssistedUploader:
assert isinstance(default_encoding_parameters, dict)
self._default_encoding_parameters = default_encoding_parameters
self._log_number = log.msg("AssistedUploader starting")
self._results = UploadResults()
def log(self, msg, parent=None, **kwargs):
if parent is None:
@@ -767,6 +802,7 @@ class AssistedUploader:
return log.msg(msg, parent=parent, **kwargs)
def start(self, uploadable):
self._started = time.time()
u = IUploadable(uploadable)
eu = EncryptAnUploadable(u, self._default_encoding_parameters)
self._encuploadable = eu
@@ -802,11 +838,16 @@ class AssistedUploader:
self._storage_index = storage_index
def _contact_helper(self, res):
now = self._time_contacting_helper = time.time()
self._results.timings["local_hashing"] = now - self._started
self.log("contacting helper..")
d = self._helper.callRemote("upload_chk", self._storage_index)
d.addCallback(self._contacted_helper)
return d
def _contacted_helper(self, (upload_results, upload_helper)):
now = time.time()
elapsed = now - self._time_contacting_helper
self._results.timings["contacting_helper"] = elapsed
if upload_helper:
self.log("helper says we need to upload")
# we need to upload the file
@@ -849,9 +890,12 @@ class AssistedUploader:
total_shares=self._total_shares,
size=self._size,
)
results = UploadResults()
results.uri = u.to_string()
return results
r = self._results
r.uri = u.to_string()
now = time.time()
r.timings["total"] = now - self._started
r.rates["total"] = 1.0 * self._size / r.timings["total"]
return r
class NoParameterPreferencesMixin:
max_segment_size = None
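
For reference, the CHKUploader changes above record two placement views on the results: sharemap maps each placed share number to a human-readable description, and servermap groups the placed share numbers by peer. A standalone sketch of that grouping, with made-up peer ids (nothing below is part of the commit):

# Illustration of the servermap shape built above: peerid -> set(shnums).
def build_servermap(placements):
    # 'placements' is a hypothetical list of (shnum, peerid) pairs
    servermap = {}
    for shnum, peerid in placements:
        servermap.setdefault(peerid, set()).add(shnum)
    return servermap

example = build_servermap([(0, "peer-a"), (1, "peer-b"), (2, "peer-a")])
# example == {"peer-a": set([0, 2]), "peer-b": set([1])}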

View File

@@ -14,6 +14,24 @@
<ul>
<li>URI: <tt><span n:render="string" n:data="uri" /></tt></li>
<li>Download link: <span n:render="download_link" /></li>
<li>Sharemap: <span n:render="sharemap" /></li>
<li>Servermap: <span n:render="servermap" /></li>
<li>Timings:</li>
<ul>
<li>Total: <span n:render="time" n:data="time_total" />
(<span n:render="rate" n:data="rate_total" />)</li>
<ul>
<li>Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
<li>Encode And Push: <span n:render="time" n:data="time_total_encode_and_push" /></li>
<ul>
<li>Cumulative Encoding: <span n:render="time" n:data="time_cumulative_encoding" />
(<span n:render="rate" n:data="rate_encode" />)</li>
<li>Cumulative Pushing: <span n:render="time" n:data="time_cumulative_sending" />
(<span n:render="rate" n:data="rate_push" />)</li>
<li>Send Hashes And Close: <span n:render="time" n:data="time_hashes_and_close" /></li>
</ul>
</ul>
</ul>
</ul>
<div>Return to the <a href="/">Welcome Page</a></div>
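
The n:data names used in this template are backed by the UploadResults timings and rates dictionaries; the bindings appear in the UnlinkedPOSTCHKUploader changes in the last file of this commit. A hypothetical results object might carry values shaped like this, where the numbers are made-up placeholders:

# Placeholder values only; the key names come from the code in this commit.
timings = {
    "total": 12.3,                  # rendered via data_time_total
    "peer_selection": 0.4,          # data_time_peer_selection
    "total_encode_and_push": 11.0,  # data_time_total_encode_and_push
    "cumulative_encoding": 2.1,     # data_time_cumulative_encoding
    "cumulative_sending": 7.9,      # data_time_cumulative_sending
    "hashes_and_close": 0.6,        # data_time_hashes_and_close
}
rates = {
    "total": 850000.0,              # data_rate_total
    "encode": 4900000.0,            # data_rate_encode
    "push": 1300000.0,              # data_rate_push
}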

View File

@@ -1275,6 +1275,104 @@ class UnlinkedPOSTCHKUploader(rend.Page):
["/uri/" + res.uri])
return d
def render_sharemap(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.sharemap)
def _render(sharemap):
if sharemap is None:
return "None"
l = T.ul()
for shnum in sorted(sharemap.keys()):
l[T.li["%d -> %s" % (shnum, sharemap[shnum])]]
return l
d.addCallback(_render)
return d
def render_servermap(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.servermap)
def _render(servermap):
if servermap is None:
return "None"
l = T.ul()
for peerid in sorted(servermap.keys()):
peerid_s = idlib.shortnodeid_b2a(peerid)
shares_s = ",".join([str(shnum) for shnum in servermap[peerid]])
l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]]
return l
d.addCallback(_render)
return d
def render_time(self, ctx, data):
# 1.23s, 790ms, 132us
if data is None:
return ""
s = float(data)
if s >= 1.0:
return "%.2fs" % s
if s >= 0.01:
return "%dms" % (1000*s)
if s >= 0.001:
return "%.1fms" % (1000*s)
return "%dus" % (1000000*s)
def render_rate(self, ctx, data):
# 21.8kBps, 554.4kBps 4.37MBps
if data is None:
return ""
r = float(data)
if r > 1000000:
return "%1.2fMBps" % (r/1000000)
if r > 1000:
return "%.1fkBps" % (r/1000)
return "%dBps" % r
def data_time_total(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("total"))
return d
def data_time_peer_selection(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("peer_selection"))
return d
def data_time_total_encode_and_push(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("total_encode_and_push"))
return d
def data_time_cumulative_encoding(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("cumulative_encoding"))
return d
def data_time_cumulative_sending(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("cumulative_sending"))
return d
def data_time_hashes_and_close(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("hashes_and_close"))
return d
def data_rate_total(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.rates.get("total"))
return d
def data_rate_encode(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.rates.get("encode"))
return d
def data_rate_push(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.rates.get("push"))
return d
class UnlinkedPOSTSSKUploader(rend.Page):
def renderHTTP(self, ctx):
req = inevow.IRequest(ctx)
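
The render_time and render_rate helpers added above pick human-friendly units by simple thresholding. A standalone sketch of that threshold logic with made-up sample values (the functions below are illustrative copies, not part of the commit):

# Illustrative copies of the unit-selection logic in render_time/render_rate.
def format_time(s):
    s = float(s)
    if s >= 1.0:
        return "%.2fs" % s
    if s >= 0.01:
        return "%dms" % (1000 * s)
    if s >= 0.001:
        return "%.1fms" % (1000 * s)
    return "%dus" % (1000000 * s)

def format_rate(r):
    r = float(r)
    if r > 1000000:
        return "%1.2fMBps" % (r / 1000000)
    if r > 1000:
        return "%.1fkBps" % (r / 1000)
    return "%dBps" % r

# format_time(1.234)   -> "1.23s"
# format_time(0.05)    -> "50ms"
# format_time(0.0042)  -> "4.2ms"
# format_time(0.0005)  -> "500us"
# format_rate(4370000) -> "4.37MBps"
# format_rate(554400)  -> "554.4kBps"
# format_rate(218)     -> "218Bps"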