storage: add bucket-counting share crawler, add its output (number of files+directories maintained by a storage server) and status to the webapi /storage page

This commit is contained in:
Brian Warner 2009-02-20 21:04:08 -07:00
parent d14f00c537
commit b3cd4952bd
6 changed files with 280 additions and 49 deletions

View File

@ -98,7 +98,6 @@ class ShareCrawler(service.MultiService):
d = {}
if self.state["current-cycle"] is None:
assert self.sleeping_between_cycles
d["cycle-in-progress"] = False
d["next-crawl-time"] = self.next_wake_time
d["remaining-wait-time"] = self.next_wake_time - time.time()
@ -145,7 +144,7 @@ class ShareCrawler(service.MultiService):
except EnvironmentError:
state = {"version": 1,
"last-cycle-finished": None,
"current-cycle": 0,
"current-cycle": None,
"last-complete-prefix": None,
"last-complete-bucket": None,
}
@ -184,6 +183,11 @@ class ShareCrawler(service.MultiService):
def startService(self):
self.load_state()
# arrange things to look like we were just sleeping, so
# status/progress values work correctly
self.sleeping_between_cycles = True
self.current_sleep_time = 0
self.next_wake_time = time.time()
self.timer = reactor.callLater(0, self.start_slice)
service.MultiService.startService(self)
@ -195,10 +199,12 @@ class ShareCrawler(service.MultiService):
def start_slice(self):
self.timer = None
self.sleeping_between_cycles = False
self.current_sleep_time = None
self.next_wake_time = None
start_slice = time.time()
try:
s = self.last_complete_prefix_index
self.start_current_prefix(start_slice)
finished_cycle = True
except TimeSliceExceeded:
@ -229,14 +235,15 @@ class ShareCrawler(service.MultiService):
self.timer = reactor.callLater(sleep_time, self.start_slice)
def start_current_prefix(self, start_slice):
if self.state["current-cycle"] is None:
assert self.state["last-cycle-finished"] is not None
self.state["current-cycle"] = self.state["last-cycle-finished"] + 1
cycle = self.state["current-cycle"]
state = self.state
if state["current-cycle"] is None:
if state["last-cycle-finished"] is None:
state["current-cycle"] = 0
else:
state["current-cycle"] = state["last-cycle-finished"] + 1
cycle = state["current-cycle"]
for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
if time.time() > start_slice + self.cpu_slice:
raise TimeSliceExceeded()
# if we want to yield earlier, just raise TimeSliceExceeded()
prefix = self.prefixes[i]
prefixdir = os.path.join(self.sharedir, prefix)
@ -253,11 +260,13 @@ class ShareCrawler(service.MultiService):
buckets, start_slice)
self.last_complete_prefix_index = i
self.save_state()
if time.time() > start_slice + self.cpu_slice:
raise TimeSliceExceeded()
# yay! we finished the whole cycle
self.last_complete_prefix_index = -1
self.state["last-complete-bucket"] = None
self.state["last-cycle-finished"] = cycle
self.state["current-cycle"] = None
state["last-complete-bucket"] = None
state["last-cycle-finished"] = cycle
state["current-cycle"] = None
self.finished_cycle(cycle)
self.save_state()
@ -272,11 +281,14 @@ class ShareCrawler(service.MultiService):
for bucket in buckets:
if bucket <= self.state["last-complete-bucket"]:
continue
if time.time() > start_slice + self.cpu_slice:
raise TimeSliceExceeded()
self.process_bucket(cycle, prefix, prefixdir, bucket)
self.state["last-complete-bucket"] = bucket
# note: saving the state after every bucket is somewhat
# time-consuming, but lets us avoid losing more than one bucket's
# worth of progress.
self.save_state()
if time.time() > start_slice + self.cpu_slice:
raise TimeSliceExceeded()
def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
"""Examine a single bucket. Subclasses should do whatever they want
@ -317,3 +329,56 @@ class ShareCrawler(service.MultiService):
"""
pass
class BucketCountingCrawler(ShareCrawler):
    """I keep track of how many buckets are being managed by this server.
    This is equivalent to the number of distributed files and directories for
    which I am providing storage. The actual number of files+directories in
    the full grid is probably higher (especially when there are more servers
    than 'N', the number of generated shares), because some files+directories
    will have shares on other servers instead of me.
    """

    # counting buckets is cheap but not urgent: once an hour is plenty
    minimum_cycle_time = 60*60 # we don't need this more than once an hour

    def __init__(self, server, statefile, num_sample_prefixes=1):
        ShareCrawler.__init__(self, server, statefile)
        # how many prefixdirs to retain full storage-index lists for,
        # as a sample of what the server is holding
        self.num_sample_prefixes = num_sample_prefixes

    def add_initial_state(self):
        # ["share-counts"][cyclenum][prefix] = number
        # ["last-complete-cycle"] = cyclenum # maintained by base class
        # ["last-complete-share-count"] = number
        # ["storage-index-samples"][prefix] = (cyclenum,
        #                                      list of SI strings (base32))
        self.state.setdefault("share-counts", {})
        self.state.setdefault("last-complete-share-count", None)
        self.state.setdefault("storage-index-samples", {})

    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
        # we override process_prefixdir() because we don't want to look at
        # the individual buckets. We'll save state after each one. On my
        # laptop, a mostly-empty storage server can process about 70
        # prefixdirs in a 1.0s slice.
        if cycle not in self.state["share-counts"]:
            self.state["share-counts"][cycle] = {}
        self.state["share-counts"][cycle][prefix] = len(buckets)
        if prefix in self.prefixes[:self.num_sample_prefixes]:
            self.state["storage-index-samples"][prefix] = (cycle, buckets)

    def finished_cycle(self, cycle):
        # FIX: the per-cycle value is a dict of prefix->count, so the
        # fallback must be {} (the original used [], which has no .values()
        # and would raise AttributeError if it were ever taken non-empty)
        last_counts = self.state["share-counts"].get(cycle, {})
        if len(last_counts) == len(self.prefixes):
            # great, we have a whole cycle.
            num_buckets = sum(last_counts.values())
            self.state["last-complete-share-count"] = (cycle, num_buckets)
            # get rid of old counts
            for old_cycle in list(self.state["share-counts"].keys()):
                if old_cycle != cycle:
                    del self.state["share-counts"][old_cycle]
        # get rid of old samples too
        for prefix in list(self.state["storage-index-samples"].keys()):
            old_cycle, buckets = self.state["storage-index-samples"][prefix]
            if old_cycle != cycle:
                del self.state["storage-index-samples"][prefix]

View File

@ -14,6 +14,7 @@ from allmydata.storage.lease import LeaseInfo
from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
create_mutable_sharefile
from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
from allmydata.storage.crawler import BucketCountingCrawler
# storage/
# storage/shares/incoming
@ -77,6 +78,10 @@ class StorageServer(service.MultiService, Referenceable):
"cancel": [],
}
statefile = os.path.join(storedir, "bucket_counter.state")
self.bucket_counter = BucketCountingCrawler(self, statefile)
self.bucket_counter.setServiceParent(self)
def count(self, name, delta=1):
    """Increment the 'storage_server.<name>' counter by *delta* (default 1)
    on the attached stats_provider; a no-op when no provider is configured."""
    if self.stats_provider:
        self.stats_provider.count("storage_server." + name, delta)

View File

@ -146,6 +146,15 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
c = BucketEnumeratingCrawler(ss, statefile)
c.setServiceParent(self.s)
# it should be legal to call get_state() and get_progress() right
# away, even before the first tick is performed. No work should have
# been done yet.
s = c.get_state()
p = c.get_progress()
self.failUnlessEqual(s["last-complete-prefix"], None)
self.failUnlessEqual(s["current-cycle"], None)
self.failUnlessEqual(p["cycle-in-progress"], False)
d = c.finished_d
def _check(ignored):
self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
@ -405,6 +414,9 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
self.failIf(c.running)
self.failIf(c.timer)
self.failIf(c.current_sleep_time)
s = c.get_state()
self.failUnlessEqual(s["last-cycle-finished"], 0)
self.failUnlessEqual(s["current-cycle"], None)
d.addCallback(_check)
return d

View File

@ -1,10 +1,14 @@
import time, os.path, stat, re
from twisted.trial import unittest
from twisted.internet import defer
import time, os.path, stat, re
from twisted.application import service
from foolscap import eventual
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32
from allmydata.util import fileutil, hashutil, base32, pollmixin
from allmydata.storage.server import StorageServer, storage_index_to_dir
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
@ -14,8 +18,7 @@ from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.web.storage import StorageStatus, abbreviate_if_known, \
remove_prefix
from allmydata.web.storage import StorageStatus, remove_prefix
class Marker:
pass
@ -1290,33 +1293,135 @@ class Stats(unittest.TestCase):
self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
def remove_tags(s):
    """Strip HTML/XML tags from *s* (replacing each with a space) and
    collapse every whitespace run to a single space, so tests can do
    simple substring matching against rendered pages."""
    tagless = re.sub(r'<[^>]*>', ' ', s)
    return re.sub(r'\s+', ' ', tagless)
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
    """Exercise the BucketCountingCrawler through a live StorageServer:
    check the web-status rendering before/during/after a crawl cycle, and
    check that stale per-cycle state is cleaned up when a cycle finishes."""

    def setUp(self):
        # parent service owning the storage servers created by each test
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def test_bucket_counter(self):
        basedir = "storage/BucketCounter/bucket_counter"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0.
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)

        w = StorageStatus(ss)

        # this sample is before the crawler has started doing anything
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Accepting new shares: Yes" in s, s)
        self.failUnless("Reserved space: - 0 B (0)" in s, s)
        # no cycle has completed yet, so no count is available
        self.failUnless("Total buckets: Not computed yet" in s, s)
        self.failUnless("Next crawl in" in s, s)

        # give the bucket-counting-crawler one tick to get started. The
        # cpu_slice=0 will force it to yield right after it processes the
        # first prefix
        d = eventual.fireEventually()
        def _check(ignored):
            # are we really right after the first prefix?
            state = ss.bucket_counter.get_state()
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            # mid-cycle, the status page should show crawl progress
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnless(" Current crawl " in s, s)
            self.failUnless(" (next work in " in s, s)
        d.addCallback(_check)

        # now give it enough time to complete a full cycle
        def _watch():
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            # after the first full cycle, a (zero) bucket count is available
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnless("Total buckets: 0 (the number of" in s, s)
            self.failUnless("Next crawl in 359" in s, s) # about 3600-1 seconds
        d.addCallback(_check2)
        return d

    def test_bucket_counter_cleanup(self):
        basedir = "storage/BucketCounter/bucket_counter_cleanup"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0.
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)

        d = eventual.fireEventually()

        def _after_first_prefix(ignored):
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            # now sneak in and mess with its state, to make sure it cleans up
            # properly at the end of the cycle
            state = ss.bucket_counter.state
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            # plant bogus entries under an impossible cycle number (-12)
            state["share-counts"][-12] = {}
            state["storage-index-samples"]["bogusprefix!"] = (-12, [])
            ss.bucket_counter.save_state()
        d.addCallback(_after_first_prefix)

        # now give it enough time to complete a cycle
        def _watch():
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))

        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            # finished_cycle() should have discarded the planted entries
            s = ss.bucket_counter.get_state()
            self.failIf(-12 in s["share-counts"], s["share-counts"].keys())
            self.failIf("bogusprefix!" in s["storage-index-samples"],
                        s["storage-index-samples"].keys())
        d.addCallback(_check2)
        return d
class NoStatvfsServer(StorageServer):
    # Simulates a platform without os.statvfs (e.g. windows): the override
    # raises AttributeError just as the missing os attribute would.
    def do_statvfs(self):
        raise AttributeError
class WebStatus(unittest.TestCase):
class WebStatus(unittest.TestCase, pollmixin.PollMixin):
def setUp(self):
self.s = service.MultiService()
self.s.startService()
def tearDown(self):
return self.s.stopService()
def test_no_server(self):
w = StorageStatus(None)
html = w.renderSynchronously()
self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
def remove_tags(self, s):
s = re.sub(r'<[^>]*>', ' ', s)
s = re.sub(r'\s+', ' ', s)
return s
def test_status(self):
basedir = "storage/WebStatus/status"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20)
ss.setServiceParent(self.s)
w = StorageStatus(ss)
html = w.renderSynchronously()
self.failUnless("<h1>Storage Server Status</h1>" in html, html)
s = self.remove_tags(html)
s = remove_tags(html)
self.failUnless("Accepting new shares: Yes" in s, s)
self.failUnless("Reserved space: - 0B" in s, s)
self.failUnless("Reserved space: - 0 B (0)" in s, s)
def test_status_no_statvfs(self):
# windows has no os.statvfs . Make sure the code handles that even on
@ -1324,10 +1429,11 @@ class WebStatus(unittest.TestCase):
basedir = "storage/WebStatus/status_no_statvfs"
fileutil.make_dirs(basedir)
ss = NoStatvfsServer(basedir, "\x00" * 20)
ss.setServiceParent(self.s)
w = StorageStatus(ss)
html = w.renderSynchronously()
self.failUnless("<h1>Storage Server Status</h1>" in html, html)
s = self.remove_tags(html)
s = remove_tags(html)
self.failUnless("Accepting new shares: Yes" in s, s)
self.failUnless("Total disk space: ?" in s, s)
@ -1335,25 +1441,39 @@ class WebStatus(unittest.TestCase):
basedir = "storage/WebStatus/readonly"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
ss.setServiceParent(self.s)
w = StorageStatus(ss)
html = w.renderSynchronously()
self.failUnless("<h1>Storage Server Status</h1>" in html, html)
s = self.remove_tags(html)
s = remove_tags(html)
self.failUnless("Accepting new shares: No" in s, s)
def test_reserved(self):
basedir = "storage/WebStatus/reserved"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
ss.setServiceParent(self.s)
w = StorageStatus(ss)
html = w.renderSynchronously()
self.failUnless("<h1>Storage Server Status</h1>" in html, html)
s = self.remove_tags(html)
self.failUnless("Reserved space: - 10.00MB" in s, s)
s = remove_tags(html)
self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
def test_huge_reserved(self):
basedir = "storage/WebStatus/reserved"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
ss.setServiceParent(self.s)
w = StorageStatus(ss)
html = w.renderSynchronously()
self.failUnless("<h1>Storage Server Status</h1>" in html, html)
s = remove_tags(html)
self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
def test_util(self):
self.failUnlessEqual(abbreviate_if_known(None), "?")
self.failUnlessEqual(abbreviate_if_known(10e6), "10.00MB")
w = StorageStatus(None)
self.failUnlessEqual(w.render_space(None, None), "?")
self.failUnlessEqual(w.render_space(None, 10e6), "10.00 MB (10000000)")
self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)

View File

@ -1,11 +1,7 @@
from nevow import rend, tags as T
from allmydata.web.common import getxmlfile, abbreviate_size
def abbreviate_if_known(size):
if size is None:
return "?"
return abbreviate_size(size)
from allmydata.web.common import getxmlfile, abbreviate_time
from allmydata.util.abbreviate import abbreviate_space
def remove_prefix(s, prefix):
if not s.startswith(prefix):
@ -29,8 +25,10 @@ class StorageStatus(rend.Page):
def render_bool(self, ctx, data):
return {True: "Yes", False: "No"}[bool(data)]
def render_space(self, ctx, data):
return abbreviate_if_known(data)
def render_space(self, ctx, size):
    """Render *size* (a byte count) as 'ABBREVIATION (exact-integer)',
    e.g. '10.00 MB (10000000)'; render '?' when the size is unknown."""
    if size is None:
        return "?"
    abbreviated = abbreviate_space(size)
    return "%s (%d)" % (abbreviated, size)
def data_stats(self, ctx, data):
# FYI: 'data' appears to be self, rather than the StorageServer
@ -51,8 +49,9 @@ class StorageStatus(rend.Page):
# missing keys will cause an error, even if the renderer can tolerate
# None values. To overcome this, we either need a dict-like object
# that always returns None for unknown keys, or we must pre-populate
# our dict with those missing keys (or find some way to override
# Nevow's handling of dictionaries).
# our dict with those missing keys, or we should get rid of data_
# methods that return dicts (or find some way to override Nevow's
# handling of dictionaries).
d = dict([ (remove_prefix(k, "storage_server."), v)
for k,v in self.storage.get_stats().items() ])
@ -61,3 +60,22 @@ class StorageStatus(rend.Page):
d.setdefault("reserved_space", None)
d.setdefault("disk_avail", None)
return d
def data_last_complete_share_count(self, ctx, data):
    """Return the bucket count from the crawler's last finished cycle,
    or the string 'Not computed yet' when no cycle has completed."""
    crawler_state = self.storage.bucket_counter.get_state()
    last = crawler_state.get("last-complete-share-count")
    if last is not None:
        _cycle, count = last
        return count
    return "Not computed yet"
def render_count_crawler_status(self, ctx, storage):
    """Describe the bucket-counting crawler: either the progress of the
    crawl currently in flight, or the time until the next one starts."""
    progress = self.storage.bucket_counter.get_progress()
    if not progress["cycle-in-progress"]:
        wait = progress["remaining-wait-time"]
        return ctx.tag["Next crawl in %s" % abbreviate_time(wait)]
    pct = progress["cycle-complete-percentage"]
    sleep = progress["remaining-sleep-time"]
    return ctx.tag["Current crawl %.1f%% complete" % pct,
                   " (next work in %s)" % abbreviate_time(sleep)]

View File

@ -10,11 +10,6 @@
<h1>Storage Server Status</h1>
<ul n:data="stats">
<li>Accepting new shares:
<span n:render="bool" n:data="accepting_immutable_shares" /></li>
</ul>
<table n:data="stats">
<tr><td>Total disk space:</td>
<td><span n:render="space" n:data="disk_total" /></td></tr>
@ -24,10 +19,26 @@
<td>- <span n:render="space" n:data="reserved_space" /></td></tr>
<tr><td />
<td>======</td></tr>
<tr><td>Space Available:</td>
<td>&lt; <span n:render="space" n:data="disk_avail" /></td></tr>
<tr><td>Space Available to Tahoe:</td>
<td><span n:render="space" n:data="disk_avail" /></td></tr>
</table>
<ul n:data="stats">
<li>Accepting new shares:
<span n:render="bool" n:data="accepting_immutable_shares" /></li>
</ul>
<ul>
<li>Total buckets:
<span n:render="string" n:data="last_complete_share_count" />
(the number of files and directories for which this server is holding
a share)
<ul>
<li n:render="count_crawler_status" />
</ul>
</li>
</ul>
</div>
</body>