mutable WIP: re-enable publish/retrieve status

Brian Warner
2008-04-16 17:49:06 -07:00
parent 5afc26f1d1
commit 749c42fa2c
9 changed files with 176 additions and 134 deletions

View File

@@ -300,10 +300,10 @@ class Client(node.Node, testutil.PollMixin):
         assert IMutableFileURI.providedBy(u), u
         return MutableFileNode(self).init_from_uri(u)

-    def notify_publish(self, p):
-        self.getServiceNamed("mutable-watcher").notify_publish(p)
-    def notify_retrieve(self, r):
-        self.getServiceNamed("mutable-watcher").notify_retrieve(r)
+    def notify_publish(self, publish_status):
+        self.getServiceNamed("mutable-watcher").notify_publish(publish_status)
+    def notify_retrieve(self, retrieve_status):
+        self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)

     def create_empty_dirnode(self):
         n = NewDirectoryNode(self)
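The parameter renames above encode the new contract: callers hand the watcher a status object (what get_status() returns), never the live Publish/Retrieve operation. A minimal sketch of that flow, with hypothetical stand-ins for the watcher and status classes:

class FakeStatus(object):          # stand-in for PublishStatus
    def __init__(self):
        self.active = True
    def get_active(self):
        return self.active

class FakeWatcher(object):         # stand-in for MutableWatcher
    def __init__(self):
        self.seen = []
    def notify_publish(self, publish_status):
        self.seen.append(publish_status)

watcher = FakeWatcher()
status = FakeStatus()              # what Publish.get_status() would return
watcher.notify_publish(status)     # mirrors Client.notify_publish above
assert watcher.seen == [status]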

View File

@@ -22,8 +22,6 @@ from retrieve import Retrieve

 class MutableFileNode:
     implements(IMutableFileNode)
-    publish_class = Publish
-    retrieve_class = Retrieve
     SIGNATURE_KEY_SIZE = 2048
     DEFAULT_ENCODING = (3, 10)
@@ -90,7 +88,7 @@ class MutableFileNode:
             # nobody knows about us yet"
             self._current_seqnum = 0
             self._current_roothash = "\x00"*32
-            return self._publish(initial_contents)
+            return self._publish(None, initial_contents)
         d.addCallback(_generated)
         return d
@@ -225,17 +223,13 @@
     def download_version(self, servermap, versionid):
         """Returns a Deferred that fires with a string."""
         d = self.obtain_lock()
-        d.addCallback(lambda res:
-                      Retrieve(self, servermap, versionid).download())
+        d.addCallback(lambda res: self._retrieve(servermap, versionid))
         d.addBoth(self.release_lock)
         return d

-    def publish(self, servermap, newdata):
-        assert self._pubkey, "update_servermap must be called before publish"
+    def publish(self, servermap, new_contents):
         d = self.obtain_lock()
-        d.addCallback(lambda res: Publish(self, servermap).publish(newdata))
-        # p = self.publish_class(self)
-        # self._client.notify_publish(p)
+        d.addCallback(lambda res: self._publish(servermap, new_contents))
         d.addBoth(self.release_lock)
         return d
@@ -269,16 +263,11 @@
         verifier = self.get_verifier()
         return self._client.getServiceNamed("checker").check(verifier)

-    def download(self, target):
-        # fake it. TODO: make this cleaner.
-        d = self.download_to_data()
-        def _done(data):
-            target.open(len(data))
-            target.write(data)
-            target.close()
-            return target.finish()
-        d.addCallback(_done)
-        return d
+    def _retrieve(self, servermap, verinfo):
+        r = Retrieve(self, servermap, verinfo)
+        self._client.notify_retrieve(r.get_status())
+        return r.download()

     def _update_and_retrieve_best(self, old_map=None, mode=MODE_READ):
         d = self.update_servermap(old_map=old_map, mode=mode)
@@ -306,22 +295,33 @@
         d.addBoth(self.release_lock)
         return d

-    def _publish(self, initial_contents):
-        p = Publish(self, None)
-        d = p.publish(initial_contents)
-        d.addCallback(lambda res: self)
+    def download(self, target):
+        # fake it. TODO: make this cleaner.
+        d = self.download_to_data()
+        def _done(data):
+            target.open(len(data))
+            target.write(data)
+            target.close()
+            return target.finish()
+        d.addCallback(_done)
         return d

-    def update(self, newdata):
+    def _publish(self, servermap, new_contents):
+        assert self._pubkey, "update_servermap must be called before publish"
+        p = Publish(self, servermap)
+        self._client.notify_publish(p.get_status())
+        return p.publish(new_contents)
+
+    def update(self, new_contents):
         d = self.obtain_lock()
         d.addCallback(lambda res: self.update_servermap(mode=MODE_WRITE))
-        d.addCallback(lambda smap:
-                      Publish(self, smap).publish(newdata))
+        d.addCallback(self._publish, new_contents)
         d.addBoth(self.release_lock)
         return d

-    def overwrite(self, newdata):
-        return self.update(newdata)
+    def overwrite(self, new_contents):
+        return self.update(new_contents)

 class MutableWatcher(service.MultiService):
@@ -332,42 +332,40 @@ class MutableWatcher(service.MultiService):
     def __init__(self, stats_provider=None):
         service.MultiService.__init__(self)
         self.stats_provider = stats_provider
-        self._all_publish = weakref.WeakKeyDictionary()
+        self._all_publish_status = weakref.WeakKeyDictionary()
         self._recent_publish_status = []
-        self._all_retrieve = weakref.WeakKeyDictionary()
+        self._all_retrieve_status = weakref.WeakKeyDictionary()
         self._recent_retrieve_status = []

     def notify_publish(self, p):
-        self._all_publish[p] = None
-        self._recent_publish_status.append(p.get_status())
+        self._all_publish_status[p] = None
+        self._recent_publish_status.append(p)
         if self.stats_provider:
             self.stats_provider.count('mutable.files_published', 1)
-            #self.stats_provider.count('mutable.bytes_published', p._node.get_size())
+            self.stats_provider.count('mutable.bytes_published', p.get_size())
         while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
             self._recent_publish_status.pop(0)

     def list_all_publish(self):
-        return self._all_publish.keys()
+        return self._all_publish_status.keys()
     def list_active_publish(self):
-        return [p.get_status() for p in self._all_publish.keys()
-                if p.get_status().get_active()]
+        return [p for p in self._all_publish_status.keys() if p.get_active()]
     def list_recent_publish(self):
         return self._recent_publish_status

     def notify_retrieve(self, r):
-        self._all_retrieve[r] = None
-        self._recent_retrieve_status.append(r.get_status())
+        self._all_retrieve_status[r] = None
+        self._recent_retrieve_status.append(r)
         if self.stats_provider:
             self.stats_provider.count('mutable.files_retrieved', 1)
-            #self.stats_provider.count('mutable.bytes_retrieved', r._node.get_size())
+            self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
         while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
            self._recent_retrieve_status.pop(0)

     def list_all_retrieve(self):
-        return self._all_retrieve.keys()
+        return self._all_retrieve_status.keys()
     def list_active_retrieve(self):
-        return [p.get_status() for p in self._all_retrieve.keys()
-                if p.get_status().get_active()]
+        return [p for p in self._all_retrieve_status.keys() if p.get_active()]
     def list_recent_retrieve(self):
         return self._recent_retrieve_status
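The renamed _all_*_status dictionaries make the bookkeeping pattern easier to see: every status object is tracked in a WeakKeyDictionary, so finished operations can be garbage-collected, while a bounded list pins the most recent few for the status pages. A minimal standalone sketch of the same idiom (StatusTracker is a hypothetical stand-in for MutableWatcher):

import weakref

class StatusTracker(object):
    MAX_STATUSES = 20

    def __init__(self):
        self._all = weakref.WeakKeyDictionary()   # weak: does not pin statuses
        self._recent = []                         # strong: pins the newest N

    def notify(self, status):
        self._all[status] = None
        self._recent.append(status)
        while len(self._recent) > self.MAX_STATUSES:
            self._recent.pop(0)

    def list_active(self):
        return [s for s in self._all.keys() if s.get_active()]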

View File

@@ -4,10 +4,12 @@ import os, struct, time
 from itertools import count
 from zope.interface import implements
 from twisted.internet import defer
+from twisted.python import failure
 from allmydata.interfaces import IPublishStatus
 from allmydata.util import base32, hashutil, mathutil, idlib, log
 from allmydata import hashtree, codec, storage
 from pycryptopp.cipher.aes import AES
+from foolscap.eventual import eventually

 from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets
 from servermap import ServerMap
@@ -19,27 +21,23 @@ class PublishStatus:
     statusid_counter = count(0)
     def __init__(self):
         self.timings = {}
-        self.timings["per_server"] = {}
-        self.privkey_from = None
-        self.peers_queried = None
-        self.sharemap = None # DictOfSets
+        self.timings["send_per_server"] = {}
+        self.servermap = None
         self.problems = {}
         self.active = True
         self.storage_index = None
         self.helper = False
         self.encoding = ("?", "?")
-        self.initial_read_size = None
         self.size = None
         self.status = "Not started"
         self.progress = 0.0
         self.counter = self.statusid_counter.next()
         self.started = time.time()

-    def add_per_server_time(self, peerid, op, elapsed):
-        assert op in ("read", "write")
-        if peerid not in self.timings["per_server"]:
-            self.timings["per_server"][peerid] = []
-        self.timings["per_server"][peerid].append((op,elapsed))
+    def add_per_server_time(self, peerid, elapsed):
+        if peerid not in self.timings["send_per_server"]:
+            self.timings["send_per_server"][peerid] = []
+        self.timings["send_per_server"][peerid].append(elapsed)

     def get_started(self):
         return self.started
@@ -49,6 +47,8 @@
         return self.encoding
     def using_helper(self):
         return self.helper
+    def get_servermap(self):
+        return self.servermap
     def get_size(self):
         return self.size
     def get_status(self):
@@ -64,6 +64,8 @@
         self.storage_index = si
     def set_helper(self, helper):
         self.helper = helper
+    def set_servermap(self, servermap):
+        self.servermap = servermap
     def set_encoding(self, k, n):
         self.encoding = (k, n)
     def set_size(self, size):
@@ -102,6 +104,13 @@ class Publish:
         self._log_number = num
         self._running = True

+        self._status = PublishStatus()
+        self._status.set_storage_index(self._storage_index)
+        self._status.set_helper(False)
+        self._status.set_progress(0.0)
+        self._status.set_active(True)
+        self._status.set_servermap(servermap)
+
     def log(self, *args, **kwargs):
         if 'parent' not in kwargs:
             kwargs['parent'] = self._log_number
@@ -129,6 +138,9 @@
         # 5: when enough responses are back, we're done

         self.log("starting publish, datalen is %s" % len(newdata))
+        self._status.set_size(len(newdata))
+        self._status.set_status("Started")
+        self._started = time.time()

         self.done_deferred = defer.Deferred()
@@ -160,6 +172,8 @@
         assert self.required_shares is not None
         self.total_shares = self._node.get_total_shares()
         assert self.total_shares is not None
+        self._status.set_encoding(self.required_shares, self.total_shares)
+
         self._pubkey = self._node.get_pubkey()
         assert self._pubkey
         self._privkey = self._node.get_privkey()
@@ -209,8 +223,13 @@
         # create the shares. We'll discard these as they are delivered. SMDF:
         # we're allowed to hold everything in memory.

+        self._status.timings["setup"] = time.time() - self._started
         d = self._encrypt_and_encode()
         d.addCallback(self._generate_shares)
+        def _start_pushing(res):
+            self._started_pushing = time.time()
+            return res
+        d.addCallback(_start_pushing)
         d.addCallback(self.loop) # trigger delivery
         d.addErrback(self._fatal_error)
@@ -233,11 +252,22 @@
         self.log("error during loop", failure=f, level=log.SCARY)
         self._done(f)

+    def _update_status(self):
+        self._status.set_status("Sending Shares: %d placed out of %d, "
+                                "%d messages outstanding" %
+                                (len(self.placed),
+                                 len(self.goal),
+                                 len(self.outstanding)))
+        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
+
     def loop(self, ignored=None):
         self.log("entering loop", level=log.NOISY)
+        if not self._running:
+            return
+
         self.update_goal()
         # how far are we from our goal?
         needed = self.goal - self.placed - self.outstanding
+        self._update_status()

         if needed:
             # we need to send out new shares
@@ -258,6 +288,9 @@
         # no queries outstanding, no placements needed: we're done
         self.log("no queries outstanding, no placements needed: done",
                  level=log.OPERATIONAL)
+        now = time.time()
+        elapsed = now - self._started_pushing
+        self._status.timings["push"] = elapsed
         return self._done(None)

     def log_goal(self, goal):
@@ -331,19 +364,21 @@
         # shares that we care about.
         self.log("_encrypt_and_encode")

-        #started = time.time()
+        self._status.set_status("Encrypting")
+        started = time.time()

         key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
         enc = AES(key)
         crypttext = enc.process(self.newdata)
         assert len(crypttext) == len(self.newdata)

-        #now = time.time()
-        #self._status.timings["encrypt"] = now - started
-        #started = now
+        now = time.time()
+        self._status.timings["encrypt"] = now - started
+        started = now

         # now apply FEC
+        self._status.set_status("Encoding")
         fec = codec.CRSEncoder()
         fec.set_params(self.segment_size,
                        self.required_shares, self.total_shares)
@@ -358,8 +393,8 @@
         d = fec.encode(crypttext_pieces)
         def _done_encoding(res):
-            #elapsed = time.time() - started
-            #self._status.timings["encode"] = elapsed
+            elapsed = time.time() - started
+            self._status.timings["encode"] = elapsed
             return res
         d.addCallback(_done_encoding)
         return d
@@ -367,7 +402,8 @@
     def _generate_shares(self, shares_and_shareids):
         # this sets self.shares and self.root_hash
         self.log("_generate_shares")
-        #started = time.time()
+        self._status.set_status("Generating Shares")
+        started = time.time()

         # we should know these by now
         privkey = self._privkey
@@ -413,9 +449,9 @@
         # then they all share the same encprivkey at the end. The sizes
         # of everything are the same for all shares.

-        #sign_started = time.time()
+        sign_started = time.time()
         signature = privkey.sign(prefix)
-        #self._status.timings["sign"] = time.time() - sign_started
+        self._status.timings["sign"] = time.time() - sign_started

         verification_key = pubkey.serialize()
@@ -429,8 +465,8 @@
                                             all_shares[shnum],
                                             encprivkey)
             final_shares[shnum] = final_share
-        #elapsed = time.time() - started
-        #self._status.timings["pack"] = elapsed
+        elapsed = time.time() - started
+        self._status.timings["pack"] = elapsed
         self.shares = final_shares
         self.root_hash = root_hash
@@ -449,7 +485,6 @@
     def _send_shares(self, needed):
         self.log("_send_shares")
-        #started = time.time()

         # we're finally ready to send out our shares. If we encounter any
         # surprises here, it's because somebody else is writing at the same
@@ -547,6 +582,7 @@
             d.addErrback(self._fatal_error)
             dl.append(d)

+        self._update_status()
         return defer.DeferredList(dl) # purely for testing

     def _do_testreadwrite(self, peerid, secrets,
@@ -568,6 +604,10 @@
         for shnum in shnums:
             self.outstanding.discard( (peerid, shnum) )

+        now = time.time()
+        elapsed = now - started
+        self._status.add_per_server_time(peerid, elapsed)
+
         wrote, read_data = answer
         if not wrote:
@@ -650,13 +690,16 @@
         if not self._running:
             return
         self._running = False
-        #now = time.time()
-        #self._status.timings["total"] = now - self._started
-        #self._status.set_active(False)
-        #self._status.set_status("Done")
-        #self._status.set_progress(1.0)
-        self.done_deferred.callback(res)
-        return None
+        now = time.time()
+        self._status.timings["total"] = now - self._started
+        self._status.set_active(False)
+        if isinstance(res, failure.Failure):
+            self.log("Retrieve done, with failure", failure=res)
+            self._status.set_status("Failed")
+        else:
+            self._status.set_status("Done")
+            self._status.set_progress(1.0)
+        eventually(self.done_deferred.callback, res)

     def get_status(self):
         return self._status
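Note the switch from self.done_deferred.callback(res) to eventually(self.done_deferred.callback, res): firing the Deferred directly would run its callbacks reentrantly, inside whatever storage-server response happened to arrive last. foolscap's eventually() defers the call to a later reactor turn instead. A small runnable sketch of the primitive (the callback body is illustrative):

from twisted.internet import defer, reactor
from foolscap.eventual import eventually

d = defer.Deferred()
d.addCallback(lambda res: reactor.stop())   # illustrative final callback
eventually(d.callback, "publish result")    # queued, not fired in-line
reactor.run()   # the callback runs once the eventual-send queue drains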

View File

@@ -21,13 +21,11 @@ class RetrieveStatus:
         self.timings = {}
         self.timings["fetch_per_server"] = {}
         self.timings["cumulative_verify"] = 0.0
-        self.sharemap = {}
         self.problems = {}
         self.active = True
         self.storage_index = None
         self.helper = False
         self.encoding = ("?","?")
-        self.search_distance = None
         self.size = None
         self.status = "Not started"
         self.progress = 0.0
@@ -40,8 +38,6 @@
         return self.storage_index
     def get_encoding(self):
         return self.encoding
-    def get_search_distance(self):
-        return self.search_distance
     def using_helper(self):
         return self.helper
     def get_size(self):
@@ -55,14 +51,16 @@
     def get_counter(self):
         return self.counter

+    def add_fetch_timing(self, peerid, elapsed):
+        if peerid not in self.timings["fetch_per_server"]:
+            self.timings["fetch_per_server"][peerid] = []
+        self.timings["fetch_per_server"][peerid].append(elapsed)
     def set_storage_index(self, si):
         self.storage_index = si
     def set_helper(self, helper):
         self.helper = helper
     def set_encoding(self, k, n):
         self.encoding = (k, n)
-    def set_search_distance(self, value):
-        self.search_distance = value
     def set_size(self, size):
         self.size = size
     def set_status(self, status):
@@ -99,6 +97,19 @@ class Retrieve:
         assert self._node._pubkey
         self.verinfo = verinfo

+        self._status = RetrieveStatus()
+        self._status.set_storage_index(self._storage_index)
+        self._status.set_helper(False)
+        self._status.set_progress(0.0)
+        self._status.set_active(True)
+        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+         offsets_tuple) = self.verinfo
+        self._status.set_size(datalength)
+        self._status.set_encoding(k, N)
+
+    def get_status(self):
+        return self._status
+
     def log(self, *args, **kwargs):
         if "parent" not in kwargs:
             kwargs["parent"] = self._log_number
@@ -106,6 +117,8 @@
     def download(self):
         self._done_deferred = defer.Deferred()
+        self._started = time.time()
+        self._status.set_status("Retrieving Shares")

         # first, which servers can we use?
         versionmap = self.servermap.make_versionmap()
@@ -165,6 +178,7 @@
         self._outstanding_queries[m] = (peerid, shnum, started)

         # ask the cache first
+        got_from_cache = False
         datav = []
         #for (offset, length) in readv:
         #    (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
@@ -173,13 +187,14 @@
         #        datav.append(data)
         if len(datav) == len(readv):
             self.log("got data from cache")
+            got_from_cache = True
             d = defer.succeed(datav)
         else:
             self.remaining_sharemap[shnum].remove(peerid)
             d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
             d.addCallback(self._fill_cache, readv)

-        d.addCallback(self._got_results, m, peerid, started)
+        d.addCallback(self._got_results, m, peerid, started, got_from_cache)
         d.addErrback(self._query_failed, m, peerid)
         # errors that aren't handled by _query_failed (and errors caused by
         # _query_failed) get logged, but we still want to check for doneness.
@@ -216,7 +231,11 @@
         for shnum in list(self.remaining_sharemap.keys()):
             self.remaining_sharemap.discard(shnum, peerid)

-    def _got_results(self, datavs, marker, peerid, started):
+    def _got_results(self, datavs, marker, peerid, started, got_from_cache):
+        now = time.time()
+        elapsed = now - started
+        if not got_from_cache:
+            self._status.add_fetch_timing(peerid, elapsed)
         self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
                  shares=len(datavs),
                  peerid=idlib.shortnodeid_b2a(peerid),
@@ -241,6 +260,7 @@
                 self.remove_peer(peerid)
                 self.servermap.mark_bad_share(peerid, shnum)
                 self._bad_shares.add( (peerid, shnum) )
+                self._status.problems[peerid] = f
                 self._last_failure = f
                 pass
         # all done!
@@ -284,6 +304,7 @@
         self.log(format="query to [%(peerid)s] failed",
                  peerid=idlib.shortnodeid_b2a(peerid),
                  level=log.NOISY)
+        self._status.problems[peerid] = f
         self._outstanding_queries.pop(marker, None)
         if not self._running:
             return
@@ -317,6 +338,10 @@
         # to fix it, so the download will fail.

         self._decoding = True # avoid reentrancy
+        self._status.set_status("decoding")
+        now = time.time()
+        elapsed = now - self._started
+        self._status.timings["fetch"] = elapsed

         d = defer.maybeDeferred(self._decode)
         d.addCallback(self._decrypt, IV, self._node._readkey)
@@ -366,6 +391,7 @@
             peerid = list(self.remaining_sharemap[shnum])[0]
             # get_data will remove that peerid from the sharemap, and add the
             # query to self._outstanding_queries
+            self._status.set_status("Retrieving More Shares")
             self.get_data(shnum, peerid)
             needed -= 1
             if not needed:
@@ -400,6 +426,7 @@
             return

     def _decode(self):
+        started = time.time()
         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
          offsets_tuple) = self.verinfo
@@ -423,6 +450,7 @@
         self.log("about to decode, shareids=%s" % (shareids,))
         d = defer.maybeDeferred(fec.decode, shares, shareids)
         def _done(buffers):
+            self._status.timings["decode"] = time.time() - started
             self.log(" decode done, %d buffers" % len(buffers))
             segment = "".join(buffers)
             self.log(" joined length %d, datalength %d" %
@@ -438,21 +466,28 @@
         return d

     def _decrypt(self, crypttext, IV, readkey):
+        self._status.set_status("decrypting")
         started = time.time()
         key = hashutil.ssk_readkey_data_hash(IV, readkey)
         decryptor = AES(key)
         plaintext = decryptor.process(crypttext)
+        self._status.timings["decrypt"] = time.time() - started
         return plaintext

     def _done(self, res):
         if not self._running:
             return
         self._running = False
+        self._status.set_active(False)
+        self._status.timings["total"] = time.time() - self._started
         # res is either the new contents, or a Failure
         if isinstance(res, failure.Failure):
             self.log("Retrieve done, with failure", failure=res)
+            self._status.set_status("Failed")
         else:
             self.log("Retrieve done, success!")
+            self._status.set_status("Done")
+            self._status.set_progress(1.0)
             # remember the encoding parameters, use them again next time
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = self.verinfo
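The got_from_cache flag keeps cache hits, which complete in microseconds, out of fetch_per_server, so the per-server timings reflect only real network round-trips. A reduced sketch of the guard (StatusStub is a hypothetical stand-in for RetrieveStatus):

import time

class StatusStub(object):
    def __init__(self):
        self.timings = {"fetch_per_server": {}}
    def add_fetch_timing(self, peerid, elapsed):
        self.timings["fetch_per_server"].setdefault(peerid, []).append(elapsed)

def got_results(status, peerid, started, got_from_cache):
    elapsed = time.time() - started
    if not got_from_cache:   # only count real round-trips
        status.add_fetch_timing(peerid, elapsed)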

View File

@@ -85,6 +85,13 @@ class ServerMap:
                          for (peerid, shnum)
                          in self.servermap])

+    def make_sharemap(self):
+        """Return a dict that maps shnum to a set of peerids that hold it."""
+        sharemap = DictOfSets()
+        for (peerid, shnum) in self.servermap:
+            sharemap.add(shnum, peerid)
+        return sharemap
+
     def make_versionmap(self):
         """Return a dict that maps versionid to sets of (shnum, peerid,
         timestamp) tuples."""
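make_sharemap inverts the servermap's (peerid, shnum) index so the status page can ask "who holds share N?". The same inversion with a plain dict of sets (DictOfSets is Tahoe's small helper for this idiom):

def make_sharemap(servermap_keys):
    # servermap_keys iterates over (peerid, shnum) pairs
    sharemap = {}
    for (peerid, shnum) in servermap_keys:
        sharemap.setdefault(shnum, set()).add(peerid)
    return sharemap

# make_sharemap([("peerA", 0), ("peerB", 0), ("peerA", 1)])
# -> {0: set(["peerA", "peerB"]), 1: set(["peerA"])}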

View File

@@ -25,6 +25,7 @@ class KeyGenService(unittest.TestCase, testutil.PollMixin):
         t.setServiceParent(self.parent)
         t.listenOn("tcp:0")
         t.setLocationAutomatically()
+        return eventual.fireEventually()

     def tearDown(self):
         d = self.parent.stopService()
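Returning eventual.fireEventually() from setUp makes trial wait until foolscap's eventual-send queue has drained before the test body runs, so messages queued during service startup are delivered first. The primitive on its own:

from foolscap.eventual import fireEventually

def wait_for_eventual_queue():
    # Deferred that fires only after previously queued eventual-sends
    # have been delivered; trial waits on it when setUp returns it
    return fireEventually()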

View File

@@ -22,7 +22,6 @@
   <h2>Retrieve Results</h2>
   <ul>
     <li n:render="encoding" />
-    <li n:render="peers_queried" />
     <li n:render="problems" />
     <li n:render="sharemap" />
     <li>Timings:</li>
@@ -31,12 +30,6 @@
         (<span n:render="rate" n:data="rate_total" />)</li>
       <ul>
         <li>Setup: <span n:render="time" n:data="time_setup" /></li>
-        <li>Initial Version Query: <span n:render="time" n:data="time_query" />
-            (read size <span n:render="string" n:data="initial_read_size"/> bytes)</li>
-        <li>Obtain Privkey: <span n:render="time" n:data="time_privkey" />
-          <ul>
-            <li>Separate Privkey Fetch: <span n:render="time" n:data="time_privkey_fetch" /> <span n:render="privkey_from"/></li>
-          </ul></li>
         <li>Encrypting: <span n:render="time" n:data="time_encrypt" />
             (<span n:render="rate" n:data="rate_encrypt" />)</li>
         <li>Encoding: <span n:render="time" n:data="time_encode" />

View File

@@ -22,19 +22,14 @@
   <h2>Retrieve Results</h2>
   <ul>
     <li n:render="encoding" />
-    <li n:render="search_distance" />
     <li n:render="problems" />
     <li>Timings:</li>
     <ul>
       <li>Total: <span n:render="time" n:data="time_total" />
         (<span n:render="rate" n:data="rate_total" />)</li>
       <ul>
-        <li>Initial Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
         <li>Fetching: <span n:render="time" n:data="time_fetch" />
-            (<span n:render="rate" n:data="rate_fetch" />)
-          <ul>
-            <li>Cumulative Verify: <span n:render="time" n:data="time_cumulative_verify" /></li>
-          </ul></li>
+            (<span n:render="rate" n:data="rate_fetch" />)</li>
         <li>Decoding: <span n:render="time" n:data="time_decode" />
             (<span n:render="rate" n:data="rate_decode" />)</li>
         <li>Decrypting: <span n:render="time" n:data="time_decrypt" />

View File

@@ -449,10 +449,6 @@ class RetrieveStatusPage(rend.Page, RateAndTimeMixin):
         k, n = data.get_encoding()
         return ctx.tag["Encoding: %s of %s" % (k, n)]

-    def render_search_distance(self, ctx, data):
-        d = data.get_search_distance()
-        return ctx.tag["Search Distance: %s peer%s" % (d, plural(d))]
-
     def render_problems(self, ctx, data):
         problems = data.problems
         if not problems:
@@ -553,19 +549,16 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin):
         k, n = data.get_encoding()
         return ctx.tag["Encoding: %s of %s" % (k, n)]

-    def render_peers_queried(self, ctx, data):
-        return ctx.tag["Peers Queried: ", data.peers_queried]
-
     def render_sharemap(self, ctx, data):
-        sharemap = data.sharemap
-        if sharemap is None:
+        servermap = data.get_servermap()
+        if servermap is None:
             return ctx.tag["None"]
         l = T.ul()
+        sharemap = servermap.make_sharemap()
         for shnum in sorted(sharemap.keys()):
             l[T.li["%d -> Placed on " % shnum,
                    ", ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
-                              for (peerid,seqnum,root_hash)
-                              in sharemap[shnum]])]]
+                              for peerid in sharemap[shnum]])]]
         return ctx.tag["Sharemap:", l]

     def render_problems(self, ctx, data):
@@ -596,21 +589,6 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin):
     def data_time_setup(self, ctx, data):
         return self.publish_status.timings.get("setup")

-    def data_time_query(self, ctx, data):
-        return self.publish_status.timings.get("query")
-    def data_time_privkey(self, ctx, data):
-        return self.publish_status.timings.get("privkey")
-    def data_time_privkey_fetch(self, ctx, data):
-        return self.publish_status.timings.get("privkey_fetch")
-    def render_privkey_from(self, ctx, data):
-        peerid = data.privkey_from
-        if peerid:
-            return " (got from [%s])" % idlib.shortnodeid_b2a(peerid)
-        else:
-            return ""
-
     def data_time_encrypt(self, ctx, data):
         return self.publish_status.timings.get("encrypt")
     def data_rate_encrypt(self, ctx, data):
@@ -633,23 +611,15 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin):
     def data_rate_push(self, ctx, data):
         return self._get_rate(data, "push")

-    def data_initial_read_size(self, ctx, data):
-        return self.publish_status.initial_read_size
-
     def render_server_timings(self, ctx, data):
-        per_server = self.publish_status.timings.get("per_server")
+        per_server = self.publish_status.timings.get("send_per_server")
         if not per_server:
             return ""
         l = T.ul()
         for peerid in sorted(per_server.keys()):
             peerid_s = idlib.shortnodeid_b2a(peerid)
-            times = []
-            for op,t in per_server[peerid]:
-                if op == "read":
-                    times.append( "(" + self.render_time(None, t) + ")" )
-                else:
-                    times.append( self.render_time(None, t) )
-            times_s = ", ".join(times)
+            times_s = ", ".join([self.render_time(None, t)
+                                 for t in per_server[peerid]])
             l[T.li["[%s]: %s" % (peerid_s, times_s)]]
         return T.li["Per-Server Response Times: ", l]
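With the (op, elapsed) tuples gone from the timing lists, the per-server cell reduces to joining each peer's elapsed times. The same logic outside Nevow (format_server_times and its default formatter are illustrative):

def format_server_times(per_server, render_time=lambda t: "%.3fs" % t):
    # per_server: {peerid: [elapsed, ...]}, as built by add_per_server_time
    lines = []
    for peerid in sorted(per_server.keys()):
        times_s = ", ".join([render_time(t) for t in per_server[peerid]])
        lines.append("[%s]: %s" % (peerid, times_s))
    return lines

# format_server_times({"peerA": [0.12, 0.34]})
# -> ['[peerA]: 0.120s, 0.340s']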