Merge pull request #842 from tahoe-lafs/3456.bye-bye-common_py3

Fold common_py3 back into other files
This commit is contained in:
Chad Whitacre 2020-10-05 08:03:18 -04:00 committed by GitHub
commit 2fe2acf4c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 159 additions and 188 deletions

0
newsfragments/3456.minor Normal file
View File

View File

@ -46,6 +46,7 @@ from testtools.twistedsupport import (
flush_logged_errors,
)
from twisted.application import service
from twisted.plugin import IPlugin
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks, returnValue
@ -87,8 +88,7 @@ from ..crypto import (
from .eliotutil import (
EliotLoggedRunTest,
)
# Backwards compatibility imports:
from .common_py3 import LoggingServiceParent, ShouldFailMixin # noqa: F401
from .common_util import ShouldFailMixin # noqa: F401
TEST_RSA_KEY_SIZE = 522
@ -781,6 +781,11 @@ def create_mutable_filenode(contents, mdmf=False, all_contents=None):
return filenode
class LoggingServiceParent(service.MultiService):
def log(self, *args, **kwargs):
return log.msg(*args, **kwargs)
TEST_DATA=b"\x02"*(Uploader.URI_LIT_SIZE_THRESHOLD+1)

View File

@ -1,170 +0,0 @@
"""
Common utilities that have been ported to Python 3.
Ported to Python 3.
"""
from __future__ import unicode_literals
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from future.utils import PY2
if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from past.builtins import unicode
import os
import time
import signal
from twisted.internet import defer, reactor
from twisted.application import service
from twisted.python import failure
from twisted.trial import unittest
from ..util.assertutil import precondition
from ..util.encodingutil import unicode_platform, get_filesystem_encoding
from ..util import log
class TimezoneMixin(object):
def setTimezone(self, timezone):
def tzset_if_possible():
# Windows doesn't have time.tzset().
if hasattr(time, 'tzset'):
time.tzset()
unset = object()
originalTimezone = os.environ.get('TZ', unset)
def restoreTimezone():
if originalTimezone is unset:
del os.environ['TZ']
else:
os.environ['TZ'] = originalTimezone
tzset_if_possible()
os.environ['TZ'] = timezone
self.addCleanup(restoreTimezone)
tzset_if_possible()
def have_working_tzset(self):
return hasattr(time, 'tzset')
class SignalMixin(object):
# This class is necessary for any code which wants to use Processes
# outside the usual reactor.run() environment. It is copied from
# Twisted's twisted.test.test_process . Note that Twisted-8.2.0 uses
# something rather different.
sigchldHandler = None
def setUp(self):
# make sure SIGCHLD handler is installed, as it should be on
# reactor.run(). problem is reactor may not have been run when this
# test runs.
if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
self.sigchldHandler = signal.signal(signal.SIGCHLD,
reactor._handleSigchld)
return super(SignalMixin, self).setUp()
def tearDown(self):
if self.sigchldHandler:
signal.signal(signal.SIGCHLD, self.sigchldHandler)
return super(SignalMixin, self).tearDown()
class ShouldFailMixin(object):
def shouldFail(self, expected_failure, which, substring,
callable, *args, **kwargs):
"""Assert that a function call raises some exception. This is a
Deferred-friendly version of TestCase.assertRaises() .
Suppose you want to verify the following function:
def broken(a, b, c):
if a < 0:
raise TypeError('a must not be negative')
return defer.succeed(b+c)
You can use:
d = self.shouldFail(TypeError, 'test name',
'a must not be negative',
broken, -4, 5, c=12)
in your test method. The 'test name' string will be included in the
error message, if any, because Deferred chains frequently make it
difficult to tell which assertion was tripped.
The substring= argument, if not None, must appear in the 'repr'
of the message wrapped by this Failure, or the test will fail.
"""
assert substring is None or isinstance(substring, (bytes, unicode))
d = defer.maybeDeferred(callable, *args, **kwargs)
def done(res):
if isinstance(res, failure.Failure):
res.trap(expected_failure)
if substring:
self.failUnless(substring in str(res),
"%s: substring '%s' not in '%s'"
% (which, substring, str(res)))
# return the Failure for further analysis, but in a form that
# doesn't make the Deferred chain think that we failed.
return [res]
else:
self.fail("%s was supposed to raise %s, not get '%s'" %
(which, expected_failure, res))
d.addBoth(done)
return d
class ReallyEqualMixin(object):
def failUnlessReallyEqual(self, a, b, msg=None):
self.assertEqual(a, b, msg)
self.assertEqual(type(a), type(b), "a :: %r (%s), b :: %r (%s), %r" % (a, type(a), b, type(b), msg))
def skip_if_cannot_represent_filename(u):
precondition(isinstance(u, unicode))
enc = get_filesystem_encoding()
if not unicode_platform():
try:
u.encode(enc)
except UnicodeEncodeError:
raise unittest.SkipTest("A non-ASCII filename could not be encoded on this platform.")
class Marker(object):
pass
class FakeCanary(object):
"""For use in storage tests.
Can be moved back to test_storage.py once enough Python 3 porting has been
done.
"""
def __init__(self, ignore_disconnectors=False):
self.ignore = ignore_disconnectors
self.disconnectors = {}
def notifyOnDisconnect(self, f, *args, **kwargs):
if self.ignore:
return
m = Marker()
self.disconnectors[m] = (f, args, kwargs)
return m
def dontNotifyOnDisconnect(self, marker):
if self.ignore:
return
del self.disconnectors[marker]
def getRemoteTubID(self):
return None
def getPeer(self):
return "<fake>"
class LoggingServiceParent(service.MultiService):
def log(self, *args, **kwargs):
return log.msg(*args, **kwargs)

View File

@ -1,22 +1,33 @@
from __future__ import print_function
import os
import time
import signal
from random import randrange
from six.moves import StringIO
from twisted.internet import reactor, defer
from twisted.python import failure
from twisted.trial import unittest
from ..util.assertutil import precondition
from ..scripts import runner
from allmydata.util.encodingutil import get_io_encoding
from allmydata.util.encodingutil import unicode_platform, get_filesystem_encoding, get_io_encoding
# Imported for backwards compatibility:
from future.utils import bord, bchr, binary_type
from .common_py3 import (
SignalMixin, skip_if_cannot_represent_filename, ReallyEqualMixin, ShouldFailMixin
)
from past.builtins import unicode
def skip_if_cannot_represent_filename(u):
precondition(isinstance(u, unicode))
enc = get_filesystem_encoding()
if not unicode_platform():
try:
u.encode(enc)
except UnicodeEncodeError:
raise unittest.SkipTest("A non-ASCII filename could not be encoded on this platform.")
def skip_if_cannot_represent_argv(u):
precondition(isinstance(u, unicode))
try:
@ -78,6 +89,34 @@ def flip_one_bit(s, offset=0, size=None):
return result
class ReallyEqualMixin(object):
def failUnlessReallyEqual(self, a, b, msg=None):
self.assertEqual(a, b, msg)
self.assertEqual(type(a), type(b), "a :: %r (%s), b :: %r (%s), %r" % (a, type(a), b, type(b), msg))
class SignalMixin(object):
# This class is necessary for any code which wants to use Processes
# outside the usual reactor.run() environment. It is copied from
# Twisted's twisted.test.test_process . Note that Twisted-8.2.0 uses
# something rather different.
sigchldHandler = None
def setUp(self):
# make sure SIGCHLD handler is installed, as it should be on
# reactor.run(). problem is reactor may not have been run when this
# test runs.
if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
self.sigchldHandler = signal.signal(signal.SIGCHLD,
reactor._handleSigchld)
return super(SignalMixin, self).setUp()
def tearDown(self):
if self.sigchldHandler:
signal.signal(signal.SIGCHLD, self.sigchldHandler)
return super(SignalMixin, self).tearDown()
class StallMixin(object):
def stall(self, res=None, delay=1):
d = defer.Deferred()
@ -85,6 +124,79 @@ class StallMixin(object):
return d
class Marker(object):
pass
class FakeCanary(object):
"""For use in storage tests.
Can be moved back to test_storage.py once enough Python 3 porting has been
done.
"""
def __init__(self, ignore_disconnectors=False):
self.ignore = ignore_disconnectors
self.disconnectors = {}
def notifyOnDisconnect(self, f, *args, **kwargs):
if self.ignore:
return
m = Marker()
self.disconnectors[m] = (f, args, kwargs)
return m
def dontNotifyOnDisconnect(self, marker):
if self.ignore:
return
del self.disconnectors[marker]
def getRemoteTubID(self):
return None
def getPeer(self):
return "<fake>"
class ShouldFailMixin(object):
def shouldFail(self, expected_failure, which, substring,
callable, *args, **kwargs):
"""Assert that a function call raises some exception. This is a
Deferred-friendly version of TestCase.assertRaises() .
Suppose you want to verify the following function:
def broken(a, b, c):
if a < 0:
raise TypeError('a must not be negative')
return defer.succeed(b+c)
You can use:
d = self.shouldFail(TypeError, 'test name',
'a must not be negative',
broken, -4, 5, c=12)
in your test method. The 'test name' string will be included in the
error message, if any, because Deferred chains frequently make it
difficult to tell which assertion was tripped.
The substring= argument, if not None, must appear in the 'repr'
of the message wrapped by this Failure, or the test will fail.
"""
assert substring is None or isinstance(substring, (bytes, unicode))
d = defer.maybeDeferred(callable, *args, **kwargs)
def done(res):
if isinstance(res, failure.Failure):
res.trap(expected_failure)
if substring:
self.failUnless(substring in str(res),
"%s: substring '%s' not in '%s'"
% (which, substring, str(res)))
# return the Failure for further analysis, but in a form that
# doesn't make the Deferred chain think that we failed.
return [res]
else:
self.fail("%s was supposed to raise %s, not get '%s'" %
(which, expected_failure, res))
d.addBoth(done)
return d
class TestMixin(SignalMixin):
def setUp(self):
return super(TestMixin, self).setUp()
@ -132,6 +244,31 @@ class TestMixin(SignalMixin):
self.fail("Reactor was still active when it was required to be quiescent.")
class TimezoneMixin(object):
def setTimezone(self, timezone):
def tzset_if_possible():
# Windows doesn't have time.tzset().
if hasattr(time, 'tzset'):
time.tzset()
unset = object()
originalTimezone = os.environ.get('TZ', unset)
def restoreTimezone():
if originalTimezone is unset:
del os.environ['TZ']
else:
os.environ['TZ'] = originalTimezone
tzset_if_possible()
os.environ['TZ'] = timezone
self.addCleanup(restoreTimezone)
tzset_if_possible()
def have_working_tzset(self):
return hasattr(time, 'tzset')
try:
import win32file
import win32con

View File

@ -27,8 +27,7 @@ from allmydata.util import fileutil, hashutil, pollmixin
from allmydata.storage.server import StorageServer, si_b2a
from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded
from allmydata.test.common_py3 import FakeCanary
from allmydata.test.common_util import StallMixin
from allmydata.test.common_util import StallMixin, FakeCanary
class BucketEnumeratingCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice

View File

@ -77,7 +77,7 @@ from twisted.trial import unittest
from twisted.python.filepath import FilePath
from allmydata.test.common_py3 import (
from allmydata.test.common_util import (
ReallyEqualMixin, skip_if_cannot_represent_filename,
)
from allmydata.util import encodingutil, fileutil

View File

@ -23,7 +23,7 @@ from hypothesis.strategies import text, sets
from allmydata.immutable import happiness_upload
from allmydata.util.happinessutil import servers_of_happiness, \
shares_by_server, merge_servers
from allmydata.test.common_py3 import ShouldFailMixin
from allmydata.test.common import ShouldFailMixin
class HappinessUploadUtils(unittest.TestCase):

View File

@ -23,7 +23,7 @@ from tenacity import retry, stop_after_attempt
from foolscap.api import Tub
from allmydata.util import iputil, gcutil
import allmydata.test.common_py3 as testutil
import allmydata.test.common_util as testutil
from allmydata.util.namespace import Namespace

View File

@ -51,7 +51,8 @@ from allmydata.test.no_network import NoNetworkServer
from allmydata.storage_client import (
_StorageServer,
)
from .common_py3 import FakeCanary, LoggingServiceParent, ShouldFailMixin
from .common import LoggingServiceParent, ShouldFailMixin
from .common_util import FakeCanary
class UtilTests(unittest.TestCase):

View File

@ -50,7 +50,7 @@ from allmydata.web.storage import (
StorageStatusElement,
remove_prefix
)
from .common_py3 import FakeCanary
from .common_util import FakeCanary
def remove_tags(s):
s = re.sub(br'<[^>]*>', b' ', s)

View File

@ -16,7 +16,7 @@ import time
from twisted.trial import unittest
from allmydata.test.common_py3 import TimezoneMixin
from allmydata.test.common_util import TimezoneMixin
from allmydata.util import time_format

View File

@ -28,12 +28,12 @@ from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.test.no_network import GridTestMixin
from allmydata.test.common_py3 import ShouldFailMixin
from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.server import storage_index_to_dir
from allmydata.client import _Client
from .common import (
EMPTY_CLIENT_CONFIG,
ShouldFailMixin,
)
from functools import reduce

View File

@ -33,7 +33,7 @@ from .common import (
from ..common import (
SameProcessStreamEndpointAssigner,
)
from ..common_py3 import (
from ..common_util import (
FakeCanary,
)
from ..common_web import (

View File

@ -52,7 +52,7 @@ from allmydata.interfaces import (
)
from allmydata.mutable import servermap, publish, retrieve
from .. import common_util as testutil
from ..common_py3 import TimezoneMixin
from ..common_util import TimezoneMixin
from ..common_web import (
do_http,
Error,

View File

@ -53,7 +53,6 @@ PORTED_MODULES = [
"allmydata.storage.mutable",
"allmydata.storage.server",
"allmydata.storage.shares",
"allmydata.test.common_py3",
"allmydata.test.no_network",
"allmydata.uri",
"allmydata.util._python3",