mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-06-23 09:15:32 +00:00
dirnode.py: check for cancel during deep-traverse operations, and don't initiate any new ones if we've been cancelled. Gets us closer to #514.
This commit is contained in:
@ -477,22 +477,27 @@ class NewDirectoryNode:
|
|||||||
|
|
||||||
found = set([self.get_verifier()])
|
found = set([self.get_verifier()])
|
||||||
limiter = ConcurrencyLimiter(10)
|
limiter = ConcurrencyLimiter(10)
|
||||||
d = self._deep_traverse_dirnode(self, [], walker, found, limiter)
|
d = self._deep_traverse_dirnode(self, [],
|
||||||
|
walker, monitor, found, limiter)
|
||||||
d.addCallback(lambda ignored: walker.finish())
|
d.addCallback(lambda ignored: walker.finish())
|
||||||
d.addBoth(monitor.finish)
|
d.addBoth(monitor.finish)
|
||||||
|
d.addErrback(lambda f: None)
|
||||||
|
|
||||||
return monitor
|
return monitor
|
||||||
|
|
||||||
def _deep_traverse_dirnode(self, node, path, walker, found, limiter):
|
def _deep_traverse_dirnode(self, node, path,
|
||||||
|
walker, monitor, found, limiter):
|
||||||
# process this directory, then walk its children
|
# process this directory, then walk its children
|
||||||
# TODO: check monitor.is_cancelled()
|
monitor.raise_if_cancelled()
|
||||||
d = limiter.add(walker.add_node, node, path)
|
d = limiter.add(walker.add_node, node, path)
|
||||||
d.addCallback(lambda ignored: limiter.add(node.list))
|
d.addCallback(lambda ignored: limiter.add(node.list))
|
||||||
d.addCallback(self._deep_traverse_dirnode_children, node, path,
|
d.addCallback(self._deep_traverse_dirnode_children, node, path,
|
||||||
walker, found, limiter)
|
walker, monitor, found, limiter)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def _deep_traverse_dirnode_children(self, children, parent, path,
|
def _deep_traverse_dirnode_children(self, children, parent, path,
|
||||||
walker, found, limiter):
|
walker, monitor, found, limiter):
|
||||||
|
monitor.raise_if_cancelled()
|
||||||
dl = [limiter.add(walker.enter_directory, parent, children)]
|
dl = [limiter.add(walker.enter_directory, parent, children)]
|
||||||
for name, (child, metadata) in children.iteritems():
|
for name, (child, metadata) in children.iteritems():
|
||||||
verifier = child.get_verifier()
|
verifier = child.get_verifier()
|
||||||
@ -502,10 +507,11 @@ class NewDirectoryNode:
|
|||||||
childpath = path + [name]
|
childpath = path + [name]
|
||||||
if IDirectoryNode.providedBy(child):
|
if IDirectoryNode.providedBy(child):
|
||||||
dl.append(self._deep_traverse_dirnode(child, childpath,
|
dl.append(self._deep_traverse_dirnode(child, childpath,
|
||||||
walker, found, limiter))
|
walker, monitor,
|
||||||
|
found, limiter))
|
||||||
else:
|
else:
|
||||||
dl.append(limiter.add(walker.add_node, child, childpath))
|
dl.append(limiter.add(walker.add_node, child, childpath))
|
||||||
return defer.DeferredList(dl, fireOnOneErrback=True)
|
return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
|
||||||
|
|
||||||
|
|
||||||
def build_manifest(self):
|
def build_manifest(self):
|
||||||
|
@ -32,6 +32,11 @@ class IMonitor(Interface):
|
|||||||
operation code should stop creating new work, and attempt to stop any
|
operation code should stop creating new work, and attempt to stop any
|
||||||
work already in progress."""
|
work already in progress."""
|
||||||
|
|
||||||
|
def raise_if_cancelled(self):
|
||||||
|
"""Raise OperationCancelledError if the operation has been cancelled.
|
||||||
|
Operation code that has a robust error-handling path can simply call
|
||||||
|
this periodically."""
|
||||||
|
|
||||||
def set_status(self, status):
|
def set_status(self, status):
|
||||||
"""Sets the Monitor's 'status' object to an arbitrary value.
|
"""Sets the Monitor's 'status' object to an arbitrary value.
|
||||||
Different operations will store different sorts of status information
|
Different operations will store different sorts of status information
|
||||||
@ -69,6 +74,9 @@ class IMonitor(Interface):
|
|||||||
|
|
||||||
# get_status() is useful too, but it is operation-specific
|
# get_status() is useful too, but it is operation-specific
|
||||||
|
|
||||||
|
class OperationCancelledError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
class Monitor:
|
class Monitor:
|
||||||
implements(IMonitor)
|
implements(IMonitor)
|
||||||
|
|
||||||
@ -81,6 +89,10 @@ class Monitor:
|
|||||||
def is_cancelled(self):
|
def is_cancelled(self):
|
||||||
return self.cancelled
|
return self.cancelled
|
||||||
|
|
||||||
|
def raise_if_cancelled(self):
|
||||||
|
if self.cancelled:
|
||||||
|
raise OperationCancelledError()
|
||||||
|
|
||||||
def is_finished(self):
|
def is_finished(self):
|
||||||
return self.finished
|
return self.finished
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ from allmydata.scripts import runner
|
|||||||
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
|
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
|
||||||
ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
|
ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
|
||||||
IDeepCheckAndRepairResults
|
IDeepCheckAndRepairResults
|
||||||
|
from allmydata.monitor import OperationCancelledError
|
||||||
from allmydata.mutable.common import NotMutableError
|
from allmydata.mutable.common import NotMutableError
|
||||||
from allmydata.mutable import layout as mutable_layout
|
from allmydata.mutable import layout as mutable_layout
|
||||||
from foolscap import DeadReferenceError
|
from foolscap import DeadReferenceError
|
||||||
@ -2048,6 +2049,23 @@ class DeepCheckWeb(SystemTestMixin, unittest.TestCase, WebErrorMixin):
|
|||||||
self.root.start_deep_check_and_repair(verify=True).when_done())
|
self.root.start_deep_check_and_repair(verify=True).when_done())
|
||||||
d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
|
d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
|
||||||
|
|
||||||
|
# and finally, start a deep-check, but then cancel it.
|
||||||
|
d.addCallback(lambda ign: self.root.start_deep_check())
|
||||||
|
def _checking(monitor):
|
||||||
|
monitor.cancel()
|
||||||
|
d = monitor.when_done()
|
||||||
|
# this should fire as soon as the next dirnode.list finishes.
|
||||||
|
# TODO: add a counter to measure how many list() calls are made,
|
||||||
|
# assert that no more than one gets to run before the cancel()
|
||||||
|
# takes effect.
|
||||||
|
def _finished_normally(res):
|
||||||
|
self.fail("this was supposed to fail, not finish normally")
|
||||||
|
def _cancelled(f):
|
||||||
|
f.trap(OperationCancelledError)
|
||||||
|
d.addCallbacks(_finished_normally, _cancelled)
|
||||||
|
return d
|
||||||
|
d.addCallback(_checking)
|
||||||
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def web_json(self, n, **kwargs):
|
def web_json(self, n, **kwargs):
|
||||||
|
Reference in New Issue
Block a user