""" Common utilities that have been ported to Python 3. Ported to Python 3. """ from __future__ import unicode_literals from __future__ import absolute_import from __future__ import division from __future__ import print_function from future.utils import PY2 if PY2: from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 from past.builtins import unicode import os import time import signal from twisted.internet import defer, reactor from twisted.application import service from twisted.python import failure from twisted.trial import unittest from ..util.assertutil import precondition from ..util.encodingutil import unicode_platform, get_filesystem_encoding from ..util import log class TimezoneMixin(object): def setTimezone(self, timezone): def tzset_if_possible(): # Windows doesn't have time.tzset(). if hasattr(time, 'tzset'): time.tzset() unset = object() originalTimezone = os.environ.get('TZ', unset) def restoreTimezone(): if originalTimezone is unset: del os.environ['TZ'] else: os.environ['TZ'] = originalTimezone tzset_if_possible() os.environ['TZ'] = timezone self.addCleanup(restoreTimezone) tzset_if_possible() def have_working_tzset(self): return hasattr(time, 'tzset') class SignalMixin(object): # This class is necessary for any code which wants to use Processes # outside the usual reactor.run() environment. It is copied from # Twisted's twisted.test.test_process . Note that Twisted-8.2.0 uses # something rather different. sigchldHandler = None def setUp(self): # make sure SIGCHLD handler is installed, as it should be on # reactor.run(). problem is reactor may not have been run when this # test runs. if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"): self.sigchldHandler = signal.signal(signal.SIGCHLD, reactor._handleSigchld) return super(SignalMixin, self).setUp() def tearDown(self): if self.sigchldHandler: signal.signal(signal.SIGCHLD, self.sigchldHandler) return super(SignalMixin, self).tearDown() class ShouldFailMixin(object): def shouldFail(self, expected_failure, which, substring, callable, *args, **kwargs): assert substring is None or isinstance(substring, (bytes, unicode)) d = defer.maybeDeferred(callable, *args, **kwargs) def done(res): if isinstance(res, failure.Failure): res.trap(expected_failure) if substring: self.failUnless(substring in str(res), "%s: substring '%s' not in '%s'" % (which, substring, str(res))) # return the Failure for further analysis, but in a form that # doesn't make the Deferred chain think that we failed. return [res] else: self.fail("%s was supposed to raise %s, not get '%s'" % (which, expected_failure, res)) d.addBoth(done) return d class ReallyEqualMixin(object): def failUnlessReallyEqual(self, a, b, msg=None): self.assertEqual(a, b, msg) self.assertEqual(type(a), type(b), "a :: %r, b :: %r, %r" % (a, b, msg)) def skip_if_cannot_represent_filename(u): precondition(isinstance(u, unicode)) enc = get_filesystem_encoding() if not unicode_platform(): try: u.encode(enc) except UnicodeEncodeError: raise unittest.SkipTest("A non-ASCII filename could not be encoded on this platform.") class Marker(object): pass class FakeCanary(object): """For use in storage tests. Can be moved back to test_storage.py once enough Python 3 porting has been done. """ def __init__(self, ignore_disconnectors=False): self.ignore = ignore_disconnectors self.disconnectors = {} def notifyOnDisconnect(self, f, *args, **kwargs): if self.ignore: return m = Marker() self.disconnectors[m] = (f, args, kwargs) return m def dontNotifyOnDisconnect(self, marker): if self.ignore: return del self.disconnectors[marker] class LoggingServiceParent(service.MultiService): def log(self, *args, **kwargs): return log.msg(*args, **kwargs) class ShouldFailMixin(object): def shouldFail(self, expected_failure, which, substring, callable, *args, **kwargs): """Assert that a function call raises some exception. This is a Deferred-friendly version of TestCase.assertRaises() . Suppose you want to verify the following function: def broken(a, b, c): if a < 0: raise TypeError('a must not be negative') return defer.succeed(b+c) You can use: d = self.shouldFail(TypeError, 'test name', 'a must not be negative', broken, -4, 5, c=12) in your test method. The 'test name' string will be included in the error message, if any, because Deferred chains frequently make it difficult to tell which assertion was tripped. The substring= argument, if not None, must appear in the 'repr' of the message wrapped by this Failure, or the test will fail. """ assert substring is None or isinstance(substring, str) d = defer.maybeDeferred(callable, *args, **kwargs) def done(res): if isinstance(res, failure.Failure): res.trap(expected_failure) if substring: message = repr(res.value.args[0]) self.failUnless(substring in message, "%s: substring '%s' not in '%s'" % (which, substring, message)) else: self.fail("%s was supposed to raise %s, not get '%s'" % (which, expected_failure, res)) d.addBoth(done) return d