CLI #590: convert 'tahoe deep-check' to streaming form, improve display, add tests

This commit is contained in:
Brian Warner 2009-02-17 17:15:11 -07:00
parent 0e78b2587c
commit fde2289e7b
5 changed files with 468 additions and 41 deletions

View File

@ -1006,6 +1006,24 @@ POST $URL?t=start-deep-check&repair=true (must add &ophandle=XYZ)
stats: a dictionary with the same keys as the t=start-deep-stats command
(described below)
POST $URL?t=stream-deep-check&repair=true
This triggers a recursive walk of all files and directories, performing a
t=check&repair=true on each one. For each unique object (duplicates are
skipped), a single line of JSON is emitted to the HTTP response channel.
When the walk is complete, a final line of JSON is emitted which contains
the accumulated file-size/count "deep-stats" data.
This emits the same data as t=stream-deep-check (without the repair=true),
except that the "check-results" field is replaced with a
"check-and-repair-results" field, which contains the keys returned by
t=check&repair=true&output=json (i.e. repair-attempted, repair-successful,
pre-repair-results, and post-repair-results). The output does not contain
the summary dictionary that is provied by t=start-deep-check&repair=true
(the one with count-objects-checked and list-unhealthy-files), since the
receiving client is expected to calculate those values itself from the
stream of per-object check-and-repair-results.
POST $DIRURL?t=start-manifest (must add &ophandle=XYZ)
This operation generates a "manfest" of the given directory tree, mostly

View File

@ -228,8 +228,10 @@ class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
prr.data['servers-responding'] = list(servers_responding)
prr.data['count-shares-good'] = len(sm)
prr.data['count-good-share-hosts'] = len(sm)
is_healthy = len(sm) >= self.u.total_shares
is_healthy = bool(len(sm) >= self.u.total_shares)
is_recoverable = bool(len(sm) >= self.u.needed_shares)
prr.set_healthy(is_healthy)
prr.set_recoverable(is_recoverable)
crr.repair_successful = is_healthy
prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)

View File

@ -244,9 +244,9 @@ class StatsOptions(VDriveOptions):
class CheckOptions(VDriveOptions):
optFlags = [
("raw", "r", "Display raw JSON data instead of parsed"),
("verify", "v", "Verify all hashes, instead of merely querying share presence"),
("repair", "r", "Automatically repair any problems found"),
("raw", None, "Display raw JSON data instead of parsed"),
("verify", None, "Verify all hashes, instead of merely querying share presence"),
("repair", None, "Automatically repair any problems found"),
]
def parseArgs(self, where=''):
self.where = where
@ -258,9 +258,10 @@ class CheckOptions(VDriveOptions):
class DeepCheckOptions(VDriveOptions):
optFlags = [
("raw", "r", "Display raw JSON data instead of parsed"),
("verify", "v", "Verify all hashes, instead of merely querying share presence"),
("repair", "r", "Automatically repair any problems found"),
("raw", None, "Display raw JSON data instead of parsed"),
("verify", None, "Verify all hashes, instead of merely querying share presence"),
("repair", None, "Automatically repair any problems found"),
("verbose", "v", "Be noisy about what is happening."),
]
def parseArgs(self, where=''):
self.where = where

View File

@ -1,10 +1,9 @@
from pprint import pprint
import urllib
import simplejson
from twisted.protocols.basic import LineOnlyReceiver
from allmydata.scripts.common import get_alias, DEFAULT_ALIAS, escape_path
from allmydata.scripts.common_http import do_http
from allmydata.scripts.slow_operation import SlowOperationRunner
class Checker:
pass
@ -42,44 +41,230 @@ def check(options):
if options["repair"]:
# show repair status
pprint(data, stream=stdout)
if data["pre-repair-results"]["results"]["healthy"]:
summary = "healthy"
else:
summary = "not healthy"
stdout.write("Summary: %s\n" % summary)
cr = data["pre-repair-results"]["results"]
stdout.write(" storage index: %s\n" % data["storage-index"])
stdout.write(" good-shares: %d (encoding is %d-of-%d)\n"
% (cr["count-shares-good"],
cr["count-shares-needed"],
cr["count-shares-expected"]))
stdout.write(" wrong-shares: %d\n" % cr["count-wrong-shares"])
corrupt = cr["list-corrupt-shares"]
if corrupt:
stdout.write(" corrupt shares:\n")
for (serverid, storage_index, sharenum) in corrupt:
stdout.write(" server %s, SI %s, shnum %d\n" %
(serverid, storage_index, sharenum))
if data["repair-attempted"]:
if data["repair-successful"]:
stdout.write(" repair successful\n")
else:
stdout.write(" repair failed\n")
else:
# make this prettier
pprint(data, stream=stdout)
stdout.write("Summary: %s\n" % data["summary"])
cr = data["results"]
stdout.write(" storage index: %s\n" % data["storage-index"])
stdout.write(" good-shares: %d (encoding is %d-of-%d)\n"
% (cr["count-shares-good"],
cr["count-shares-needed"],
cr["count-shares-expected"]))
stdout.write(" wrong-shares: %d\n" % cr["count-wrong-shares"])
corrupt = cr["list-corrupt-shares"]
if corrupt:
stdout.write(" corrupt shares:\n")
for (serverid, storage_index, sharenum) in corrupt:
stdout.write(" server %s, SI %s, shnum %d\n" %
(serverid, storage_index, sharenum))
return 0
class DeepChecker(SlowOperationRunner):
class FakeTransport:
disconnecting = False
def make_url(self, base, ophandle):
url = base + "?t=start-deep-check&ophandle=" + ophandle
if self.options["verify"]:
url += "&verify=true"
if self.options["repair"]:
url += "&repair=true"
return url
class DeepCheckOutput(LineOnlyReceiver):
delimiter = "\n"
def __init__(self, options):
self.transport = FakeTransport()
def write_results(self, data):
out = self.options.stdout
err = self.options.stderr
if self.options["repair"]:
# todo: make this prettier
pprint(data, stream=out)
self.verbose = bool(options["verbose"])
self.stdout = options.stdout
self.num_objects = 0
self.files_healthy = 0
self.files_unhealthy = 0
def lineReceived(self, line):
d = simplejson.loads(line)
stdout = self.stdout
if d["type"] not in ("file", "directory"):
return
self.num_objects += 1
# non-verbose means print a progress marker every 100 files
if self.num_objects % 100 == 0:
print >>stdout, "%d objects checked.." % self.num_objects
cr = d["check-results"]
if cr["results"]["healthy"]:
self.files_healthy += 1
else:
print >>out, "Objects Checked: %d" % data["count-objects-checked"]
print >>out, "Objects Healthy: %d" % data["count-objects-healthy"]
print >>out, "Objects Unhealthy: %d" % data["count-objects-unhealthy"]
print >>out
if data["list-unhealthy-files"]:
print "Unhealthy Files:"
for (pathname, cr) in data["list-unhealthy-files"]:
if pathname:
path_s = "/".join(pathname)
else:
path_s = "<root>"
print >>out, path_s, ":", cr["summary"]
self.files_unhealthy += 1
if self.verbose:
# verbose means also print one line per file
path = d["path"]
if not path:
path = ["<root>"]
summary = cr.get("summary", "Healthy (LIT)")
try:
print >>stdout, "%s: %s" % ("/".join(path), summary)
except UnicodeEncodeError:
print >>stdout, "%s: %s" % ("/".join([p.encode("utf-8")
for p in path]),
summary)
# always print out corrupt shares
for shareloc in cr["results"].get("list-corrupt-shares", []):
(serverid, storage_index, sharenum) = shareloc
print >>stdout, " corrupt: server %s, SI %s, shnum %d" % \
(serverid, storage_index, sharenum)
def done(self):
stdout = self.stdout
print >>stdout, "done: %d objects checked, %d healthy, %d unhealthy" \
% (self.num_objects, self.files_healthy, self.files_unhealthy)
class DeepCheckAndRepairOutput(LineOnlyReceiver):
delimiter = "\n"
def __init__(self, options):
self.transport = FakeTransport()
self.verbose = bool(options["verbose"])
self.stdout = options.stdout
self.num_objects = 0
self.pre_repair_files_healthy = 0
self.pre_repair_files_unhealthy = 0
self.repairs_attempted = 0
self.repairs_successful = 0
self.post_repair_files_healthy = 0
self.post_repair_files_unhealthy = 0
def lineReceived(self, line):
d = simplejson.loads(line)
stdout = self.stdout
if d["type"] not in ("file", "directory"):
return
self.num_objects += 1
# non-verbose means print a progress marker every 100 files
if self.num_objects % 100 == 0:
print >>stdout, "%d objects checked.." % self.num_objects
crr = d["check-and-repair-results"]
if d["storage-index"]:
if crr["pre-repair-results"]["results"]["healthy"]:
was_healthy = True
self.pre_repair_files_healthy += 1
else:
was_healthy = False
self.pre_repair_files_unhealthy += 1
if crr["post-repair-results"]["results"]["healthy"]:
self.post_repair_files_healthy += 1
else:
self.post_repair_files_unhealthy += 1
else:
# LIT file
was_healthy = True
self.pre_repair_files_healthy += 1
self.post_repair_files_healthy += 1
if crr["repair-attempted"]:
self.repairs_attempted += 1
if crr["repair-successful"]:
self.repairs_successful += 1
if self.verbose:
# verbose means also print one line per file
path = d["path"]
if not path:
path = ["<root>"]
# we don't seem to have a summary available, so build one
if was_healthy:
summary = "healthy"
else:
summary = "not healthy"
try:
print >>stdout, "%s: %s" % ("/".join(path), summary)
except UnicodeEncodeError:
print >>stdout, "%s: %s" % ("/".join([p.encode("utf-8")
for p in path]),
summary)
# always print out corrupt shares
prr = crr.get("pre-repair-results", {})
for shareloc in prr.get("results", {}).get("list-corrupt-shares", []):
(serverid, storage_index, sharenum) = shareloc
print >>stdout, " corrupt: server %s, SI %s, shnum %d" % \
(serverid, storage_index, sharenum)
# always print out repairs
if crr["repair-attempted"]:
if crr["repair-successful"]:
print >>stdout, " repair successful"
else:
print >>stdout, " repair failed"
def done(self):
stdout = self.stdout
print >>stdout, "done: %d objects checked" % self.num_objects
print >>stdout, " pre-repair: %d healthy, %d unhealthy" \
% (self.pre_repair_files_healthy,
self.pre_repair_files_unhealthy)
print >>stdout, " %d repairs attempted, %d successful, %d failed" \
% (self.repairs_attempted,
self.repairs_successful,
(self.repairs_attempted - self.repairs_successful))
print >>stdout, " post-repair: %d healthy, %d unhealthy" \
% (self.post_repair_files_healthy,
self.post_repair_files_unhealthy)
class DeepCheckStreamer(LineOnlyReceiver):
def run(self, options):
stdout = options.stdout
stderr = options.stderr
self.options = options
nodeurl = options['node-url']
if not nodeurl.endswith("/"):
nodeurl += "/"
self.nodeurl = nodeurl
where = options.where
rootcap, path = get_alias(options.aliases, where, DEFAULT_ALIAS)
if path == '/':
path = ''
url = nodeurl + "uri/%s" % urllib.quote(rootcap)
if path:
url += "/" + escape_path(path)
# todo: should it end with a slash?
url += "?t=stream-deep-check"
if options["verify"]:
url += "&verify=true"
if options["repair"]:
url += "&repair=true"
output = DeepCheckAndRepairOutput(options)
else:
output = DeepCheckOutput(options)
resp = do_http("POST", url)
if resp.status not in (200, 302):
print >>stderr, "ERROR", resp.status, resp.reason, resp.read()
return 1
# use Twisted to split this into lines
while True:
chunk = resp.read(100)
if not chunk:
break
if self.options["raw"]:
stdout.write(chunk)
else:
output.dataReceived(chunk)
if not self.options["raw"]:
output.done()
return 0
def deepcheck(options):
return DeepChecker().run(options)
return DeepCheckStreamer().run(options)

View File

@ -5,9 +5,11 @@ from twisted.trial import unittest
from cStringIO import StringIO
import urllib
import re
import simplejson
from allmydata.util import fileutil, hashutil
from allmydata.util import fileutil, hashutil, base32
from allmydata import uri
from allmydata.immutable import upload
# Test that the scripts can be imported -- although the actual tests of their functionality are
# done by invoking them in a subprocess.
@ -928,3 +930,222 @@ class Backup(GridTestMixin, CLITestMixin, StallMixin, unittest.TestCase):
# dirnodes being created (RSA key generation). The backup between check4
# and check4a takes 6s, as does the backup before check4b.
test_backup.timeout = 300
class Check(GridTestMixin, CLITestMixin, unittest.TestCase):
def test_check(self):
self.basedir = "cli/Check/check"
self.set_up_grid()
c0 = self.g.clients[0]
DATA = "data" * 100
d = c0.create_mutable_file(DATA)
def _stash_uri(n):
self.uri = n.get_uri()
d.addCallback(_stash_uri)
d.addCallback(lambda ign: self.do_cli("check", self.uri))
def _check1((rc, out, err)):
self.failUnlessEqual(err, "")
self.failUnlessEqual(rc, 0)
lines = out.splitlines()
self.failUnless("Summary: Healthy" in lines, out)
self.failUnless(" good-shares: 10 (encoding is 3-of-10)" in lines, out)
d.addCallback(_check1)
d.addCallback(lambda ign: self.do_cli("check", "--raw", self.uri))
def _check2((rc, out, err)):
self.failUnlessEqual(err, "")
self.failUnlessEqual(rc, 0)
data = simplejson.loads(out)
self.failUnlessEqual(data["summary"], "Healthy")
d.addCallback(_check2)
def _clobber_shares(ignored):
# delete one, corrupt a second
shares = self.find_shares(self.uri)
self.failUnlessEqual(len(shares), 10)
os.unlink(shares[0][2])
cso = debug.CorruptShareOptions()
cso.stdout = StringIO()
cso.parseOptions([shares[1][2]])
storage_index = uri.from_string(self.uri).get_storage_index()
self._corrupt_share_line = " server %s, SI %s, shnum %d" % \
(base32.b2a(shares[1][1]),
base32.b2a(storage_index),
shares[1][0])
debug.corrupt_share(cso)
d.addCallback(_clobber_shares)
d.addCallback(lambda ign: self.do_cli("check", "--verify", self.uri))
def _check3((rc, out, err)):
self.failUnlessEqual(err, "")
self.failUnlessEqual(rc, 0)
lines = out.splitlines()
summary = [l for l in lines if l.startswith("Summary")][0]
self.failUnless("Summary: Unhealthy: 8 shares (enc 3-of-10)"
in summary, summary)
self.failUnless(" good-shares: 8 (encoding is 3-of-10)" in lines, out)
self.failUnless(" corrupt shares:" in lines, out)
self.failUnless(self._corrupt_share_line in lines, out)
d.addCallback(_check3)
d.addCallback(lambda ign:
self.do_cli("check", "--verify", "--repair", self.uri))
def _check4((rc, out, err)):
self.failUnlessEqual(err, "")
self.failUnlessEqual(rc, 0)
lines = out.splitlines()
self.failUnless("Summary: not healthy" in lines, out)
self.failUnless(" good-shares: 8 (encoding is 3-of-10)" in lines, out)
self.failUnless(" corrupt shares:" in lines, out)
self.failUnless(self._corrupt_share_line in lines, out)
self.failUnless(" repair successful" in lines, out)
d.addCallback(_check4)
d.addCallback(lambda ign:
self.do_cli("check", "--verify", "--repair", self.uri))
def _check5((rc, out, err)):
self.failUnlessEqual(err, "")
self.failUnlessEqual(rc, 0)
lines = out.splitlines()
self.failUnless("Summary: healthy" in lines, out)
self.failUnless(" good-shares: 10 (encoding is 3-of-10)" in lines, out)
self.failIf(" corrupt shares:" in lines, out)
d.addCallback(_check5)
return d
def test_deep_check(self):
self.basedir = "cli/Check/deep_check"
self.set_up_grid()
c0 = self.g.clients[0]
self.uris = {}
self.fileurls = {}
DATA = "data" * 100
d = c0.create_empty_dirnode()
def _stash_root_and_create_file(n):
self.rootnode = n
self.rooturi = n.get_uri()
return n.add_file(u"good", upload.Data(DATA, convergence=""))
d.addCallback(_stash_root_and_create_file)
def _stash_uri(fn, which):
self.uris[which] = fn.get_uri()
d.addCallback(_stash_uri, "good")
d.addCallback(lambda ign:
self.rootnode.add_file(u"small",
upload.Data("literal",
convergence="")))
d.addCallback(_stash_uri, "small")
d.addCallback(lambda ign: c0.create_mutable_file(DATA+"1"))
d.addCallback(lambda fn: self.rootnode.set_node(u"mutable", fn))
d.addCallback(_stash_uri, "mutable")
d.addCallback(lambda ign: self.do_cli("deep-check", self.rooturi))
def _check1((rc, out, err)):
self.failUnlessEqual(err, "")
self.failUnlessEqual(rc, 0)
lines = out.splitlines()
self.failUnless("done: 4 objects checked, 4 healthy, 0 unhealthy"
in lines, out)
d.addCallback(_check1)
d.addCallback(lambda ign: self.do_cli("deep-check", "--verbose",
self.rooturi))
def _check2((rc, out, err)):
self.failUnlessEqual(err, "")
self.failUnlessEqual(rc, 0)
lines = out.splitlines()
self.failUnless("<root>: Healthy" in lines, out)
self.failUnless("small: Healthy (LIT)" in lines, out)
self.failUnless("good: Healthy" in lines, out)
self.failUnless("mutable: Healthy" in lines, out)
self.failUnless("done: 4 objects checked, 4 healthy, 0 unhealthy"
in lines, out)
d.addCallback(_check2)
def _clobber_shares(ignored):
shares = self.find_shares(self.uris["good"])
self.failUnlessEqual(len(shares), 10)
os.unlink(shares[0][2])
shares = self.find_shares(self.uris["mutable"])
cso = debug.CorruptShareOptions()
cso.stdout = StringIO()
cso.parseOptions([shares[1][2]])
storage_index = uri.from_string(self.uris["mutable"]).get_storage_index()
self._corrupt_share_line = " corrupt: server %s, SI %s, shnum %d" % \
(base32.b2a(shares[1][1]),
base32.b2a(storage_index),
shares[1][0])
debug.corrupt_share(cso)
d.addCallback(_clobber_shares)
d.addCallback(lambda ign:
self.do_cli("deep-check", "--verbose", self.rooturi))
def _check3((rc, out, err)):
self.failUnlessEqual(err, "")
self.failUnlessEqual(rc, 0)
lines = out.splitlines()
self.failUnless("<root>: Healthy" in lines, out)
self.failUnless("small: Healthy (LIT)" in lines, out)
self.failUnless("mutable: Healthy" in lines, out) # needs verifier
self.failUnless("good: Not Healthy: 9 shares (enc 3-of-10)"
in lines, out)
self.failIf(self._corrupt_share_line in lines, out)
self.failUnless("done: 4 objects checked, 3 healthy, 1 unhealthy"
in lines, out)
d.addCallback(_check3)
d.addCallback(lambda ign:
self.do_cli("deep-check", "--verbose", "--verify",
self.rooturi))
def _check4((rc, out, err)):
self.failUnlessEqual(err, "")
self.failUnlessEqual(rc, 0)
lines = out.splitlines()
self.failUnless("<root>: Healthy" in lines, out)
self.failUnless("small: Healthy (LIT)" in lines, out)
mutable = [l for l in lines if l.startswith("mutable")][0]
self.failUnless(mutable.startswith("mutable: Unhealthy: 9 shares (enc 3-of-10)"),
mutable)
self.failUnless(self._corrupt_share_line in lines, out)
self.failUnless("good: Not Healthy: 9 shares (enc 3-of-10)"
in lines, out)
self.failUnless("done: 4 objects checked, 2 healthy, 2 unhealthy"
in lines, out)
d.addCallback(_check4)
d.addCallback(lambda ign:
self.do_cli("deep-check", "--raw",
self.rooturi))
def _check5((rc, out, err)):
self.failUnlessEqual(err, "")
self.failUnlessEqual(rc, 0)
lines = out.splitlines()
units = [simplejson.loads(line) for line in lines]
# root, small, good, mutable, stats
self.failUnlessEqual(len(units), 4+1)
d.addCallback(_check5)
d.addCallback(lambda ign:
self.do_cli("deep-check",
"--verbose", "--verify", "--repair",
self.rooturi))
def _check6((rc, out, err)):
self.failUnlessEqual(err, "")
self.failUnlessEqual(rc, 0)
lines = out.splitlines()
self.failUnless("<root>: healthy" in lines, out)
self.failUnless("small: healthy" in lines, out)
self.failUnless("mutable: not healthy" in lines, out)
self.failUnless(self._corrupt_share_line in lines, out)
self.failUnless("good: not healthy" in lines, out)
self.failUnless("done: 4 objects checked" in lines, out)
self.failUnless(" pre-repair: 2 healthy, 2 unhealthy" in lines, out)
self.failUnless(" 2 repairs attempted, 2 successful, 0 failed"
in lines, out)
self.failUnless(" post-repair: 4 healthy, 0 unhealthy" in lines,out)
d.addCallback(_check6)
return d