catalog-shares command: tolerate errors, log them to stderr, handle v2-immutable shares

This commit is contained in:
Brian Warner 2008-10-29 15:10:10 -07:00
parent 186b64b633
commit a9101112a4
2 changed files with 62 additions and 9 deletions

View File

@ -2,7 +2,8 @@
# do not import any allmydata modules at this level. Do that from inside # do not import any allmydata modules at this level. Do that from inside
# individual functions instead. # individual functions instead.
import struct, time, os import struct, time, os
from twisted.python import usage from twisted.python import usage, failure
from twisted.internet import defer
class DumpOptions(usage.Options): class DumpOptions(usage.Options):
def getSynopsis(self): def getSynopsis(self):
@ -515,6 +516,14 @@ useful for purpose.
""" """
return t return t
def call(c, *args, **kwargs):
# take advantage of the fact that ImmediateReadBucketProxy returns
# Deferreds that are already fired
results = []
d = defer.maybeDeferred(c, *args, **kwargs)
d.addCallback(results.append)
return results[0]
def describe_share(abs_sharefile, si_s, shnum_s, now, out): def describe_share(abs_sharefile, si_s, shnum_s, now, out):
from allmydata import uri, storage from allmydata import uri, storage
from allmydata.mutable.layout import unpack_share from allmydata.mutable.layout import unpack_share
@ -571,19 +580,27 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
elif struct.unpack(">L", prefix[:4]) == (1,): elif struct.unpack(">L", prefix[:4]) == (1,):
# immutable # immutable
sf = storage.ShareFile(abs_sharefile)
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, sf):
self.sf = sf
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
return defer.succeed(sf.read_share_data(offset, size))
# use a ReadBucketProxy to parse the bucket and find the uri extension # use a ReadBucketProxy to parse the bucket and find the uri extension
bp = ReadBucketProxy(None) sf = storage.ShareFile(abs_sharefile)
offsets = bp._parse_offsets(sf.read_share_data(0, 0x24)) bp = ImmediateReadBucketProxy(sf)
seek = offsets['uri_extension'] bp.start()
length = struct.unpack(">L", sf.read_share_data(seek, 4))[0]
seek += 4
UEB_data = sf.read_share_data(seek, length)
expiration_time = min( [lease.expiration_time expiration_time = min( [lease.expiration_time
for lease in sf.iter_leases()] ) for lease in sf.iter_leases()] )
expiration = max(0, expiration_time - now) expiration = max(0, expiration_time - now)
UEB_data = call(bp.get_uri_extension)
unpacked = uri.unpack_extension_readable(UEB_data) unpacked = uri.unpack_extension_readable(UEB_data)
k = unpacked["needed_shares"] k = unpacked["needed_shares"]
N = unpacked["total_shares"] N = unpacked["total_shares"]
filesize = unpacked["size"] filesize = unpacked["size"]
@ -601,6 +618,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
def catalog_shares(options): def catalog_shares(options):
out = options.stdout out = options.stdout
err = options.stderr
now = time.time() now = time.time()
for d in options.nodedirs: for d in options.nodedirs:
d = os.path.join(os.path.expanduser(d), "storage/shares") d = os.path.join(os.path.expanduser(d), "storage/shares")
@ -620,7 +638,12 @@ def catalog_shares(options):
abs_sharefile = os.path.join(si_dir, shnum_s) abs_sharefile = os.path.join(si_dir, shnum_s)
abs_sharefile = os.path.abspath(abs_sharefile) abs_sharefile = os.path.abspath(abs_sharefile)
assert os.path.isfile(abs_sharefile) assert os.path.isfile(abs_sharefile)
describe_share(abs_sharefile, si_s, shnum_s, now, out) try:
describe_share(abs_sharefile, si_s, shnum_s, now,
out)
except:
print >>err, "Error processing %s" % abs_sharefile
failure.Failure().printTraceback(err)
return 0 return 0
class CorruptShareOptions(usage.Options): class CorruptShareOptions(usage.Options):

View File

@ -217,6 +217,36 @@ class CLI(unittest.TestCase):
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output) self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output) self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
def _catalog_shares(self, *basedirs):
o = debug.CatalogSharesOptions()
o.stdout,o.stderr = StringIO(), StringIO()
args = list(basedirs)
o.parseOptions(args)
debug.catalog_shares(o)
out = o.stdout.getvalue()
err = o.stderr.getvalue()
return out, err
def test_catalog_shares_error(self):
nodedir1 = "cli/test_catalog_shares/node1"
sharedir = os.path.join(nodedir1, "storage", "shares", "mq", "mqfblse6m5a6dh45isu2cg7oji")
fileutil.make_dirs(sharedir)
f = open(os.path.join(sharedir, "8"), "wb")
# write a bogus share that looks a little bit like CHK
f.write("\x00\x00\x00\x01" + "\xff" * 200) # this triggers an assert
f.close()
nodedir2 = "cli/test_catalog_shares/node2"
fileutil.make_dirs(nodedir2)
# now make sure that the 'catalog-shares' commands survives the error
out, err = self._catalog_shares(nodedir1, nodedir2)
self.failUnlessEqual(out, "", out)
self.failUnless("Error processing " in err, err)
self.failUnless(nodedir1 in err, err)
self.flushLoggedErrors(AssertionError)
class CLITestMixin: class CLITestMixin:
def do_cli(self, verb, *args, **kwargs): def do_cli(self, verb, *args, **kwargs):
nodeargs = [ nodeargs = [