388 lines
15 KiB
Python

# -*- python -*-
from __future__ import print_function
"""
Run this tool with twistd in its own directory, with a file named 'urls.txt'
describing which nodes to query. Make sure to copy diskwatcher.py into the
same directory. It will request disk-usage numbers from the nodes once per
hour (or slower), and store them in a local database. It will compute
usage-per-unit time values over several time ranges and make them available
through an HTTP query (using ./webport). It will also provide an estimate of
how much time is left before the grid's storage is exhausted.
There are munin plugins (named tahoe_doomsday and tahoe_diskusage) to graph
the values this tool computes.
Each line of urls.txt points to a single node. Each node should have its own
dedicated disk: if multiple nodes share a disk, only list one of them in
urls.txt (otherwise that space will be double-counted, confusing the
results). Each line should be in the form:
http://host:webport/statistics?t=json
"""
# TODO:
# built-in graphs on web interface
import os.path, urllib, time
from datetime import timedelta
from twisted.application import internet, service, strports
from twisted.web import server, resource, http, client
from twisted.internet import defer
from twisted.python import log
import json
from axiom.attributes import AND
from axiom.store import Store
from epsilon import extime
from diskwatcher import Sample
#from axiom.item import Item
#from axiom.attributes import text, integer, timestamp
#class Sample(Item):
# url = text()
# when = timestamp()
# used = integer()
# avail = integer()
#s = Store("history.axiom")
#ns = Store("new-history.axiom")
#for sa in s.query(Sample):
# diskwatcher.Sample(store=ns,
# url=sa.url, when=sa.when, used=sa.used, avail=sa.avail)
#print "done"
# Durations expressed in seconds. MONTH and YEAR are nominal lengths
# (30 and 365 days), used only for averaging windows and for display.
HOUR = 60 * 60
DAY = 24 * HOUR
WEEK = 7 * DAY
MONTH = 30 * DAY
YEAR = 365 * DAY
class DiskWatcher(service.MultiService, resource.Resource):
    """Poll storage nodes for disk usage and serve growth estimates.

    This object is both a Twisted MultiService (it parents the Axiom store
    service, the polling timer and the web listener) and the root web
    Resource that answers HTTP queries with the statistics it computes from
    the Sample rows stored in "history.axiom".
    """

    # seconds between polls of the nodes listed in urls.txt
    POLL_INTERVAL = 1*HOUR
    # label -> length (in seconds) of the window over which growth is computed
    AVERAGES = {#"60s": 60,
                #"5m": 5*60,
                #"30m": 30*60,
                "1hr": 1*HOUR,
                "1day": 1*DAY,
                "2wk": 2*WEEK,
                "4wk": 4*WEEK,
                }
    # seconds for which a value computed by growth() may be served from
    # self.growth_cache
    GROWTH_CACHE_LIFETIME = 3*60

    def __init__(self):
        """Open the sample store and schedule the periodic poll.

        Must be called with the current directory set to the one that holds
        diskwatcher.tac, since every file used here ("history.axiom",
        "urls.txt", "webport") is addressed by a relative path.
        """
        # run from the right directory
        assert os.path.exists("diskwatcher.tac"), \
            "must be run from the directory that contains diskwatcher.tac"
        # timespan (seconds) -> (time.time() when computed, growth value)
        self.growth_cache = {}
        service.MultiService.__init__(self)
        resource.Resource.__init__(self)
        self.store = Store("history.axiom")
        self.store.whenFullyUpgraded().addCallback(self._upgrade_complete)
        service.IService(self.store).setServiceParent(self) # let upgrader run
        ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
        ts.setServiceParent(self)

    def _upgrade_complete(self, ignored):
        """Callback fired by the store's whenFullyUpgraded() Deferred."""
        print("Axiom store upgrade complete")

    def startService(self):
        """Start the child services and the web listener.

        The strports description of the listening port is read from the
        file "webport". If that file is missing, unreadable or empty we
        listen on "tcp:0" and then write the port we actually got back into
        "webport" as "tcp:<port>".
        """
        service.MultiService.startService(self)
        try:
            with open("webport", "r") as f:
                desired_webport = f.read().strip()
        except EnvironmentError:
            desired_webport = None
        webport = desired_webport or "tcp:0"
        root = self
        serv = strports.service(webport, server.Site(root))
        serv.setServiceParent(self)
        if not desired_webport:
            # NOTE(review): this reads the private _port attribute of the
            # strports service; it is expected to be set once the service
            # has been attached to this (already started) parent.
            got_port = serv._port.getHost().port
            with open("webport", "w") as f:
                f.write("tcp:%d\n" % got_port)

    def get_urls(self):
        """Yield each node URL listed in urls.txt.

        Everything from the first "#" on a line is treated as a comment;
        surrounding whitespace is stripped and empty lines are skipped. The
        file is re-read on every call and closed before the first URL is
        yielded.
        """
        with open("urls.txt", "r") as f:
            lines = f.readlines()
        for line in lines:
            url = line.split("#", 1)[0].strip()
            if url:
                yield url

    def poll(self):
        """Timer entry point: fetch a fresh sample from every node."""
        log.msg("polling..")
        #return self.poll_synchronous()
        return self.poll_asynchronous()

    def poll_asynchronous(self):
        """Fetch all URLs concurrently and record a Sample for each reply.

        Returns a Deferred that fires (with None) once every request has
        either succeeded or failed. Individual failures never make the
        returned Deferred fail: they are logged together with the offending
        URL and only reduce the "fetched" count.
        """
        # this didn't actually seem to work any better than poll_synchronous:
        # logs are more noisy, and I got frequent DNS failures. But with a
        # lot of servers to query, this is probably the better way to go. A
        # significant advantage of this approach is that we can use a
        # timeout= argument to tolerate hanging servers.
        dl = []
        for url in self.get_urls():
            when = extime.Time()
            d = client.getPage(url, timeout=60)
            d.addCallback(self.got_response, when, url)
            # log the failure with its URL, but keep it a failure so that
            # the DeferredList below still counts it as unsuccessful
            d.addErrback(self._fetch_failed, url)
            dl.append(d)
        # consumeErrors=True: each failure has already been logged by
        # _fetch_failed, so don't also report "Unhandled error in Deferred"
        d = defer.DeferredList(dl, consumeErrors=True)
        def _done(res):
            fetched = sum(1 for (success, _value) in res if success)
            log.msg("fetched %d of %d" % (fetched, len(dl)))
        d.addCallback(_done)
        return d

    def _fetch_failed(self, failure, url):
        """Errback: log a failed fetch/parse of URL and re-raise it."""
        log.msg("error while fetching: %s" % url)
        log.err(failure)
        return failure

    def poll_synchronous(self):
        """Fetch all URLs one after another, blocking on each.

        Any error while fetching or recording one URL is logged and the
        loop moves on to the next URL.
        """
        attempts = 0
        fetched = 0
        for url in self.get_urls():
            attempts += 1
            try:
                when = extime.Time()
                # if a server accepts the connection and then hangs, this
                # will block forever
                response = urllib.urlopen(url)
                try:
                    data_json = response.read()
                finally:
                    response.close()
                self.got_response(data_json, when, url)
                fetched += 1
            except Exception:
                # deliberately broad: one bad node must not stop the poll,
                # but KeyboardInterrupt/SystemExit are allowed to propagate
                log.msg("error while fetching: %s" % url)
                log.err()
        log.msg("fetched %d of %d" % (fetched, attempts))

    def got_response(self, data_json, when, url):
        """Parse one node's statistics JSON and store it as a Sample.

        DATA_JSON is the body returned by the node, WHEN is the time the
        request was issued and URL identifies the node. Raises (ValueError,
        KeyError, ...) if the body is not JSON or lacks the expected
        "stats" entries.
        """
        stats = json.loads(data_json)[u"stats"]
        total = stats[u"storage_server.disk_total"]
        used = stats[u"storage_server.disk_used"]
        avail = stats[u"storage_server.disk_avail"]
        print("%s : total=%s, used=%s, avail=%s" % (url,
                                                    total, used, avail))
        Sample(store=self.store,
               url=unicode(url), when=when, total=total, used=used, avail=avail)

    def calculate_growth_timeleft(self):
        """Return growth and time-left figures for every AVERAGES window.

        The result is a list of (name, timespan, growth, timeleft) tuples,
        ordered by increasing timespan. Windows for which growth() returns
        None are omitted. TIMELEFT is the currently available space divided
        by the growth rate, or None when the growth is not positive.
        """
        timespans = []
        total_avail_space = self.find_total_available_space()
        pairs = sorted((timespan, name)
                       for name, timespan in self.AVERAGES.items())
        for (timespan, name) in pairs:
            growth = self.growth(timespan)
            print(name, total_avail_space, growth)
            if growth is not None:
                timeleft = None
                if growth > 0:
                    timeleft = total_avail_space / growth
                timespans.append( (name, timespan, growth, timeleft) )
        return timespans

    def _latest_recent_sample(self, url, recent):
        """Return the newest Sample for URL taken after RECENT, or None."""
        latest = list(self.store.query(Sample,
                                       AND(Sample.url == url,
                                           Sample.when > recent),
                                       sort=Sample.when.descending,
                                       limit=1))
        if latest:
            return latest[0]
        return None

    def _sum_latest_samples(self, attribute):
        """Sum ATTRIBUTE over the latest recent Sample of every listed node.

        Only nodes that 1) are listed in urls.txt and 2) have a sample newer
        than two poll intervals contribute to the sum.
        """
        now = extime.Time()
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
        total = 0
        for url in self.get_urls():
            sample = self._latest_recent_sample(unicode(url), recent)
            if sample is not None:
                total += getattr(sample, attribute)
        return total

    def find_total_space(self):
        """Return the summed disk-total stat of all recently seen nodes."""
        # this returns the sum of disk-total stats for all servers that 1)
        # are listed in urls.txt and 2) have responded recently.
        return self._sum_latest_samples("total")

    def find_total_available_space(self):
        """Return the summed disk-avail stat of all recently seen nodes."""
        # this returns the sum of disk-avail stats for all servers that 1)
        # are listed in urls.txt and 2) have responded recently.
        return self._sum_latest_samples("avail")

    def find_total_used_space(self):
        """Return the summed disk-used stat of all recently seen nodes."""
        # this returns the sum of disk-used stats for all servers that 1) are
        # listed in urls.txt and 2) have responded recently.
        return self._sum_latest_samples("used")

    def growth(self, timespan):
        """Calculate the bytes-per-second growth of the total disk-used stat,
        over a period of TIMESPAN seconds (i.e. between the most recent
        sample and the latest one that's at least TIMESPAN seconds ago),
        summed over all nodes which 1) are listed in urls.txt, 2) have
        responded recently, and 3) have a response at least as old as
        TIMESPAN. If there are no nodes which meet these criteria, we'll
        return None; this is likely to happen for the longer timespans (4wk)
        until the gatherer has been running and collecting data for that
        long.

        A non-None result is cached per TIMESPAN and reused for
        GROWTH_CACHE_LIFETIME seconds."""

        # a note about workload: for our oldest storage servers, as of
        # 25-Jan-2009, the first DB query here takes about 40ms per server
        # URL (some take as little as 10ms). There are about 110 servers, and
        # two queries each, so the growth() function takes about 7s to run
        # for each timespan. We track 4 timespans, and find_total_*_space()
        # takes about 2.3s to run, so calculate_growth_timeleft() takes about
        # 27s. Each HTTP query thus takes 27s, and we have six munin plugins
        # which perform HTTP queries every 5 minutes. By adding growth_cache(),
        # I hope to reduce this: the first HTTP query will still take 27s,
        # but the subsequent five should be about 2.3s each.

        # we're allowed to cache this value for 3 minutes
        if timespan in self.growth_cache:
            (when, value) = self.growth_cache[timespan]
            if time.time() - when < self.GROWTH_CACHE_LIFETIME:
                return value

        td = timedelta(seconds=timespan)
        now = extime.Time()
        then = now - td
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)

        total_growth = 0.0
        num_nodes = 0

        for url in self.get_urls():
            url = unicode(url)
            latest = self._latest_recent_sample(url, recent)
            if latest is None:
                #print "no latest sample from", url
                continue # skip this node
            old = list(self.store.query(Sample,
                                        AND(Sample.url == url,
                                            Sample.when < then),
                                        sort=Sample.when.descending,
                                        limit=1))
            if not old:
                #print "no old sample from", url
                continue # skip this node
            old = old[0]
            duration = latest.when.asPOSIXTimestamp() - old.when.asPOSIXTimestamp()
            if not duration:
                # both queries found the same sample: no rate can be computed
                print("only one sample from", url)
                continue

            rate = float(latest.used - old.used) / duration
            #print url, rate
            total_growth += rate
            num_nodes += 1

        if not num_nodes:
            return None
        self.growth_cache[timespan] = (time.time(), total_growth)
        return total_growth

    def getChild(self, path, req):
        """Serve this resource for the empty path segment (i.e. "/")."""
        if path == "":
            return self
        return resource.Resource.getChild(self, path, req)

    def abbreviate_time(self, s):
        """Render a duration of S seconds as a short phrase.

        Returns "unknown" for None; otherwise a whole number of the largest
        convenient unit, e.g. "90 seconds", "5 hours" or "3 months". The
        count is truncated, not rounded.
        """
        def _plural(count, unit):
            count = int(count)
            if count == 1:
                return "%d %s" % (count, unit)
            return "%d %ss" % (count, unit)
        if s is None:
            return "unknown"
        if s < 120:
            return _plural(s, "second")
        if s < 3*HOUR:
            return _plural(s/60, "minute")
        if s < 2*DAY:
            return _plural(s/HOUR, "hour")
        if s < 2*MONTH:
            return _plural(s/DAY, "day")
        if s < 4*YEAR:
            return _plural(s/MONTH, "month")
        return _plural(s/YEAR, "year")

    def abbreviate_space2(self, s, SI=True):
        """Render a byte count S with a unit prefix and two decimals.

        With SI true the prefixes are powers of 1000 ("kB", "MB", ...),
        otherwise powers of 1024 ("kiB", "MiB", ...). Returns "unknown" for
        None. The prefix is chosen from the magnitude of S, so negative
        values (e.g. a negative growth rate) are abbreviated just like
        positive ones and keep their sign.
        """
        if s is None:
            return "unknown"
        if SI:
            U = 1000.0
            isuffix = "B"
        else:
            U = 1024.0
            isuffix = "iB"
        def r(count, suffix):
            return "%.2f %s%s" % (count, suffix, isuffix)

        magnitude = abs(s)
        if magnitude < 1024: # 1000-1023 get emitted as bytes, even in SI mode
            return r(s, "")
        scale = U
        for suffix in ("k", "M", "G", "T"):
            if magnitude < scale*U:
                return r(s/scale, suffix)
            scale *= U
        return r(s/scale, "P")

    def abbreviate_space(self, s):
        """Render S bytes in both SI and binary units: "(x MB, y MiB)"."""
        return "(%s, %s)" % (self.abbreviate_space2(s, True),
                             self.abbreviate_space2(s, False))

    def render(self, req):
        """Answer an HTTP request with the current statistics.

        The "t" query argument selects the format: "html" (the default)
        gives a human-readable text report, "json" gives a JSON object with
        the keys "rates", "total", "used" and "available". Any other value
        yields a 400 response. The content-type is text/plain in all cases.
        """
        t = req.args.get("t", ["html"])[0]
        ctype = "text/plain"
        data = ""
        if t == "html":
            lines = []
            for (name, timespan, growth, timeleft) in self.calculate_growth_timeleft():
                lines.append("%f bytes per second (%sps), %s remaining (over %s)\n" %
                             (growth, self.abbreviate_space2(growth, True),
                              self.abbreviate_time(timeleft), name))
            used = self.find_total_used_space()
            lines.append("total used: %d bytes %s\n" % (used,
                                                        self.abbreviate_space(used)))
            total = self.find_total_space()
            lines.append("total space: %d bytes %s\n" % (total,
                                                         self.abbreviate_space(total)))
            data = "".join(lines)
        elif t == "json":
            current = {"rates": self.calculate_growth_timeleft(),
                       "total": self.find_total_space(),
                       "used": self.find_total_used_space(),
                       "available": self.find_total_available_space(),
                       }
            data = json.dumps(current, indent=True)
        else:
            req.setResponseCode(http.BAD_REQUEST)
            data = "Unknown t= %s\n" % t
        req.setHeader("content-type", ctype)
        return data
# twistd loads this file and looks for the module-level name "application".
application = service.Application("disk-watcher")
# Attach the watcher so that it is started and stopped with the application.
_watcher = DiskWatcher()
_watcher.setServiceParent(application)