diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py index db71e0f3a..7be416948 100644 --- a/src/allmydata/scripts/debug.py +++ b/src/allmydata/scripts/debug.py @@ -11,6 +11,7 @@ class DumpOptions(usage.Options): optFlags = [ ["offsets", None, "Display a table of section offsets"], + ["leases-only", None, "Dump leases but not CHK contents"], ] def getUsage(self, width=None): @@ -46,13 +47,20 @@ def dump_share(options): return dump_immutable_share(options) def dump_immutable_share(options): - from allmydata import uri from allmydata.storage.immutable import ShareFile - from allmydata.util import base32 - from allmydata.immutable.layout import ReadBucketProxy out = options.stdout f = ShareFile(options['filename']) + if not options["leases-only"]: + dump_immutable_chk_share(f, out, options) + dump_immutable_lease_info(f, out) + print >>out + return 0 + +def dump_immutable_chk_share(f, out, options): + from allmydata import uri + from allmydata.util import base32 + from allmydata.immutable.layout import ReadBucketProxy # use a ReadBucketProxy to parse the bucket and find the uri extension bp = ReadBucketProxy(None, '', '') offsets = bp._parse_offsets(f.read_share_data(0, 0x44)) @@ -127,10 +135,10 @@ def dump_immutable_share(options): print >>out, " %20s: %s (0x%x)" % (name, offset, offset) print >>out, "%20s: %s" % ("leases", f._lease_offset) - +def dump_immutable_lease_info(f, out): # display lease information too print >>out - leases = list(f.iter_leases()) + leases = list(f.get_leases()) if leases: for i,lease in enumerate(leases): when = format_expiration_time(lease.expiration_time) @@ -139,9 +147,6 @@ def dump_immutable_share(options): else: print >>out, " No leases." - print >>out - return 0 - def format_expiration_time(expiration_time): now = time.time() remains = expiration_time - now @@ -551,9 +556,8 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): data_length = m._read_data_length(f) extra_lease_offset = m._read_extra_lease_offset(f) container_size = extra_lease_offset - m.DATA_OFFSET - leases = list(m._enumerate_leases(f)) - expiration_time = min( [lease[1].expiration_time - for lease in leases] ) + expiration_time = min( [lease.expiration_time + for (i,lease) in m._enumerate_leases(f)] ) expiration = max(0, expiration_time - now) share_type = "unknown" @@ -602,7 +606,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): bp = ImmediateReadBucketProxy(sf) expiration_time = min( [lease.expiration_time - for lease in sf.iter_leases()] ) + for lease in sf.get_leases()] ) expiration = max(0, expiration_time - now) UEB_data = call(bp.get_uri_extension) diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py index d0af7807c..e3a50e420 100644 --- a/src/allmydata/storage/crawler.py +++ b/src/allmydata/storage/crawler.py @@ -163,6 +163,9 @@ class ShareCrawler(service.MultiService): inside the process_prefixdir, process_bucket, or finished_cycle() methods, or if startService has not yet been called on this crawler), these two keys will be None. + + Subclasses can override this to add computed keys to the return value, + but don't forget to start with the upcall. """ state = self.state.copy() # it isn't a deepcopy, so don't go crazy return state @@ -176,6 +179,9 @@ class ShareCrawler(service.MultiService): # ["last-cycle-finished"]: int, or None if we have not yet finished # any cycle # ["current-cycle"]: int, or None if we are sleeping between cycles + # ["current-cycle-start-time"]: int, seconds-since-epoch of when this + # cycle was started, possibly by an earlier + # process # ["last-complete-prefix"]: str, two-letter name of the last prefixdir # that was fully processed, or None if we # are sleeping between cycles, or if we @@ -195,6 +201,7 @@ class ShareCrawler(service.MultiService): "last-complete-prefix": None, "last-complete-bucket": None, } + state.setdefault("current-cycle-start-time", time.time()) # approximate self.state = state lcp = state["last-complete-prefix"] if lcp == None: @@ -289,10 +296,12 @@ class ShareCrawler(service.MultiService): state = self.state if state["current-cycle"] is None: self.last_cycle_started_time = time.time() + state["current-cycle-start-time"] = self.last_cycle_started_time if state["last-cycle-finished"] is None: state["current-cycle"] = 0 else: state["current-cycle"] = state["last-cycle-finished"] + 1 + self.started_cycle(state["current-cycle"]) cycle = state["current-cycle"] for i in range(self.last_complete_prefix_index+1, len(self.prefixes)): @@ -356,6 +365,13 @@ class ShareCrawler(service.MultiService): # the remaining methods are explictly for subclasses to implement. + def started_cycle(self, cycle): + """Notify a subclass that the crawler is about to start a cycle. + + This method is for subclasses to override. No upcall is necessary. + """ + pass + def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32): """Examine a single bucket. Subclasses should do whatever they want to do to the shares therein, then update self.state as necessary. @@ -375,7 +391,7 @@ class ShareCrawler(service.MultiService): allowed_cpu_percentage, and which may be considerable if process_bucket() runs quickly. - This method for subclasses to override. No upcall is necessary. + This method is for subclasses to override. No upcall is necessary. """ pass @@ -387,7 +403,7 @@ class ShareCrawler(service.MultiService): self.save_state() here, but be aware that it may represent a significant performance hit. - This method for subclasses to override. No upcall is necessary. + This method is for subclasses to override. No upcall is necessary. """ pass @@ -404,7 +420,7 @@ class ShareCrawler(service.MultiService): persistent state so that the upgrader won't be run again the next time the node is started. - This method for subclasses to override. No upcall is necessary. + This method is for subclasses to override. No upcall is necessary. """ pass @@ -412,7 +428,7 @@ class ShareCrawler(service.MultiService): """The crawler is about to sleep for 'sleep_time' seconds. This method is mostly for the convenience of unit tests. - This method for subclasses to override. No upcall is necessary. + This method is for subclasses to override. No upcall is necessary. """ pass diff --git a/src/allmydata/storage/expirer.py b/src/allmydata/storage/expirer.py new file mode 100644 index 000000000..0579135a1 --- /dev/null +++ b/src/allmydata/storage/expirer.py @@ -0,0 +1,359 @@ +import time, os, pickle +from crawler import ShareCrawler +from shares import get_share_file + +class LeaseCheckingCrawler(ShareCrawler): + """I examine the leases on all shares, determining which are still valid + and which have expired. I can remove the expired leases (if so + configured), and the share will be deleted when the last lease is + removed. + + I collect statistics on the leases and make these available to a web + status page, including:: + + Space recovered during this cycle-so-far: + actual (only if expire_leases=True): + num-buckets, num-shares, sum of share sizes, real disk usage + ('real disk usage' means we use stat(fn).st_blocks*512 and include any + space used by the directory) + what it would have been with the original lease expiration time + what it would have been with our configured expiration time + + Prediction of space that will be recovered during the rest of this cycle + Prediction of space that will be recovered by the entire current cycle. + + Space recovered during the last 10 cycles <-- saved in separate pickle + + Shares/buckets examined: + this cycle-so-far + prediction of rest of cycle + during last 10 cycles <-- separate pickle + start/finish time of last 10 cycles <-- separate pickle + expiration time used for last 10 cycles <-- separate pickle + + Histogram of leases-per-share: + this-cycle-to-date + last 10 cycles <-- separate pickle + Histogram of lease ages, buckets = expiration_time/10 + cycle-to-date + last 10 cycles <-- separate pickle + + All cycle-to-date values remain valid until the start of the next cycle. + + """ + + slow_start = 360 # wait 6 minutes after startup + minimum_cycle_time = 12*60*60 # not more than twice per day + + def __init__(self, server, statefile, historyfile, + expire_leases, expiration_time): + self.historyfile = historyfile + self.expire_leases = expire_leases + self.age_limit = expiration_time + ShareCrawler.__init__(self, server, statefile) + + def add_initial_state(self): + # we fill ["cycle-to-date"] here (even though they will be reset in + # self.started_cycle) just in case someone grabs our state before we + # get started: unit tests do this + so_far = self.create_empty_cycle_dict() + self.state.setdefault("cycle-to-date", so_far) + + # initialize history + if not os.path.exists(self.historyfile): + history = {} # cyclenum -> dict + f = open(self.historyfile, "wb") + pickle.dump(history, f) + f.close() + + def create_empty_cycle_dict(self): + recovered = self.create_empty_recovered_dict() + so_far = {"buckets-examined": 0, + "shares-examined": 0, + "space-recovered": recovered, + "lease-age-histogram": {}, # (minage,maxage)->count + "leases-per-share-histogram": {}, # leasecount->numshares + } + return so_far + + def create_empty_recovered_dict(self): + recovered = {} + for a in ("actual", "original-leasetimer", "configured-leasetimer"): + for b in ("numbuckets", "numshares", "sharebytes", "diskbytes"): + recovered[a+"-"+b] = 0 + return recovered + + def started_cycle(self, cycle): + self.state["cycle-to-date"] = self.create_empty_cycle_dict() + + def stat(self, fn): + return os.stat(fn) + + def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32): + bucketdir = os.path.join(prefixdir, storage_index_b32) + try: + bucket_diskbytes = self.stat(bucketdir).st_blocks * 512 + except AttributeError: + bucket_diskbytes = 0 # no stat().st_blocks on windows + would_keep_shares = [] + for fn in os.listdir(bucketdir): + try: + shnum = int(fn) + wks = self.process_share(os.path.join(bucketdir, fn)) + would_keep_shares.append(wks) + except ValueError: + pass # non-numeric means not a sharefile + recovered = self.state["cycle-to-date"]["space-recovered"] + if sum([wks[0] for wks in would_keep_shares]) == 0: + self.increment(recovered, + "original-leasetimer-diskbytes", bucket_diskbytes) + self.increment(recovered, "original-leasetimer-numbuckets", 1) + if sum([wks[1] for wks in would_keep_shares]) == 0: + self.increment(recovered, + "configured-leasetimer-diskbytes", bucket_diskbytes) + self.increment(recovered, "configured-leasetimer-numbuckets", 1) + if sum([wks[2] for wks in would_keep_shares]) == 0: + self.increment(recovered, + "actual-diskbytes", bucket_diskbytes) + self.increment(recovered, "actual-numbuckets", 1) + self.state["cycle-to-date"]["buckets-examined"] += 1 + + def process_share(self, sharefilename): + # first, find out what kind of a share it is + sf = get_share_file(sharefilename) + now = time.time() + s = self.stat(sharefilename) + + num_leases = 0 + num_valid_leases_original = 0 + num_valid_leases_configured = 0 + expired_leases_configured = [] + + for li in sf.get_leases(): + num_leases += 1 + original_expiration_time = li.get_expiration_time() + grant_renew_time = li.get_grant_renew_time_time() + age = li.get_age() + self.add_lease_age_to_histogram(age) + + # expired-or-not according to original expiration time + if original_expiration_time > now: + num_valid_leases_original += 1 + + # expired-or-not according to our configured age limit + if age < self.age_limit: + num_valid_leases_configured += 1 + else: + expired_leases_configured.append(li) + + so_far = self.state["cycle-to-date"] + self.increment(so_far["leases-per-share-histogram"], num_leases, 1) + so_far["shares-examined"] += 1 + + would_keep_share = [1, 1, 1] + + if self.expire_leases: + for li in expired_leases_configured: + sf.cancel_lease(li.cancel_secret) + + if num_valid_leases_original == 0: + would_keep_share[0] = 0 + self.increment_space("original-leasetimer", s) + + if num_valid_leases_configured == 0: + would_keep_share[1] = 0 + self.increment_space("configured-leasetimer", s) + if self.expire_leases: + would_keep_share[2] = 0 + self.increment_space("actual", s) + + return would_keep_share + + def increment_space(self, a, s): + sharebytes = s.st_size + try: + # note that stat(2) says that st_blocks is 512 bytes, and that + # st_blksize is "optimal file sys I/O ops blocksize", which is + # independent of the block-size that st_blocks uses. + diskbytes = s.st_blocks * 512 + except AttributeError: + # the docs say that st_blocks is only on linux. I also see it on + # MacOS. But it isn't available on windows. + diskbytes = sharebytes + so_far_sr = self.state["cycle-to-date"]["space-recovered"] + self.increment(so_far_sr, a+"-numshares", 1) + self.increment(so_far_sr, a+"-sharebytes", sharebytes) + self.increment(so_far_sr, a+"-diskbytes", diskbytes) + + def increment(self, d, k, delta=1): + if k not in d: + d[k] = 0 + d[k] += delta + + def add_lease_age_to_histogram(self, age): + bucket_interval = self.age_limit / 10.0 + bucket_number = int(age/bucket_interval) + bucket_start = bucket_number * bucket_interval + bucket_end = bucket_start + bucket_interval + k = (bucket_start, bucket_end) + self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1) + + def convert_lease_age_histogram(self, lah): + # convert { (minage,maxage) : count } into [ (minage,maxage,count) ] + # since the former is not JSON-safe (JSON dictionaries must have + # string keys). + json_safe_lah = [] + for k in sorted(lah): + (minage,maxage) = k + json_safe_lah.append( (minage, maxage, lah[k]) ) + return json_safe_lah + + def finished_cycle(self, cycle): + # add to our history state, prune old history + h = {} + + start = self.state["current-cycle-start-time"] + now = time.time() + h["cycle-start-finish-times"] = (start, now) + h["expiration-enabled"] = self.expire_leases + h["configured-expiration-time"] = self.age_limit + + s = self.state["cycle-to-date"] + + # state["lease-age-histogram"] is a dictionary (mapping + # (minage,maxage) tuple to a sharecount), but we report + # self.get_state()["lease-age-histogram"] as a list of + # (min,max,sharecount) tuples, because JSON can handle that better. + # We record the list-of-tuples form into the history for the same + # reason. + lah = self.convert_lease_age_histogram(s["lease-age-histogram"]) + h["lease-age-histogram"] = lah + h["leases-per-share-histogram"] = s["leases-per-share-histogram"].copy() + h["buckets-examined"] = s["buckets-examined"] + h["shares-examined"] = s["shares-examined"] + # note: if ["shares-recovered"] ever acquires an internal dict, this + # copy() needs to become a deepcopy + h["space-recovered"] = s["space-recovered"].copy() + + history = pickle.load(open(self.historyfile, "rb")) + history[cycle] = h + while len(history) > 10: + oldcycles = sorted(history.keys()) + del history[oldcycles[0]] + f = open(self.historyfile, "wb") + pickle.dump(history, f) + f.close() + + def get_state(self): + """In addition to the crawler state described in + ShareCrawler.get_state(), I return the following keys which are + specific to the lease-checker/expirer. Note that the non-history keys + (with 'cycle' in their names) are only present if a cycle is + currently running. If the crawler is between cycles, it appropriate + to show the latest item in the 'history' key instead. Also note that + each history item has all the data in the 'cycle-to-date' value, plus + cycle-start-finish-times. + + cycle-to-date: + expiration-enabled + configured-expiration-time + lease-age-histogram (list of (minage,maxage,sharecount) tuples) + leases-per-share-histogram + buckets-examined + shares-examined + space-recovered + + estimated-remaining-cycle: + # Values may be None if not enough data has been gathered to + # produce an estimate. + buckets-examined + shares-examined + space-recovered + + estimated-current-cycle: + # cycle-to-date plus estimated-remaining. Values may be None if + # not enough data has been gathered to produce an estimate. + buckets-examined + shares-examined + space-recovered + + history: maps cyclenum to a dict with the following keys: + cycle-start-finish-times + expiration-enabled + configured-expiration-time + lease-age-histogram + leases-per-share-histogram + buckets-examined + shares-examined + space-recovered + + The 'space-recovered' structure is a dictionary with the following + keys: + # 'actual' is what was actually deleted + actual-numbuckets + actual-numshares + actual-sharebytes + actual-diskbytes + # would have been deleted, if the original lease timer was used + original-leasetimer-numbuckets + original-leasetimer-numshares + original-leasetimer-sharebytes + original-leasetimer-diskbytes + # would have been deleted, if our configured max_age was used + configured-leasetimer-numbuckets + configured-leasetimer-numshares + configured-leasetimer-sharebytes + configured-leasetimer-diskbytes + + """ + progress = self.get_progress() + + state = ShareCrawler.get_state(self) # does a shallow copy + history = pickle.load(open(self.historyfile, "rb")) + state["history"] = history + + if not progress["cycle-in-progress"]: + del state["cycle-to-date"] + return state + + so_far = state["cycle-to-date"].copy() + state["cycle-to-date"] = so_far + + lah = so_far["lease-age-histogram"] + so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah) + so_far["expiration-enabled"] = self.expire_leases + so_far["configured-expiration-time"] = self.age_limit + + so_far_sr = so_far["space-recovered"] + remaining_sr = {} + remaining = {"space-recovered": remaining_sr} + cycle_sr = {} + cycle = {"space-recovered": cycle_sr} + + if progress["cycle-complete-percentage"] > 0.0: + m = 100.0 / progress["cycle-complete-percentage"] + for a in ("actual", "original-leasetimer", "configured-leasetimer"): + for b in ("numbuckets", "numshares", "sharebytes", "diskbytes"): + k = a+"-"+b + remaining_sr[k] = m * so_far_sr[k] + cycle_sr[k] = so_far_sr[k] + remaining_sr[k] + predshares = m * so_far["shares-examined"] + remaining["shares-examined"] = predshares + cycle["shares-examined"] = so_far["shares-examined"] + predshares + predbuckets = m * so_far["buckets-examined"] + remaining["buckets-examined"] = predbuckets + cycle["buckets-examined"] = so_far["buckets-examined"] + predbuckets + else: + for a in ("actual", "original-leasetimer", "configured-leasetimer"): + for b in ("numbuckets", "numshares", "sharebytes", "diskbytes"): + k = a+"-"+b + remaining_sr[k] = None + cycle_sr[k] = None + remaining["shares-examined"] = None + cycle["shares-examined"] = None + remaining["buckets-examined"] = None + cycle["buckets-examined"] = None + + state["estimated-remaining-cycle"] = remaining + state["estimated-current-cycle"] = cycle + return state diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index 6ebadbbe2..a6da31f07 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -116,9 +116,8 @@ class ShareFile: def _truncate_leases(self, f, num_leases): f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE) - def iter_leases(self): - """Yields (ownernum, renew_secret, cancel_secret, expiration_time) - for all leases.""" + def get_leases(self): + """Yields a LeaseInfo instance for all leases.""" f = open(self.home, 'rb') (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc)) f.seek(self._lease_offset) @@ -135,7 +134,7 @@ class ShareFile: f.close() def renew_lease(self, renew_secret, new_expire_time): - for i,lease in enumerate(self.iter_leases()): + for i,lease in enumerate(self.get_leases()): if lease.renew_secret == renew_secret: # yup. See if we need to update the owner time. if new_expire_time > lease.expiration_time: @@ -163,10 +162,9 @@ class ShareFile: given cancel_secret. """ - leases = list(self.iter_leases()) - num_leases = len(leases) + leases = list(self.get_leases()) num_leases_removed = 0 - for i,lease in enumerate(leases[:]): + for i,lease in enumerate(leases): if lease.cancel_secret == cancel_secret: leases[i] = None num_leases_removed += 1 diff --git a/src/allmydata/storage/lease.py b/src/allmydata/storage/lease.py index 2b91d8726..cd176aeef 100644 --- a/src/allmydata/storage/lease.py +++ b/src/allmydata/storage/lease.py @@ -1,5 +1,4 @@ - -import struct +import struct, time class LeaseInfo: def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None, @@ -13,6 +12,14 @@ class LeaseInfo: assert len(nodeid) == 20 self.nodeid = nodeid + def get_expiration_time(self): + return self.expiration_time + def get_grant_renew_time_time(self): + # hack, based upon fixed 31day expiration period + return self.expiration_time - 31*24*60*60 + def get_age(self): + return time.time() - self.get_grant_renew_time_time() + def from_immutable_data(self, data): (self.owner_num, self.renew_secret, @@ -38,4 +45,3 @@ class LeaseInfo: self.renew_secret, self.cancel_secret, self.nodeid) = struct.unpack(">LL32s32s20s", data) return self - diff --git a/src/allmydata/storage/mutable.py b/src/allmydata/storage/mutable.py index de78a6559..6520acdee 100644 --- a/src/allmydata/storage/mutable.py +++ b/src/allmydata/storage/mutable.py @@ -230,23 +230,22 @@ class MutableShareFile: return i return None + def get_leases(self): + """Yields a LeaseInfo instance for all leases.""" + f = open(self.home, 'rb') + for i, lease in self._enumerate_leases(f): + yield lease + f.close() + def _enumerate_leases(self, f): - """Yields (leasenum, (ownerid, expiration_time, renew_secret, - cancel_secret, accepting_nodeid)) for all leases.""" for i in range(self._get_num_lease_slots(f)): try: data = self._read_lease_record(f, i) if data is not None: - yield (i,data) + yield i,data except IndexError: return - def debug_get_leases(self): - f = open(self.home, 'rb') - leases = list(self._enumerate_leases(f)) - f.close() - return leases - def add_lease(self, lease_info): precondition(lease_info.owner_num != 0) # 0 means "no lease here" f = open(self.home, 'rb+') diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 5f60ef180..e922854ad 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -15,6 +15,7 @@ from allmydata.storage.mutable import MutableShareFile, EmptyShare, \ create_mutable_sharefile from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader from allmydata.storage.crawler import BucketCountingCrawler +from allmydata.storage.expirer import LeaseCheckingCrawler # storage/ # storage/shares/incoming @@ -34,10 +35,12 @@ NUM_RE=re.compile("^[0-9]+$") class StorageServer(service.MultiService, Referenceable): implements(RIStorageServer, IStatsProducer) name = 'storage' + LeaseCheckerClass = LeaseCheckingCrawler def __init__(self, storedir, nodeid, reserved_space=0, discard_storage=False, readonly_storage=False, - stats_provider=None): + stats_provider=None, + expire_leases=False, expiration_time=31*24*60*60): service.MultiService.__init__(self) assert isinstance(nodeid, str) assert len(nodeid) == 20 @@ -78,12 +81,22 @@ class StorageServer(service.MultiService, Referenceable): "cancel": [], } self.add_bucket_counter() + self.add_lease_checker(expire_leases, expiration_time) def add_bucket_counter(self): statefile = os.path.join(self.storedir, "bucket_counter.state") self.bucket_counter = BucketCountingCrawler(self, statefile) self.bucket_counter.setServiceParent(self) + def add_lease_checker(self, expire_leases, expiration_time): + statefile = os.path.join(self.storedir, "lease_checker.state") + historyfile = os.path.join(self.storedir, "lease_checker.history") + klass = self.LeaseCheckerClass + self.lease_checker = klass(self, statefile, historyfile, + expire_leases=expire_leases, + expiration_time=expiration_time) + self.lease_checker.setServiceParent(self) + def count(self, name, delta=1): if self.stats_provider: self.stats_provider.count("storage_server." + name, delta) @@ -146,11 +159,21 @@ class StorageServer(service.MultiService, Referenceable): writeable = False try: s = self.do_statvfs() - disk_total = s.f_bsize * s.f_blocks - disk_used = s.f_bsize * (s.f_blocks - s.f_bfree) + # on my mac laptop: + # statvfs(2) is a wrapper around statfs(2). + # statvfs.f_frsize = statfs.f_bsize : + # "minimum unit of allocation" (statvfs) + # "fundamental file system block size" (statfs) + # statvfs.f_bsize = statfs.f_iosize = stat.st_blocks : preferred IO size + # on an encrypted home directory ("FileVault"), it gets f_blocks + # wrong, and s.f_blocks*s.f_frsize is twice the size of my disk, + # but s.f_bavail*s.f_frsize is correct + + disk_total = s.f_frsize * s.f_blocks + disk_used = s.f_frsize * (s.f_blocks - s.f_bfree) # spacetime predictors should look at the slope of disk_used. - disk_free_for_root = s.f_bsize * s.f_bfree - disk_free_for_nonroot = s.f_bsize * s.f_bavail + disk_free_for_root = s.f_frsize * s.f_bfree + disk_free_for_nonroot = s.f_frsize * s.f_bavail # include our local policy here: if we stop accepting shares when # the available space drops below 1GB, then include that fact in @@ -182,7 +205,7 @@ class StorageServer(service.MultiService, Referenceable): def stat_disk(self, d): s = os.statvfs(d) # s.f_bavail: available to non-root users - disk_avail = s.f_bsize * s.f_bavail + disk_avail = s.f_frsize * s.f_bavail return disk_avail def get_available_space(self): @@ -397,8 +420,7 @@ class StorageServer(service.MultiService, Referenceable): def get_leases(self, storage_index): """Provide an iterator that yields all of the leases attached to this - bucket. Each lease is returned as a tuple of (owner_num, - renew_secret, cancel_secret, expiration_time). + bucket. Each lease is returned as a LeaseInfo instance. This method is not for client use. """ @@ -408,7 +430,7 @@ class StorageServer(service.MultiService, Referenceable): try: shnum, filename = self._get_bucket_shares(storage_index).next() sf = ShareFile(filename) - return sf.iter_leases() + return sf.get_leases() except StopIteration: return iter([]) diff --git a/src/allmydata/storage/shares.py b/src/allmydata/storage/shares.py new file mode 100644 index 000000000..c433cfbb6 --- /dev/null +++ b/src/allmydata/storage/shares.py @@ -0,0 +1,14 @@ +#! /usr/bin/python + +from mutable import MutableShareFile +from immutable import ShareFile + +def get_share_file(filename): + f = open(filename, "rb") + prefix = f.read(32) + f.close() + if prefix == MutableShareFile.MAGIC: + return MutableShareFile(filename) + # otherwise assume it's immutable + return ShareFile(filename) + diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 1d4b846e1..31d7071c5 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -1,5 +1,5 @@ -import time, os.path, stat, re +import time, os.path, stat, re, simplejson from twisted.trial import unittest @@ -9,16 +9,18 @@ from foolscap import eventual import itertools from allmydata import interfaces from allmydata.util import fileutil, hashutil, base32, pollmixin -from allmydata.storage.server import StorageServer, storage_index_to_dir +from allmydata.storage.server import StorageServer from allmydata.storage.mutable import MutableShareFile from allmydata.storage.immutable import BucketWriter, BucketReader -from allmydata.storage.common import DataTooLargeError +from allmydata.storage.common import DataTooLargeError, storage_index_to_dir from allmydata.storage.lease import LeaseInfo from allmydata.storage.crawler import BucketCountingCrawler +from allmydata.storage.expirer import LeaseCheckingCrawler from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \ ReadBucketProxy from allmydata.interfaces import BadWriteEnablerError from allmydata.test.common import LoggingServiceParent +from allmydata.test.common_web import WebRenderingMixin from allmydata.web.storage import StorageStatus, remove_prefix class Marker: @@ -725,6 +727,7 @@ class MutableServer(unittest.TestCase): ss = self.create("test_container_size") self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0,1,2]), 100) + read = ss.remote_slot_readv rstaraw = ss.remote_slot_testv_and_readv_and_writev secrets = ( self.write_enabler("we1"), self.renew_secret("we1"), @@ -743,12 +746,22 @@ class MutableServer(unittest.TestCase): []) # it should be possible to make the container smaller, although at - # the moment this doesn't actually affect the share + # the moment this doesn't actually affect the share, unless the + # container size is dropped to zero, in which case the share is + # deleted. answer = rstaraw("si1", secrets, {0: ([], [(0,data)], len(data)+8)}, []) self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) ) + answer = rstaraw("si1", secrets, + {0: ([], [(0,data)], 0)}, + []) + self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) ) + + read_answer = read("si1", [0], [(0,10)]) + self.failUnlessEqual(read_answer, {}) + def test_allocate(self): ss = self.create("test_allocate") self.allocate(ss, "si1", "we1", self._lease_secret.next(), @@ -1023,9 +1036,8 @@ class MutableServer(unittest.TestCase): def compare_leases_without_timestamps(self, leases_a, leases_b): self.failUnlessEqual(len(leases_a), len(leases_b)) for i in range(len(leases_a)): - num_a, a = leases_a[i] - num_b, b = leases_b[i] - self.failUnlessEqual(num_a, num_b) + a = leases_a[i] + b = leases_b[i] self.failUnlessEqual(a.owner_num, b.owner_num) self.failUnlessEqual(a.renew_secret, b.renew_secret) self.failUnlessEqual(a.cancel_secret, b.cancel_secret) @@ -1034,9 +1046,8 @@ class MutableServer(unittest.TestCase): def compare_leases(self, leases_a, leases_b): self.failUnlessEqual(len(leases_a), len(leases_b)) for i in range(len(leases_a)): - num_a, a = leases_a[i] - num_b, b = leases_b[i] - self.failUnlessEqual(num_a, num_b) + a = leases_a[i] + b = leases_b[i] self.failUnlessEqual(a.owner_num, b.owner_num) self.failUnlessEqual(a.renew_secret, b.renew_secret) self.failUnlessEqual(a.cancel_secret, b.cancel_secret) @@ -1064,7 +1075,7 @@ class MutableServer(unittest.TestCase): f.close() s0 = MutableShareFile(os.path.join(bucket_dir, "0")) - self.failUnlessEqual(len(s0.debug_get_leases()), 1) + self.failUnlessEqual(len(list(s0.get_leases())), 1) # add-lease on a missing storage index is silently ignored self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None) @@ -1072,30 +1083,30 @@ class MutableServer(unittest.TestCase): # re-allocate the slots and use the same secrets, that should update # the lease write("si1", secrets(0), {0: ([], [(0,data)], None)}, []) - self.failUnlessEqual(len(s0.debug_get_leases()), 1) + self.failUnlessEqual(len(list(s0.get_leases())), 1) # renew it directly ss.remote_renew_lease("si1", secrets(0)[1]) - self.failUnlessEqual(len(s0.debug_get_leases()), 1) + self.failUnlessEqual(len(list(s0.get_leases())), 1) # now allocate them with a bunch of different secrets, to trigger the # extended lease code. Use add_lease for one of them. write("si1", secrets(1), {0: ([], [(0,data)], None)}, []) - self.failUnlessEqual(len(s0.debug_get_leases()), 2) + self.failUnlessEqual(len(list(s0.get_leases())), 2) secrets2 = secrets(2) ss.remote_add_lease("si1", secrets2[1], secrets2[2]) - self.failUnlessEqual(len(s0.debug_get_leases()), 3) + self.failUnlessEqual(len(list(s0.get_leases())), 3) write("si1", secrets(3), {0: ([], [(0,data)], None)}, []) write("si1", secrets(4), {0: ([], [(0,data)], None)}, []) write("si1", secrets(5), {0: ([], [(0,data)], None)}, []) - self.failUnlessEqual(len(s0.debug_get_leases()), 6) + self.failUnlessEqual(len(list(s0.get_leases())), 6) # cancel one of them ss.remote_cancel_lease("si1", secrets(5)[2]) - self.failUnlessEqual(len(s0.debug_get_leases()), 5) + self.failUnlessEqual(len(list(s0.get_leases())), 5) - all_leases = s0.debug_get_leases() + all_leases = list(s0.get_leases()) # and write enough data to expand the container, forcing the server # to move the leases write("si1", secrets(0), @@ -1103,20 +1114,18 @@ class MutableServer(unittest.TestCase): []) # read back the leases, make sure they're still intact. - self.compare_leases_without_timestamps(all_leases, - s0.debug_get_leases()) + self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) ss.remote_renew_lease("si1", secrets(0)[1]) ss.remote_renew_lease("si1", secrets(1)[1]) ss.remote_renew_lease("si1", secrets(2)[1]) ss.remote_renew_lease("si1", secrets(3)[1]) ss.remote_renew_lease("si1", secrets(4)[1]) - self.compare_leases_without_timestamps(all_leases, - s0.debug_get_leases()) + self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) # get a new copy of the leases, with the current timestamps. Reading # data and failing to renew/cancel leases should leave the timestamps # alone. - all_leases = s0.debug_get_leases() + all_leases = list(s0.get_leases()) # renewing with a bogus token should prompt an error message # examine the exception thus raised, make sure the old nodeid is @@ -1133,21 +1142,19 @@ class MutableServer(unittest.TestCase): self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si1", secrets(20)[2]) - self.compare_leases(all_leases, s0.debug_get_leases()) + self.compare_leases(all_leases, list(s0.get_leases())) # reading shares should not modify the timestamp read("si1", [], [(0,200)]) - self.compare_leases(all_leases, s0.debug_get_leases()) + self.compare_leases(all_leases, list(s0.get_leases())) write("si1", secrets(0), {0: ([], [(200, "make me bigger")], None)}, []) - self.compare_leases_without_timestamps(all_leases, - s0.debug_get_leases()) + self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) write("si1", secrets(0), {0: ([], [(500, "make me really bigger")], None)}, []) - self.compare_leases_without_timestamps(all_leases, - s0.debug_get_leases()) + self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) # now cancel them all ss.remote_cancel_lease("si1", secrets(0)[2]) @@ -1158,7 +1165,7 @@ class MutableServer(unittest.TestCase): # the slot should still be there remaining_shares = read("si1", [], [(0,10)]) self.failUnlessEqual(len(remaining_shares), 1) - self.failUnlessEqual(len(s0.debug_get_leases()), 1) + self.failUnlessEqual(len(list(s0.get_leases())), 1) # cancelling a non-existent lease should raise an IndexError self.failUnlessRaises(IndexError, @@ -1167,7 +1174,7 @@ class MutableServer(unittest.TestCase): # and the slot should still be there remaining_shares = read("si1", [], [(0,10)]) self.failUnlessEqual(len(remaining_shares), 1) - self.failUnlessEqual(len(s0.debug_get_leases()), 1) + self.failUnlessEqual(len(list(s0.get_leases())), 1) ss.remote_cancel_lease("si1", secrets(4)[2]) # now the slot should be gone @@ -1370,7 +1377,7 @@ class BucketCounter(unittest.TestCase, pollmixin.PollMixin): html = w.renderSynchronously() s = remove_tags(html) self.failUnless("Total buckets: 0 (the number of" in s, s) - self.failUnless("Next crawl in 359" in s, s) # about 3600-1 seconds + self.failUnless("Next crawl in 59 minutes" in s, s) d.addCallback(_check2) return d @@ -1450,12 +1457,514 @@ class BucketCounter(unittest.TestCase, pollmixin.PollMixin): ss.setServiceParent(self.s) return d +class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler): + stop_after_first_bucket = False + def process_bucket(self, *args, **kwargs): + LeaseCheckingCrawler.process_bucket(self, *args, **kwargs) + if self.stop_after_first_bucket: + self.stop_after_first_bucket = False + self.cpu_slice = -1.0 + def yielding(self, sleep_time): + if not self.stop_after_first_bucket: + self.cpu_slice = 500 + +class BrokenStatResults: + pass +class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler): + def stat(self, fn): + s = os.stat(fn) + bsr = BrokenStatResults() + for attrname in dir(s): + if attrname.startswith("_"): + continue + if attrname == "st_blocks": + continue + setattr(bsr, attrname, getattr(s, attrname)) + return bsr + +class InstrumentedStorageServer(StorageServer): + LeaseCheckerClass = InstrumentedLeaseCheckingCrawler +class No_ST_BLOCKS_StorageServer(StorageServer): + LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler + +class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): + + def setUp(self): + self.s = service.MultiService() + self.s.startService() + def tearDown(self): + return self.s.stopService() + + def make_shares(self, ss): + def make(si): + return (si, hashutil.tagged_hash("renew", si), + hashutil.tagged_hash("cancel", si)) + def make_mutable(si): + return (si, hashutil.tagged_hash("renew", si), + hashutil.tagged_hash("cancel", si), + hashutil.tagged_hash("write-enabler", si)) + def make_extra_lease(si, num): + return (hashutil.tagged_hash("renew-%d" % num, si), + hashutil.tagged_hash("cancel-%d" % num, si)) + + immutable_si_0, rs0, cs0 = make("\x00" * 16) + immutable_si_1, rs1, cs1 = make("\x01" * 16) + rs1a, cs1a = make_extra_lease(immutable_si_1, 1) + mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16) + mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16) + rs3a, cs3a = make_extra_lease(mutable_si_3, 1) + sharenums = [0] + canary = FakeCanary() + # note: 'tahoe debug dump-share' will not handle this file, since the + # inner contents are not a valid CHK share + data = "\xff" * 1000 + + a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums, + 1000, canary) + w[0].remote_write(0, data) + w[0].remote_close() + + a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums, + 1000, canary) + w[0].remote_write(0, data) + w[0].remote_close() + ss.remote_add_lease(immutable_si_1, rs1a, cs1a) + + writev = ss.remote_slot_testv_and_readv_and_writev + writev(mutable_si_2, (we2, rs2, cs2), + {0: ([], [(0,data)], len(data))}, []) + writev(mutable_si_3, (we3, rs3, cs3), + {0: ([], [(0,data)], len(data))}, []) + ss.remote_add_lease(mutable_si_3, rs3a, cs3a) + + self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] + self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a] + self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a] + + def test_basic(self): + basedir = "storage/LeaseCrawler/basic" + fileutil.make_dirs(basedir) + ss = InstrumentedStorageServer(basedir, "\x00" * 20) + # make it start sooner than usual. + lc = ss.lease_checker + lc.slow_start = 0 + lc.cpu_slice = 500 + lc.stop_after_first_bucket = True + webstatus = StorageStatus(ss) + + # create a few shares, with some leases on them + self.make_shares(ss) + [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis + + # add a non-sharefile to exercise another code path + fn = os.path.join(ss.sharedir, + storage_index_to_dir(immutable_si_0), + "not-a-share") + f = open(fn, "wb") + f.write("I am not a share.\n") + f.close() + + # this is before the crawl has started, so we're not in a cycle yet + initial_state = lc.get_state() + self.failIf(lc.get_progress()["cycle-in-progress"]) + self.failIf("cycle-to-date" in initial_state) + self.failIf("estimated-remaining-cycle" in initial_state) + self.failIf("estimated-current-cycle" in initial_state) + self.failUnless("history" in initial_state) + self.failUnlessEqual(initial_state["history"], {}) + + ss.setServiceParent(self.s) + + d = eventual.fireEventually() + + # now examine the state right after the first bucket has been + # processed. + def _after_first_bucket(ignored): + initial_state = lc.get_state() + self.failUnless("cycle-to-date" in initial_state) + self.failUnless("estimated-remaining-cycle" in initial_state) + self.failUnless("estimated-current-cycle" in initial_state) + self.failUnless("history" in initial_state) + self.failUnlessEqual(initial_state["history"], {}) + + so_far = initial_state["cycle-to-date"] + self.failUnlessEqual(so_far["expiration-enabled"], False) + self.failUnless("configured-expiration-time" in so_far) + self.failUnless("lease-age-histogram" in so_far) + lah = so_far["lease-age-histogram"] + self.failUnlessEqual(type(lah), list) + self.failUnlessEqual(len(lah), 1) + self.failUnlessEqual(lah, [ (0.0, lc.age_limit/10.0, 1) ] ) + self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1}) + self.failUnlessEqual(so_far["buckets-examined"], 1) + self.failUnlessEqual(so_far["shares-examined"], 1) + sr1 = so_far["space-recovered"] + self.failUnlessEqual(sr1["actual-numshares"], 0) + self.failUnlessEqual(sr1["configured-leasetimer-diskbytes"], 0) + self.failUnlessEqual(sr1["original-leasetimer-sharebytes"], 0) + left = initial_state["estimated-remaining-cycle"] + self.failUnless(left["buckets-examined"] > 0, + left["buckets-examined"]) + self.failUnless(left["shares-examined"] > 0, + left["shares-examined"]) + sr2 = left["space-recovered"] + self.failIfEqual(sr2["actual-numshares"], None) + self.failIfEqual(sr2["configured-leasetimer-diskbytes"], None) + self.failIfEqual(sr2["original-leasetimer-sharebytes"], None) + d.addCallback(_after_first_bucket) + d.addCallback(lambda ign: self.render1(webstatus)) + def _check_html_in_cycle(html): + s = remove_tags(html) + self.failUnlessIn("So far, this cycle has examined " + "1 shares in 1 buckets " + "and has recovered: " + "0 buckets, 0 shares, 0 B ", s) + self.failUnlessIn("If expiration were enabled, " + "we would have recovered: " + "0 buckets, 0 shares, 0 B by now", s) + self.failUnlessIn("and the remainder of this cycle " + "would probably recover: " + "0 buckets, 0 shares, 0 B ", s) + self.failUnlessIn("and the whole cycle would probably recover: " + "0 buckets, 0 shares, 0 B ", s) + self.failUnlessIn("if we were using each lease's default " + "31-day lease lifetime", s) + self.failUnlessIn("this cycle would be expected to recover: ", s) + d.addCallback(_check_html_in_cycle) + + # wait for the crawler to finish the first cycle. Nothing should have + # been removed. + def _wait(): + return bool(lc.get_state()["last-cycle-finished"] is not None) + d.addCallback(lambda ign: self.poll(_wait)) + + def _after_first_cycle(ignored): + s = lc.get_state() + self.failIf("cycle-to-date" in s) + self.failIf("estimated-remaining-cycle" in s) + self.failIf("estimated-current-cycle" in s) + last = s["history"][0] + self.failUnless("cycle-start-finish-times" in last) + self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple) + self.failUnlessEqual(last["expiration-enabled"], False) + self.failUnless("configured-expiration-time" in last) + + self.failUnless("lease-age-histogram" in last) + lah = last["lease-age-histogram"] + self.failUnlessEqual(type(lah), list) + self.failUnlessEqual(len(lah), 1) + self.failUnlessEqual(lah, [ (0.0, lc.age_limit/10.0, 6) ] ) + + self.failUnlessEqual(last["leases-per-share-histogram"], + {1: 2, 2: 2}) + self.failUnlessEqual(last["buckets-examined"], 4) + self.failUnlessEqual(last["shares-examined"], 4) + + rec = last["space-recovered"] + self.failUnlessEqual(rec["actual-numbuckets"], 0) + self.failUnlessEqual(rec["original-leasetimer-numbuckets"], 0) + self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 0) + self.failUnlessEqual(rec["actual-numshares"], 0) + self.failUnlessEqual(rec["original-leasetimer-numshares"], 0) + self.failUnlessEqual(rec["configured-leasetimer-numshares"], 0) + self.failUnlessEqual(rec["actual-diskbytes"], 0) + self.failUnlessEqual(rec["original-leasetimer-diskbytes"], 0) + self.failUnlessEqual(rec["configured-leasetimer-diskbytes"], 0) + self.failUnlessEqual(rec["actual-sharebytes"], 0) + self.failUnlessEqual(rec["original-leasetimer-sharebytes"], 0) + self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], 0) + + def _get_sharefile(si): + return list(ss._iter_share_files(si))[0] + def count_leases(si): + return len(list(_get_sharefile(si).get_leases())) + self.failUnlessEqual(count_leases(immutable_si_0), 1) + self.failUnlessEqual(count_leases(immutable_si_1), 2) + self.failUnlessEqual(count_leases(mutable_si_2), 1) + self.failUnlessEqual(count_leases(mutable_si_3), 2) + d.addCallback(_after_first_cycle) + d.addCallback(lambda ign: self.render1(webstatus)) + def _check_html(html): + s = remove_tags(html) + self.failUnlessIn("recovered: 0 buckets, 0 shares, 0 B " + "but expiration was not enabled", s) + d.addCallback(_check_html) + return d + + def backdate_lease(self, sf, renew_secret, new_expire_time): + # ShareFile.renew_lease ignores attempts to back-date a lease (i.e. + # "renew" a lease with a new_expire_time that is older than what the + # current lease has), so we have to reach inside it. + for i,lease in enumerate(sf.get_leases()): + if lease.renew_secret == renew_secret: + lease.expiration_time = new_expire_time + f = open(sf.home, 'rb+') + sf._write_lease_record(f, i, lease) + f.close() + return + raise IndexError("unable to renew non-existent lease") + + def test_expire(self): + basedir = "storage/LeaseCrawler/expire" + fileutil.make_dirs(basedir) + # setting expiration_time to 2000 means that any lease which is more + # than 2000s old will be expired. + ss = InstrumentedStorageServer(basedir, "\x00" * 20, + expire_leases=True, + expiration_time=2000) + # make it start sooner than usual. + lc = ss.lease_checker + lc.slow_start = 0 + lc.stop_after_first_bucket = True + webstatus = StorageStatus(ss) + + # create a few shares, with some leases on them + self.make_shares(ss) + [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis + + def count_shares(si): + return len(list(ss._iter_share_files(si))) + def _get_sharefile(si): + return list(ss._iter_share_files(si))[0] + def count_leases(si): + return len(list(_get_sharefile(si).get_leases())) + + self.failUnlessEqual(count_shares(immutable_si_0), 1) + self.failUnlessEqual(count_leases(immutable_si_0), 1) + self.failUnlessEqual(count_shares(immutable_si_1), 1) + self.failUnlessEqual(count_leases(immutable_si_1), 2) + self.failUnlessEqual(count_shares(mutable_si_2), 1) + self.failUnlessEqual(count_leases(mutable_si_2), 1) + self.failUnlessEqual(count_shares(mutable_si_3), 1) + self.failUnlessEqual(count_leases(mutable_si_3), 2) + + # artificially crank back the expiration time on the first lease of + # each share, to make it look like it expired already (age=1000s). + # Some shares have an extra lease which is set to expire at the + # default time in 31 days from now (age=31days). We then run the + # crawler, which will expire the first lease, making some shares get + # deleted and others stay alive (with one remaining lease) + now = time.time() + + sf0 = _get_sharefile(immutable_si_0) + self.backdate_lease(sf0, self.renew_secrets[0], now - 1000) + sf0_size = os.stat(sf0.home).st_size + + # immutable_si_1 gets an extra lease + sf1 = _get_sharefile(immutable_si_1) + self.backdate_lease(sf1, self.renew_secrets[1], now - 1000) + + sf2 = _get_sharefile(mutable_si_2) + self.backdate_lease(sf2, self.renew_secrets[3], now - 1000) + sf2_size = os.stat(sf2.home).st_size + + # mutable_si_3 gets an extra lease + sf3 = _get_sharefile(mutable_si_3) + self.backdate_lease(sf3, self.renew_secrets[4], now - 1000) + + ss.setServiceParent(self.s) + + d = eventual.fireEventually() + # examine the state right after the first bucket has been processed + def _after_first_bucket(ignored): + p = lc.get_progress() + self.failUnless(p["cycle-in-progress"]) + d.addCallback(_after_first_bucket) + d.addCallback(lambda ign: self.render1(webstatus)) + def _check_html_in_cycle(html): + s = remove_tags(html) + # the first bucket encountered gets deleted, and its prefix + # happens to be about 1/6th of the way through the ring, so the + # predictor thinks we'll have 6 shares and that we'll delete them + # all. This part of the test depends upon the SIs landing right + # where they do now. + self.failUnlessIn("The remainder of this cycle is expected to " + "recover: 5 buckets, 5 shares", s) + self.failUnlessIn("The whole cycle is expected to examine " + "6 shares in 6 buckets and to recover: " + "6 buckets, 6 shares", s) + d.addCallback(_check_html_in_cycle) + + # wait for the crawler to finish the first cycle. Two shares should + # have been removed + def _wait(): + return bool(lc.get_state()["last-cycle-finished"] is not None) + d.addCallback(lambda ign: self.poll(_wait)) + + def _after_first_cycle(ignored): + self.failUnlessEqual(count_shares(immutable_si_0), 0) + self.failUnlessEqual(count_shares(immutable_si_1), 1) + self.failUnlessEqual(count_leases(immutable_si_1), 1) + self.failUnlessEqual(count_shares(mutable_si_2), 0) + self.failUnlessEqual(count_shares(mutable_si_3), 1) + self.failUnlessEqual(count_leases(mutable_si_3), 1) + + s = lc.get_state() + last = s["history"][0] + + self.failUnlessEqual(last["expiration-enabled"], True) + self.failUnlessEqual(last["configured-expiration-time"], 2000) + self.failUnlessEqual(last["buckets-examined"], 4) + self.failUnlessEqual(last["shares-examined"], 4) + self.failUnlessEqual(last["leases-per-share-histogram"], + {1: 2, 2: 2}) + + rec = last["space-recovered"] + self.failUnlessEqual(rec["actual-numbuckets"], 2) + self.failUnlessEqual(rec["original-leasetimer-numbuckets"], 2) + self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 2) + self.failUnlessEqual(rec["actual-numshares"], 2) + self.failUnlessEqual(rec["original-leasetimer-numshares"], 2) + self.failUnlessEqual(rec["configured-leasetimer-numshares"], 2) + size = sf0_size + sf2_size + self.failUnlessEqual(rec["actual-sharebytes"], size) + self.failUnlessEqual(rec["original-leasetimer-sharebytes"], size) + self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], size) + self.failUnless(rec["actual-diskbytes"] >= size, + rec["actual-diskbytes"]) + self.failUnless(rec["original-leasetimer-diskbytes"] >= size, + rec["original-leasetimer-diskbytes"]) + self.failUnless(rec["configured-leasetimer-diskbytes"] >= size, + rec["configured-leasetimer-diskbytes"]) + d.addCallback(_after_first_cycle) + d.addCallback(lambda ign: self.render1(webstatus)) + def _check_html(html): + s = remove_tags(html) + self.failUnlessIn("Expiration Enabled: expired leases will be removed", s) + self.failUnlessIn(" recovered: 2 buckets, 2 shares, ", s) + d.addCallback(_check_html) + return d + + def test_limited_history(self): + basedir = "storage/LeaseCrawler/limited_history" + fileutil.make_dirs(basedir) + ss = StorageServer(basedir, "\x00" * 20) + # make it start sooner than usual. + lc = ss.lease_checker + lc.slow_start = 0 + lc.cpu_slice = 500 + + # create a few shares, with some leases on them + self.make_shares(ss) + + ss.setServiceParent(self.s) + + def _wait_until_15_cycles_done(): + last = lc.state["last-cycle-finished"] + if last is not None and last >= 15: + return True + if lc.timer: + lc.timer.reset(0) + return False + d = self.poll(_wait_until_15_cycles_done) + + def _check(ignored): + s = lc.get_state() + h = s["history"] + self.failUnlessEqual(len(h), 10) + self.failUnlessEqual(max(h.keys()), 15) + self.failUnlessEqual(min(h.keys()), 6) + d.addCallback(_check) + return d + + def test_unpredictable_future(self): + basedir = "storage/LeaseCrawler/unpredictable_future" + fileutil.make_dirs(basedir) + ss = StorageServer(basedir, "\x00" * 20) + # make it start sooner than usual. + lc = ss.lease_checker + lc.slow_start = 0 + lc.cpu_slice = -1.0 # stop quickly + + self.make_shares(ss) + + ss.setServiceParent(self.s) + + d = eventual.fireEventually() + def _check(ignored): + # this should fire after the first bucket is complete, but before + # the first prefix is complete, so the progress-measurer won't + # think we've gotten far enough to raise our percent-complete + # above 0%, triggering the cannot-predict-the-future code in + # expirer.py . This will have to change if/when the + # progress-measurer gets smart enough to count buckets (we'll + # have to interrupt it even earlier, before it's finished the + # first bucket). + s = lc.get_state() + self.failUnless("cycle-to-date" in s) + self.failUnless("estimated-remaining-cycle" in s) + self.failUnless("estimated-current-cycle" in s) + + left = s["estimated-remaining-cycle"]["space-recovered"] + self.failUnlessEqual(left["actual-numbuckets"], None) + self.failUnlessEqual(left["original-leasetimer-numbuckets"], None) + self.failUnlessEqual(left["configured-leasetimer-numbuckets"], None) + self.failUnlessEqual(left["actual-numshares"], None) + self.failUnlessEqual(left["original-leasetimer-numshares"], None) + self.failUnlessEqual(left["configured-leasetimer-numshares"], None) + self.failUnlessEqual(left["actual-diskbytes"], None) + self.failUnlessEqual(left["original-leasetimer-diskbytes"], None) + self.failUnlessEqual(left["configured-leasetimer-diskbytes"], None) + self.failUnlessEqual(left["actual-sharebytes"], None) + self.failUnlessEqual(left["original-leasetimer-sharebytes"], None) + self.failUnlessEqual(left["configured-leasetimer-sharebytes"], None) + + full = s["estimated-remaining-cycle"]["space-recovered"] + self.failUnlessEqual(full["actual-numbuckets"], None) + self.failUnlessEqual(full["original-leasetimer-numbuckets"], None) + self.failUnlessEqual(full["configured-leasetimer-numbuckets"], None) + self.failUnlessEqual(full["actual-numshares"], None) + self.failUnlessEqual(full["original-leasetimer-numshares"], None) + self.failUnlessEqual(full["configured-leasetimer-numshares"], None) + self.failUnlessEqual(full["actual-diskbytes"], None) + self.failUnlessEqual(full["original-leasetimer-diskbytes"], None) + self.failUnlessEqual(full["configured-leasetimer-diskbytes"], None) + self.failUnlessEqual(full["actual-sharebytes"], None) + self.failUnlessEqual(full["original-leasetimer-sharebytes"], None) + self.failUnlessEqual(full["configured-leasetimer-sharebytes"], None) + + d.addCallback(_check) + return d + + def test_no_st_blocks(self): + basedir = "storage/LeaseCrawler/no_st_blocks" + fileutil.make_dirs(basedir) + ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20, + expiration_time=-1000) + # a negative expiration_time= means the "configured-leasetimer-" + # space-recovered counts will be non-zero, since all shares will have + # expired by then + + # make it start sooner than usual. + lc = ss.lease_checker + lc.slow_start = 0 + + self.make_shares(ss) + ss.setServiceParent(self.s) + def _wait(): + return bool(lc.get_state()["last-cycle-finished"] is not None) + d = self.poll(_wait) + + def _check(ignored): + s = lc.get_state() + last = s["history"][0] + rec = last["space-recovered"] + self.failUnlessEqual(rec["configured-leasetimer-numbuckets"], 4) + self.failUnlessEqual(rec["configured-leasetimer-numshares"], 4) + self.failUnless(rec["configured-leasetimer-sharebytes"] > 0, + rec["configured-leasetimer-sharebytes"]) + # without the .st_blocks field in os.stat() results, we should be + # reporting diskbytes==sharebytes + self.failUnlessEqual(rec["configured-leasetimer-sharebytes"], + rec["configured-leasetimer-diskbytes"]) + d.addCallback(_check) + return d class NoStatvfsServer(StorageServer): def do_statvfs(self): raise AttributeError -class WebStatus(unittest.TestCase, pollmixin.PollMixin): +class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin): def setUp(self): self.s = service.MultiService() @@ -1474,11 +1983,27 @@ class WebStatus(unittest.TestCase, pollmixin.PollMixin): ss = StorageServer(basedir, "\x00" * 20) ss.setServiceParent(self.s) w = StorageStatus(ss) - html = w.renderSynchronously() - self.failUnless("
[1]: Some of this space may be reserved for the superuser.
[2]: This reports the space available to non-root users, including the Tahoe node.
+See this page in JSON