Don't implement the stats gatherer or support configuring or talking to one

This commit is contained in:
Jean-Paul Calderone 2020-12-09 10:34:16 -05:00
parent 3fd1b336b4
commit d7ec5a19be
7 changed files with 10 additions and 385 deletions

View File

@ -3,7 +3,6 @@ from past.builtins import unicode
import os, stat, time, weakref
from base64 import urlsafe_b64encode
from functools import partial
# On Python 2 this will be the backported package:
from configparser import NoSectionError
@ -85,7 +84,6 @@ _client_config = configutil.ValidConfiguration(
"shares.happy",
"shares.needed",
"shares.total",
"stats_gatherer.furl",
"storage.plugins",
),
"ftpd": (
@ -678,11 +676,7 @@ class _Client(node.Node, pollmixin.PollMixin):
self.init_web(webport) # strports string
def init_stats_provider(self):
gatherer_furl = self.config.get_config("client", "stats_gatherer.furl", None)
if gatherer_furl:
# FURLs should be bytes:
gatherer_furl = gatherer_furl.encode("utf-8")
self.stats_provider = StatsProvider(self, gatherer_furl)
self.stats_provider = StatsProvider(self)
self.stats_provider.setServiceParent(self)
self.stats_provider.register_producer(self)

View File

@ -2931,38 +2931,6 @@ class RIHelper(RemoteInterface):
return (UploadResults, ChoiceOf(RICHKUploadHelper, None))
class RIStatsProvider(RemoteInterface):
__remote_name__ = native_str("RIStatsProvider.tahoe.allmydata.com")
"""
Provides access to statistics and monitoring information.
"""
def get_stats():
"""
returns a dictionary containing 'counters' and 'stats', each a
dictionary with string counter/stat name keys, and numeric or None values.
counters are monotonically increasing measures of work done, and
stats are instantaneous measures (potentially time averaged
internally)
"""
return DictOf(bytes, DictOf(bytes, ChoiceOf(float, int, long, None)))
class RIStatsGatherer(RemoteInterface):
__remote_name__ = native_str("RIStatsGatherer.tahoe.allmydata.com")
"""
Provides a monitoring service for centralised collection of stats
"""
def provide(provider=RIStatsProvider, nickname=bytes):
"""
@param provider: a stats collector instance that should be polled
periodically by the gatherer to collect stats.
@param nickname: a name useful to identify the provided client
"""
return None
class IStatsProducer(Interface):
def get_stats():
"""

View File

@ -318,7 +318,6 @@ def write_client_config(c, config):
c.write("[client]\n")
c.write("helper.furl =\n")
c.write("#stats_gatherer.furl =\n")
c.write("\n")
c.write("# Encoding parameters this client will use for newly-uploaded files\n")
c.write("# This can be changed at any time: the encoding is saved in\n")

View File

@ -47,8 +47,8 @@ def get_pid_from_pidfile(pidfile):
def identify_node_type(basedir):
"""
:return unicode: None or one of: 'client', 'introducer',
'key-generator' or 'stats-gatherer'
:return unicode: None or one of: 'client', 'introducer', or
'key-generator'
"""
tac = u''
try:
@ -59,7 +59,7 @@ def identify_node_type(basedir):
except OSError:
return None
for t in (u"client", u"introducer", u"key-generator", u"stats-gatherer"):
for t in (u"client", u"introducer", u"key-generator"):
if t in tac:
return t
return None
@ -135,7 +135,6 @@ class DaemonizeTheRealService(Service, HookMixin):
node_to_instance = {
u"client": lambda: maybeDeferred(namedAny("allmydata.client.create_client"), self.basedir),
u"introducer": lambda: maybeDeferred(namedAny("allmydata.introducer.server.create_introducer"), self.basedir),
u"stats-gatherer": lambda: maybeDeferred(namedAny("allmydata.stats.StatsGathererService"), read_config(self.basedir, None), self.basedir, verbose=True),
u"key-generator": key_generator_removed,
}

View File

@ -9,7 +9,7 @@ from twisted.internet import defer, task, threads
from allmydata.scripts.common import get_default_nodedir
from allmydata.scripts import debug, create_node, cli, \
stats_gatherer, admin, tahoe_daemonize, tahoe_start, \
admin, tahoe_daemonize, tahoe_start, \
tahoe_stop, tahoe_restart, tahoe_run, tahoe_invite
from allmydata.util.encodingutil import quote_output, quote_local_unicode_path, get_io_encoding
from allmydata.util.eliotutil import (
@ -60,7 +60,6 @@ class Options(usage.Options):
stderr = sys.stderr
subCommands = ( create_node.subCommands
+ stats_gatherer.subCommands
+ admin.subCommands
+ process_control_commands
+ debug.subCommands
@ -107,7 +106,7 @@ class Options(usage.Options):
create_dispatch = {}
for module in (create_node, stats_gatherer):
for module in (create_node,):
create_dispatch.update(module.dispatch)
def parse_options(argv, config=None):

View File

@ -1,103 +0,0 @@
from __future__ import print_function
import os
# Python 2 compatibility
from future.utils import PY2
if PY2:
from future.builtins import str # noqa: F401
from twisted.python import usage
from allmydata.scripts.common import NoDefaultBasedirOptions
from allmydata.scripts.create_node import write_tac
from allmydata.util.assertutil import precondition
from allmydata.util.encodingutil import listdir_unicode, quote_output
from allmydata.util import fileutil, iputil
class CreateStatsGathererOptions(NoDefaultBasedirOptions):
subcommand_name = "create-stats-gatherer"
optParameters = [
("hostname", None, None, "Hostname of this machine, used to build location"),
("location", None, None, "FURL connection hints, e.g. 'tcp:HOSTNAME:PORT'"),
("port", None, None, "listening endpoint, e.g. 'tcp:PORT'"),
]
def postOptions(self):
if self["hostname"] and (not self["location"]) and (not self["port"]):
pass
elif (not self["hostname"]) and self["location"] and self["port"]:
pass
else:
raise usage.UsageError("You must provide --hostname, or --location and --port.")
description = """
Create a "stats-gatherer" service, which is a standalone process that
collects and stores runtime statistics from many server nodes. This is a
tool for operations personnel to keep track of free disk space, server
load, and protocol activity, across a fleet of Tahoe storage servers.
The "stats-gatherer" listens on a TCP port and publishes a Foolscap FURL
by writing it into a file named "stats_gatherer.furl". You must copy this
FURL into the servers' tahoe.cfg, as the [client] stats_gatherer.furl=
entry. Those servers will then establish a connection to the
stats-gatherer and publish their statistics on a periodic basis. The
gatherer writes a summary JSON file out to disk after each update.
The stats-gatherer listens on a configurable port, and writes a
configurable hostname+port pair into the FURL that it publishes. There
are two configuration modes you can use.
* In the first, you provide --hostname=, and the service chooses its own
TCP port number. If the host is named "example.org" and you provide
--hostname=example.org, the node will pick a port number (e.g. 12345)
and use location="tcp:example.org:12345" and port="tcp:12345".
* In the second, you provide both --location= and --port=, and the
service will refrain from doing any allocation of its own. --location=
must be a Foolscap "FURL connection hint sequence", which is a
comma-separated list of "tcp:HOSTNAME:PORTNUM" strings. --port= must be
a Twisted server endpoint specification, which is generally
"tcp:PORTNUM". So, if your host is named "example.org" and you want to
use port 6789, you should provide --location=tcp:example.org:6789 and
--port=tcp:6789. You are responsible for making sure --location= and
--port= match each other.
"""
def create_stats_gatherer(config):
err = config.stderr
basedir = config['basedir']
# This should always be called with an absolute Unicode basedir.
precondition(isinstance(basedir, str), basedir)
if os.path.exists(basedir):
if listdir_unicode(basedir):
print("The base directory %s is not empty." % quote_output(basedir), file=err)
print("To avoid clobbering anything, I am going to quit now.", file=err)
print("Please use a different directory, or empty this one.", file=err)
return -1
# we're willing to use an empty directory
else:
os.mkdir(basedir)
write_tac(basedir, "stats-gatherer")
if config["hostname"]:
portnum = iputil.allocate_tcp_port()
location = "tcp:%s:%d" % (config["hostname"], portnum)
port = "tcp:%d" % portnum
else:
location = config["location"]
port = config["port"]
fileutil.write(os.path.join(basedir, "location"), location+"\n")
fileutil.write(os.path.join(basedir, "port"), port+"\n")
return 0
subCommands = [
["create-stats-gatherer", None, CreateStatsGathererOptions, "Create a stats-gatherer service."],
]
dispatch = {
"create-stats-gatherer": create_stats_gatherer,
}

View File

@ -1,79 +1,19 @@
from __future__ import print_function
import json
import os
import pprint
import time
from collections import deque
# Python 2 compatibility
from future.utils import PY2
if PY2:
from future.builtins import str # noqa: F401
from twisted.internet import reactor
from twisted.application import service
from twisted.application.internet import TimerService
from zope.interface import implementer
from foolscap.api import eventually, DeadReferenceError, Referenceable, Tub
from foolscap.api import eventually
from allmydata.util import log
from allmydata.util.encodingutil import quote_local_unicode_path
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer
@implementer(IStatsProducer)
class LoadMonitor(service.MultiService):
loop_interval = 1
num_samples = 60
def __init__(self, provider, warn_if_delay_exceeds=1):
service.MultiService.__init__(self)
self.provider = provider
self.warn_if_delay_exceeds = warn_if_delay_exceeds
self.started = False
self.last = None
self.stats = deque()
self.timer = None
def startService(self):
if not self.started:
self.started = True
self.timer = reactor.callLater(self.loop_interval, self.loop)
service.MultiService.startService(self)
def stopService(self):
self.started = False
if self.timer:
self.timer.cancel()
self.timer = None
return service.MultiService.stopService(self)
def loop(self):
self.timer = None
if not self.started:
return
now = time.time()
if self.last is not None:
delay = now - self.last - self.loop_interval
if delay > self.warn_if_delay_exceeds:
log.msg(format='excessive reactor delay (%ss)', args=(delay,),
level=log.UNUSUAL)
self.stats.append(delay)
while len(self.stats) > self.num_samples:
self.stats.popleft()
self.last = now
self.timer = reactor.callLater(self.loop_interval, self.loop)
def get_stats(self):
if self.stats:
avg = sum(self.stats) / len(self.stats)
m_x = max(self.stats)
else:
avg = m_x = 0
return { 'load_monitor.avg_load': avg,
'load_monitor.max_load': m_x, }
from allmydata.interfaces import IStatsProducer
@implementer(IStatsProducer)
class CPUUsageMonitor(service.MultiService):
@ -128,37 +68,18 @@ class CPUUsageMonitor(service.MultiService):
return s
@implementer(RIStatsProvider)
class StatsProvider(Referenceable, service.MultiService):
class StatsProvider(service.MultiService):
def __init__(self, node, gatherer_furl):
def __init__(self, node):
service.MultiService.__init__(self)
self.node = node
self.gatherer_furl = gatherer_furl # might be None
self.counters = {}
self.stats_producers = []
# only run the LoadMonitor (which submits a timer every second) if
# there is a gatherer who is going to be paying attention. Our stats
# are visible through HTTP even without a gatherer, so run the rest
# of the stats (including the once-per-minute CPUUsageMonitor)
if gatherer_furl:
self.load_monitor = LoadMonitor(self)
self.load_monitor.setServiceParent(self)
self.register_producer(self.load_monitor)
self.cpu_monitor = CPUUsageMonitor()
self.cpu_monitor.setServiceParent(self)
self.register_producer(self.cpu_monitor)
def startService(self):
if self.node and self.gatherer_furl:
nickname_utf8 = self.node.nickname.encode("utf-8")
self.node.tub.connectTo(self.gatherer_furl,
self._connected, nickname_utf8)
service.MultiService.startService(self)
def count(self, name, delta=1):
if isinstance(name, str):
name = name.encode("utf-8")
@ -175,155 +96,3 @@ class StatsProvider(Referenceable, service.MultiService):
ret = { 'counters': self.counters, 'stats': stats }
log.msg(format='get_stats() -> %(stats)s', stats=ret, level=log.NOISY)
return ret
def remote_get_stats(self):
# The remote API expects keys to be bytes:
def to_bytes(d):
result = {}
for (k, v) in d.items():
if isinstance(k, str):
k = k.encode("utf-8")
result[k] = v
return result
stats = self.get_stats()
return {b"counters": to_bytes(stats["counters"]),
b"stats": to_bytes(stats["stats"])}
def _connected(self, gatherer, nickname):
gatherer.callRemoteOnly('provide', self, nickname or '')
@implementer(RIStatsGatherer)
class StatsGatherer(Referenceable, service.MultiService):
poll_interval = 60
def __init__(self, basedir):
service.MultiService.__init__(self)
self.basedir = basedir
self.clients = {}
self.nicknames = {}
self.timer = TimerService(self.poll_interval, self.poll)
self.timer.setServiceParent(self)
def get_tubid(self, rref):
return rref.getRemoteTubID()
def remote_provide(self, provider, nickname):
tubid = self.get_tubid(provider)
if tubid == '<unauth>':
print("WARNING: failed to get tubid for %s (%s)" % (provider, nickname))
# don't add to clients to poll (polluting data) don't care about disconnect
return
self.clients[tubid] = provider
self.nicknames[tubid] = nickname
def poll(self):
for tubid,client in self.clients.items():
nickname = self.nicknames.get(tubid)
d = client.callRemote('get_stats')
d.addCallbacks(self.got_stats, self.lost_client,
callbackArgs=(tubid, nickname),
errbackArgs=(tubid,))
d.addErrback(self.log_client_error, tubid)
def lost_client(self, f, tubid):
# this is called lazily, when a get_stats request fails
del self.clients[tubid]
del self.nicknames[tubid]
f.trap(DeadReferenceError)
def log_client_error(self, f, tubid):
log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
level=log.UNUSUAL, failure=f)
def got_stats(self, stats, tubid, nickname):
raise NotImplementedError()
class StdOutStatsGatherer(StatsGatherer):
verbose = True
def remote_provide(self, provider, nickname):
tubid = self.get_tubid(provider)
if self.verbose:
print('connect "%s" [%s]' % (nickname, tubid))
provider.notifyOnDisconnect(self.announce_lost_client, tubid)
StatsGatherer.remote_provide(self, provider, nickname)
def announce_lost_client(self, tubid):
print('disconnect "%s" [%s]' % (self.nicknames[tubid], tubid))
def got_stats(self, stats, tubid, nickname):
print('"%s" [%s]:' % (nickname, tubid))
pprint.pprint(stats)
class JSONStatsGatherer(StdOutStatsGatherer):
# inherit from StdOutStatsGatherer for connect/disconnect notifications
def __init__(self, basedir=u".", verbose=True):
self.verbose = verbose
StatsGatherer.__init__(self, basedir)
self.jsonfile = os.path.join(basedir, "stats.json")
if os.path.exists(self.jsonfile):
try:
with open(self.jsonfile, 'rb') as f:
self.gathered_stats = json.load(f)
except Exception:
print("Error while attempting to load stats file %s.\n"
"You may need to restore this file from a backup,"
" or delete it if no backup is available.\n" %
quote_local_unicode_path(self.jsonfile))
raise
else:
self.gathered_stats = {}
def got_stats(self, stats, tubid, nickname):
s = self.gathered_stats.setdefault(tubid, {})
s['timestamp'] = time.time()
s['nickname'] = nickname
s['stats'] = stats
self.dump_json()
def dump_json(self):
tmp = "%s.tmp" % (self.jsonfile,)
with open(tmp, 'wb') as f:
json.dump(self.gathered_stats, f)
if os.path.exists(self.jsonfile):
os.unlink(self.jsonfile)
os.rename(tmp, self.jsonfile)
class StatsGathererService(service.MultiService):
furl_file = "stats_gatherer.furl"
def __init__(self, basedir=".", verbose=False):
service.MultiService.__init__(self)
self.basedir = basedir
self.tub = Tub(certFile=os.path.join(self.basedir,
"stats_gatherer.pem"))
self.tub.setServiceParent(self)
self.tub.setOption("logLocalFailures", True)
self.tub.setOption("logRemoteFailures", True)
self.tub.setOption("expose-remote-exception-types", False)
self.stats_gatherer = JSONStatsGatherer(self.basedir, verbose)
self.stats_gatherer.setServiceParent(self)
try:
with open(os.path.join(self.basedir, "location")) as f:
location = f.read().strip()
except EnvironmentError:
raise ValueError("Unable to find 'location' in BASEDIR, please rebuild your stats-gatherer")
try:
with open(os.path.join(self.basedir, "port")) as f:
port = f.read().strip()
except EnvironmentError:
raise ValueError("Unable to find 'port' in BASEDIR, please rebuild your stats-gatherer")
self.tub.listenOn(port)
self.tub.setLocation(location)
ff = os.path.join(self.basedir, self.furl_file)
self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,
furlFile=ff)