mirror of https://github.com/tahoe-lafs/tahoe-lafs.git (synced 2025-04-09 03:44:23 +00:00)
crawler: fix performance problems: only save state once per timeslice (not after every bucket), don't start the crawler until 5 minutes after node startup
parent a04d3b8fe8
commit 77f3b83d68
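For context, a minimal sketch of the pattern this commit adopts: do per-bucket work inside a bounded time slice, and persist crawler state once per slice rather than after every bucket. The names here (run_slice, save_state, process) are illustrative, not the project's API.

    import json, os, time

    def process(item):
        pass  # stand-in for per-bucket work

    def save_state(state, statefile):
        # write-then-rename so a crash never leaves a half-written statefile
        tmp = statefile + ".tmp"
        with open(tmp, "w") as f:
            json.dump(state, f)
        os.rename(tmp, statefile)

    def run_slice(items, state, statefile, cpu_slice=1.0):
        start = time.time()
        for item in items[state["next_index"]:]:
            process(item)
            state["next_index"] += 1     # track progress in memory only
            if time.time() >= start + cpu_slice:
                break                    # slice used up; yield to the reactor
        save_state(state, statefile)     # one disk write per slice, not per item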
--- a/src/allmydata/storage/crawler.py
+++ b/src/allmydata/storage/crawler.py
@@ -13,8 +13,8 @@ class ShareCrawler(service.MultiService):
     """A ShareCrawler subclass is attached to a StorageServer, and
     periodically walks all of its shares, processing each one in some
     fashion. This crawl is rate-limited, to reduce the IO burden on the host,
-    since large servers will have several million shares, which can take
-    hours or days to read.
+    since large servers can easily have a terabyte of shares, in several
+    million files, which can take hours or days to read.

     Once the crawler starts a cycle, it will proceed at a rate limited by the
     allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor
@@ -26,12 +26,12 @@ class ShareCrawler(service.MultiService):
     of two consecutive cycles.

     We assume that the normal upload/download/get_buckets traffic of a tahoe
-    grid will cause the prefixdir contents to be mostly cached, or that the
-    number of buckets in each prefixdir will be small enough to load quickly.
-    A 1TB allmydata.com server was measured to have 2.56M buckets, spread
-    into the 1024 prefixdirs, with about 2500 buckets per prefix. On this
-    server, each prefixdir took 130ms-200ms to list the first time, and 17ms
-    to list the second time.
+    grid will cause the prefixdir contents to be mostly cached in the kernel,
+    or that the number of buckets in each prefixdir will be small enough to
+    load quickly. A 1TB allmydata.com server was measured to have 2.56M
+    buckets, spread into the 1024 prefixdirs, with about 2500 buckets per
+    prefix. On this server, each prefixdir took 130ms-200ms to list the first
+    time, and 17ms to list the second time.

     To use a crawler, create a subclass which implements the process_bucket()
     method. It will be called with a prefixdir and a base32 storage index
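As a sanity check on the numbers in that paragraph: storage indexes are base32 strings, so a two-character (10-bit) prefix yields 32 x 32 = 1024 prefixdirs, and 2.56M buckets spread evenly over them is about 2500 per directory. A quick illustration (the lowercase alphabet matches tahoe's base32 encoding, but treat it as an assumption):

    from itertools import product

    B32_ALPHABET = "abcdefghijklmnopqrstuvwxyz234567"  # assumed tahoe alphabet
    prefixes = ["".join(p) for p in product(B32_ALPHABET, repeat=2)]
    assert len(prefixes) == 1024           # two base32 chars = 10 bits
    print(2560000 // len(prefixes))        # -> 2500 buckets per prefixdir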
@@ -48,13 +48,18 @@ class ShareCrawler(service.MultiService):
     filename where it can store persistent state. The statefile is used to
     keep track of how far around the ring the process has travelled, as well
     as timing history to allow the pace to be predicted and controlled. The
-    statefile will be updated and written to disk after every bucket is
-    processed.
+    statefile will be updated and written to disk after each time slice (just
+    before the crawler yields to the reactor), and also after each cycle is
+    finished, and also when stopService() is called. Note that this means
+    that a crawler which is interrupted with SIGKILL while it is in the
+    middle of a time slice will lose progress: the next time the node is
+    started, the crawler will repeat some unknown amount of work.

     The crawler instance must be started with startService() before it will
     do any work. To make it stop doing work, call stopService().
     """

+    slow_start = 300 # don't start crawling for 5 minutes after startup
     # all three of these can be changed at any time
     allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
     cpu_slice = 1.0 # use up to 1.0 seconds before yielding
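To make the subclassing contract concrete, here is a hedged sketch of a cheap crawler that only counts buckets. The constructor signature follows the tests in this commit (ShareCrawler(ss, statefile)), but the details are illustrative rather than authoritative:

    from allmydata.storage.crawler import ShareCrawler

    class CountingCrawler(ShareCrawler):
        # class-level knobs, as described in the docstring above
        slow_start = 300               # default: wait 5 minutes after startup
        allowed_cpu_percentage = .10

        def __init__(self, server, statefile):
            ShareCrawler.__init__(self, server, statefile)
            self.bucket_count = 0

        def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
            self.bucket_count += 1     # cheap work; no save_state() per bucket

        def finished_cycle(self, cycle):
            # called once per full cycle; state is saved after this returns
            print("cycle %d: %d buckets" % (cycle, self.bucket_count))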
@@ -186,29 +191,31 @@ class ShareCrawler(service.MultiService):
         # arrange things to look like we were just sleeping, so
         # status/progress values work correctly
         self.sleeping_between_cycles = True
-        self.current_sleep_time = 0
-        self.next_wake_time = time.time()
-        self.timer = reactor.callLater(0, self.start_slice)
+        self.current_sleep_time = self.slow_start
+        self.next_wake_time = time.time() + self.slow_start
+        self.timer = reactor.callLater(self.slow_start, self.start_slice)
         service.MultiService.startService(self)

     def stopService(self):
         if self.timer:
             self.timer.cancel()
             self.timer = None
+        self.save_state()
         return service.MultiService.stopService(self)

     def start_slice(self):
-        start_slice = time.time()
         self.timer = None
         self.sleeping_between_cycles = False
         self.current_sleep_time = None
         self.next_wake_time = None
+        start_slice = time.time()
         try:
             s = self.last_complete_prefix_index
             self.start_current_prefix(start_slice)
             finished_cycle = True
         except TimeSliceExceeded:
             finished_cycle = False
+        self.save_state()
         if not self.running:
             # someone might have used stopService() to shut us down
             return
@@ -219,7 +226,10 @@ class ShareCrawler(service.MultiService):
         # this_slice/percentage = this_slice+sleep_time
         # sleep_time = (this_slice/percentage) - this_slice
         sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
-        # if the math gets weird, or a timequake happens, don't sleep forever
+        # if the math gets weird, or a timequake happens, don't sleep
+        # forever. Note that this means that, while a cycle is running, we
+        # will process at least one bucket every 5 minutes, no matter how
+        # long that bucket takes.
         sleep_time = max(0.0, min(sleep_time, 299))
         if finished_cycle:
             # how long should we sleep between cycles? Don't run faster than
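A worked example of that formula with the defaults: a slice that burns 1.0 s of CPU at allowed_cpu_percentage = .10 earns a 9 s sleep, i.e. one second of work per ten seconds of wall clock; the 299 s clamp keeps a pathological value from stalling a running cycle for more than about five minutes.

    allowed_cpu_percentage = 0.10
    this_slice = 1.0                                 # seconds just spent working
    sleep_time = (this_slice / allowed_cpu_percentage) - this_slice
    print(sleep_time)                                # -> 9.0
    sleep_time = max(0.0, min(sleep_time, 299))      # never sleep > ~5 minutes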
@@ -259,7 +269,7 @@ class ShareCrawler(service.MultiService):
             self.process_prefixdir(cycle, prefix, prefixdir,
                                    buckets, start_slice)
             self.last_complete_prefix_index = i
-            self.save_state()
+            self.finished_prefix(cycle, prefix)
             if time.time() >= start_slice + self.cpu_slice:
                 raise TimeSliceExceeded()
         # yay! we finished the whole cycle
@@ -274,31 +284,54 @@ class ShareCrawler(service.MultiService):
         """This gets a list of bucket names (i.e. storage index strings,
         base32-encoded) in sorted order.

-        Override this if your crawler doesn't care about the actual shares,
-        for example a crawler which merely keeps track of how many buckets
-        are being managed by this server.
+        You can override this if your crawler doesn't care about the actual
+        shares, for example a crawler which merely keeps track of how many
+        buckets are being managed by this server.
+
+        Subclasses which *do* care about actual bucket should leave this
+        method along, and implement process_bucket() instead.
         """

         for bucket in buckets:
             if bucket <= self.state["last-complete-bucket"]:
                 continue
             self.process_bucket(cycle, prefix, prefixdir, bucket)
             self.state["last-complete-bucket"] = bucket
-            # note: saving the state after every bucket is somewhat
-            # time-consuming, but lets us avoid losing more than one bucket's
-            # worth of progress.
-            self.save_state()
             if time.time() >= start_slice + self.cpu_slice:
                 raise TimeSliceExceeded()

     # the remaining methods are explictly for subclasses to implement.

     def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
         """Examine a single bucket. Subclasses should do whatever they want
         to do to the shares therein, then update self.state as necessary.

-        This method will be called exactly once per share (per cycle), unless
-        the crawler was interrupted (by node restart, for example), in which
-        case it might be called a second time on a bucket which was processed
-        during the previous node's incarnation. However, in that case, no
-        changes to self.state will have been recorded.
+        If the crawler is never interrupted by SIGKILL, this method will be
+        called exactly once per share (per cycle). If it *is* interrupted,
+        then the next time the node is started, some amount of work will be
+        duplicated, according to when self.save_state() was last called. By
+        default, save_state() is called at the end of each timeslice, and
+        after finished_cycle() returns, and when stopService() is called.
+
+        To reduce the chance of duplicate work (i.e. to avoid adding multiple
+        records to a database), you can call save_state() at the end of your
+        process_bucket() method. This will reduce the maximum duplicated work
+        to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
+        per bucket (and some disk writes), which will count against your
+        allowed_cpu_percentage, and which may be considerable if
+        process_bucket() runs quickly.

         This method for subclasses to override. No upcall is necessary.
         """
         pass

+    def finished_prefix(self, cycle, prefix):
+        """Notify a subclass that the crawler has just finished processing a
+        prefix directory (all buckets with the same two-character/10bit
+        prefix). To impose a limit on how much work might be duplicated by a
+        SIGKILL that occurs during a timeslice, you can call
+        self.save_state() here, but be aware that it may represent a
+        significant performance hit.
+
+        This method for subclasses to override. No upcall is necessary.
+        """
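The tradeoff described in that docstring, calling save_state() from process_bucket() to cap duplicated work at one bucket per SIGKILL, might look like the following sketch; the db object and its add_record() method are invented placeholders:

    from allmydata.storage.crawler import ShareCrawler

    class RecordingCrawler(ShareCrawler):
        def __init__(self, server, statefile, db):
            ShareCrawler.__init__(self, server, statefile)
            self.db = db               # hypothetical persistent store

        def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
            self.db.add_record(storage_index_b32)  # must not be inserted twice
            # extra 1-20ms of disk I/O per bucket, charged against
            # allowed_cpu_percentage, in exchange for at-most-one duplicate
            self.save_state()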
--- a/src/allmydata/test/test_crawler.py
+++ b/src/allmydata/test/test_crawler.py
@@ -15,6 +15,7 @@ from common_util import StallMixin

 class BucketEnumeratingCrawler(ShareCrawler):
     cpu_slice = 500 # make sure it can complete in a single slice
+    slow_start = 0
     def __init__(self, *args, **kwargs):
         ShareCrawler.__init__(self, *args, **kwargs)
         self.all_buckets = []
@@ -26,6 +27,7 @@ class BucketEnumeratingCrawler(ShareCrawler):

 class PacedCrawler(ShareCrawler):
     cpu_slice = 500 # make sure it can complete in a single slice
+    slow_start = 0
     def __init__(self, *args, **kwargs):
         ShareCrawler.__init__(self, *args, **kwargs)
         self.countdown = 6
@@ -49,6 +51,7 @@ class ConsumingCrawler(ShareCrawler):
     cpu_slice = 0.5
     allowed_cpu_percentage = 0.5
     minimum_cycle_time = 0
+    slow_start = 0

     def __init__(self, *args, **kwargs):
         ShareCrawler.__init__(self, *args, **kwargs)
@@ -68,6 +71,7 @@ class ConsumingCrawler(ShareCrawler):

 class OneShotCrawler(ShareCrawler):
     cpu_slice = 500 # make sure it can complete in a single slice
+    slow_start = 0
     def __init__(self, *args, **kwargs):
         ShareCrawler.__init__(self, *args, **kwargs)
         self.counter = 0
@@ -182,7 +186,10 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
             c.start_current_prefix(time.time())
         except TimeSliceExceeded:
             pass
-        # that should stop in the middle of one of the buckets.
+        # that should stop in the middle of one of the buckets. Since we
+        # aren't using its normal scheduler, we have to save its state
+        # manually.
+        c.save_state()
         c.cpu_slice = PacedCrawler.cpu_slice
         self.failUnlessEqual(len(c.all_buckets), 6)

@@ -204,7 +211,10 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
             c.start_current_prefix(time.time())
         except TimeSliceExceeded:
             pass
-        # that should stop in the middle of one of the buckets
+        # that should stop in the middle of one of the buckets. Since we
+        # aren't using its normal scheduler, we have to save its state
+        # manually.
+        c.save_state()
         c.cpu_slice = PacedCrawler.cpu_slice

         # a third crawler should pick up from where it left off
@@ -226,7 +236,9 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
             c.start_current_prefix(time.time())
         except TimeSliceExceeded:
             pass
-        # that should stop at the end of one of the buckets.
+        # that should stop at the end of one of the buckets. Again we must
+        # save state manually.
+        c.save_state()
         c.cpu_slice = PacedCrawler.cpu_slice
         self.failUnlessEqual(len(c.all_buckets), 4)
         c.start_current_prefix(time.time()) # finish it
@@ -244,6 +256,7 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
         except TimeSliceExceeded:
             pass
         # that should stop at the end of one of the buckets.
+        c.save_state()

         c2 = PacedCrawler(ss, statefile)
         c2.all_buckets = c.all_buckets[:]
@@ -376,6 +389,7 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):

         statefile = os.path.join(self.basedir, "statefile")
         c = ShareCrawler(ss, statefile)
+        c.slow_start = 0
         c.setServiceParent(self.s)

         # we just let it run for a while, to get figleaf coverage of the
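The idiom these tests share is: drive the crawler synchronously, let TimeSliceExceeded interrupt it mid-prefix, save state by hand, then resume from the same statefile with a fresh instance. Condensed as a sketch (not a verbatim excerpt from the tests):

    import time
    from allmydata.storage.crawler import TimeSliceExceeded

    def interrupt_and_resume(make_crawler, ss, statefile):
        c = make_crawler(ss, statefile)
        try:
            c.start_current_prefix(time.time())
        except TimeSliceExceeded:
            pass
        c.save_state()                    # bypassed the scheduler: save manually
        c2 = make_crawler(ss, statefile)  # picks up where the first one stopped
        c2.start_current_prefix(time.time())
        return c2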
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -1312,7 +1312,9 @@ class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
         fileutil.make_dirs(basedir)
         ss = StorageServer(basedir, "\x00" * 20)
         # to make sure we capture the bucket-counting-crawler in the middle
-        # of a cycle, we reach in and reduce its maximum slice time to 0.
+        # of a cycle, we reach in and reduce its maximum slice time to 0. We
+        # also make it start sooner than usual.
+        ss.bucket_counter.slow_start = 0
         orig_cpu_slice = ss.bucket_counter.cpu_slice
         ss.bucket_counter.cpu_slice = 0
         ss.setServiceParent(self.s)
@@ -1364,6 +1366,7 @@ class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
         ss = StorageServer(basedir, "\x00" * 20)
         # to make sure we capture the bucket-counting-crawler in the middle
         # of a cycle, we reach in and reduce its maximum slice time to 0.
+        ss.bucket_counter.slow_start = 0
         orig_cpu_slice = ss.bucket_counter.cpu_slice
         ss.bucket_counter.cpu_slice = 0
         ss.setServiceParent(self.s)