webish download results: add servermap, decrypt time

Brian Warner 2008-03-03 20:09:32 -07:00
parent 2b49605c51
commit def910c391
4 changed files with 97 additions and 26 deletions

View File

@@ -422,6 +422,10 @@ class FileDownloader:
         self._results = DownloadResults()
         s.set_results(self._results)
         self._results.file_size = self._size
+        self._results.timings["servers_peer_selection"] = {}
+        self._results.timings["cumulative_fetch"] = 0.0
+        self._results.timings["cumulative_decode"] = 0.0
+        self._results.timings["cumulative_decrypt"] = 0.0
         if IConsumer.providedBy(downloadable):
             downloadable.registerProducer(self, True)
@@ -483,8 +487,6 @@ class FileDownloader:
     def start(self):
         self.log("starting download")
-        if self._results:
-            self._results.timings["servers_peer_selection"] = {}
         # first step: who should we download from?
         d = defer.maybeDeferred(self._get_all_shareholders)
         d.addCallback(self._got_all_shareholders)
@@ -499,6 +501,7 @@ class FileDownloader:
         if self._status:
             self._status.set_status("Finished")
             self._status.set_active(False)
+            self._status.set_paused(False)
         if IConsumer.providedBy(self._downloadable):
             self._downloadable.unregisterProducer()
         return res
@@ -542,6 +545,10 @@ class FileDownloader:
             b = storage.ReadBucketProxy(bucket, peerid, self._si_s)
             self.add_share_bucket(sharenum, b)
             self._uri_extension_sources.append(b)
+            if self._results:
+                if peerid not in self._results.servermap:
+                    self._results.servermap[peerid] = set()
+                self._results.servermap[peerid].add(sharenum)

     def add_share_bucket(self, sharenum, bucket):
         # this is split out for the benefit of test_encode.py
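The servermap recorded here is a plain dict-of-sets, peerid → share numbers. A standalone sketch of the same accumulation pattern (toy peer names and share numbers, not real Tahoe peerids):

```python
# Standalone sketch of the servermap accumulation above; peer names and
# share numbers are invented, not real Tahoe peerids.
servermap = {}

def record_share(servermap, peerid, sharenum):
    # one peer may hold several shares; a set de-duplicates repeated
    # reports of the same share from the same peer
    if peerid not in servermap:
        servermap[peerid] = set()
    servermap[peerid].add(sharenum)

for peerid, sharenum in [("peer-a", 0), ("peer-b", 3),
                         ("peer-a", 7), ("peer-a", 7)]:
    record_share(servermap, peerid, sharenum)

print(servermap)  # {'peer-a': {0, 7}, 'peer-b': {3}}
```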
@@ -785,15 +792,33 @@ class FileDownloader:
         # memory footprint: when the SegmentDownloader finishes pulling down
         # all shares, we have 1*segment_size of usage.
         segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares)
+        started = time.time()
         d = segmentdler.start()
+        def _finished_fetching(res):
+            elapsed = time.time() - started
+            self._results.timings["cumulative_fetch"] += elapsed
+            return res
+        if self._results:
+            d.addCallback(_finished_fetching)
         # pause before using more memory
         d.addCallback(self._check_for_pause)
         # while the codec does its job, we hit 2*segment_size
+        def _started_decode(res):
+            self._started_decode = time.time()
+            return res
+        if self._results:
+            d.addCallback(_started_decode)
         d.addCallback(lambda (shares, shareids):
                       self._codec.decode(shares, shareids))
         # once the codec is done, we drop back to 1*segment_size, because
         # 'shares' goes out of scope. The memory usage is all in the
         # plaintext now, spread out into a bunch of tiny buffers.
+        def _finished_decode(res):
+            elapsed = time.time() - self._started_decode
+            self._results.timings["cumulative_decode"] += elapsed
+            return res
+        if self._results:
+            d.addCallback(_finished_decode)
         # pause/check-for-stop just before writing, to honor stopProducing
         d.addCallback(self._check_for_pause)
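The instrumentation above threads pass-through callbacks into the Deferred chain: each records elapsed time and returns the result unchanged, so downstream callbacks never notice it. A minimal self-contained sketch of that pattern, assuming Twisted; fetch_segment() is an invented stand-in for SegmentDownloader.start():

```python
# Minimal sketch of the pass-through timing callbacks above, assuming
# Twisted; fetch_segment() is an invented stand-in.
import time
from twisted.internet import defer

timings = {"cumulative_fetch": 0.0}

def fetch_segment():
    return defer.succeed("segment-data")  # fires immediately in this sketch

def download_one_segment():
    started = time.time()
    d = fetch_segment()
    def _finished_fetching(res):
        # record elapsed time, then hand the result through unchanged so
        # the rest of the callback chain is unaffected
        timings["cumulative_fetch"] += time.time() - started
        return res
    d.addCallback(_finished_fetching)
    return d

download_one_segment().addCallback(lambda res: print(res, timings))
```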
@@ -808,7 +833,11 @@ class FileDownloader:
             # we're down to 1*segment_size right now, but write_segment()
             # will decrypt a copy of the segment internally, which will push
             # us up to 2*segment_size while it runs.
+            started_decrypt = time.time()
             self._output.write_segment(segment)
+            if self._results:
+                elapsed = time.time() - started_decrypt
+                self._results.timings["cumulative_decrypt"] += elapsed
         d.addCallback(_done)
         return d
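Unlike fetch and decode, decryption happens inside the synchronous write_segment() call (which, per the comment above, decrypts a copy of the segment internally), so the timing simply brackets a blocking call. A toy sketch with an invented stand-in cipher:

```python
# Toy sketch of timing a synchronous decrypt-and-write step; the XOR
# "cipher" here is invented, standing in for the real decryption that
# write_segment() performs internally.
import time

timings = {"cumulative_decrypt": 0.0}

def write_segment(segment):
    return bytes(b ^ 0x42 for b in segment)

started_decrypt = time.time()
plaintext = write_segment(b"ciphertext-bytes")
timings["cumulative_decrypt"] += time.time() - started_decrypt
```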
@@ -817,11 +846,29 @@ class FileDownloader:
                  % (segnum, self._total_segments,
                     100.0 * segnum / self._total_segments))
         segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares)
+        started = time.time()
         d = segmentdler.start()
+        def _finished_fetching(res):
+            elapsed = time.time() - started
+            self._results.timings["cumulative_fetch"] += elapsed
+            return res
+        if self._results:
+            d.addCallback(_finished_fetching)
         # pause before using more memory
         d.addCallback(self._check_for_pause)
+        def _started_decode(res):
+            self._started_decode = time.time()
+            return res
+        if self._results:
+            d.addCallback(_started_decode)
         d.addCallback(lambda (shares, shareids):
                       self._tail_codec.decode(shares, shareids))
+        def _finished_decode(res):
+            elapsed = time.time() - self._started_decode
+            self._results.timings["cumulative_decode"] += elapsed
+            return res
+        if self._results:
+            d.addCallback(_finished_decode)
         # pause/check-for-stop just before writing, to honor stopProducing
         d.addCallback(self._check_for_pause)
         def _done(buffers):
@@ -833,7 +880,11 @@ class FileDownloader:
             pad_size = mathutil.pad_size(self._size, self._segment_size)
             tail_size = self._segment_size - pad_size
             segment = segment[:tail_size]
+            started_decrypt = time.time()
             self._output.write_segment(segment)
+            if self._results:
+                elapsed = time.time() - started_decrypt
+                self._results.timings["cumulative_decrypt"] += elapsed
         d.addCallback(_done)
         return d
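The tail segment is padded out to a full segment for erasure coding, so after decoding it gets trimmed back to the file's real length. A worked example with invented sizes, assuming pad_size(n, k) returns the bytes needed to round n up to a multiple of k (matching the mathutil usage above):

```python
# Worked example of the tail-segment trim; sizes are invented and
# pad_size() is a local re-implementation of the assumed semantics.
def pad_size(n, k):
    return (k - n % k) % k

file_size = 1000
segment_size = 128
padding = pad_size(file_size, segment_size)  # 1000 % 128 = 104, so 24
tail_size = segment_size - padding           # 128 - 24 = 104 real bytes
segment = b"\x00" * segment_size             # decoded tail, still padded
segment = segment[:tail_size]                # drop the padding
assert len(segment) == file_size % segment_size == 104
```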
@@ -842,7 +893,7 @@ class FileDownloader:
         if self._results:
             now = time.time()
             self._results.timings["total"] = now - self._started
-            self._results.timings["fetching"] = now - self._started_fetching
+            self._results.timings["segments"] = now - self._started_fetching
         self._output.close()
         if self.check_crypttext_hash:
             _assert(self._crypttext_hash == self._output.crypttext_hash,
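Note the naming this hunk settles on: "segments" is the wall-clock span from the start of fetching to completion, while the cumulative_* entries are per-segment times summed across the whole download, so their sum is bounded by "segments". A hypothetical finished timings dict, every value invented for illustration:

```python
# Hypothetical finished timings dict under the new key names; all values
# are invented for illustration.
timings = {
    "peer_selection": 0.4,      # asking servers who holds shares
    "uri_extension": 0.1,
    "hashtrees": 0.3,
    "segments": 7.5,            # wall-clock: fetch + decode + deliver
    "cumulative_fetch": 5.2,    # summed per-segment waits on servers
    "cumulative_decode": 0.8,   # summed zfec time
    "cumulative_decrypt": 0.6,  # summed decryption time
    "total": 8.9,               # start to finish
}
# the per-segment pieces happen sequentially inside the "segments" span,
# so their sum cannot exceed it
assert (timings["cumulative_fetch"] + timings["cumulative_decode"]
        + timings["cumulative_decrypt"]) <= timings["segments"]
```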

View File

@@ -1307,21 +1307,22 @@ class IDownloadResults(Interface):
     """I am created internally by download() methods. I contain a number of
     public attributes which contain details about the download process.::

-     .file_size : the size of the file, in bytes
+    .file_size : the size of the file, in bytes
     .servers_used : set of server peerids that were used during download
     .server_problems : dict mapping server peerid to a problem string. Only
                        servers that had problems (bad hashes, disconnects) are
                        listed here.
-     .servermap : dict mapping server peerid to a set of share numbers. Only
-                  servers that had any shares are listed here.
+    .servermap : dict mapping server peerid to a set of share numbers. Only
+                 servers that had any shares are listed here.
     .timings : dict of timing information, mapping name to seconds (float)
         peer_selection : time to ask servers about shares
         servers_peer_selection : dict of peerid to DYHB-query time
         uri_extension : time to fetch a copy of the URI extension block
         hashtrees : time to fetch the hash trees
-        fetching : time to fetch, decode, and deliver segments
-        cumulative_fetching : time spent waiting for storage servers
-        cumulative_decoding : just time spent in zfec
+        segments : time to fetch, decode, and deliver segments
+        cumulative_fetch : time spent waiting for storage servers
+        cumulative_decode : just time spent in zfec
+        cumulative_decrypt : just time spent in decryption
         total : total download time, start to finish
         servers_fetching : dict of peerid to list of per-segment fetch times
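For illustration, a download that pulled shares from two servers might leave a results object shaped like the docstring describes. A sketch only; the class body is paraphrased, peerids are shortened strings, and all values are invented:

```python
# Sketch of an object shaped like IDownloadResults after a two-server
# download; peerids are shortened and every value is invented.
class DownloadResults:
    def __init__(self):
        self.file_size = 0
        self.servers_used = set()
        self.server_problems = {}
        self.servermap = {}  # peerid -> set of share numbers
        self.timings = {}    # name -> seconds (float)

r = DownloadResults()
r.file_size = 1048576
r.servers_used = {"xgru5adv", "ysbz4st7"}
r.servermap = {"xgru5adv": {0, 5}, "ysbz4st7": {2, 7}}
r.server_problems = {"ysbz4st7": "lost connection"}  # only troubled servers
```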

View File

@@ -33,13 +33,15 @@
       <li>Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
       <li>UEB Fetch: <span n:render="time" n:data="time_uri_extension" /></li>
       <li>Hashtree Fetch: <span n:render="time" n:data="time_hashtrees" /></li>
-      <li>Segment Fetch: <span n:render="time" n:data="time_fetching" />
-          (<span n:render="rate" n:data="rate_fetching" />)</li>
+      <li>Segment Fetch: <span n:render="time" n:data="time_segments" />
+          (<span n:render="rate" n:data="rate_segments" />)</li>
       <ul>
         <li>Cumulative Fetching: <span n:render="time" n:data="time_cumulative_fetch" />
             (<span n:render="rate" n:data="rate_fetch" />)</li>
-        <li>Cumulative Decoding: <span n:render="time" n:data="time_cumulative_decoding" />
+        <li>Cumulative Decoding: <span n:render="time" n:data="time_cumulative_decode" />
             (<span n:render="rate" n:data="rate_decode" />)</li>
+        <li>Cumulative Decrypting: <span n:render="time" n:data="time_cumulative_decrypt" />
+            (<span n:render="rate" n:data="rate_decrypt" />)</li>
       </ul>
     </ul>
   </ul>
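Each rate_* renderer presumably divides the file size by the matching timing; _get_rate() is only partially visible in this diff, so the following is an assumed sketch, not the actual implementation:

```python
# Assumed sketch of deriving a rate_* value: file size divided by the
# named timing. Treat as illustration only; the real _get_rate() in
# webish.py is not fully shown in this diff.
def compute_rate(file_size, seconds):
    if not seconds:
        return None  # no rate when the timing is missing or zero
    return 1.0 * file_size / seconds

print(compute_rate(1048576, 0.6))  # ~1.7 MB/s for a 0.6s cumulative_decrypt
```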

View File

@@ -1372,6 +1372,11 @@ class UnlinkedPUTCreateDirectory(rend.Page):
         # XXX add redirect_to_result
         return d

+def plural(sequence):
+    if len(sequence) == 1:
+        return ""
+    return "s"
+
 class UploadResultsRendererMixin:
     # this requires a method named 'upload_results'
@@ -1397,8 +1402,11 @@ class UploadResultsRendererMixin:
             l = T.ul()
             for peerid in sorted(servermap.keys()):
                 peerid_s = idlib.shortnodeid_b2a(peerid)
-                shares_s = ",".join([str(shnum) for shnum in servermap[peerid]])
-                l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]]
+                shares_s = ",".join(["#%d" % shnum
+                                     for shnum in servermap[peerid]])
+                l[T.li["[%s] got share%s: %s" % (peerid_s,
+                                                 plural(servermap[peerid]),
+                                                 shares_s)]]
             return l
         d.addCallback(_render)
         return d
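A quick demonstration of the new plural() helper and the "#N" share formatting (toy data; shnums are sorted here for stable output, which the template code does not bother to do):

```python
# Demonstration of plural() and the "#N" share formatting introduced
# above, on invented data.
def plural(sequence):
    if len(sequence) == 1:
        return ""
    return "s"

servermap = {"xgru5adv": {0}, "ysbz4st7": {2, 7}}
for peerid_s in sorted(servermap):
    shares_s = ",".join(["#%d" % shnum
                         for shnum in sorted(servermap[peerid_s])])
    print("[%s] got share%s: %s" % (peerid_s, plural(servermap[peerid_s]),
                                    shares_s))
# [xgru5adv] got share: #0
# [ysbz4st7] got shares: #2,#7
```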
@@ -1678,8 +1686,11 @@ class DownloadResultsRendererMixin:
             l = T.ul()
             for peerid in sorted(servermap.keys()):
                 peerid_s = idlib.shortnodeid_b2a(peerid)
-                shares_s = ",".join([str(shnum) for shnum in servermap[peerid]])
-                l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]]
+                shares_s = ",".join(["#%d" % shnum
+                                     for shnum in servermap[peerid]])
+                l[T.li["[%s] has share%s: %s" % (peerid_s,
+                                                 plural(servermap[peerid]),
+                                                 shares_s)]]
             return l
         d.addCallback(_render)
         return d
@@ -1730,14 +1741,17 @@ class DownloadResultsRendererMixin:
     def data_time_hashtrees(self, ctx, data):
         return self._get_time("hashtrees")

-    def data_time_fetching(self, ctx, data):
-        return self._get_time("fetching")
+    def data_time_segments(self, ctx, data):
+        return self._get_time("segments")

     def data_time_cumulative_fetch(self, ctx, data):
         return self._get_time("cumulative_fetch")

-    def data_time_cumulative_decoding(self, ctx, data):
-        return self._get_time("cumulative_decoding")
+    def data_time_cumulative_decode(self, ctx, data):
+        return self._get_time("cumulative_decode")
+
+    def data_time_cumulative_decrypt(self, ctx, data):
+        return self._get_time("cumulative_decrypt")

     def _get_rate(self, name):
         d = self.download_results()
@@ -1756,14 +1770,17 @@ class DownloadResultsRendererMixin:
     def data_rate_total(self, ctx, data):
         return self._get_rate("total")

-    def data_rate_fetching(self, ctx, data):
-        return self._get_rate("fetching")
-    def data_rate_decode(self, ctx, data):
-        return self._get_rate("cumulative_decoding")
+    def data_rate_segments(self, ctx, data):
+        return self._get_rate("segments")

     def data_rate_fetch(self, ctx, data):
-        return self._get_rate("cumulative_fetching")
+        return self._get_rate("cumulative_fetch")
+
+    def data_rate_decode(self, ctx, data):
+        return self._get_rate("cumulative_decode")
+
+    def data_rate_decrypt(self, ctx, data):
+        return self._get_rate("cumulative_decrypt")

 class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
     docFactory = getxmlfile("download-status.xhtml")