Rewrite immutable downloader (#798). This patch includes higher-level

integration into the NodeMaker, and updates the web-status display to handle
the new download events.
This commit is contained in:
Brian Warner 2010-08-04 00:27:02 -07:00
parent 797828f47f
commit 7b7b0c9709
6 changed files with 183 additions and 18 deletions

View File

@ -1,9 +1,10 @@
import os, stat, time
import os, stat, time, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node
from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from pycryptopp.publickey import rsa
@ -12,11 +13,10 @@ import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.download import Downloader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, cachedir, log
from allmydata.util import hashutil, base32, pollmixin, log
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
@ -95,6 +95,16 @@ class KeyGenerator:
verifier = signer.get_verifying_key()
return defer.succeed( (verifier, signer) )
class Terminator(service.Service):
def __init__(self):
self._clients = weakref.WeakKeyDictionary()
def register(self, c):
self._clients[c] = None
def stopService(self):
for c in self._clients:
c.stop()
return service.Service.stopService(self)
class Client(node.Node, pollmixin.PollMixin):
implements(IStatsProducer)
@ -279,12 +289,9 @@ class Client(node.Node, pollmixin.PollMixin):
self.init_client_storage_broker()
self.history = History(self.stats_provider)
self.terminator = Terminator()
self.terminator.setServiceParent(self)
self.add_service(Uploader(helper_furl, self.stats_provider))
download_cachedir = os.path.join(self.basedir,
"private", "cache", "download")
self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
self.download_cache_dirman.setServiceParent(self)
self.downloader = Downloader(self.storage_broker, self.stats_provider)
self.init_stub_client()
self.init_nodemaker()
@ -343,8 +350,7 @@ class Client(node.Node, pollmixin.PollMixin):
self._secret_holder,
self.get_history(),
self.getServiceNamed("uploader"),
self.downloader,
self.download_cache_dirman,
self.terminator,
self.get_encoding_parameters(),
self._key_generator)

View File

@ -24,6 +24,9 @@ WriteEnablerSecret = Hash # used to protect mutable bucket modifications
LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
KiB = 1024
DEFAULT_MAX_SEGMENT_SIZE = 128*KiB
class RIStubClient(RemoteInterface):
"""Each client publishes a service announcement for a dummy object called
the StubClient. This object doesn't actually offer any services, but the

View File

@ -1,7 +1,8 @@
import weakref
from zope.interface import implements
from allmydata.interfaces import INodeMaker
from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
from allmydata.immutable.literal import LiteralFileNode
from allmydata.immutable.filenode import ImmutableFileNode, CiphertextFileNode
from allmydata.immutable.upload import Data
from allmydata.mutable.filenode import MutableFileNode
from allmydata.dirnode import DirectoryNode, pack_children
@ -12,14 +13,13 @@ class NodeMaker:
implements(INodeMaker)
def __init__(self, storage_broker, secret_holder, history,
uploader, downloader, download_cache_dirman,
uploader, terminator,
default_encoding_parameters, key_generator):
self.storage_broker = storage_broker
self.secret_holder = secret_holder
self.history = history
self.uploader = uploader
self.downloader = downloader
self.download_cache_dirman = download_cache_dirman
self.terminator = terminator
self.default_encoding_parameters = default_encoding_parameters
self.key_generator = key_generator
@ -29,8 +29,10 @@ class NodeMaker:
return LiteralFileNode(cap)
def _create_immutable(self, cap):
return ImmutableFileNode(cap, self.storage_broker, self.secret_holder,
self.downloader, self.history,
self.download_cache_dirman)
self.terminator, self.history)
def _create_immutable_verifier(self, cap):
return CiphertextFileNode(cap, self.storage_broker, self.secret_holder,
self.terminator, self.history)
def _create_mutable(self, cap):
n = MutableFileNode(self.storage_broker, self.secret_holder,
self.default_encoding_parameters,
@ -73,6 +75,8 @@ class NodeMaker:
return self._create_lit(cap)
if isinstance(cap, uri.CHKFileURI):
return self._create_immutable(cap)
if isinstance(cap, uri.CHKFileVerifierURI):
return self._create_immutable_verifier(cap)
if isinstance(cap, (uri.ReadonlySSKFileURI, uri.WriteableSSKFileURI)):
return self._create_mutable(cap)
if isinstance(cap, (uri.DirectoryURI,

View File

@ -18,6 +18,7 @@
<li>Status: <span n:render="status"/></li>
</ul>
<div n:render="events"></div>
<div n:render="results">
<h2>Download Results</h2>

View File

@ -358,6 +358,147 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
def download_results(self):
return defer.maybeDeferred(self.download_status.get_results)
def relative_time(self, t):
if t is None:
return t
if self.download_status.started is not None:
return t - self.download_status.started
return t
def short_relative_time(self, t):
t = self.relative_time(t)
if t is None:
return ""
return "+%.6fs" % t
def renderHTTP(self, ctx):
req = inevow.IRequest(ctx)
t = get_arg(req, "t")
if t == "json":
return self.json(req)
return rend.Page.renderHTTP(self, ctx)
def json(self, req):
req.setHeader("content-type", "text/plain")
data = {}
dyhb_events = []
for serverid,requests in self.download_status.dyhb_requests.iteritems():
for req in requests:
dyhb_events.append( (base32.b2a(serverid),) + req )
dyhb_events.sort(key=lambda req: req[1])
data["dyhb"] = dyhb_events
request_events = []
for serverid,requests in self.download_status.requests.iteritems():
for req in requests:
request_events.append( (base32.b2a(serverid),) + req )
request_events.sort(key=lambda req: (req[4],req[1]))
data["requests"] = request_events
data["segment"] = self.download_status.segment_events
data["read"] = self.download_status.read_events
return simplejson.dumps(data, indent=1) + "\n"
def render_events(self, ctx, data):
if not self.download_status.storage_index:
return
srt = self.short_relative_time
l = T.ul()
t = T.table(class_="status-download-events")
t[T.tr[T.td["serverid"], T.td["sent"], T.td["received"],
T.td["shnums"], T.td["RTT"]]]
dyhb_events = []
for serverid,requests in self.download_status.dyhb_requests.iteritems():
for req in requests:
dyhb_events.append( (serverid,) + req )
dyhb_events.sort(key=lambda req: req[1])
for d_ev in dyhb_events:
(serverid, sent, shnums, received) = d_ev
serverid_s = idlib.shortnodeid_b2a(serverid)
rtt = received - sent
t[T.tr(style="background: %s" % self.color(serverid))[
[T.td[serverid_s], T.td[srt(sent)], T.td[srt(received)],
T.td[",".join([str(shnum) for shnum in shnums])],
T.td[self.render_time(None, rtt)],
]]]
l["DYHB Requests:", t]
t = T.table(class_="status-download-events")
t[T.tr[T.td["range"], T.td["start"], T.td["finish"], T.td["got"],
T.td["time"], T.td["decrypttime"], T.td["pausedtime"],
T.td["speed"]]]
for r_ev in self.download_status.read_events:
(start, length, requesttime, finishtime, bytes, decrypt, paused) = r_ev
print r_ev
if finishtime is not None:
rtt = finishtime - requesttime - paused
speed = self.render_rate(None, 1.0 * bytes / rtt)
rtt = self.render_time(None, rtt)
decrypt = self.render_time(None, decrypt)
paused = self.render_time(None, paused)
else:
speed, rtt, decrypt, paused = "","","",""
t[T.tr[T.td["[%d:+%d]" % (start, length)],
T.td[srt(requesttime)], T.td[srt(finishtime)],
T.td[bytes], T.td[rtt], T.td[decrypt], T.td[paused],
T.td[speed],
]]
l["Read Events:", t]
t = T.table(class_="status-download-events")
t[T.tr[T.td["type"], T.td["segnum"], T.td["when"], T.td["range"],
T.td["decodetime"], T.td["segtime"], T.td["speed"]]]
reqtime = (None, None)
for s_ev in self.download_status.segment_events:
(etype, segnum, when, segstart, seglen, decodetime) = s_ev
if etype == "request":
t[T.tr[T.td["request"], T.td["seg%d" % segnum],
T.td[srt(when)]]]
reqtime = (segnum, when)
elif etype == "delivery":
if reqtime[0] == segnum:
segtime = when - reqtime[1]
speed = self.render_rate(None, 1.0 * seglen / segtime)
segtime = self.render_time(None, segtime)
else:
segtime, speed = "", ""
t[T.tr[T.td["delivery"], T.td["seg%d" % segnum],
T.td[srt(when)],
T.td["[%d:+%d]" % (segstart, seglen)],
T.td[self.render_time(None,decodetime)],
T.td[segtime], T.td[speed]]]
elif etype == "error":
t[T.tr[T.td["error"], T.td["seg%d" % segnum]]]
l["Segment Events:", t]
t = T.table(border="1")
t[T.tr[T.td["serverid"], T.td["shnum"], T.td["range"],
T.td["txtime"], T.td["rxtime"], T.td["received"], T.td["RTT"]]]
reqtime = (None, None)
request_events = []
for serverid,requests in self.download_status.requests.iteritems():
for req in requests:
request_events.append( (serverid,) + req )
request_events.sort(key=lambda req: (req[4],req[1]))
for r_ev in request_events:
(peerid, shnum, start, length, sent, receivedlen, received) = r_ev
rtt = None
if received is not None:
rtt = received - sent
peerid_s = idlib.shortnodeid_b2a(peerid)
t[T.tr(style="background: %s" % self.color(peerid))[
T.td[peerid_s], T.td[shnum],
T.td["[%d:+%d]" % (start, length)],
T.td[srt(sent)], T.td[srt(received)], T.td[receivedlen],
T.td[self.render_time(None, rtt)],
]]
l["Requests:", t]
return l
def color(self, peerid):
def m(c):
return min(ord(c) / 2 + 0x80, 0xff)
return "#%02x%02x%02x" % (m(peerid[0]), m(peerid[1]), m(peerid[2]))
def render_results(self, ctx, data):
d = self.download_results()
def _got_results(results):
@ -371,7 +512,7 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
started_s = time.strftime(TIME_FORMAT,
time.localtime(data.get_started()))
return started_s
return started_s + " (%s)" % data.get_started()
def render_si(self, ctx, data):
si_s = base32.b2a_or_none(data.get_storage_index())

View File

@ -135,4 +135,14 @@ table.tahoe-directory {
display: inline;
text-align: center;
padding: 0 1em;
}
}
/* recent upload/download status pages */
table.status-download-events {
border: 1px solid #aaa;
}
table.status-download-events td {
border: 1px solid #a00;
padding: 2px
}