expirer: tolerate corrupt shares, add them to the state and history for future examination

This commit is contained in:
Brian Warner 2009-03-08 20:08:40 -07:00
parent 1a98521c3d
commit 6d6049430c
2 changed files with 80 additions and 4 deletions

View File

@ -1,6 +1,9 @@
import time, os, pickle
import time, os, pickle, struct
from crawler import ShareCrawler
from shares import get_share_file
from common import UnknownMutableContainerVersionError, \
UnknownImmutableContainerVersionError
from twisted.python import log as twlog
class LeaseCheckingCrawler(ShareCrawler):
"""I examine the leases on all shares, determining which are still valid
@ -70,6 +73,7 @@ class LeaseCheckingCrawler(ShareCrawler):
recovered = self.create_empty_recovered_dict()
so_far = {"buckets-examined": 0,
"shares-examined": 0,
"corrupt-shares": [],
"space-recovered": recovered,
"lease-age-histogram": {}, # (minage,maxage)->count
"leases-per-share-histogram": {}, # leasecount->numshares
@ -99,10 +103,20 @@ class LeaseCheckingCrawler(ShareCrawler):
for fn in os.listdir(bucketdir):
try:
shnum = int(fn)
wks = self.process_share(os.path.join(bucketdir, fn))
would_keep_shares.append(wks)
except ValueError:
pass # non-numeric means not a sharefile
continue # non-numeric means not a sharefile
sharefile = os.path.join(bucketdir, fn)
try:
wks = self.process_share(sharefile)
except (UnknownMutableContainerVersionError,
UnknownImmutableContainerVersionError,
struct.error):
twlog.msg("lease-checker error processing %s" % sharefile)
twlog.err()
which = (storage_index_b32, shnum)
self.state["cycle-to-date"]["corrupt-shares"].append(which)
wks = (1, 1, 1)
would_keep_shares.append(wks)
recovered = self.state["cycle-to-date"]["space-recovered"]
if sum([wks[0] for wks in would_keep_shares]) == 0:
self.increment(recovered,
@ -233,6 +247,7 @@ class LeaseCheckingCrawler(ShareCrawler):
h["leases-per-share-histogram"] = s["leases-per-share-histogram"].copy()
h["buckets-examined"] = s["buckets-examined"]
h["shares-examined"] = s["shares-examined"]
h["corrupt-shares"] = s["corrupt-shares"][:]
# note: if ["shares-recovered"] ever acquires an internal dict, this
# copy() needs to become a deepcopy
h["space-recovered"] = s["space-recovered"].copy()
@ -261,6 +276,7 @@ class LeaseCheckingCrawler(ShareCrawler):
configured-expiration-time
lease-age-histogram (list of (minage,maxage,sharecount) tuples)
leases-per-share-histogram
corrupt-shares (list of (si_b32,shnum) tuples, minimal verification)
buckets-examined
shares-examined
space-recovered
@ -285,6 +301,7 @@ class LeaseCheckingCrawler(ShareCrawler):
configured-expiration-time
lease-age-histogram
leases-per-share-histogram
corrupt-shares
buckets-examined
shares-examined
space-recovered

View File

@ -1695,6 +1695,7 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
{1: 2, 2: 2})
self.failUnlessEqual(last["buckets-examined"], 4)
self.failUnlessEqual(last["shares-examined"], 4)
self.failUnlessEqual(last["corrupt-shares"], [])
rec = last["space-recovered"]
self.failUnlessEqual(rec["actual-numbuckets"], 0)
@ -1998,6 +1999,64 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
d.addCallback(_check)
return d
def test_bad_share(self):
    """Check that the lease-checker completes a cycle despite a bad share.

    The start of one mutable share file is overwritten. Once the first
    crawler cycle has finished, that share must be listed under
    "corrupt-shares" both in the lease-checker history and in the JSON
    rendering of the storage status page.
    """
    basedir = "storage/LeaseCrawler/bad_share"
    fileutil.make_dirs(basedir)
    server = StorageServer(basedir, "\x00" * 20)
    status_page = StorageStatus(server)

    # make the lease-checker start sooner than usual
    checker = server.lease_checker
    checker.slow_start = 0
    checker.cpu_slice = 500

    # populate the server with a few shares that have leases on them
    self.make_shares(server)

    # Overwrite the beginning of share 0 under whichever mutable storage
    # index compares lowest; the lease-checker is expected to keep going.
    (immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3) = self.sis
    first_mutable = min(mutable_si_2, mutable_si_3)
    share_path = os.path.join(server.sharedir,
                              storage_index_to_dir(first_mutable), "0")
    share_file = open(share_path, "rb+")
    share_file.seek(0)
    share_file.write("BAD MAGIC")
    share_file.close()
    # Because the mutable magic is no longer there, get_share_file()
    # assumes the file is an immutable share, and immutable.ShareFile then
    # sees a bad version. So the error actually raised is
    # UnknownImmutableContainerVersionError.

    server.setServiceParent(self.s)

    def _first_cycle_done():
        return checker.get_state()["last-cycle-finished"] is not None
    d = self.poll(_first_cycle_done)

    def _inspect_history(ignored):
        cycle0 = checker.get_state()["history"][0]
        self.failUnlessEqual(cycle0["buckets-examined"], 4)
        self.failUnlessEqual(cycle0["shares-examined"], 3)
        expected = [(base32.b2a(first_mutable), 0)]
        self.failUnlessEqual(cycle0["corrupt-shares"], expected)
        # discard the errors that were logged for the corrupt share
        self.flushLoggedErrors(UnknownMutableContainerVersionError,
                               UnknownImmutableContainerVersionError)
    d.addCallback(_inspect_history)

    def _render_status_json(ignored):
        return self.render_json(status_page)
    d.addCallback(_render_status_json)

    def _inspect_json(json_text):
        decoded = simplejson.loads(json_text)
        # JSON turns every dict key into a string, hence "0" and not 0 ...
        cycle0 = decoded["lease-checker"]["history"]["0"]
        # ... and every tuple into a list
        expected = [[base32.b2a(first_mutable), 0]]
        self.failUnlessEqual(cycle0["corrupt-shares"], expected)
    d.addCallback(_inspect_json)
    return d
def render_json(self, page):
    """Render *page* with the query argument t=json.

    Returns whatever self.render1() returns for that request.
    """
    return self.render1(page, args={"t": ["json"]})
class NoStatvfsServer(StorageServer):
def do_statvfs(self):
raise AttributeError