webapi #590: add streaming deep-check. Still need a CLI tool to use it.

Brian Warner 2009-02-16 23:35:53 -07:00
parent 8579e25059
commit 476a5c8fac
4 changed files with 346 additions and 50 deletions


@@ -885,6 +885,45 @@ POST $URL?t=start-deep-check (must add &ophandle=XYZ)
  stats: a dictionary with the same keys as the t=start-deep-stats command
         (described below)

POST $URL?t=stream-deep-check

  This initiates a recursive walk of all files and directories reachable
  from the target, performing a check on each one just like t=check. For
  each unique object (duplicates are skipped), a single line of JSON is
  emitted to the HTTP response channel. When the walk is complete, a final
  line of JSON is emitted which contains the accumulated file-size/count
  "deep-stats" data.

  This command takes the same arguments as t=start-deep-check.

  A CLI tool can split the response stream on newlines into "response
  units", and parse each response unit as JSON. Each such parsed unit will
  be a dictionary, and will contain at least the "type" key: a string, one
  of "file", "directory", or "stats". (An abbreviated example of a response
  stream appears at the end of this section.)

  For all units that have a type of "file" or "directory", the dictionary
  will contain the following keys:

   "path": a list of strings, with the path that is traversed to reach
           the object
   "cap": a writecap for the file or directory, if available, else a
          readcap
   "verifycap": a verifycap for the file or directory
   "repaircap": the weakest cap which can still be used to repair the
                object
   "storage-index": a base32 storage index for the object
   "check-results": a copy of the dictionary which would be returned by
                    t=check&output=json, with three top-level keys:
                    "storage-index", "summary", and "results", and a
                    variety of counts and sharemaps in the "results" value.

  Note that non-distributed files (i.e. LIT files) will have values of None
  for verifycap, repaircap, and storage-index, since these files can
  neither be verified nor repaired, and are not stored on the storage
  servers. Likewise the check-results dictionary will be limited: an empty
  string for storage-index, and a results dictionary with only the
  "healthy" key.

  The last unit in the stream will have a type of "stats", and will contain
  the keys described in the "start-deep-stats" operation, below.
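  For example, an abbreviated, illustrative response body for a directory
  holding a single healthy file might look like the following. The caps,
  the "file1" name, and the elided keys are placeholders, and each real
  unit is emitted as a single line:

   {"path": [], "type": "directory", "cap": "URI:DIR2:...", ...}
   {"path": ["file1"], "type": "file", "cap": "URI:CHK:...",
    "verifycap": "URI:CHK-Verifier:...", "repaircap": "URI:CHK:...",
    "storage-index": "...", "check-results": {"storage-index": "...",
    "summary": "Healthy", "results": {"count-shares-good": 10,
    "healthy": true}}}
   {"type": "stats", "stats": {"count-immutable-files": 1,
    "count-directories": 1, ...}}

  Units are emitted parent-first: a directory always precedes the files
  and directories it contains.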
POST $URL?t=check&repair=true

  This performs a health check of the given file or directory, and if the


@@ -2793,3 +2793,150 @@ class Grid(GridTestMixin, WebErrorMixin, unittest.TestCase):
        d.addErrback(self.explain_web_error)
        return d

    def test_deep_check(self):
        self.basedir = "web/Grid/deep_check"
        self.set_up_grid()
        c0 = self.g.clients[0]
        self.uris = {}
        self.fileurls = {}
        DATA = "data" * 100
        d = c0.create_empty_dirnode()
        def _stash_root_and_create_file(n):
            self.rootnode = n
            self.fileurls["root"] = "uri/" + urllib.quote(n.get_uri()) + "/"
            return n.add_file(u"good", upload.Data(DATA, convergence=""))
        d.addCallback(_stash_root_and_create_file)
        def _stash_uri(fn, which):
            self.uris[which] = fn.get_uri()
        d.addCallback(_stash_uri, "good")
        d.addCallback(lambda ign:
                      self.rootnode.add_file(u"small",
                                             upload.Data("literal",
                                                         convergence="")))
        d.addCallback(_stash_uri, "small")

        d.addCallback(self.CHECK, "root", "t=stream-deep-check")
        def _done(res):
            units = [simplejson.loads(line)
                     for line in res.splitlines()
                     if line]
            self.failUnlessEqual(len(units), 3+1)
            # should be parent-first
            u0 = units[0]
            self.failUnlessEqual(u0["path"], [])
            self.failUnlessEqual(u0["type"], "directory")
            self.failUnlessEqual(u0["cap"], self.rootnode.get_uri())
            u0cr = u0["check-results"]
            self.failUnlessEqual(u0cr["results"]["count-shares-good"], 10)
            ugood = [u for u in units
                     if u["type"] == "file" and u["path"] == [u"good"]][0]
            self.failUnlessEqual(ugood["cap"], self.uris["good"])
            ugoodcr = ugood["check-results"]
            self.failUnlessEqual(ugoodcr["results"]["count-shares-good"], 10)

            stats = units[-1]
            self.failUnlessEqual(stats["type"], "stats")
            s = stats["stats"]
            self.failUnlessEqual(s["count-immutable-files"], 1)
            self.failUnlessEqual(s["count-literal-files"], 1)
            self.failUnlessEqual(s["count-directories"], 1)
        d.addCallback(_done)
        d.addErrback(self.explain_web_error)
        return d
    def test_deep_check_and_repair(self):
        self.basedir = "web/Grid/deep_check_and_repair"
        self.set_up_grid()
        c0 = self.g.clients[0]
        self.uris = {}
        self.fileurls = {}
        DATA = "data" * 100
        d = c0.create_empty_dirnode()
        def _stash_root_and_create_file(n):
            self.rootnode = n
            self.fileurls["root"] = "uri/" + urllib.quote(n.get_uri()) + "/"
            return n.add_file(u"good", upload.Data(DATA, convergence=""))
        d.addCallback(_stash_root_and_create_file)
        def _stash_uri(fn, which):
            self.uris[which] = fn.get_uri()
        d.addCallback(_stash_uri, "good")
        d.addCallback(lambda ign:
                      self.rootnode.add_file(u"small",
                                             upload.Data("literal",
                                                         convergence="")))
        d.addCallback(_stash_uri, "small")
        d.addCallback(lambda ign:
                      self.rootnode.add_file(u"sick",
                                             upload.Data(DATA+"1",
                                                         convergence="")))
        d.addCallback(_stash_uri, "sick")
        #d.addCallback(lambda ign:
        #              self.rootnode.add_file(u"dead",
        #                                     upload.Data(DATA+"2",
        #                                                 convergence="")))
        #d.addCallback(_stash_uri, "dead")
        #d.addCallback(lambda ign: c0.create_mutable_file("mutable"))
        #d.addCallback(lambda fn: self.rootnode.set_node(u"corrupt", fn))
        #d.addCallback(_stash_uri, "corrupt")

        def _clobber_shares(ignored):
            good_shares = self.find_shares(self.uris["good"])
            self.failUnlessEqual(len(good_shares), 10)
            sick_shares = self.find_shares(self.uris["sick"])
            os.unlink(sick_shares[0][2])
            #dead_shares = self.find_shares(self.uris["dead"])
            #for i in range(1, 10):
            #    os.unlink(dead_shares[i][2])
            #c_shares = self.find_shares(self.uris["corrupt"])
            #cso = CorruptShareOptions()
            #cso.stdout = StringIO()
            #cso.parseOptions([c_shares[0][2]])
            #corrupt_share(cso)
        d.addCallback(_clobber_shares)

        d.addCallback(self.CHECK, "root", "t=stream-deep-check&repair=true")
        def _done(res):
            units = [simplejson.loads(line)
                     for line in res.splitlines()
                     if line]
            self.failUnlessEqual(len(units), 4+1)
            # should be parent-first
            u0 = units[0]
            self.failUnlessEqual(u0["path"], [])
            self.failUnlessEqual(u0["type"], "directory")
            self.failUnlessEqual(u0["cap"], self.rootnode.get_uri())
            u0crr = u0["check-and-repair-results"]
            self.failUnlessEqual(u0crr["repair-attempted"], False)
            self.failUnlessEqual(u0crr["pre-repair-results"]["results"]["count-shares-good"], 10)
            ugood = [u for u in units
                     if u["type"] == "file" and u["path"] == [u"good"]][0]
            self.failUnlessEqual(ugood["cap"], self.uris["good"])
            ugoodcrr = ugood["check-and-repair-results"]
            self.failUnlessEqual(ugoodcrr["repair-attempted"], False)
            self.failUnlessEqual(ugoodcrr["pre-repair-results"]["results"]["count-shares-good"], 10)
            usick = [u for u in units
                     if u["type"] == "file" and u["path"] == [u"sick"]][0]
            self.failUnlessEqual(usick["cap"], self.uris["sick"])
            usickcrr = usick["check-and-repair-results"]
            self.failUnlessEqual(usickcrr["repair-attempted"], True)
            self.failUnlessEqual(usickcrr["repair-successful"], True)
            self.failUnlessEqual(usickcrr["pre-repair-results"]["results"]["count-shares-good"], 9)
            self.failUnlessEqual(usickcrr["post-repair-results"]["results"]["count-shares-good"], 10)

            stats = units[-1]
            self.failUnlessEqual(stats["type"], "stats")
            s = stats["stats"]
            self.failUnlessEqual(s["count-immutable-files"], 2)
            self.failUnlessEqual(s["count-literal-files"], 1)
            self.failUnlessEqual(s["count-directories"], 1)
        d.addCallback(_done)
        d.addErrback(self.explain_web_error)
        return d


@@ -9,6 +9,64 @@ from allmydata.web.operations import ReloadMixin
from allmydata.interfaces import ICheckAndRepairResults, ICheckResults
from allmydata.util import base32, idlib
def json_check_counts(d):
    r = {}
    r["count-shares-good"] = d["count-shares-good"]
    r["count-shares-needed"] = d["count-shares-needed"]
    r["count-shares-expected"] = d["count-shares-expected"]
    r["count-good-share-hosts"] = d["count-good-share-hosts"]
    r["count-corrupt-shares"] = d["count-corrupt-shares"]
    r["list-corrupt-shares"] = [ (idlib.nodeid_b2a(serverid),
                                  base32.b2a(si), shnum)
                                 for (serverid, si, shnum)
                                 in d["list-corrupt-shares"] ]
    r["servers-responding"] = [idlib.nodeid_b2a(serverid)
                               for serverid in d["servers-responding"]]
    sharemap = {}
    for (shareid, serverids) in d["sharemap"].items():
        sharemap[shareid] = [idlib.nodeid_b2a(serverid)
                             for serverid in serverids]
    r["sharemap"] = sharemap
    r["count-wrong-shares"] = d["count-wrong-shares"]
    r["count-recoverable-versions"] = d["count-recoverable-versions"]
    r["count-unrecoverable-versions"] = d["count-unrecoverable-versions"]
    return r

def json_check_results(r):
    if r is None:
        # LIT file
        data = {"storage-index": "",
                "results": {"healthy": True},
                }
        return data
    data = {}
    data["storage-index"] = r.get_storage_index_string()
    data["summary"] = r.get_summary()
    data["results"] = json_check_counts(r.get_data())
    data["results"]["needs-rebalancing"] = r.needs_rebalancing()
    data["results"]["healthy"] = r.is_healthy()
    data["results"]["recoverable"] = r.is_recoverable()
    return data

def json_check_and_repair_results(r):
    if r is None:
        # LIT file
        data = {"storage-index": "",
                "repair-attempted": False,
                }
        return data
    data = {}
    data["storage-index"] = r.get_storage_index_string()
    data["repair-attempted"] = r.get_repair_attempted()
    data["repair-successful"] = r.get_repair_successful()
    pre = r.get_pre_repair_results()
    data["pre-repair-results"] = json_check_results(pre)
    post = r.get_post_repair_results()
    data["post-repair-results"] = json_check_results(post)
    return data
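These three serializers are hoisted out of ResultsBase (whose private method versions are deleted in the next hunk) into module-level functions, so that code outside the Nevow page classes, notably the new DeepCheckStreamer in the directory handler below, can produce per-node JSON without constructing a renderer; the existing json() renderers now simply delegate to them.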
class ResultsBase:
    def _join_pathstring(self, path):
        if path:
@@ -94,52 +152,6 @@ class ResultsBase:
        return T.ul[r]

    def _json_check_and_repair_results(self, r):
        data = {}
        data["storage-index"] = r.get_storage_index_string()
        data["repair-attempted"] = r.get_repair_attempted()
        data["repair-successful"] = r.get_repair_successful()
        pre = r.get_pre_repair_results()
        data["pre-repair-results"] = self._json_check_results(pre)
        post = r.get_post_repair_results()
        data["post-repair-results"] = self._json_check_results(post)
        return data

    def _json_check_results(self, r):
        data = {}
        data["storage-index"] = r.get_storage_index_string()
        data["summary"] = r.get_summary()
        data["results"] = self._json_check_counts(r.get_data())
        data["results"]["needs-rebalancing"] = r.needs_rebalancing()
        data["results"]["healthy"] = r.is_healthy()
        data["results"]["recoverable"] = r.is_recoverable()
        return data

    def _json_check_counts(self, d):
        r = {}
        r["count-shares-good"] = d["count-shares-good"]
        r["count-shares-needed"] = d["count-shares-needed"]
        r["count-shares-expected"] = d["count-shares-expected"]
        r["count-good-share-hosts"] = d["count-good-share-hosts"]
        r["count-corrupt-shares"] = d["count-corrupt-shares"]
        r["list-corrupt-shares"] = [ (idlib.nodeid_b2a(serverid),
                                      base32.b2a(si), shnum)
                                     for (serverid, si, shnum)
                                     in d["list-corrupt-shares"] ]
        r["servers-responding"] = [idlib.nodeid_b2a(serverid)
                                   for serverid in d["servers-responding"]]
        sharemap = {}
        for (shareid, serverids) in d["sharemap"].items():
            sharemap[shareid] = [idlib.nodeid_b2a(serverid)
                                 for serverid in serverids]
        r["sharemap"] = sharemap
        r["count-wrong-shares"] = d["count-wrong-shares"]
        r["count-recoverable-versions"] = d["count-recoverable-versions"]
        r["count-unrecoverable-versions"] = d["count-unrecoverable-versions"]
        return r

    def _html(self, s):
        if isinstance(s, (str, unicode)):
            return html.escape(s)
@@ -210,7 +222,7 @@ class CheckResults(CheckerBase, rend.Page, ResultsBase):
    def json(self, ctx):
        inevow.IRequest(ctx).setHeader("content-type", "text/plain")
-        data = self._json_check_results(self.r)
+        data = json_check_results(self.r)
        return simplejson.dumps(data, indent=1) + "\n"

    def render_summary(self, ctx, data):
@@ -249,7 +261,7 @@ class CheckAndRepairResults(CheckerBase, rend.Page, ResultsBase):
    def json(self, ctx):
        inevow.IRequest(ctx).setHeader("content-type", "text/plain")
-        data = self._json_check_and_repair_results(self.r)
+        data = json_check_and_repair_results(self.r)
        return simplejson.dumps(data, indent=1) + "\n"

    def render_summary(self, ctx, data):
@@ -324,7 +336,7 @@ class DeepCheckResults(rend.Page, ResultsBase, ReloadMixin):
                                                 shnum)
                                                for (serverid, storage_index, shnum)
                                                in res.get_corrupt_shares() ]
-        data["list-unhealthy-files"] = [ (path_t, self._json_check_results(r))
+        data["list-unhealthy-files"] = [ (path_t, json_check_results(r))
                                         for (path_t, r)
                                         in res.get_all_results().items()
                                         if not r.is_healthy() ]
@@ -496,7 +508,7 @@ class DeepCheckAndRepairResults(rend.Page, ResultsBase, ReloadMixin):
data["list-remaining-corrupt-shares"] = remaining_corrupt
unhealthy = [ (path_t,
self._json_check_results(crr.get_pre_repair_results()))
json_check_results(crr.get_pre_repair_results()))
for (path_t, crr)
in res.get_all_results().items()
if not crr.get_pre_repair_results().is_healthy() ]


@@ -30,6 +30,8 @@ from allmydata.web.check_results import CheckResults, \
     CheckAndRepairResults, DeepCheckResults, DeepCheckAndRepairResults
from allmydata.web.info import MoreInfo
from allmydata.web.operations import ReloadMixin
from allmydata.web.check_results import json_check_results, \
     json_check_and_repair_results

class BlockingFileError(Exception):
    # TODO: catch and transform
@@ -189,6 +191,8 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
            d = self._POST_check(req)
        elif t == "start-deep-check":
            d = self._POST_start_deep_check(ctx)
        elif t == "stream-deep-check":
            d = self._POST_stream_deep_check(ctx)
        elif t == "start-manifest":
            d = self._POST_start_manifest(ctx)
        elif t == "start-deep-size":
@@ -378,6 +382,25 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
        renderer = DeepCheckResults(monitor)
        return self._start_operation(monitor, renderer, ctx)

    def _POST_stream_deep_check(self, ctx):
        verify = boolean_of_arg(get_arg(ctx, "verify", "false"))
        repair = boolean_of_arg(get_arg(ctx, "repair", "false"))
        walker = DeepCheckStreamer(ctx, self.node, verify, repair)
        monitor = self.node.deep_traverse(walker)
        walker.setMonitor(monitor)
        # register to hear stopProducing. The walker ignores pauseProducing.
        IRequest(ctx).registerProducer(walker, True)
        d = monitor.when_done()
        def _done(res):
            IRequest(ctx).unregisterProducer()
            return res
        d.addBoth(_done)
        def _cancelled(f):
            f.trap(OperationCancelledError)
            return "Operation Cancelled"
        d.addErrback(_cancelled)
        return d
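A note on the producer registration above: registerProducer(walker, True) registers the walker as a streaming (push) producer, so Twisted invokes its stopProducing() if the HTTP client disconnects mid-walk. That cancels the Monitor, the when_done() Deferred then fails with OperationCancelledError, and the _cancelled errback converts it into a terse final body. Unlike t=start-deep-check, no ophandle is required here, because results are streamed to the client rather than polled.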
    def _POST_start_manifest(self, ctx):
        if not get_arg(ctx, "ophandle"):
            raise NeedOperationHandleError("slow operation requires ophandle=")
@@ -903,3 +926,78 @@ class ManifestStreamer(dirnode.DeepStats):
assert "\n" not in j
self.req.write(j+"\n")
return ""
class DeepCheckStreamer(dirnode.DeepStats):
implements(IPushProducer)
def __init__(self, ctx, origin, verify, repair):
dirnode.DeepStats.__init__(self, origin)
self.req = IRequest(ctx)
self.verify = verify
self.repair = repair
def setMonitor(self, monitor):
self.monitor = monitor
def pauseProducing(self):
pass
def resumeProducing(self):
pass
def stopProducing(self):
self.monitor.cancel()
def add_node(self, node, path):
dirnode.DeepStats.add_node(self, node, path)
data = {"path": path,
"cap": node.get_uri()}
if IDirectoryNode.providedBy(node):
data["type"] = "directory"
else:
data["type"] = "file"
v = node.get_verify_cap()
if v:
v = v.to_string()
data["verifycap"] = v
r = node.get_repair_cap()
if r:
r = r.to_string()
data["repaircap"] = r
si = node.get_storage_index()
if si:
si = base32.b2a(si)
data["storage-index"] = si
if self.repair:
d = node.check_and_repair(self.monitor, self.verify)
d.addCallback(self.add_check_and_repair, data)
else:
d = node.check(self.monitor, self.verify)
d.addCallback(self.add_check, data)
d.addCallback(self.write_line)
return d
def add_check_and_repair(self, crr, data):
data["check-and-repair-results"] = json_check_and_repair_results(crr)
return data
def add_check(self, cr, data):
data["check-results"] = json_check_results(cr)
return data
def write_line(self, data):
j = simplejson.dumps(data, ensure_ascii=True)
assert "\n" not in j
self.req.write(j+"\n")
def finish(self):
stats = dirnode.DeepStats.get_results(self)
d = {"type": "stats",
"stats": stats,
}
j = simplejson.dumps(d, ensure_ascii=True)
assert "\n" not in j
self.req.write(j+"\n")
return ""