diff --git a/newsfragments/3978.minor b/newsfragments/3978.minor new file mode 100644 index 000000000..e69de29bb diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index c88613803..a40e98b03 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1024,7 +1024,7 @@ async def _pick_a_http_server( reactor, nurls: list[DecodedURL], request: Callable[[Any, DecodedURL], defer.Deferred[Any]] -) -> Optional[DecodedURL]: +) -> DecodedURL: """Pick the first server we successfully send a request to. Fires with ``None`` if no server was found, or with the ``DecodedURL`` of @@ -1035,18 +1035,8 @@ async def _pick_a_http_server( for nurl in nurls ]) - try: - _, nurl = await queries - return nurl - except Exception as e: - # Logging errors breaks a bunch of tests, and it's not a _bug_ to - # have a failed connection, it's often expected and transient. More - # of a warning, really? - log.msg( - "Failed to connect to a storage server advertised by NURL: {}".format( - e) - ) - return None + _, nurl = await queries + return nurl @implementer(IServer) @@ -1079,7 +1069,7 @@ class HTTPNativeStorageServer(service.MultiService): DecodedURL.from_text(u) for u in announcement[ANONYMOUS_STORAGE_NURLS] ] - self._istorage_server = None + self._istorage_server : Optional[_HTTPStorageServer] = None self._connection_status = connection_status.ConnectionStatus.unstarted() self._version = None @@ -1196,18 +1186,56 @@ class HTTPNativeStorageServer(service.MultiService): def try_to_connect(self): self._connect() + def _connect(self) -> defer.Deferred[object]: + """ + Try to connect to a working storage server. + + If called while a previous ``_connect()`` is already running, it will + just return the same ``Deferred``. + + ``LoopingCall.stop()`` doesn't cancel ``Deferred``s, unfortunately: + https://github.com/twisted/twisted/issues/11814. Thus we want to store + the ``Deferred`` so we can cancel it when necessary. + + We also want to return it so that loop iterations take it into account, + and a new iteration doesn't start while we're in the middle of the + previous one. + """ + # Conceivably try_to_connect() was called on this before, in which case + # we already are in the middle of connecting. So in that case just + # return whatever is in progress: + if self._connecting_deferred is not None: + return self._connecting_deferred + + def done(_): + self._connecting_deferred = None + + connecting = self._pick_server_and_get_version() + # Set a short timeout since we're relying on this for server liveness. + connecting = connecting.addTimeout(5, self._reactor).addCallbacks( + self._got_version, self._failed_to_connect + ).addBoth(done) + self._connecting_deferred = connecting + return connecting + @async_to_deferred - async def _connect(self): - if self._istorage_server is None: + async def _pick_server_and_get_version(self): + """ + Minimal implementation of connection logic: pick a server, get its + version. This doesn't deal with errors much, so as to minimize + statefulness. It does change ``self._istorage_server``, so possibly + more refactoring would be useful to remove even that much statefulness. + """ + async def get_istorage_server() -> _HTTPStorageServer: + if self._istorage_server is not None: + return self._istorage_server + # We haven't selected a server yet, so let's do so. # TODO This is somewhat inefficient on startup: it takes two successful # version() calls before we are live talking to a server, it could only # be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992 - # TODO Another problem with this scheme is that while picking - # the HTTP server to talk to, we don't have connection status - # updates... https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3978 def request(reactor, nurl: DecodedURL): # Since we're just using this one off to check if the NURL # works, no need for persistent pool or other fanciness. @@ -1217,45 +1245,23 @@ class HTTPNativeStorageServer(service.MultiService): StorageClient.from_nurl(nurl, reactor, pool) ).get_version() - # LoopingCall.stop() doesn't cancel Deferreds, unfortunately: - # https://github.com/twisted/twisted/issues/11814 Thus we want - # store the Deferred so it gets cancelled. - picking = _pick_a_http_server(reactor, self._nurls, request) - self._connecting_deferred = picking - try: - nurl = await picking - finally: - self._connecting_deferred = None + nurl = await _pick_a_http_server(reactor, self._nurls, request) - if nurl is None: - # We failed to find a server to connect to. Perhaps the next - # iteration of the loop will succeed. - return - else: - self._istorage_server = _HTTPStorageServer.from_http_client( - StorageClient.from_nurl(nurl, reactor) - ) + # If we've gotten this far, we've found a working NURL. + self._istorage_server = _HTTPStorageServer.from_http_client( + StorageClient.from_nurl(nurl, reactor) + ) + return self._istorage_server - result = self._istorage_server.get_version() - - def remove_connecting_deferred(result): - self._connecting_deferred = None - return result - - # Set a short timeout since we're relying on this for server liveness. - self._connecting_deferred = result.addTimeout(5, self._reactor).addBoth( - remove_connecting_deferred).addCallbacks( - self._got_version, - self._failed_to_connect - ) - - # We don't want to do another iteration of the loop until this - # iteration has finished, so wait here: try: - if self._connecting_deferred is not None: - await self._connecting_deferred + storage_server = await get_istorage_server() + + # Get the version from the remote server. + version = await storage_server.get_version() + return version except Exception as e: log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS) + raise def stopService(self): if self._connecting_deferred is not None: @@ -1472,7 +1478,7 @@ class _HTTPStorageServer(object): _http_client = attr.ib(type=StorageClient) @staticmethod - def from_http_client(http_client): # type: (StorageClient) -> _HTTPStorageServer + def from_http_client(http_client: StorageClient) -> _HTTPStorageServer: """ Create an ``IStorageServer`` from a HTTP ``StorageClient``. """ diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index 91668e7ca..0671526ae 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -8,7 +8,7 @@ from json import ( loads, ) import hashlib -from typing import Union, Any, Optional +from typing import Union, Any from hyperlink import DecodedURL from fixtures import ( @@ -63,6 +63,8 @@ from foolscap.ipb import ( IConnectionHintHandler, ) +from allmydata.util.deferredutil import MultiFailure + from .no_network import LocalWrapper from .common import ( EMPTY_CLIENT_CONFIG, @@ -782,7 +784,7 @@ storage: class PickHTTPServerTests(unittest.SynchronousTestCase): """Tests for ``_pick_a_http_server``.""" - def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Optional[DecodedURL]: + def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Deferred[DecodedURL]: """ Given mapping of URLs to (delay, result), return the URL of the first selected server, or None. @@ -803,7 +805,7 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): d = _pick_a_http_server(clock, list(url_to_results.keys()), request) for i in range(100): clock.advance(0.1) - return self.successResultOf(d) + return d def test_first_successful_connect_is_picked(self): """ @@ -817,16 +819,21 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): earliest_url: (1, None), bad_url: (0.5, RuntimeError()), }) - self.assertEqual(result, earliest_url) + self.assertEqual(self.successResultOf(result), earliest_url) - def test_failures_are_turned_into_none(self): + def test_failures_include_all_reasons(self): """ - If the requests all fail, ``_pick_a_http_server`` returns ``None``. + If all the requests fail, ``_pick_a_http_server`` raises a + ``allmydata.util.deferredutil.MultiFailure``. """ eventually_good_url = DecodedURL.from_text("http://good") bad_url = DecodedURL.from_text("http://bad") + exception1 = RuntimeError() + exception2 = ZeroDivisionError() result = self.pick_result({ - eventually_good_url: (1, ZeroDivisionError()), - bad_url: (0.1, RuntimeError()) + eventually_good_url: (1, exception1), + bad_url: (0.1, exception2), }) - self.assertEqual(result, None) + exc = self.failureResultOf(result).value + self.assertIsInstance(exc, MultiFailure) + self.assertEqual({f.value for f in exc.failures}, {exception2, exception1}) diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py index 70ce8dade..695915ceb 100644 --- a/src/allmydata/util/deferredutil.py +++ b/src/allmydata/util/deferredutil.py @@ -13,7 +13,9 @@ from typing import ( Sequence, TypeVar, Optional, + Coroutine, ) +from typing_extensions import ParamSpec from foolscap.api import eventually from eliot.twisted import ( @@ -225,7 +227,11 @@ def until( break -def async_to_deferred(f): +P = ParamSpec("P") +R = TypeVar("R") + + +def async_to_deferred(f: Callable[P, Coroutine[defer.Deferred[R], None, R]]) -> Callable[P, Deferred[R]]: """ Wrap an async function to return a Deferred instead. @@ -233,7 +239,7 @@ def async_to_deferred(f): """ @wraps(f) - def not_async(*args, **kwargs): + def not_async(*args: P.args, **kwargs: P.kwargs) -> Deferred[R]: return defer.Deferred.fromCoroutine(f(*args, **kwargs)) return not_async