#330: convert stats-gatherer into a .tac file service, add 'tahoe create-stats-gatherer'

This commit is contained in:
Brian Warner 2008-11-18 01:46:20 -07:00
parent 7d68e4397b
commit 8473a96ada
4 changed files with 113 additions and 74 deletions

View File

@ -8,10 +8,11 @@ from twisted.python import usage
pkg_resources.require('allmydata-tahoe') pkg_resources.require('allmydata-tahoe')
from allmydata.scripts.common import BaseOptions from allmydata.scripts.common import BaseOptions
import debug, create_node, startstop_node, cli, keygen import debug, create_node, startstop_node, cli, keygen, stats_gatherer
_general_commands = ( create_node.subCommands _general_commands = ( create_node.subCommands
+ keygen.subCommands + keygen.subCommands
+ stats_gatherer.subCommands
+ debug.subCommands + debug.subCommands
+ cli.subCommands + cli.subCommands
) )
@ -77,6 +78,8 @@ def runner(argv,
rc = cli.dispatch[command](so) rc = cli.dispatch[command](so)
elif command in keygen.dispatch: elif command in keygen.dispatch:
rc = keygen.dispatch[command](so, stdout, stderr) rc = keygen.dispatch[command](so, stdout, stderr)
elif command in stats_gatherer.dispatch:
rc = stats_gatherer.dispatch[command](so)
elif command in ac_dispatch: elif command in ac_dispatch:
rc = ac_dispatch[command](so, stdout, stderr) rc = ac_dispatch[command](so, stdout, stderr)
else: else:

View File

@ -0,0 +1,57 @@
import os
from twisted.python import usage
class CreateStatsGathererOptions(usage.Options):
optParameters = [
["basedir", "C", None, "which directory to create the stats-gatherer in"],
]
def parseArgs(self, basedir=None):
if basedir is not None:
assert self["basedir"] is None
self["basedir"] = basedir
stats_gatherer_tac = """
# -*- python -*-
from allmydata import stats
from twisted.application import service
verbose = True
g = stats.StatsGathererService(verbose=verbose)
application = service.Application('allmydata_stats_gatherer')
g.setServiceParent(application)
"""
def create_stats_gatherer(config):
out = config.stdout
err = config.stderr
basedir = config['basedir']
if not basedir:
print >>err, "a basedir was not provided, please use --basedir or -C"
return -1
if os.path.exists(basedir):
if os.listdir(basedir):
print >>err, "The base directory \"%s\", which is \"%s\" is not empty." % (basedir, os.path.abspath(basedir))
print >>err, "To avoid clobbering anything, I am going to quit now."
print >>err, "Please use a different directory, or empty this one."
return -1
# we're willing to use an empty directory
else:
os.mkdir(basedir)
f = open(os.path.join(basedir, "tahoe-stats-gatherer.tac"), "wb")
f.write(stats_gatherer_tac)
f.close()
subCommands = [
["create-stats-gatherer", None, CreateStatsGathererOptions, "Create a stats-gatherer service."],
]
dispatch = {
"create-stats-gatherer": create_stats_gatherer,
}

View File

@ -2,17 +2,15 @@
import os import os
import pickle import pickle
import pprint import pprint
import sys
import time import time
from collections import deque from collections import deque
from twisted.internet import reactor, defer from twisted.internet import reactor
from twisted.application import service from twisted.application import service
from twisted.application.internet import TimerService from twisted.application.internet import TimerService
from zope.interface import implements from zope.interface import implements
import foolscap import foolscap
from foolscap.eventual import eventually from foolscap.eventual import eventually
from foolscap.logging.gatherer import get_local_ip_for
from twisted.internet.error import ConnectionDone, ConnectionLost from twisted.internet.error import ConnectionDone, ConnectionLost
from foolscap import DeadReferenceError from foolscap import DeadReferenceError
@ -125,6 +123,7 @@ class CPUUsageMonitor(service.MultiService):
s["cpu_monitor.total"] = now_cpu - self.initial_cpu s["cpu_monitor.total"] = now_cpu - self.initial_cpu
return s return s
class StatsProvider(foolscap.Referenceable, service.MultiService): class StatsProvider(foolscap.Referenceable, service.MultiService):
implements(RIStatsProvider) implements(RIStatsProvider)
@ -180,32 +179,21 @@ class StatsProvider(foolscap.Referenceable, service.MultiService):
def _connected(self, gatherer, nickname): def _connected(self, gatherer, nickname):
gatherer.callRemoteOnly('provide', self, nickname or '') gatherer.callRemoteOnly('provide', self, nickname or '')
class StatsGatherer(foolscap.Referenceable, service.MultiService): class StatsGatherer(foolscap.Referenceable, service.MultiService):
implements(RIStatsGatherer) implements(RIStatsGatherer)
poll_interval = 60 poll_interval = 60
def __init__(self, tub, basedir): def __init__(self, basedir):
service.MultiService.__init__(self) service.MultiService.__init__(self)
self.tub = tub
self.basedir = basedir self.basedir = basedir
self.clients = {} self.clients = {}
self.nicknames = {} self.nicknames = {}
def startService(self):
# the Tub must have a location set on it by now
service.MultiService.startService(self)
self.timer = TimerService(self.poll_interval, self.poll) self.timer = TimerService(self.poll_interval, self.poll)
self.timer.setServiceParent(self) self.timer.setServiceParent(self)
self.registerGatherer()
def get_furl(self):
return self.my_furl
def registerGatherer(self):
furl_file = os.path.join(self.basedir, "stats_gatherer.furl")
self.my_furl = self.tub.registerReference(self, furlFile=furl_file)
def get_tubid(self, rref): def get_tubid(self, rref):
return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID() return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()
@ -251,7 +239,7 @@ class StdOutStatsGatherer(StatsGatherer):
StatsGatherer.remote_provide(self, provider, nickname) StatsGatherer.remote_provide(self, provider, nickname)
def announce_lost_client(self, tubid): def announce_lost_client(self, tubid):
print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid) print 'disconnect "%s" [%s]' % (self.nicknames[tubid], tubid)
def got_stats(self, stats, tubid, nickname): def got_stats(self, stats, tubid, nickname):
print '"%s" [%s]:' % (nickname, tubid) print '"%s" [%s]:' % (nickname, tubid)
@ -260,9 +248,9 @@ class StdOutStatsGatherer(StatsGatherer):
class PickleStatsGatherer(StdOutStatsGatherer): class PickleStatsGatherer(StdOutStatsGatherer):
# inherit from StdOutStatsGatherer for connect/disconnect notifications # inherit from StdOutStatsGatherer for connect/disconnect notifications
def __init__(self, tub, basedir=".", verbose=True): def __init__(self, basedir=".", verbose=True):
self.verbose = verbose self.verbose = verbose
StatsGatherer.__init__(self, tub, basedir) StatsGatherer.__init__(self, basedir)
self.picklefile = os.path.join(basedir, "stats.pickle") self.picklefile = os.path.join(basedir, "stats.pickle")
if os.path.exists(self.picklefile): if os.path.exists(self.picklefile):
@ -288,51 +276,39 @@ class PickleStatsGatherer(StdOutStatsGatherer):
os.unlink(self.picklefile) os.unlink(self.picklefile)
os.rename(tmp, self.picklefile) os.rename(tmp, self.picklefile)
class GathererApp(object): class StatsGathererService(service.MultiService):
def __init__(self): furl_file = "stats_gatherer.furl"
d = self.setup_tub()
d.addCallback(self._tub_ready)
def setup_tub(self): def __init__(self, basedir=".", verbose=False):
self._tub = foolscap.Tub(certFile="stats_gatherer.pem") service.MultiService.__init__(self)
self._tub.setOption("logLocalFailures", True) self.basedir = basedir
self._tub.setOption("logRemoteFailures", True) self.tub = foolscap.Tub(certFile=os.path.join(self.basedir,
self._tub.startService() "stats_gatherer.pem"))
portnumfile = "portnum" self.tub.setServiceParent(self)
self.tub.setOption("logLocalFailures", True)
self.tub.setOption("logRemoteFailures", True)
self.stats_gatherer = PickleStatsGatherer(self.basedir, verbose)
self.stats_gatherer.setServiceParent(self)
portnumfile = os.path.join(self.basedir, "portnum")
try: try:
portnum = int(open(portnumfile, "r").read()) portnum = open(portnumfile, "r").read()
except (EnvironmentError, ValueError): except EnvironmentError:
portnum = 0 portnum = None
self._tub.listenOn("tcp:%d" % portnum) self.listener = self.tub.listenOn(portnum or "tcp:0")
d = defer.maybeDeferred(get_local_ip_for) d = self.tub.setLocationAutomatically()
d.addCallback(self._set_location) if portnum is None:
d.addCallback(lambda res: self._tub) d.addCallback(self.save_portnum)
return d d.addCallback(self.tub_ready)
d.addErrback(log.err)
def _set_location(self, local_address): def save_portnum(self, junk):
if local_address is None: portnum = self.listener.getPortnum()
local_addresses = ["127.0.0.1"] portnumfile = os.path.join(self.basedir, 'portnum')
else: open(portnumfile, 'wb').write('%d\n' % (portnum,))
local_addresses = [local_address, "127.0.0.1"]
l = self._tub.getListeners()[0]
portnum = l.getPortnum()
portnumfile = "portnum"
open(portnumfile, "w").write("%d\n" % portnum)
local_addresses = [ "%s:%d" % (addr, portnum,)
for addr in local_addresses ]
assert len(local_addresses) >= 1
location = ",".join(local_addresses)
self._tub.setLocation(location)
def _tub_ready(self, tub): def tub_ready(self, ignored):
sg = PickleStatsGatherer(tub, ".") ff = os.path.join(self.basedir, self.furl_file)
sg.setServiceParent(tub) self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,
sg.verbose = True furlFile=ff)
print '\nStatsGatherer: %s\n' % (sg.get_furl(),)
def main(argv):
ga = GathererApp()
reactor.run()
if __name__ == '__main__':
main(sys.argv)

View File

@ -6,7 +6,6 @@ from twisted.internet.interfaces import IConsumer
from twisted.python import failure from twisted.python import failure
from twisted.application import service from twisted.application import service
from twisted.web.error import Error as WebError from twisted.web.error import Error as WebError
from foolscap import Tub
from foolscap.eventual import flushEventualQueue, fireEventually from foolscap.eventual import flushEventualQueue, fireEventually
from allmydata import uri, dirnode, client from allmydata import uri, dirnode, client
from allmydata.introducer.server import IntroducerNode from allmydata.introducer.server import IntroducerNode
@ -17,7 +16,7 @@ from allmydata.checker_results import CheckerResults, CheckAndRepairResults, \
from allmydata.mutable.common import CorruptShareError from allmydata.mutable.common import CorruptShareError
from allmydata.storage import storage_index_to_dir from allmydata.storage import storage_index_to_dir
from allmydata.util import log, fileutil, pollmixin from allmydata.util import log, fileutil, pollmixin
from allmydata.stats import PickleStatsGatherer from allmydata.stats import StatsGathererService
from allmydata.key_generator import KeyGeneratorService from allmydata.key_generator import KeyGeneratorService
import common_util as testutil import common_util as testutil
@ -355,15 +354,19 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
def _set_up_stats_gatherer(self, res): def _set_up_stats_gatherer(self, res):
statsdir = self.getdir("stats_gatherer") statsdir = self.getdir("stats_gatherer")
fileutil.make_dirs(statsdir) fileutil.make_dirs(statsdir)
t = Tub() self.stats_gatherer_svc = StatsGathererService(statsdir)
self.add_service(t) self.stats_gatherer = self.stats_gatherer_svc.stats_gatherer
l = t.listenOn("tcp:0") self.add_service(self.stats_gatherer_svc)
p = l.getPortnum()
t.setLocation("localhost:%d" % p)
self.stats_gatherer = PickleStatsGatherer(t, statsdir, False) d = fireEventually()
self.add_service(self.stats_gatherer) sgf = os.path.join(statsdir, 'stats_gatherer.furl')
self.stats_gatherer_furl = self.stats_gatherer.get_furl() def check_for_furl():
return os.path.exists(sgf)
d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
def get_furl(junk):
self.stats_gatherer_furl = file(sgf, 'rb').read().strip()
d.addCallback(get_furl)
return d
def _set_up_key_generator(self, res): def _set_up_key_generator(self, res):
kgsdir = self.getdir("key_generator") kgsdir = self.getdir("key_generator")