mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-31 08:25:35 +00:00
Merge pull request #920 from tahoe-lafs/3549.remove-stats-gatherer
Remove the stats gatherer Fixes: ticket:3549
This commit is contained in:
commit
d096cc54ae
@ -75,7 +75,7 @@ The item descriptions below use the following types:
|
||||
Node Types
|
||||
==========
|
||||
|
||||
A node can be a client/server, an introducer, or a statistics gatherer.
|
||||
A node can be a client/server or an introducer.
|
||||
|
||||
Client/server nodes provide one or more of the following services:
|
||||
|
||||
@ -593,11 +593,6 @@ Client Configuration
|
||||
If provided, the node will attempt to connect to and use the given helper
|
||||
for uploads. See :doc:`helper` for details.
|
||||
|
||||
``stats_gatherer.furl = (FURL string, optional)``
|
||||
|
||||
If provided, the node will connect to the given stats gatherer and
|
||||
provide it with operational statistics.
|
||||
|
||||
``shares.needed = (int, optional) aka "k", default 3``
|
||||
|
||||
``shares.total = (int, optional) aka "N", N >= k, default 10``
|
||||
@ -911,11 +906,6 @@ This section describes these other files.
|
||||
This file is used to construct an introducer, and is created by the
|
||||
"``tahoe create-introducer``" command.
|
||||
|
||||
``tahoe-stats-gatherer.tac``
|
||||
|
||||
This file is used to construct a statistics gatherer, and is created by the
|
||||
"``tahoe create-stats-gatherer``" command.
|
||||
|
||||
``private/control.furl``
|
||||
|
||||
This file contains a FURL that provides access to a control port on the
|
||||
|
@ -45,9 +45,6 @@ Create a client node (with storage initially disabled).
|
||||
.TP
|
||||
.B \f[B]create-introducer\f[]
|
||||
Create an introducer node.
|
||||
.TP
|
||||
.B \f[B]create-stats-gatherer\f[]
|
||||
Create a stats-gatherer service.
|
||||
.SS OPTIONS
|
||||
.TP
|
||||
.B \f[B]-C,\ --basedir=\f[]
|
||||
|
@ -6,8 +6,7 @@ Tahoe Statistics
|
||||
|
||||
1. `Overview`_
|
||||
2. `Statistics Categories`_
|
||||
3. `Running a Tahoe Stats-Gatherer Service`_
|
||||
4. `Using Munin To Graph Stats Values`_
|
||||
3. `Using Munin To Graph Stats Values`_
|
||||
|
||||
Overview
|
||||
========
|
||||
@ -243,92 +242,6 @@ The currently available stats (as of release 1.6.0 or so) are described here:
|
||||
the process was started. Ticket #472 indicates that .total may
|
||||
sometimes be negative due to wraparound of the kernel's counter.
|
||||
|
||||
**stats.load_monitor.\***
|
||||
|
||||
When enabled, the "load monitor" continually schedules a one-second
|
||||
callback, and measures how late the response is. This estimates system load
|
||||
(if the system is idle, the response should be on time). This is only
|
||||
enabled if a stats-gatherer is configured.
|
||||
|
||||
avg_load
|
||||
average "load" value (seconds late) over the last minute
|
||||
|
||||
max_load
|
||||
maximum "load" value over the last minute
|
||||
|
||||
|
||||
Running a Tahoe Stats-Gatherer Service
|
||||
======================================
|
||||
|
||||
The "stats-gatherer" is a simple daemon that periodically collects stats from
|
||||
several tahoe nodes. It could be useful, e.g., in a production environment,
|
||||
where you want to monitor dozens of storage servers from a central management
|
||||
host. It merely gatherers statistics from many nodes into a single place: it
|
||||
does not do any actual analysis.
|
||||
|
||||
The stats gatherer listens on a network port using the same Foolscap_
|
||||
connection library that Tahoe clients use to connect to storage servers.
|
||||
Tahoe nodes can be configured to connect to the stats gatherer and publish
|
||||
their stats on a periodic basis. (In fact, what happens is that nodes connect
|
||||
to the gatherer and offer it a second FURL which points back to the node's
|
||||
"stats port", which the gatherer then uses to pull stats on a periodic basis.
|
||||
The initial connection is flipped to allow the nodes to live behind NAT
|
||||
boxes, as long as the stats-gatherer has a reachable IP address.)
|
||||
|
||||
.. _Foolscap: https://foolscap.lothar.com/trac
|
||||
|
||||
The stats-gatherer is created in the same fashion as regular tahoe client
|
||||
nodes and introducer nodes. Choose a base directory for the gatherer to live
|
||||
in (but do not create the directory). Choose the hostname that should be
|
||||
advertised in the gatherer's FURL. Then run:
|
||||
|
||||
::
|
||||
|
||||
tahoe create-stats-gatherer --hostname=HOSTNAME $BASEDIR
|
||||
|
||||
and start it with "tahoe start $BASEDIR". Once running, the gatherer will
|
||||
write a FURL into $BASEDIR/stats_gatherer.furl .
|
||||
|
||||
To configure a Tahoe client/server node to contact the stats gatherer, copy
|
||||
this FURL into the node's tahoe.cfg file, in a section named "[client]",
|
||||
under a key named "stats_gatherer.furl", like so:
|
||||
|
||||
::
|
||||
|
||||
[client]
|
||||
stats_gatherer.furl = pb://qbo4ktl667zmtiuou6lwbjryli2brv6t@HOSTNAME:PORTNUM/wxycb4kaexzskubjnauxeoptympyf45y
|
||||
|
||||
or simply copy the stats_gatherer.furl file into the node's base directory
|
||||
(next to the tahoe.cfg file): it will be interpreted in the same way.
|
||||
|
||||
When the gatherer is created, it will allocate a random unused TCP port, so
|
||||
it should not conflict with anything else that you have running on that host
|
||||
at that time. To explicitly control which port it uses, run the creation
|
||||
command with ``--location=`` and ``--port=`` instead of ``--hostname=``. If
|
||||
you use a hostname of ``example.org`` and a port number of ``1234``, then
|
||||
run::
|
||||
|
||||
tahoe create-stats-gatherer --location=tcp:example.org:1234 --port=tcp:1234
|
||||
|
||||
``--location=`` is a Foolscap FURL hints string (so it can be a
|
||||
comma-separated list of connection hints), and ``--port=`` is a Twisted
|
||||
"server endpoint specification string", as described in :doc:`configuration`.
|
||||
|
||||
Once running, the stats gatherer will create a standard JSON file in
|
||||
``$BASEDIR/stats.json``. Once a minute, the gatherer will pull stats
|
||||
information from every connected node and write them into the file. The file
|
||||
will contain a dictionary, in which node identifiers (known as "tubid"
|
||||
strings) are the keys, and the values are a dict with 'timestamp',
|
||||
'nickname', and 'stats' keys. d[tubid][stats] will contain the stats
|
||||
dictionary as made available at http://localhost:3456/statistics?t=json . The
|
||||
file will only contain the most recent update from each node.
|
||||
|
||||
Other tools can be built to examine these stats and render them into
|
||||
something useful. For example, a tool could sum the
|
||||
"storage_server.disk_avail' values from all servers to compute a
|
||||
total-disk-available number for the entire grid (however, the "disk watcher"
|
||||
daemon, in misc/operations_helpers/spacetime/, is better suited for this
|
||||
specific task).
|
||||
|
||||
Using Munin To Graph Stats Values
|
||||
=================================
|
||||
|
1
newsfragments/3549.removed
Normal file
1
newsfragments/3549.removed
Normal file
@ -0,0 +1 @@
|
||||
The stats gatherer, broken since at least Tahoe-LAFS 1.13.0, has been removed. The ``[client]stats_gatherer.furl`` configuration item in ``tahoe.cfg`` is no longer allowed. The Tahoe-LAFS project recommends using a third-party metrics aggregation tool instead.
|
@ -3,7 +3,6 @@ from past.builtins import unicode
|
||||
import os, stat, time, weakref
|
||||
from base64 import urlsafe_b64encode
|
||||
from functools import partial
|
||||
|
||||
# On Python 2 this will be the backported package:
|
||||
from configparser import NoSectionError
|
||||
|
||||
@ -85,7 +84,6 @@ _client_config = configutil.ValidConfiguration(
|
||||
"shares.happy",
|
||||
"shares.needed",
|
||||
"shares.total",
|
||||
"stats_gatherer.furl",
|
||||
"storage.plugins",
|
||||
),
|
||||
"ftpd": (
|
||||
@ -678,11 +676,7 @@ class _Client(node.Node, pollmixin.PollMixin):
|
||||
self.init_web(webport) # strports string
|
||||
|
||||
def init_stats_provider(self):
|
||||
gatherer_furl = self.config.get_config("client", "stats_gatherer.furl", None)
|
||||
if gatherer_furl:
|
||||
# FURLs should be bytes:
|
||||
gatherer_furl = gatherer_furl.encode("utf-8")
|
||||
self.stats_provider = StatsProvider(self, gatherer_furl)
|
||||
self.stats_provider = StatsProvider(self)
|
||||
self.stats_provider.setServiceParent(self)
|
||||
self.stats_provider.register_producer(self)
|
||||
|
||||
|
@ -2931,38 +2931,6 @@ class RIHelper(RemoteInterface):
|
||||
return (UploadResults, ChoiceOf(RICHKUploadHelper, None))
|
||||
|
||||
|
||||
class RIStatsProvider(RemoteInterface):
|
||||
__remote_name__ = native_str("RIStatsProvider.tahoe.allmydata.com")
|
||||
"""
|
||||
Provides access to statistics and monitoring information.
|
||||
"""
|
||||
|
||||
def get_stats():
|
||||
"""
|
||||
returns a dictionary containing 'counters' and 'stats', each a
|
||||
dictionary with string counter/stat name keys, and numeric or None values.
|
||||
counters are monotonically increasing measures of work done, and
|
||||
stats are instantaneous measures (potentially time averaged
|
||||
internally)
|
||||
"""
|
||||
return DictOf(bytes, DictOf(bytes, ChoiceOf(float, int, long, None)))
|
||||
|
||||
|
||||
class RIStatsGatherer(RemoteInterface):
|
||||
__remote_name__ = native_str("RIStatsGatherer.tahoe.allmydata.com")
|
||||
"""
|
||||
Provides a monitoring service for centralised collection of stats
|
||||
"""
|
||||
|
||||
def provide(provider=RIStatsProvider, nickname=bytes):
|
||||
"""
|
||||
@param provider: a stats collector instance that should be polled
|
||||
periodically by the gatherer to collect stats.
|
||||
@param nickname: a name useful to identify the provided client
|
||||
"""
|
||||
return None
|
||||
|
||||
|
||||
class IStatsProducer(Interface):
|
||||
def get_stats():
|
||||
"""
|
||||
|
@ -318,7 +318,6 @@ def write_client_config(c, config):
|
||||
|
||||
c.write("[client]\n")
|
||||
c.write("helper.furl =\n")
|
||||
c.write("#stats_gatherer.furl =\n")
|
||||
c.write("\n")
|
||||
c.write("# Encoding parameters this client will use for newly-uploaded files\n")
|
||||
c.write("# This can be changed at any time: the encoding is saved in\n")
|
||||
|
@ -10,7 +10,6 @@ from twisted.application.service import Service
|
||||
|
||||
from allmydata.scripts.default_nodedir import _default_nodedir
|
||||
from allmydata.util import fileutil
|
||||
from allmydata.node import read_config
|
||||
from allmydata.util.encodingutil import listdir_unicode, quote_local_unicode_path
|
||||
from allmydata.util.configutil import UnknownConfigError
|
||||
from allmydata.util.deferredutil import HookMixin
|
||||
@ -47,8 +46,8 @@ def get_pid_from_pidfile(pidfile):
|
||||
|
||||
def identify_node_type(basedir):
|
||||
"""
|
||||
:return unicode: None or one of: 'client', 'introducer',
|
||||
'key-generator' or 'stats-gatherer'
|
||||
:return unicode: None or one of: 'client', 'introducer', or
|
||||
'key-generator'
|
||||
"""
|
||||
tac = u''
|
||||
try:
|
||||
@ -59,7 +58,7 @@ def identify_node_type(basedir):
|
||||
except OSError:
|
||||
return None
|
||||
|
||||
for t in (u"client", u"introducer", u"key-generator", u"stats-gatherer"):
|
||||
for t in (u"client", u"introducer", u"key-generator"):
|
||||
if t in tac:
|
||||
return t
|
||||
return None
|
||||
@ -135,7 +134,6 @@ class DaemonizeTheRealService(Service, HookMixin):
|
||||
node_to_instance = {
|
||||
u"client": lambda: maybeDeferred(namedAny("allmydata.client.create_client"), self.basedir),
|
||||
u"introducer": lambda: maybeDeferred(namedAny("allmydata.introducer.server.create_introducer"), self.basedir),
|
||||
u"stats-gatherer": lambda: maybeDeferred(namedAny("allmydata.stats.StatsGathererService"), read_config(self.basedir, None), self.basedir, verbose=True),
|
||||
u"key-generator": key_generator_removed,
|
||||
}
|
||||
|
||||
|
@ -9,7 +9,7 @@ from twisted.internet import defer, task, threads
|
||||
|
||||
from allmydata.scripts.common import get_default_nodedir
|
||||
from allmydata.scripts import debug, create_node, cli, \
|
||||
stats_gatherer, admin, tahoe_daemonize, tahoe_start, \
|
||||
admin, tahoe_daemonize, tahoe_start, \
|
||||
tahoe_stop, tahoe_restart, tahoe_run, tahoe_invite
|
||||
from allmydata.util.encodingutil import quote_output, quote_local_unicode_path, get_io_encoding
|
||||
from allmydata.util.eliotutil import (
|
||||
@ -60,7 +60,6 @@ class Options(usage.Options):
|
||||
stderr = sys.stderr
|
||||
|
||||
subCommands = ( create_node.subCommands
|
||||
+ stats_gatherer.subCommands
|
||||
+ admin.subCommands
|
||||
+ process_control_commands
|
||||
+ debug.subCommands
|
||||
@ -107,7 +106,7 @@ class Options(usage.Options):
|
||||
|
||||
|
||||
create_dispatch = {}
|
||||
for module in (create_node, stats_gatherer):
|
||||
for module in (create_node,):
|
||||
create_dispatch.update(module.dispatch)
|
||||
|
||||
def parse_options(argv, config=None):
|
||||
|
@ -1,103 +0,0 @@
|
||||
from __future__ import print_function
|
||||
|
||||
import os
|
||||
|
||||
# Python 2 compatibility
|
||||
from future.utils import PY2
|
||||
if PY2:
|
||||
from future.builtins import str # noqa: F401
|
||||
|
||||
from twisted.python import usage
|
||||
|
||||
from allmydata.scripts.common import NoDefaultBasedirOptions
|
||||
from allmydata.scripts.create_node import write_tac
|
||||
from allmydata.util.assertutil import precondition
|
||||
from allmydata.util.encodingutil import listdir_unicode, quote_output
|
||||
from allmydata.util import fileutil, iputil
|
||||
|
||||
|
||||
class CreateStatsGathererOptions(NoDefaultBasedirOptions):
|
||||
subcommand_name = "create-stats-gatherer"
|
||||
optParameters = [
|
||||
("hostname", None, None, "Hostname of this machine, used to build location"),
|
||||
("location", None, None, "FURL connection hints, e.g. 'tcp:HOSTNAME:PORT'"),
|
||||
("port", None, None, "listening endpoint, e.g. 'tcp:PORT'"),
|
||||
]
|
||||
def postOptions(self):
|
||||
if self["hostname"] and (not self["location"]) and (not self["port"]):
|
||||
pass
|
||||
elif (not self["hostname"]) and self["location"] and self["port"]:
|
||||
pass
|
||||
else:
|
||||
raise usage.UsageError("You must provide --hostname, or --location and --port.")
|
||||
|
||||
description = """
|
||||
Create a "stats-gatherer" service, which is a standalone process that
|
||||
collects and stores runtime statistics from many server nodes. This is a
|
||||
tool for operations personnel to keep track of free disk space, server
|
||||
load, and protocol activity, across a fleet of Tahoe storage servers.
|
||||
|
||||
The "stats-gatherer" listens on a TCP port and publishes a Foolscap FURL
|
||||
by writing it into a file named "stats_gatherer.furl". You must copy this
|
||||
FURL into the servers' tahoe.cfg, as the [client] stats_gatherer.furl=
|
||||
entry. Those servers will then establish a connection to the
|
||||
stats-gatherer and publish their statistics on a periodic basis. The
|
||||
gatherer writes a summary JSON file out to disk after each update.
|
||||
|
||||
The stats-gatherer listens on a configurable port, and writes a
|
||||
configurable hostname+port pair into the FURL that it publishes. There
|
||||
are two configuration modes you can use.
|
||||
|
||||
* In the first, you provide --hostname=, and the service chooses its own
|
||||
TCP port number. If the host is named "example.org" and you provide
|
||||
--hostname=example.org, the node will pick a port number (e.g. 12345)
|
||||
and use location="tcp:example.org:12345" and port="tcp:12345".
|
||||
|
||||
* In the second, you provide both --location= and --port=, and the
|
||||
service will refrain from doing any allocation of its own. --location=
|
||||
must be a Foolscap "FURL connection hint sequence", which is a
|
||||
comma-separated list of "tcp:HOSTNAME:PORTNUM" strings. --port= must be
|
||||
a Twisted server endpoint specification, which is generally
|
||||
"tcp:PORTNUM". So, if your host is named "example.org" and you want to
|
||||
use port 6789, you should provide --location=tcp:example.org:6789 and
|
||||
--port=tcp:6789. You are responsible for making sure --location= and
|
||||
--port= match each other.
|
||||
"""
|
||||
|
||||
|
||||
def create_stats_gatherer(config):
|
||||
err = config.stderr
|
||||
basedir = config['basedir']
|
||||
# This should always be called with an absolute Unicode basedir.
|
||||
precondition(isinstance(basedir, str), basedir)
|
||||
|
||||
if os.path.exists(basedir):
|
||||
if listdir_unicode(basedir):
|
||||
print("The base directory %s is not empty." % quote_output(basedir), file=err)
|
||||
print("To avoid clobbering anything, I am going to quit now.", file=err)
|
||||
print("Please use a different directory, or empty this one.", file=err)
|
||||
return -1
|
||||
# we're willing to use an empty directory
|
||||
else:
|
||||
os.mkdir(basedir)
|
||||
write_tac(basedir, "stats-gatherer")
|
||||
if config["hostname"]:
|
||||
portnum = iputil.allocate_tcp_port()
|
||||
location = "tcp:%s:%d" % (config["hostname"], portnum)
|
||||
port = "tcp:%d" % portnum
|
||||
else:
|
||||
location = config["location"]
|
||||
port = config["port"]
|
||||
fileutil.write(os.path.join(basedir, "location"), location+"\n")
|
||||
fileutil.write(os.path.join(basedir, "port"), port+"\n")
|
||||
return 0
|
||||
|
||||
subCommands = [
|
||||
["create-stats-gatherer", None, CreateStatsGathererOptions, "Create a stats-gatherer service."],
|
||||
]
|
||||
|
||||
dispatch = {
|
||||
"create-stats-gatherer": create_stats_gatherer,
|
||||
}
|
||||
|
||||
|
@ -1,79 +1,19 @@
|
||||
from __future__ import print_function
|
||||
|
||||
import json
|
||||
import os
|
||||
import pprint
|
||||
import time
|
||||
from collections import deque
|
||||
|
||||
# Python 2 compatibility
|
||||
from future.utils import PY2
|
||||
if PY2:
|
||||
from future.builtins import str # noqa: F401
|
||||
|
||||
from twisted.internet import reactor
|
||||
from twisted.application import service
|
||||
from twisted.application.internet import TimerService
|
||||
from zope.interface import implementer
|
||||
from foolscap.api import eventually, DeadReferenceError, Referenceable, Tub
|
||||
from foolscap.api import eventually
|
||||
|
||||
from allmydata.util import log
|
||||
from allmydata.util.encodingutil import quote_local_unicode_path
|
||||
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer
|
||||
|
||||
@implementer(IStatsProducer)
|
||||
class LoadMonitor(service.MultiService):
|
||||
|
||||
loop_interval = 1
|
||||
num_samples = 60
|
||||
|
||||
def __init__(self, provider, warn_if_delay_exceeds=1):
|
||||
service.MultiService.__init__(self)
|
||||
self.provider = provider
|
||||
self.warn_if_delay_exceeds = warn_if_delay_exceeds
|
||||
self.started = False
|
||||
self.last = None
|
||||
self.stats = deque()
|
||||
self.timer = None
|
||||
|
||||
def startService(self):
|
||||
if not self.started:
|
||||
self.started = True
|
||||
self.timer = reactor.callLater(self.loop_interval, self.loop)
|
||||
service.MultiService.startService(self)
|
||||
|
||||
def stopService(self):
|
||||
self.started = False
|
||||
if self.timer:
|
||||
self.timer.cancel()
|
||||
self.timer = None
|
||||
return service.MultiService.stopService(self)
|
||||
|
||||
def loop(self):
|
||||
self.timer = None
|
||||
if not self.started:
|
||||
return
|
||||
now = time.time()
|
||||
if self.last is not None:
|
||||
delay = now - self.last - self.loop_interval
|
||||
if delay > self.warn_if_delay_exceeds:
|
||||
log.msg(format='excessive reactor delay (%ss)', args=(delay,),
|
||||
level=log.UNUSUAL)
|
||||
self.stats.append(delay)
|
||||
while len(self.stats) > self.num_samples:
|
||||
self.stats.popleft()
|
||||
|
||||
self.last = now
|
||||
self.timer = reactor.callLater(self.loop_interval, self.loop)
|
||||
|
||||
def get_stats(self):
|
||||
if self.stats:
|
||||
avg = sum(self.stats) / len(self.stats)
|
||||
m_x = max(self.stats)
|
||||
else:
|
||||
avg = m_x = 0
|
||||
return { 'load_monitor.avg_load': avg,
|
||||
'load_monitor.max_load': m_x, }
|
||||
from allmydata.interfaces import IStatsProducer
|
||||
|
||||
@implementer(IStatsProducer)
|
||||
class CPUUsageMonitor(service.MultiService):
|
||||
@ -128,37 +68,18 @@ class CPUUsageMonitor(service.MultiService):
|
||||
return s
|
||||
|
||||
|
||||
@implementer(RIStatsProvider)
|
||||
class StatsProvider(Referenceable, service.MultiService):
|
||||
class StatsProvider(service.MultiService):
|
||||
|
||||
def __init__(self, node, gatherer_furl):
|
||||
def __init__(self, node):
|
||||
service.MultiService.__init__(self)
|
||||
self.node = node
|
||||
self.gatherer_furl = gatherer_furl # might be None
|
||||
|
||||
self.counters = {}
|
||||
self.stats_producers = []
|
||||
|
||||
# only run the LoadMonitor (which submits a timer every second) if
|
||||
# there is a gatherer who is going to be paying attention. Our stats
|
||||
# are visible through HTTP even without a gatherer, so run the rest
|
||||
# of the stats (including the once-per-minute CPUUsageMonitor)
|
||||
if gatherer_furl:
|
||||
self.load_monitor = LoadMonitor(self)
|
||||
self.load_monitor.setServiceParent(self)
|
||||
self.register_producer(self.load_monitor)
|
||||
|
||||
self.cpu_monitor = CPUUsageMonitor()
|
||||
self.cpu_monitor.setServiceParent(self)
|
||||
self.register_producer(self.cpu_monitor)
|
||||
|
||||
def startService(self):
|
||||
if self.node and self.gatherer_furl:
|
||||
nickname_utf8 = self.node.nickname.encode("utf-8")
|
||||
self.node.tub.connectTo(self.gatherer_furl,
|
||||
self._connected, nickname_utf8)
|
||||
service.MultiService.startService(self)
|
||||
|
||||
def count(self, name, delta=1):
|
||||
if isinstance(name, str):
|
||||
name = name.encode("utf-8")
|
||||
@ -175,155 +96,3 @@ class StatsProvider(Referenceable, service.MultiService):
|
||||
ret = { 'counters': self.counters, 'stats': stats }
|
||||
log.msg(format='get_stats() -> %(stats)s', stats=ret, level=log.NOISY)
|
||||
return ret
|
||||
|
||||
def remote_get_stats(self):
|
||||
# The remote API expects keys to be bytes:
|
||||
def to_bytes(d):
|
||||
result = {}
|
||||
for (k, v) in d.items():
|
||||
if isinstance(k, str):
|
||||
k = k.encode("utf-8")
|
||||
result[k] = v
|
||||
return result
|
||||
|
||||
stats = self.get_stats()
|
||||
return {b"counters": to_bytes(stats["counters"]),
|
||||
b"stats": to_bytes(stats["stats"])}
|
||||
|
||||
def _connected(self, gatherer, nickname):
|
||||
gatherer.callRemoteOnly('provide', self, nickname or '')
|
||||
|
||||
|
||||
@implementer(RIStatsGatherer)
|
||||
class StatsGatherer(Referenceable, service.MultiService):
|
||||
|
||||
poll_interval = 60
|
||||
|
||||
def __init__(self, basedir):
|
||||
service.MultiService.__init__(self)
|
||||
self.basedir = basedir
|
||||
|
||||
self.clients = {}
|
||||
self.nicknames = {}
|
||||
|
||||
self.timer = TimerService(self.poll_interval, self.poll)
|
||||
self.timer.setServiceParent(self)
|
||||
|
||||
def get_tubid(self, rref):
|
||||
return rref.getRemoteTubID()
|
||||
|
||||
def remote_provide(self, provider, nickname):
|
||||
tubid = self.get_tubid(provider)
|
||||
if tubid == '<unauth>':
|
||||
print("WARNING: failed to get tubid for %s (%s)" % (provider, nickname))
|
||||
# don't add to clients to poll (polluting data) don't care about disconnect
|
||||
return
|
||||
self.clients[tubid] = provider
|
||||
self.nicknames[tubid] = nickname
|
||||
|
||||
def poll(self):
|
||||
for tubid,client in self.clients.items():
|
||||
nickname = self.nicknames.get(tubid)
|
||||
d = client.callRemote('get_stats')
|
||||
d.addCallbacks(self.got_stats, self.lost_client,
|
||||
callbackArgs=(tubid, nickname),
|
||||
errbackArgs=(tubid,))
|
||||
d.addErrback(self.log_client_error, tubid)
|
||||
|
||||
def lost_client(self, f, tubid):
|
||||
# this is called lazily, when a get_stats request fails
|
||||
del self.clients[tubid]
|
||||
del self.nicknames[tubid]
|
||||
f.trap(DeadReferenceError)
|
||||
|
||||
def log_client_error(self, f, tubid):
|
||||
log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
|
||||
level=log.UNUSUAL, failure=f)
|
||||
|
||||
def got_stats(self, stats, tubid, nickname):
|
||||
raise NotImplementedError()
|
||||
|
||||
class StdOutStatsGatherer(StatsGatherer):
|
||||
verbose = True
|
||||
def remote_provide(self, provider, nickname):
|
||||
tubid = self.get_tubid(provider)
|
||||
if self.verbose:
|
||||
print('connect "%s" [%s]' % (nickname, tubid))
|
||||
provider.notifyOnDisconnect(self.announce_lost_client, tubid)
|
||||
StatsGatherer.remote_provide(self, provider, nickname)
|
||||
|
||||
def announce_lost_client(self, tubid):
|
||||
print('disconnect "%s" [%s]' % (self.nicknames[tubid], tubid))
|
||||
|
||||
def got_stats(self, stats, tubid, nickname):
|
||||
print('"%s" [%s]:' % (nickname, tubid))
|
||||
pprint.pprint(stats)
|
||||
|
||||
class JSONStatsGatherer(StdOutStatsGatherer):
|
||||
# inherit from StdOutStatsGatherer for connect/disconnect notifications
|
||||
|
||||
def __init__(self, basedir=u".", verbose=True):
|
||||
self.verbose = verbose
|
||||
StatsGatherer.__init__(self, basedir)
|
||||
self.jsonfile = os.path.join(basedir, "stats.json")
|
||||
|
||||
if os.path.exists(self.jsonfile):
|
||||
try:
|
||||
with open(self.jsonfile, 'rb') as f:
|
||||
self.gathered_stats = json.load(f)
|
||||
except Exception:
|
||||
print("Error while attempting to load stats file %s.\n"
|
||||
"You may need to restore this file from a backup,"
|
||||
" or delete it if no backup is available.\n" %
|
||||
quote_local_unicode_path(self.jsonfile))
|
||||
raise
|
||||
else:
|
||||
self.gathered_stats = {}
|
||||
|
||||
def got_stats(self, stats, tubid, nickname):
|
||||
s = self.gathered_stats.setdefault(tubid, {})
|
||||
s['timestamp'] = time.time()
|
||||
s['nickname'] = nickname
|
||||
s['stats'] = stats
|
||||
self.dump_json()
|
||||
|
||||
def dump_json(self):
|
||||
tmp = "%s.tmp" % (self.jsonfile,)
|
||||
with open(tmp, 'wb') as f:
|
||||
json.dump(self.gathered_stats, f)
|
||||
if os.path.exists(self.jsonfile):
|
||||
os.unlink(self.jsonfile)
|
||||
os.rename(tmp, self.jsonfile)
|
||||
|
||||
class StatsGathererService(service.MultiService):
|
||||
furl_file = "stats_gatherer.furl"
|
||||
|
||||
def __init__(self, basedir=".", verbose=False):
|
||||
service.MultiService.__init__(self)
|
||||
self.basedir = basedir
|
||||
self.tub = Tub(certFile=os.path.join(self.basedir,
|
||||
"stats_gatherer.pem"))
|
||||
self.tub.setServiceParent(self)
|
||||
self.tub.setOption("logLocalFailures", True)
|
||||
self.tub.setOption("logRemoteFailures", True)
|
||||
self.tub.setOption("expose-remote-exception-types", False)
|
||||
|
||||
self.stats_gatherer = JSONStatsGatherer(self.basedir, verbose)
|
||||
self.stats_gatherer.setServiceParent(self)
|
||||
|
||||
try:
|
||||
with open(os.path.join(self.basedir, "location")) as f:
|
||||
location = f.read().strip()
|
||||
except EnvironmentError:
|
||||
raise ValueError("Unable to find 'location' in BASEDIR, please rebuild your stats-gatherer")
|
||||
try:
|
||||
with open(os.path.join(self.basedir, "port")) as f:
|
||||
port = f.read().strip()
|
||||
except EnvironmentError:
|
||||
raise ValueError("Unable to find 'port' in BASEDIR, please rebuild your stats-gatherer")
|
||||
|
||||
self.tub.listenOn(port)
|
||||
self.tub.setLocation(location)
|
||||
ff = os.path.join(self.basedir, self.furl_file)
|
||||
self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,
|
||||
furlFile=ff)
|
||||
|
@ -142,9 +142,8 @@ class BinTahoe(common_util.SignalMixin, unittest.TestCase, RunBinTahoeMixin):
|
||||
|
||||
|
||||
class CreateNode(unittest.TestCase):
|
||||
# exercise "tahoe create-node", create-introducer,
|
||||
# create-key-generator, and create-stats-gatherer, by calling the
|
||||
# corresponding code as a subroutine.
|
||||
# exercise "tahoe create-node", create-introducer, and
|
||||
# create-key-generator by calling the corresponding code as a subroutine.
|
||||
|
||||
def workdir(self, name):
|
||||
basedir = os.path.join("test_runner", "CreateNode", name)
|
||||
@ -243,48 +242,11 @@ class CreateNode(unittest.TestCase):
|
||||
def test_introducer(self):
|
||||
self.do_create("introducer", "--hostname=127.0.0.1")
|
||||
|
||||
def test_stats_gatherer(self):
|
||||
self.do_create("stats-gatherer", "--hostname=127.0.0.1")
|
||||
|
||||
def test_subcommands(self):
|
||||
# no arguments should trigger a command listing, via UsageError
|
||||
self.failUnlessRaises(usage.UsageError, parse_cli,
|
||||
)
|
||||
|
||||
@inlineCallbacks
|
||||
def test_stats_gatherer_good_args(self):
|
||||
rc,out,err = yield run_cli("create-stats-gatherer", "--hostname=foo",
|
||||
self.mktemp())
|
||||
self.assertEqual(rc, 0)
|
||||
rc,out,err = yield run_cli("create-stats-gatherer",
|
||||
"--location=tcp:foo:1234",
|
||||
"--port=tcp:1234", self.mktemp())
|
||||
self.assertEqual(rc, 0)
|
||||
|
||||
|
||||
def test_stats_gatherer_bad_args(self):
|
||||
def _test(args):
|
||||
argv = args.split()
|
||||
self.assertRaises(usage.UsageError, parse_cli, *argv)
|
||||
|
||||
# missing hostname/location/port
|
||||
_test("create-stats-gatherer D")
|
||||
|
||||
# missing port
|
||||
_test("create-stats-gatherer --location=foo D")
|
||||
|
||||
# missing location
|
||||
_test("create-stats-gatherer --port=foo D")
|
||||
|
||||
# can't provide both
|
||||
_test("create-stats-gatherer --hostname=foo --port=foo D")
|
||||
|
||||
# can't provide both
|
||||
_test("create-stats-gatherer --hostname=foo --location=foo D")
|
||||
|
||||
# can't provide all three
|
||||
_test("create-stats-gatherer --hostname=foo --location=foo --port=foo D")
|
||||
|
||||
|
||||
class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin,
|
||||
RunBinTahoeMixin):
|
||||
|
@ -23,7 +23,6 @@ from allmydata.util import log, base32
|
||||
from allmydata.util.encodingutil import quote_output, unicode_to_argv
|
||||
from allmydata.util.fileutil import abspath_expanduser_unicode
|
||||
from allmydata.util.consumer import MemoryConsumer, download_to_data
|
||||
from allmydata.stats import StatsGathererService
|
||||
from allmydata.interfaces import IDirectoryNode, IFileNode, \
|
||||
NoSuchChildError, NoSharesError
|
||||
from allmydata.monitor import Monitor
|
||||
@ -667,9 +666,6 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
|
||||
self.sparent = service.MultiService()
|
||||
self.sparent.startService()
|
||||
|
||||
self.stats_gatherer = None
|
||||
self.stats_gatherer_furl = None
|
||||
|
||||
def tearDown(self):
|
||||
log.msg("shutting down SystemTest services")
|
||||
d = self.sparent.stopService()
|
||||
@ -713,7 +709,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
|
||||
return f.read().strip()
|
||||
|
||||
@inlineCallbacks
|
||||
def set_up_nodes(self, NUMCLIENTS=5, use_stats_gatherer=False):
|
||||
def set_up_nodes(self, NUMCLIENTS=5):
|
||||
"""
|
||||
Create an introducer and ``NUMCLIENTS`` client nodes pointed at it. All
|
||||
of the nodes are running in this process.
|
||||
@ -726,9 +722,6 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
|
||||
|
||||
:param int NUMCLIENTS: The number of client nodes to create.
|
||||
|
||||
:param bool use_stats_gatherer: If ``True`` then also create a stats
|
||||
gatherer and configure the other nodes to use it.
|
||||
|
||||
:return: A ``Deferred`` that fires when the nodes have connected to
|
||||
each other.
|
||||
"""
|
||||
@ -737,33 +730,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
|
||||
self.introducer = yield self._create_introducer()
|
||||
self.add_service(self.introducer)
|
||||
self.introweb_url = self._get_introducer_web()
|
||||
|
||||
if use_stats_gatherer:
|
||||
yield self._set_up_stats_gatherer()
|
||||
yield self._set_up_client_nodes()
|
||||
if use_stats_gatherer:
|
||||
yield self._grab_stats()
|
||||
|
||||
def _set_up_stats_gatherer(self):
|
||||
statsdir = self.getdir("stats_gatherer")
|
||||
fileutil.make_dirs(statsdir)
|
||||
|
||||
location_hint, port_endpoint = self.port_assigner.assign(reactor)
|
||||
fileutil.write(os.path.join(statsdir, "location"), location_hint)
|
||||
fileutil.write(os.path.join(statsdir, "port"), port_endpoint)
|
||||
self.stats_gatherer_svc = StatsGathererService(statsdir)
|
||||
self.stats_gatherer = self.stats_gatherer_svc.stats_gatherer
|
||||
self.stats_gatherer_svc.setServiceParent(self.sparent)
|
||||
|
||||
d = fireEventually()
|
||||
sgf = os.path.join(statsdir, 'stats_gatherer.furl')
|
||||
def check_for_furl():
|
||||
return os.path.exists(sgf)
|
||||
d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
|
||||
def get_furl(junk):
|
||||
self.stats_gatherer_furl = file(sgf, 'rb').read().strip()
|
||||
d.addCallback(get_furl)
|
||||
return d
|
||||
|
||||
@inlineCallbacks
|
||||
def _set_up_client_nodes(self):
|
||||
@ -833,15 +800,11 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
|
||||
value = value.encode("utf-8")
|
||||
config.setdefault(section, {})[feature] = value
|
||||
|
||||
setclient = partial(setconf, config, which, "client")
|
||||
setnode = partial(setconf, config, which, "node")
|
||||
sethelper = partial(setconf, config, which, "helper")
|
||||
|
||||
setnode("nickname", u"client %d \N{BLACK SMILING FACE}" % (which,))
|
||||
|
||||
if self.stats_gatherer_furl:
|
||||
setclient("stats_gatherer.furl", self.stats_gatherer_furl)
|
||||
|
||||
tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor)
|
||||
setnode("tub.port", tub_port_endpoint)
|
||||
setnode("tub.location", tub_location_hint)
|
||||
@ -872,10 +835,6 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
|
||||
fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
|
||||
return basedir
|
||||
|
||||
def _grab_stats(self):
|
||||
d = self.stats_gatherer.poll()
|
||||
return d
|
||||
|
||||
def bounce_client(self, num):
|
||||
c = self.clients[num]
|
||||
d = c.disownServiceParent()
|
||||
@ -1303,25 +1262,11 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
|
||||
d.addCallback(_upload_resumable)
|
||||
|
||||
def _grab_stats(ignored):
|
||||
# the StatsProvider doesn't normally publish a FURL:
|
||||
# instead it passes a live reference to the StatsGatherer
|
||||
# (if and when it connects). To exercise the remote stats
|
||||
# interface, we manually publish client0's StatsProvider
|
||||
# and use client1 to query it.
|
||||
sp = self.clients[0].stats_provider
|
||||
sp_furl = self.clients[0].tub.registerReference(sp)
|
||||
d = self.clients[1].tub.getReference(sp_furl)
|
||||
d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
|
||||
def _got_stats(stats):
|
||||
#print("STATS")
|
||||
#from pprint import pprint
|
||||
#pprint(stats)
|
||||
s = stats["stats"]
|
||||
self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
|
||||
c = stats["counters"]
|
||||
self.failUnless("storage_server.allocate" in c)
|
||||
d.addCallback(_got_stats)
|
||||
return d
|
||||
stats = self.clients[0].stats_provider.get_stats()
|
||||
s = stats["stats"]
|
||||
self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
|
||||
c = stats["counters"]
|
||||
self.failUnless("storage_server.allocate" in c)
|
||||
d.addCallback(_grab_stats)
|
||||
|
||||
return d
|
||||
@ -1629,7 +1574,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
|
||||
def test_filesystem(self):
|
||||
self.basedir = "system/SystemTest/test_filesystem"
|
||||
self.data = LARGE_DATA
|
||||
d = self.set_up_nodes(use_stats_gatherer=True)
|
||||
d = self.set_up_nodes()
|
||||
def _new_happy_semantics(ign):
|
||||
for c in self.clients:
|
||||
c.encoding_params['happy'] = 1
|
||||
|
@ -12,8 +12,6 @@
|
||||
<h2>General</h2>
|
||||
|
||||
<ul>
|
||||
<li>Load Average: <t:transparent t:render="load_average" /></li>
|
||||
<li>Peak Load: <t:transparent t:render="peak_load" /></li>
|
||||
<li>Files Uploaded (immutable): <t:transparent t:render="uploads" /></li>
|
||||
<li>Files Downloaded (immutable): <t:transparent t:render="downloads" /></li>
|
||||
<li>Files Published (mutable): <t:transparent t:render="publishes" /></li>
|
||||
|
@ -1565,14 +1565,6 @@ class StatisticsElement(Element):
|
||||
# Note that `counters` can be empty.
|
||||
self._stats = provider.get_stats()
|
||||
|
||||
@renderer
|
||||
def load_average(self, req, tag):
|
||||
return tag(str(self._stats["stats"].get("load_monitor.avg_load")))
|
||||
|
||||
@renderer
|
||||
def peak_load(self, req, tag):
|
||||
return tag(str(self._stats["stats"].get("load_monitor.max_load")))
|
||||
|
||||
@renderer
|
||||
def uploads(self, req, tag):
|
||||
files = self._stats["counters"].get("uploader.files_uploaded", 0)
|
||||
|
Loading…
x
Reference in New Issue
Block a user