stats: make StatsGatherer happy about sharing a process with other services, add one during system test to get some test coverage

This commit is contained in:
Brian Warner 2008-03-03 23:55:58 -07:00
parent abb51e70f3
commit 7e159feb27
2 changed files with 75 additions and 25 deletions

View File

@ -12,6 +12,8 @@ from twisted.application.internet import TimerService
from zope.interface import implements
import foolscap
from foolscap.logging.gatherer import get_local_ip_for
from twisted.internet.error import ConnectionDone, ConnectionLost
from foolscap import DeadReferenceError
from allmydata.util import log
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer
@ -29,17 +31,23 @@ class LoadMonitor(service.MultiService):
self.started = False
self.last = None
self.stats = deque()
self.timer = None
def startService(self):
if not self.started:
self.started = True
reactor.callLater(self.loop_interval, self.loop)
self.timer = reactor.callLater(self.loop_interval, self.loop)
service.MultiService.startService(self)
def stopService(self):
self.started = False
if self.timer:
self.timer.cancel()
self.timer = None
return service.MultiService.stopService(self)
def loop(self):
self.timer = None
if not self.started:
return
now = time.time()
@ -53,7 +61,7 @@ class LoadMonitor(service.MultiService):
self.stats.popleft()
self.last = now
reactor.callLater(self.loop_interval, self.loop)
self.timer = reactor.callLater(self.loop_interval, self.loop)
def get_stats(self):
if self.stats:
@ -102,27 +110,34 @@ class StatsProvider(foolscap.Referenceable, service.MultiService):
return { 'counters': self.counters, 'stats': stats }
def _connected(self, gatherer, nickname):
gatherer.callRemote('provide', self, nickname or '')
gatherer.callRemoteOnly('provide', self, nickname or '')
class StatsGatherer(foolscap.Referenceable, service.MultiService):
implements(RIStatsGatherer)
poll_interval = 60
def __init__(self, tub):
def __init__(self, tub, basedir):
service.MultiService.__init__(self)
self.tub = tub
self.basedir = basedir
self.clients = {}
self.nicknames = {}
def startService(self):
# the Tub must have a location set on it by now
service.MultiService.startService(self)
self.timer = TimerService(self.poll_interval, self.poll)
self.timer.setServiceParent(self)
service.MultiService.startService(self)
self.registerGatherer()
def get_furl(self):
return self.tub.registerReference(self, furlFile='stats_gatherer.furl')
return self.my_furl
def registerGatherer(self):
furl_file = os.path.join(self.basedir, "stats_gatherer.furl")
self.my_furl = self.tub.registerReference(self, furlFile=furl_file)
def get_tubid(self, rref):
return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()
@ -135,43 +150,55 @@ class StatsGatherer(foolscap.Referenceable, service.MultiService):
return
self.clients[tubid] = provider
self.nicknames[tubid] = nickname
provider.notifyOnDisconnect(self.lost_client, tubid)
def lost_client(self, tubid):
del self.clients[tubid]
del self.nicknames[tubid]
def poll(self):
for tubid,client in self.clients.items():
nickname = self.nicknames.get(tubid)
d = client.callRemote('get_stats')
d.addCallback(self.got_stats, tubid, nickname)
d.addCallbacks(self.got_stats, self.lost_client,
callbackArgs=(tubid, nickname),
errbackArgs=(tubid,))
d.addErrback(self.log_client_error, tubid)
def lost_client(self, f, tubid):
# this is called lazily, when a get_stats request fails
del self.clients[tubid]
del self.nicknames[tubid]
f.trap(DeadReferenceError, ConnectionDone, ConnectionLost)
def log_client_error(self, f, tubid):
log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
level=log.UNUSUAL, failure=f)
def got_stats(self, stats, tubid, nickname):
raise NotImplementedError()
class StdOutStatsGatherer(StatsGatherer):
verbose = True
def remote_provide(self, provider, nickname):
tubid = self.get_tubid(provider)
print 'connect "%s" [%s]' % (nickname, tubid)
if self.verbose:
print 'connect "%s" [%s]' % (nickname, tubid)
provider.notifyOnDisconnect(self.announce_lost_client, tubid)
StatsGatherer.remote_provide(self, provider, nickname)
def lost_client(self, tubid):
def announce_lost_client(self, tubid):
print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid)
StatsGatherer.lost_client(self, tubid)
def got_stats(self, stats, tubid, nickname):
print '"%s" [%s]:' % (nickname, tubid)
pprint.pprint(stats)
class PickleStatsGatherer(StdOutStatsGatherer): # for connect/disconnect notifications;
#class PickleStatsGatherer(StatsGatherer):
def __init__(self, tub, picklefile):
StatsGatherer.__init__(self, tub)
self.picklefile = picklefile
class PickleStatsGatherer(StdOutStatsGatherer):
# inherit from StdOutStatsGatherer for connect/disconnect notifications
if os.path.exists(picklefile):
f = open(picklefile, 'rb')
def __init__(self, tub, basedir=".", verbose=True):
self.verbose = verbose
StatsGatherer.__init__(self, tub, basedir)
self.picklefile = os.path.join(basedir, "stats.pickle")
if os.path.exists(self.picklefile):
f = open(self.picklefile, 'rb')
self.gathered_stats = pickle.load(f)
f.close()
else:
@ -230,7 +257,7 @@ class GathererApp(object):
self._tub.setLocation(location)
def _tub_ready(self, tub):
sg = PickleStatsGatherer(tub, 'stats.pickle')
sg = PickleStatsGatherer(tub, ".")
sg.setServiceParent(tub)
sg.verbose = True
print '\nStatsGatherer: %s\n' % (sg.get_furl(),)

View File

@ -14,8 +14,9 @@ from allmydata.util import log
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
from allmydata.mutable import NotMutableError
from allmydata.stats import PickleStatsGatherer
from foolscap.eventual import flushEventualQueue
from foolscap import DeadReferenceError
from foolscap import DeadReferenceError, Tub
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error
@ -73,9 +74,24 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
iv = IntroducerNode(basedir=iv_dir)
self.introducer = self.add_service(iv)
d = self.introducer.when_tub_ready()
d.addCallback(self._set_up_stats_gatherer)
d.addCallback(self._set_up_nodes_2)
d.addCallback(self._grab_stats)
return d
def _set_up_stats_gatherer(self, res):
statsdir = self.getdir("stats_gatherer")
fileutil.make_dirs(statsdir)
t = Tub()
self.add_service(t)
l = t.listenOn("tcp:0")
p = l.getPortnum()
t.setLocation("localhost:%d" % p)
self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
self.add_service(self.stats_gatherer)
self.stats_gatherer_furl = self.stats_gatherer.get_furl()
def _set_up_nodes_2(self, res):
q = self.introducer
self.introducer_furl = q.introducer_url
@ -96,6 +112,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
fileutil.make_dirs(os.path.join(basedir, "private"))
open(os.path.join(basedir, "private", "root_dir.cap"), "w")
open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
# start client[0], wait for it's tub to be ready (at which point it
# will have registered the helper furl).
@ -131,6 +148,10 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
d.addCallback(_connected)
return d
def _grab_stats(self, res):
d = self.stats_gatherer.poll()
return d
def bounce_client(self, num):
c = self.clients[num]
d = c.disownServiceParent()
@ -224,15 +245,16 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
del test_connections
def test_upload_and_download_random_key(self):
self.basedir = "system/SystemTest/test_upload_and_download_random_key"
return self._test_upload_and_download(False)
test_upload_and_download_random_key.timeout = 4800
def test_upload_and_download_content_hash_key(self):
self.basedir = "system/SystemTest/test_upload_and_download_CHK"
return self._test_upload_and_download(True)
test_upload_and_download_content_hash_key.timeout = 4800
def _test_upload_and_download(self, contenthashkey):
self.basedir = "system/SystemTest/test_upload_and_download"
# we use 4000 bytes of data, which will result in about 400k written
# to disk among all our simulated nodes
DATA = "Some data to upload\n" * 200
@ -836,6 +858,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
# P/test_put/ (empty)
d.addCallback(self._test_checker)
d.addCallback(self._test_verifier)
d.addCallback(self._grab_stats)
return d
test_vdrive.timeout = 1100