crawler: provide for one-shot crawlers, which stop after their first full cycle, for share-upgraders and database-populaters

This commit is contained in:
Brian Warner 2009-02-20 15:19:11 -07:00
parent 00677ff9a5
commit c6a061e600
2 changed files with 70 additions and 9 deletions

View File

@ -16,20 +16,33 @@ class ShareCrawler(service.MultiService):
since large servers will have several million shares, which can take
hours or days to read.
Once the crawler starts a cycle, it will proceed at a rate limited by the
allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor
after it has worked for 'cpu_slice' seconds, and not resuming right away,
always trying to use less than 'allowed_cpu_percentage'.
Once the crawler finishes a cycle, it will put off starting the next one
long enough to ensure that 'minimum_cycle_time' elapses between the start
of two consecutive cycles.
We assume that the normal upload/download/get_buckets traffic of a tahoe
grid will cause the prefixdir contents to be mostly cached, or that the
number of buckets in each prefixdir will be small enough to load quickly.
A 1TB allmydata.com server was measured to have 2.56M buckets, spread
into the 1040 prefixdirs, with about 2460 buckets per prefix. On this
into the 1024 prefixdirs, with about 2500 buckets per prefix. On this
server, each prefixdir took 130ms-200ms to list the first time, and 17ms
to list the second time.
To use this, create a subclass which implements the process_bucket()
To use a crawler, create a subclass which implements the process_bucket()
method. It will be called with a prefixdir and a base32 storage index
string. process_bucket() should run synchronously. Any keys added to
string. process_bucket() must run synchronously. Any keys added to
self.state will be preserved. Override add_initial_state() to set up
initial state keys. Override finished_cycle() to perform additional
processing when the cycle is complete.
processing when the cycle is complete. Any status that the crawler
produces should be put in the self.state dictionary. Status renderers
(like a web page which describes the accomplishments of your crawler)
will use crawler.get_state() to retrieve this dictionary; they can
present the contents as they see fit.
Then create an instance, with a reference to a StorageServer and a
filename where it can store persistent state. The statefile is used to
@ -39,8 +52,7 @@ class ShareCrawler(service.MultiService):
processed.
The crawler instance must be started with startService() before it will
do any work. To make it stop doing work, call stopService() and wait for
the Deferred that it returns.
do any work. To make it stop doing work, call stopService().
"""
# all three of these can be changed at any time
@ -162,6 +174,9 @@ class ShareCrawler(service.MultiService):
finished_cycle = True
except TimeSliceExceeded:
finished_cycle = False
if not self.running:
# someone might have used stopService() to shut us down
return
# either we finished a whole cycle, or we ran out of time
now = time.time()
this_slice = now - start_slice
@ -254,6 +269,13 @@ class ShareCrawler(service.MultiService):
that just finished. This method should perform summary work and
update self.state to publish information to status displays.
One-shot crawlers, such as those used to upgrade shares to a new
format or populate a database for the first time, can call
self.stopService() (or more likely self.disownServiceParent()) to
prevent it from running a second time. Don't forget to set some
persistent state so that the upgrader won't be run again the next
time the node is started.
This method for subclasses to override. No upcall is necessary.
"""
pass

View File

@ -4,7 +4,7 @@ import os.path
from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
from foolscap.eventual import eventually
from foolscap import eventual
from allmydata.util import fileutil, hashutil, pollmixin
from allmydata.storage.server import StorageServer, si_b2a
@ -22,7 +22,7 @@ class BucketEnumeratingCrawler(ShareCrawler):
def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
self.all_buckets.append(storage_index_b32)
def finished_cycle(self, cycle):
eventually(self.finished_d.callback, None)
eventual.eventually(self.finished_d.callback, None)
class PacedCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice
@ -40,7 +40,7 @@ class PacedCrawler(ShareCrawler):
def yielding(self, sleep_time):
self.cpu_slice = 500
def finished_cycle(self, cycle):
eventually(self.finished_d.callback, None)
eventual.eventually(self.finished_d.callback, None)
class ConsumingCrawler(ShareCrawler):
cpu_slice = 0.5
@ -63,6 +63,18 @@ class ConsumingCrawler(ShareCrawler):
def yielding(self, sleep_time):
self.last_yield = 0.0
class OneShotCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice
def __init__(self, *args, **kwargs):
ShareCrawler.__init__(self, *args, **kwargs)
self.counter = 0
self.finished_d = defer.Deferred()
def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
self.counter += 1
def finished_cycle(self, cycle):
self.finished_d.callback(None)
self.disownServiceParent()
class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
def setUp(self):
self.s = service.MultiService()
@ -330,3 +342,30 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
d.addCallback(_done)
return d
def test_oneshot(self):
self.basedir = "crawler/Basic/oneshot"
fileutil.make_dirs(self.basedir)
serverid = "\x00" * 20
ss = StorageServer(self.basedir, serverid)
ss.setServiceParent(self.s)
sis = [self.write(i, ss, serverid) for i in range(30)]
statefile = os.path.join(self.basedir, "statefile")
c = OneShotCrawler(ss, statefile)
c.setServiceParent(self.s)
d = c.finished_d
def _finished_first_cycle(ignored):
return eventual.fireEventually(c.counter)
d.addCallback(_finished_first_cycle)
def _check(old_counter):
# the crawler should do any work after it's been stopped
self.failUnlessEqual(old_counter, c.counter)
self.failIf(c.running)
self.failIf(c.timer)
self.failIf(c.current_sleep_time)
d.addCallback(_check)
return d