mutable WIP: add servermap update status pages

This commit is contained in:
Brian Warner
2008-04-16 19:05:41 -07:00
parent 842b1f1a70
commit a1670497a8
9 changed files with 273 additions and 14 deletions

View File

@ -304,6 +304,8 @@ class Client(node.Node, testutil.PollMixin):
self.getServiceNamed("mutable-watcher").notify_publish(publish_status) self.getServiceNamed("mutable-watcher").notify_publish(publish_status)
def notify_retrieve(self, retrieve_status): def notify_retrieve(self, retrieve_status):
self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status) self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
def notify_mapupdate(self, update_status):
    """Hand a servermap-update status object to the "mutable-watcher" service.

    update_status is passed through untouched to the service's own
    notify_mapupdate(); nothing is returned.
    """
    watcher = self.getServiceNamed("mutable-watcher")
    watcher.notify_mapupdate(update_status)
def create_empty_dirnode(self): def create_empty_dirnode(self):
n = NewDirectoryNode(self) n = NewDirectoryNode(self)
@ -357,6 +359,16 @@ class Client(node.Node, testutil.PollMixin):
downloader = self.getServiceNamed("downloader") downloader = self.getServiceNamed("downloader")
return downloader.list_recent_downloads() return downloader.list_recent_downloads()
def list_all_mapupdate(self):
    """Return whatever the "mutable-watcher" service's list_all_mapupdate() reports."""
    return self.getServiceNamed("mutable-watcher").list_all_mapupdate()
def list_active_mapupdate(self):
    """Return whatever the "mutable-watcher" service's list_active_mapupdate() reports."""
    return self.getServiceNamed("mutable-watcher").list_active_mapupdate()
def list_recent_mapupdate(self):
    """Return whatever the "mutable-watcher" service's list_recent_mapupdate() reports."""
    return self.getServiceNamed("mutable-watcher").list_recent_mapupdate()
def list_all_publish(self): def list_all_publish(self):
watcher = self.getServiceNamed("mutable-watcher") watcher = self.getServiceNamed("mutable-watcher")
return watcher.list_all_publish() return watcher.list_all_publish()

View File

@ -1559,6 +1559,8 @@ class IDownloadStatus(Interface):
that number. This provides a handle to this particular download, so a that number. This provides a handle to this particular download, so a
web page can generate a suitable hyperlink.""" web page can generate a suitable hyperlink."""
# Marker interface for servermap-update status objects. It declares no
# methods or attributes of its own; it only gives such objects a type that
# can be tested with providedBy().
class IServermapUpdaterStatus(Interface):
pass
class IPublishStatus(Interface): class IPublishStatus(Interface):
pass pass
class IRetrieveStatus(Interface): class IRetrieveStatus(Interface):

View File

@ -215,8 +215,7 @@ class MutableFileNode:
def update_servermap(self, old_map=None, mode=MODE_READ): def update_servermap(self, old_map=None, mode=MODE_READ):
servermap = old_map or ServerMap() servermap = old_map or ServerMap()
d = self.obtain_lock() d = self.obtain_lock()
d.addCallback(lambda res: d.addCallback(lambda res: self._update_servermap(servermap, mode))
ServermapUpdater(self, servermap, mode).update())
d.addBoth(self.release_lock) d.addBoth(self.release_lock)
return d return d
@ -263,6 +262,10 @@ class MutableFileNode:
verifier = self.get_verifier() verifier = self.get_verifier()
return self._client.getServiceNamed("checker").check(verifier) return self._client.getServiceNamed("checker").check(verifier)
def _update_servermap(self, old_map, mode):
    """Run a ServermapUpdater over old_map, announcing its status first.

    The updater's status object is handed to the client's
    notify_mapupdate() before update() is invoked, and whatever update()
    returns is passed straight back to the caller.
    """
    updater = ServermapUpdater(self, old_map, mode)
    status = updater.get_status()
    self._client.notify_mapupdate(status)
    return updater.update()
def _retrieve(self, servermap, verinfo): def _retrieve(self, servermap, verinfo):
r = Retrieve(self, servermap, verinfo) r = Retrieve(self, servermap, verinfo)
@ -325,6 +328,7 @@ class MutableFileNode:
class MutableWatcher(service.MultiService): class MutableWatcher(service.MultiService):
MAX_MAPUPDATE_STATUSES = 20
MAX_PUBLISH_STATUSES = 20 MAX_PUBLISH_STATUSES = 20
MAX_RETRIEVE_STATUSES = 20 MAX_RETRIEVE_STATUSES = 20
name = "mutable-watcher" name = "mutable-watcher"
@ -332,11 +336,28 @@ class MutableWatcher(service.MultiService):
def __init__(self, stats_provider=None): def __init__(self, stats_provider=None):
service.MultiService.__init__(self) service.MultiService.__init__(self)
self.stats_provider = stats_provider self.stats_provider = stats_provider
self._all_mapupdate_status = weakref.WeakKeyDictionary()
self._recent_mapupdate_status = []
self._all_publish_status = weakref.WeakKeyDictionary() self._all_publish_status = weakref.WeakKeyDictionary()
self._recent_publish_status = [] self._recent_publish_status = []
self._all_retrieve_status = weakref.WeakKeyDictionary() self._all_retrieve_status = weakref.WeakKeyDictionary()
self._recent_retrieve_status = [] self._recent_retrieve_status = []
def notify_mapupdate(self, p):
    """Record status object p and keep the recent list within its cap.

    p becomes a key of the "all" mapping and is appended to the "recent"
    list; the oldest entries are then removed from the front until the
    list holds no more than MAX_MAPUPDATE_STATUSES items.
    """
    self._all_mapupdate_status[p] = None
    recent = self._recent_mapupdate_status
    recent.append(p)
    limit = self.MAX_MAPUPDATE_STATUSES
    while len(recent) > limit:
        # trim in place so the list object itself is never replaced
        del recent[0]
def list_all_mapupdate(self):
    """Return the keys of the mapping that holds every tracked map-update status."""
    every_status = self._all_mapupdate_status
    return every_status.keys()
def list_active_mapupdate(self):
    """Return a list of the tracked status objects whose get_active() is true."""
    still_running = []
    for status in self._all_mapupdate_status.keys():
        if status.get_active():
            still_running.append(status)
    return still_running
def list_recent_mapupdate(self):
    """Return the list of recently recorded map-update status objects.

    The internal list itself is returned, not a copy.
    """
    recent = self._recent_mapupdate_status
    return recent
def notify_publish(self, p): def notify_publish(self, p):
self._all_publish_status[p] = None self._all_publish_status[p] = None
self._recent_publish_status.append(p) self._recent_publish_status.append(p)

View File

@ -1,16 +1,78 @@
import sys, time import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer from twisted.internet import defer
from twisted.python import failure from twisted.python import failure
from foolscap.eventual import eventually from foolscap.eventual import eventually
from allmydata.util import base32, hashutil, idlib, log from allmydata.util import base32, hashutil, idlib, log
from allmydata import storage from allmydata import storage
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa from pycryptopp.publickey import rsa
from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \ from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
DictOfSets, CorruptShareError, NeedMoreDataError DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share from layout import unpack_prefix_and_signature, unpack_header, unpack_share
class UpdateStatus:
    """Status record for one servermap update.

    Holds the progress, state text, timing figures and bookkeeping for a
    single update so that other code can inspect it through the getters
    below. Each instance draws a distinct number from a class-wide counter.
    """
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)

    def __init__(self):
        # timings maps a phase name to elapsed seconds. "per_server" is the
        # exception: it maps peerid -> list of (op, elapsed) tuples.
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        # peerid recorded via set_privkey_from(), or None if never set
        self.privkey_from = None
        # presumably peerid -> problem description; nothing in this class
        # fills it in
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        # Initialised here so get_servermap() returns None, rather than
        # raising AttributeError, when no servermap has been attached.
        self.servermap = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, op, elapsed):
        """Append an (op, elapsed) sample to the timing list for peerid.

        op must be "query" or "privkey".
        """
        assert op in ("query", "privkey")
        samples = self.timings["per_server"].setdefault(peerid, [])
        samples.append((op, elapsed))

    def get_started(self):
        """Return the time.time() value captured when this object was created."""
        return self.started

    def get_storage_index(self):
        """Return the storage index, or None if it was never set."""
        return self.storage_index

    def get_mode(self):
        """Return the update mode ("?" until set_mode() is called)."""
        return self.mode

    def get_servermap(self):
        """Return the attached servermap, or None if none has been assigned."""
        return self.servermap

    def get_privkey_from(self):
        """Return the peerid given to set_privkey_from(), or None."""
        return self.privkey_from

    def using_helper(self):
        """Always False."""
        return False

    def get_size(self):
        """Always the placeholder string "-NA-"."""
        return "-NA-"

    def get_status(self):
        """Return the current status text."""
        return self.status

    def get_progress(self):
        """Return the progress value (0.0 initially)."""
        return self.progress

    def get_active(self):
        """Return the active flag (True initially)."""
        return self.active

    def get_counter(self):
        """Return the number this instance drew from the class-wide counter."""
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si

    def set_mode(self, mode):
        self.mode = mode

    def set_privkey_from(self, peerid):
        self.privkey_from = peerid

    def set_status(self, status):
        self.status = status

    def set_progress(self, value):
        self.progress = value

    def set_active(self, value):
        self.active = value
class ServerMap: class ServerMap:
"""I record the placement of mutable shares. """I record the placement of mutable shares.
@ -216,6 +278,11 @@ class ServermapUpdater:
self._storage_index = filenode.get_storage_index() self._storage_index = filenode.get_storage_index()
self._last_failure = None self._last_failure = None
self._status = UpdateStatus()
self._status.set_storage_index(self._storage_index)
self._status.set_progress(0.0)
self._status.set_mode(mode)
# how much data should we read? # how much data should we read?
# * if we only need the checkstring, then [0:75] # * if we only need the checkstring, then [0:75]
# * if we need to validate the checkstring sig, then [543ish:799ish] # * if we need to validate the checkstring sig, then [543ish:799ish]
@ -240,6 +307,9 @@ class ServermapUpdater:
self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)", self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
si=prefix, mode=mode) si=prefix, mode=mode)
def get_status(self):
    """Return the status object held in self._status."""
    return self._status
def log(self, *args, **kwargs): def log(self, *args, **kwargs):
if "parent" not in kwargs: if "parent" not in kwargs:
kwargs["parent"] = self._log_number kwargs["parent"] = self._log_number
@ -248,6 +318,8 @@ class ServermapUpdater:
def update(self): def update(self):
"""Update the servermap to reflect current conditions. Returns a """Update the servermap to reflect current conditions. Returns a
Deferred that fires with the servermap once the update has finished.""" Deferred that fires with the servermap once the update has finished."""
self._started = time.time()
self._status.set_active(True)
# self._valid_versions is a set of validated verinfo tuples. We just # self._valid_versions is a set of validated verinfo tuples. We just
# use it to remember which versions had valid signatures, so we can # use it to remember which versions had valid signatures, so we can
@ -259,7 +331,6 @@ class ServermapUpdater:
# be retrievable, and to make the eventual data download faster. # be retrievable, and to make the eventual data download faster.
self.versionmap = DictOfSets() self.versionmap = DictOfSets()
self._started = time.time()
self._done_deferred = defer.Deferred() self._done_deferred = defer.Deferred()
# first, which peers should we talk to? Any that were in our old # first, which peers should we talk to? Any that were in our old
@ -322,6 +393,7 @@ class ServermapUpdater:
# enough responses) # enough responses)
self._send_initial_requests(initial_peers_to_query) self._send_initial_requests(initial_peers_to_query)
self._status.timings["setup"] = time.time() - self._started
return self._done_deferred return self._done_deferred
def _build_initial_querylist(self): def _build_initial_querylist(self):
@ -342,6 +414,7 @@ class ServermapUpdater:
return initial_peers_to_query, must_query return initial_peers_to_query, must_query
def _send_initial_requests(self, peerlist): def _send_initial_requests(self, peerlist):
self._status.set_status("Sending %d initial queries" % len(peerlist))
self._queries_outstanding = set() self._queries_outstanding = set()
self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..] self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
dl = [] dl = []
@ -382,6 +455,9 @@ class ServermapUpdater:
peerid=idlib.shortnodeid_b2a(peerid), peerid=idlib.shortnodeid_b2a(peerid),
numshares=len(datavs), numshares=len(datavs),
level=log.NOISY) level=log.NOISY)
now = time.time()
elapsed = now - started
self._status.add_per_server_time(peerid, "query", elapsed)
self._queries_outstanding.discard(peerid) self._queries_outstanding.discard(peerid)
self._must_query.discard(peerid) self._must_query.discard(peerid)
self._queries_completed += 1 self._queries_completed += 1
@ -412,6 +488,8 @@ class ServermapUpdater:
self._servermap.problems.append(f) self._servermap.problems.append(f)
pass pass
self._status.timings["cumulative_verify"] += (time.time() - now)
if self._need_privkey and last_verinfo: if self._need_privkey and last_verinfo:
# send them a request for the privkey. We send one request per # send them a request for the privkey. We send one request per
# server. # server.
@ -422,9 +500,11 @@ class ServermapUpdater:
self._queries_outstanding.add(peerid) self._queries_outstanding.add(peerid)
readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ] readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
ss = self._servermap.connections[peerid] ss = self._servermap.connections[peerid]
privkey_started = time.time()
d = self._do_read(ss, peerid, self._storage_index, d = self._do_read(ss, peerid, self._storage_index,
[last_shnum], readv) [last_shnum], readv)
d.addCallback(self._got_privkey_results, peerid, last_shnum) d.addCallback(self._got_privkey_results, peerid, last_shnum,
privkey_started)
d.addErrback(self._privkey_query_failed, peerid, last_shnum) d.addErrback(self._privkey_query_failed, peerid, last_shnum)
d.addErrback(log.err) d.addErrback(log.err)
d.addCallback(self._check_for_done) d.addCallback(self._check_for_done)
@ -540,6 +620,7 @@ class ServermapUpdater:
self._node._populate_encprivkey(enc_privkey) self._node._populate_encprivkey(enc_privkey)
self._node._populate_privkey(privkey) self._node._populate_privkey(privkey)
self._need_privkey = False self._need_privkey = False
self._status.set_privkey_from(peerid)
def _query_failed(self, f, peerid): def _query_failed(self, f, peerid):
@ -554,7 +635,10 @@ class ServermapUpdater:
self._queries_completed += 1 self._queries_completed += 1
self._last_failure = f self._last_failure = f
def _got_privkey_results(self, datavs, peerid, shnum): def _got_privkey_results(self, datavs, peerid, shnum, started):
now = time.time()
elapsed = now - started
self._status.add_per_server_time(peerid, "privkey", elapsed)
self._queries_outstanding.discard(peerid) self._queries_outstanding.discard(peerid)
if not self._need_privkey: if not self._need_privkey:
return return
@ -769,6 +853,12 @@ class ServermapUpdater:
if not self._running: if not self._running:
return return
self._running = False self._running = False
elapsed = time.time() - self._started
self._status.timings["total"] = elapsed
self._status.set_progress(1.0)
self._status.set_status("Done")
self._status.set_active(False)
self._servermap.last_update_mode = self.mode self._servermap.last_update_mode = self.mode
self._servermap.last_update_time = self._started self._servermap.last_update_time = self._started
# the servermap will not be touched after this # the servermap will not be touched after this
@ -779,3 +869,4 @@ class ServermapUpdater:
self.log("fatal error", failure=f, level=log.WEIRD) self.log("fatal error", failure=f, level=log.WEIRD)
self._done_deferred.errback(f) self._done_deferred.errback(f)

View File

@ -168,6 +168,8 @@ class FakeClient:
pass pass
def notify_publish(self, p): def notify_publish(self, p):
pass pass
def notify_mapupdate(self, u):
    """Accept a map-update status object and ignore it."""
    return None
def create_node_from_uri(self, u): def create_node_from_uri(self, u):
u = IURI(u) u = IURI(u)

View File

@ -1291,10 +1291,10 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
for ul in self.clients[0].list_recent_uploads(): for ul in self.clients[0].list_recent_uploads():
if ul.get_size() > 200: if ul.get_size() > 200:
self._up_status = ul.get_counter() self._up_status = ul.get_counter()
#rs = self.clients[0].list_recent_retrieve()[0] rs = self.clients[0].list_recent_retrieve()[0]
#self._retrieve_status = rs.get_counter() self._retrieve_status = rs.get_counter()
#ps = self.clients[0].list_recent_publish()[0] ps = self.clients[0].list_recent_publish()[0]
#self._publish_status = ps.get_counter() self._publish_status = ps.get_counter()
# and that there are some upload- and download- status pages # and that there are some upload- and download- status pages
return self.GET("status/up-%d" % self._up_status) return self.GET("status/up-%d" % self._up_status)
@ -1304,10 +1304,10 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
d.addCallback(_got_up) d.addCallback(_got_up)
def _got_down(res): def _got_down(res):
return self.GET("status/publish-%d" % self._publish_status) return self.GET("status/publish-%d" % self._publish_status)
#d.addCallback(_got_down) # TODO: disabled during #303 refactoring d.addCallback(_got_down)
def _got_publish(res): def _got_publish(res):
return self.GET("status/retrieve-%d" % self._retrieve_status) return self.GET("status/retrieve-%d" % self._retrieve_status)
#d.addCallback(_got_publish) d.addCallback(_got_publish)
# check that the helper status page exists # check that the helper status page exists
d.addCallback(lambda res: d.addCallback(lambda res:

View File

@ -85,6 +85,10 @@ class FakeClient(service.MultiService):
return [] return []
def list_active_retrieve(self): def list_active_retrieve(self):
return [] return []
def list_active_mapupdate(self):
    """Report no active map updates: always a new empty list."""
    return list()
def list_recent_mapupdate(self):
    """Report no recent map updates: always a new empty list."""
    return list()
def list_recent_uploads(self): def list_recent_uploads(self):
return self._all_upload_status return self._all_upload_status

View File

@ -0,0 +1,38 @@
<html xmlns:n="http://nevow.com/ns/nevow/0.1">
<head>
<title>AllMyData - Tahoe - Mutable File Servermap Update Status</title>
<!-- <link href="http://www.allmydata.com/common/css/styles.css"
rel="stylesheet" type="text/css"/> -->
<link href="/webform_css" rel="stylesheet" type="text/css"/>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
</head>
<body>
<h1>Mutable File Servermap Update Status</h1>
<ul>
<li>Started: <span n:render="started"/></li>
<li>Storage Index: <span n:render="si"/></li>
<li>Helper?: <span n:render="helper"/></li>
<li>Progress: <span n:render="progress"/></li>
<li>Status: <span n:render="status"/></li>
</ul>
<h2>Update Results</h2>
<ul>
<li n:render="problems" />
<li>Timings:</li>
<ul>
<li>Total: <span n:render="time" n:data="time_total" /></li>
<ul>
<li>Setup: <span n:render="time" n:data="time_setup" /></li>
<li n:render="privkey_from" />
<li>Cumulative Verify: <span n:render="time" n:data="time_cumulative_verify" /></li>
</ul>
<li n:render="server_timings" />
</ul>
</ul>
<div>Return to the <a href="/">Welcome Page</a></div>
</body></html>

View File

@ -7,7 +7,7 @@ from allmydata.util import base32, idlib
from allmydata.web.common import IClient, getxmlfile, abbreviate_time, \ from allmydata.web.common import IClient, getxmlfile, abbreviate_time, \
abbreviate_rate, abbreviate_size, get_arg abbreviate_rate, abbreviate_size, get_arg
from allmydata.interfaces import IUploadStatus, IDownloadStatus, \ from allmydata.interfaces import IUploadStatus, IDownloadStatus, \
IPublishStatus, IRetrieveStatus IPublishStatus, IRetrieveStatus, IServermapUpdaterStatus
def plural(sequence_or_length): def plural(sequence_or_length):
if isinstance(sequence_or_length, int): if isinstance(sequence_or_length, int):
@ -623,6 +623,81 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin):
l[T.li["[%s]: %s" % (peerid_s, times_s)]] l[T.li["[%s]: %s" % (peerid_s, times_s)]]
return T.li["Per-Server Response Times: ", l] return T.li["Per-Server Response Times: ", l]
class MapupdateStatusPage(rend.Page, RateAndTimeMixin):
    """Page that renders one servermap-update status object.

    The template is map-update-status.xhtml; the render_* and data_*
    methods below supply the values it asks for. The status object is
    both the page's data and available as self.update_status.
    """
    docFactory = getxmlfile("map-update-status.xhtml")

    def __init__(self, data):
        rend.Page.__init__(self, data)
        self.update_status = data

    def _timing(self, name):
        """Look up one entry of the status object's timings dict (None if absent)."""
        return self.update_status.timings.get(name)

    def render_started(self, ctx, data):
        """Format the start time as local time."""
        when = time.localtime(data.get_started())
        return time.strftime("%H:%M:%S %d-%b-%Y", when)

    def render_si(self, ctx, data):
        """Show the base32 storage index, or "(None)" when there is none."""
        encoded = base32.b2a_or_none(data.get_storage_index())
        if encoded is not None:
            return encoded
        return "(None)"

    def render_helper(self, ctx, data):
        """Show "Yes" or "No" according to using_helper()."""
        labels = {True: "Yes",
                  False: "No"}
        return labels[data.using_helper()]

    def render_progress(self, ctx, data):
        """Show progress as a percentage with one decimal place."""
        # TODO: make an ascii-art bar
        percent = 100.0 * data.get_progress()
        return "%.1f%%" % percent

    def render_status(self, ctx, data):
        """Show the status text."""
        return data.get_status()

    def render_problems(self, ctx, data):
        """List per-server problems, sorted by peerid; empty string if none."""
        problems = data.problems
        if not problems:
            return ""
        listing = T.ul()
        for peerid in sorted(problems.keys()):
            short_id = idlib.shortnodeid_b2a(peerid)
            entry = "[%s]: %s" % (short_id, problems[peerid])
            listing[T.li[entry]]
        return ctx.tag["Server Problems:", listing]

    def render_privkey_from(self, ctx, data):
        """Name the peer the privkey came from; empty string if unset."""
        peerid = data.get_privkey_from()
        if not peerid:
            return ""
        return ctx.tag["Got privkey from: [%s]"
                       % idlib.shortnodeid_b2a(peerid)]

    def data_time_total(self, ctx, data):
        return self._timing("total")

    def data_time_setup(self, ctx, data):
        return self._timing("setup")

    def data_time_cumulative_verify(self, ctx, data):
        return self._timing("cumulative_verify")

    def render_server_timings(self, ctx, data):
        """List response times per server, sorted by peerid.

        Times for "query" operations are shown bare; any other operation's
        time is wrapped in parentheses. Returns an empty string when no
        per-server timings were recorded.
        """
        per_server = self._timing("per_server")
        if not per_server:
            return ""
        listing = T.ul()
        for peerid in sorted(per_server.keys()):
            short_id = idlib.shortnodeid_b2a(peerid)
            rendered_times = []
            for op, elapsed in per_server[peerid]:
                text = self.render_time(None, elapsed)
                if op != "query":
                    text = "(" + text + ")"
                rendered_times.append(text)
            joined = ", ".join(rendered_times)
            listing[T.li["[%s]: %s" % (short_id, joined)]]
        return T.li["Per-Server Response Times: ", listing]
class Status(rend.Page): class Status(rend.Page):
docFactory = getxmlfile("status.xhtml") docFactory = getxmlfile("status.xhtml")
@ -632,6 +707,7 @@ class Status(rend.Page):
client = IClient(ctx) client = IClient(ctx)
active = (client.list_active_uploads() + active = (client.list_active_uploads() +
client.list_active_downloads() + client.list_active_downloads() +
client.list_active_mapupdate() +
client.list_active_publish() + client.list_active_publish() +
client.list_active_retrieve() + client.list_active_retrieve() +
client.list_active_helper_statuses()) client.list_active_helper_statuses())
@ -641,6 +717,7 @@ class Status(rend.Page):
client = IClient(ctx) client = IClient(ctx)
recent = [o for o in (client.list_recent_uploads() + recent = [o for o in (client.list_recent_uploads() +
client.list_recent_downloads() + client.list_recent_downloads() +
client.list_recent_mapupdate() +
client.list_recent_publish() + client.list_recent_publish() +
client.list_recent_retrieve() + client.list_recent_retrieve() +
client.list_recent_helper_statuses()) client.list_recent_helper_statuses())
@ -688,11 +765,15 @@ class Status(rend.Page):
link = "publish-%d" % data.get_counter() link = "publish-%d" % data.get_counter()
ctx.fillSlots("type", "publish") ctx.fillSlots("type", "publish")
ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress)) ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
else: elif IRetrieveStatus.providedBy(data):
assert IRetrieveStatus.providedBy(data)
ctx.fillSlots("type", "retrieve") ctx.fillSlots("type", "retrieve")
link = "retrieve-%d" % data.get_counter() link = "retrieve-%d" % data.get_counter()
ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress)) ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
else:
assert IServermapUpdaterStatus.providedBy(data)
ctx.fillSlots("type", "mapupdate %s" % data.get_mode())
link = "mapupdate-%d" % data.get_counter()
ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
ctx.fillSlots("status", T.a(href=link)[s.get_status()]) ctx.fillSlots("status", T.a(href=link)[s.get_status()])
return ctx.tag return ctx.tag
@ -723,6 +804,14 @@ class Status(rend.Page):
s = d.get_download_status() s = d.get_download_status()
if s.get_counter() == count: if s.get_counter() == count:
return DownloadStatusPage(s) return DownloadStatusPage(s)
if stype == "mapupdate":
for s in client.list_recent_mapupdate():
if s.get_counter() == count:
return MapupdateStatusPage(s)
for p in client.list_all_mapupdate():
s = p.get_status()
if s.get_counter() == count:
return MapupdateStatusPage(s)
if stype == "publish": if stype == "publish":
for s in client.list_recent_publish(): for s in client.list_recent_publish():
if s.get_counter() == count: if s.get_counter() == count: