Move pipeline tests into their own module.

This commit is contained in:
Itamar Turner-Trauring 2020-07-22 10:33:23 -04:00
parent 04bf9aeffc
commit e427163ec8
2 changed files with 191 additions and 177 deletions

View File

@ -0,0 +1,190 @@
"""
Tests for allmydata.util.pipeline.
Ported to Python 3.
"""
import gc
from twisted.internet import defer
from twisted.trial import unittest
from twisted.python import log
from twisted.python.failure import Failure
from allmydata.util import pipeline
class Pipeline(unittest.TestCase):
def pause(self, *args, **kwargs):
d = defer.Deferred()
self.calls.append( (d, args, kwargs) )
return d
def failUnlessCallsAre(self, expected):
#print self.calls
#print expected
self.failUnlessEqual(len(self.calls), len(expected), self.calls)
for i,c in enumerate(self.calls):
self.failUnlessEqual(c[1:], expected[i], str(i))
def test_basic(self):
self.calls = []
finished = []
p = pipeline.Pipeline(100)
d = p.flush() # fires immediately
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
finished = []
d = p.add(10, self.pause, "one")
# the call should start right away, and our return Deferred should
# fire right away
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessCallsAre([ ( ("one",) , {} ) ])
self.failUnlessEqual(p.gauge, 10)
# pipeline: [one]
finished = []
d = p.add(20, self.pause, "two", kw=2)
# pipeline: [one, two]
# the call and the Deferred should fire right away
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessCallsAre([ ( ("one",) , {} ),
( ("two",) , {"kw": 2} ),
])
self.failUnlessEqual(p.gauge, 30)
self.calls[0][0].callback("one-result")
# pipeline: [two]
self.failUnlessEqual(p.gauge, 20)
finished = []
d = p.add(90, self.pause, "three", "posarg1")
# pipeline: [two, three]
flushed = []
fd = p.flush()
fd.addCallbacks(flushed.append, log.err)
self.failUnlessEqual(flushed, [])
# the call will be made right away, but the return Deferred will not,
# because the pipeline is now full.
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 0)
self.failUnlessCallsAre([ ( ("one",) , {} ),
( ("two",) , {"kw": 2} ),
( ("three", "posarg1"), {} ),
])
self.failUnlessEqual(p.gauge, 110)
self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)
# retiring either call will unblock the pipeline, causing the #3
# Deferred to fire
self.calls[2][0].callback("three-result")
# pipeline: [two]
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessEqual(flushed, [])
# retiring call#2 will finally allow the flush() Deferred to fire
self.calls[1][0].callback("two-result")
self.failUnlessEqual(len(flushed), 1)
def test_errors(self):
self.calls = []
p = pipeline.Pipeline(100)
d1 = p.add(200, self.pause, "one")
d2 = p.flush()
finished = []
d1.addBoth(finished.append)
self.failUnlessEqual(finished, [])
flushed = []
d2.addBoth(flushed.append)
self.failUnlessEqual(flushed, [])
self.calls[0][0].errback(ValueError("oops"))
self.failUnlessEqual(len(finished), 1)
f = finished[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
self.failUnlessIn("PipelineError", str(f.value))
self.failUnlessIn("ValueError", str(f.value))
r = repr(f.value)
self.failUnless("ValueError" in r, r)
f2 = f.value.error
self.failUnless(f2.check(ValueError))
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
# now that the pipeline is in the failed state, any new calls will
# fail immediately
d3 = p.add(20, self.pause, "two")
finished = []
d3.addBoth(finished.append)
self.failUnlessEqual(len(finished), 1)
f = finished[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
r = repr(f.value)
self.failUnless("ValueError" in r, r)
f2 = f.value.error
self.failUnless(f2.check(ValueError))
d4 = p.flush()
flushed = []
d4.addBoth(flushed.append)
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
def test_errors2(self):
self.calls = []
p = pipeline.Pipeline(100)
d1 = p.add(10, self.pause, "one")
d2 = p.add(20, self.pause, "two")
d3 = p.add(30, self.pause, "three")
d4 = p.flush()
# one call fails, then the second one succeeds: make sure
# ExpandableDeferredList tolerates the second one
flushed = []
d4.addBoth(flushed.append)
self.failUnlessEqual(flushed, [])
self.calls[0][0].errback(ValueError("oops"))
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
self.calls[1][0].callback("two-result")
self.calls[2][0].errback(ValueError("three-error"))
del d1,d2,d3,d4
gc.collect() # for PyPy

View File

@ -5,19 +5,17 @@ import six
import hashlib
import os, time, sys
import yaml
import gc # support PyPy
from six.moves import StringIO
from datetime import timedelta
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from twisted.python import log
from allmydata.util import base32, idlib, mathutil, hashutil
from allmydata.util import fileutil, abbreviate
from allmydata.util import limiter, time_format, pollmixin
from allmydata.util import statistics, dictutil, pipeline, yamlutil
from allmydata.util import statistics, dictutil, yamlutil
from allmydata.util import log as tahoe_log
from allmydata.util.spans import Spans, overlap, DataSpans
from allmydata.util.fileutil import EncryptedTemporaryFile
@ -1121,180 +1119,6 @@ class DictUtil(unittest.TestCase):
self.failUnlessEqual(d["one"], 1)
self.failUnlessEqual(d.get_aux("one"), None)
class Pipeline(unittest.TestCase):
def pause(self, *args, **kwargs):
d = defer.Deferred()
self.calls.append( (d, args, kwargs) )
return d
def failUnlessCallsAre(self, expected):
#print self.calls
#print expected
self.failUnlessEqual(len(self.calls), len(expected), self.calls)
for i,c in enumerate(self.calls):
self.failUnlessEqual(c[1:], expected[i], str(i))
def test_basic(self):
self.calls = []
finished = []
p = pipeline.Pipeline(100)
d = p.flush() # fires immediately
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
finished = []
d = p.add(10, self.pause, "one")
# the call should start right away, and our return Deferred should
# fire right away
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessCallsAre([ ( ("one",) , {} ) ])
self.failUnlessEqual(p.gauge, 10)
# pipeline: [one]
finished = []
d = p.add(20, self.pause, "two", kw=2)
# pipeline: [one, two]
# the call and the Deferred should fire right away
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessCallsAre([ ( ("one",) , {} ),
( ("two",) , {"kw": 2} ),
])
self.failUnlessEqual(p.gauge, 30)
self.calls[0][0].callback("one-result")
# pipeline: [two]
self.failUnlessEqual(p.gauge, 20)
finished = []
d = p.add(90, self.pause, "three", "posarg1")
# pipeline: [two, three]
flushed = []
fd = p.flush()
fd.addCallbacks(flushed.append, log.err)
self.failUnlessEqual(flushed, [])
# the call will be made right away, but the return Deferred will not,
# because the pipeline is now full.
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 0)
self.failUnlessCallsAre([ ( ("one",) , {} ),
( ("two",) , {"kw": 2} ),
( ("three", "posarg1"), {} ),
])
self.failUnlessEqual(p.gauge, 110)
self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)
# retiring either call will unblock the pipeline, causing the #3
# Deferred to fire
self.calls[2][0].callback("three-result")
# pipeline: [two]
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessEqual(flushed, [])
# retiring call#2 will finally allow the flush() Deferred to fire
self.calls[1][0].callback("two-result")
self.failUnlessEqual(len(flushed), 1)
def test_errors(self):
self.calls = []
p = pipeline.Pipeline(100)
d1 = p.add(200, self.pause, "one")
d2 = p.flush()
finished = []
d1.addBoth(finished.append)
self.failUnlessEqual(finished, [])
flushed = []
d2.addBoth(flushed.append)
self.failUnlessEqual(flushed, [])
self.calls[0][0].errback(ValueError("oops"))
self.failUnlessEqual(len(finished), 1)
f = finished[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
self.failUnlessIn("PipelineError", str(f.value))
self.failUnlessIn("ValueError", str(f.value))
r = repr(f.value)
self.failUnless("ValueError" in r, r)
f2 = f.value.error
self.failUnless(f2.check(ValueError))
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
# now that the pipeline is in the failed state, any new calls will
# fail immediately
d3 = p.add(20, self.pause, "two")
finished = []
d3.addBoth(finished.append)
self.failUnlessEqual(len(finished), 1)
f = finished[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
r = repr(f.value)
self.failUnless("ValueError" in r, r)
f2 = f.value.error
self.failUnless(f2.check(ValueError))
d4 = p.flush()
flushed = []
d4.addBoth(flushed.append)
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
def test_errors2(self):
self.calls = []
p = pipeline.Pipeline(100)
d1 = p.add(10, self.pause, "one")
d2 = p.add(20, self.pause, "two")
d3 = p.add(30, self.pause, "three")
d4 = p.flush()
# one call fails, then the second one succeeds: make sure
# ExpandableDeferredList tolerates the second one
flushed = []
d4.addBoth(flushed.append)
self.failUnlessEqual(flushed, [])
self.calls[0][0].errback(ValueError("oops"))
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
self.calls[1][0].callback("two-result")
self.calls[2][0].errback(ValueError("three-error"))
del d1,d2,d3,d4
gc.collect() # for PyPy
class SampleError(Exception):
pass