webish: add 'download results', with some basic timing information

Brian Warner 2008-03-03 19:19:21 -07:00
parent c8e24f0904
commit 2b49605c51
7 changed files with 247 additions and 14 deletions

View File

@@ -122,6 +122,7 @@ class SimpleCHKFileVerifier(download.FileDownloader):
self._paused = False
self._stopped = False
self._results = None
self.active_buckets = {} # k: shnum, v: bucket
self._share_buckets = [] # list of (sharenum, bucket) tuples
self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets

View File

@@ -1,16 +1,16 @@
-import os, random, weakref, itertools
+import os, random, weakref, itertools, time
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.application import service
from foolscap.eventual import eventually
-from allmydata.util import base32, mathutil, hashutil, log, idlib
+from allmydata.util import base32, mathutil, hashutil, log
from allmydata.util.assertutil import _assert
from allmydata import codec, hashtree, storage, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
-IDownloadStatus
+IDownloadStatus, IDownloadResults
from allmydata.encode import NotEnoughPeersError
from pycryptopp.cipher.aes import AES
@@ -28,6 +28,16 @@ class BadCrypttextHashValue(Exception):
class DownloadStopped(Exception):
pass
class DownloadResults:
implements(IDownloadResults)
def __init__(self):
self.servers_used = set()
self.server_problems = {}
self.servermap = {}
self.timings = {}
self.file_size = None
class Output:
def __init__(self, downloadable, key, total_length, log_parent,
download_status):
@@ -338,6 +348,7 @@ class DownloadStatus:
self.paused = False
self.stopped = False
self.active = True
self.results = None
self.counter = self.statusid_counter.next()
def get_storage_index(self):
@@ -357,6 +368,8 @@ class DownloadStatus:
return self.progress
def get_active(self):
return self.active
def get_results(self):
return self.results
def get_counter(self):
return self.counter
@@ -376,6 +389,8 @@ class DownloadStatus:
self.progress = value
def set_active(self, value):
self.active = value
def set_results(self, value):
self.results = value
class FileDownloader:
implements(IPushProducer)
@@ -396,6 +411,7 @@ class FileDownloader:
self._si_s = storage.si_b2a(self._storage_index)
self.init_logging()
self._started = time.time()
self._status = s = DownloadStatus()
s.set_status("Starting")
s.set_storage_index(self._storage_index)
@@ -403,6 +419,10 @@ class FileDownloader:
s.set_helper(False)
s.set_active(True)
self._results = DownloadResults()
s.set_results(self._results)
self._results.file_size = self._size
if IConsumer.providedBy(downloadable):
downloadable.registerProducer(self, True)
self._downloadable = downloadable
@@ -463,6 +483,8 @@ class FileDownloader:
def start(self):
self.log("starting download")
if self._results:
self._results.timings["servers_peer_selection"] = {}
# first step: who should we download from?
d = defer.maybeDeferred(self._get_all_shareholders)
d.addCallback(self._got_all_shareholders)
@@ -495,10 +517,9 @@ class FileDownloader:
dl = []
for (peerid,ss) in self._client.get_permuted_peers("storage",
self._storage_index):
-peerid_s = idlib.shortnodeid_b2a(peerid)
d = ss.callRemote("get_buckets", self._storage_index)
d.addCallbacks(self._got_response, self._got_error,
-callbackArgs=(peerid_s,))
+callbackArgs=(peerid,))
dl.append(d)
self._responses_received = 0
self._queries_sent = len(dl)
@@ -508,14 +529,17 @@ class FileDownloader:
self._queries_sent))
return defer.DeferredList(dl)
-def _got_response(self, buckets, peerid_s):
+def _got_response(self, buckets, peerid):
self._responses_received += 1
if self._results:
elapsed = time.time() - self._started
self._results.timings["servers_peer_selection"][peerid] = elapsed
if self._status:
self._status.set_status("Locating Shares (%d/%d)" %
(self._responses_received,
self._queries_sent))
for sharenum, bucket in buckets.iteritems():
-b = storage.ReadBucketProxy(bucket, peerid_s, self._si_s)
+b = storage.ReadBucketProxy(bucket, peerid, self._si_s)
self.add_share_bucket(sharenum, b)
self._uri_extension_sources.append(b)
@@ -539,6 +563,10 @@ class FileDownloader:
del self._share_vbuckets[shnum]
def _got_all_shareholders(self, res):
if self._results:
now = time.time()
self._results.timings["peer_selection"] = now - self._started
if len(self._share_buckets) < self._num_needed_shares:
raise NotEnoughPeersError
@@ -558,6 +586,7 @@ class FileDownloader:
if self._status:
self._status.set_status("Obtaining URI Extension")
self._uri_extension_fetch_started = time.time()
def _validate(proposal, bucket):
h = hashutil.uri_extension_hash(proposal)
if h != self._uri_extension_hash:
@@ -599,6 +628,10 @@ class FileDownloader:
return d
def _got_uri_extension(self, uri_extension_data):
if self._results:
elapsed = time.time() - self._uri_extension_fetch_started
self._results.timings["uri_extension"] = elapsed
d = self._uri_extension_data = uri_extension_data
self._codec = codec.get_decoder_by_name(d['codec_name'])
@@ -621,6 +654,7 @@ class FileDownloader:
self._share_hashtree.set_hashes({0: self._roothash})
def _get_hashtrees(self, res):
self._get_hashtrees_started = time.time()
if self._status:
self._status.set_status("Retrieving Hash Trees")
d = self._get_plaintext_hashtrees()
@@ -679,7 +713,9 @@ class FileDownloader:
def _setup_hashtrees(self, res):
self._output.setup_hashtrees(self._plaintext_hashtree,
self._crypttext_hashtree)
if self._results:
elapsed = time.time() - self._get_hashtrees_started
self._results.timings["hashtrees"] = elapsed
def _create_validated_buckets(self, ignored=None):
self._share_vbuckets = {}
@@ -719,6 +755,8 @@ class FileDownloader:
# RIBucketReader references.
self.active_buckets = {} # k: shnum, v: ValidatedBucket instance
self._started_fetching = time.time()
d = defer.succeed(None)
for segnum in range(self._total_segments-1):
d.addCallback(self._download_segment, segnum)
@@ -801,6 +839,10 @@ class FileDownloader:
def _done(self, res):
self.log("download done")
if self._results:
now = time.time()
self._results.timings["total"] = now - self._started
self._results.timings["fetching"] = now - self._started_fetching
self._output.close()
if self.check_crypttext_hash:
_assert(self._crypttext_hash == self._output.crypttext_hash,

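The instrumentation added above follows one pattern throughout: stamp time.time() at the start of a phase, and when the phase completes store the elapsed delta in DownloadResults.timings under a fixed key ("peer_selection", "uri_extension", "hashtrees", "fetching", "total"). The per-server "servers_peer_selection" dict is filled the same way, keyed by binary peerid, one elapsed value per get_buckets response. A minimal sketch of the pattern, outside Tahoe (the key names match the diff; the Results stand-in and run_download driver are illustrative only):

import time

class Results:
    # stand-in for the DownloadResults container added above
    def __init__(self):
        self.timings = {}

def run_download(phases):
    # phases: list of (name, callable) pairs, e.g.
    # [("peer_selection", find_shares), ("uri_extension", fetch_ueb)]
    results = Results()
    started = time.time()
    for name, work in phases:
        phase_started = time.time()
        work()
        results.timings[name] = time.time() - phase_started
    results.timings["total"] = time.time() - started
    return results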
View File

@@ -1278,9 +1278,52 @@ class IUploadable(Interface):
class IUploadResults(Interface):
"""I am returned by upload() methods. I contain a number of public
-attributes which can be read to determine the results of the upload::
+attributes which can be read to determine the results of the upload. Some
+of these are functional, some are timing information. All of these may be
+None.::
.file_size : the size of the file, in bytes
.uri : the CHK read-cap for the file
.ciphertext_fetched : how many bytes were fetched by the helper
.sharemap : dict mapping share number to placement string
.servermap : dict mapping server peerid to a set of share numbers
.timings : dict of timing information, mapping name to seconds (float)
total : total upload time, start to finish
storage_index : time to compute the storage index
peer_selection : time to decide which peers will be used
contacting_helper : initial helper query to upload/no-upload decision
existence_check : helper pre-upload existence check
helper_total : initial helper query to helper finished pushing
cumulative_fetch : helper waiting for ciphertext requests
total_fetch : helper start to last ciphertext response
cumulative_encoding : just time spent in zfec
cumulative_sending : just time spent waiting for storage servers
hashes_and_close : last segment push to shareholder close
total_encode_and_push : first encode to shareholder close
"""
class IDownloadResults(Interface):
"""I am created internally by download() methods. I contain a number of
public attributes which contain details about the download process.::
.file_size : the size of the file, in bytes
.servers_used : set of server peerids that were used during download
.server_problems : dict mapping server peerid to a problem string. Only
servers that had problems (bad hashes, disconnects) are
listed here.
.servermap : dict mapping server peerid to a set of share numbers. Only
servers that had any shares are listed here.
.timings : dict of timing information, mapping name to seconds (float)
peer_selection : time to ask servers about shares
servers_peer_selection : dict of peerid to DYHB-query time
uri_extension : time to fetch a copy of the URI extension block
hashtrees : time to fetch the hash trees
fetching : time to fetch, decode, and deliver segments
cumulative_fetching : time spent waiting for storage servers
cumulative_decoding : just time spent in zfec
total : total download time, start to finish
servers_fetching : dict of peerid to list of per-segment fetch times
"""

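Because every attribute of these results objects may be None and timings is keyed by plain strings, consumers should probe defensively. A hypothetical reader of an IDownloadResults provider (the field names come from the docstring above; the summarize helper itself is not part of this commit):

def summarize(results):
    # results: any IDownloadResults provider; every attribute,
    # including the timings dict itself, may be None
    if results.file_size is not None:
        print "file size: %d bytes" % results.file_size
    timings = results.timings or {}
    for name in ["peer_selection", "uri_extension", "hashtrees",
                 "fetching", "total"]:
        if timings.get(name) is not None:
            print "  %s: %.3fs" % (name, timings[name])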
View File

@@ -84,7 +84,7 @@ class CHKCheckerAndUEBFetcher:
self.log("no readers, so no UEB", level=log.NOISY)
return
b,peerid = self._readers.pop()
-rbp = storage.ReadBucketProxy(b, idlib.shortnodeid_b2a(peerid),
+rbp = storage.ReadBucketProxy(b, peerid,
storage.si_b2a(self._storage_index))
d = rbp.startIfNecessary()
d.addCallback(lambda res: rbp.get_uri_extension())

View File

@@ -1201,14 +1201,15 @@ class WriteBucketProxy:
class ReadBucketProxy:
implements(IStorageBucketReader)
-def __init__(self, rref, peerid_s=None, storage_index_s=None):
+def __init__(self, rref, peerid=None, storage_index_s=None):
self._rref = rref
-self._peerid_s = peerid_s
+self._peerid = peerid
self._si_s = storage_index_s
self._started = False
def __repr__(self):
return "<ReadBucketProxy to peer [%s] SI %s>" % (self._peerid_s,
peerid_s = idlib.shortnodeid_b2a(self._peerid)
return "<ReadBucketProxy to peer [%s] SI %s>" % (peerid_s,
self._si_s)
def startIfNecessary(self):

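The point of this signature change is that ReadBucketProxy now keeps the raw binary peerid and converts it to the short printable form only on demand, inside __repr__; callers such as _got_response can then hand the same binary peerid to both the proxy and the DownloadResults dicts. For reference, a sketch of what idlib.shortnodeid_b2a is assumed to do here, inferred from its use above (base32-encode the binary nodeid and keep a short prefix; the prefix length is a guess):

from allmydata.util import base32

def shortnodeid_b2a(nodeid):
    # assumed behavior: printable base32 form of the binary peerid,
    # truncated to a short human-readable prefix
    return base32.b2a(nodeid)[:8]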
View File

@@ -18,6 +18,34 @@
<li>Status: <span n:render="status"/></li>
</ul>
<div n:render="results">
<h2>Download Results</h2>
<ul>
<li>Servers Used: <span n:render="servers_used" /></li>
<li>Servermap: <span n:render="servermap" /></li>
<li>Timings:</li>
<ul>
<li>File Size: <span n:render="string" n:data="file_size" /> bytes</li>
<li>Total: <span n:render="time" n:data="time_total" />
(<span n:render="rate" n:data="rate_total" />)</li>
<ul>
<li>Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
<li>UEB Fetch: <span n:render="time" n:data="time_uri_extension" /></li>
<li>Hashtree Fetch: <span n:render="time" n:data="time_hashtrees" /></li>
<li>Segment Fetch: <span n:render="time" n:data="time_fetching" />
(<span n:render="rate" n:data="rate_fetching" />)</li>
<ul>
<li>Cumulative Fetching: <span n:render="time" n:data="time_cumulative_fetch" />
(<span n:render="rate" n:data="rate_fetch" />)</li>
<li>Cumulative Decoding: <span n:render="time" n:data="time_cumulative_decoding" />
(<span n:render="rate" n:data="rate_decode" />)</li>
</ul>
</ul>
</ul>
</ul>
</div>
<div>Return to the <a href="/">Welcome Page</a></div>
</body>

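The template relies on Nevow's naming convention: an n:data="foo" attribute resolves to a data_foo(self, ctx, data) method on the page, and n:render="bar" passes that value to render_bar(self, ctx, data); either may return a Deferred, which is how the renderers in webish.py below return their results asynchronously. A minimal, self-contained illustration of the convention (not part of this commit):

from nevow import loaders, rend

class TimingExample(rend.Page):
    docFactory = loaders.xmlstr(
        '<p xmlns:n="http://nevow.com/ns/nevow/0.1">'
        'Total: <span n:render="time" n:data="time_total" /></p>')
    def data_time_total(self, ctx, data):
        return 1.234                # looked up via n:data="time_total"
    def render_time(self, ctx, data):
        return "%.2fs" % data       # receives the data_ method's value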
View File

@@ -1663,9 +1663,127 @@ class UploadStatusPage(UploadResultsRendererMixin, rend.Page):
def render_status(self, ctx, data):
return data.get_status()
-class DownloadStatusPage(rend.Page):
class DownloadResultsRendererMixin:
# this requires a method named 'download_results'
def render_servers_used(self, ctx, data):
return "nope"
def render_servermap(self, ctx, data):
d = self.download_results()
d.addCallback(lambda res: res.servermap)
def _render(servermap):
if servermap is None:
return "None"
l = T.ul()
for peerid in sorted(servermap.keys()):
peerid_s = idlib.shortnodeid_b2a(peerid)
shares_s = ",".join([str(shnum) for shnum in servermap[peerid]])
l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]]
return l
d.addCallback(_render)
return d
def data_file_size(self, ctx, data):
d = self.download_results()
d.addCallback(lambda res: res.file_size)
return d
def render_time(self, ctx, data):
# 1.23s, 790ms, 132us
if data is None:
return ""
s = float(data)
if s >= 1.0:
return "%.2fs" % s
if s >= 0.01:
return "%dms" % (1000*s)
if s >= 0.001:
return "%.1fms" % (1000*s)
return "%dus" % (1000000*s)
def render_rate(self, ctx, data):
# 21.8kBps, 554.4kBps 4.37MBps
if data is None:
return ""
r = float(data)
if r > 1000000:
return "%1.2fMBps" % (r/1000000)
if r > 1000:
return "%.1fkBps" % (r/1000)
return "%dBps" % r
def _get_time(self, name):
d = self.download_results()
d.addCallback(lambda res: res.timings.get(name))
return d
def data_time_total(self, ctx, data):
return self._get_time("total")
def data_time_peer_selection(self, ctx, data):
return self._get_time("peer_selection")
def data_time_uri_extension(self, ctx, data):
return self._get_time("uri_extension")
def data_time_hashtrees(self, ctx, data):
return self._get_time("hashtrees")
def data_time_fetching(self, ctx, data):
return self._get_time("fetching")
def data_time_cumulative_fetch(self, ctx, data):
return self._get_time("cumulative_fetch")
def data_time_cumulative_decoding(self, ctx, data):
return self._get_time("cumulative_decoding")
def _get_rate(self, name):
d = self.download_results()
def _convert(r):
file_size = r.file_size
time = r.timings.get(name)
if time is None:
return None
try:
return 1.0 * file_size / time
except ZeroDivisionError:
return None
d.addCallback(_convert)
return d
def data_rate_total(self, ctx, data):
return self._get_rate("total")
def data_rate_fetching(self, ctx, data):
return self._get_rate("fetching")
def data_rate_decode(self, ctx, data):
return self._get_rate("cumulative_decoding")
def data_rate_fetch(self, ctx, data):
return self._get_rate("cumulative_fetching")
+class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
docFactory = getxmlfile("download-status.xhtml")
def __init__(self, data):
rend.Page.__init__(self, data)
self.download_status = data
def download_results(self):
return defer.maybeDeferred(self.download_status.get_results)
def render_results(self, ctx, data):
d = self.download_results()
def _got_results(results):
if results:
return ctx.tag
return ""
d.addCallback(_got_results)
return d
def render_si(self, ctx, data):
si_s = base32.b2a_or_none(data.get_storage_index())
if si_s is None: