mutable WIP: clean up status handling, shrink the code a lot, improve test coverage

This commit is contained in:
Brian Warner 2008-04-17 13:02:22 -07:00
parent a1670497a8
commit e1838ba217
9 changed files with 143 additions and 208 deletions

View File

@ -329,7 +329,8 @@ class Client(node.Node, testutil.PollMixin):
d.addCallback(make_key_objs)
return d
else:
# RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
# RSA key generation for a 2048 bit key takes between 0.8 and 3.2
# secs
signer = rsa.generate(key_size)
verifier = signer.get_verifying_key()
return verifier, signer
@ -339,66 +340,28 @@ class Client(node.Node, testutil.PollMixin):
return uploader.upload(uploadable)
def list_all_uploads(self):
def list_all_upload_statuses(self):
uploader = self.getServiceNamed("uploader")
return uploader.list_all_uploads()
def list_active_uploads(self):
uploader = self.getServiceNamed("uploader")
return uploader.list_active_uploads()
def list_recent_uploads(self):
uploader = self.getServiceNamed("uploader")
return uploader.list_recent_uploads()
return uploader.list_all_upload_statuses()
def list_all_downloads(self):
def list_all_download_statuses(self):
downloader = self.getServiceNamed("downloader")
return downloader.list_all_downloads()
def list_active_downloads(self):
downloader = self.getServiceNamed("downloader")
return downloader.list_active_downloads()
def list_recent_downloads(self):
downloader = self.getServiceNamed("downloader")
return downloader.list_recent_downloads()
return downloader.list_all_download_statuses()
def list_all_mapupdate(self):
def list_all_mapupdate_statuses(self):
watcher = self.getServiceNamed("mutable-watcher")
return watcher.list_all_mapupdate()
def list_active_mapupdate(self):
return watcher.list_all_mapupdate_statuses()
def list_all_publish_statuses(self):
watcher = self.getServiceNamed("mutable-watcher")
return watcher.list_active_mapupdate()
def list_recent_mapupdate(self):
return watcher.list_all_publish_statuses()
def list_all_retrieve_statuses(self):
watcher = self.getServiceNamed("mutable-watcher")
return watcher.list_recent_mapupdate()
return watcher.list_all_retrieve_statuses()
def list_all_publish(self):
watcher = self.getServiceNamed("mutable-watcher")
return watcher.list_all_publish()
def list_active_publish(self):
watcher = self.getServiceNamed("mutable-watcher")
return watcher.list_active_publish()
def list_recent_publish(self):
watcher = self.getServiceNamed("mutable-watcher")
return watcher.list_recent_publish()
def list_all_retrieve(self):
watcher = self.getServiceNamed("mutable-watcher")
return watcher.list_all_retrieve()
def list_active_retrieve(self):
watcher = self.getServiceNamed("mutable-watcher")
return watcher.list_active_retrieve()
def list_recent_retrieve(self):
watcher = self.getServiceNamed("mutable-watcher")
return watcher.list_recent_retrieve()
def list_active_helper_statuses(self):
def list_all_helper_statuses(self):
try:
helper = self.getServiceNamed("helper")
except KeyError:
return []
return helper.get_active_upload_statuses()
def list_recent_helper_statuses(self):
try:
helper = self.getServiceNamed("helper")
except KeyError:
return []
return helper.get_recent_upload_statuses()
return helper.get_all_upload_statuses()

View File

@ -1045,9 +1045,10 @@ class Downloader(service.MultiService):
def __init__(self, stats_provider=None):
service.MultiService.__init__(self)
self._all_downloads = weakref.WeakKeyDictionary()
self.stats_provider = stats_provider
self._recent_download_status = []
self._all_downloads = weakref.WeakKeyDictionary() # for debugging
self._all_download_statuses = weakref.WeakKeyDictionary()
self._recent_download_statuses = []
def download(self, u, t):
assert self.parent
@ -1067,10 +1068,7 @@ class Downloader(service.MultiService):
dl = FileDownloader(self.parent, u, t)
else:
raise RuntimeError("I don't know how to download a %s" % u)
self._all_downloads[dl] = None
self._recent_download_status.append(dl.get_download_status())
while len(self._recent_download_status) > self.MAX_DOWNLOAD_STATUSES:
self._recent_download_status.pop(0)
self._add_download(dl)
d = dl.start()
return d
@ -1082,11 +1080,14 @@ class Downloader(service.MultiService):
def download_to_filehandle(self, uri, filehandle):
return self.download(uri, FileHandle(filehandle))
def _add_download(self, downloader):
self._all_downloads[downloader] = None
s = downloader.get_download_status()
self._all_download_statuses[s] = None
self._recent_download_statuses.append(s)
while len(self._recent_download_statuses) > self.MAX_DOWNLOAD_STATUSES:
self._recent_download_statuses.pop(0)
def list_all_downloads(self):
return self._all_downloads.keys()
def list_active_downloads(self):
return [d.get_download_status() for d in self._all_downloads.keys()
if d.get_download_status().get_active()]
def list_recent_downloads(self):
return self._recent_download_status
def list_all_download_statuses(self):
for ds in self._all_download_statuses:
yield ds

View File

@ -350,14 +350,6 @@ class MutableWatcher(service.MultiService):
while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
self._recent_mapupdate_status.pop(0)
def list_all_mapupdate(self):
return self._all_mapupdate_status.keys()
def list_active_mapupdate(self):
return [p for p in self._all_mapupdate_status.keys() if p.get_active()]
def list_recent_mapupdate(self):
return self._recent_mapupdate_status
def notify_publish(self, p):
self._all_publish_status[p] = None
self._recent_publish_status.append(p)
@ -371,14 +363,6 @@ class MutableWatcher(service.MultiService):
while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
self._recent_publish_status.pop(0)
def list_all_publish(self):
return self._all_publish_status.keys()
def list_active_publish(self):
return [p for p in self._all_publish_status.keys() if p.get_active()]
def list_recent_publish(self):
return self._recent_publish_status
def notify_retrieve(self, r):
self._all_retrieve_status[r] = None
self._recent_retrieve_status.append(r)
@ -388,9 +372,10 @@ class MutableWatcher(service.MultiService):
while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
self._recent_retrieve_status.pop(0)
def list_all_retrieve(self):
def list_all_mapupdate_statuses(self):
return self._all_mapupdate_status.keys()
def list_all_publish_statuses(self):
return self._all_publish_status.keys()
def list_all_retrieve_statuses(self):
return self._all_retrieve_status.keys()
def list_active_retrieve(self):
return [p for p in self._all_retrieve_status.keys() if p.get_active()]
def list_recent_retrieve(self):
return self._recent_retrieve_status

View File

@ -109,7 +109,6 @@ class Publish:
self._status.set_helper(False)
self._status.set_progress(0.0)
self._status.set_active(True)
self._status.set_servermap(servermap)
def log(self, *args, **kwargs):
if 'parent' not in kwargs:
@ -160,6 +159,7 @@ class Publish:
# initial publish
self._new_seqnum = 1
self._servermap = ServerMap()
self._status.set_servermap(self._servermap)
self.log(format="new seqnum will be %(seqnum)d",
seqnum=self._new_seqnum, level=log.NOISY)

View File

@ -1,5 +1,5 @@
import os, stat, time
import os, stat, time, weakref
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
@ -490,6 +490,8 @@ class Helper(Referenceable, service.MultiService):
fileutil.make_dirs(self._chk_incoming)
fileutil.make_dirs(self._chk_encoding)
self._active_uploads = {}
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
self._all_upload_statuses = weakref.WeakKeyDictionary()
self._recent_upload_statuses = []
self.stats_provider = stats_provider
if stats_provider:
@ -588,6 +590,7 @@ class Helper(Referenceable, service.MultiService):
incoming_file, encoding_file,
r, lp)
self._active_uploads[storage_index] = uh
self._add_upload(uh)
return uh.start()
d.addCallback(_checked)
def _err(f):
@ -622,18 +625,21 @@ class Helper(Referenceable, service.MultiService):
d.addCallback(_checked)
return d
def _add_upload(self, uh):
self._all_uploads[uh] = None
s = uh.get_upload_status()
self._all_upload_statuses[s] = None
self._recent_upload_statuses.append(s)
while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
self._recent_upload_statuses.pop(0)
def upload_finished(self, storage_index, size):
# this is called with size=0 if the upload failed
self.count("chk_upload_helper.encoded_bytes", size)
uh = self._active_uploads[storage_index]
del self._active_uploads[storage_index]
s = uh.get_upload_status()
s.set_active(False)
self._recent_upload_statuses.append(s)
while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
self._recent_upload_statuses.pop(0)
def get_active_upload_statuses(self):
return [u.get_upload_status() for u in self._active_uploads.values()]
def get_recent_upload_statuses(self):
return self._recent_upload_statuses
def get_all_upload_statuses(self):
return self._all_upload_statuses

View File

@ -1285,16 +1285,18 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
def _got_status(res):
# find an interesting upload and download to look at. LIT files
# are not interesting.
for dl in self.clients[0].list_recent_downloads():
if dl.get_size() > 200:
self._down_status = dl.get_counter()
for ul in self.clients[0].list_recent_uploads():
if ul.get_size() > 200:
self._up_status = ul.get_counter()
rs = self.clients[0].list_recent_retrieve()[0]
for ds in self.clients[0].list_all_download_statuses():
if ds.get_size() > 200:
self._down_status = ds.get_counter()
for us in self.clients[0].list_all_upload_statuses():
if us.get_size() > 200:
self._up_status = us.get_counter()
rs = self.clients[0].list_all_retrieve_statuses()[0]
self._retrieve_status = rs.get_counter()
ps = self.clients[0].list_recent_publish()[0]
ps = self.clients[0].list_all_publish_statuses()[0]
self._publish_status = ps.get_counter()
us = self.clients[0].list_all_mapupdate_statuses()[0]
self._update_status = us.get_counter()
# and that there are some upload- and download- status pages
return self.GET("status/up-%d" % self._up_status)
@ -1303,8 +1305,11 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
return self.GET("status/down-%d" % self._down_status)
d.addCallback(_got_up)
def _got_down(res):
return self.GET("status/publish-%d" % self._publish_status)
return self.GET("status/mapupdate-%d" % self._update_status)
d.addCallback(_got_down)
def _got_update(res):
return self.GET("status/publish-%d" % self._publish_status)
d.addCallback(_got_update)
def _got_publish(res):
return self.GET("status/retrieve-%d" % self._retrieve_status)
d.addCallback(_got_publish)

View File

@ -8,8 +8,11 @@ from twisted.python import failure, log
from allmydata import interfaces, provisioning, uri, webish, upload, download
from allmydata.web import status
from allmydata.util import fileutil
from allmydata.test.common import FakeDirectoryNode, FakeCHKFileNode, FakeMutableFileNode, create_chk_filenode
from allmydata.interfaces import IURI, INewDirectoryURI, IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, IMutableFileNode
from allmydata.test.common import FakeDirectoryNode, FakeCHKFileNode, \
FakeMutableFileNode, create_chk_filenode
from allmydata.interfaces import IURI, INewDirectoryURI, \
IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, IMutableFileNode
from allmydata.mutable import servermap, publish, retrieve
# create a fake uploader/downloader, and a couple of fake dirnodes, then
# create a webserver that works against them
@ -35,6 +38,9 @@ class FakeClient(service.MultiService):
introducer_client = FakeIntroducerClient()
_all_upload_status = [upload.UploadStatus()]
_all_download_status = [download.DownloadStatus()]
_all_mapupdate_statuses = [servermap.UpdateStatus()]
_all_publish_statuses = [publish.PublishStatus()]
_all_retrieve_statuses = [retrieve.RetrieveStatus()]
convergence = "some random string"
def connected_to_introducer(self):
@ -72,35 +78,17 @@ class FakeClient(service.MultiService):
d.addCallback(_got_data)
return d
def list_all_uploads(self):
return []
def list_all_downloads(self):
return []
def list_active_uploads(self):
def list_all_upload_statuses(self):
return self._all_upload_status
def list_active_downloads(self):
def list_all_download_statuses(self):
return self._all_download_status
def list_active_publish(self):
return []
def list_active_retrieve(self):
return []
def list_active_mapupdate(self):
return []
def list_recent_mapupdate(self):
return []
def list_recent_uploads(self):
return self._all_upload_status
def list_recent_downloads(self):
return self._all_download_status
def list_recent_publish(self):
return []
def list_recent_retrieve(self):
return []
def list_active_helper_statuses(self):
return []
def list_recent_helper_statuses(self):
def list_all_mapupdate_statuses(self):
return self._all_mapupdate_statuses
def list_all_publish_statuses(self):
return self._all_publish_statuses
def list_all_retrieve_statuses(self):
return self._all_retrieve_statuses
def list_all_helper_statuses(self):
return []
class WebMixin(object):
@ -411,13 +399,19 @@ class Web(WebMixin, unittest.TestCase):
return d
def test_status(self):
dl_num = self.s.list_recent_downloads()[0].get_counter()
ul_num = self.s.list_recent_uploads()[0].get_counter()
dl_num = self.s.list_all_download_statuses()[0].get_counter()
ul_num = self.s.list_all_upload_statuses()[0].get_counter()
mu_num = self.s.list_all_mapupdate_statuses()[0].get_counter()
pub_num = self.s.list_all_publish_statuses()[0].get_counter()
ret_num = self.s.list_all_retrieve_statuses()[0].get_counter()
d = self.GET("/status", followRedirect=True)
def _check(res):
self.failUnless('Upload and Download Status' in res, res)
self.failUnless('"down-%d"' % dl_num in res, res)
self.failUnless('"up-%d"' % ul_num in res, res)
self.failUnless('"mapupdate-%d"' % mu_num in res, res)
self.failUnless('"publish-%d"' % pub_num in res, res)
self.failUnless('"retrieve-%d"' % ret_num in res, res)
d.addCallback(_check)
d.addCallback(lambda res: self.GET("/status/down-%d" % dl_num))
def _check_dl(res):
@ -427,6 +421,19 @@ class Web(WebMixin, unittest.TestCase):
def _check_ul(res):
self.failUnless("File Upload Status" in res, res)
d.addCallback(_check_ul)
d.addCallback(lambda res: self.GET("/status/mapupdate-%d" % mu_num))
def _check_mapupdate(res):
self.failUnless("Mutable File Servermap Update Status" in res, res)
d.addCallback(_check_mapupdate)
d.addCallback(lambda res: self.GET("/status/publish-%d" % pub_num))
def _check_publish(res):
self.failUnless("Mutable File Publish Status" in res, res)
d.addCallback(_check_publish)
d.addCallback(lambda res: self.GET("/status/retrieve-%d" % ret_num))
def _check_retrieve(res):
self.failUnless("Mutable File Retrieve Status" in res, res)
d.addCallback(_check_retrieve)
return d
def test_status_numbers(self):

View File

@ -1200,8 +1200,9 @@ class Uploader(service.MultiService):
self._helper_furl = helper_furl
self.stats_provider = stats_provider
self._helper = None
self._all_uploads = weakref.WeakKeyDictionary()
self._recent_upload_status = []
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
self._all_upload_statuses = weakref.WeakKeyDictionary()
self._recent_upload_statuses = []
service.MultiService.__init__(self)
def startService(self):
@ -1243,10 +1244,7 @@ class Uploader(service.MultiService):
uploader = AssistedUploader(self._helper)
else:
uploader = self.uploader_class(self.parent)
self._all_uploads[uploader] = None
self._recent_upload_status.append(uploader.get_upload_status())
while len(self._recent_upload_status) > self.MAX_UPLOAD_STATUSES:
self._recent_upload_status.pop(0)
self._add_upload(uploader)
return uploader.start(uploadable)
d.addCallback(_got_size)
def _done(res):
@ -1255,10 +1253,14 @@ class Uploader(service.MultiService):
d.addBoth(_done)
return d
def list_all_uploads(self):
return self._all_uploads.keys()
def list_active_uploads(self):
return [u.get_upload_status() for u in self._all_uploads.keys()
if u.get_upload_status().get_active()]
def list_recent_uploads(self):
return self._recent_upload_status
def _add_upload(self, uploader):
s = uploader.get_upload_status()
self._all_uploads[uploader] = None
self._all_upload_statuses[s] = None
self._recent_upload_statuses.append(s)
while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
self._recent_upload_statuses.pop(0)
def list_all_upload_statuses(self):
for us in self._all_upload_statuses:
yield us

View File

@ -1,5 +1,5 @@
import time, pprint
import time, pprint, itertools
import simplejson
from twisted.internet import defer
from nevow import rend, inevow, tags as T
@ -143,13 +143,9 @@ class UploadResultsRendererMixin(RateAndTimeMixin):
d = self.upload_results()
def _convert(r):
file_size = r.file_size
if file_size is None:
return None
time1 = r.timings.get("cumulative_encoding")
if time1 is None:
return None
time2 = r.timings.get("cumulative_sending")
if time2 is None:
if (file_size is None or time1 is None or time2 is None):
return None
try:
return 1.0 * file_size / (time1+time2)
@ -162,10 +158,8 @@ class UploadResultsRendererMixin(RateAndTimeMixin):
d = self.upload_results()
def _convert(r):
fetch_size = r.ciphertext_fetched
if fetch_size is None:
return None
time = r.timings.get("cumulative_fetch")
if time is None:
if (fetch_size is None or time is None):
return None
try:
return 1.0 * fetch_size / time
@ -474,17 +468,11 @@ class RetrieveStatusPage(rend.Page, RateAndTimeMixin):
def data_rate_total(self, ctx, data):
return self._get_rate(data, "total")
def data_time_peer_selection(self, ctx, data):
return self.retrieve_status.timings.get("peer_selection")
def data_time_fetch(self, ctx, data):
return self.retrieve_status.timings.get("fetch")
def data_rate_fetch(self, ctx, data):
return self._get_rate(data, "fetch")
def data_time_cumulative_verify(self, ctx, data):
return self.retrieve_status.timings.get("cumulative_verify")
def data_time_decode(self, ctx, data):
return self.retrieve_status.timings.get("decode")
def data_rate_decode(self, ctx, data):
@ -703,25 +691,27 @@ class Status(rend.Page):
docFactory = getxmlfile("status.xhtml")
addSlash = True
def _get_all_statuses(self, client):
return itertools.chain(client.list_all_upload_statuses(),
client.list_all_download_statuses(),
client.list_all_mapupdate_statuses(),
client.list_all_publish_statuses(),
client.list_all_retrieve_statuses(),
client.list_all_helper_statuses(),
)
def data_active_operations(self, ctx, data):
client = IClient(ctx)
active = (client.list_active_uploads() +
client.list_active_downloads() +
client.list_active_mapupdate() +
client.list_active_publish() +
client.list_active_retrieve() +
client.list_active_helper_statuses())
active = [s
for s in self._get_all_statuses(client)
if s.get_active()]
return active
def data_recent_operations(self, ctx, data):
client = IClient(ctx)
recent = [o for o in (client.list_recent_uploads() +
client.list_recent_downloads() +
client.list_recent_mapupdate() +
client.list_recent_publish() +
client.list_recent_retrieve() +
client.list_recent_helper_statuses())
if not o.get_active()]
recent = [s
for s in self._get_all_statuses(client)
if not s.get_active()]
recent.sort(lambda a,b: cmp(a.get_started(), b.get_started()))
recent.reverse()
return recent
@ -782,50 +772,26 @@ class Status(rend.Page):
stype,count_s = name.split("-")
count = int(count_s)
if stype == "up":
for s in client.list_recent_uploads():
for s in itertools.chain(client.list_all_upload_statuses(),
client.list_all_helper_statuses()):
# immutable-upload helpers use the same status object as a
# regular immutable-upload
if s.get_counter() == count:
return UploadStatusPage(s)
for u in client.list_all_uploads():
# u is an uploader object
s = u.get_upload_status()
if s.get_counter() == count:
return UploadStatusPage(s)
for s in (client.list_active_helper_statuses() +
client.list_recent_helper_statuses()):
if s.get_counter() == count:
# immutable-upload helpers use the same status object as
# a regular immutable-upload
return UploadStatusPage(s)
if stype == "down":
for s in client.list_recent_downloads():
if s.get_counter() == count:
return DownloadStatusPage(s)
for d in client.list_all_downloads():
s = d.get_download_status()
for s in client.list_all_download_statuses():
if s.get_counter() == count:
return DownloadStatusPage(s)
if stype == "mapupdate":
for s in client.list_recent_mapupdate():
if s.get_counter() == count:
return MapupdateStatusPage(s)
for p in client.list_all_mapupdate():
s = p.get_status()
for s in client.list_all_mapupdate_statuses():
if s.get_counter() == count:
return MapupdateStatusPage(s)
if stype == "publish":
for s in client.list_recent_publish():
if s.get_counter() == count:
return PublishStatusPage(s)
for p in client.list_all_publish():
s = p.get_status()
for s in client.list_all_publish_statuses():
if s.get_counter() == count:
return PublishStatusPage(s)
if stype == "retrieve":
for s in client.list_recent_retrieve():
if s.get_counter() == count:
return RetrieveStatusPage(s)
for r in client.list_all_retrieve():
s = r.get_status()
for s in client.list_all_retrieve_statuses():
if s.get_counter() == count:
return RetrieveStatusPage(s)