mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-23 14:52:26 +00:00
dirnode: refactor recursive-traversal methods, add stats to deep_check() method results and t=deep-check webapi
This commit is contained in:
parent
f6eeb3161f
commit
4bb88fd2ee
@ -765,6 +765,8 @@ POST $URL?t=deep-check
|
||||
invoked. The 'check-results' field is the same as
|
||||
that returned by t=check&output=JSON, described
|
||||
above.
|
||||
stats: a dictionary with the same keys as the t=deep-stats command
|
||||
(described below)
|
||||
|
||||
POST $URL?t=check&repair=true
|
||||
|
||||
@ -840,6 +842,8 @@ POST $URL?t=deep-check&repair=true
|
||||
invoked. The 'check-results' field is the same as
|
||||
that returned by t=check&repair=true&output=JSON,
|
||||
described above.
|
||||
stats: a dictionary with the same keys as the t=deep-stats command
|
||||
(described below)
|
||||
|
||||
GET $DIRURL?t=manifest
|
||||
|
||||
|
@ -91,6 +91,10 @@ class DeepResultsBase:
|
||||
self.objects_unhealthy = 0
|
||||
self.corrupt_shares = []
|
||||
self.all_results = {}
|
||||
self.stats = {}
|
||||
|
||||
def update_stats(self, new_stats):
|
||||
self.stats.update(new_stats)
|
||||
|
||||
def get_root_storage_index_string(self):
|
||||
return self.root_storage_index_s
|
||||
@ -101,6 +105,9 @@ class DeepResultsBase:
|
||||
def get_all_results(self):
|
||||
return self.all_results
|
||||
|
||||
def get_stats(self):
|
||||
return self.stats
|
||||
|
||||
|
||||
class DeepCheckResults(DeepResultsBase):
|
||||
implements(IDeepCheckResults)
|
||||
|
@ -7,7 +7,7 @@ import simplejson
|
||||
from allmydata.mutable.common import NotMutableError
|
||||
from allmydata.mutable.node import MutableFileNode
|
||||
from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
|
||||
IURI, IFileNode, IMutableFileURI, IVerifierURI, IFilesystemNode, \
|
||||
IURI, IFileNode, IMutableFileURI, IFilesystemNode, \
|
||||
ExistingChildError, ICheckable
|
||||
from allmydata.checker_results import DeepCheckResults, \
|
||||
DeepCheckAndRepairResults
|
||||
@ -446,161 +446,92 @@ class NewDirectoryNode:
|
||||
d.addCallback(lambda child: self.delete(current_child_name))
|
||||
return d
|
||||
|
||||
def build_manifest(self):
|
||||
"""Return a frozenset of verifier-capability strings for all nodes
|
||||
(directories and files) reachable from this one."""
|
||||
|
||||
def deep_traverse(self, walker):
|
||||
"""Perform a recursive walk, using this dirnode as a root, notifying
|
||||
the 'walker' instance of everything I encounter.
|
||||
|
||||
I call walker.enter_directory(parent, children) once for each dirnode
|
||||
I visit, immediately after retrieving the list of children. I pass in
|
||||
the parent dirnode and the dict of childname->(childnode,metadata).
|
||||
This function should *not* traverse the children: I will do that.
|
||||
enter_directory() is most useful for the deep-stats number that
|
||||
counts how large a directory is.
|
||||
|
||||
I call walker.add_node(node, path) for each node (both files and
|
||||
directories) I can reach. Most work should be done here.
|
||||
|
||||
I avoid loops by keeping track of verifier-caps and refusing to call
|
||||
each() or traverse a node that I've seen before.
|
||||
|
||||
I return a Deferred that will fire with the value of walker.finish().
|
||||
"""
|
||||
|
||||
# this is just a tree-walker, except that following each edge
|
||||
# requires a Deferred. We use a ConcurrencyLimiter to make sure the
|
||||
# fan-out doesn't cause problems.
|
||||
|
||||
manifest = set()
|
||||
manifest.add(self.get_verifier())
|
||||
limiter = ConcurrencyLimiter(10) # allow 10 in parallel
|
||||
|
||||
d = self._build_manifest_from_node(self, manifest, limiter)
|
||||
def _done(res):
|
||||
# LIT nodes have no verifier-capability: their data is stored
|
||||
# inside the URI itself, so there is no need to refresh anything.
|
||||
# They indicate this by returning None from their get_verifier
|
||||
# method. We need to remove any such Nones from our set. We also
|
||||
# want to convert all these caps into strings.
|
||||
return frozenset([IVerifierURI(cap).to_string()
|
||||
for cap in manifest
|
||||
if cap is not None])
|
||||
d.addCallback(_done)
|
||||
found = set([self.get_verifier()])
|
||||
limiter = ConcurrencyLimiter(10)
|
||||
d = self._deep_traverse_dirnode(self, [], walker, found, limiter)
|
||||
d.addCallback(lambda ignored: walker.finish())
|
||||
return d
|
||||
|
||||
def _build_manifest_from_node(self, node, manifest, limiter):
|
||||
d = limiter.add(node.list)
|
||||
def _got_list(res):
|
||||
dl = []
|
||||
for name, (child, metadata) in res.iteritems():
|
||||
verifier = child.get_verifier()
|
||||
if verifier not in manifest:
|
||||
manifest.add(verifier)
|
||||
if IDirectoryNode.providedBy(child):
|
||||
dl.append(self._build_manifest_from_node(child,
|
||||
manifest,
|
||||
limiter))
|
||||
if dl:
|
||||
return defer.DeferredList(dl)
|
||||
d.addCallback(_got_list)
|
||||
def _deep_traverse_dirnode(self, node, path, walker, found, limiter):
|
||||
# process this directory, then walk its children
|
||||
d = limiter.add(walker.add_node, node, path)
|
||||
d.addCallback(lambda ignored: node.list())
|
||||
d.addCallback(self._deep_traverse_dirnode_children, node, path,
|
||||
walker, found, limiter)
|
||||
return d
|
||||
|
||||
def _deep_traverse_dirnode_children(self, children, parent, path,
|
||||
walker, found, limiter):
|
||||
dl = [limiter.add(walker.enter_directory, parent, children)]
|
||||
for name, (child, metadata) in children.iteritems():
|
||||
verifier = child.get_verifier()
|
||||
if verifier in found:
|
||||
continue
|
||||
found.add(verifier)
|
||||
childpath = path + [name]
|
||||
if IDirectoryNode.providedBy(child):
|
||||
dl.append(self._deep_traverse_dirnode(child, childpath,
|
||||
walker, found, limiter))
|
||||
else:
|
||||
dl.append(limiter.add(walker.add_node, child, childpath))
|
||||
return defer.DeferredList(dl, fireOnOneErrback=True)
|
||||
|
||||
|
||||
def build_manifest(self):
|
||||
"""Return a frozenset of verifier-capability strings for all nodes
|
||||
(directories and files) reachable from this one."""
|
||||
return self.deep_traverse(ManifestWalker())
|
||||
|
||||
def deep_stats(self):
|
||||
stats = DeepStats()
|
||||
# we track verifier caps, to avoid double-counting children for which
|
||||
# we've got both a write-cap and a read-cap
|
||||
found = set()
|
||||
found.add(self.get_verifier())
|
||||
|
||||
limiter = ConcurrencyLimiter(10)
|
||||
|
||||
d = self._add_deepstats_from_node(self, found, stats, limiter)
|
||||
d.addCallback(lambda res: stats.get_results())
|
||||
return d
|
||||
|
||||
def _add_deepstats_from_node(self, node, found, stats, limiter):
|
||||
d = limiter.add(node.list)
|
||||
def _got_list(children):
|
||||
dl = []
|
||||
dirsize_bytes = node.get_size()
|
||||
dirsize_children = len(children)
|
||||
stats.add("count-directories")
|
||||
stats.add("size-directories", dirsize_bytes)
|
||||
stats.max("largest-directory", dirsize_bytes)
|
||||
stats.max("largest-directory-children", dirsize_children)
|
||||
for name, (child, metadata) in children.iteritems():
|
||||
verifier = child.get_verifier()
|
||||
if verifier in found:
|
||||
continue
|
||||
found.add(verifier)
|
||||
if IDirectoryNode.providedBy(child):
|
||||
dl.append(self._add_deepstats_from_node(child, found,
|
||||
stats, limiter))
|
||||
elif IMutableFileNode.providedBy(child):
|
||||
stats.add("count-files")
|
||||
stats.add("count-mutable-files")
|
||||
# TODO: update the servermap, compute a size, add it to
|
||||
# size-mutable-files, max it into "largest-mutable-file"
|
||||
elif IFileNode.providedBy(child): # CHK and LIT
|
||||
stats.add("count-files")
|
||||
size = child.get_size()
|
||||
stats.histogram("size-files-histogram", size)
|
||||
if child.get_uri().startswith("URI:LIT:"):
|
||||
stats.add("count-literal-files")
|
||||
stats.add("size-literal-files", size)
|
||||
else:
|
||||
stats.add("count-immutable-files")
|
||||
stats.add("size-immutable-files", size)
|
||||
stats.max("largest-immutable-file", size)
|
||||
if dl:
|
||||
return defer.DeferredList(dl)
|
||||
d.addCallback(_got_list)
|
||||
return d
|
||||
# Since deep_traverse tracks verifier caps, we avoid double-counting
|
||||
# children for which we've got both a write-cap and a read-cap
|
||||
return self.deep_traverse(DeepStats())
|
||||
|
||||
def deep_check(self, verify=False):
|
||||
return self.deep_check_base(verify, False)
|
||||
return self.deep_traverse(DeepChecker(self, verify, repair=False))
|
||||
|
||||
def deep_check_and_repair(self, verify=False):
|
||||
return self.deep_check_base(verify, True)
|
||||
return self.deep_traverse(DeepChecker(self, verify, repair=True))
|
||||
|
||||
def deep_check_base(self, verify, repair):
|
||||
# shallow-check each object first, then traverse children
|
||||
root_si = self._node.get_storage_index()
|
||||
self._lp = log.msg(format="deep-check starting (%(si)s),"
|
||||
" verify=%(verify)s, repair=%(repair)s",
|
||||
si=base32.b2a(root_si), verify=verify, repair=repair)
|
||||
if repair:
|
||||
results = DeepCheckAndRepairResults(root_si)
|
||||
else:
|
||||
results = DeepCheckResults(root_si)
|
||||
found = set()
|
||||
limiter = ConcurrencyLimiter(10)
|
||||
|
||||
d = self._add_deepcheck_from_node([], self, results, found, limiter,
|
||||
verify, repair)
|
||||
def _done(res):
|
||||
log.msg("deep-check done", parent=self._lp)
|
||||
return results
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
def _add_deepcheck_from_node(self, path, node, results, found, limiter,
|
||||
verify, repair):
|
||||
verifier = node.get_verifier()
|
||||
if verifier in found:
|
||||
# avoid loops
|
||||
return None
|
||||
found.add(verifier)
|
||||
|
||||
if repair:
|
||||
d = limiter.add(node.check_and_repair, verify)
|
||||
d.addCallback(results.add_check_and_repair, path)
|
||||
else:
|
||||
d = limiter.add(node.check, verify)
|
||||
d.addCallback(results.add_check, path)
|
||||
|
||||
# TODO: stats: split the DeepStats.foo calls out of
|
||||
# _add_deepstats_from_node into a separate non-recursing method, call
|
||||
# it from both here and _add_deepstats_from_node.
|
||||
|
||||
if IDirectoryNode.providedBy(node):
|
||||
d.addCallback(lambda res: node.list())
|
||||
def _got_children(children):
|
||||
dl = []
|
||||
for name, (child, metadata) in children.iteritems():
|
||||
childpath = path + [name]
|
||||
d2 = self._add_deepcheck_from_node(childpath, child,
|
||||
results,
|
||||
found, limiter,
|
||||
verify, repair)
|
||||
if d2:
|
||||
dl.append(d2)
|
||||
if dl:
|
||||
return defer.DeferredList(dl, fireOnOneErrback=True)
|
||||
d.addCallback(_got_children)
|
||||
return d
|
||||
class ManifestWalker:
|
||||
def __init__(self):
|
||||
self.manifest = set()
|
||||
def add_node(self, node, path):
|
||||
v = node.get_verifier()
|
||||
# LIT files have no verify-cap, so don't add them
|
||||
if v:
|
||||
assert not isinstance(v, str), "ICK: %s %s" % (v, node)
|
||||
self.manifest.add(v.to_string())
|
||||
def enter_directory(self, parent, children):
|
||||
pass
|
||||
def finish(self):
|
||||
return frozenset(self.manifest)
|
||||
|
||||
|
||||
class DeepStats:
|
||||
@ -627,6 +558,33 @@ class DeepStats:
|
||||
self.buckets = [ (0,0), (1,3)]
|
||||
self.root = math.sqrt(10)
|
||||
|
||||
def add_node(self, node, childpath):
|
||||
if IDirectoryNode.providedBy(node):
|
||||
self.add("count-directories")
|
||||
elif IMutableFileNode.providedBy(node):
|
||||
self.add("count-files")
|
||||
self.add("count-mutable-files")
|
||||
# TODO: update the servermap, compute a size, add it to
|
||||
# size-mutable-files, max it into "largest-mutable-file"
|
||||
elif IFileNode.providedBy(node): # CHK and LIT
|
||||
self.add("count-files")
|
||||
size = node.get_size()
|
||||
self.histogram("size-files-histogram", size)
|
||||
if node.get_uri().startswith("URI:LIT:"):
|
||||
self.add("count-literal-files")
|
||||
self.add("size-literal-files", size)
|
||||
else:
|
||||
self.add("count-immutable-files")
|
||||
self.add("size-immutable-files", size)
|
||||
self.max("largest-immutable-file", size)
|
||||
|
||||
def enter_directory(self, parent, children):
|
||||
dirsize_bytes = parent.get_size()
|
||||
dirsize_children = len(children)
|
||||
self.add("size-directories", dirsize_bytes)
|
||||
self.max("largest-directory", dirsize_bytes)
|
||||
self.max("largest-directory-children", dirsize_children)
|
||||
|
||||
def add(self, key, value=1):
|
||||
self.stats[key] += value
|
||||
|
||||
@ -666,6 +624,42 @@ class DeepStats:
|
||||
stats[key] = out
|
||||
return stats
|
||||
|
||||
def finish(self):
|
||||
return self.get_results()
|
||||
|
||||
|
||||
class DeepChecker:
|
||||
def __init__(self, root, verify, repair):
|
||||
root_si = root.get_storage_index()
|
||||
self._lp = log.msg(format="deep-check starting (%(si)s),"
|
||||
" verify=%(verify)s, repair=%(repair)s",
|
||||
si=base32.b2a(root_si), verify=verify, repair=repair)
|
||||
self._verify = verify
|
||||
self._repair = repair
|
||||
if repair:
|
||||
self._results = DeepCheckAndRepairResults(root_si)
|
||||
else:
|
||||
self._results = DeepCheckResults(root_si)
|
||||
self._stats = DeepStats()
|
||||
|
||||
def add_node(self, node, childpath):
|
||||
if self._repair:
|
||||
d = node.check_and_repair(self._verify)
|
||||
d.addCallback(self._results.add_check_and_repair, childpath)
|
||||
else:
|
||||
d = node.check(self._verify)
|
||||
d.addCallback(self._results.add_check, childpath)
|
||||
d.addCallback(lambda ignored: self._stats.add_node(node, childpath))
|
||||
return d
|
||||
|
||||
def enter_directory(self, parent, children):
|
||||
return self._stats.enter_directory(parent, children)
|
||||
|
||||
def finish(self):
|
||||
log.msg("deep-check done", parent=self._lp)
|
||||
self._results.update_stats(self._stats.get_results())
|
||||
return self._results
|
||||
|
||||
|
||||
# use client.create_dirnode() to make one of these
|
||||
|
||||
|
@ -1648,6 +1648,10 @@ class IDeepCheckResults(Interface):
|
||||
be slash-joined) to an ICheckerResults instance, one for each object
|
||||
that was checked."""
|
||||
|
||||
def get_stats():
|
||||
"""Return a dictionary with the same keys as
|
||||
IDirectoryNode.deep_stats()."""
|
||||
|
||||
class IDeepCheckAndRepairResults(Interface):
|
||||
"""I contain the results of a deep-check-and-repair operation.
|
||||
|
||||
@ -1690,6 +1694,10 @@ class IDeepCheckAndRepairResults(Interface):
|
||||
repair)
|
||||
"""
|
||||
|
||||
def get_stats():
|
||||
"""Return a dictionary with the same keys as
|
||||
IDirectoryNode.deep_stats()."""
|
||||
|
||||
def get_corrupt_shares():
|
||||
"""Return a set of (serverid, storage_index, sharenum) for all shares
|
||||
that were found to be corrupt before any repair was attempted. Both
|
||||
|
@ -2271,6 +2271,7 @@ class DeepCheck(SystemTestMixin, unittest.TestCase):
|
||||
self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
|
||||
self.failUnlessEqual(data["list-corrupt-shares"], [], where)
|
||||
self.failUnlessEqual(data["list-unhealthy-files"], [], where)
|
||||
self.json_check_stats(data["stats"], where)
|
||||
|
||||
def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
|
||||
self.failUnlessEqual(data["root-storage-index"],
|
||||
|
@ -196,6 +196,7 @@ class DeepCheckResults(rend.Page, ResultsBase):
|
||||
for (path_t, r)
|
||||
in self.r.get_all_results().items()
|
||||
if not r.is_healthy() ]
|
||||
data["stats"] = self.r.get_stats()
|
||||
return simplejson.dumps(data, indent=1)
|
||||
|
||||
def render_root_storage_index(self, ctx, data):
|
||||
@ -344,6 +345,7 @@ class DeepCheckAndRepairResults(rend.Page, ResultsBase):
|
||||
for (path_t, r)
|
||||
in self.r.get_all_results().items()
|
||||
if not r.get_pre_repair_results().is_healthy() ]
|
||||
data["stats"] = self.r.get_stats()
|
||||
return simplejson.dumps(data, indent=1)
|
||||
|
||||
def render_root_storage_index(self, ctx, data):
|
||||
|
Loading…
Reference in New Issue
Block a user