Some useful Deferred utilities, originally from the cloud backend branch.

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
This commit is contained in:
Daira Hopwood 2012-11-23 00:23:54 +00:00
parent 9789c4b20c
commit 4de4e0e65e
2 changed files with 166 additions and 2 deletions

View File

@ -581,7 +581,7 @@ class PollMixinTests(unittest.TestCase):
d.addCallbacks(_suc, _err)
return d
class DeferredUtilTests(unittest.TestCase):
class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin):
def test_gather_results(self):
d1 = defer.Deferred()
d2 = defer.Deferred()
@ -621,6 +621,21 @@ class DeferredUtilTests(unittest.TestCase):
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(ValueError))
def test_wait_for_delayed_calls(self):
"""
This tests that 'wait_for_delayed_calls' does in fact wait for a
delayed call that is active when the test returns. If it didn't,
Trial would report an unclean reactor error for this test.
"""
def _trigger():
#print "trigger"
pass
reactor.callLater(0.1, _trigger)
d = defer.succeed(None)
d.addBoth(self.wait_for_delayed_calls)
return d
class HashUtilTests(unittest.TestCase):
def test_random_key(self):

View File

@ -1,4 +1,12 @@
from twisted.internet import defer
import time
from foolscap.api import eventually, fireEventually
from twisted.internet import defer, reactor
from allmydata.util import log
from allmydata.util.pollmixin import PollMixin
# utility wrapper for DeferredList
def _check_deferred_list(results):
@ -9,6 +17,7 @@ def _check_deferred_list(results):
if not success:
return f
return [r[1] for r in results]
def DeferredListShouldSucceed(dl):
d = defer.DeferredList(dl)
d.addCallback(_check_deferred_list)
@ -33,3 +42,143 @@ def gatherResults(deferredList):
d.addCallbacks(_parseDListResult, _unwrapFirstError)
return d
def _with_log(op, res):
"""
The default behaviour on firing an already-fired Deferred is unhelpful for
debugging, because the AlreadyCalledError can easily get lost or be raised
in a context that results in a different error. So make sure it is logged
(for the abstractions defined here). If we are in a test, log.err will cause
the test to fail.
"""
try:
op(res)
except defer.AlreadyCalledError, e:
log.err(e, op=repr(op), level=log.WEIRD)
def eventually_callback(d):
def _callback(res):
eventually(_with_log, d.callback, res)
return res
return _callback
def eventually_errback(d):
def _errback(res):
eventually(_with_log, d.errback, res)
return res
return _errback
def eventual_chain(source, target):
source.addCallbacks(eventually_callback(target), eventually_errback(target))
class HookMixin:
"""
I am a helper mixin that maintains a collection of named hooks, primarily
for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
and can then be fired exactly once at the appropriate time by '_call_hook'.
I assume a '_hooks' attribute that should set by the class constructor to
a dict mapping each valid hook name to None.
"""
def set_hook(self, name, d=None):
"""
Called by the hook observer (e.g. by a test).
If d is not given, an unfired Deferred is created and returned.
The hook must not already be set.
"""
if d is None:
d = defer.Deferred()
assert self._hooks[name] is None, self._hooks[name]
assert isinstance(d, defer.Deferred), d
self._hooks[name] = d
return d
def _call_hook(self, res, name):
"""
Called to trigger the hook, with argument 'res'. This is a no-op if the
hook is unset. Otherwise, the hook will be unset, and then its Deferred
will be fired synchronously.
The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
This ensures that if 'res' is a failure, the hook will be errbacked,
which will typically cause the test to also fail.
'res' is returned so that the current result or failure will be passed
through.
"""
d = self._hooks[name]
if d is None:
return defer.succeed(None)
self._hooks[name] = None
_with_log(d.callback, res)
return res
def async_iterate(process, iterable, *extra_args, **kwargs):
"""
I iterate over the elements of 'iterable' (which may be deferred), eventually
applying 'process' to each one, optionally with 'extra_args' and 'kwargs'.
'process' should return a (possibly deferred) boolean: True to continue the
iteration, False to stop.
I return a Deferred that fires with True if all elements of the iterable
were processed (i.e. 'process' only returned True values); with False if
the iteration was stopped by 'process' returning False; or that fails with
the first failure of either 'process' or the iterator.
"""
iterator = iter(iterable)
d = defer.succeed(None)
def _iterate(ign):
d2 = defer.maybeDeferred(iterator.next)
def _cb(item):
d3 = defer.maybeDeferred(process, item, *extra_args, **kwargs)
def _maybe_iterate(res):
if res:
d4 = fireEventually()
d4.addCallback(_iterate)
return d4
return False
d3.addCallback(_maybe_iterate)
return d3
def _eb(f):
f.trap(StopIteration)
return True
d2.addCallbacks(_cb, _eb)
return d2
d.addCallback(_iterate)
return d
def for_items(cb, mapping):
"""
For each (key, value) pair in a mapping, I add a callback to cb(None, key, value)
to a Deferred that fires immediately. I return that Deferred.
"""
d = defer.succeed(None)
for k, v in mapping.items():
d.addCallback(lambda ign, k=k, v=v: cb(None, k, v))
return d
class WaitForDelayedCallsMixin(PollMixin):
def _delayed_calls_done(self):
# We're done when the only remaining DelayedCalls fire after threshold.
# (These will be associated with the test timeout, or else they *should*
# cause an unclean reactor error because the test should have waited for
# them.)
threshold = time.time() + 10
for delayed in reactor.getDelayedCalls():
if delayed.getTime() < threshold:
return False
return True
def wait_for_delayed_calls(self, res=None):
"""
Use like this at the end of a test:
d.addBoth(self.wait_for_delayed_calls)
"""
d = self.poll(self._delayed_calls_done)
d.addErrback(log.err, "error while waiting for delayed calls")
d.addBoth(lambda ign: res)
return d