mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-31 08:25:35 +00:00
mutable: move recent operation history management code (MutableWatcher) into history.py, have History provide stats
This commit is contained in:
parent
aa50c30aa2
commit
2fe099a0b3
@ -22,7 +22,7 @@ from allmydata.util import hashutil, base32, pollmixin, cachedir
|
||||
from allmydata.util.abbreviate import parse_abbreviated_size
|
||||
from allmydata.uri import LiteralFileURI
|
||||
from allmydata.dirnode import NewDirectoryNode
|
||||
from allmydata.mutable.filenode import MutableFileNode, MutableWatcher
|
||||
from allmydata.mutable.filenode import MutableFileNode
|
||||
from allmydata.stats import StatsProvider
|
||||
from allmydata.history import History
|
||||
from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
|
||||
@ -189,14 +189,13 @@ class Client(node.Node, pollmixin.PollMixin):
|
||||
convergence_s = self.get_or_create_private_config('convergence', _make_secret)
|
||||
self.convergence = base32.a2b(convergence_s)
|
||||
self._node_cache = weakref.WeakValueDictionary() # uri -> node
|
||||
self.add_service(History())
|
||||
self.add_service(History(self.stats_provider))
|
||||
self.add_service(Uploader(helper_furl, self.stats_provider))
|
||||
download_cachedir = os.path.join(self.basedir,
|
||||
"private", "cache", "download")
|
||||
self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
|
||||
self.download_cache.setServiceParent(self)
|
||||
self.add_service(Downloader(self.stats_provider))
|
||||
self.add_service(MutableWatcher(self.stats_provider))
|
||||
def _publish(res):
|
||||
# we publish an empty object so that the introducer can count how
|
||||
# many clients are connected and see what versions they're
|
||||
@ -373,14 +372,6 @@ class Client(node.Node, pollmixin.PollMixin):
|
||||
self._node_cache[u_s] = node
|
||||
return self._node_cache[u_s]
|
||||
|
||||
def notify_publish(self, publish_status, size):
|
||||
self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
|
||||
size)
|
||||
def notify_retrieve(self, retrieve_status):
|
||||
self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
|
||||
def notify_mapupdate(self, update_status):
|
||||
self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
|
||||
|
||||
def create_empty_dirnode(self):
|
||||
n = NewDirectoryNode(self)
|
||||
d = n.create(self._generate_pubprivkeys)
|
||||
@ -421,14 +412,11 @@ class Client(node.Node, pollmixin.PollMixin):
|
||||
return self.get_history().list_all_download_statuses()
|
||||
|
||||
def list_all_mapupdate_statuses(self):
|
||||
watcher = self.getServiceNamed("mutable-watcher")
|
||||
return watcher.list_all_mapupdate_statuses()
|
||||
return self.get_history().list_all_mapupdate_statuses()
|
||||
def list_all_publish_statuses(self):
|
||||
watcher = self.getServiceNamed("mutable-watcher")
|
||||
return watcher.list_all_publish_statuses()
|
||||
return self.get_history().list_all_publish_statuses()
|
||||
def list_all_retrieve_statuses(self):
|
||||
watcher = self.getServiceNamed("mutable-watcher")
|
||||
return watcher.list_all_retrieve_statuses()
|
||||
return self.get_history().list_all_retrieve_statuses()
|
||||
|
||||
def list_all_helper_statuses(self):
|
||||
try:
|
||||
|
@ -8,13 +8,26 @@ class History(service.Service):
|
||||
name = "history"
|
||||
MAX_DOWNLOAD_STATUSES = 10
|
||||
MAX_UPLOAD_STATUSES = 10
|
||||
MAX_MAPUPDATE_STATUSES = 20
|
||||
MAX_PUBLISH_STATUSES = 20
|
||||
MAX_RETRIEVE_STATUSES = 20
|
||||
|
||||
def __init__(self, stats_provider=None):
|
||||
self.stats_provider = stats_provider
|
||||
|
||||
def __init__(self):
|
||||
self.all_downloads_statuses = weakref.WeakKeyDictionary()
|
||||
self.recent_download_statuses = []
|
||||
self.all_upload_statuses = weakref.WeakKeyDictionary()
|
||||
self.recent_upload_statuses = []
|
||||
|
||||
self.all_mapupdate_status = weakref.WeakKeyDictionary()
|
||||
self.recent_mapupdate_status = []
|
||||
self.all_publish_status = weakref.WeakKeyDictionary()
|
||||
self.recent_publish_status = []
|
||||
self.all_retrieve_status = weakref.WeakKeyDictionary()
|
||||
self.recent_retrieve_status = []
|
||||
|
||||
|
||||
def add_download(self, download_status):
|
||||
self.all_downloads_statuses[download_status] = None
|
||||
self.recent_download_statuses.append(download_status)
|
||||
@ -34,3 +47,47 @@ class History(service.Service):
|
||||
def list_all_upload_statuses(self):
|
||||
for us in self.all_upload_statuses:
|
||||
yield us
|
||||
|
||||
|
||||
|
||||
def notify_mapupdate(self, p):
|
||||
self.all_mapupdate_status[p] = None
|
||||
self.recent_mapupdate_status.append(p)
|
||||
while len(self.recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
|
||||
self.recent_mapupdate_status.pop(0)
|
||||
|
||||
def notify_publish(self, p, size):
|
||||
self.all_publish_status[p] = None
|
||||
self.recent_publish_status.append(p)
|
||||
if self.stats_provider:
|
||||
self.stats_provider.count('mutable.files_published', 1)
|
||||
# We must be told bytes_published as an argument, since the
|
||||
# publish_status does not yet know how much data it will be asked
|
||||
# to send. When we move to MDMF we'll need to find a better way
|
||||
# to handle this.
|
||||
self.stats_provider.count('mutable.bytes_published', size)
|
||||
while len(self.recent_publish_status) > self.MAX_PUBLISH_STATUSES:
|
||||
self.recent_publish_status.pop(0)
|
||||
|
||||
def notify_retrieve(self, r):
|
||||
self.all_retrieve_status[r] = None
|
||||
self.recent_retrieve_status.append(r)
|
||||
if self.stats_provider:
|
||||
self.stats_provider.count('mutable.files_retrieved', 1)
|
||||
self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
|
||||
while len(self.recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
|
||||
self.recent_retrieve_status.pop(0)
|
||||
|
||||
|
||||
def list_all_mapupdate_statuses(self):
|
||||
for s in self.all_mapupdate_status:
|
||||
yield s
|
||||
def list_all_publish_statuses(self):
|
||||
for s in self.all_publish_status:
|
||||
yield s
|
||||
def list_all_retrieve_statuses(self):
|
||||
for s in self.all_retrieve_status:
|
||||
yield s
|
||||
|
||||
|
||||
|
||||
|
@ -24,7 +24,9 @@ class MutableChecker:
|
||||
def check(self, verify=False):
|
||||
servermap = ServerMap()
|
||||
u = ServermapUpdater(self._node, self._monitor, servermap, MODE_CHECK)
|
||||
self._node._client.notify_mapupdate(u.get_status())
|
||||
history = self._node._client.get_history()
|
||||
if history:
|
||||
history.notify_mapupdate(u.get_status())
|
||||
d = u.update()
|
||||
d.addCallback(self._got_mapupdate_results)
|
||||
if verify:
|
||||
|
@ -1,6 +1,5 @@
|
||||
|
||||
import weakref, random
|
||||
from twisted.application import service
|
||||
import random
|
||||
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer, reactor
|
||||
@ -407,7 +406,9 @@ class MutableFileNode:
|
||||
return self._update_servermap(servermap, mode)
|
||||
def _update_servermap(self, servermap, mode):
|
||||
u = ServermapUpdater(self, Monitor(), servermap, mode)
|
||||
self._client.notify_mapupdate(u.get_status())
|
||||
history = self._client.get_history()
|
||||
if history:
|
||||
history.notify_mapupdate(u.get_status())
|
||||
return u.update()
|
||||
|
||||
def download_version(self, servermap, version, fetch_privkey=False):
|
||||
@ -416,7 +417,9 @@ class MutableFileNode:
|
||||
def _try_once_to_download_version(self, servermap, version,
|
||||
fetch_privkey=False):
|
||||
r = Retrieve(self, servermap, version, fetch_privkey)
|
||||
self._client.notify_retrieve(r.get_status())
|
||||
history = self._client.get_history()
|
||||
if history:
|
||||
history.notify_retrieve(r.get_status())
|
||||
return r.download()
|
||||
|
||||
def upload(self, new_contents, servermap):
|
||||
@ -424,61 +427,7 @@ class MutableFileNode:
|
||||
def _upload(self, new_contents, servermap):
|
||||
assert self._pubkey, "update_servermap must be called before publish"
|
||||
p = Publish(self, servermap)
|
||||
self._client.notify_publish(p.get_status(), len(new_contents))
|
||||
history = self._client.get_history()
|
||||
if history:
|
||||
history.notify_publish(p.get_status(), len(new_contents))
|
||||
return p.publish(new_contents)
|
||||
|
||||
|
||||
|
||||
|
||||
class MutableWatcher(service.MultiService):
|
||||
MAX_MAPUPDATE_STATUSES = 20
|
||||
MAX_PUBLISH_STATUSES = 20
|
||||
MAX_RETRIEVE_STATUSES = 20
|
||||
name = "mutable-watcher"
|
||||
|
||||
def __init__(self, stats_provider=None):
|
||||
service.MultiService.__init__(self)
|
||||
self.stats_provider = stats_provider
|
||||
self._all_mapupdate_status = weakref.WeakKeyDictionary()
|
||||
self._recent_mapupdate_status = []
|
||||
self._all_publish_status = weakref.WeakKeyDictionary()
|
||||
self._recent_publish_status = []
|
||||
self._all_retrieve_status = weakref.WeakKeyDictionary()
|
||||
self._recent_retrieve_status = []
|
||||
|
||||
|
||||
def notify_mapupdate(self, p):
|
||||
self._all_mapupdate_status[p] = None
|
||||
self._recent_mapupdate_status.append(p)
|
||||
while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
|
||||
self._recent_mapupdate_status.pop(0)
|
||||
|
||||
def notify_publish(self, p, size):
|
||||
self._all_publish_status[p] = None
|
||||
self._recent_publish_status.append(p)
|
||||
if self.stats_provider:
|
||||
self.stats_provider.count('mutable.files_published', 1)
|
||||
# We must be told bytes_published as an argument, since the
|
||||
# publish_status does not yet know how much data it will be asked
|
||||
# to send. When we move to MDMF we'll need to find a better way
|
||||
# to handle this.
|
||||
self.stats_provider.count('mutable.bytes_published', size)
|
||||
while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
|
||||
self._recent_publish_status.pop(0)
|
||||
|
||||
def notify_retrieve(self, r):
|
||||
self._all_retrieve_status[r] = None
|
||||
self._recent_retrieve_status.append(r)
|
||||
if self.stats_provider:
|
||||
self.stats_provider.count('mutable.files_retrieved', 1)
|
||||
self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
|
||||
while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
|
||||
self._recent_retrieve_status.pop(0)
|
||||
|
||||
|
||||
def list_all_mapupdate_statuses(self):
|
||||
return self._all_mapupdate_status.keys()
|
||||
def list_all_publish_statuses(self):
|
||||
return self._all_publish_status.keys()
|
||||
def list_all_retrieve_statuses(self):
|
||||
return self._all_retrieve_status.keys()
|
||||
|
@ -194,12 +194,8 @@ class FakeClient:
|
||||
d.addCallback(lambda res: n)
|
||||
return d
|
||||
|
||||
def notify_retrieve(self, r):
|
||||
pass
|
||||
def notify_publish(self, p, size):
|
||||
pass
|
||||
def notify_mapupdate(self, u):
|
||||
pass
|
||||
def get_history(self):
|
||||
return None
|
||||
|
||||
def create_node_from_uri(self, u):
|
||||
u = IURI(u)
|
||||
|
@ -1199,11 +1199,11 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
|
||||
for us in self.clients[0].list_all_upload_statuses():
|
||||
if us.get_size() > 200:
|
||||
self._up_status = us.get_counter()
|
||||
rs = self.clients[0].list_all_retrieve_statuses()[0]
|
||||
rs = list(self.clients[0].list_all_retrieve_statuses())[0]
|
||||
self._retrieve_status = rs.get_counter()
|
||||
ps = self.clients[0].list_all_publish_statuses()[0]
|
||||
ps = list(self.clients[0].list_all_publish_statuses())[0]
|
||||
self._publish_status = ps.get_counter()
|
||||
us = self.clients[0].list_all_mapupdate_statuses()[0]
|
||||
us = list(self.clients[0].list_all_mapupdate_statuses())[0]
|
||||
self._update_status = us.get_counter()
|
||||
|
||||
# and that there are some upload- and download- status pages
|
||||
|
Loading…
x
Reference in New Issue
Block a user