From 0da059b64486642864c0dbd211ccdb98909d79c1 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 24 Mar 2023 10:10:18 -0400 Subject: [PATCH 01/11] Update the connection status during the initial choice of NURLs. --- src/allmydata/storage_client.py | 50 ++++++++++++----------- src/allmydata/test/test_storage_client.py | 23 +++++++---- 2 files changed, 41 insertions(+), 32 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index c88613803..f71931c8b 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1024,7 +1024,7 @@ async def _pick_a_http_server( reactor, nurls: list[DecodedURL], request: Callable[[Any, DecodedURL], defer.Deferred[Any]] -) -> Optional[DecodedURL]: +) -> DecodedURL: """Pick the first server we successfully send a request to. Fires with ``None`` if no server was found, or with the ``DecodedURL`` of @@ -1035,18 +1035,8 @@ async def _pick_a_http_server( for nurl in nurls ]) - try: - _, nurl = await queries - return nurl - except Exception as e: - # Logging errors breaks a bunch of tests, and it's not a _bug_ to - # have a failed connection, it's often expected and transient. More - # of a warning, really? - log.msg( - "Failed to connect to a storage server advertised by NURL: {}".format( - e) - ) - return None + _, nurl = await queries + return nurl @implementer(IServer) @@ -1223,19 +1213,31 @@ class HTTPNativeStorageServer(service.MultiService): picking = _pick_a_http_server(reactor, self._nurls, request) self._connecting_deferred = picking try: - nurl = await picking - finally: - self._connecting_deferred = None - - if nurl is None: - # We failed to find a server to connect to. Perhaps the next - # iteration of the loop will succeed. - return - else: - self._istorage_server = _HTTPStorageServer.from_http_client( - StorageClient.from_nurl(nurl, reactor) + try: + nurl = await picking + finally: + self._connecting_deferred = None + except Exception as e: + # Logging errors breaks a bunch of tests, and it's not a _bug_ to + # have a failed connection, it's often expected and transient. More + # of a warning, really? + log.msg( + "Failed to connect to a storage server advertised by NURL: {}".format(e) ) + # Update the connection status: + self._failed_to_connect(Failure(e)) + + # Since we failed to find a server to connect to, give up + # for now. Perhaps the next iteration of the loop will + # succeed. + return + + # iF we've gotten this far, we've found a working NURL. + self._istorage_server = _HTTPStorageServer.from_http_client( + StorageClient.from_nurl(nurl, reactor) + ) + result = self._istorage_server.get_version() def remove_connecting_deferred(result): diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index 91668e7ca..cf4a939e8 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -63,6 +63,8 @@ from foolscap.ipb import ( IConnectionHintHandler, ) +from allmydata.util.deferredutil import MultiFailure + from .no_network import LocalWrapper from .common import ( EMPTY_CLIENT_CONFIG, @@ -782,7 +784,7 @@ storage: class PickHTTPServerTests(unittest.SynchronousTestCase): """Tests for ``_pick_a_http_server``.""" - def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Optional[DecodedURL]: + def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Deferred[DecodedURL]: """ Given mapping of URLs to (delay, result), return the URL of the first selected server, or None. @@ -803,7 +805,7 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): d = _pick_a_http_server(clock, list(url_to_results.keys()), request) for i in range(100): clock.advance(0.1) - return self.successResultOf(d) + return d def test_first_successful_connect_is_picked(self): """ @@ -817,16 +819,21 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): earliest_url: (1, None), bad_url: (0.5, RuntimeError()), }) - self.assertEqual(result, earliest_url) + self.assertEqual(self.successResultOf(result), earliest_url) - def test_failures_are_turned_into_none(self): + def test_failures_include_all_reasons(self): """ - If the requests all fail, ``_pick_a_http_server`` returns ``None``. + If all the requests fail, ``_pick_a_http_server`` raises a + ``allmydata.util.deferredutil.MultiFailure``. """ eventually_good_url = DecodedURL.from_text("http://good") bad_url = DecodedURL.from_text("http://bad") + exception1 = RuntimeError() + exception2 = ZeroDivisionError() result = self.pick_result({ - eventually_good_url: (1, ZeroDivisionError()), - bad_url: (0.1, RuntimeError()) + eventually_good_url: (1, exception1), + bad_url: (0.1, exception2), }) - self.assertEqual(result, None) + exc = self.failureResultOf(result).value + self.assertIsInstance(exc, MultiFailure) + self.assertEqual({f.value for f in exc.failures}, {exception2, exception1}) From 6659350ff3279c9c4162f16f5a14f8aa4d10fee4 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 24 Mar 2023 10:18:15 -0400 Subject: [PATCH 02/11] Improve type annotations. --- src/allmydata/util/deferredutil.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py index 83de411ce..77451b132 100644 --- a/src/allmydata/util/deferredutil.py +++ b/src/allmydata/util/deferredutil.py @@ -14,6 +14,7 @@ from typing import ( TypeVar, Optional, ) +from typing_extensions import Awaitable, ParamSpec from foolscap.api import eventually from eliot.twisted import ( @@ -226,7 +227,11 @@ def until( break -def async_to_deferred(f): +P = ParamSpec("P") +R = TypeVar("R") + + +def async_to_deferred(f: Callable[P, Awaitable[R]]) -> Callable[P, Deferred[R]]: """ Wrap an async function to return a Deferred instead. @@ -234,8 +239,8 @@ def async_to_deferred(f): """ @wraps(f) - def not_async(*args, **kwargs): - return defer.Deferred.fromCoroutine(f(*args, **kwargs)) + def not_async(*args: P.args, **kwargs: P.kwargs) -> Deferred[R]: + return defer.Deferred.fromCoroutine(f(*args, **kwargs)) # type: ignore return not_async From f0e60a80afa1b970299a9ebf97da1f7aeb12d784 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 24 Mar 2023 10:22:52 -0400 Subject: [PATCH 03/11] Remove unneeded import. --- src/allmydata/test/test_storage_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index cf4a939e8..0671526ae 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -8,7 +8,7 @@ from json import ( loads, ) import hashlib -from typing import Union, Any, Optional +from typing import Union, Any from hyperlink import DecodedURL from fixtures import ( From 7715c4c6d01fda3da79245a3438360281b5abab9 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 24 Mar 2023 10:23:10 -0400 Subject: [PATCH 04/11] News fragment. --- newsfragments/3978.minor | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 newsfragments/3978.minor diff --git a/newsfragments/3978.minor b/newsfragments/3978.minor new file mode 100644 index 000000000..e69de29bb From 9baafea00ed8aab9703c6d5af53a2efbed3303b0 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 24 Mar 2023 11:06:58 -0400 Subject: [PATCH 05/11] Refactor: simplify code so there are fewer codepaths. --- src/allmydata/storage_client.py | 68 ++++++++++++--------------------- 1 file changed, 25 insertions(+), 43 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index f71931c8b..96f37e599 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1188,16 +1188,16 @@ class HTTPNativeStorageServer(service.MultiService): @async_to_deferred async def _connect(self): - if self._istorage_server is None: + async def get_istorage_server() -> _HTTPStorageServer: + if self._istorage_server is not None: + return self._istorage_server + # We haven't selected a server yet, so let's do so. # TODO This is somewhat inefficient on startup: it takes two successful # version() calls before we are live talking to a server, it could only # be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992 - # TODO Another problem with this scheme is that while picking - # the HTTP server to talk to, we don't have connection status - # updates... https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3978 def request(reactor, nurl: DecodedURL): # Since we're just using this one off to check if the NURL # works, no need for persistent pool or other fanciness. @@ -1213,51 +1213,33 @@ class HTTPNativeStorageServer(service.MultiService): picking = _pick_a_http_server(reactor, self._nurls, request) self._connecting_deferred = picking try: - try: - nurl = await picking - finally: - self._connecting_deferred = None - except Exception as e: - # Logging errors breaks a bunch of tests, and it's not a _bug_ to - # have a failed connection, it's often expected and transient. More - # of a warning, really? - log.msg( - "Failed to connect to a storage server advertised by NURL: {}".format(e) - ) + nurl = await picking + finally: + self._connecting_deferred = None - # Update the connection status: - self._failed_to_connect(Failure(e)) - - # Since we failed to find a server to connect to, give up - # for now. Perhaps the next iteration of the loop will - # succeed. - return - - # iF we've gotten this far, we've found a working NURL. + # If we've gotten this far, we've found a working NURL. self._istorage_server = _HTTPStorageServer.from_http_client( StorageClient.from_nurl(nurl, reactor) ) + return self._istorage_server - result = self._istorage_server.get_version() - - def remove_connecting_deferred(result): - self._connecting_deferred = None - return result - - # Set a short timeout since we're relying on this for server liveness. - self._connecting_deferred = result.addTimeout(5, self._reactor).addBoth( - remove_connecting_deferred).addCallbacks( - self._got_version, - self._failed_to_connect - ) - - # We don't want to do another iteration of the loop until this - # iteration has finished, so wait here: try: - if self._connecting_deferred is not None: - await self._connecting_deferred - except Exception as e: - log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS) + try: + storage_server = await get_istorage_server() + + # Get the version from the remote server. Set a short timeout since + # we're relying on this for server liveness. + self._connecting_deferred = storage_server.get_version().addTimeout( + 5, self._reactor) + # We don't want to do another iteration of the loop until this + # iteration has finished, so wait here: + version = await self._connecting_deferred + self._got_version(version) + except Exception as e: + log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS) + self._failed_to_connect(Failure(e)) + finally: + self._connecting_deferred = None def stopService(self): if self._connecting_deferred is not None: From 33d30b5c80bad6a647e1d8e6d6268555e4a72826 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 24 Mar 2023 11:20:31 -0400 Subject: [PATCH 06/11] Type annotations. --- src/allmydata/storage_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 96f37e599..de756e322 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1069,7 +1069,7 @@ class HTTPNativeStorageServer(service.MultiService): DecodedURL.from_text(u) for u in announcement[ANONYMOUS_STORAGE_NURLS] ] - self._istorage_server = None + self._istorage_server : Optional[_HTTPStorageServer] = None self._connection_status = connection_status.ConnectionStatus.unstarted() self._version = None @@ -1456,7 +1456,7 @@ class _HTTPStorageServer(object): _http_client = attr.ib(type=StorageClient) @staticmethod - def from_http_client(http_client): # type: (StorageClient) -> _HTTPStorageServer + def from_http_client(http_client: StorageClient) -> _HTTPStorageServer: """ Create an ``IStorageServer`` from a HTTP ``StorageClient``. """ From 4b25a923567e53b74ee04bba78b0342b7e09fe4d Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 24 Mar 2023 13:49:44 -0400 Subject: [PATCH 07/11] Limit cryptography for now. --- setup.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 152c49f0e..b12b8f4a2 100644 --- a/setup.py +++ b/setup.py @@ -63,7 +63,11 @@ install_requires = [ # Twisted[conch] also depends on cryptography and Twisted[tls] # transitively depends on cryptography. So it's anyone's guess what # version of cryptography will *really* be installed. - "cryptography >= 2.6", + + # * cryptography 40 broke constants we need; should really be using them + # * via pyOpenSSL; will be fixed in + # * https://github.com/pyca/pyopenssl/issues/1201 + "cryptography >= 2.6, < 40", # * The SFTP frontend depends on Twisted 11.0.0 to fix the SSH server # rekeying bug From 0995772b24168a47049c35ed35825fc69a660316 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 27 Mar 2023 14:54:27 -0400 Subject: [PATCH 08/11] Explain why we ignore type check. --- src/allmydata/util/deferredutil.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py index 89dc9704c..58ca7dde0 100644 --- a/src/allmydata/util/deferredutil.py +++ b/src/allmydata/util/deferredutil.py @@ -239,6 +239,11 @@ def async_to_deferred(f: Callable[P, Awaitable[R]]) -> Callable[P, Deferred[R]]: @wraps(f) def not_async(*args: P.args, **kwargs: P.kwargs) -> Deferred[R]: + # Twisted documents fromCoroutine as accepting either a Generator or a + # Coroutine. However, the standard for type annotations of async + # functions is to return an Awaitable: + # https://github.com/twisted/twisted/issues/11832 + # So, we ignore the type warning. return defer.Deferred.fromCoroutine(f(*args, **kwargs)) # type: ignore return not_async From 7838f25bf8b70c981b29d8357a1edd427c253f80 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 27 Mar 2023 14:54:36 -0400 Subject: [PATCH 09/11] Clean up with simpler idiom. --- src/allmydata/storage_client.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index de756e322..ee555819c 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1224,20 +1224,19 @@ class HTTPNativeStorageServer(service.MultiService): return self._istorage_server try: - try: - storage_server = await get_istorage_server() + storage_server = await get_istorage_server() - # Get the version from the remote server. Set a short timeout since - # we're relying on this for server liveness. - self._connecting_deferred = storage_server.get_version().addTimeout( - 5, self._reactor) - # We don't want to do another iteration of the loop until this - # iteration has finished, so wait here: - version = await self._connecting_deferred - self._got_version(version) - except Exception as e: - log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS) - self._failed_to_connect(Failure(e)) + # Get the version from the remote server. Set a short timeout since + # we're relying on this for server liveness. + self._connecting_deferred = storage_server.get_version().addTimeout( + 5, self._reactor) + # We don't want to do another iteration of the loop until this + # iteration has finished, so wait here: + version = await self._connecting_deferred + self._got_version(version) + except Exception as e: + log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS) + self._failed_to_connect(Failure(e)) finally: self._connecting_deferred = None From bd7c61cc5cce30ee75c0803e71ce9c5d7cc1643a Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 27 Mar 2023 16:58:15 -0400 Subject: [PATCH 10/11] Split up the state management logic from the server pinging logic. --- src/allmydata/storage_client.py | 65 ++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index ee555819c..a40e98b03 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1186,8 +1186,46 @@ class HTTPNativeStorageServer(service.MultiService): def try_to_connect(self): self._connect() + def _connect(self) -> defer.Deferred[object]: + """ + Try to connect to a working storage server. + + If called while a previous ``_connect()`` is already running, it will + just return the same ``Deferred``. + + ``LoopingCall.stop()`` doesn't cancel ``Deferred``s, unfortunately: + https://github.com/twisted/twisted/issues/11814. Thus we want to store + the ``Deferred`` so we can cancel it when necessary. + + We also want to return it so that loop iterations take it into account, + and a new iteration doesn't start while we're in the middle of the + previous one. + """ + # Conceivably try_to_connect() was called on this before, in which case + # we already are in the middle of connecting. So in that case just + # return whatever is in progress: + if self._connecting_deferred is not None: + return self._connecting_deferred + + def done(_): + self._connecting_deferred = None + + connecting = self._pick_server_and_get_version() + # Set a short timeout since we're relying on this for server liveness. + connecting = connecting.addTimeout(5, self._reactor).addCallbacks( + self._got_version, self._failed_to_connect + ).addBoth(done) + self._connecting_deferred = connecting + return connecting + @async_to_deferred - async def _connect(self): + async def _pick_server_and_get_version(self): + """ + Minimal implementation of connection logic: pick a server, get its + version. This doesn't deal with errors much, so as to minimize + statefulness. It does change ``self._istorage_server``, so possibly + more refactoring would be useful to remove even that much statefulness. + """ async def get_istorage_server() -> _HTTPStorageServer: if self._istorage_server is not None: return self._istorage_server @@ -1207,15 +1245,7 @@ class HTTPNativeStorageServer(service.MultiService): StorageClient.from_nurl(nurl, reactor, pool) ).get_version() - # LoopingCall.stop() doesn't cancel Deferreds, unfortunately: - # https://github.com/twisted/twisted/issues/11814 Thus we want - # store the Deferred so it gets cancelled. - picking = _pick_a_http_server(reactor, self._nurls, request) - self._connecting_deferred = picking - try: - nurl = await picking - finally: - self._connecting_deferred = None + nurl = await _pick_a_http_server(reactor, self._nurls, request) # If we've gotten this far, we've found a working NURL. self._istorage_server = _HTTPStorageServer.from_http_client( @@ -1226,19 +1256,12 @@ class HTTPNativeStorageServer(service.MultiService): try: storage_server = await get_istorage_server() - # Get the version from the remote server. Set a short timeout since - # we're relying on this for server liveness. - self._connecting_deferred = storage_server.get_version().addTimeout( - 5, self._reactor) - # We don't want to do another iteration of the loop until this - # iteration has finished, so wait here: - version = await self._connecting_deferred - self._got_version(version) + # Get the version from the remote server. + version = await storage_server.get_version() + return version except Exception as e: log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS) - self._failed_to_connect(Failure(e)) - finally: - self._connecting_deferred = None + raise def stopService(self): if self._connecting_deferred is not None: From 80d8e5b465bbc717fc76708f319973ce40fb2907 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 28 Mar 2023 11:11:44 -0400 Subject: [PATCH 11/11] The function should return a coroutine. --- src/allmydata/util/deferredutil.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py index 58ca7dde0..695915ceb 100644 --- a/src/allmydata/util/deferredutil.py +++ b/src/allmydata/util/deferredutil.py @@ -13,8 +13,9 @@ from typing import ( Sequence, TypeVar, Optional, + Coroutine, ) -from typing_extensions import Awaitable, ParamSpec +from typing_extensions import ParamSpec from foolscap.api import eventually from eliot.twisted import ( @@ -230,7 +231,7 @@ P = ParamSpec("P") R = TypeVar("R") -def async_to_deferred(f: Callable[P, Awaitable[R]]) -> Callable[P, Deferred[R]]: +def async_to_deferred(f: Callable[P, Coroutine[defer.Deferred[R], None, R]]) -> Callable[P, Deferred[R]]: """ Wrap an async function to return a Deferred instead. @@ -239,12 +240,7 @@ def async_to_deferred(f: Callable[P, Awaitable[R]]) -> Callable[P, Deferred[R]]: @wraps(f) def not_async(*args: P.args, **kwargs: P.kwargs) -> Deferred[R]: - # Twisted documents fromCoroutine as accepting either a Generator or a - # Coroutine. However, the standard for type annotations of async - # functions is to return an Awaitable: - # https://github.com/twisted/twisted/issues/11832 - # So, we ignore the type warning. - return defer.Deferred.fromCoroutine(f(*args, **kwargs)) # type: ignore + return defer.Deferred.fromCoroutine(f(*args, **kwargs)) return not_async