webish: add publish status

Brian Warner 2008-03-05 18:41:10 -07:00
parent 2c81988ed3
commit dd4a951770
4 changed files with 249 additions and 10 deletions

View File

@@ -248,6 +248,7 @@ class RetrieveStatus:
def __init__(self):
self.timings = {}
self.timings["fetch_per_server"] = {}
self.timings["cumulative_verify"] = 0.0
self.sharemap = {}
self.problems = {}
self.active = True
@@ -534,11 +535,18 @@ class Retrieve:
if verinfo not in self._valid_versions:
# it's a new pair. Verify the signature.
started = time.time()
valid = self._pubkey.verify(prefix, signature)
# this records the total verification time for all versions we've
# seen. This time is included in "fetch".
elapsed = time.time() - started
self._status.timings["cumulative_verify"] += elapsed
if not valid:
self._status.problems[peerid] = "sh#%d: invalid signature" % shnum
raise CorruptShareError(peerid, shnum,
"signature is invalid")
# ok, it's a valid verinfo. Add it to the list of validated
# versions.
self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
@@ -878,20 +886,34 @@ class PublishStatus:
statusid_counter = count(0)
def __init__(self):
self.timings = {}
self.sharemap = None
self.timings["per_server"] = {}
self.privkey_from = None
self.peers_queried = None
self.sharemap = None # DictOfSets
self.problems = {}
self.active = True
self.storage_index = None
self.helper = False
self.encoding = ("?", "?")
self.initial_read_size = None
self.size = None
self.status = "Not started"
self.progress = 0.0
self.counter = self.statusid_counter.next()
self.started = time.time()
def add_per_server_time(self, peerid, op, elapsed):
assert op in ("read", "write")
if peerid not in self.timings["per_server"]:
self.timings["per_server"][peerid] = []
self.timings["per_server"][peerid].append((op,elapsed))
def get_started(self):
return self.started
def get_storage_index(self):
return self.storage_index
def get_encoding(self):
return self.encoding
def using_helper(self):
return self.helper
def get_size(self):
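# A sketch (not from this patch) of the structure add_per_server_time()
# builds: timings["per_server"] maps each peerid to an ordered list of
# (op, elapsed) tuples, so one peer can contribute both a query ("read")
# and a push ("write") sample. Peer ID and times are invented.
timings = {"per_server": {}}
def add_per_server_time(peerid, op, elapsed):
    assert op in ("read", "write")
    timings["per_server"].setdefault(peerid, []).append((op, elapsed))
peerid = "\x9f" * 20  # invented 20-byte binary peer ID
add_per_server_time(peerid, "read", 0.12)
add_per_server_time(peerid, "write", 0.34)
assert timings["per_server"][peerid] == [("read", 0.12), ("write", 0.34)]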
@@ -909,6 +931,8 @@ class PublishStatus:
self.storage_index = si
def set_helper(self, helper):
self.helper = helper
def set_encoding(self, k, n):
self.encoding = (k, n)
def set_size(self, size):
self.size = size
def set_status(self, status):
@@ -932,6 +956,7 @@ class Publish:
self._status.set_helper(False)
self._status.set_progress(0.0)
self._status.set_active(True)
self._started = time.time()
def log(self, *args, **kwargs):
if 'parent' not in kwargs:
@@ -962,7 +987,6 @@ class Publish:
# 5: when enough responses are back, we're done
self.log("starting publish, datalen is %s" % len(newdata))
self._started = time.time()
self._status.set_size(len(newdata))
self._writekey = self._node.get_writekey()
@@ -978,6 +1002,7 @@ class Publish:
required_shares = self._node.get_required_shares()
total_shares = self._node.get_total_shares()
self._pubkey = self._node.get_pubkey()
self._status.set_encoding(required_shares, total_shares)
# these two may not be set yet; we might have to get them from the first peer
self._privkey = self._node.get_privkey()
@@ -995,10 +1020,13 @@ class Publish:
# with up to 7 entries, allowing us to make an update in 2 RTT
# instead of 3.
self._read_size = 1000
self._status.initial_read_size = self._read_size
d = defer.succeed(total_shares)
d.addCallback(self._query_peers)
d.addCallback(self._query_peers_done)
d.addCallback(self._obtain_privkey)
d.addCallback(self._obtain_privkey_done)
d.addCallback(self._encrypt_and_encode, newdata, readkey, IV,
required_shares, total_shares)
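# A sketch (not from this patch): the two new *_done callbacks spliced
# into this chain follow a common Twisted pattern, a bookkeeping step
# that stamps a timing and returns its argument unchanged, so downstream
# callbacks still see the same target_info. Names here are invented.
import time
from twisted.internet import defer
def make_timer(timings, name, started):
    def _record(result):
        timings[name] = time.time() - started
        return result  # pass-through: the next callback sees the same value
    return _record
timings = {}
d = defer.succeed("target_info")
d.addCallback(make_timer(timings, "query", time.time()))
d.addCallback(lambda info: info)  # still receives "target_info"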
@@ -1011,6 +1039,9 @@ class Publish:
def _query_peers(self, total_shares):
self.log("_query_peers")
self._query_peers_started = now = time.time()
elapsed = now - self._started
self._status.timings["setup"] = elapsed
storage_index = self._storage_index
@@ -1040,16 +1071,18 @@ class Publish:
EPSILON = total_shares / 2
#partial_peerlist = islice(peerlist, total_shares + EPSILON)
partial_peerlist = peerlist[:total_shares+EPSILON]
self._status.peers_queried = len(partial_peerlist)
self._storage_servers = {}
started = time.time()
dl = []
for permutedid, (peerid, ss) in enumerate(partial_peerlist):
self._storage_servers[peerid] = ss
d = self._do_query(ss, peerid, storage_index)
d.addCallback(self._got_query_results,
peerid, permutedid,
reachable_peers, current_share_peers)
reachable_peers, current_share_peers, started)
dl.append(d)
d = defer.DeferredList(dl)
d.addCallback(self._got_all_query_results,
@@ -1072,10 +1105,13 @@ class Publish:
return d
def _got_query_results(self, datavs, peerid, permutedid,
reachable_peers, current_share_peers):
reachable_peers, current_share_peers, started):
lp = self.log(format="_got_query_results from %(peerid)s",
peerid=idlib.shortnodeid_b2a(peerid))
elapsed = time.time() - started
self._status.add_per_server_time(peerid, "read", elapsed)
assert isinstance(datavs, dict)
reachable_peers[peerid] = permutedid
if not datavs:
@@ -1164,6 +1200,7 @@ class Publish:
total_shares, reachable_peers,
current_share_peers):
self.log("_got_all_query_results")
# now that we know everything about the shares currently out there,
# decide where to place the new shares.
@@ -1230,16 +1267,25 @@ class Publish:
target_info = (target_map, shares_per_peer)
return target_info
def _query_peers_done(self, target_info):
self._obtain_privkey_started = now = time.time()
elapsed = time.time() - self._query_peers_started
self._status.timings["query"] = elapsed
return target_info
def _obtain_privkey(self, target_info):
# make sure we've got a copy of our private key.
if self._privkey:
# Must have picked it up during _query_peers. We're good to go.
if "privkey_fetch" not in self._status.timings:
self._status.timings["privkey_fetch"] = 0.0
return target_info
# Nope, we haven't managed to grab a copy, and we still need it. Ask
# peers one at a time until we get a copy. Only bother asking peers
# who've admitted to holding a share.
self._privkey_fetch_started = time.time()
target_map, shares_per_peer = target_info
# pull shares from self._encprivkey_shares
if not self._encprivkey_shares:
@@ -1255,25 +1301,44 @@ class Publish:
return d
def _do_privkey_query(self, rref, peerid, shnum, offset, length):
started = time.time()
d = rref.callRemote("slot_readv", self._storage_index,
[shnum], [(offset, length)] )
d.addCallback(self._privkey_query_response, peerid, shnum)
d.addCallback(self._privkey_query_response, peerid, shnum, started)
return d
def _privkey_query_response(self, datav, peerid, shnum):
def _privkey_query_response(self, datav, peerid, shnum, started):
elapsed = time.time() - started
self._status.add_per_server_time(peerid, "read", elapsed)
data = datav[shnum][0]
self._try_to_validate_privkey(data, peerid, shnum)
elapsed = time.time() - self._privkey_fetch_started
self._status.timings["privkey_fetch"] = elapsed
self._status.privkey_from = peerid
def _obtain_privkey_done(self, target_info):
elapsed = time.time() - self._obtain_privkey_started
self._status.timings["privkey"] = elapsed
return target_info
def _encrypt_and_encode(self, target_info,
newdata, readkey, IV,
required_shares, total_shares):
self.log("_encrypt_and_encode")
started = time.time()
key = hashutil.ssk_readkey_data_hash(IV, readkey)
enc = AES(key)
crypttext = enc.process(newdata)
assert len(crypttext) == len(newdata)
now = time.time()
self._status.timings["encrypt"] = now - started
started = now
# now apply FEC
self.MAX_SEGMENT_SIZE = 1024*1024
data_length = len(crypttext)
@@ -1298,6 +1363,11 @@ class Publish:
assert len(piece) == piece_size
d = fec.encode(crypttext_pieces)
def _done_encoding(res):
elapsed = time.time() - started
self._status.timings["encode"] = elapsed
return res
d.addCallback(_done_encoding)
d.addCallback(lambda shares_and_shareids:
(shares_and_shareids,
required_shares, total_shares,
@@ -1311,6 +1381,7 @@ class Publish:
target_info),
seqnum, IV):
self.log("_generate_shares")
started = time.time()
# we should know these by now
privkey = self._privkey
@@ -1356,7 +1427,9 @@ class Publish:
# then they all share the same encprivkey at the end. The sizes
# of everything are the same for all shares.
sign_started = time.time()
signature = privkey.sign(prefix)
self._status.timings["sign"] = time.time() - sign_started
verification_key = pubkey.serialize()
@@ -1370,11 +1443,15 @@ class Publish:
all_shares[shnum],
encprivkey)
final_shares[shnum] = final_share
elapsed = time.time() - started
self._status.timings["pack"] = elapsed
return (seqnum, root_hash, final_shares, target_info)
def _send_shares(self, (seqnum, root_hash, final_shares, target_info), IV):
self.log("_send_shares")
started = time.time()
# we're finally ready to send out our shares. If we encounter any
# surprises here, it's because somebody else is writing at the same
# time. (Note: in the future, when we remove the _query_peers() step
@@ -1417,10 +1494,17 @@ class Publish:
d = self._do_testreadwrite(peerid, secrets,
tw_vectors, read_vector)
d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
peerid, expected_old_shares[peerid], dispatch_map)
peerid, expected_old_shares[peerid], dispatch_map,
started)
dl.append(d)
d = defer.DeferredList(dl)
def _done_sending(res):
elapsed = time.time() - started
self._status.timings["push"] = elapsed
self._status.sharemap = dispatch_map
return res
d.addCallback(_done_sending)
d.addCallback(lambda res: (self._surprised, dispatch_map))
return d
@@ -1438,9 +1522,12 @@ class Publish:
def _got_write_answer(self, answer, tw_vectors, my_checkstring,
peerid, expected_old_shares,
dispatch_map):
dispatch_map, started):
lp = self.log("_got_write_answer from %s" %
idlib.shortnodeid_b2a(peerid))
elapsed = time.time() - started
self._status.add_per_server_time(peerid, "write", elapsed)
wrote, read_data = answer
surprised = False
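# A sketch (not from this patch): taken together, the publish path now
# stamps one wall-clock bucket per phase. Hypothetical snapshot of
# PublishStatus.timings after a successful publish; the keys are the
# ones set above, the values are invented.
timings = {
    "setup": 0.01,         # __init__ through the start of _query_peers
    "query": 0.25,         # initial version query round-trips
    "privkey": 0.00,       # _query_peers_done through _obtain_privkey_done
    "privkey_fetch": 0.0,  # stays 0.0 when the key arrived during the query
    "encrypt": 0.03,       # AES pass over the new contents
    "encode": 0.05,        # FEC
    "sign": 0.02,          # RSA signature over the share prefix
    "pack": 0.04,          # assembling the final share strings
    "push": 0.30,          # test-and-write round-trips
    "per_server": {},      # peerid -> [(op, elapsed), ...]
}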

View File

@@ -19,4 +19,41 @@
<li>Status: <span n:render="status"/></li>
</ul>
<h2>Publish Results</h2>
<ul>
<li n:render="encoding" />
<li n:render="peers_queried" />
<li n:render="problems" />
<li n:render="sharemap" />
<li>Timings:</li>
<ul>
<li>Total: <span n:render="time" n:data="time_total" />
(<span n:render="rate" n:data="rate_total" />)</li>
<ul>
<li>Setup: <span n:render="time" n:data="time_setup" /></li>
<li>Initial Version Query: <span n:render="time" n:data="time_query" />
(read size <span n:render="string" n:data="initial_read_size"/> bytes)</li>
<li>Obtain Privkey: <span n:render="time" n:data="time_privkey" />
<ul>
<li>Separate Privkey Fetch: <span n:render="time" n:data="time_privkey_fetch" /> <span n:render="privkey_from"/></li>
</ul></li>
<li>Encrypting: <span n:render="time" n:data="time_encrypt" />
(<span n:render="rate" n:data="rate_encrypt" />)</li>
<li>Encoding: <span n:render="time" n:data="time_encode" />
(<span n:render="rate" n:data="rate_encode" />)</li>
<li>Packing Shares: <span n:render="time" n:data="time_pack" />
(<span n:render="rate" n:data="rate_pack" />)
<ul>
<li>RSA Signature: <span n:render="time" n:data="time_sign" /></li>
</ul></li>
<li>Pushing: <span n:render="time" n:data="time_push" />
(<span n:render="rate" n:data="rate_push" />)</li>
</ul>
<li n:render="server_timings" />
</ul>
</ul>
<div>Return to the <a href="/">Welcome Page</a></div>
</body></html>
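This template drives PublishStatusPage through Nevow's attribute dispatch: n:data="time_total" resolves to data_time_total() on the page class, and n:render="time" hands that value to the render_time() formatter (supplied by RateAndTimeMixin). A minimal standalone sketch of the same wiring, with invented names and a simplified formatter:

from nevow import rend, loaders

class ExamplePage(rend.Page):
    # n:data="time_total" calls data_time_total(); its return value is
    # passed to the renderer named by n:render="time".
    docFactory = loaders.xmlstr(
        '<html xmlns:n="http://nevow.com/ns/nevow/0.1">'
        'Total: <span n:render="time" n:data="time_total" />'
        '</html>')

    def data_time_total(self, ctx, data):
        return 1.234                    # invented elapsed seconds

    def render_time(self, ctx, data):
        return ctx.tag["%.2fs" % data]  # stand-in for the mixin's formatter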

View File

@@ -31,7 +31,10 @@
<ul>
<li>Initial Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
<li>Fetching: <span n:render="time" n:data="time_fetch" />
(<span n:render="rate" n:data="rate_fetch" />)</li>
(<span n:render="rate" n:data="rate_fetch" />)
<ul>
<li>Cumulative Verify: <span n:render="time" n:data="time_cumulative_verify" /></li>
</ul></li>
<li>Decoding: <span n:render="time" n:data="time_decode" />
(<span n:render="rate" n:data="rate_decode" />)</li>
<li>Decrypting: <span n:render="time" n:data="time_decrypt" />

View File

@@ -492,6 +492,9 @@ class RetrieveStatusPage(rend.Page, RateAndTimeMixin):
def data_rate_fetch(self, ctx, data):
return self._get_rate(data, "fetch")
def data_time_cumulative_verify(self, ctx, data):
return self.retrieve_status.timings.get("cumulative_verify")
def data_time_decode(self, ctx, data):
return self.retrieve_status.timings.get("decode")
def data_rate_decode(self, ctx, data):
@@ -515,9 +518,13 @@ class RetrieveStatusPage(rend.Page, RateAndTimeMixin):
return T.li["Per-Server Fetch Response Times: ", l]
class PublishStatusPage(rend.Page):
class PublishStatusPage(rend.Page, RateAndTimeMixin):
docFactory = getxmlfile("publish-status.xhtml")
def __init__(self, data):
rend.Page.__init__(self, data)
self.publish_status = data
def render_started(self, ctx, data):
TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
started_s = time.strftime(TIME_FORMAT,
@@ -548,6 +555,111 @@ class PublishStatusPage(rend.Page):
def render_status(self, ctx, data):
return data.get_status()
def render_encoding(self, ctx, data):
k, n = data.get_encoding()
return ctx.tag["Encoding: %s of %s" % (k, n)]
def render_peers_queried(self, ctx, data):
return ctx.tag["Peers Queried: ", data.peers_queried]
def render_sharemap(self, ctx, data):
sharemap = data.sharemap
if sharemap is None:
return ctx.tag["None"]
l = T.ul()
for shnum in sorted(sharemap.keys()):
l[T.li["%d -> Placed on " % shnum,
", ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
for (peerid,seqnum,root_hash)
in sharemap[shnum]])]]
return ctx.tag["Sharemap:", l]
def render_problems(self, ctx, data):
problems = data.problems
if not problems:
return ""
l = T.ul()
for peerid in sorted(problems.keys()):
peerid_s = idlib.shortnodeid_b2a(peerid)
l[T.li["[%s]: %s" % (peerid_s, problems[peerid])]]
return ctx.tag["Server Problems:", l]
def _get_rate(self, data, name):
file_size = self.publish_status.get_size()
time = self.publish_status.timings.get(name)
if time is None:
return None
try:
return 1.0 * file_size / time
except ZeroDivisionError:
return None
def data_time_total(self, ctx, data):
return self.publish_status.timings.get("total")
def data_rate_total(self, ctx, data):
return self._get_rate(data, "total")
def data_time_setup(self, ctx, data):
return self.publish_status.timings.get("setup")
def data_time_query(self, ctx, data):
return self.publish_status.timings.get("query")
def data_time_privkey(self, ctx, data):
return self.publish_status.timings.get("privkey")
def data_time_privkey_fetch(self, ctx, data):
return self.publish_status.timings.get("privkey_fetch")
def render_privkey_from(self, ctx, data):
peerid = data.privkey_from
if peerid:
return " (got from [%s])" % idlib.shortnodeid_b2a(peerid)
else:
return ""
def data_time_encrypt(self, ctx, data):
return self.publish_status.timings.get("encrypt")
def data_rate_encrypt(self, ctx, data):
return self._get_rate(data, "encrypt")
def data_time_encode(self, ctx, data):
return self.publish_status.timings.get("encode")
def data_rate_encode(self, ctx, data):
return self._get_rate(data, "encode")
def data_time_pack(self, ctx, data):
return self.publish_status.timings.get("pack")
def data_rate_pack(self, ctx, data):
return self._get_rate(data, "pack")
def data_time_sign(self, ctx, data):
return self.publish_status.timings.get("sign")
def data_time_push(self, ctx, data):
return self.publish_status.timings.get("push")
def data_rate_push(self, ctx, data):
return self._get_rate(data, "push")
def data_initial_read_size(self, ctx, data):
return self.publish_status.initial_read_size
def render_server_timings(self, ctx, data):
per_server = self.publish_status.timings.get("per_server")
if not per_server:
return ""
l = T.ul()
for peerid in sorted(per_server.keys()):
peerid_s = idlib.shortnodeid_b2a(peerid)
times = []
for op,t in per_server[peerid]:
if op == "read":
times.append( "(" + self.render_time(None, t) + ")" )
else:
times.append( self.render_time(None, t) )
times_s = ", ".join(times)
l[T.li["[%s]: %s" % (peerid_s, times_s)]]
return T.li["Per-Server Response Times: ", l]
class Status(rend.Page):
docFactory = getxmlfile("status.xhtml")
addSlash = True