diff --git a/misc/python3/ratchet-passing b/misc/python3/ratchet-passing index 6da235e09..5bf2d1521 100644 --- a/misc/python3/ratchet-passing +++ b/misc/python3/ratchet-passing @@ -52,6 +52,9 @@ allmydata.test.test_observer.Observer.test_lazy_oneshot allmydata.test.test_observer.Observer.test_observerlist allmydata.test.test_observer.Observer.test_oneshot allmydata.test.test_observer.Observer.test_oneshot_fireagain +allmydata.test.test_pipeline.Pipeline.test_basic +allmydata.test.test_pipeline.Pipeline.test_errors +allmydata.test.test_pipeline.Pipeline.test_errors2 allmydata.test.test_python3.Python3PortingEffortTests.test_finished_porting allmydata.test.test_python3.Python3PortingEffortTests.test_ported_modules_distinct allmydata.test.test_python3.Python3PortingEffortTests.test_ported_modules_exist diff --git a/newsfragments/3353.minor b/newsfragments/3353.minor new file mode 100644 index 000000000..e69de29bb diff --git a/src/allmydata/test/test_observer.py b/src/allmydata/test/test_observer.py index 32fd0a395..b37f0d3e1 100644 --- a/src/allmydata/test/test_observer.py +++ b/src/allmydata/test/test_observer.py @@ -1,3 +1,17 @@ +""" +Tests for allmydata.util.observer. + +Ported to Python 3. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 +if PY2: + from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, int, list, object, range, str, max, min # noqa: F401 from twisted.trial import unittest from twisted.internet import defer, reactor diff --git a/src/allmydata/test/test_pipeline.py b/src/allmydata/test/test_pipeline.py new file mode 100644 index 000000000..ab7059521 --- /dev/null +++ b/src/allmydata/test/test_pipeline.py @@ -0,0 +1,198 @@ +""" +Tests for allmydata.util.pipeline. + +Ported to Python 3. +""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 +if PY2: + from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, int, list, object, range, str, max, min # noqa: F401 + +import gc + +from twisted.internet import defer +from twisted.trial import unittest +from twisted.python import log +from twisted.python.failure import Failure + +from allmydata.util import pipeline + + +class Pipeline(unittest.TestCase): + def pause(self, *args, **kwargs): + d = defer.Deferred() + self.calls.append( (d, args, kwargs) ) + return d + + def failUnlessCallsAre(self, expected): + #print self.calls + #print expected + self.failUnlessEqual(len(self.calls), len(expected), self.calls) + for i,c in enumerate(self.calls): + self.failUnlessEqual(c[1:], expected[i], str(i)) + + def test_basic(self): + self.calls = [] + finished = [] + p = pipeline.Pipeline(100) + + d = p.flush() # fires immediately + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 1) + finished = [] + + d = p.add(10, self.pause, "one") + # the call should start right away, and our return Deferred should + # fire right away + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 1) + self.failUnlessEqual(finished[0], None) + self.failUnlessCallsAre([ ( ("one",) , {} ) ]) + self.failUnlessEqual(p.gauge, 10) + + # pipeline: [one] + + finished = [] + d = p.add(20, self.pause, "two", kw=2) + # pipeline: [one, two] + + # the call and the Deferred should fire right away + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 1) + self.failUnlessEqual(finished[0], None) + self.failUnlessCallsAre([ ( ("one",) , {} ), + ( ("two",) , {"kw": 2} ), + ]) + self.failUnlessEqual(p.gauge, 30) + + self.calls[0][0].callback("one-result") + # pipeline: [two] + self.failUnlessEqual(p.gauge, 20) + + finished = [] + d = p.add(90, self.pause, "three", "posarg1") + # pipeline: [two, three] + flushed = [] + fd = p.flush() + fd.addCallbacks(flushed.append, log.err) + self.failUnlessEqual(flushed, []) + + # the call will be made right away, but the return Deferred will not, + # because the pipeline is now full. + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 0) + self.failUnlessCallsAre([ ( ("one",) , {} ), + ( ("two",) , {"kw": 2} ), + ( ("three", "posarg1"), {} ), + ]) + self.failUnlessEqual(p.gauge, 110) + + self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause) + + # retiring either call will unblock the pipeline, causing the #3 + # Deferred to fire + self.calls[2][0].callback("three-result") + # pipeline: [two] + + self.failUnlessEqual(len(finished), 1) + self.failUnlessEqual(finished[0], None) + self.failUnlessEqual(flushed, []) + + # retiring call#2 will finally allow the flush() Deferred to fire + self.calls[1][0].callback("two-result") + self.failUnlessEqual(len(flushed), 1) + + def test_errors(self): + self.calls = [] + p = pipeline.Pipeline(100) + + d1 = p.add(200, self.pause, "one") + d2 = p.flush() + + finished = [] + d1.addBoth(finished.append) + self.failUnlessEqual(finished, []) + + flushed = [] + d2.addBoth(flushed.append) + self.failUnlessEqual(flushed, []) + + self.calls[0][0].errback(ValueError("oops")) + + self.failUnlessEqual(len(finished), 1) + f = finished[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + self.failUnlessIn("PipelineError", str(f.value)) + self.failUnlessIn("ValueError", str(f.value)) + r = repr(f.value) + self.failUnless("ValueError" in r, r) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + self.failUnlessEqual(len(flushed), 1) + f = flushed[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + # now that the pipeline is in the failed state, any new calls will + # fail immediately + + d3 = p.add(20, self.pause, "two") + + finished = [] + d3.addBoth(finished.append) + self.failUnlessEqual(len(finished), 1) + f = finished[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + r = repr(f.value) + self.failUnless("ValueError" in r, r) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + d4 = p.flush() + flushed = [] + d4.addBoth(flushed.append) + self.failUnlessEqual(len(flushed), 1) + f = flushed[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + def test_errors2(self): + self.calls = [] + p = pipeline.Pipeline(100) + + d1 = p.add(10, self.pause, "one") + d2 = p.add(20, self.pause, "two") + d3 = p.add(30, self.pause, "three") + d4 = p.flush() + + # one call fails, then the second one succeeds: make sure + # ExpandableDeferredList tolerates the second one + + flushed = [] + d4.addBoth(flushed.append) + self.failUnlessEqual(flushed, []) + + self.calls[0][0].errback(ValueError("oops")) + self.failUnlessEqual(len(flushed), 1) + f = flushed[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + self.calls[1][0].callback("two-result") + self.calls[2][0].errback(ValueError("three-error")) + + del d1,d2,d3,d4 + gc.collect() # for PyPy diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index 08262d582..8c0e937f3 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -5,18 +5,16 @@ import six import hashlib import os, time, sys import yaml -import gc # support PyPy from six.moves import StringIO from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.python.failure import Failure -from twisted.python import log from allmydata.util import idlib, mathutil from allmydata.util import fileutil from allmydata.util import limiter, pollmixin -from allmydata.util import statistics, dictutil, pipeline, yamlutil +from allmydata.util import statistics, dictutil, yamlutil from allmydata.util import log as tahoe_log from allmydata.util.spans import Spans, overlap, DataSpans from allmydata.util.fileutil import EncryptedTemporaryFile @@ -751,180 +749,6 @@ class DictUtil(unittest.TestCase): self.failUnlessEqual(d["one"], 1) self.failUnlessEqual(d.get_aux("one"), None) -class Pipeline(unittest.TestCase): - def pause(self, *args, **kwargs): - d = defer.Deferred() - self.calls.append( (d, args, kwargs) ) - return d - - def failUnlessCallsAre(self, expected): - #print self.calls - #print expected - self.failUnlessEqual(len(self.calls), len(expected), self.calls) - for i,c in enumerate(self.calls): - self.failUnlessEqual(c[1:], expected[i], str(i)) - - def test_basic(self): - self.calls = [] - finished = [] - p = pipeline.Pipeline(100) - - d = p.flush() # fires immediately - d.addCallbacks(finished.append, log.err) - self.failUnlessEqual(len(finished), 1) - finished = [] - - d = p.add(10, self.pause, "one") - # the call should start right away, and our return Deferred should - # fire right away - d.addCallbacks(finished.append, log.err) - self.failUnlessEqual(len(finished), 1) - self.failUnlessEqual(finished[0], None) - self.failUnlessCallsAre([ ( ("one",) , {} ) ]) - self.failUnlessEqual(p.gauge, 10) - - # pipeline: [one] - - finished = [] - d = p.add(20, self.pause, "two", kw=2) - # pipeline: [one, two] - - # the call and the Deferred should fire right away - d.addCallbacks(finished.append, log.err) - self.failUnlessEqual(len(finished), 1) - self.failUnlessEqual(finished[0], None) - self.failUnlessCallsAre([ ( ("one",) , {} ), - ( ("two",) , {"kw": 2} ), - ]) - self.failUnlessEqual(p.gauge, 30) - - self.calls[0][0].callback("one-result") - # pipeline: [two] - self.failUnlessEqual(p.gauge, 20) - - finished = [] - d = p.add(90, self.pause, "three", "posarg1") - # pipeline: [two, three] - flushed = [] - fd = p.flush() - fd.addCallbacks(flushed.append, log.err) - self.failUnlessEqual(flushed, []) - - # the call will be made right away, but the return Deferred will not, - # because the pipeline is now full. - d.addCallbacks(finished.append, log.err) - self.failUnlessEqual(len(finished), 0) - self.failUnlessCallsAre([ ( ("one",) , {} ), - ( ("two",) , {"kw": 2} ), - ( ("three", "posarg1"), {} ), - ]) - self.failUnlessEqual(p.gauge, 110) - - self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause) - - # retiring either call will unblock the pipeline, causing the #3 - # Deferred to fire - self.calls[2][0].callback("three-result") - # pipeline: [two] - - self.failUnlessEqual(len(finished), 1) - self.failUnlessEqual(finished[0], None) - self.failUnlessEqual(flushed, []) - - # retiring call#2 will finally allow the flush() Deferred to fire - self.calls[1][0].callback("two-result") - self.failUnlessEqual(len(flushed), 1) - - def test_errors(self): - self.calls = [] - p = pipeline.Pipeline(100) - - d1 = p.add(200, self.pause, "one") - d2 = p.flush() - - finished = [] - d1.addBoth(finished.append) - self.failUnlessEqual(finished, []) - - flushed = [] - d2.addBoth(flushed.append) - self.failUnlessEqual(flushed, []) - - self.calls[0][0].errback(ValueError("oops")) - - self.failUnlessEqual(len(finished), 1) - f = finished[0] - self.failUnless(isinstance(f, Failure)) - self.failUnless(f.check(pipeline.PipelineError)) - self.failUnlessIn("PipelineError", str(f.value)) - self.failUnlessIn("ValueError", str(f.value)) - r = repr(f.value) - self.failUnless("ValueError" in r, r) - f2 = f.value.error - self.failUnless(f2.check(ValueError)) - - self.failUnlessEqual(len(flushed), 1) - f = flushed[0] - self.failUnless(isinstance(f, Failure)) - self.failUnless(f.check(pipeline.PipelineError)) - f2 = f.value.error - self.failUnless(f2.check(ValueError)) - - # now that the pipeline is in the failed state, any new calls will - # fail immediately - - d3 = p.add(20, self.pause, "two") - - finished = [] - d3.addBoth(finished.append) - self.failUnlessEqual(len(finished), 1) - f = finished[0] - self.failUnless(isinstance(f, Failure)) - self.failUnless(f.check(pipeline.PipelineError)) - r = repr(f.value) - self.failUnless("ValueError" in r, r) - f2 = f.value.error - self.failUnless(f2.check(ValueError)) - - d4 = p.flush() - flushed = [] - d4.addBoth(flushed.append) - self.failUnlessEqual(len(flushed), 1) - f = flushed[0] - self.failUnless(isinstance(f, Failure)) - self.failUnless(f.check(pipeline.PipelineError)) - f2 = f.value.error - self.failUnless(f2.check(ValueError)) - - def test_errors2(self): - self.calls = [] - p = pipeline.Pipeline(100) - - d1 = p.add(10, self.pause, "one") - d2 = p.add(20, self.pause, "two") - d3 = p.add(30, self.pause, "three") - d4 = p.flush() - - # one call fails, then the second one succeeds: make sure - # ExpandableDeferredList tolerates the second one - - flushed = [] - d4.addBoth(flushed.append) - self.failUnlessEqual(flushed, []) - - self.calls[0][0].errback(ValueError("oops")) - self.failUnlessEqual(len(flushed), 1) - f = flushed[0] - self.failUnless(isinstance(f, Failure)) - self.failUnless(f.check(pipeline.PipelineError)) - f2 = f.value.error - self.failUnless(f2.check(ValueError)) - - self.calls[1][0].callback("two-result") - self.calls[2][0].errback(ValueError("three-error")) - - del d1,d2,d3,d4 - gc.collect() # for PyPy class SampleError(Exception): pass diff --git a/src/allmydata/util/_python3.py b/src/allmydata/util/_python3.py index 32bd406e5..6873c5c08 100644 --- a/src/allmydata/util/_python3.py +++ b/src/allmydata/util/_python3.py @@ -26,6 +26,8 @@ PORTED_MODULES = [ "allmydata.util.mathutil", "allmydata.util.namespace", "allmydata.util.netstring", + "allmydata.util.observer", + "allmydata.util.pipeline", "allmydata.util.pollmixin", "allmydata.util._python3", "allmydata.util.time_format", @@ -41,6 +43,8 @@ PORTED_TEST_MODULES = [ "allmydata.test.test_hashutil", "allmydata.test.test_humanreadable", "allmydata.test.test_netstring", + "allmydata.test.test_observer", + "allmydata.test.test_pipeline", "allmydata.test.test_python3", "allmydata.test.test_time_format", ] diff --git a/src/allmydata/util/observer.py b/src/allmydata/util/observer.py index 30eb92329..d5003dfb3 100644 --- a/src/allmydata/util/observer.py +++ b/src/allmydata/util/observer.py @@ -1,4 +1,17 @@ -# -*- test-case-name: allmydata.test.test_observer -*- +""" +Observer for Twisted code. + +Ported to Python 3. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 +if PY2: + from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, int, list, object, range, str, max, min # noqa: F401 import weakref from twisted.internet import defer diff --git a/src/allmydata/util/pipeline.py b/src/allmydata/util/pipeline.py index 285a06b98..df80e2c6c 100644 --- a/src/allmydata/util/pipeline.py +++ b/src/allmydata/util/pipeline.py @@ -1,9 +1,24 @@ +""" +A pipeline of Deferreds. + +Ported to Python 3. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 +if PY2: + from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, int, list, object, range, str, max, min # noqa: F401 from twisted.internet import defer from twisted.python.failure import Failure from twisted.python import log from allmydata.util.assertutil import precondition + class PipelineError(Exception): """One of the pipelined messages returned an error. The received Failure object is stored in my .error attribute."""