disk-watcher: first draft of a daemon to use the HTTP stats interface and its new storage_server.disk_avail feature, to track changes in disk space over time

This commit is contained in:
Brian Warner 2008-08-06 21:22:22 -07:00
parent 4863b17371
commit 46d0c9c995
2 changed files with 270 additions and 0 deletions

View File

@ -0,0 +1,14 @@
# put this definition in a separate file, because axiom uses the
# fully-qualified classname as a database table name, so __builtin__ is kinda
# ugly.
from axiom.item import Item
from axiom.attributes import text, integer, timestamp
class Sample(Item):
url = text()
when = timestamp(indexed=True)
used = integer()
avail = integer()

View File

@ -0,0 +1,256 @@
# -*- python -*-
"""
Run this tool with twistd in its own directory, with a file named 'urls.txt'
describing which nodes to query. It will request disk-usage numbers from the
nodes once per hour (or slower), and store them in a local database. It will
compute usage-per-unit time values over several time ranges and make them
available through an HTTP query (using ./webport). It will also provide an
estimate of how much time is left before the grid's storage is exhausted.
Each line of urls.txt points to a single node. Each node should have its own
dedicated disk: if multiple nodes share a disk, only list one of them in
urls.txt (otherwise that space will be double-counted, confusing the
results). Each line should be in the form:
http://host:webport/statistics?t=json
"""
# TODO:
# built-in graphs on web interface
import os.path, pprint, time, urllib
from datetime import timedelta
from twisted.application import internet, service, strports
from twisted.web import server, resource, http
from twisted.python import log
import simplejson
from axiom.attributes import AND
from axiom.store import Store
from epsilon import extime
from diskwatcher import Sample
#from axiom.item import Item
#from axiom.attributes import text, integer, timestamp
#class Sample(Item):
# url = text()
# when = timestamp()
# used = integer()
# avail = integer()
#s = Store("history.axiom")
#ns = Store("new-history.axiom")
#for sa in s.query(Sample):
# diskwatcher.Sample(store=ns,
# url=sa.url, when=sa.when, used=sa.used, avail=sa.avail)
#print "done"
HOUR = 3600
DAY = 24*3600
WEEK = 7*DAY
MONTH = 30*DAY
YEAR = 365*DAY
class DiskWatcher(service.MultiService, resource.Resource):
POLL_INTERVAL = 60#1*HOUR
AVERAGES = {"60s": 60,
"5m": 5*60,
"30m": 30*60,
"1hr": 1*HOUR,
"1day": 1*DAY,
"2wk": 2*WEEK,
"4wk": 4*WEEK,
}
def __init__(self):
assert os.path.exists("diskwatcher.tac") # run from the right directory
service.MultiService.__init__(self)
resource.Resource.__init__(self)
self.store = Store("history.axiom")
ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
ts.setServiceParent(self)
def startService(self):
service.MultiService.startService(self)
try:
desired_webport = open("webport", "r").read().strip()
except EnvironmentError:
desired_webport = None
webport = desired_webport or "tcp:0"
root = self
serv = strports.service(webport, server.Site(root))
serv.setServiceParent(self)
if not desired_webport:
got_port = serv._port.getHost().port
open("webport", "w").write("tcp:%d\n" % got_port)
def get_urls(self):
for url in open("urls.txt","r").readlines():
if "#" in url:
url = url[:url.find("#")]
url = url.strip()
if not url:
continue
yield url
def poll(self):
log.msg("polling..")
attempts = 0
fetched = 0
for url in self.get_urls():
attempts += 1
try:
when = extime.Time()
data = simplejson.load(urllib.urlopen(url))
total = data[u"stats"][u"storage_server.disk_total"]
used = data[u"stats"][u"storage_server.disk_used"]
avail = data[u"stats"][u"storage_server.disk_avail"]
#print "%s : total=%s, used=%s, avail=%s" % (url,
# total, used, avail)
s = Sample(store=self.store,
url=unicode(url), when=when, used=used, avail=avail)
fetched += 1
except:
log.msg("error while fetching: %s" % url)
log.err()
log.msg("fetched %d of %d" % (fetched, attempts))
def calculate(self):
timespans = []
total_avail_space = self.find_total_avail_space()
pairs = [ (timespan,name)
for name,timespan in self.AVERAGES.items() ]
pairs.sort()
for (timespan,name) in pairs:
growth = self.growth(timespan)
print name, total_avail_space, growth
if growth is not None:
try:
timeleft = total_avail_space / growth
except ZeroDivisionError:
timeleft = None
timespans.append( (name, timespan, growth, timeleft) )
return timespans
def find_total_avail_space(self):
# this returns the sum of disk-avail stats for all servers that 1)
# are listed in urls.txt and 2) have responded recently.
now = extime.Time()
recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
total_avail_space = 0
for url in self.get_urls():
url = unicode(url)
latest = list(self.store.query(Sample,
AND(Sample.url == url,
Sample.when > recent),
sort=Sample.when.descending,
limit=1))
if latest:
total_avail_space += latest[0].avail
return total_avail_space
def growth(self, timespan):
"""Calculate the bytes-per-second growth of the total disk-used stat,
over a period of TIMESPAN seconds (i.e. between the most recent
sample and the latest one that's at least TIMESPAN seconds ago),
summed over all nodes which 1) are listed in urls.txt, 2) have
responded recently, and 3) have a response at least as old as
TIMESPAN. If there are no nodes which meet these criteria, we'll
return None; this is likely to happen for the longer timespans (4wk)
until the gatherer has been running and collecting data for that
long."""
td = timedelta(seconds=timespan)
now = extime.Time()
then = now - td
recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
total_growth = 0.0
num_nodes = 0
for url in self.get_urls():
url = unicode(url)
latest = list(self.store.query(Sample,
AND(Sample.url == url,
Sample.when > recent),
sort=Sample.when.descending,
limit=1))
if not latest:
#print "no latest sample from", url
continue # skip this node
latest = latest[0]
old = list(self.store.query(Sample,
AND(Sample.url == url,
Sample.when < then),
sort=Sample.when.descending,
limit=1))
if not old:
#print "no old sample from", url
continue # skip this node
old = old[0]
duration = latest.when.asPOSIXTimestamp() - old.when.asPOSIXTimestamp()
if not duration:
print "only one sample from", url
continue
rate = float(latest.used - old.used) / duration
#print url, rate
total_growth += rate
num_nodes += 1
if not num_nodes:
return None
return total_growth
def getChild(self, path, req):
if path == "":
return self
return resource.Resource.getChild(self, path, req)
def abbreviate_time(self, s):
def _plural(count, unit):
count = int(count)
if count == 1:
return "%d %s" % (count, unit)
return "%d %ss" % (count, unit)
if s is None:
return "unknown"
if s < 120:
return _plural(s, "second")
if s < 3*HOUR:
return _plural(s/60, "minute")
if s < 2*DAY:
return _plural(s/HOUR, "hour")
if s < 2*MONTH:
return _plural(s/DAY, "day")
if s < 4*YEAR:
return _plural(s/MONTH, "month")
return _plural(s/YEAR, "year")
def render(self, req):
t = req.args.get("t", ["html"])[0]
ctype = "text/plain"
data = ""
if t == "html":
data = ""
for (name, timespan, growth, timeleft) in self.calculate():
data += "%f bytes per second, %s remaining (over %s)\n" % \
(growth, self.abbreviate_time(timeleft), name)
elif t == "json":
current = self.calculate()
#data = str(current) + "\n" # isn't that convenient? almost.
data = simplejson.dumps(current, indent=True)
else:
req.setResponseCode(http.BAD_REQUEST)
data = "Unknown t= %s\n" % t
req.setHeader("content-type", ctype)
return data
application = service.Application("disk-watcher")
DiskWatcher().setServiceParent(application)