util/pipeline.py: new utility class to manage size-limited work pipelines, for #392

This commit is contained in:
Brian Warner 2009-05-18 16:43:26 -07:00
parent e7e13958c2
commit e76c6b606f
2 changed files with 305 additions and 1 deletions

View File

@ -6,11 +6,12 @@ from StringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from twisted.python import log
from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
from allmydata.util import limiter, time_format, pollmixin, cachedir
from allmydata.util import statistics, dictutil, rrefutil
from allmydata.util import statistics, dictutil, rrefutil, pipeline
from allmydata.util.rrefutil import ServerFailure
class Base32(unittest.TestCase):
@ -1300,3 +1301,174 @@ class RemoteFailures(unittest.TestCase):
rrefutil.trap_local, f, IndexError)
d.addErrback(_check)
return d
class Pipeline(unittest.TestCase):
def pause(self, *args, **kwargs):
d = defer.Deferred()
self.calls.append( (d, args, kwargs) )
return d
def failUnlessCallsAre(self, expected):
#print self.calls
#print expected
self.failUnlessEqual(len(self.calls), len(expected), self.calls)
for i,c in enumerate(self.calls):
self.failUnlessEqual(c[1:], expected[i], str(i))
def test_basic(self):
self.calls = []
finished = []
p = pipeline.Pipeline(100)
d = p.flush() # fires immediately
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
finished = []
d = p.add(10, self.pause, "one")
# the call should start right away, and our return Deferred should
# fire right away
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessCallsAre([ ( ("one",) , {} ) ])
self.failUnlessEqual(p.gauge, 10)
# pipeline: [one]
finished = []
d = p.add(20, self.pause, "two", kw=2)
# pipeline: [one, two]
# the call and the Deferred should fire right away
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessCallsAre([ ( ("one",) , {} ),
( ("two",) , {"kw": 2} ),
])
self.failUnlessEqual(p.gauge, 30)
self.calls[0][0].callback("one-result")
# pipeline: [two]
self.failUnlessEqual(p.gauge, 20)
finished = []
d = p.add(90, self.pause, "three", "posarg1")
# pipeline: [two, three]
flushed = []
fd = p.flush()
fd.addCallbacks(flushed.append, log.err)
self.failUnlessEqual(flushed, [])
# the call will be made right away, but the return Deferred will not,
# because the pipeline is now full.
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 0)
self.failUnlessCallsAre([ ( ("one",) , {} ),
( ("two",) , {"kw": 2} ),
( ("three", "posarg1"), {} ),
])
self.failUnlessEqual(p.gauge, 110)
self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)
# retiring either call will unblock the pipeline, causing the #3
# Deferred to fire
self.calls[2][0].callback("three-result")
# pipeline: [two]
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessEqual(flushed, [])
# retiring call#2 will finally allow the flush() Deferred to fire
self.calls[1][0].callback("two-result")
self.failUnlessEqual(len(flushed), 1)
def test_errors(self):
self.calls = []
p = pipeline.Pipeline(100)
d1 = p.add(200, self.pause, "one")
d2 = p.flush()
finished = []
d1.addBoth(finished.append)
self.failUnlessEqual(finished, [])
flushed = []
d2.addBoth(flushed.append)
self.failUnlessEqual(flushed, [])
self.calls[0][0].errback(ValueError("oops"))
self.failUnlessEqual(len(finished), 1)
f = finished[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
r = repr(f.value)
self.failUnless("ValueError" in r, r)
f2 = f.value.error
self.failUnless(f2.check(ValueError))
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
# now that the pipeline is in the failed state, any new calls will
# fail immediately
d3 = p.add(20, self.pause, "two")
finished = []
d3.addBoth(finished.append)
self.failUnlessEqual(len(finished), 1)
f = finished[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
r = repr(f.value)
self.failUnless("ValueError" in r, r)
f2 = f.value.error
self.failUnless(f2.check(ValueError))
d4 = p.flush()
flushed = []
d4.addBoth(flushed.append)
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
def test_errors2(self):
self.calls = []
p = pipeline.Pipeline(100)
d1 = p.add(10, self.pause, "one")
d2 = p.add(20, self.pause, "two")
d3 = p.add(30, self.pause, "three")
d4 = p.flush()
# one call fails, then the second one succeeds: make sure
# ExpandableDeferredList tolerates the second one
flushed = []
d4.addBoth(flushed.append)
self.failUnlessEqual(flushed, [])
self.calls[0][0].errback(ValueError("oops"))
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
self.calls[1][0].callback("two-result")
self.calls[2][0].errback(ValueError("three-error"))

View File

@ -0,0 +1,132 @@
from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.python import log
from allmydata.util.assertutil import precondition
class PipelineError(Exception):
"""One of the pipelined messages returned an error. The received Failure
object is stored in my .error attribute."""
def __init__(self, error):
self.error = error
def __repr__(self):
return "<PipelineError error=(%r)>" % self.error
class SingleFileError(Exception):
"""You are not permitted to add a job to a full pipeline."""
class ExpandableDeferredList(defer.Deferred):
# like DeferredList(fireOnOneErrback=True) with a built-in
# gatherResults(), but you can add new Deferreds until you close it. This
# gives you a chance to add don't-complain-about-unhandled-error errbacks
# immediately after attachment, regardless of whether you actually end up
# wanting the list or not.
def __init__(self):
defer.Deferred.__init__(self)
self.resultsReceived = 0
self.resultList = []
self.failure = None
self.closed = False
def addDeferred(self, d):
precondition(not self.closed, "don't call addDeferred() on a closed ExpandableDeferredList")
index = len(self.resultList)
self.resultList.append(None)
d.addCallbacks(self._cbDeferred, self._ebDeferred,
callbackArgs=(index,))
return d
def close(self):
self.closed = True
self.checkForFinished()
def checkForFinished(self):
if not self.closed:
return
if self.called:
return
if self.failure:
self.errback(self.failure)
elif self.resultsReceived == len(self.resultList):
self.callback(self.resultList)
def _cbDeferred(self, res, index):
self.resultList[index] = res
self.resultsReceived += 1
self.checkForFinished()
return res
def _ebDeferred(self, f):
self.failure = f
self.checkForFinished()
return f
class Pipeline:
"""I manage a size-limited pipeline of Deferred operations, usually
callRemote() messages."""
def __init__(self, capacity):
self.capacity = capacity # how full we can be
self.gauge = 0 # how full we are
self.failure = None
self.waiting = [] # callers of add() who are blocked
self.unflushed = ExpandableDeferredList()
def add(self, _size, _func, *args, **kwargs):
# We promise that all the Deferreds we return will fire in the order
# they were returned. To make it easier to keep this promise, we
# prohibit multiple outstanding calls to add() .
if self.waiting:
raise SingleFileError
if self.failure:
return defer.fail(self.failure)
self.gauge += _size
fd = defer.maybeDeferred(_func, *args, **kwargs)
fd.addBoth(self._call_finished, _size)
self.unflushed.addDeferred(fd)
fd.addErrback(self._eat_pipeline_errors)
fd.addErrback(log.err, "_eat_pipeline_errors didn't eat it")
if self.gauge < self.capacity:
return defer.succeed(None)
d = defer.Deferred()
self.waiting.append(d)
return d
def flush(self):
if self.failure:
return defer.fail(self.failure)
d, self.unflushed = self.unflushed, ExpandableDeferredList()
d.close()
d.addErrback(self._flushed_error)
return d
def _flushed_error(self, f):
precondition(self.failure) # should have been set by _call_finished
return self.failure
def _call_finished(self, res, size):
self.gauge -= size
if isinstance(res, Failure):
res = Failure(PipelineError(res))
if not self.failure:
self.failure = res
if self.failure:
while self.waiting:
d = self.waiting.pop(0)
d.errback(self.failure)
else:
while self.waiting and (self.gauge < self.capacity):
d = self.waiting.pop(0)
d.callback(None)
# the d.callback() might trigger a new call to add(), which
# will raise our gauge and might cause the pipeline to be
# filled. So the while() loop gets a chance to tell the
# caller to stop.
return res
def _eat_pipeline_errors(self, f):
f.trap(PipelineError)
return None