stats: add a simple stats gathering system

We want to collect runtime statistics from multiple nodes, primarily for
server monitoring purposes. This adds a simple implementation of such a
system, as a skeleton to build more sophistication upon.

Each client now looks for a 'stats_gatherer.furl' config file. If it has
been configured to use a stats gatherer, the client instantiates an internal
StatsProvider. This is the central place to which code that wishes to offer
stats for monitoring reports them, either by calling
stats_provider.count('stat.name', value) to increment a counter, or by
registering an object as a stats producer with
stats_provider.register_producer(obj).
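As a rough sketch of the two styles (the UploadCounter class and the stat
names here are invented for illustration, not part of this commit):

class UploadCounter:
    def __init__(self, stats_provider):
        self.files_seen = 0
        self.stats_provider = stats_provider
        # style 2: register as a producer; get_stats() is queried on each poll
        stats_provider.register_producer(self)

    def file_uploaded(self, size):
        self.files_seen += 1
        # style 1: bump a named counter directly
        self.stats_provider.count('uploader.bytes_sent', size)

    def get_stats(self):
        # producers return instantaneous 'stats' values
        return {'uploader.files_seen': self.files_seen}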

Upon startup, the StatsProvider connects to the StatsGatherer server and
registers itself as a provider. The StatsGatherer is then responsible for
periodically polling the attached providers to retrieve their data. When the
gatherer queries a provider, the provider in turn queries each of its
registered producers. Both the internal 'counters' and the queried 'stats'
are then reported to the gatherer.
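Concretely, the dictionary returned by a provider's get_stats() has this
shape (values invented for illustration):

{'counters': {'storage_server.bytes_added': 1048576,
              'storage_server.bytes_freed': 0},
 'stats': {'load_monitor.avg_load': 0.002,
           'load_monitor.max_load': 0.1,
           'storage_server.consumed': 2147483648,
           'storage_server.allocated': 0}}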

A simple gatherer app is provided (cf. 'make stats-gatherer-run') which
prints its furl and listens for incoming connections. Once a minute, the
gatherer polls all connected providers and writes the retrieved data into a
pickle file.
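A downstream tool can load that pickle directly; a minimal sketch, assuming
it runs from the gatherer's directory (the record layout matches
PickleStatsGatherer.got_stats in stats.py below):

import pickle

f = open('stats.pickle', 'rb')
gathered = pickle.load(f)
f.close()
# one record per provider, keyed by tubid
for tubid, node in gathered.items():
    print node['nickname'], tubid[:8]
    print node['stats']['stats'].get('load_monitor.avg_load')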

Also included is a munin plugin which knows how to read the gatherer's
stats.pickle and output data munin can interpret. This plugin,
tahoe-stats.py, can be symlinked under multiple names within munin's
'plugins' directory; it inspects argv to determine which data to display,
via a lookup table within the file (see the sketch below). It reads the
'statsfile' environment variable to find the path to the gatherer's
stats.pickle. An example plugins-conf.d file is provided.
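For instance, munin might invoke the plugin through a symlink such as
/etc/munin/plugins/tahoe_runtime_load_avg (path illustrative); the plugin
keys off its own invocation name to pick an entry from the PLUGINS table in
tahoe-stats.py below:

import os

argv0 = '/etc/munin/plugins/tahoe_runtime_load_avg'   # hypothetical symlink
graph_name = os.path.basename(argv0)                  # 'tahoe_runtime_load_avg'
plugin_conf = PLUGINS[graph_name]                     # selects the matching graph definition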
Author: robk-tahoe
Date:   2008-01-30 20:11:07 -07:00
parent 22071c00e0
commit 7b9f3207d0
7 changed files with 449 additions and 2 deletions

@@ -440,3 +440,8 @@ mac-cleanup:
mac-dbg:
	cd mac && $(PP) $(PYTHON)w allmydata_tahoe.py

# This target runs a stats gatherer server
.PHONY: stats-gatherer-run
stats-gatherer-run:
	cd stats_gatherer && $(PP) $(PYTHON) ../src/allmydata/stats.py

@@ -0,0 +1,12 @@
[tahoe_storage_allocated]
env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
[tahoe_storage_consumed]
env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
[tahoe_runtime_load_avg]
env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
[tahoe_runtime_load_peak]
env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
[tahoe_storage_bytes_added]
env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
[tahoe_storage_bytes_freed]
env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle

misc/munin/tahoe-stats.py (new file)

@@ -0,0 +1,145 @@
#!/usr/bin/python
import os
import pickle
import re
import sys
PLUGINS = {
'tahoe_storage_consumed':
{ 'statid': 'storage_server.consumed',
'category': 'stats',
'configheader': '\n'.join(['graph_title Tahoe Storage Server Space Consumed',
'graph_vlabel bytes',
'graph_category tahoe_storage_server',
'graph_info This graph shows space consumed',
'graph_args --base 1024',
]),
'graph_config': '\n'.join(['%(name)s.label %(name)s',
'%(name)s.draw LINE1',
]),
'graph_render': '\n'.join(['%(name)s.value %(value)s',
]),
},
'tahoe_storage_allocated':
{ 'statid': 'storage_server.allocated',
'category': 'stats',
'configheader': '\n'.join(['graph_title Tahoe Storage Server Space Allocated',
'graph_vlabel bytes',
'graph_category tahoe_storage_server',
'graph_info This graph shows space allocated',
'graph_args --base 1024',
]),
'graph_config': '\n'.join(['%(name)s.label %(name)s',
'%(name)s.draw LINE1',
]),
'graph_render': '\n'.join(['%(name)s.value %(value)s',
]),
},
'tahoe_runtime_load_avg':
{ 'statid': 'load_monitor.avg_load',
'category': 'stats',
'configheader': '\n'.join(['graph_title Tahoe Runtime Load Average',
'graph_vlabel load',
'graph_category tahoe',
'graph_info This graph shows average reactor delay',
]),
'graph_config': '\n'.join(['%(name)s.label %(name)s',
'%(name)s.draw LINE1',
]),
'graph_render': '\n'.join(['%(name)s.value %(value)s',
]),
},
'tahoe_runtime_load_peak':
{ 'statid': 'load_monitor.max_load',
'category': 'stats',
'configheader': '\n'.join(['graph_title Tahoe Runtime Load Peak',
'graph_vlabel load',
'graph_category tahoe',
'graph_info This graph shows peak reactor delay',
]),
'graph_config': '\n'.join(['%(name)s.label %(name)s',
'%(name)s.draw LINE1',
]),
'graph_render': '\n'.join(['%(name)s.value %(value)s',
]),
},
'tahoe_storage_bytes_added':
{ 'statid': 'storage_server.bytes_added',
'category': 'counters',
'configheader': '\n'.join(['graph_title Tahoe Storage Server Bytes Added',
'graph_vlabel bytes',
'graph_category tahoe_storage_server',
'graph_info This graph shows cumulative bytes added',
]),
'graph_config': '\n'.join(['%(name)s.label %(name)s',
'%(name)s.draw LINE1',
]),
'graph_render': '\n'.join(['%(name)s.value %(value)s',
]),
},
'tahoe_storage_bytes_freed':
{ 'statid': 'storage_server.bytes_freed',
'category': 'counters',
'configheader': '\n'.join(['graph_title Tahoe Storage Server Bytes Removed',
'graph_vlabel bytes',
'graph_category tahoe_storage_server',
'graph_info This graph shows cumulative bytes removed',
]),
'graph_config': '\n'.join(['%(name)s.label %(name)s',
'%(name)s.draw LINE1',
]),
'graph_render': '\n'.join(['%(name)s.value %(value)s',
]),
},
}
def smash_name(name):
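    # munin field names may only contain [a-zA-Z0-9_];
    # e.g. 'node-1.example' becomes 'node_1_example'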
return re.sub('[^a-zA-Z0-9]', '_', name)
def open_stats(fname):
f = open(fname, 'rb')
stats = pickle.load(f)
f.close()
return stats
def main(argv):
graph_name = os.path.basename(argv[0])
if graph_name.endswith('.py'):
graph_name = graph_name[:-3]
    plugin_conf = PLUGINS.get(graph_name)
    if plugin_conf is None:
        raise RuntimeError("unknown plugin name %r" % (graph_name,))
for k,v in os.environ.items():
if k.startswith('statsfile'):
stats_file = v
break
else:
raise RuntimeError("No 'statsfile' env var found")
stats = open_stats(stats_file)
def output_nodes(output_section):
for tubid, nodestats in stats.items():
name = smash_name("%s_%s" % (nodestats['nickname'], tubid[:8]))
category = plugin_conf['category']
statid = plugin_conf['statid']
value = nodestats['stats'][category].get(statid)
if value is not None:
args = { 'name': name, 'value': value }
print plugin_conf[output_section] % args
    if len(argv) > 1:
        if argv[1] == 'config':
print plugin_conf['configheader']
output_nodes('graph_config')
sys.exit(0)
output_nodes('graph_render')
if __name__ == '__main__':
main(sys.argv)

@@ -21,6 +21,7 @@ from allmydata.util import hashutil, idlib, testutil
from allmydata.filenode import FileNode
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable import MutableFileNode
from allmydata.stats import StatsProvider
from allmydata.interfaces import IURI, INewDirectoryURI, \
IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI
@@ -56,6 +57,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
self.logSource="Client"
self.my_furl = None
self.introducer_client = None
self.init_stats_provider()
self.init_lease_secret()
self.init_storage()
self.init_options()
@@ -79,6 +81,15 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
if webport:
self.init_web(webport) # strports string
def init_stats_provider(self):
gatherer_furl = self.get_config('stats_gatherer.furl')
if gatherer_furl:
nickname = self.get_config('nickname')
self.stats_provider = StatsProvider(self.tub, nickname, gatherer_furl)
self.add_service(self.stats_provider)
else:
self.stats_provider = None
def init_lease_secret(self):
def make_secret():
return idlib.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
@@ -106,7 +117,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
}[suffix]
sizelimit = int(number) * multiplier
no_storage = self.get_config("debug_no_storage") is not None
-        self.add_service(StorageServer(storedir, sizelimit, no_storage))
+        self.add_service(StorageServer(storedir, sizelimit, no_storage, self.stats_provider))
def init_options(self):
self.push_to_ourselves = None

@@ -1309,3 +1309,35 @@ class RIHelper(RemoteInterface):
will finish and return the upload results.
"""
return (UploadResults, ChoiceOf(RICHKUploadHelper, None))
class RIStatsProvider(RemoteInterface):
    """
    Provides access to statistics and monitoring information.
    """
    __remote_name__ = "RIStatsProvider.tahoe.allmydata.com"

    def get_stats():
        """
        Returns a dictionary containing 'counters' and 'stats', each a
        dictionary with string counter/stat-name keys and numeric values.
        Counters are monotonically increasing measures of work done, and
        stats are instantaneous measures (potentially time-averaged
        internally).
        """
        return DictOf(str, DictOf(str, ChoiceOf(float, int, long)))

class RIStatsGatherer(RemoteInterface):
    """
    Provides a monitoring service for centralised collection of stats.
    """
    __remote_name__ = "RIStatsGatherer.tahoe.allmydata.com"

    def provide(provider=RIStatsProvider, nickname=str):
        """
        @param provider: a stats provider instance which should be polled
                         periodically by the gatherer to collect stats.
        @param nickname: a name useful for identifying the providing client
        """
        return None

src/allmydata/stats.py (new file)

@@ -0,0 +1,230 @@
import os
import pickle
import pprint
import sys
import time
from collections import deque
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from zope.interface import implements
import foolscap
from foolscap.logging.gatherer import get_local_ip_for
from allmydata.util import log
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer
class LoadMonitor(service.MultiService):
loop_interval = 1
num_samples = 60
def __init__(self, provider, warn_if_delay_exceeds=1):
service.MultiService.__init__(self)
self.provider = provider
self.warn_if_delay_exceeds = warn_if_delay_exceeds
self.running = False
self.last = None
self.stats = deque()
def startService(self):
if not self.running:
self.running = True
reactor.callLater(self.loop_interval, self.loop)
service.MultiService.startService(self)
def stopService(self):
self.running = False
def loop(self):
if not self.running:
return
now = time.time()
if self.last is not None:
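            # delay = how much later than scheduled the reactor ran this
            # call, i.e. a sample of current event-loop lag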
delay = now - self.last - self.loop_interval
if delay > self.warn_if_delay_exceeds:
log.msg(format='excessive reactor delay (%ss)', args=(delay,),
level=log.UNUSUAL)
self.stats.append(delay)
while len(self.stats) > self.num_samples:
self.stats.popleft()
self.last = now
reactor.callLater(self.loop_interval, self.loop)
def get_stats(self):
if self.stats:
avg = sum(self.stats) / len(self.stats)
m_x = max(self.stats)
else:
avg = m_x = 0
return { 'load_monitor.avg_load': avg,
'load_monitor.max_load': m_x, }
class StatsProvider(foolscap.Referenceable, service.MultiService):
implements(RIStatsProvider)
def __init__(self, tub, nickname, gatherer_furl):
service.MultiService.__init__(self)
self.gatherer_furl = gatherer_furl
self.counters = {}
self.stats_producers = []
self.load_monitor = LoadMonitor(self)
self.load_monitor.setServiceParent(self)
self.register_producer(self.load_monitor)
if tub:
tub.connectTo(gatherer_furl, self._connected_to_gatherer, nickname)
def count(self, name, delta):
val = self.counters.setdefault(name, 0)
self.counters[name] = val + delta
def register_producer(self, stats_producer):
self.stats_producers.append(stats_producer)
def remote_get_stats(self):
stats = {}
for sp in self.stats_producers:
stats.update(sp.get_stats())
return { 'counters': self.counters, 'stats': stats }
def _connected_to_gatherer(self, gatherer, nickname):
gatherer.callRemote('provide', self, nickname or '')
class StatsGatherer(foolscap.Referenceable, service.MultiService):
implements(RIStatsGatherer)
poll_interval = 60
def __init__(self, tub):
service.MultiService.__init__(self)
self.tub = tub
self.clients = {}
self.nicknames = {}
def startService(self):
self.timer = TimerService(self.poll_interval, self.poll)
self.timer.setServiceParent(self)
service.MultiService.startService(self)
def get_furl(self):
return self.tub.registerReference(self, furlFile='stats_gatherer.furl')
def get_tubid(self, rref):
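        # recover the remote Tub's ID from the remote reference's URL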
return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()
def remote_provide(self, provider, nickname):
tubid = self.get_tubid(provider)
self.clients[tubid] = provider
self.nicknames[tubid] = nickname
provider.notifyOnDisconnect(self.lost_client, tubid)
def lost_client(self, tubid):
del self.clients[tubid]
del self.nicknames[tubid]
def poll(self):
for tubid,client in self.clients.items():
nickname = self.nicknames.get(tubid)
d = client.callRemote('get_stats')
d.addCallback(self.got_stats, tubid, nickname)
def got_stats(self, stats, tubid, nickname):
raise NotImplementedError()
class StdOutStatsGatherer(StatsGatherer):
def remote_provide(self, provider, nickname):
tubid = self.get_tubid(provider)
print 'connect "%s" [%s]' % (nickname, tubid)
StatsGatherer.remote_provide(self, provider, nickname)
def lost_client(self, tubid):
print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid)
StatsGatherer.lost_client(self, tubid)
def got_stats(self, stats, tubid, nickname):
print '"%s" [%s]:' % (nickname, tubid)
pprint.pprint(stats)
# subclass StdOutStatsGatherer for connect/disconnect notifications;
# use StatsGatherer as the base class instead to silence them:
#class PickleStatsGatherer(StatsGatherer):
class PickleStatsGatherer(StdOutStatsGatherer):
def __init__(self, tub, picklefile):
StatsGatherer.__init__(self, tub)
self.picklefile = picklefile
if os.path.exists(picklefile):
f = open(picklefile, 'rb')
self.gathered_stats = pickle.load(f)
f.close()
else:
self.gathered_stats = {}
def got_stats(self, stats, tubid, nickname):
s = self.gathered_stats.setdefault(tubid, {})
s['timestamp'] = time.time()
s['nickname'] = nickname
s['stats'] = stats
self.dump_pickle()
def dump_pickle(self):
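        # write to a temp file and move it into place, so an interrupted
        # write cannot leave a truncated stats.pickle behind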
tmp = "%s.tmp" % (self.picklefile,)
f = open(tmp, 'wb')
pickle.dump(self.gathered_stats, f)
f.close()
if os.path.exists(self.picklefile):
os.unlink(self.picklefile)
os.rename(tmp, self.picklefile)
class GathererApp(object):
def __init__(self):
d = self.setup_tub()
d.addCallback(self._tub_ready)
def setup_tub(self):
self._tub = foolscap.Tub(certFile="stats_gatherer.pem")
self._tub.setOption("logLocalFailures", True)
self._tub.setOption("logRemoteFailures", True)
self._tub.startService()
portnumfile = "portnum"
try:
portnum = int(open(portnumfile, "r").read())
except (EnvironmentError, ValueError):
portnum = 0
self._tub.listenOn("tcp:%d" % portnum)
d = defer.maybeDeferred(get_local_ip_for)
d.addCallback(self._set_location)
d.addCallback(lambda res: self._tub)
return d
def _set_location(self, local_address):
if local_address is None:
local_addresses = ["127.0.0.1"]
else:
local_addresses = [local_address, "127.0.0.1"]
l = self._tub.getListeners()[0]
portnum = l.getPortnum()
portnumfile = "portnum"
open(portnumfile, "w").write("%d\n" % portnum)
local_addresses = [ "%s:%d" % (addr, portnum,)
for addr in local_addresses ]
assert len(local_addresses) >= 1
location = ",".join(local_addresses)
self._tub.setLocation(location)
def _tub_ready(self, tub):
sg = PickleStatsGatherer(tub, 'stats.pickle')
sg.setServiceParent(tub)
sg.verbose = True
print '\nStatsGatherer: %s\n' % (sg.get_furl(),)
def main(argv):
ga = GathererApp()
reactor.run()
if __name__ == '__main__':
main(sys.argv)

@@ -659,7 +659,7 @@ class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer)
name = 'storageserver'
-    def __init__(self, storedir, sizelimit=None, no_storage=False):
+    def __init__(self, storedir, sizelimit=None, no_storage=False, stats_provider=None):
service.MultiService.__init__(self)
self.storedir = storedir
sharedir = os.path.join(storedir, "shares")
@@ -667,6 +667,9 @@ class StorageServer(service.MultiService, Referenceable):
self.sharedir = sharedir
self.sizelimit = sizelimit
self.no_storage = no_storage
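        # keep a reference to the (possibly None) stats provider, and
        # register this server as a producer so its get_stats() is polled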
self.stats_provider = stats_provider
if self.stats_provider:
self.stats_provider.register_producer(self)
self.incomingdir = os.path.join(sharedir, 'incoming')
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
@@ -693,6 +696,11 @@ class StorageServer(service.MultiService, Referenceable):
def _clean_incomplete(self):
fileutil.rm_dir(self.incomingdir)
def get_stats(self):
return { 'storage_server.consumed': self.consumed,
'storage_server.allocated': self.allocated_size(),
}
def measure_size(self):
self.consumed = fileutil.du(self.sharedir)
@@ -821,11 +829,15 @@ class StorageServer(service.MultiService, Referenceable):
if not remaining_files:
fileutil.rm_dir(storagedir)
self.consumed -= total_space_freed
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_freed', total_space_freed)
if not found_buckets:
raise IndexError("no such lease to cancel")
def bucket_writer_closed(self, bw, consumed_size):
self.consumed += consumed_size
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_added', consumed_size)
del self._active_writers[bw]
def _get_bucket_shares(self, storage_index):