Merge remote-tracking branch 'exarkun/3999.structure-config-manipulation' into 3999.structure-config-manipulation

This commit is contained in:
Itamar Turner-Trauring 2023-04-05 11:42:31 -04:00
commit b1f5201ef2
15 changed files with 223 additions and 255 deletions

View File

@ -1,6 +1,9 @@
"""
Ported to Python 3.
"""
from __future__ import annotations
import sys
import shutil
from time import sleep
@ -19,6 +22,7 @@ from eliot import (
log_call,
)
from twisted.python.filepath import FilePath
from twisted.python.procutils import which
from twisted.internet.defer import DeferredList
from twisted.internet.error import (
@ -104,7 +108,7 @@ def reactor():
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:temp_dir", include_args=[])
def temp_dir(request):
def temp_dir(request) -> str:
"""
Invoke like 'py.test --keep-tempdir ...' to avoid deleting the temp-dir
"""
@ -146,7 +150,8 @@ def flog_gatherer(reactor, temp_dir, flog_binary, request):
'--location', 'tcp:localhost:3117',
'--port', '3117',
gather_dir,
)
),
env=environ,
)
pytest_twisted.blockon(out_protocol.done)
@ -159,6 +164,7 @@ def flog_gatherer(reactor, temp_dir, flog_binary, request):
join(gather_dir, 'gatherer.tac'),
),
path=gather_dir,
env=environ,
)
pytest_twisted.blockon(twistd_protocol.magic_seen)
@ -177,6 +183,7 @@ def flog_gatherer(reactor, temp_dir, flog_binary, request):
(
'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0])
),
env=environ,
)
print("Waiting for flogtool to complete")
try:
@ -444,15 +451,30 @@ def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, reques
@pytest.fixture(scope='session')
@pytest.mark.skipif(sys.platform.startswith('win'),
'Tor tests are unstable on Windows')
def chutney(reactor, temp_dir):
def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]:
# Try to find Chutney already installed in the environment.
try:
import chutney
except ImportError:
# Nope, we'll get our own in a moment.
pass
else:
# We already have one, just use it.
return (
# from `checkout/lib/chutney/__init__.py` we want to get back to
# `checkout` because that's the parent of the directory with all
# of the network definitions. So, great-grand-parent.
FilePath(chutney.__file__).parent().parent().parent().path,
# There's nothing to add to the environment.
{},
)
chutney_dir = join(temp_dir, 'chutney')
mkdir(chutney_dir)
# TODO:
# check for 'tor' binary explicitly and emit a "skip" if we can't
# find it
missing = [exe for exe in ["tor", "tor-gencert"] if not which(exe)]
if missing:
pytest.skip(f"Some command-line tools not found: {missing}")
# XXX yuck! should add a setup.py to chutney so we can at least
# "pip install <path to tarball>" and/or depend on chutney in "pip
@ -487,88 +509,58 @@ def chutney(reactor, temp_dir):
)
pytest_twisted.blockon(proto.done)
return chutney_dir
return (chutney_dir, {"PYTHONPATH": join(chutney_dir, "lib")})
@pytest.fixture(scope='session')
@pytest.mark.skipif(sys.platform.startswith('win'),
reason='Tor tests are unstable on Windows')
def tor_network(reactor, temp_dir, chutney, request):
"""
Build a basic Tor network.
# this is the actual "chutney" script at the root of a chutney checkout
chutney_dir = chutney
chut = join(chutney_dir, 'chutney')
:param chutney: The root directory of a Chutney checkout and a dict of
additional environment variables to set so a Python process can use
it.
:return: None
"""
chutney_root, chutney_env = chutney
basic_network = join(chutney_root, 'networks', 'basic')
env = environ.copy()
env.update(chutney_env)
chutney_argv = (sys.executable, '-m', 'chutney.TorNet')
def chutney(argv):
proto = _DumpOutputProtocol(None)
reactor.spawnProcess(
proto,
sys.executable,
chutney_argv + argv,
path=join(chutney_root),
env=env,
)
return proto.done
# now, as per Chutney's README, we have to create the network
# ./chutney configure networks/basic
# ./chutney start networks/basic
env = environ.copy()
env.update({"PYTHONPATH": join(chutney_dir, "lib")})
proto = _DumpOutputProtocol(None)
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-m', 'chutney.TorNet', 'configure',
join(chutney_dir, 'networks', 'basic'),
),
path=join(chutney_dir),
env=env,
)
pytest_twisted.blockon(proto.done)
proto = _DumpOutputProtocol(None)
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-m', 'chutney.TorNet', 'start',
join(chutney_dir, 'networks', 'basic'),
),
path=join(chutney_dir),
env=env,
)
pytest_twisted.blockon(proto.done)
pytest_twisted.blockon(chutney(("configure", basic_network)))
pytest_twisted.blockon(chutney(("start", basic_network)))
# print some useful stuff
proto = _CollectOutputProtocol()
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-m', 'chutney.TorNet', 'status',
join(chutney_dir, 'networks', 'basic'),
),
path=join(chutney_dir),
env=env,
)
try:
pytest_twisted.blockon(proto.done)
pytest_twisted.blockon(chutney(("status", basic_network)))
except ProcessTerminated:
print("Chutney.TorNet status failed (continuing):")
print(proto.output.getvalue())
print("Chutney.TorNet status failed (continuing)")
def cleanup():
print("Tearing down Chutney Tor network")
proto = _CollectOutputProtocol()
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-m', 'chutney.TorNet', 'stop',
join(chutney_dir, 'networks', 'basic'),
),
path=join(chutney_dir),
env=env,
)
try:
block_with_timeout(proto.done, reactor)
block_with_timeout(chutney(("stop", basic_network)), reactor)
except ProcessTerminated:
# If this doesn't exit cleanly, that's fine, that shouldn't fail
# the test suite.
pass
request.addfinalizer(cleanup)
return chut

View File

@ -2,26 +2,11 @@
Integration tests for I2P support.
"""
from __future__ import unicode_literals
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import sys
from os.path import join, exists
from os import mkdir
from os import mkdir, environ
from time import sleep
if PY2:
def which(path):
# This will result in skipping I2P tests on Python 2. Oh well.
return None
else:
from shutil import which
from shutil import which
from eliot import log_call
@ -62,6 +47,7 @@ def i2p_network(reactor, temp_dir, request):
"--log=stdout",
"--loglevel=info"
),
env=environ,
)
def cleanup():
@ -170,7 +156,8 @@ def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_netw
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
'-d', join(temp_dir, 'carol_i2p'),
'put', gold_path,
)
),
env=environ,
)
yield proto.done
cap = proto.output.getvalue().strip().split()[-1]
@ -184,7 +171,8 @@ def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_netw
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
'-d', join(temp_dir, 'dave_i2p'),
'get', cap,
)
),
env=environ,
)
yield proto.done
@ -211,7 +199,8 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
'--hide-ip',
'--listen', 'i2p',
node_dir.path,
)
),
env=environ,
)
yield proto.done

View File

@ -1,17 +1,10 @@
"""
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import sys
from os.path import join
from os import environ
from twisted.internet.error import ProcessTerminated
@ -45,7 +38,8 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
'-d', node_dir,
'put', __file__,
]
],
env=environ,
)
try:
yield proto.done

View File

@ -1,17 +1,10 @@
"""
Ported to Python 3.
"""
from __future__ import unicode_literals
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import sys
from os.path import join
from os import environ
import pytest
import pytest_twisted
@ -36,9 +29,6 @@ from allmydata.client import read_config
if sys.platform.startswith('win'):
pytest.skip('Skipping Tor tests on Windows', allow_module_level=True)
if PY2:
pytest.skip('Skipping Tor tests on Python 2 because dependencies are hard to come by', allow_module_level=True)
@pytest_twisted.inlineCallbacks
def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl):
carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)
@ -66,7 +56,8 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
'-d', join(temp_dir, 'carol'),
'put', gold_path,
)
),
env=environ,
)
yield proto.done
cap = proto.output.getvalue().strip().split()[-1]
@ -80,7 +71,8 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
'-d', join(temp_dir, 'dave'),
'get', cap,
)
),
env=environ,
)
yield proto.done
@ -113,7 +105,8 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
'--shares-happy', '1',
'--shares-total', '2',
node_dir.path,
)
),
env=environ,
)
yield proto.done

View File

@ -9,7 +9,7 @@ no_implicit_optional = True
warn_redundant_casts = True
strict_equality = True
[mypy-allmydata.test.cli.wormholetesting]
[mypy-allmydata.test.cli.wormholetesting,allmydata.test.test_connection_status]
disallow_any_generics = True
disallow_subclassing_any = True
disallow_untyped_calls = True

0
newsfragments/3978.minor Normal file
View File

0
newsfragments/4000.minor Normal file
View File

0
newsfragments/4001.minor Normal file
View File

0
newsfragments/4002.minor Normal file
View File

0
newsfragments/4003.minor Normal file
View File

View File

@ -1024,7 +1024,7 @@ async def _pick_a_http_server(
reactor,
nurls: list[DecodedURL],
request: Callable[[Any, DecodedURL], defer.Deferred[Any]]
) -> Optional[DecodedURL]:
) -> DecodedURL:
"""Pick the first server we successfully send a request to.
Fires with ``None`` if no server was found, or with the ``DecodedURL`` of
@ -1035,18 +1035,8 @@ async def _pick_a_http_server(
for nurl in nurls
])
try:
_, nurl = await queries
return nurl
except Exception as e:
# Logging errors breaks a bunch of tests, and it's not a _bug_ to
# have a failed connection, it's often expected and transient. More
# of a warning, really?
log.msg(
"Failed to connect to a storage server advertised by NURL: {}".format(
e)
)
return None
_, nurl = await queries
return nurl
@implementer(IServer)
@ -1079,7 +1069,7 @@ class HTTPNativeStorageServer(service.MultiService):
DecodedURL.from_text(u)
for u in announcement[ANONYMOUS_STORAGE_NURLS]
]
self._istorage_server = None
self._istorage_server : Optional[_HTTPStorageServer] = None
self._connection_status = connection_status.ConnectionStatus.unstarted()
self._version = None
@ -1196,18 +1186,56 @@ class HTTPNativeStorageServer(service.MultiService):
def try_to_connect(self):
self._connect()
def _connect(self) -> defer.Deferred[object]:
"""
Try to connect to a working storage server.
If called while a previous ``_connect()`` is already running, it will
just return the same ``Deferred``.
``LoopingCall.stop()`` doesn't cancel ``Deferred``s, unfortunately:
https://github.com/twisted/twisted/issues/11814. Thus we want to store
the ``Deferred`` so we can cancel it when necessary.
We also want to return it so that loop iterations take it into account,
and a new iteration doesn't start while we're in the middle of the
previous one.
"""
# Conceivably try_to_connect() was called on this before, in which case
# we already are in the middle of connecting. So in that case just
# return whatever is in progress:
if self._connecting_deferred is not None:
return self._connecting_deferred
def done(_):
self._connecting_deferred = None
connecting = self._pick_server_and_get_version()
# Set a short timeout since we're relying on this for server liveness.
connecting = connecting.addTimeout(5, self._reactor).addCallbacks(
self._got_version, self._failed_to_connect
).addBoth(done)
self._connecting_deferred = connecting
return connecting
@async_to_deferred
async def _connect(self):
if self._istorage_server is None:
async def _pick_server_and_get_version(self):
"""
Minimal implementation of connection logic: pick a server, get its
version. This doesn't deal with errors much, so as to minimize
statefulness. It does change ``self._istorage_server``, so possibly
more refactoring would be useful to remove even that much statefulness.
"""
async def get_istorage_server() -> _HTTPStorageServer:
if self._istorage_server is not None:
return self._istorage_server
# We haven't selected a server yet, so let's do so.
# TODO This is somewhat inefficient on startup: it takes two successful
# version() calls before we are live talking to a server, it could only
# be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992
# TODO Another problem with this scheme is that while picking
# the HTTP server to talk to, we don't have connection status
# updates... https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3978
def request(reactor, nurl: DecodedURL):
# Since we're just using this one off to check if the NURL
# works, no need for persistent pool or other fanciness.
@ -1217,45 +1245,23 @@ class HTTPNativeStorageServer(service.MultiService):
StorageClient.from_nurl(nurl, reactor, pool)
).get_version()
# LoopingCall.stop() doesn't cancel Deferreds, unfortunately:
# https://github.com/twisted/twisted/issues/11814 Thus we want
# store the Deferred so it gets cancelled.
picking = _pick_a_http_server(reactor, self._nurls, request)
self._connecting_deferred = picking
try:
nurl = await picking
finally:
self._connecting_deferred = None
nurl = await _pick_a_http_server(reactor, self._nurls, request)
if nurl is None:
# We failed to find a server to connect to. Perhaps the next
# iteration of the loop will succeed.
return
else:
self._istorage_server = _HTTPStorageServer.from_http_client(
StorageClient.from_nurl(nurl, reactor)
)
# If we've gotten this far, we've found a working NURL.
self._istorage_server = _HTTPStorageServer.from_http_client(
StorageClient.from_nurl(nurl, reactor)
)
return self._istorage_server
result = self._istorage_server.get_version()
def remove_connecting_deferred(result):
self._connecting_deferred = None
return result
# Set a short timeout since we're relying on this for server liveness.
self._connecting_deferred = result.addTimeout(5, self._reactor).addBoth(
remove_connecting_deferred).addCallbacks(
self._got_version,
self._failed_to_connect
)
# We don't want to do another iteration of the loop until this
# iteration has finished, so wait here:
try:
if self._connecting_deferred is not None:
await self._connecting_deferred
storage_server = await get_istorage_server()
# Get the version from the remote server.
version = await storage_server.get_version()
return version
except Exception as e:
log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS)
raise
def stopService(self):
if self._connecting_deferred is not None:
@ -1472,7 +1478,7 @@ class _HTTPStorageServer(object):
_http_client = attr.ib(type=StorageClient)
@staticmethod
def from_http_client(http_client): # type: (StorageClient) -> _HTTPStorageServer
def from_http_client(http_client: StorageClient) -> _HTTPStorageServer:
"""
Create an ``IStorageServer`` from a HTTP ``StorageClient``.
"""

View File

@ -1,25 +1,46 @@
"""
Tests for allmydata.util.connection_status.
Port to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from __future__ import annotations
import mock
from typing import Optional
from twisted.trial import unittest
from foolscap.reconnector import ReconnectionInfo, Reconnector
from foolscap.info import ConnectionInfo
from ..util import connection_status
from .common import SyncTestCase
class Status(unittest.TestCase):
def test_hint_statuses(self):
def reconnector(info: ReconnectionInfo) -> Reconnector:
rc = Reconnector(None, None, (), {}) # type: ignore[no-untyped-call]
rc._reconnectionInfo = info
return rc
def connection_info(
statuses: dict[str, str],
handlers: dict[str, str],
winningHint: Optional[str],
establishedAt: Optional[int],
) -> ConnectionInfo:
ci = ConnectionInfo() # type: ignore[no-untyped-call]
ci.connectorStatuses = statuses
ci.connectionHandlers = handlers
ci.winningHint = winningHint
ci.establishedAt = establishedAt
return ci
def reconnection_info(
state: str,
connection_info: ConnectionInfo,
) -> ReconnectionInfo:
ri = ReconnectionInfo() # type: ignore[no-untyped-call]
ri.state = state
ri.connectionInfo = connection_info
return ri
class Status(SyncTestCase):
def test_hint_statuses(self) -> None:
ncs = connection_status._hint_statuses(["h2","h1"],
{"h1": "hand1", "h4": "hand4"},
{"h1": "st1", "h2": "st2",
@ -27,17 +48,10 @@ class Status(unittest.TestCase):
self.assertEqual(ncs, {"h1 via hand1": "st1",
"h2": "st2"})
def test_reconnector_connected(self):
ci = mock.Mock()
ci.connectorStatuses = {"h1": "st1"}
ci.connectionHandlers = {"h1": "hand1"}
ci.winningHint = "h1"
ci.establishedAt = 120
ri = mock.Mock()
ri.state = "connected"
ri.connectionInfo = ci
rc = mock.Mock
rc.getReconnectionInfo = mock.Mock(return_value=ri)
def test_reconnector_connected(self) -> None:
ci = connection_info({"h1": "st1"}, {"h1": "hand1"}, "h1", 120)
ri = reconnection_info("connected", ci)
rc = reconnector(ri)
cs = connection_status.from_foolscap_reconnector(rc, 123)
self.assertEqual(cs.connected, True)
self.assertEqual(cs.summary, "Connected to h1 via hand1")
@ -45,17 +59,10 @@ class Status(unittest.TestCase):
self.assertEqual(cs.last_connection_time, 120)
self.assertEqual(cs.last_received_time, 123)
def test_reconnector_connected_others(self):
ci = mock.Mock()
ci.connectorStatuses = {"h1": "st1", "h2": "st2"}
ci.connectionHandlers = {"h1": "hand1"}
ci.winningHint = "h1"
ci.establishedAt = 120
ri = mock.Mock()
ri.state = "connected"
ri.connectionInfo = ci
rc = mock.Mock
rc.getReconnectionInfo = mock.Mock(return_value=ri)
def test_reconnector_connected_others(self) -> None:
ci = connection_info({"h1": "st1", "h2": "st2"}, {"h1": "hand1"}, "h1", 120)
ri = reconnection_info("connected", ci)
rc = reconnector(ri)
cs = connection_status.from_foolscap_reconnector(rc, 123)
self.assertEqual(cs.connected, True)
self.assertEqual(cs.summary, "Connected to h1 via hand1")
@ -63,18 +70,11 @@ class Status(unittest.TestCase):
self.assertEqual(cs.last_connection_time, 120)
self.assertEqual(cs.last_received_time, 123)
def test_reconnector_connected_listener(self):
ci = mock.Mock()
ci.connectorStatuses = {"h1": "st1", "h2": "st2"}
ci.connectionHandlers = {"h1": "hand1"}
def test_reconnector_connected_listener(self) -> None:
ci = connection_info({"h1": "st1", "h2": "st2"}, {"h1": "hand1"}, None, 120)
ci.listenerStatus = ("listener1", "successful")
ci.winningHint = None
ci.establishedAt = 120
ri = mock.Mock()
ri.state = "connected"
ri.connectionInfo = ci
rc = mock.Mock
rc.getReconnectionInfo = mock.Mock(return_value=ri)
ri = reconnection_info("connected", ci)
rc = reconnector(ri)
cs = connection_status.from_foolscap_reconnector(rc, 123)
self.assertEqual(cs.connected, True)
self.assertEqual(cs.summary, "Connected via listener (listener1)")
@ -83,15 +83,10 @@ class Status(unittest.TestCase):
self.assertEqual(cs.last_connection_time, 120)
self.assertEqual(cs.last_received_time, 123)
def test_reconnector_connecting(self):
ci = mock.Mock()
ci.connectorStatuses = {"h1": "st1", "h2": "st2"}
ci.connectionHandlers = {"h1": "hand1"}
ri = mock.Mock()
ri.state = "connecting"
ri.connectionInfo = ci
rc = mock.Mock
rc.getReconnectionInfo = mock.Mock(return_value=ri)
def test_reconnector_connecting(self) -> None:
ci = connection_info({"h1": "st1", "h2": "st2"}, {"h1": "hand1"}, None, None)
ri = reconnection_info("connecting", ci)
rc = reconnector(ri)
cs = connection_status.from_foolscap_reconnector(rc, 123)
self.assertEqual(cs.connected, False)
self.assertEqual(cs.summary, "Trying to connect")
@ -100,19 +95,13 @@ class Status(unittest.TestCase):
self.assertEqual(cs.last_connection_time, None)
self.assertEqual(cs.last_received_time, 123)
def test_reconnector_waiting(self):
ci = mock.Mock()
ci.connectorStatuses = {"h1": "st1", "h2": "st2"}
ci.connectionHandlers = {"h1": "hand1"}
ri = mock.Mock()
ri.state = "waiting"
def test_reconnector_waiting(self) -> None:
ci = connection_info({"h1": "st1", "h2": "st2"}, {"h1": "hand1"}, None, None)
ri = reconnection_info("waiting", ci)
ri.lastAttempt = 10
ri.nextAttempt = 20
ri.connectionInfo = ci
rc = mock.Mock
rc.getReconnectionInfo = mock.Mock(return_value=ri)
with mock.patch("time.time", return_value=12):
cs = connection_status.from_foolscap_reconnector(rc, 5)
rc = reconnector(ri)
cs = connection_status.from_foolscap_reconnector(rc, 5, time=lambda: 12)
self.assertEqual(cs.connected, False)
self.assertEqual(cs.summary,
"Reconnecting in 8 seconds (last attempt 2s ago)")

View File

@ -8,7 +8,7 @@ from json import (
loads,
)
import hashlib
from typing import Union, Any, Optional
from typing import Union, Any
from hyperlink import DecodedURL
from fixtures import (
@ -63,6 +63,8 @@ from foolscap.ipb import (
IConnectionHintHandler,
)
from allmydata.util.deferredutil import MultiFailure
from .no_network import LocalWrapper
from .common import (
EMPTY_CLIENT_CONFIG,
@ -782,7 +784,7 @@ storage:
class PickHTTPServerTests(unittest.SynchronousTestCase):
"""Tests for ``_pick_a_http_server``."""
def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Optional[DecodedURL]:
def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Deferred[DecodedURL]:
"""
Given mapping of URLs to (delay, result), return the URL of the
first selected server, or None.
@ -803,7 +805,7 @@ class PickHTTPServerTests(unittest.SynchronousTestCase):
d = _pick_a_http_server(clock, list(url_to_results.keys()), request)
for i in range(100):
clock.advance(0.1)
return self.successResultOf(d)
return d
def test_first_successful_connect_is_picked(self):
"""
@ -817,16 +819,21 @@ class PickHTTPServerTests(unittest.SynchronousTestCase):
earliest_url: (1, None),
bad_url: (0.5, RuntimeError()),
})
self.assertEqual(result, earliest_url)
self.assertEqual(self.successResultOf(result), earliest_url)
def test_failures_are_turned_into_none(self):
def test_failures_include_all_reasons(self):
"""
If the requests all fail, ``_pick_a_http_server`` returns ``None``.
If all the requests fail, ``_pick_a_http_server`` raises a
``allmydata.util.deferredutil.MultiFailure``.
"""
eventually_good_url = DecodedURL.from_text("http://good")
bad_url = DecodedURL.from_text("http://bad")
exception1 = RuntimeError()
exception2 = ZeroDivisionError()
result = self.pick_result({
eventually_good_url: (1, ZeroDivisionError()),
bad_url: (0.1, RuntimeError())
eventually_good_url: (1, exception1),
bad_url: (0.1, exception2),
})
self.assertEqual(result, None)
exc = self.failureResultOf(result).value
self.assertIsInstance(exc, MultiFailure)
self.assertEqual({f.value for f in exc.failures}, {exception2, exception1})

View File

@ -1,21 +1,13 @@
"""
Parse connection status from Foolscap.
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from __future__ import annotations
import time
from zope.interface import implementer
from ..interfaces import IConnectionStatus
from foolscap.reconnector import Reconnector
@implementer(IConnectionStatus)
class ConnectionStatus(object):
@ -41,7 +33,7 @@ class ConnectionStatus(object):
last_received_time=None,
)
def _hint_statuses(which, handlers, statuses):
def _hint_statuses(which, handlers, statuses) -> dict[str, str]:
non_connected_statuses = {}
for hint in which:
handler = handlers.get(hint)
@ -50,7 +42,7 @@ def _hint_statuses(which, handlers, statuses):
non_connected_statuses["%s%s" % (hint, handler_dsc)] = dsc
return non_connected_statuses
def from_foolscap_reconnector(rc, last_received):
def from_foolscap_reconnector(rc: Reconnector, last_received: int, time=time.time) -> ConnectionStatus:
ri = rc.getReconnectionInfo()
# See foolscap/reconnector.py, ReconnectionInfo, for details about possible
# states. The returned result is a native string, it seems, so convert to
@ -80,7 +72,7 @@ def from_foolscap_reconnector(rc, last_received):
# ci describes the current in-progress attempt
summary = "Trying to connect"
elif state == "waiting":
now = time.time()
now = time()
elapsed = now - ri.lastAttempt
delay = ri.nextAttempt - now
summary = "Reconnecting in %d seconds (last attempt %ds ago)" % \

View File

@ -13,7 +13,9 @@ from typing import (
Sequence,
TypeVar,
Optional,
Coroutine,
)
from typing_extensions import ParamSpec
from foolscap.api import eventually
from eliot.twisted import (
@ -225,7 +227,11 @@ def until(
break
def async_to_deferred(f):
P = ParamSpec("P")
R = TypeVar("R")
def async_to_deferred(f: Callable[P, Coroutine[defer.Deferred[R], None, R]]) -> Callable[P, Deferred[R]]:
"""
Wrap an async function to return a Deferred instead.
@ -233,7 +239,7 @@ def async_to_deferred(f):
"""
@wraps(f)
def not_async(*args, **kwargs):
def not_async(*args: P.args, **kwargs: P.kwargs) -> Deferred[R]:
return defer.Deferred.fromCoroutine(f(*args, **kwargs))
return not_async