From b4e6686211cb105f1b61154115868b2efa361fd1 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 22 Jul 2020 10:29:44 -0400 Subject: [PATCH 1/6] Port to Python 3. --- src/allmydata/test/test_observer.py | 14 ++++++++++++++ src/allmydata/util/_python3.py | 2 ++ src/allmydata/util/observer.py | 15 ++++++++++++++- 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/allmydata/test/test_observer.py b/src/allmydata/test/test_observer.py index 32fd0a395..b37f0d3e1 100644 --- a/src/allmydata/test/test_observer.py +++ b/src/allmydata/test/test_observer.py @@ -1,3 +1,17 @@ +""" +Tests for allmydata.util.observer. + +Ported to Python 3. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 +if PY2: + from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, int, list, object, range, str, max, min # noqa: F401 from twisted.trial import unittest from twisted.internet import defer, reactor diff --git a/src/allmydata/util/_python3.py b/src/allmydata/util/_python3.py index ea4e0e702..92ad34174 100644 --- a/src/allmydata/util/_python3.py +++ b/src/allmydata/util/_python3.py @@ -22,6 +22,7 @@ PORTED_MODULES = [ "allmydata.util.humanreadable", "allmydata.util.mathutil", "allmydata.util.namespace", + "allmydata.util.observer", "allmydata.util.pollmixin", "allmydata.util._python3", ] @@ -31,6 +32,7 @@ PORTED_TEST_MODULES = [ "allmydata.test.test_base62", "allmydata.test.test_deferredutil", "allmydata.test.test_humanreadable", + "allmydata.test.test_observer", "allmydata.test.test_python3", ] diff --git a/src/allmydata/util/observer.py b/src/allmydata/util/observer.py index 30eb92329..d5003dfb3 100644 --- a/src/allmydata/util/observer.py +++ b/src/allmydata/util/observer.py @@ -1,4 +1,17 @@ -# -*- test-case-name: allmydata.test.test_observer -*- +""" +Observer for Twisted code. + +Ported to Python 3. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 +if PY2: + from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, int, list, object, range, str, max, min # noqa: F401 import weakref from twisted.internet import defer From 04bf9aeffcdf023c2d9a79c11bb8225f85df8eff Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 22 Jul 2020 10:30:13 -0400 Subject: [PATCH 2/6] News file. --- newsfragments/3353.minor | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 newsfragments/3353.minor diff --git a/newsfragments/3353.minor b/newsfragments/3353.minor new file mode 100644 index 000000000..e69de29bb From e427163ec84bcf4e665461e7a530a1da987c7d95 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 22 Jul 2020 10:33:23 -0400 Subject: [PATCH 3/6] Move pipeline tests into their own module. --- src/allmydata/test/test_pipeline.py | 190 ++++++++++++++++++++++++++++ src/allmydata/test/test_util.py | 178 +------------------------- 2 files changed, 191 insertions(+), 177 deletions(-) create mode 100644 src/allmydata/test/test_pipeline.py diff --git a/src/allmydata/test/test_pipeline.py b/src/allmydata/test/test_pipeline.py new file mode 100644 index 000000000..b5c0f535a --- /dev/null +++ b/src/allmydata/test/test_pipeline.py @@ -0,0 +1,190 @@ +""" +Tests for allmydata.util.pipeline. + +Ported to Python 3. +""" + +import gc + +from twisted.internet import defer +from twisted.trial import unittest +from twisted.python import log +from twisted.python.failure import Failure + +from allmydata.util import pipeline + + +class Pipeline(unittest.TestCase): + def pause(self, *args, **kwargs): + d = defer.Deferred() + self.calls.append( (d, args, kwargs) ) + return d + + def failUnlessCallsAre(self, expected): + #print self.calls + #print expected + self.failUnlessEqual(len(self.calls), len(expected), self.calls) + for i,c in enumerate(self.calls): + self.failUnlessEqual(c[1:], expected[i], str(i)) + + def test_basic(self): + self.calls = [] + finished = [] + p = pipeline.Pipeline(100) + + d = p.flush() # fires immediately + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 1) + finished = [] + + d = p.add(10, self.pause, "one") + # the call should start right away, and our return Deferred should + # fire right away + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 1) + self.failUnlessEqual(finished[0], None) + self.failUnlessCallsAre([ ( ("one",) , {} ) ]) + self.failUnlessEqual(p.gauge, 10) + + # pipeline: [one] + + finished = [] + d = p.add(20, self.pause, "two", kw=2) + # pipeline: [one, two] + + # the call and the Deferred should fire right away + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 1) + self.failUnlessEqual(finished[0], None) + self.failUnlessCallsAre([ ( ("one",) , {} ), + ( ("two",) , {"kw": 2} ), + ]) + self.failUnlessEqual(p.gauge, 30) + + self.calls[0][0].callback("one-result") + # pipeline: [two] + self.failUnlessEqual(p.gauge, 20) + + finished = [] + d = p.add(90, self.pause, "three", "posarg1") + # pipeline: [two, three] + flushed = [] + fd = p.flush() + fd.addCallbacks(flushed.append, log.err) + self.failUnlessEqual(flushed, []) + + # the call will be made right away, but the return Deferred will not, + # because the pipeline is now full. + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 0) + self.failUnlessCallsAre([ ( ("one",) , {} ), + ( ("two",) , {"kw": 2} ), + ( ("three", "posarg1"), {} ), + ]) + self.failUnlessEqual(p.gauge, 110) + + self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause) + + # retiring either call will unblock the pipeline, causing the #3 + # Deferred to fire + self.calls[2][0].callback("three-result") + # pipeline: [two] + + self.failUnlessEqual(len(finished), 1) + self.failUnlessEqual(finished[0], None) + self.failUnlessEqual(flushed, []) + + # retiring call#2 will finally allow the flush() Deferred to fire + self.calls[1][0].callback("two-result") + self.failUnlessEqual(len(flushed), 1) + + def test_errors(self): + self.calls = [] + p = pipeline.Pipeline(100) + + d1 = p.add(200, self.pause, "one") + d2 = p.flush() + + finished = [] + d1.addBoth(finished.append) + self.failUnlessEqual(finished, []) + + flushed = [] + d2.addBoth(flushed.append) + self.failUnlessEqual(flushed, []) + + self.calls[0][0].errback(ValueError("oops")) + + self.failUnlessEqual(len(finished), 1) + f = finished[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + self.failUnlessIn("PipelineError", str(f.value)) + self.failUnlessIn("ValueError", str(f.value)) + r = repr(f.value) + self.failUnless("ValueError" in r, r) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + self.failUnlessEqual(len(flushed), 1) + f = flushed[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + # now that the pipeline is in the failed state, any new calls will + # fail immediately + + d3 = p.add(20, self.pause, "two") + + finished = [] + d3.addBoth(finished.append) + self.failUnlessEqual(len(finished), 1) + f = finished[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + r = repr(f.value) + self.failUnless("ValueError" in r, r) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + d4 = p.flush() + flushed = [] + d4.addBoth(flushed.append) + self.failUnlessEqual(len(flushed), 1) + f = flushed[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + def test_errors2(self): + self.calls = [] + p = pipeline.Pipeline(100) + + d1 = p.add(10, self.pause, "one") + d2 = p.add(20, self.pause, "two") + d3 = p.add(30, self.pause, "three") + d4 = p.flush() + + # one call fails, then the second one succeeds: make sure + # ExpandableDeferredList tolerates the second one + + flushed = [] + d4.addBoth(flushed.append) + self.failUnlessEqual(flushed, []) + + self.calls[0][0].errback(ValueError("oops")) + self.failUnlessEqual(len(flushed), 1) + f = flushed[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + self.calls[1][0].callback("two-result") + self.calls[2][0].errback(ValueError("three-error")) + + del d1,d2,d3,d4 + gc.collect() # for PyPy diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index f744feb7e..a71600627 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -5,19 +5,17 @@ import six import hashlib import os, time, sys import yaml -import gc # support PyPy from six.moves import StringIO from datetime import timedelta from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.python.failure import Failure -from twisted.python import log from allmydata.util import base32, idlib, mathutil, hashutil from allmydata.util import fileutil, abbreviate from allmydata.util import limiter, time_format, pollmixin -from allmydata.util import statistics, dictutil, pipeline, yamlutil +from allmydata.util import statistics, dictutil, yamlutil from allmydata.util import log as tahoe_log from allmydata.util.spans import Spans, overlap, DataSpans from allmydata.util.fileutil import EncryptedTemporaryFile @@ -1121,180 +1119,6 @@ class DictUtil(unittest.TestCase): self.failUnlessEqual(d["one"], 1) self.failUnlessEqual(d.get_aux("one"), None) -class Pipeline(unittest.TestCase): - def pause(self, *args, **kwargs): - d = defer.Deferred() - self.calls.append( (d, args, kwargs) ) - return d - - def failUnlessCallsAre(self, expected): - #print self.calls - #print expected - self.failUnlessEqual(len(self.calls), len(expected), self.calls) - for i,c in enumerate(self.calls): - self.failUnlessEqual(c[1:], expected[i], str(i)) - - def test_basic(self): - self.calls = [] - finished = [] - p = pipeline.Pipeline(100) - - d = p.flush() # fires immediately - d.addCallbacks(finished.append, log.err) - self.failUnlessEqual(len(finished), 1) - finished = [] - - d = p.add(10, self.pause, "one") - # the call should start right away, and our return Deferred should - # fire right away - d.addCallbacks(finished.append, log.err) - self.failUnlessEqual(len(finished), 1) - self.failUnlessEqual(finished[0], None) - self.failUnlessCallsAre([ ( ("one",) , {} ) ]) - self.failUnlessEqual(p.gauge, 10) - - # pipeline: [one] - - finished = [] - d = p.add(20, self.pause, "two", kw=2) - # pipeline: [one, two] - - # the call and the Deferred should fire right away - d.addCallbacks(finished.append, log.err) - self.failUnlessEqual(len(finished), 1) - self.failUnlessEqual(finished[0], None) - self.failUnlessCallsAre([ ( ("one",) , {} ), - ( ("two",) , {"kw": 2} ), - ]) - self.failUnlessEqual(p.gauge, 30) - - self.calls[0][0].callback("one-result") - # pipeline: [two] - self.failUnlessEqual(p.gauge, 20) - - finished = [] - d = p.add(90, self.pause, "three", "posarg1") - # pipeline: [two, three] - flushed = [] - fd = p.flush() - fd.addCallbacks(flushed.append, log.err) - self.failUnlessEqual(flushed, []) - - # the call will be made right away, but the return Deferred will not, - # because the pipeline is now full. - d.addCallbacks(finished.append, log.err) - self.failUnlessEqual(len(finished), 0) - self.failUnlessCallsAre([ ( ("one",) , {} ), - ( ("two",) , {"kw": 2} ), - ( ("three", "posarg1"), {} ), - ]) - self.failUnlessEqual(p.gauge, 110) - - self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause) - - # retiring either call will unblock the pipeline, causing the #3 - # Deferred to fire - self.calls[2][0].callback("three-result") - # pipeline: [two] - - self.failUnlessEqual(len(finished), 1) - self.failUnlessEqual(finished[0], None) - self.failUnlessEqual(flushed, []) - - # retiring call#2 will finally allow the flush() Deferred to fire - self.calls[1][0].callback("two-result") - self.failUnlessEqual(len(flushed), 1) - - def test_errors(self): - self.calls = [] - p = pipeline.Pipeline(100) - - d1 = p.add(200, self.pause, "one") - d2 = p.flush() - - finished = [] - d1.addBoth(finished.append) - self.failUnlessEqual(finished, []) - - flushed = [] - d2.addBoth(flushed.append) - self.failUnlessEqual(flushed, []) - - self.calls[0][0].errback(ValueError("oops")) - - self.failUnlessEqual(len(finished), 1) - f = finished[0] - self.failUnless(isinstance(f, Failure)) - self.failUnless(f.check(pipeline.PipelineError)) - self.failUnlessIn("PipelineError", str(f.value)) - self.failUnlessIn("ValueError", str(f.value)) - r = repr(f.value) - self.failUnless("ValueError" in r, r) - f2 = f.value.error - self.failUnless(f2.check(ValueError)) - - self.failUnlessEqual(len(flushed), 1) - f = flushed[0] - self.failUnless(isinstance(f, Failure)) - self.failUnless(f.check(pipeline.PipelineError)) - f2 = f.value.error - self.failUnless(f2.check(ValueError)) - - # now that the pipeline is in the failed state, any new calls will - # fail immediately - - d3 = p.add(20, self.pause, "two") - - finished = [] - d3.addBoth(finished.append) - self.failUnlessEqual(len(finished), 1) - f = finished[0] - self.failUnless(isinstance(f, Failure)) - self.failUnless(f.check(pipeline.PipelineError)) - r = repr(f.value) - self.failUnless("ValueError" in r, r) - f2 = f.value.error - self.failUnless(f2.check(ValueError)) - - d4 = p.flush() - flushed = [] - d4.addBoth(flushed.append) - self.failUnlessEqual(len(flushed), 1) - f = flushed[0] - self.failUnless(isinstance(f, Failure)) - self.failUnless(f.check(pipeline.PipelineError)) - f2 = f.value.error - self.failUnless(f2.check(ValueError)) - - def test_errors2(self): - self.calls = [] - p = pipeline.Pipeline(100) - - d1 = p.add(10, self.pause, "one") - d2 = p.add(20, self.pause, "two") - d3 = p.add(30, self.pause, "three") - d4 = p.flush() - - # one call fails, then the second one succeeds: make sure - # ExpandableDeferredList tolerates the second one - - flushed = [] - d4.addBoth(flushed.append) - self.failUnlessEqual(flushed, []) - - self.calls[0][0].errback(ValueError("oops")) - self.failUnlessEqual(len(flushed), 1) - f = flushed[0] - self.failUnless(isinstance(f, Failure)) - self.failUnless(f.check(pipeline.PipelineError)) - f2 = f.value.error - self.failUnless(f2.check(ValueError)) - - self.calls[1][0].callback("two-result") - self.calls[2][0].errback(ValueError("three-error")) - - del d1,d2,d3,d4 - gc.collect() # for PyPy class SampleError(Exception): pass From 0763f9f90b4ae7549efac5d6752d3a1e7bb058f6 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 22 Jul 2020 10:36:50 -0400 Subject: [PATCH 4/6] Port to Python 3. --- src/allmydata/test/test_pipeline.py | 8 ++++++++ src/allmydata/util/_python3.py | 2 ++ src/allmydata/util/pipeline.py | 15 +++++++++++++++ 3 files changed, 25 insertions(+) diff --git a/src/allmydata/test/test_pipeline.py b/src/allmydata/test/test_pipeline.py index b5c0f535a..ab7059521 100644 --- a/src/allmydata/test/test_pipeline.py +++ b/src/allmydata/test/test_pipeline.py @@ -3,6 +3,14 @@ Tests for allmydata.util.pipeline. Ported to Python 3. """ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 +if PY2: + from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, int, list, object, range, str, max, min # noqa: F401 import gc diff --git a/src/allmydata/util/_python3.py b/src/allmydata/util/_python3.py index 92ad34174..72e56581b 100644 --- a/src/allmydata/util/_python3.py +++ b/src/allmydata/util/_python3.py @@ -23,6 +23,7 @@ PORTED_MODULES = [ "allmydata.util.mathutil", "allmydata.util.namespace", "allmydata.util.observer", + "allmydata.util.pipeline", "allmydata.util.pollmixin", "allmydata.util._python3", ] @@ -33,6 +34,7 @@ PORTED_TEST_MODULES = [ "allmydata.test.test_deferredutil", "allmydata.test.test_humanreadable", "allmydata.test.test_observer", + "allmydata.test.test_pipeline", "allmydata.test.test_python3", ] diff --git a/src/allmydata/util/pipeline.py b/src/allmydata/util/pipeline.py index 285a06b98..df80e2c6c 100644 --- a/src/allmydata/util/pipeline.py +++ b/src/allmydata/util/pipeline.py @@ -1,9 +1,24 @@ +""" +A pipeline of Deferreds. + +Ported to Python 3. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 +if PY2: + from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, int, list, object, range, str, max, min # noqa: F401 from twisted.internet import defer from twisted.python.failure import Failure from twisted.python import log from allmydata.util.assertutil import precondition + class PipelineError(Exception): """One of the pipelined messages returned an error. The received Failure object is stored in my .error attribute.""" From 691322764d3d71a4beba842dcf6ea95868f9ae6a Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 22 Jul 2020 10:37:30 -0400 Subject: [PATCH 5/6] Ratchet tests. --- misc/python3/ratchet-passing | 3 +++ 1 file changed, 3 insertions(+) diff --git a/misc/python3/ratchet-passing b/misc/python3/ratchet-passing index a6e1de68b..08f9b3fe9 100644 --- a/misc/python3/ratchet-passing +++ b/misc/python3/ratchet-passing @@ -23,6 +23,9 @@ allmydata.test.test_observer.Observer.test_lazy_oneshot allmydata.test.test_observer.Observer.test_observerlist allmydata.test.test_observer.Observer.test_oneshot allmydata.test.test_observer.Observer.test_oneshot_fireagain +allmydata.test.test_pipeline.Pipeline.test_basic +allmydata.test.test_pipeline.Pipeline.test_errors +allmydata.test.test_pipeline.Pipeline.test_errors2 allmydata.test.test_python3.Python3PortingEffortTests.test_finished_porting allmydata.test.test_python3.Python3PortingEffortTests.test_ported_modules_distinct allmydata.test.test_python3.Python3PortingEffortTests.test_ported_modules_exist From 38648c0f8fb9e7bb6e56d2a5aa4cc0ba5382f04b Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 24 Jul 2020 11:09:08 -0400 Subject: [PATCH 6/6] Fix indentation --- src/allmydata/util/_python3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/allmydata/util/_python3.py b/src/allmydata/util/_python3.py index 8c59978c6..08c11b3a6 100644 --- a/src/allmydata/util/_python3.py +++ b/src/allmydata/util/_python3.py @@ -38,7 +38,7 @@ PORTED_TEST_MODULES = [ "allmydata.test.test_hashtree", "allmydata.test.test_hashutil", "allmydata.test.test_humanreadable", - "allmydata.test.test_netstring", + "allmydata.test.test_netstring", "allmydata.test.test_observer", "allmydata.test.test_pipeline", "allmydata.test.test_python3",