mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-19 21:17:54 +00:00
crawler: add ETA to get_progress()
This commit is contained in:
parent
3035dfb8ed
commit
112dc35563
@ -79,39 +79,80 @@ class ShareCrawler(service.MultiService):
|
||||
self.bucket_cache = (None, [])
|
||||
self.current_sleep_time = None
|
||||
self.next_wake_time = None
|
||||
self.last_prefix_finished_time = None
|
||||
self.last_prefix_elapsed_time = None
|
||||
self.last_cycle_started_time = None
|
||||
self.last_cycle_elapsed_time = None
|
||||
self.load_state()
|
||||
|
||||
def minus_or_none(self, a, b):
    """Return ``a - b``, or None if either operand is None.

    get_progress() uses this to turn an absolute timestamp (which may not
    have been set yet) into a "seconds from now" value without raising
    TypeError on a missing value.

    :param a: minuend, or None if the value is not known yet
    :param b: subtrahend, or None
    :return: ``a - b``, or None when the difference cannot be computed
    """
    if a is None or b is None:
        return None
    return a - b
|
||||
|
||||
def get_progress(self):
    """I return information about how much progress the crawler is
    making. My return value is a dictionary. The primary key is
    'cycle-in-progress': True if the crawler is currently traversing the
    shares, False if it is idle between cycles.

    Note that any of these 'time' keys could be None if I am called at
    certain moments, so application code must be prepared to tolerate
    this case. The estimates will also be None if insufficient data has
    been gathered to form an estimate.

    If cycle-in-progress is True, the following keys will be present::

     cycle-complete-percentage: float, from 0.0 to 100.0, indicating how
                                far the crawler has progressed through
                                the current cycle
     remaining-sleep-time: float, seconds from now when we do more work
     estimated-cycle-complete-time-left:
            float, seconds remaining until the current cycle is finished.
            TODO: the prefixdir currently being processed is counted as
            a whole prefix (no per-bucket refinement yet), and the
            estimate will be very inaccurate on fast crawlers (which can
            process a whole prefix in a single tick)
     estimated-time-per-cycle: float, seconds required to do a complete
                               cycle

    If cycle-in-progress is False, the following keys are available::

     next-crawl-time: float, seconds-since-epoch when next crawl starts
     remaining-wait-time: float, seconds from now when next crawl starts
     estimated-time-per-cycle: float, seconds required to do a complete
                               cycle
    """

    d = {}

    if self.state["current-cycle"] is None:
        d["cycle-in-progress"] = False
        d["next-crawl-time"] = self.next_wake_time
        d["remaining-wait-time"] = self.minus_or_none(self.next_wake_time,
                                                      time.time())
    else:
        d["cycle-in-progress"] = True
        # last_complete_prefix_index is an *index*: -1 means no prefix has
        # been finished yet, and i means prefixes 0..i are done. Convert
        # it to a count so the percentage starts at 0.0 (never negative)
        # and reaches 100.0 when the last prefix is finished.
        total = len(self.prefixes)
        completed = self.last_complete_prefix_index + 1
        d["cycle-complete-percentage"] = 100.0 * completed / total

        remaining = None
        if self.last_prefix_elapsed_time is not None:
            # every prefix that is not finished yet is assumed to take as
            # long as the most recently finished one
            left = total - completed
            remaining = left * self.last_prefix_elapsed_time
            # TODO: remainder of this prefix: we need to estimate the
            # per-bucket time, probably by measuring the time spent on
            # this prefix so far, divided by the number of buckets we've
            # processed.
        d["estimated-cycle-complete-time-left"] = remaining

        # it's possible to call get_progress() from inside a crawler's
        # finished_prefix() function, when next_wake_time may still be
        # None, so the subtraction must tolerate a missing value
        d["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time,
                                                       time.time())

    # Prefer the measured duration of the last full cycle; otherwise
    # extrapolate from the most recent per-prefix time, if there is one.
    per_cycle = None
    if self.last_cycle_elapsed_time is not None:
        per_cycle = self.last_cycle_elapsed_time
    elif self.last_prefix_elapsed_time is not None:
        per_cycle = len(self.prefixes) * self.last_prefix_elapsed_time
    d["estimated-time-per-cycle"] = per_cycle
    return d
|
||||
|
||||
def get_state(self):
|
||||
@ -247,6 +288,7 @@ class ShareCrawler(service.MultiService):
|
||||
def start_current_prefix(self, start_slice):
|
||||
state = self.state
|
||||
if state["current-cycle"] is None:
|
||||
self.last_cycle_started_time = time.time()
|
||||
if state["last-cycle-finished"] is None:
|
||||
state["current-cycle"] = 0
|
||||
else:
|
||||
@ -269,11 +311,23 @@ class ShareCrawler(service.MultiService):
|
||||
self.process_prefixdir(cycle, prefix, prefixdir,
|
||||
buckets, start_slice)
|
||||
self.last_complete_prefix_index = i
|
||||
|
||||
now = time.time()
|
||||
if self.last_prefix_finished_time is not None:
|
||||
elapsed = now - self.last_prefix_finished_time
|
||||
self.last_prefix_elapsed_time = elapsed
|
||||
self.last_prefix_finished_time = now
|
||||
|
||||
self.finished_prefix(cycle, prefix)
|
||||
if time.time() >= start_slice + self.cpu_slice:
|
||||
raise TimeSliceExceeded()
|
||||
|
||||
# yay! we finished the whole cycle
|
||||
self.last_complete_prefix_index = -1
|
||||
self.last_prefix_finished_time = None # don't include the sleep
|
||||
now = time.time()
|
||||
if self.last_cycle_started_time is not None:
|
||||
self.last_cycle_elapsed_time = now - self.last_cycle_started_time
|
||||
state["last-complete-bucket"] = None
|
||||
state["last-cycle-finished"] = cycle
|
||||
state["current-cycle"] = None
|
||||
|
@ -77,8 +77,10 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
"renew": [],
|
||||
"cancel": [],
|
||||
}
|
||||
self.add_bucket_counter()
|
||||
|
||||
statefile = os.path.join(storedir, "bucket_counter.state")
|
||||
def add_bucket_counter(self):
    """Create the bucket-counting crawler and attach it as a child service.

    The crawler keeps its state in "bucket_counter.state" inside
    self.storedir. The new crawler is stored on self.bucket_counter
    before it is parented to this server.
    """
    state_path = os.path.join(self.storedir, "bucket_counter.state")
    crawler = BucketCountingCrawler(self, state_path)
    self.bucket_counter = crawler
    crawler.setServiceParent(self)
|
||||
|
||||
|
@ -14,6 +14,7 @@ from allmydata.storage.mutable import MutableShareFile
|
||||
from allmydata.storage.immutable import BucketWriter, BucketReader
|
||||
from allmydata.storage.common import DataTooLargeError
|
||||
from allmydata.storage.lease import LeaseInfo
|
||||
from allmydata.storage.crawler import BucketCountingCrawler
|
||||
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
|
||||
ReadBucketProxy
|
||||
from allmydata.interfaces import BadWriteEnablerError
|
||||
@ -1299,6 +1300,19 @@ def remove_tags(s):
|
||||
s = re.sub(r'\s+', ' ', s)
|
||||
return s
|
||||
|
||||
class MyBucketCountingCrawler(BucketCountingCrawler):
    """Crawler used by the tests: fires one queued Deferred per finished
    prefix so a test can inspect state from inside finished_prefix().

    The queue lives in the ``hook_ds`` attribute, which this class does not
    create itself; whoever instantiates the crawler is expected to assign
    it before the crawler runs.
    """

    def finished_prefix(self, cycle, prefix):
        """Run the base-class handling, then fire the oldest pending hook.

        Does nothing extra once the hook queue is empty.
        """
        BucketCountingCrawler.finished_prefix(self, cycle, prefix)
        if not self.hook_ds:
            return
        next_hook = self.hook_ds.pop(0)
        next_hook.callback(None)
|
||||
|
||||
class MyStorageServer(StorageServer):
    """StorageServer variant for tests that installs
    MyBucketCountingCrawler in place of the stock bucket counter."""

    def add_bucket_counter(self):
        """Create a MyBucketCountingCrawler and attach it as a child service.

        The crawler keeps its state in "bucket_counter.state" inside
        self.storedir.
        """
        state_path = os.path.join(self.storedir, "bucket_counter.state")
        crawler = MyBucketCountingCrawler(self, state_path)
        self.bucket_counter = crawler
        crawler.setServiceParent(self)
|
||||
|
||||
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
|
||||
|
||||
def setUp(self):
|
||||
@ -1398,6 +1412,45 @@ class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
|
||||
d.addCallback(_check2)
|
||||
return d
|
||||
|
||||
def test_bucket_counter_eta(self):
    """Check that the rendered storage status gains an ETA once a prefix
    has finished.

    Three Deferreds are queued on the crawler; each is fired from inside
    finished_prefix(), and the status page is rendered and inspected at
    each of those moments. The returned Deferred fires with "done" after
    the third check, or errbacks if any check fails.
    """
    workdir = "storage/BucketCounter/bucket_counter_eta"
    fileutil.make_dirs(workdir)
    server = MyStorageServer(workdir, "\x00" * 20)
    server.bucket_counter.slow_start = 0
    # these will be fired inside finished_prefix(); the crawler consumes
    # the very same list object that we iterate over below
    pending = [defer.Deferred() for _ in range(3)]
    server.bucket_counter.hook_ds = pending
    status_page = StorageStatus(server)

    finished = defer.Deferred()

    def _status_text():
        # render the page right now and strip the markup
        return remove_tags(status_page.renderSynchronously())

    def _after_first_prefix(ignored):
        # no ETA is available yet
        self.failUnlessIn("complete (next work", _status_text())

    def _after_second_prefix(ignored):
        # one prefix has finished, so an ETA based upon that elapsed time
        # should be available.
        self.failUnlessIn("complete (ETA ", _status_text())

    def _after_third_prefix(ignored):
        # two prefixes have finished
        self.failUnlessIn("complete (ETA ", _status_text())
        finished.callback("done")

    checks = (_after_first_prefix, _after_second_prefix, _after_third_prefix)
    for hook, check in zip(pending, checks):
        # a failing check is forwarded to the Deferred the test returns
        hook.addCallback(check).addErrback(finished.errback)

    server.setServiceParent(self.s)
    return finished
|
||||
|
||||
|
||||
class NoStatvfsServer(StorageServer):
|
||||
def do_statvfs(self):
|
||||
raise AttributeError
|
||||
|
@ -72,11 +72,27 @@ class StorageStatus(rend.Page):
|
||||
|
||||
def render_count_crawler_status(self, ctx, storage):
    """Render a one-line summary of the bucket-counting crawler's progress.

    While a cycle is in progress the line shows the completion percentage,
    an ETA (when one can be estimated) and the time until the next slice of
    work; between cycles it shows the time until the next crawl. In both
    cases the estimated time per cycle is appended when it is known.

    :param ctx: rendering context; the pieces of text are placed in ctx.tag
    :param storage: not used here; the crawler is reached via self.storage
    :return: ctx.tag filled with the status text
    """
    s = self.storage.bucket_counter.get_progress()

    # both estimates are optional: get_progress() reports None until it
    # has gathered enough data, in which case the fragment is left out
    cycletime = s["estimated-time-per-cycle"]
    cycletime_s = ""
    if cycletime is not None:
        cycletime_s = " (estimated cycle time %ds)" % cycletime

    if s["cycle-in-progress"]:
        pct = s["cycle-complete-percentage"]
        # NOTE(review): get_progress() documents that the time values may
        # be None; confirm abbreviate_time() tolerates None.
        soon = s["remaining-sleep-time"]

        eta = s["estimated-cycle-complete-time-left"]
        eta_s = ""
        if eta is not None:
            eta_s = " (ETA %ds)" % eta

        return ctx.tag["Current crawl %.1f%% complete" % pct,
                       eta_s,
                       " (next work in %s)" % abbreviate_time(soon),
                       cycletime_s,
                       ]
    else:
        soon = s["remaining-wait-time"]
        return ctx.tag["Next crawl in %s" % abbreviate_time(soon),
                       cycletime_s]
|
||||
|
Loading…
Reference in New Issue
Block a user