From a9101112a41340d0fc4378e6fd3817a254bdb48c Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Wed, 29 Oct 2008 15:10:10 -0700 Subject: [PATCH] catalog-shares command: tolerate errors, log them to stderr, handle v2-immutable shares --- src/allmydata/scripts/debug.py | 41 ++++++++++++++++++++++++++-------- src/allmydata/test/test_cli.py | 30 +++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py index eac5ea1d5..29e3e43fe 100644 --- a/src/allmydata/scripts/debug.py +++ b/src/allmydata/scripts/debug.py @@ -2,7 +2,8 @@ # do not import any allmydata modules at this level. Do that from inside # individual functions instead. import struct, time, os -from twisted.python import usage +from twisted.python import usage, failure +from twisted.internet import defer class DumpOptions(usage.Options): def getSynopsis(self): @@ -515,6 +516,14 @@ useful for purpose. """ return t +def call(c, *args, **kwargs): + # take advantage of the fact that ImmediateReadBucketProxy returns + # Deferreds that are already fired + results = [] + d = defer.maybeDeferred(c, *args, **kwargs) + d.addCallback(results.append) + return results[0] + def describe_share(abs_sharefile, si_s, shnum_s, now, out): from allmydata import uri, storage from allmydata.mutable.layout import unpack_share @@ -571,19 +580,27 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): elif struct.unpack(">L", prefix[:4]) == (1,): # immutable - sf = storage.ShareFile(abs_sharefile) + + class ImmediateReadBucketProxy(ReadBucketProxy): + def __init__(self, sf): + self.sf = sf + def __repr__(self): + return "" + def _read(self, offset, size): + return defer.succeed(sf.read_share_data(offset, size)) + # use a ReadBucketProxy to parse the bucket and find the uri extension - bp = ReadBucketProxy(None) - offsets = bp._parse_offsets(sf.read_share_data(0, 0x24)) - seek = offsets['uri_extension'] - length = struct.unpack(">L", sf.read_share_data(seek, 4))[0] - seek += 4 - UEB_data = sf.read_share_data(seek, length) + sf = storage.ShareFile(abs_sharefile) + bp = ImmediateReadBucketProxy(sf) + bp.start() + expiration_time = min( [lease.expiration_time for lease in sf.iter_leases()] ) expiration = max(0, expiration_time - now) + UEB_data = call(bp.get_uri_extension) unpacked = uri.unpack_extension_readable(UEB_data) + k = unpacked["needed_shares"] N = unpacked["total_shares"] filesize = unpacked["size"] @@ -601,6 +618,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): def catalog_shares(options): out = options.stdout + err = options.stderr now = time.time() for d in options.nodedirs: d = os.path.join(os.path.expanduser(d), "storage/shares") @@ -620,7 +638,12 @@ def catalog_shares(options): abs_sharefile = os.path.join(si_dir, shnum_s) abs_sharefile = os.path.abspath(abs_sharefile) assert os.path.isfile(abs_sharefile) - describe_share(abs_sharefile, si_s, shnum_s, now, out) + try: + describe_share(abs_sharefile, si_s, shnum_s, now, + out) + except: + print >>err, "Error processing %s" % abs_sharefile + failure.Failure().printTraceback(err) return 0 class CorruptShareOptions(usage.Options): diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py index 3f65079d8..6f72f2505 100644 --- a/src/allmydata/test/test_cli.py +++ b/src/allmydata/test/test_cli.py @@ -217,6 +217,36 @@ class CLI(unittest.TestCase): self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output) self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output) + def _catalog_shares(self, *basedirs): + o = debug.CatalogSharesOptions() + o.stdout,o.stderr = StringIO(), StringIO() + args = list(basedirs) + o.parseOptions(args) + debug.catalog_shares(o) + out = o.stdout.getvalue() + err = o.stderr.getvalue() + return out, err + + def test_catalog_shares_error(self): + nodedir1 = "cli/test_catalog_shares/node1" + sharedir = os.path.join(nodedir1, "storage", "shares", "mq", "mqfblse6m5a6dh45isu2cg7oji") + fileutil.make_dirs(sharedir) + f = open(os.path.join(sharedir, "8"), "wb") + # write a bogus share that looks a little bit like CHK + f.write("\x00\x00\x00\x01" + "\xff" * 200) # this triggers an assert + f.close() + + nodedir2 = "cli/test_catalog_shares/node2" + fileutil.make_dirs(nodedir2) + + # now make sure that the 'catalog-shares' commands survives the error + out, err = self._catalog_shares(nodedir1, nodedir2) + self.failUnlessEqual(out, "", out) + self.failUnless("Error processing " in err, err) + self.failUnless(nodedir1 in err, err) + self.flushLoggedErrors(AssertionError) + + class CLITestMixin: def do_cli(self, verb, *args, **kwargs): nodeargs = [