diff --git a/integration/conftest.py b/integration/conftest.py index 36e7eef0b..621c0224c 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -1,6 +1,9 @@ """ Ported to Python 3. """ + +from __future__ import annotations + import sys import shutil from time import sleep @@ -19,6 +22,7 @@ from eliot import ( log_call, ) +from twisted.python.filepath import FilePath from twisted.python.procutils import which from twisted.internet.defer import DeferredList from twisted.internet.error import ( @@ -104,7 +108,7 @@ def reactor(): @pytest.fixture(scope='session') @log_call(action_type=u"integration:temp_dir", include_args=[]) -def temp_dir(request): +def temp_dir(request) -> str: """ Invoke like 'py.test --keep-tempdir ...' to avoid deleting the temp-dir """ @@ -146,7 +150,8 @@ def flog_gatherer(reactor, temp_dir, flog_binary, request): '--location', 'tcp:localhost:3117', '--port', '3117', gather_dir, - ) + ), + env=environ, ) pytest_twisted.blockon(out_protocol.done) @@ -159,6 +164,7 @@ def flog_gatherer(reactor, temp_dir, flog_binary, request): join(gather_dir, 'gatherer.tac'), ), path=gather_dir, + env=environ, ) pytest_twisted.blockon(twistd_protocol.magic_seen) @@ -177,6 +183,7 @@ def flog_gatherer(reactor, temp_dir, flog_binary, request): ( 'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0]) ), + env=environ, ) print("Waiting for flogtool to complete") try: @@ -444,15 +451,30 @@ def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, reques @pytest.fixture(scope='session') @pytest.mark.skipif(sys.platform.startswith('win'), 'Tor tests are unstable on Windows') -def chutney(reactor, temp_dir): +def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]: + # Try to find Chutney already installed in the environment. + try: + import chutney + except ImportError: + # Nope, we'll get our own in a moment. + pass + else: + # We already have one, just use it. + return ( + # from `checkout/lib/chutney/__init__.py` we want to get back to + # `checkout` because that's the parent of the directory with all + # of the network definitions. So, great-grand-parent. + FilePath(chutney.__file__).parent().parent().parent().path, + # There's nothing to add to the environment. + {}, + ) chutney_dir = join(temp_dir, 'chutney') mkdir(chutney_dir) - # TODO: - - # check for 'tor' binary explicitly and emit a "skip" if we can't - # find it + missing = [exe for exe in ["tor", "tor-gencert"] if not which(exe)] + if missing: + pytest.skip(f"Some command-line tools not found: {missing}") # XXX yuck! should add a setup.py to chutney so we can at least # "pip install " and/or depend on chutney in "pip @@ -487,88 +509,58 @@ def chutney(reactor, temp_dir): ) pytest_twisted.blockon(proto.done) - return chutney_dir + return (chutney_dir, {"PYTHONPATH": join(chutney_dir, "lib")}) @pytest.fixture(scope='session') @pytest.mark.skipif(sys.platform.startswith('win'), reason='Tor tests are unstable on Windows') def tor_network(reactor, temp_dir, chutney, request): + """ + Build a basic Tor network. - # this is the actual "chutney" script at the root of a chutney checkout - chutney_dir = chutney - chut = join(chutney_dir, 'chutney') + :param chutney: The root directory of a Chutney checkout and a dict of + additional environment variables to set so a Python process can use + it. + + :return: None + """ + chutney_root, chutney_env = chutney + basic_network = join(chutney_root, 'networks', 'basic') + + env = environ.copy() + env.update(chutney_env) + chutney_argv = (sys.executable, '-m', 'chutney.TorNet') + def chutney(argv): + proto = _DumpOutputProtocol(None) + reactor.spawnProcess( + proto, + sys.executable, + chutney_argv + argv, + path=join(chutney_root), + env=env, + ) + return proto.done # now, as per Chutney's README, we have to create the network # ./chutney configure networks/basic # ./chutney start networks/basic - - env = environ.copy() - env.update({"PYTHONPATH": join(chutney_dir, "lib")}) - proto = _DumpOutputProtocol(None) - reactor.spawnProcess( - proto, - sys.executable, - ( - sys.executable, '-m', 'chutney.TorNet', 'configure', - join(chutney_dir, 'networks', 'basic'), - ), - path=join(chutney_dir), - env=env, - ) - pytest_twisted.blockon(proto.done) - - proto = _DumpOutputProtocol(None) - reactor.spawnProcess( - proto, - sys.executable, - ( - sys.executable, '-m', 'chutney.TorNet', 'start', - join(chutney_dir, 'networks', 'basic'), - ), - path=join(chutney_dir), - env=env, - ) - pytest_twisted.blockon(proto.done) + pytest_twisted.blockon(chutney(("configure", basic_network))) + pytest_twisted.blockon(chutney(("start", basic_network))) # print some useful stuff - proto = _CollectOutputProtocol() - reactor.spawnProcess( - proto, - sys.executable, - ( - sys.executable, '-m', 'chutney.TorNet', 'status', - join(chutney_dir, 'networks', 'basic'), - ), - path=join(chutney_dir), - env=env, - ) try: - pytest_twisted.blockon(proto.done) + pytest_twisted.blockon(chutney(("status", basic_network))) except ProcessTerminated: - print("Chutney.TorNet status failed (continuing):") - print(proto.output.getvalue()) + print("Chutney.TorNet status failed (continuing)") def cleanup(): print("Tearing down Chutney Tor network") - proto = _CollectOutputProtocol() - reactor.spawnProcess( - proto, - sys.executable, - ( - sys.executable, '-m', 'chutney.TorNet', 'stop', - join(chutney_dir, 'networks', 'basic'), - ), - path=join(chutney_dir), - env=env, - ) try: - block_with_timeout(proto.done, reactor) + block_with_timeout(chutney(("stop", basic_network)), reactor) except ProcessTerminated: # If this doesn't exit cleanly, that's fine, that shouldn't fail # the test suite. pass request.addfinalizer(cleanup) - - return chut diff --git a/integration/test_i2p.py b/integration/test_i2p.py index 2deb01fab..96619a93a 100644 --- a/integration/test_i2p.py +++ b/integration/test_i2p.py @@ -2,26 +2,11 @@ Integration tests for I2P support. """ -from __future__ import unicode_literals -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from future.utils import PY2 -if PY2: - from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 - import sys from os.path import join, exists -from os import mkdir +from os import mkdir, environ from time import sleep - -if PY2: - def which(path): - # This will result in skipping I2P tests on Python 2. Oh well. - return None -else: - from shutil import which +from shutil import which from eliot import log_call @@ -62,6 +47,7 @@ def i2p_network(reactor, temp_dir, request): "--log=stdout", "--loglevel=info" ), + env=environ, ) def cleanup(): @@ -170,7 +156,8 @@ def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_netw sys.executable, '-b', '-m', 'allmydata.scripts.runner', '-d', join(temp_dir, 'carol_i2p'), 'put', gold_path, - ) + ), + env=environ, ) yield proto.done cap = proto.output.getvalue().strip().split()[-1] @@ -184,7 +171,8 @@ def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_netw sys.executable, '-b', '-m', 'allmydata.scripts.runner', '-d', join(temp_dir, 'dave_i2p'), 'get', cap, - ) + ), + env=environ, ) yield proto.done @@ -211,7 +199,8 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_ '--hide-ip', '--listen', 'i2p', node_dir.path, - ) + ), + env=environ, ) yield proto.done diff --git a/integration/test_servers_of_happiness.py b/integration/test_servers_of_happiness.py index c63642066..8363edb35 100644 --- a/integration/test_servers_of_happiness.py +++ b/integration/test_servers_of_happiness.py @@ -1,17 +1,10 @@ """ Ported to Python 3. """ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals - -from future.utils import PY2 -if PY2: - from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 import sys from os.path import join +from os import environ from twisted.internet.error import ProcessTerminated @@ -45,7 +38,8 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto sys.executable, '-b', '-m', 'allmydata.scripts.runner', '-d', node_dir, 'put', __file__, - ] + ], + env=environ, ) try: yield proto.done diff --git a/integration/test_tor.py b/integration/test_tor.py index 6f6f54c25..fb9d8c086 100644 --- a/integration/test_tor.py +++ b/integration/test_tor.py @@ -1,17 +1,10 @@ """ Ported to Python 3. """ -from __future__ import unicode_literals -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from future.utils import PY2 -if PY2: - from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 import sys from os.path import join +from os import environ import pytest import pytest_twisted @@ -36,9 +29,6 @@ from allmydata.client import read_config if sys.platform.startswith('win'): pytest.skip('Skipping Tor tests on Windows', allow_module_level=True) -if PY2: - pytest.skip('Skipping Tor tests on Python 2 because dependencies are hard to come by', allow_module_level=True) - @pytest_twisted.inlineCallbacks def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl): carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl) @@ -66,7 +56,8 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne sys.executable, '-b', '-m', 'allmydata.scripts.runner', '-d', join(temp_dir, 'carol'), 'put', gold_path, - ) + ), + env=environ, ) yield proto.done cap = proto.output.getvalue().strip().split()[-1] @@ -80,7 +71,8 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne sys.executable, '-b', '-m', 'allmydata.scripts.runner', '-d', join(temp_dir, 'dave'), 'get', cap, - ) + ), + env=environ, ) yield proto.done @@ -113,7 +105,8 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_ '--shares-happy', '1', '--shares-total', '2', node_dir.path, - ) + ), + env=environ, ) yield proto.done diff --git a/mypy.ini b/mypy.ini index 7acc0ddc5..482fd6dd8 100644 --- a/mypy.ini +++ b/mypy.ini @@ -9,7 +9,7 @@ no_implicit_optional = True warn_redundant_casts = True strict_equality = True -[mypy-allmydata.test.cli.wormholetesting] +[mypy-allmydata.test.cli.wormholetesting,allmydata.test.test_connection_status] disallow_any_generics = True disallow_subclassing_any = True disallow_untyped_calls = True diff --git a/newsfragments/3978.minor b/newsfragments/3978.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/4000.minor b/newsfragments/4000.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/4001.minor b/newsfragments/4001.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/4002.minor b/newsfragments/4002.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/4003.minor b/newsfragments/4003.minor new file mode 100644 index 000000000..e69de29bb diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index c88613803..a40e98b03 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1024,7 +1024,7 @@ async def _pick_a_http_server( reactor, nurls: list[DecodedURL], request: Callable[[Any, DecodedURL], defer.Deferred[Any]] -) -> Optional[DecodedURL]: +) -> DecodedURL: """Pick the first server we successfully send a request to. Fires with ``None`` if no server was found, or with the ``DecodedURL`` of @@ -1035,18 +1035,8 @@ async def _pick_a_http_server( for nurl in nurls ]) - try: - _, nurl = await queries - return nurl - except Exception as e: - # Logging errors breaks a bunch of tests, and it's not a _bug_ to - # have a failed connection, it's often expected and transient. More - # of a warning, really? - log.msg( - "Failed to connect to a storage server advertised by NURL: {}".format( - e) - ) - return None + _, nurl = await queries + return nurl @implementer(IServer) @@ -1079,7 +1069,7 @@ class HTTPNativeStorageServer(service.MultiService): DecodedURL.from_text(u) for u in announcement[ANONYMOUS_STORAGE_NURLS] ] - self._istorage_server = None + self._istorage_server : Optional[_HTTPStorageServer] = None self._connection_status = connection_status.ConnectionStatus.unstarted() self._version = None @@ -1196,18 +1186,56 @@ class HTTPNativeStorageServer(service.MultiService): def try_to_connect(self): self._connect() + def _connect(self) -> defer.Deferred[object]: + """ + Try to connect to a working storage server. + + If called while a previous ``_connect()`` is already running, it will + just return the same ``Deferred``. + + ``LoopingCall.stop()`` doesn't cancel ``Deferred``s, unfortunately: + https://github.com/twisted/twisted/issues/11814. Thus we want to store + the ``Deferred`` so we can cancel it when necessary. + + We also want to return it so that loop iterations take it into account, + and a new iteration doesn't start while we're in the middle of the + previous one. + """ + # Conceivably try_to_connect() was called on this before, in which case + # we already are in the middle of connecting. So in that case just + # return whatever is in progress: + if self._connecting_deferred is not None: + return self._connecting_deferred + + def done(_): + self._connecting_deferred = None + + connecting = self._pick_server_and_get_version() + # Set a short timeout since we're relying on this for server liveness. + connecting = connecting.addTimeout(5, self._reactor).addCallbacks( + self._got_version, self._failed_to_connect + ).addBoth(done) + self._connecting_deferred = connecting + return connecting + @async_to_deferred - async def _connect(self): - if self._istorage_server is None: + async def _pick_server_and_get_version(self): + """ + Minimal implementation of connection logic: pick a server, get its + version. This doesn't deal with errors much, so as to minimize + statefulness. It does change ``self._istorage_server``, so possibly + more refactoring would be useful to remove even that much statefulness. + """ + async def get_istorage_server() -> _HTTPStorageServer: + if self._istorage_server is not None: + return self._istorage_server + # We haven't selected a server yet, so let's do so. # TODO This is somewhat inefficient on startup: it takes two successful # version() calls before we are live talking to a server, it could only # be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992 - # TODO Another problem with this scheme is that while picking - # the HTTP server to talk to, we don't have connection status - # updates... https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3978 def request(reactor, nurl: DecodedURL): # Since we're just using this one off to check if the NURL # works, no need for persistent pool or other fanciness. @@ -1217,45 +1245,23 @@ class HTTPNativeStorageServer(service.MultiService): StorageClient.from_nurl(nurl, reactor, pool) ).get_version() - # LoopingCall.stop() doesn't cancel Deferreds, unfortunately: - # https://github.com/twisted/twisted/issues/11814 Thus we want - # store the Deferred so it gets cancelled. - picking = _pick_a_http_server(reactor, self._nurls, request) - self._connecting_deferred = picking - try: - nurl = await picking - finally: - self._connecting_deferred = None + nurl = await _pick_a_http_server(reactor, self._nurls, request) - if nurl is None: - # We failed to find a server to connect to. Perhaps the next - # iteration of the loop will succeed. - return - else: - self._istorage_server = _HTTPStorageServer.from_http_client( - StorageClient.from_nurl(nurl, reactor) - ) + # If we've gotten this far, we've found a working NURL. + self._istorage_server = _HTTPStorageServer.from_http_client( + StorageClient.from_nurl(nurl, reactor) + ) + return self._istorage_server - result = self._istorage_server.get_version() - - def remove_connecting_deferred(result): - self._connecting_deferred = None - return result - - # Set a short timeout since we're relying on this for server liveness. - self._connecting_deferred = result.addTimeout(5, self._reactor).addBoth( - remove_connecting_deferred).addCallbacks( - self._got_version, - self._failed_to_connect - ) - - # We don't want to do another iteration of the loop until this - # iteration has finished, so wait here: try: - if self._connecting_deferred is not None: - await self._connecting_deferred + storage_server = await get_istorage_server() + + # Get the version from the remote server. + version = await storage_server.get_version() + return version except Exception as e: log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS) + raise def stopService(self): if self._connecting_deferred is not None: @@ -1472,7 +1478,7 @@ class _HTTPStorageServer(object): _http_client = attr.ib(type=StorageClient) @staticmethod - def from_http_client(http_client): # type: (StorageClient) -> _HTTPStorageServer + def from_http_client(http_client: StorageClient) -> _HTTPStorageServer: """ Create an ``IStorageServer`` from a HTTP ``StorageClient``. """ diff --git a/src/allmydata/test/test_connection_status.py b/src/allmydata/test/test_connection_status.py index 2bd8bf6ab..da41f5a47 100644 --- a/src/allmydata/test/test_connection_status.py +++ b/src/allmydata/test/test_connection_status.py @@ -1,25 +1,46 @@ """ Tests for allmydata.util.connection_status. - -Port to Python 3. """ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals -from future.utils import PY2 -if PY2: - from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 +from __future__ import annotations -import mock +from typing import Optional -from twisted.trial import unittest +from foolscap.reconnector import ReconnectionInfo, Reconnector +from foolscap.info import ConnectionInfo from ..util import connection_status +from .common import SyncTestCase -class Status(unittest.TestCase): - def test_hint_statuses(self): +def reconnector(info: ReconnectionInfo) -> Reconnector: + rc = Reconnector(None, None, (), {}) # type: ignore[no-untyped-call] + rc._reconnectionInfo = info + return rc + +def connection_info( + statuses: dict[str, str], + handlers: dict[str, str], + winningHint: Optional[str], + establishedAt: Optional[int], +) -> ConnectionInfo: + ci = ConnectionInfo() # type: ignore[no-untyped-call] + ci.connectorStatuses = statuses + ci.connectionHandlers = handlers + ci.winningHint = winningHint + ci.establishedAt = establishedAt + return ci + +def reconnection_info( + state: str, + connection_info: ConnectionInfo, +) -> ReconnectionInfo: + ri = ReconnectionInfo() # type: ignore[no-untyped-call] + ri.state = state + ri.connectionInfo = connection_info + return ri + +class Status(SyncTestCase): + def test_hint_statuses(self) -> None: ncs = connection_status._hint_statuses(["h2","h1"], {"h1": "hand1", "h4": "hand4"}, {"h1": "st1", "h2": "st2", @@ -27,17 +48,10 @@ class Status(unittest.TestCase): self.assertEqual(ncs, {"h1 via hand1": "st1", "h2": "st2"}) - def test_reconnector_connected(self): - ci = mock.Mock() - ci.connectorStatuses = {"h1": "st1"} - ci.connectionHandlers = {"h1": "hand1"} - ci.winningHint = "h1" - ci.establishedAt = 120 - ri = mock.Mock() - ri.state = "connected" - ri.connectionInfo = ci - rc = mock.Mock - rc.getReconnectionInfo = mock.Mock(return_value=ri) + def test_reconnector_connected(self) -> None: + ci = connection_info({"h1": "st1"}, {"h1": "hand1"}, "h1", 120) + ri = reconnection_info("connected", ci) + rc = reconnector(ri) cs = connection_status.from_foolscap_reconnector(rc, 123) self.assertEqual(cs.connected, True) self.assertEqual(cs.summary, "Connected to h1 via hand1") @@ -45,17 +59,10 @@ class Status(unittest.TestCase): self.assertEqual(cs.last_connection_time, 120) self.assertEqual(cs.last_received_time, 123) - def test_reconnector_connected_others(self): - ci = mock.Mock() - ci.connectorStatuses = {"h1": "st1", "h2": "st2"} - ci.connectionHandlers = {"h1": "hand1"} - ci.winningHint = "h1" - ci.establishedAt = 120 - ri = mock.Mock() - ri.state = "connected" - ri.connectionInfo = ci - rc = mock.Mock - rc.getReconnectionInfo = mock.Mock(return_value=ri) + def test_reconnector_connected_others(self) -> None: + ci = connection_info({"h1": "st1", "h2": "st2"}, {"h1": "hand1"}, "h1", 120) + ri = reconnection_info("connected", ci) + rc = reconnector(ri) cs = connection_status.from_foolscap_reconnector(rc, 123) self.assertEqual(cs.connected, True) self.assertEqual(cs.summary, "Connected to h1 via hand1") @@ -63,18 +70,11 @@ class Status(unittest.TestCase): self.assertEqual(cs.last_connection_time, 120) self.assertEqual(cs.last_received_time, 123) - def test_reconnector_connected_listener(self): - ci = mock.Mock() - ci.connectorStatuses = {"h1": "st1", "h2": "st2"} - ci.connectionHandlers = {"h1": "hand1"} + def test_reconnector_connected_listener(self) -> None: + ci = connection_info({"h1": "st1", "h2": "st2"}, {"h1": "hand1"}, None, 120) ci.listenerStatus = ("listener1", "successful") - ci.winningHint = None - ci.establishedAt = 120 - ri = mock.Mock() - ri.state = "connected" - ri.connectionInfo = ci - rc = mock.Mock - rc.getReconnectionInfo = mock.Mock(return_value=ri) + ri = reconnection_info("connected", ci) + rc = reconnector(ri) cs = connection_status.from_foolscap_reconnector(rc, 123) self.assertEqual(cs.connected, True) self.assertEqual(cs.summary, "Connected via listener (listener1)") @@ -83,15 +83,10 @@ class Status(unittest.TestCase): self.assertEqual(cs.last_connection_time, 120) self.assertEqual(cs.last_received_time, 123) - def test_reconnector_connecting(self): - ci = mock.Mock() - ci.connectorStatuses = {"h1": "st1", "h2": "st2"} - ci.connectionHandlers = {"h1": "hand1"} - ri = mock.Mock() - ri.state = "connecting" - ri.connectionInfo = ci - rc = mock.Mock - rc.getReconnectionInfo = mock.Mock(return_value=ri) + def test_reconnector_connecting(self) -> None: + ci = connection_info({"h1": "st1", "h2": "st2"}, {"h1": "hand1"}, None, None) + ri = reconnection_info("connecting", ci) + rc = reconnector(ri) cs = connection_status.from_foolscap_reconnector(rc, 123) self.assertEqual(cs.connected, False) self.assertEqual(cs.summary, "Trying to connect") @@ -100,19 +95,13 @@ class Status(unittest.TestCase): self.assertEqual(cs.last_connection_time, None) self.assertEqual(cs.last_received_time, 123) - def test_reconnector_waiting(self): - ci = mock.Mock() - ci.connectorStatuses = {"h1": "st1", "h2": "st2"} - ci.connectionHandlers = {"h1": "hand1"} - ri = mock.Mock() - ri.state = "waiting" + def test_reconnector_waiting(self) -> None: + ci = connection_info({"h1": "st1", "h2": "st2"}, {"h1": "hand1"}, None, None) + ri = reconnection_info("waiting", ci) ri.lastAttempt = 10 ri.nextAttempt = 20 - ri.connectionInfo = ci - rc = mock.Mock - rc.getReconnectionInfo = mock.Mock(return_value=ri) - with mock.patch("time.time", return_value=12): - cs = connection_status.from_foolscap_reconnector(rc, 5) + rc = reconnector(ri) + cs = connection_status.from_foolscap_reconnector(rc, 5, time=lambda: 12) self.assertEqual(cs.connected, False) self.assertEqual(cs.summary, "Reconnecting in 8 seconds (last attempt 2s ago)") diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index 91668e7ca..0671526ae 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -8,7 +8,7 @@ from json import ( loads, ) import hashlib -from typing import Union, Any, Optional +from typing import Union, Any from hyperlink import DecodedURL from fixtures import ( @@ -63,6 +63,8 @@ from foolscap.ipb import ( IConnectionHintHandler, ) +from allmydata.util.deferredutil import MultiFailure + from .no_network import LocalWrapper from .common import ( EMPTY_CLIENT_CONFIG, @@ -782,7 +784,7 @@ storage: class PickHTTPServerTests(unittest.SynchronousTestCase): """Tests for ``_pick_a_http_server``.""" - def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Optional[DecodedURL]: + def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Deferred[DecodedURL]: """ Given mapping of URLs to (delay, result), return the URL of the first selected server, or None. @@ -803,7 +805,7 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): d = _pick_a_http_server(clock, list(url_to_results.keys()), request) for i in range(100): clock.advance(0.1) - return self.successResultOf(d) + return d def test_first_successful_connect_is_picked(self): """ @@ -817,16 +819,21 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): earliest_url: (1, None), bad_url: (0.5, RuntimeError()), }) - self.assertEqual(result, earliest_url) + self.assertEqual(self.successResultOf(result), earliest_url) - def test_failures_are_turned_into_none(self): + def test_failures_include_all_reasons(self): """ - If the requests all fail, ``_pick_a_http_server`` returns ``None``. + If all the requests fail, ``_pick_a_http_server`` raises a + ``allmydata.util.deferredutil.MultiFailure``. """ eventually_good_url = DecodedURL.from_text("http://good") bad_url = DecodedURL.from_text("http://bad") + exception1 = RuntimeError() + exception2 = ZeroDivisionError() result = self.pick_result({ - eventually_good_url: (1, ZeroDivisionError()), - bad_url: (0.1, RuntimeError()) + eventually_good_url: (1, exception1), + bad_url: (0.1, exception2), }) - self.assertEqual(result, None) + exc = self.failureResultOf(result).value + self.assertIsInstance(exc, MultiFailure) + self.assertEqual({f.value for f in exc.failures}, {exception2, exception1}) diff --git a/src/allmydata/util/connection_status.py b/src/allmydata/util/connection_status.py index 0e8595e81..0ccdcd672 100644 --- a/src/allmydata/util/connection_status.py +++ b/src/allmydata/util/connection_status.py @@ -1,21 +1,13 @@ """ Parse connection status from Foolscap. - -Ported to Python 3. """ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals - -from future.utils import PY2 -if PY2: - from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 +from __future__ import annotations import time from zope.interface import implementer from ..interfaces import IConnectionStatus +from foolscap.reconnector import Reconnector @implementer(IConnectionStatus) class ConnectionStatus(object): @@ -41,7 +33,7 @@ class ConnectionStatus(object): last_received_time=None, ) -def _hint_statuses(which, handlers, statuses): +def _hint_statuses(which, handlers, statuses) -> dict[str, str]: non_connected_statuses = {} for hint in which: handler = handlers.get(hint) @@ -50,7 +42,7 @@ def _hint_statuses(which, handlers, statuses): non_connected_statuses["%s%s" % (hint, handler_dsc)] = dsc return non_connected_statuses -def from_foolscap_reconnector(rc, last_received): +def from_foolscap_reconnector(rc: Reconnector, last_received: int, time=time.time) -> ConnectionStatus: ri = rc.getReconnectionInfo() # See foolscap/reconnector.py, ReconnectionInfo, for details about possible # states. The returned result is a native string, it seems, so convert to @@ -80,7 +72,7 @@ def from_foolscap_reconnector(rc, last_received): # ci describes the current in-progress attempt summary = "Trying to connect" elif state == "waiting": - now = time.time() + now = time() elapsed = now - ri.lastAttempt delay = ri.nextAttempt - now summary = "Reconnecting in %d seconds (last attempt %ds ago)" % \ diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py index 70ce8dade..695915ceb 100644 --- a/src/allmydata/util/deferredutil.py +++ b/src/allmydata/util/deferredutil.py @@ -13,7 +13,9 @@ from typing import ( Sequence, TypeVar, Optional, + Coroutine, ) +from typing_extensions import ParamSpec from foolscap.api import eventually from eliot.twisted import ( @@ -225,7 +227,11 @@ def until( break -def async_to_deferred(f): +P = ParamSpec("P") +R = TypeVar("R") + + +def async_to_deferred(f: Callable[P, Coroutine[defer.Deferred[R], None, R]]) -> Callable[P, Deferred[R]]: """ Wrap an async function to return a Deferred instead. @@ -233,7 +239,7 @@ def async_to_deferred(f): """ @wraps(f) - def not_async(*args, **kwargs): + def not_async(*args: P.args, **kwargs: P.kwargs) -> Deferred[R]: return defer.Deferred.fromCoroutine(f(*args, **kwargs)) return not_async