From 669296d5d68a7705698bcc2fe15fa56491948fdc Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 26 Jan 2023 11:44:53 -0500 Subject: [PATCH 01/34] News file. --- newsfragments/3935.minor | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 newsfragments/3935.minor diff --git a/newsfragments/3935.minor b/newsfragments/3935.minor new file mode 100644 index 000000000..e69de29bb From 95bb7afba7c0c883ff7c9099dc9cd839daef3c6c Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 21 Feb 2023 10:42:06 -0500 Subject: [PATCH 02/34] Sketch of happy eyeballs. --- src/allmydata/storage_client.py | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 8e9ad3656..8191014e8 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -47,7 +47,7 @@ from zope.interface import ( ) from twisted.python.failure import Failure from twisted.web import http -from twisted.internet.task import LoopingCall +from twisted.internet.task import LoopingCall, deferLater from twisted.internet import defer, reactor from twisted.application import service from twisted.plugin import ( @@ -935,6 +935,21 @@ class NativeStorageServer(service.MultiService): self._reconnector.reset() +async def _pick_a_http_server(reactor, nurls: list[DecodedURL]) -> DecodedURL: + """Pick the first server we successfully talk to.""" + while True: + try: + _, index = await defer.DeferredList([ + StorageClientGeneral( + StorageClient.from_nurl(nurl, reactor) + ).get_version() for nurl in nurls + ], consumeErrors=True, fireOnOneCallback=True) + return nurls[index] + except Exception as e: + log.err(e, "Failed to connect to any of the HTTP NURLs for server") + await deferLater(reactor, 1, lambda: None) + + @implementer(IServer) class HTTPNativeStorageServer(service.MultiService): """ @@ -962,10 +977,8 @@ class 
HTTPNativeStorageServer(service.MultiService): ) = _parse_announcement(server_id, furl, announcement) # TODO need some way to do equivalent of Happy Eyeballs for multiple NURLs? # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3935 - nurl = DecodedURL.from_text(announcement[ANONYMOUS_STORAGE_NURLS][0]) - self._istorage_server = _HTTPStorageServer.from_http_client( - StorageClient.from_nurl(nurl, reactor) - ) + self._nurls = [DecodedURL.from_text(u) for u in announcement[ANONYMOUS_STORAGE_NURLS]] + self._istorage_server = None self._connection_status = connection_status.ConnectionStatus.unstarted() self._version = None @@ -1033,7 +1046,14 @@ class HTTPNativeStorageServer(service.MultiService): version = self.get_version() return _available_space_from_version(version) - def start_connecting(self, trigger_cb): + @async_to_deferred + async def start_connecting(self, trigger_cb): + # The problem with this scheme is that while picking the HTTP server to + # talk to, we don't have connection status updates... + nurl = await _pick_a_http_server(reactor, self._nurls) + self._istorage_server = _HTTPStorageServer.from_http_client( + StorageClient.from_nurl(nurl, reactor) + ) self._lc = LoopingCall(self._connect) self._lc.start(1, True) From 2ac6580c269268e891a1f79f23a81f0d56917512 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 21 Feb 2023 10:56:38 -0500 Subject: [PATCH 03/34] Welcome to the world of tomorrow. --- src/allmydata/test/test_storage_client.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index 1a84f35ec..0657a814e 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -1,16 +1,8 @@ """ -Ported from Python 3. +Tests for allmydata.storage_client. 
""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals -from future.utils import PY2 -if PY2: - from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 - -from six import ensure_text +from __future__ import annotations from json import ( loads, @@ -475,7 +467,7 @@ class StoragePluginWebPresence(AsyncTestCase): # config validation policy). "tub.port": tubport_endpoint, "tub.location": tubport_location, - "web.port": ensure_text(webport_endpoint), + "web.port": str(webport_endpoint), }, storage_plugin=self.storage_plugin, basedir=self.basedir, From 32768e310ae5316cc2e6fc0f0f969368c1ff3eee Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 21 Feb 2023 11:30:47 -0500 Subject: [PATCH 04/34] Unit test for _pick_a_http_server. --- src/allmydata/storage_client.py | 48 ++++++++++------- src/allmydata/test/test_storage_client.py | 64 ++++++++++++++++++++++- 2 files changed, 93 insertions(+), 19 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 8191014e8..436335431 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -33,7 +33,7 @@ Ported to Python 3. 
from __future__ import annotations from six import ensure_text -from typing import Union +from typing import Union, Callable, Any import re, time, hashlib from os import urandom from configparser import NoSectionError @@ -935,19 +935,25 @@ class NativeStorageServer(service.MultiService): self._reconnector.reset() -async def _pick_a_http_server(reactor, nurls: list[DecodedURL]) -> DecodedURL: - """Pick the first server we successfully talk to.""" +async def _pick_a_http_server( + reactor, + nurls: list[DecodedURL], + request: Callable[[DecodedURL, Any], defer.Deferred[Any]] +) -> DecodedURL: + """Pick the first server we successfully send a request to.""" while True: - try: - _, index = await defer.DeferredList([ - StorageClientGeneral( - StorageClient.from_nurl(nurl, reactor) - ).get_version() for nurl in nurls - ], consumeErrors=True, fireOnOneCallback=True) + result = await defer.DeferredList([ + request(reactor, nurl) for nurl in nurls + ], consumeErrors=True, fireOnOneCallback=True) + # Apparently DeferredList is an awful awful API. If everything fails, + # you get back a list of (False, Failure), if it succeeds, you get a + # tuple of (value, index). + if isinstance(result, list): + await deferLater(reactor, 1, lambda: None) + else: + assert isinstance(result, tuple) + _, index = result return nurls[index] - except Exception as e: - log.err(e, "Failed to connect to any of the HTTP NURLs for server") - await deferLater(reactor, 1, lambda: None) @implementer(IServer) @@ -975,9 +981,10 @@ class HTTPNativeStorageServer(service.MultiService): self._short_description, self._long_description ) = _parse_announcement(server_id, furl, announcement) - # TODO need some way to do equivalent of Happy Eyeballs for multiple NURLs? 
- # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3935 - self._nurls = [DecodedURL.from_text(u) for u in announcement[ANONYMOUS_STORAGE_NURLS]] + self._nurls = [ + DecodedURL.from_text(u) + for u in announcement[ANONYMOUS_STORAGE_NURLS] + ] self._istorage_server = None self._connection_status = connection_status.ConnectionStatus.unstarted() @@ -1048,9 +1055,14 @@ class HTTPNativeStorageServer(service.MultiService): @async_to_deferred async def start_connecting(self, trigger_cb): - # The problem with this scheme is that while picking the HTTP server to - # talk to, we don't have connection status updates... - nurl = await _pick_a_http_server(reactor, self._nurls) + # TODO file a bug: The problem with this scheme is that while picking + # the HTTP server to talk to, we don't have connection status + # updates... + def request(reactor, nurl: DecodedURL): + return StorageClientGeneral( + StorageClient.from_nurl(nurl, reactor) + ).get_version() + nurl = await _pick_a_http_server(reactor, self._nurls, request) self._istorage_server = _HTTPStorageServer.from_http_client( StorageClient.from_nurl(nurl, reactor) ) diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index 0657a814e..38ef8c1d3 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -7,8 +7,10 @@ from __future__ import annotations from json import ( loads, ) - import hashlib +from typing import Union, Any + +from hyperlink import DecodedURL from fixtures import ( TempDir, ) @@ -52,6 +54,7 @@ from twisted.internet.defer import ( from twisted.python.filepath import ( FilePath, ) +from twisted.internet.task import Clock from foolscap.api import ( Tub, @@ -80,12 +83,14 @@ from allmydata.webish import ( WebishServer, ) from allmydata.util import base32, yamlutil +from allmydata.util.deferredutil import async_to_deferred from allmydata.storage_client import ( IFoolscapStorageServer, NativeStorageServer, StorageFarmBroker, 
_FoolscapStorage, _NullStorage, + _pick_a_http_server ) from ..storage.server import ( StorageServer, @@ -731,3 +736,60 @@ storage: yield done self.assertTrue(done.called) + + +class PickHTTPServerTests(unittest.SynchronousTestCase): + """Tests for ``_pick_a_http_server``.""" + + def loop_until_result(self, url_to_results: dict[DecodedURL, list[tuple[float, Union[Exception, Any]]]]) -> Deferred[DecodedURL]: + """ + Given mapping of URLs to list of (delay, result), return the URL of the + first selected server. + """ + clock = Clock() + + def request(reactor, url): + delay, value = url_to_results[url].pop(0) + result = Deferred() + def add_result_value(): + if isinstance(value, Exception): + result.errback(value) + else: + result.callback(value) + reactor.callLater(delay, add_result_value) + return result + + d = async_to_deferred(_pick_a_http_server)( + clock, list(url_to_results.keys()), request + ) + for i in range(1000): + clock.advance(0.1) + return d + + def test_first_successful_connect_is_picked(self): + """ + Given multiple good URLs, the first one that connects is chosen. + """ + earliest_url = DecodedURL.from_text("http://a") + latest_url = DecodedURL.from_text("http://b") + d = self.loop_until_result({ + latest_url: [(2, None)], + earliest_url: [(1, None)] + }) + self.assertEqual(self.successResultOf(d), earliest_url) + + def test_failures_are_retried(self): + """ + If the initial requests all fail, ``_pick_a_http_server`` keeps trying + until success. 
+ """ + eventually_good_url = DecodedURL.from_text("http://good") + bad_url = DecodedURL.from_text("http://bad") + d = self.loop_until_result({ + eventually_good_url: [ + (1, ZeroDivisionError()), (0.1, ZeroDivisionError()), (1, None) + ], + bad_url: [(0.1, RuntimeError()), (0.1, RuntimeError()), (0.1, RuntimeError())] + }) + self.flushLoggedErrors(ZeroDivisionError, RuntimeError) + self.assertEqual(self.successResultOf(d), eventually_good_url) From 74e77685a35905bc9bdd0609ac73309a9da7f10d Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 23 Feb 2023 10:07:57 -0500 Subject: [PATCH 05/34] Get rid of DeferredList. --- src/allmydata/storage_client.py | 35 ++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 436335431..0d2443d64 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -942,18 +942,31 @@ async def _pick_a_http_server( ) -> DecodedURL: """Pick the first server we successfully send a request to.""" while True: - result = await defer.DeferredList([ - request(reactor, nurl) for nurl in nurls - ], consumeErrors=True, fireOnOneCallback=True) - # Apparently DeferredList is an awful awful API. If everything fails, - # you get back a list of (False, Failure), if it succeeds, you get a - # tuple of (value, index). - if isinstance(result, list): - await deferLater(reactor, 1, lambda: None) + result : defer.Deferred[Union[DecodedURL, None]] = defer.Deferred() + + def succeeded(nurl: DecodedURL, result=result): + # Only need the first successful NURL: + if result.called: + return + result.callback(nurl) + + def failed(failure, failures=[], result=result): + log.err(failure) + failures.append(None) + if len(failures) == len(nurls): + # All our potential NURLs failed... 
+ result.callback(None) + + for index, nurl in enumerate(nurls): + request(reactor, nurl).addCallback( + lambda _, nurl=nurl: nurl).addCallbacks(succeeded, failed) + + first_nurl = await result + if first_nurl is None: + # Failed to connect to any of the NURLs: + await deferLater(reactor, 1, lambda: None) else: - assert isinstance(result, tuple) - _, index = result - return nurls[index] + return first_nurl @implementer(IServer) From f41f4a5e0cc1fde83e7f184b6dd9c08775994d4b Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 23 Feb 2023 10:10:25 -0500 Subject: [PATCH 06/34] Correct type. --- src/allmydata/storage_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 0d2443d64..ac2a3cf5e 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -938,7 +938,7 @@ class NativeStorageServer(service.MultiService): async def _pick_a_http_server( reactor, nurls: list[DecodedURL], - request: Callable[[DecodedURL, Any], defer.Deferred[Any]] + request: Callable[[Any, DecodedURL], defer.Deferred[Any]] ) -> DecodedURL: """Pick the first server we successfully send a request to.""" while True: @@ -951,7 +951,7 @@ async def _pick_a_http_server( result.callback(nurl) def failed(failure, failures=[], result=result): - log.err(failure) + log.err(failure, "Failed to connect to NURL") failures.append(None) if len(failures) == len(nurls): # All our potential NURLs failed... From 99de5fa54c95331086d4ad92a6cc46b1823ccec7 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 23 Feb 2023 10:12:25 -0500 Subject: [PATCH 07/34] Link to follow-up ticket. 
--- src/allmydata/storage_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index ac2a3cf5e..420be8461 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1068,9 +1068,9 @@ class HTTPNativeStorageServer(service.MultiService): @async_to_deferred async def start_connecting(self, trigger_cb): - # TODO file a bug: The problem with this scheme is that while picking + # TODO The problem with this scheme is that while picking # the HTTP server to talk to, we don't have connection status - # updates... + # updates... https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3978 def request(reactor, nurl: DecodedURL): return StorageClientGeneral( StorageClient.from_nurl(nurl, reactor) From b6e20dfa812be98ffb569aa023f1081cf83938af Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 23 Feb 2023 13:26:30 -0500 Subject: [PATCH 08/34] Slightly longer timeout. --- src/allmydata/storage_client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 420be8461..93c890c56 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -963,8 +963,9 @@ async def _pick_a_http_server( first_nurl = await result if first_nurl is None: - # Failed to connect to any of the NURLs: - await deferLater(reactor, 1, lambda: None) + # Failed to connect to any of the NURLs, try again in a few + # seconds: + await deferLater(reactor, 5, lambda: None) else: return first_nurl From b95a1d2b79a465a70a65e08511221cfe80bc9dc0 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 23 Feb 2023 13:27:41 -0500 Subject: [PATCH 09/34] Nicer type annotations. 
--- src/allmydata/storage_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 93c890c56..c2468c679 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -33,7 +33,7 @@ Ported to Python 3. from __future__ import annotations from six import ensure_text -from typing import Union, Callable, Any +from typing import Union, Callable, Any, Optional import re, time, hashlib from os import urandom from configparser import NoSectionError @@ -942,7 +942,7 @@ async def _pick_a_http_server( ) -> DecodedURL: """Pick the first server we successfully send a request to.""" while True: - result : defer.Deferred[Union[DecodedURL, None]] = defer.Deferred() + result : defer.Deferred[Optional[DecodedURL]] = defer.Deferred() def succeeded(nurl: DecodedURL, result=result): # Only need the first successful NURL: @@ -1300,7 +1300,7 @@ class _HTTPBucketWriter(object): return self.finished -def _ignore_404(failure: Failure) -> Union[Failure, None]: +def _ignore_404(failure: Failure) -> Optional[Failure]: """ Useful for advise_corrupt_share(), since it swallows unknown share numbers in Foolscap. From 96e1e9ffac0f2b60d1a4db14761939ad96d9467e Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 23 Feb 2023 19:45:01 -0500 Subject: [PATCH 10/34] Move where choosing a NURL happens. 
--- src/allmydata/storage_client.py | 36 ++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index c2468c679..53131c88a 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1067,19 +1067,7 @@ class HTTPNativeStorageServer(service.MultiService): version = self.get_version() return _available_space_from_version(version) - @async_to_deferred - async def start_connecting(self, trigger_cb): - # TODO The problem with this scheme is that while picking - # the HTTP server to talk to, we don't have connection status - # updates... https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3978 - def request(reactor, nurl: DecodedURL): - return StorageClientGeneral( - StorageClient.from_nurl(nurl, reactor) - ).get_version() - nurl = await _pick_a_http_server(reactor, self._nurls, request) - self._istorage_server = _HTTPStorageServer.from_http_client( - StorageClient.from_nurl(nurl, reactor) - ) + def start_connecting(self, trigger_cb): self._lc = LoopingCall(self._connect) self._lc.start(1, True) @@ -1113,7 +1101,24 @@ class HTTPNativeStorageServer(service.MultiService): def try_to_connect(self): self._connect() - def _connect(self): + @async_to_deferred + async def _connect(self): + if self._istorage_server is None: + # We haven't selected a server yet, so let's do so. + + # TODO The problem with this scheme is that while picking + # the HTTP server to talk to, we don't have connection status + # updates... 
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3978 + def request(reactor, nurl: DecodedURL): + return StorageClientGeneral( + StorageClient.from_nurl(nurl, reactor) + ).get_version() + + nurl = await _pick_a_http_server(reactor, self._nurls, request) + self._istorage_server = _HTTPStorageServer.from_http_client( + StorageClient.from_nurl(nurl, reactor) + ) + result = self._istorage_server.get_version() def remove_connecting_deferred(result): @@ -1127,6 +1132,9 @@ class HTTPNativeStorageServer(service.MultiService): self._failed_to_connect ) + # TODO Make sure LoopingCall waits for the above timeout for looping again: + #return self._connecting_deferred + def stopService(self): if self._connecting_deferred is not None: self._connecting_deferred.cancel() From e09d19463dfffc9f6b68ff99593fb96f2bdf5233 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 24 Feb 2023 09:53:28 -0500 Subject: [PATCH 11/34] Logging errors breaks some tests. --- src/allmydata/storage_client.py | 5 ++++- src/allmydata/test/test_storage_client.py | 1 - 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 53131c88a..a2726fe09 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -951,7 +951,10 @@ async def _pick_a_http_server( result.callback(nurl) def failed(failure, failures=[], result=result): - log.err(failure, "Failed to connect to NURL") + # Logging errors breaks a bunch of tests, and it's not a _bug_ to + # have a failed connection, it's often expected and transient. More + # of a warning, really? + log.msg("Failed to connect to NURL: {}".format(failure)) failures.append(None) if len(failures) == len(nurls): # All our potential NURLs failed... 
diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index 38ef8c1d3..d7420b62f 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -791,5 +791,4 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): ], bad_url: [(0.1, RuntimeError()), (0.1, RuntimeError()), (0.1, RuntimeError())] }) - self.flushLoggedErrors(ZeroDivisionError, RuntimeError) self.assertEqual(self.successResultOf(d), eventually_good_url) From 3d0b17bc1c197a73b212b6b5eac0ad3b3ee43297 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 27 Feb 2023 11:37:18 -0500 Subject: [PATCH 12/34] Make cancellation more likely to happen. --- src/allmydata/storage_client.py | 88 ++++++++++++++--------- src/allmydata/test/test_storage_client.py | 28 ++++---- 2 files changed, 72 insertions(+), 44 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index a2726fe09..549062d63 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -47,7 +47,7 @@ from zope.interface import ( ) from twisted.python.failure import Failure from twisted.web import http -from twisted.internet.task import LoopingCall, deferLater +from twisted.internet.task import LoopingCall from twisted.internet import defer, reactor from twisted.application import service from twisted.plugin import ( @@ -935,42 +935,52 @@ class NativeStorageServer(service.MultiService): self._reconnector.reset() -async def _pick_a_http_server( +def _pick_a_http_server( reactor, nurls: list[DecodedURL], request: Callable[[Any, DecodedURL], defer.Deferred[Any]] -) -> DecodedURL: - """Pick the first server we successfully send a request to.""" - while True: - result : defer.Deferred[Optional[DecodedURL]] = defer.Deferred() +) -> defer.Deferred[Optional[DecodedURL]]: + """Pick the first server we successfully send a request to. 
- def succeeded(nurl: DecodedURL, result=result): - # Only need the first successful NURL: - if result.called: - return - result.callback(nurl) + Fires with ``None`` if no server was found, or with the ``DecodedURL`` of + the first successfully-connected server. + """ - def failed(failure, failures=[], result=result): - # Logging errors breaks a bunch of tests, and it's not a _bug_ to - # have a failed connection, it's often expected and transient. More - # of a warning, really? - log.msg("Failed to connect to NURL: {}".format(failure)) - failures.append(None) - if len(failures) == len(nurls): - # All our potential NURLs failed... - result.callback(None) + to_cancel : list[defer.Deferred] = [] - for index, nurl in enumerate(nurls): - request(reactor, nurl).addCallback( - lambda _, nurl=nurl: nurl).addCallbacks(succeeded, failed) + def cancel(result: Optional[defer.Deferred]): + for d in to_cancel: + if not d.called: + d.cancel() + if result is not None: + result.errback(defer.CancelledError()) - first_nurl = await result - if first_nurl is None: - # Failed to connect to any of the NURLs, try again in a few - # seconds: - await deferLater(reactor, 5, lambda: None) - else: - return first_nurl + result : defer.Deferred[Optional[DecodedURL]] = defer.Deferred(canceller=cancel) + + def succeeded(nurl: DecodedURL, result=result): + # Only need the first successful NURL: + if result.called: + return + result.callback(nurl) + # No point in continuing other requests if we're connected: + cancel(None) + + def failed(failure, failures=[], result=result): + # Logging errors breaks a bunch of tests, and it's not a _bug_ to + # have a failed connection, it's often expected and transient. More + # of a warning, really? + log.msg("Failed to connect to NURL: {}".format(failure)) + failures.append(None) + if len(failures) == len(nurls): + # All our potential NURLs failed... 
+ result.callback(None) + + for index, nurl in enumerate(nurls): + d = request(reactor, nurl) + to_cancel.append(d) + d.addCallback(lambda _, nurl=nurl: nurl).addCallbacks(succeeded, failed) + + return result @implementer(IServer) @@ -1117,8 +1127,22 @@ class HTTPNativeStorageServer(service.MultiService): StorageClient.from_nurl(nurl, reactor) ).get_version() - nurl = await _pick_a_http_server(reactor, self._nurls, request) - self._istorage_server = _HTTPStorageServer.from_http_client( + # LoopingCall.stop() doesn't cancel Deferreds, unfortunately: + # https://github.com/twisted/twisted/issues/11814 Thus we want + # store the Deferred so it gets cancelled. + picking = _pick_a_http_server(reactor, self._nurls, request) + self._connecting_deferred = picking + try: + nurl = await picking + finally: + self._connecting_deferred = None + + if nurl is None: + # We failed to find a server to connect to. Perhaps the next + # iteration of the loop will succeed. + return + else: + self._istorage_server = _HTTPStorageServer.from_http_client( StorageClient.from_nurl(nurl, reactor) ) diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index d7420b62f..a51e44a82 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -83,7 +83,6 @@ from allmydata.webish import ( WebishServer, ) from allmydata.util import base32, yamlutil -from allmydata.util.deferredutil import async_to_deferred from allmydata.storage_client import ( IFoolscapStorageServer, NativeStorageServer, @@ -741,7 +740,7 @@ storage: class PickHTTPServerTests(unittest.SynchronousTestCase): """Tests for ``_pick_a_http_server``.""" - def loop_until_result(self, url_to_results: dict[DecodedURL, list[tuple[float, Union[Exception, Any]]]]) -> Deferred[DecodedURL]: + def loop_until_result(self, url_to_results: dict[DecodedURL, list[tuple[float, Union[Exception, Any]]]]) -> tuple[int, DecodedURL]: """ Given mapping of URLs to list of 
(delay, result), return the URL of the first selected server. @@ -759,12 +758,15 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): reactor.callLater(delay, add_result_value) return result - d = async_to_deferred(_pick_a_http_server)( - clock, list(url_to_results.keys()), request - ) - for i in range(1000): - clock.advance(0.1) - return d + iterations = 0 + while True: + iterations += 1 + d = _pick_a_http_server(clock, list(url_to_results.keys()), request) + for i in range(100): + clock.advance(0.1) + result = self.successResultOf(d) + if result is not None: + return iterations, result def test_first_successful_connect_is_picked(self): """ @@ -772,11 +774,12 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): """ earliest_url = DecodedURL.from_text("http://a") latest_url = DecodedURL.from_text("http://b") - d = self.loop_until_result({ + iterations, result = self.loop_until_result({ latest_url: [(2, None)], earliest_url: [(1, None)] }) - self.assertEqual(self.successResultOf(d), earliest_url) + self.assertEqual(iterations, 1) + self.assertEqual(result, earliest_url) def test_failures_are_retried(self): """ @@ -785,10 +788,11 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): """ eventually_good_url = DecodedURL.from_text("http://good") bad_url = DecodedURL.from_text("http://bad") - d = self.loop_until_result({ + iterations, result = self.loop_until_result({ eventually_good_url: [ (1, ZeroDivisionError()), (0.1, ZeroDivisionError()), (1, None) ], bad_url: [(0.1, RuntimeError()), (0.1, RuntimeError()), (0.1, RuntimeError())] }) - self.assertEqual(self.successResultOf(d), eventually_good_url) + self.assertEqual(iterations, 3) + self.assertEqual(result, eventually_good_url) From e9c3a227a17f3c23efbc9ac8ee0907101c0e8f62 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 8 Mar 2023 14:17:25 -0500 Subject: [PATCH 13/34] File follow-up ticket. 
--- src/allmydata/storage_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 388f8b4b8..686ee8e59 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1248,7 +1248,8 @@ class HTTPNativeStorageServer(service.MultiService): ) # TODO Make sure LoopingCall waits for the above timeout for looping again: - #return self._connecting_deferred + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3981 + #return self._connecting_deferred or maye await it def stopService(self): if self._connecting_deferred is not None: From 75da037d673a3db8db50a9472758f89960591f2d Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 8 Mar 2023 14:25:04 -0500 Subject: [PATCH 14/34] Add race() implementation from https://github.com/twisted/twisted/pull/11818 --- src/allmydata/test/test_deferredutil.py | 160 ++++++++++++++++++++++-- src/allmydata/util/deferredutil.py | 100 ++++++++++++++- 2 files changed, 247 insertions(+), 13 deletions(-) diff --git a/src/allmydata/test/test_deferredutil.py b/src/allmydata/test/test_deferredutil.py index a37dfdd6f..47121b4cb 100644 --- a/src/allmydata/test/test_deferredutil.py +++ b/src/allmydata/test/test_deferredutil.py @@ -1,23 +1,16 @@ """ Tests for allmydata.util.deferredutil. - -Ported to Python 3. 
""" -from __future__ import unicode_literals -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from future.utils import PY2 -if PY2: - from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 - from twisted.trial import unittest from twisted.internet import defer, reactor +from twisted.internet.defer import Deferred from twisted.python.failure import Failure +from hypothesis.strategies import integers +from hypothesis import given from allmydata.util import deferredutil +from allmydata.util.deferredutil import race, MultiFailure class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin): @@ -157,3 +150,148 @@ class AsyncToDeferred(unittest.TestCase): result = f(1, 0) self.assertIsInstance(self.failureResultOf(result).value, ZeroDivisionError) + + + +def _setupRaceState(numDeferreds: int) -> tuple[list[int], list[Deferred[object]]]: + """ + Create a list of Deferreds and a corresponding list of integers + tracking how many times each Deferred has been cancelled. Without + additional steps the Deferreds will never fire. + """ + cancelledState = [0] * numDeferreds + + ds: list[Deferred[object]] = [] + for n in range(numDeferreds): + + def cancel(d: Deferred, n: int = n) -> None: + cancelledState[n] += 1 + + ds.append(Deferred(canceller=cancel)) + + return cancelledState, ds + + +class RaceTests(unittest.SynchronousTestCase): + """ + Tests for L{race}. + """ + + @given( + beforeWinner=integers(min_value=0, max_value=3), + afterWinner=integers(min_value=0, max_value=3), + ) + def test_success(self, beforeWinner: int, afterWinner: int) -> None: + """ + When one of the L{Deferred}s passed to L{race} fires successfully, + the L{Deferred} return by L{race} fires with the index of that + L{Deferred} and its result and cancels the rest of the L{Deferred}s. 
+ @param beforeWinner: A randomly selected number of Deferreds to + appear before the "winning" Deferred in the list passed in. + @param beforeWinner: A randomly selected number of Deferreds to + appear after the "winning" Deferred in the list passed in. + """ + cancelledState, ds = _setupRaceState(beforeWinner + 1 + afterWinner) + + raceResult = race(ds) + expected = object() + ds[beforeWinner].callback(expected) + + # The result should be the index and result of the only Deferred that + # fired. + self.assertEqual( + self.successResultOf(raceResult), + (beforeWinner, expected), + ) + # All Deferreds except the winner should have been cancelled once. + expectedCancelledState = [1] * beforeWinner + [0] + [1] * afterWinner + self.assertEqual( + cancelledState, + expectedCancelledState, + ) + + @given( + beforeWinner=integers(min_value=0, max_value=3), + afterWinner=integers(min_value=0, max_value=3), + ) + def test_failure(self, beforeWinner: int, afterWinner: int) -> None: + """ + When all of the L{Deferred}s passed to L{race} fire with failures, + the L{Deferred} return by L{race} fires with L{MultiFailure} wrapping + all of their failures. + @param beforeWinner: A randomly selected number of Deferreds to + appear before the "winning" Deferred in the list passed in. + @param beforeWinner: A randomly selected number of Deferreds to + appear after the "winning" Deferred in the list passed in. 
+ """ + cancelledState, ds = _setupRaceState(beforeWinner + 1 + afterWinner) + + failure = Failure(Exception("The test demands failures.")) + raceResult = race(ds) + for d in ds: + d.errback(failure) + + actualFailure = self.failureResultOf(raceResult, MultiFailure) + self.assertEqual( + actualFailure.value.failures, + [failure] * len(ds), + ) + self.assertEqual( + cancelledState, + [0] * len(ds), + ) + + @given( + beforeWinner=integers(min_value=0, max_value=3), + afterWinner=integers(min_value=0, max_value=3), + ) + def test_resultAfterCancel(self, beforeWinner: int, afterWinner: int) -> None: + """ + If one of the Deferreds fires after it was cancelled its result + goes nowhere. In particular, it does not cause any errors to be + logged. + """ + # Ensure we have a Deferred to win and at least one other Deferred + # that can ignore cancellation. + ds: list[Deferred[None]] = [ + Deferred() for n in range(beforeWinner + 2 + afterWinner) + ] + + raceResult = race(ds) + ds[beforeWinner].callback(None) + ds[beforeWinner + 1].callback(None) + + self.successResultOf(raceResult) + self.assertEqual(len(self.flushLoggedErrors()), 0) + + def test_resultFromCancel(self) -> None: + """ + If one of the input Deferreds has a cancel function that fires it + with success, nothing bad happens. + """ + winner: Deferred[object] = Deferred() + ds: list[Deferred[object]] = [ + winner, + Deferred(canceller=lambda d: d.callback(object())), + ] + expected = object() + raceResult = race(ds) + winner.callback(expected) + + self.assertEqual(self.successResultOf(raceResult), (0, expected)) + + @given( + numDeferreds=integers(min_value=1, max_value=3), + ) + def test_cancel(self, numDeferreds: int) -> None: + """ + If the result of L{race} is cancelled then all of the L{Deferred}s + passed in are cancelled. 
+ """ + cancelledState, ds = _setupRaceState(numDeferreds) + + raceResult = race(ds) + raceResult.cancel() + + self.assertEqual(cancelledState, [1] * numDeferreds) + self.failureResultOf(raceResult, MultiFailure) diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py index 782663e8b..83de411ce 100644 --- a/src/allmydata/util/deferredutil.py +++ b/src/allmydata/util/deferredutil.py @@ -1,15 +1,18 @@ """ Utilities for working with Twisted Deferreds. - -Ported to Python 3. """ +from __future__ import annotations + import time from functools import wraps from typing import ( Callable, Any, + Sequence, + TypeVar, + Optional, ) from foolscap.api import eventually @@ -17,6 +20,7 @@ from eliot.twisted import ( inline_callbacks, ) from twisted.internet import defer, reactor, error +from twisted.internet.defer import Deferred from twisted.python.failure import Failure from allmydata.util import log @@ -234,3 +238,95 @@ def async_to_deferred(f): return defer.Deferred.fromCoroutine(f(*args, **kwargs)) return not_async + + +class MultiFailure(Exception): + """ + More than one failure occurred. + """ + + def __init__(self, failures: Sequence[Failure]) -> None: + super(MultiFailure, self).__init__() + self.failures = failures + + +_T = TypeVar("_T") + +# Eventually this should be in Twisted upstream: +# https://github.com/twisted/twisted/pull/11818 +def race(ds: Sequence[Deferred[_T]]) -> Deferred[tuple[int, _T]]: + """ + Select the first available result from the sequence of Deferreds and + cancel the rest. + @return: A cancellable L{Deferred} that fires with the index and output of + the element of C{ds} to have a success result first, or that fires + with L{MultiFailure} holding a list of their failures if they all + fail. + """ + # Keep track of the Deferred for the action which completed first. When + # it completes, all of the other Deferreds will get cancelled but this one + # shouldn't be. 
Even though it "completed" it isn't really done - the + # caller will still be using it for something. If we cancelled it, + # cancellation could propagate down to them. + winner: Optional[Deferred] = None + + # The cancellation function for the Deferred this function returns. + def cancel(result: Deferred) -> None: + # If it is cancelled then we cancel all of the Deferreds for the + # individual actions because there is no longer the possibility of + # delivering any of their results anywhere. We don't have to fire + # `result` because the Deferred will do that for us. + for d in to_cancel: + d.cancel() + + # The Deferred that this function will return. It will fire with the + # index and output of the action that completes first, or fail with + # MultiFailure if all of the actions fail. If it is cancelled, all of the + # actions will be cancelled. + final_result: Deferred[tuple[int, _T]] = Deferred(canceller=cancel) + + # A callback for an individual action. + def succeeded(this_output: _T, this_index: int) -> None: + # If it is the first action to succeed then it becomes the "winner", + # its index/output become the externally visible result, and the rest + # of the action Deferreds get cancelled. If it is not the first + # action to succeed (because some action did not support + # cancellation), just ignore the result. It is uncommon for this + # callback to be entered twice. The only way it can happen is if one + # of the input Deferreds has a cancellation function that fires the + # Deferred with a success result. + nonlocal winner + if winner is None: + # This is the first success. Act on it. + winner = to_cancel[this_index] + + # Cancel the rest. + for d in to_cancel: + if d is not winner: + d.cancel() + + # Fire our Deferred + final_result.callback((this_index, this_output)) + + # Keep track of how many actions have failed. If they all fail we need to + # deliver failure notification on our externally visible result. 
+ failure_state = [] + + def failed(failure: Failure, this_index: int) -> None: + failure_state.append((this_index, failure)) + if len(failure_state) == len(to_cancel): + # Every operation failed. + failure_state.sort() + failures = [f for (ignored, f) in failure_state] + final_result.errback(MultiFailure(failures)) + + # Copy the sequence of Deferreds so we know it doesn't get mutated out + # from under us. + to_cancel = list(ds) + for index, d in enumerate(ds): + # Propagate the position of this action as well as the argument to f + # to the success callback so we can cancel the right Deferreds and + # propagate the result outwards. + d.addCallbacks(succeeded, failed, callbackArgs=(index,), errbackArgs=(index,)) + + return final_result From 0093edcd938b6f18692094a2442bcf236f42da39 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 8 Mar 2023 14:36:37 -0500 Subject: [PATCH 15/34] Refactor to use race(). --- src/allmydata/storage_client.py | 41 +++++++++------------------------ 1 file changed, 11 insertions(+), 30 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 686ee8e59..169d34e32 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -82,7 +82,7 @@ from allmydata.util.observer import ObserverList from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.hashutil import permute_server_hash from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict -from allmydata.util.deferredutil import async_to_deferred +from allmydata.util.deferredutil import async_to_deferred, race from allmydata.storage.http_client import ( StorageClient, StorageClientImmutables, StorageClientGeneral, ClientException as HTTPClientException, StorageClientMutables, @@ -1017,42 +1017,23 @@ def _pick_a_http_server( Fires with ``None`` if no server was found, or with the ``DecodedURL`` of the first successfully-connected server. 
""" + queries = race([ + request(reactor, nurl).addCallback(lambda _, nurl=nurl: nurl) + for nurl in nurls + ]) - to_cancel : list[defer.Deferred] = [] - - def cancel(result: Optional[defer.Deferred]): - for d in to_cancel: - if not d.called: - d.cancel() - if result is not None: - result.errback(defer.CancelledError()) - - result : defer.Deferred[Optional[DecodedURL]] = defer.Deferred(canceller=cancel) - - def succeeded(nurl: DecodedURL, result=result): - # Only need the first successful NURL: - if result.called: - return - result.callback(nurl) - # No point in continuing other requests if we're connected: - cancel(None) - - def failed(failure, failures=[], result=result): + def failed(failure: Failure): # Logging errors breaks a bunch of tests, and it's not a _bug_ to # have a failed connection, it's often expected and transient. More # of a warning, really? log.msg("Failed to connect to NURL: {}".format(failure)) - failures.append(None) - if len(failures) == len(nurls): - # All our potential NURLs failed... - result.callback(None) + return None - for index, nurl in enumerate(nurls): - d = request(reactor, nurl) - to_cancel.append(d) - d.addCallback(lambda _, nurl=nurl: nurl).addCallbacks(succeeded, failed) + def succeeded(result: tuple[int, DecodedURL]): + _, nurl = result + return nurl - return result + return queries.addCallbacks(succeeded, failed) @implementer(IServer) From 4db65ea9369b80d92b4a6d4c0c8ae9c7a7b8916c Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 8 Mar 2023 14:53:43 -0500 Subject: [PATCH 16/34] Make tests test _pick_a_http_server more directly. 
--- src/allmydata/test/test_storage_client.py | 46 ++++++++++------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index a51e44a82..c919440d8 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -8,7 +8,7 @@ from json import ( loads, ) import hashlib -from typing import Union, Any +from typing import Union, Any, Optional from hyperlink import DecodedURL from fixtures import ( @@ -740,15 +740,15 @@ storage: class PickHTTPServerTests(unittest.SynchronousTestCase): """Tests for ``_pick_a_http_server``.""" - def loop_until_result(self, url_to_results: dict[DecodedURL, list[tuple[float, Union[Exception, Any]]]]) -> tuple[int, DecodedURL]: + def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Optional[DecodedURL]: """ - Given mapping of URLs to list of (delay, result), return the URL of the - first selected server. + Given mapping of URLs to (delay, result), return the URL of the + first selected server, or None. 
""" clock = Clock() def request(reactor, url): - delay, value = url_to_results[url].pop(0) + delay, value = url_to_results[url] result = Deferred() def add_result_value(): if isinstance(value, Exception): @@ -758,15 +758,10 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): reactor.callLater(delay, add_result_value) return result - iterations = 0 - while True: - iterations += 1 - d = _pick_a_http_server(clock, list(url_to_results.keys()), request) - for i in range(100): - clock.advance(0.1) - result = self.successResultOf(d) - if result is not None: - return iterations, result + d = _pick_a_http_server(clock, list(url_to_results.keys()), request) + for i in range(100): + clock.advance(0.1) + return self.successResultOf(d) def test_first_successful_connect_is_picked(self): """ @@ -774,25 +769,22 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): """ earliest_url = DecodedURL.from_text("http://a") latest_url = DecodedURL.from_text("http://b") - iterations, result = self.loop_until_result({ - latest_url: [(2, None)], - earliest_url: [(1, None)] + bad_url = DecodedURL.from_text("http://bad") + result = self.pick_result({ + latest_url: (2, None), + earliest_url: (1, None), + bad_url: (0.5, RuntimeError()), }) - self.assertEqual(iterations, 1) self.assertEqual(result, earliest_url) def test_failures_are_retried(self): """ - If the initial requests all fail, ``_pick_a_http_server`` keeps trying - until success. + If the requests all fail, ``_pick_a_http_server`` returns ``None``. 
""" eventually_good_url = DecodedURL.from_text("http://good") bad_url = DecodedURL.from_text("http://bad") - iterations, result = self.loop_until_result({ - eventually_good_url: [ - (1, ZeroDivisionError()), (0.1, ZeroDivisionError()), (1, None) - ], - bad_url: [(0.1, RuntimeError()), (0.1, RuntimeError()), (0.1, RuntimeError())] + result = self.pick_result({ + eventually_good_url: (1, ZeroDivisionError()), + bad_url: (0.1, RuntimeError()) }) - self.assertEqual(iterations, 3) - self.assertEqual(result, eventually_good_url) + self.assertEqual(result, None) From 3702ad62335451d8148a70bc7e36fa93d3d52fa5 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 8 Mar 2023 14:54:53 -0500 Subject: [PATCH 17/34] Fix indentation. --- src/allmydata/storage_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 169d34e32..4073d9b41 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1212,8 +1212,8 @@ class HTTPNativeStorageServer(service.MultiService): return else: self._istorage_server = _HTTPStorageServer.from_http_client( - StorageClient.from_nurl(nurl, reactor) - ) + StorageClient.from_nurl(nurl, reactor) + ) result = self._istorage_server.get_version() From b43150ba85c7a3b5d1cb5952f9a235f9493c04d7 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 8 Mar 2023 16:48:08 -0500 Subject: [PATCH 18/34] Add future import. --- src/allmydata/test/test_deferredutil.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/allmydata/test/test_deferredutil.py b/src/allmydata/test/test_deferredutil.py index 47121b4cb..34358d0c8 100644 --- a/src/allmydata/test/test_deferredutil.py +++ b/src/allmydata/test/test_deferredutil.py @@ -2,6 +2,8 @@ Tests for allmydata.util.deferredutil. 
""" +from __future__ import annotations + from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.internet.defer import Deferred From f8ea650b922ba5622debdf6a49121388e2a32e2d Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 14 Mar 2023 12:02:32 -0400 Subject: [PATCH 19/34] Wait for current loop iteration to finish before moving on to next iteration. --- src/allmydata/storage_client.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 5dd906005..faa48710f 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1239,9 +1239,13 @@ class HTTPNativeStorageServer(service.MultiService): self._failed_to_connect ) - # TODO Make sure LoopingCall waits for the above timeout for looping again: - # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3981 - #return self._connecting_deferred or maye await it + # We don't want to do another iteration of the loop until this + # iteration has finished, so wait here: + try: + if self._connecting_deferred is not None: + await self._connecting_deferred + except Exception as e: + log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS) def stopService(self): if self._connecting_deferred is not None: From dd07a39399709b59d7c1c1e2e0cb5d3ac4d6ff65 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 14 Mar 2023 13:01:10 -0400 Subject: [PATCH 20/34] Don't bother with persistent connections when testing NURLs. 
--- src/allmydata/storage/http_client.py | 10 ++++++---- src/allmydata/storage_client.py | 6 +++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/allmydata/storage/http_client.py b/src/allmydata/storage/http_client.py index 90bda7fc0..3edf5f835 100644 --- a/src/allmydata/storage/http_client.py +++ b/src/allmydata/storage/http_client.py @@ -311,18 +311,20 @@ class StorageClient(object): @classmethod def from_nurl( - cls, - nurl: DecodedURL, - reactor, + cls, nurl: DecodedURL, reactor, persistent=True, retryAutomatically=True ) -> StorageClient: """ Create a ``StorageClient`` for the given NURL. + + ``persistent`` and ``retryAutomatically`` arguments are passed to the + new HTTPConnectionPool. """ assert nurl.fragment == "v=1" assert nurl.scheme == "pb" swissnum = nurl.path[0].encode("ascii") certificate_hash = nurl.user.encode("ascii") - pool = HTTPConnectionPool(reactor) + pool = HTTPConnectionPool(reactor, persistent=persistent) + pool.retryAutomatically = retryAutomatically pool.maxPersistentPerHost = 20 if cls.TEST_MODE_REGISTER_HTTP_POOL is not None: diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index faa48710f..2888b10e7 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1203,8 +1203,12 @@ class HTTPNativeStorageServer(service.MultiService): # the HTTP server to talk to, we don't have connection status # updates... https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3978 def request(reactor, nurl: DecodedURL): + # Since we're just using this one off to check if the NURL + # works, no need for persistent pool or other fanciness. 
return StorageClientGeneral( - StorageClient.from_nurl(nurl, reactor) + StorageClient.from_nurl( + nurl, reactor, persistent=False, retryAutomatically=False + ) ).get_version() # LoopingCall.stop() doesn't cancel Deferreds, unfortunately: From 24212b412cba52481695b85e057da1ac04a96d67 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 14 Mar 2023 13:01:45 -0400 Subject: [PATCH 21/34] Fix 3.11 runs. --- tox.ini | 1 + 1 file changed, 1 insertion(+) diff --git a/tox.ini b/tox.ini index 3e2dacbb2..538cded80 100644 --- a/tox.ini +++ b/tox.ini @@ -10,6 +10,7 @@ python = 3.8: py38-coverage 3.9: py39-coverage 3.10: py310-coverage + 3.11: py311-coverage pypy-3.8: pypy38 pypy-3.9: pypy39 From 52f43cefea6b447cda47cd5e75df4937190f8bd6 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 15 Mar 2023 15:44:45 -0400 Subject: [PATCH 22/34] Add 3.11. --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 538cded80..382ba973e 100644 --- a/tox.ini +++ b/tox.ini @@ -18,7 +18,7 @@ python = twisted = 1 [tox] -envlist = typechecks,codechecks,py{38,39,310}-{coverage},pypy27,pypy38,pypy39,integration +envlist = typechecks,codechecks,py{38,39,310,311}-{coverage},pypy27,pypy38,pypy39,integration minversion = 2.4 [testenv] From a24e6bd7f94311b185072132b3372fd4a0afb723 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 16 Mar 2023 16:31:28 -0400 Subject: [PATCH 23/34] Try to rewrite test_get_put.py::test_large_file into system-style test. 
--- src/allmydata/test/test_system.py | 32 +++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index d11a6e866..b7873f14f 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -15,11 +15,15 @@ from past.builtins import chr as byteschr, long from six import ensure_text import os, re, sys, time, json +from subprocess import check_call +from pathlib import Path +from tempfile import mkdtemp from bs4 import BeautifulSoup from twisted.trial import unittest from twisted.internet import defer +from twisted.internet.threads import deferToThread from allmydata import uri from allmydata.storage.mutable import MutableShareFile @@ -1830,6 +1834,34 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): d.addCallback(_got_lit_filenode) return d + async def test_immutable_upload_download(self): + """ + A reproducer for issue 3988: upload a large file and then download it. 
+ """ + DATA = b"abc123 this is not utf-8 decodable \xff\x00\x33 \x11" * 1_000_000 + await self.set_up_nodes() + + async def run(*args): + await deferToThread(check_call, ["tahoe", "--node-directory", self.getdir("client0")] + list(args)) + + for c in self.clients: + c.encoding_params['k'] = 2 + c.encoding_params['happy'] = 3 + c.encoding_params['n'] = 4 + + await run("create-alias", "getput") + + tmp_path = Path(mkdtemp()) + tempfile = tmp_path / "input" + + with tempfile.open("wb") as f: + f.write(DATA) + await run("put", str(tempfile), "getput:largefile") + + outfile = tmp_path / "out" + await run("get", "getput:largefile", str(outfile)) + self.assertEqual(outfile.read_bytes(), tempfile.read_bytes()) + class Connections(SystemTestMixin, unittest.TestCase): FORCE_FOOLSCAP_FOR_STORAGE = True From 61d9d82c55644b3a786b3b05725438a3cff02a18 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 20 Mar 2023 15:02:35 -0400 Subject: [PATCH 24/34] Make await_client_ready() non-blocking. --- integration/conftest.py | 6 +++--- integration/test_get_put.py | 4 ---- integration/test_servers_of_happiness.py | 2 +- integration/test_tor.py | 4 ++-- integration/util.py | 11 +++++------ newsfragments/3988.minor | 0 6 files changed, 11 insertions(+), 16 deletions(-) create mode 100644 newsfragments/3988.minor diff --git a/integration/conftest.py b/integration/conftest.py index dc0107eea..33e7998c1 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -393,7 +393,7 @@ def alice( finalize=False, ) ) - await_client_ready(process) + pytest_twisted.blockon(await_client_ready(process)) # 1. Create a new RW directory cap: cli(process, "create-alias", "test") @@ -424,7 +424,7 @@ alice-key ssh-rsa {ssh_public_key} {rwcap} # 4. Restart the node with new SFTP config. 
pytest_twisted.blockon(process.restart_async(reactor, request)) - await_client_ready(process) + pytest_twisted.blockon(await_client_ready(process)) print(f"Alice pid: {process.transport.pid}") return process @@ -439,7 +439,7 @@ def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, reques storage=False, ) ) - await_client_ready(process) + pytest_twisted.blockon(await_client_ready(process)) return process diff --git a/integration/test_get_put.py b/integration/test_get_put.py index 1b6c30072..927ec622b 100644 --- a/integration/test_get_put.py +++ b/integration/test_get_put.py @@ -85,10 +85,6 @@ def test_large_file(alice, get_put_alias, tmp_path): assert outfile.read_bytes() == tempfile.read_bytes() -@pytest.mark.skipif( - sys.platform.startswith("win"), - reason="reconfigure() has issues on Windows" -) @ensureDeferred async def test_upload_download_immutable_different_default_max_segment_size(alice, get_put_alias, tmpdir, request): """ diff --git a/integration/test_servers_of_happiness.py b/integration/test_servers_of_happiness.py index b9de0c075..c63642066 100644 --- a/integration/test_servers_of_happiness.py +++ b/integration/test_servers_of_happiness.py @@ -31,7 +31,7 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto happy=7, total=10, ) - util.await_client_ready(edna) + yield util.await_client_ready(edna) node_dir = join(temp_dir, 'edna') diff --git a/integration/test_tor.py b/integration/test_tor.py index c78fa8098..901858347 100644 --- a/integration/test_tor.py +++ b/integration/test_tor.py @@ -42,8 +42,8 @@ if PY2: def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl): carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl) dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl) - util.await_client_ready(carol, 
minimum_number_of_servers=2) - util.await_client_ready(dave, minimum_number_of_servers=2) + yield util.await_client_ready(carol, minimum_number_of_servers=2) + yield util.await_client_ready(dave, minimum_number_of_servers=2) # ensure both nodes are connected to "a grid" by uploading # something via carol, and retrieve it using dave. diff --git a/integration/util.py b/integration/util.py index 04c925abf..c2befe47b 100644 --- a/integration/util.py +++ b/integration/util.py @@ -570,6 +570,10 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_serve We will try for up to `timeout` seconds for the above conditions to be true. Otherwise, an exception is raised """ + return deferToThread(_await_client_ready_blocking, tahoe, timeout, liveness, minimum_number_of_servers) + + +def _await_client_ready_blocking(tahoe, timeout, liveness, minimum_number_of_servers): start = time.time() while (time.time() - start) < float(timeout): try: @@ -792,16 +796,11 @@ async def reconfigure(reactor, request, node: TahoeProcess, ) if changed: - # TODO reconfigure() seems to have issues on Windows. If you need to - # use it there, delete this assert and try to figure out what's going - # on... - assert not sys.platform.startswith("win") - # restart the node print(f"Restarting {node.node_dir} for ZFEC reconfiguration") await node.restart_async(reactor, request) print("Restarted. Waiting for ready state.") - await_client_ready(node) + await await_client_ready(node) print("Ready.") else: print("Config unchanged, not restarting.") diff --git a/newsfragments/3988.minor b/newsfragments/3988.minor new file mode 100644 index 000000000..e69de29bb From aba60d271956d5100375c14b507a720582459860 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 20 Mar 2023 15:08:22 -0400 Subject: [PATCH 25/34] Run blocking tests in a thread. 
--- integration/test_get_put.py | 2 ++ integration/test_web.py | 11 +++++++++++ 2 files changed, 13 insertions(+) diff --git a/integration/test_get_put.py b/integration/test_get_put.py index 927ec622b..6b87c9b62 100644 --- a/integration/test_get_put.py +++ b/integration/test_get_put.py @@ -50,6 +50,7 @@ def test_put_from_stdin(alice, get_put_alias, tmpdir): assert read_bytes(tempfile) == DATA +@run_in_thread def test_get_to_stdout(alice, get_put_alias, tmpdir): """ It's possible to upload a file, and then download it to stdout. @@ -67,6 +68,7 @@ def test_get_to_stdout(alice, get_put_alias, tmpdir): assert p.wait() == 0 +@run_in_thread def test_large_file(alice, get_put_alias, tmp_path): """ It's possible to upload and download a larger file. diff --git a/integration/test_web.py b/integration/test_web.py index 95a09a5f5..b3c4a8e5f 100644 --- a/integration/test_web.py +++ b/integration/test_web.py @@ -18,6 +18,7 @@ import allmydata.uri from allmydata.util import jsonbytes as json from . import util +from .util import run_in_thread import requests import html5lib @@ -25,6 +26,7 @@ from bs4 import BeautifulSoup from pytest_twisted import ensureDeferred +@run_in_thread def test_index(alice): """ we can download the index file @@ -32,6 +34,7 @@ def test_index(alice): util.web_get(alice, u"") +@run_in_thread def test_index_json(alice): """ we can download the index file as json @@ -41,6 +44,7 @@ def test_index_json(alice): json.loads(data) +@run_in_thread def test_upload_download(alice): """ upload a file, then download it via readcap @@ -70,6 +74,7 @@ def test_upload_download(alice): assert str(data, "utf-8") == FILE_CONTENTS +@run_in_thread def test_put(alice): """ use PUT to create a file @@ -89,6 +94,7 @@ def test_put(alice): assert cap.needed_shares == int(cfg.get_config("client", "shares.needed")) +@run_in_thread def test_helper_status(storage_nodes): """ successfully GET the /helper_status page @@ -101,6 +107,7 @@ def test_helper_status(storage_nodes): assert 
str(dom.h1.string) == u"Helper Status" +@run_in_thread def test_deep_stats(alice): """ create a directory, do deep-stats on it and prove the /operations/ @@ -417,6 +424,7 @@ async def test_directory_deep_check(reactor, request, alice): assert dom is not None, "Operation never completed" +@run_in_thread def test_storage_info(storage_nodes): """ retrieve and confirm /storage URI for one storage node @@ -428,6 +436,7 @@ def test_storage_info(storage_nodes): ) +@run_in_thread def test_storage_info_json(storage_nodes): """ retrieve and confirm /storage?t=json URI for one storage node @@ -442,6 +451,7 @@ def test_storage_info_json(storage_nodes): assert data[u"stats"][u"storage_server.reserved_space"] == 1000000000 +@run_in_thread def test_introducer_info(introducer): """ retrieve and confirm /introducer URI for the introducer @@ -460,6 +470,7 @@ def test_introducer_info(introducer): assert "subscription_summary" in data +@run_in_thread def test_mkdir_with_children(alice): """ create a directory using ?t=mkdir-with-children From ded5b20924537bbda2471581606cc9adbe7f87ce Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 20 Mar 2023 15:20:39 -0400 Subject: [PATCH 26/34] Lint fix. --- integration/test_get_put.py | 1 - 1 file changed, 1 deletion(-) diff --git a/integration/test_get_put.py b/integration/test_get_put.py index 6b87c9b62..f121d6284 100644 --- a/integration/test_get_put.py +++ b/integration/test_get_put.py @@ -4,7 +4,6 @@ and stdout. """ from subprocess import Popen, PIPE, check_output, check_call -import sys import pytest from pytest_twisted import ensureDeferred From cce5d3adff757d67fdde04da8cd10a86e5a61176 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 20 Mar 2023 15:24:10 -0400 Subject: [PATCH 27/34] Don't actually need this. 
--- src/allmydata/test/test_system.py | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index b7873f14f..c997ac734 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -1834,34 +1834,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): d.addCallback(_got_lit_filenode) return d - async def test_immutable_upload_download(self): - """ - A reproducer for issue 3988: upload a large file and then download it. - """ - DATA = b"abc123 this is not utf-8 decodable \xff\x00\x33 \x11" * 1_000_000 - await self.set_up_nodes() - - async def run(*args): - await deferToThread(check_call, ["tahoe", "--node-directory", self.getdir("client0")] + list(args)) - - for c in self.clients: - c.encoding_params['k'] = 2 - c.encoding_params['happy'] = 3 - c.encoding_params['n'] = 4 - - await run("create-alias", "getput") - - tmp_path = Path(mkdtemp()) - tempfile = tmp_path / "input" - - with tempfile.open("wb") as f: - f.write(DATA) - await run("put", str(tempfile), "getput:largefile") - - outfile = tmp_path / "out" - await run("get", "getput:largefile", str(outfile)) - self.assertEqual(outfile.read_bytes(), tempfile.read_bytes()) - class Connections(SystemTestMixin, unittest.TestCase): FORCE_FOOLSCAP_FOR_STORAGE = True From 815066c4de7db05c5df86ca1b9ab7dbeb08cfab2 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 20 Mar 2023 15:25:52 -0400 Subject: [PATCH 28/34] Just use the utility. --- integration/util.py | 54 ++++++++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/integration/util.py b/integration/util.py index c2befe47b..05fef8fed 100644 --- a/integration/util.py +++ b/integration/util.py @@ -430,6 +430,31 @@ class FileShouldVanishException(Exception): ) +def run_in_thread(f): + """Decorator for integration tests that runs code in a thread. 
+ + Because we're using pytest_twisted, tests that rely on the reactor are + expected to return a Deferred and use async APIs so the reactor can run. + + In the case of the integration test suite, it launches nodes in the + background using Twisted APIs. The nodes stdout and stderr is read via + Twisted code. If the reactor doesn't run, reads don't happen, and + eventually the buffers fill up, and the nodes block when they try to flush + logs. + + We can switch to Twisted APIs (treq instead of requests etc.), but + sometimes it's easier or expedient to just have a blocking test. So this + decorator allows you to run the test in a thread, and the reactor can keep + running in the main thread. + + See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3597 for tracking bug. + """ + @wraps(f) + def test(*args, **kwargs): + return deferToThread(lambda: f(*args, **kwargs)) + return test + + def await_file_contents(path, contents, timeout=15, error_if=None): """ wait up to `timeout` seconds for the file at `path` (any path-like @@ -555,6 +580,7 @@ def web_post(tahoe, uri_fragment, **kwargs): return resp.content +@run_in_thread def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_servers=1): """ Uses the status API to wait for a client-type node (in `tahoe`, a @@ -570,10 +596,6 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_serve We will try for up to `timeout` seconds for the above conditions to be true. Otherwise, an exception is raised """ - return deferToThread(_await_client_ready_blocking, tahoe, timeout, liveness, minimum_number_of_servers) - - -def _await_client_ready_blocking(tahoe, timeout, liveness, minimum_number_of_servers): start = time.time() while (time.time() - start) < float(timeout): try: @@ -626,30 +648,6 @@ def generate_ssh_key(path): f.write(s.encode("ascii")) -def run_in_thread(f): - """Decorator for integration tests that runs code in a thread. 
- - Because we're using pytest_twisted, tests that rely on the reactor are - expected to return a Deferred and use async APIs so the reactor can run. - - In the case of the integration test suite, it launches nodes in the - background using Twisted APIs. The nodes stdout and stderr is read via - Twisted code. If the reactor doesn't run, reads don't happen, and - eventually the buffers fill up, and the nodes block when they try to flush - logs. - - We can switch to Twisted APIs (treq instead of requests etc.), but - sometimes it's easier or expedient to just have a blocking test. So this - decorator allows you to run the test in a thread, and the reactor can keep - running in the main thread. - - See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3597 for tracking bug. - """ - @wraps(f) - def test(*args, **kwargs): - return deferToThread(lambda: f(*args, **kwargs)) - return test - @frozen class CHK: """ From 23b977a4b1626683116c71e08ed1a4f33381d5f0 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 20 Mar 2023 15:27:16 -0400 Subject: [PATCH 29/34] Undo unnecessary imports. 
--- src/allmydata/test/test_system.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index c997ac734..d11a6e866 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -15,15 +15,11 @@ from past.builtins import chr as byteschr, long from six import ensure_text import os, re, sys, time, json -from subprocess import check_call -from pathlib import Path -from tempfile import mkdtemp from bs4 import BeautifulSoup from twisted.trial import unittest from twisted.internet import defer -from twisted.internet.threads import deferToThread from allmydata import uri from allmydata.storage.mutable import MutableShareFile From e98967731952208896e78cf8e1b697b367c6f8a0 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 21 Mar 2023 11:20:25 -0400 Subject: [PATCH 30/34] Pass in a pool instead of pool options. --- src/allmydata/storage/http_client.py | 11 ++++------- src/allmydata/storage_client.py | 7 ++++--- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/allmydata/storage/http_client.py b/src/allmydata/storage/http_client.py index 3edf5f835..1d798fecc 100644 --- a/src/allmydata/storage/http_client.py +++ b/src/allmydata/storage/http_client.py @@ -311,21 +311,18 @@ class StorageClient(object): @classmethod def from_nurl( - cls, nurl: DecodedURL, reactor, persistent=True, retryAutomatically=True + cls, nurl: DecodedURL, reactor, pool: Optional[HTTPConnectionPool] = None ) -> StorageClient: """ Create a ``StorageClient`` for the given NURL. - - ``persistent`` and ``retryAutomatically`` arguments are passed to the - new HTTPConnectionPool. 
""" assert nurl.fragment == "v=1" assert nurl.scheme == "pb" swissnum = nurl.path[0].encode("ascii") certificate_hash = nurl.user.encode("ascii") - pool = HTTPConnectionPool(reactor, persistent=persistent) - pool.retryAutomatically = retryAutomatically - pool.maxPersistentPerHost = 20 + if pool is None: + pool = HTTPConnectionPool(reactor) + pool.maxPersistentPerHost = 20 if cls.TEST_MODE_REGISTER_HTTP_POOL is not None: cls.TEST_MODE_REGISTER_HTTP_POOL(pool) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 2888b10e7..19d6ef4a7 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -43,6 +43,7 @@ from configparser import NoSectionError import attr from hyperlink import DecodedURL +from twisted.web.client import HTTPConnectionPool from zope.interface import ( Attribute, Interface, @@ -1205,10 +1206,10 @@ class HTTPNativeStorageServer(service.MultiService): def request(reactor, nurl: DecodedURL): # Since we're just using this one off to check if the NURL # works, no need for persistent pool or other fanciness. + pool = HTTPConnectionPool(reactor, persistent=False) + pool.retryAutomatically = False return StorageClientGeneral( - StorageClient.from_nurl( - nurl, reactor, persistent=False, retryAutomatically=False - ) + StorageClient.from_nurl(nurl, reactor, pool) ).get_version() # LoopingCall.stop() doesn't cancel Deferreds, unfortunately: From b65bc9dca70048d6e1c41abbf47d93a4f4b30d3e Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 21 Mar 2023 11:22:43 -0400 Subject: [PATCH 31/34] Better explanation. 
--- src/allmydata/storage_client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 19d6ef4a7..a52bb3f75 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1038,7 +1038,10 @@ def _pick_a_http_server( # Logging errors breaks a bunch of tests, and it's not a _bug_ to # have a failed connection, it's often expected and transient. More # of a warning, really? - log.msg("Failed to connect to NURL: {}".format(failure)) + log.msg( + "Failed to connect to a storage server advertised by NURL: {}".format( + failure) + ) return None def succeeded(result: tuple[int, DecodedURL]): From 7ae8b50d14465a6b4fa3d7f5b35bfdbf5056f28a Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 21 Mar 2023 11:26:40 -0400 Subject: [PATCH 32/34] Async! --- src/allmydata/storage_client.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index a52bb3f75..dffa78bc4 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1019,11 +1019,12 @@ class NativeStorageServer(service.MultiService): self._reconnector.reset() -def _pick_a_http_server( +@async_to_deferred +async def _pick_a_http_server( reactor, nurls: list[DecodedURL], request: Callable[[Any, DecodedURL], defer.Deferred[Any]] -) -> defer.Deferred[Optional[DecodedURL]]: +) -> Optional[DecodedURL]: """Pick the first server we successfully send a request to. Fires with ``None`` if no server was found, or with the ``DecodedURL`` of @@ -1034,22 +1035,19 @@ def _pick_a_http_server( for nurl in nurls ]) - def failed(failure: Failure): + try: + _, nurl = await queries + return nurl + except Exception as e: # Logging errors breaks a bunch of tests, and it's not a _bug_ to # have a failed connection, it's often expected and transient. More # of a warning, really? 
log.msg( "Failed to connect to a storage server advertised by NURL: {}".format( - failure) + e) ) return None - def succeeded(result: tuple[int, DecodedURL]): - _, nurl = result - return nurl - - return queries.addCallbacks(succeeded, failed) - @implementer(IServer) class HTTPNativeStorageServer(service.MultiService): From 14aeaea02223970c1cbc2afbb7499a22231fbb62 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 21 Mar 2023 11:29:19 -0400 Subject: [PATCH 33/34] Another todo. --- src/allmydata/storage_client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index dffa78bc4..c88613803 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -1201,7 +1201,11 @@ class HTTPNativeStorageServer(service.MultiService): if self._istorage_server is None: # We haven't selected a server yet, so let's do so. - # TODO The problem with this scheme is that while picking + # TODO This is somewhat inefficient on startup: it takes two successful + # version() calls before we are live talking to a server, it could only + # be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992 + + # TODO Another problem with this scheme is that while picking # the HTTP server to talk to, we don't have connection status # updates... https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3978 def request(reactor, nurl: DecodedURL): From 264269f409bb7e567c4632335ed1d8a3a20faef4 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 21 Mar 2023 11:29:50 -0400 Subject: [PATCH 34/34] Better test name. 
--- src/allmydata/test/test_storage_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index 8273966ce..91668e7ca 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -819,7 +819,7 @@ class PickHTTPServerTests(unittest.SynchronousTestCase): }) self.assertEqual(result, earliest_url) - def test_failures_are_retried(self): + def test_failures_are_turned_into_none(self): """ If the requests all fail, ``_pick_a_http_server`` returns ``None``. """