Merge pull request #1276 from tahoe-lafs/3978-connection-status-http-storage

Make connection status for http storage get updated in more cases

Fixes ticket:3978
This commit is contained in:
Itamar Turner-Trauring 2023-03-28 12:14:26 -04:00 committed by GitHub
commit 2fd1123f77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 65 deletions

0
newsfragments/3978.minor Normal file
View File

View File

@ -1024,7 +1024,7 @@ async def _pick_a_http_server(
reactor,
nurls: list[DecodedURL],
request: Callable[[Any, DecodedURL], defer.Deferred[Any]]
) -> Optional[DecodedURL]:
) -> DecodedURL:
"""Pick the first server we successfully send a request to.
Fires with ``None`` if no server was found, or with the ``DecodedURL`` of
@ -1035,18 +1035,8 @@ async def _pick_a_http_server(
for nurl in nurls
])
try:
_, nurl = await queries
return nurl
except Exception as e:
# Logging errors breaks a bunch of tests, and it's not a _bug_ to
# have a failed connection, it's often expected and transient. More
# of a warning, really?
log.msg(
"Failed to connect to a storage server advertised by NURL: {}".format(
e)
)
return None
_, nurl = await queries
return nurl
@implementer(IServer)
@ -1079,7 +1069,7 @@ class HTTPNativeStorageServer(service.MultiService):
DecodedURL.from_text(u)
for u in announcement[ANONYMOUS_STORAGE_NURLS]
]
self._istorage_server = None
self._istorage_server : Optional[_HTTPStorageServer] = None
self._connection_status = connection_status.ConnectionStatus.unstarted()
self._version = None
@ -1196,18 +1186,56 @@ class HTTPNativeStorageServer(service.MultiService):
def try_to_connect(self):
self._connect()
def _connect(self) -> defer.Deferred[object]:
"""
Try to connect to a working storage server.
If called while a previous ``_connect()`` is already running, it will
just return the same ``Deferred``.
``LoopingCall.stop()`` doesn't cancel ``Deferred``s, unfortunately:
https://github.com/twisted/twisted/issues/11814. Thus we want to store
the ``Deferred`` so we can cancel it when necessary.
We also want to return it so that loop iterations take it into account,
and a new iteration doesn't start while we're in the middle of the
previous one.
"""
# Conceivably try_to_connect() was called on this before, in which case
# we already are in the middle of connecting. So in that case just
# return whatever is in progress:
if self._connecting_deferred is not None:
return self._connecting_deferred
def done(_):
self._connecting_deferred = None
connecting = self._pick_server_and_get_version()
# Set a short timeout since we're relying on this for server liveness.
connecting = connecting.addTimeout(5, self._reactor).addCallbacks(
self._got_version, self._failed_to_connect
).addBoth(done)
self._connecting_deferred = connecting
return connecting
@async_to_deferred
async def _connect(self):
if self._istorage_server is None:
async def _pick_server_and_get_version(self):
"""
Minimal implementation of connection logic: pick a server, get its
version. This doesn't deal with errors much, so as to minimize
statefulness. It does change ``self._istorage_server``, so possibly
more refactoring would be useful to remove even that much statefulness.
"""
async def get_istorage_server() -> _HTTPStorageServer:
if self._istorage_server is not None:
return self._istorage_server
# We haven't selected a server yet, so let's do so.
# TODO This is somewhat inefficient on startup: it takes two successful
# version() calls before we are live talking to a server, it could only
# be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992
# TODO Another problem with this scheme is that while picking
# the HTTP server to talk to, we don't have connection status
# updates... https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3978
def request(reactor, nurl: DecodedURL):
# Since we're just using this one off to check if the NURL
# works, no need for persistent pool or other fanciness.
@ -1217,45 +1245,23 @@ class HTTPNativeStorageServer(service.MultiService):
StorageClient.from_nurl(nurl, reactor, pool)
).get_version()
# LoopingCall.stop() doesn't cancel Deferreds, unfortunately:
# https://github.com/twisted/twisted/issues/11814 Thus we want
# store the Deferred so it gets cancelled.
picking = _pick_a_http_server(reactor, self._nurls, request)
self._connecting_deferred = picking
try:
nurl = await picking
finally:
self._connecting_deferred = None
nurl = await _pick_a_http_server(reactor, self._nurls, request)
if nurl is None:
# We failed to find a server to connect to. Perhaps the next
# iteration of the loop will succeed.
return
else:
self._istorage_server = _HTTPStorageServer.from_http_client(
StorageClient.from_nurl(nurl, reactor)
)
# If we've gotten this far, we've found a working NURL.
self._istorage_server = _HTTPStorageServer.from_http_client(
StorageClient.from_nurl(nurl, reactor)
)
return self._istorage_server
result = self._istorage_server.get_version()
def remove_connecting_deferred(result):
self._connecting_deferred = None
return result
# Set a short timeout since we're relying on this for server liveness.
self._connecting_deferred = result.addTimeout(5, self._reactor).addBoth(
remove_connecting_deferred).addCallbacks(
self._got_version,
self._failed_to_connect
)
# We don't want to do another iteration of the loop until this
# iteration has finished, so wait here:
try:
if self._connecting_deferred is not None:
await self._connecting_deferred
storage_server = await get_istorage_server()
# Get the version from the remote server.
version = await storage_server.get_version()
return version
except Exception as e:
log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS)
raise
def stopService(self):
if self._connecting_deferred is not None:
@ -1472,7 +1478,7 @@ class _HTTPStorageServer(object):
_http_client = attr.ib(type=StorageClient)
@staticmethod
def from_http_client(http_client): # type: (StorageClient) -> _HTTPStorageServer
def from_http_client(http_client: StorageClient) -> _HTTPStorageServer:
"""
Create an ``IStorageServer`` from a HTTP ``StorageClient``.
"""

View File

@ -8,7 +8,7 @@ from json import (
loads,
)
import hashlib
from typing import Union, Any, Optional
from typing import Union, Any
from hyperlink import DecodedURL
from fixtures import (
@ -63,6 +63,8 @@ from foolscap.ipb import (
IConnectionHintHandler,
)
from allmydata.util.deferredutil import MultiFailure
from .no_network import LocalWrapper
from .common import (
EMPTY_CLIENT_CONFIG,
@ -782,7 +784,7 @@ storage:
class PickHTTPServerTests(unittest.SynchronousTestCase):
"""Tests for ``_pick_a_http_server``."""
def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Optional[DecodedURL]:
def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Deferred[DecodedURL]:
"""
Given mapping of URLs to (delay, result), return the URL of the
first selected server, or None.
@ -803,7 +805,7 @@ class PickHTTPServerTests(unittest.SynchronousTestCase):
d = _pick_a_http_server(clock, list(url_to_results.keys()), request)
for i in range(100):
clock.advance(0.1)
return self.successResultOf(d)
return d
def test_first_successful_connect_is_picked(self):
"""
@ -817,16 +819,21 @@ class PickHTTPServerTests(unittest.SynchronousTestCase):
earliest_url: (1, None),
bad_url: (0.5, RuntimeError()),
})
self.assertEqual(result, earliest_url)
self.assertEqual(self.successResultOf(result), earliest_url)
def test_failures_are_turned_into_none(self):
def test_failures_include_all_reasons(self):
"""
If the requests all fail, ``_pick_a_http_server`` returns ``None``.
If all the requests fail, ``_pick_a_http_server`` raises a
``allmydata.util.deferredutil.MultiFailure``.
"""
eventually_good_url = DecodedURL.from_text("http://good")
bad_url = DecodedURL.from_text("http://bad")
exception1 = RuntimeError()
exception2 = ZeroDivisionError()
result = self.pick_result({
eventually_good_url: (1, ZeroDivisionError()),
bad_url: (0.1, RuntimeError())
eventually_good_url: (1, exception1),
bad_url: (0.1, exception2),
})
self.assertEqual(result, None)
exc = self.failureResultOf(result).value
self.assertIsInstance(exc, MultiFailure)
self.assertEqual({f.value for f in exc.failures}, {exception2, exception1})

View File

@ -13,7 +13,9 @@ from typing import (
Sequence,
TypeVar,
Optional,
Coroutine,
)
from typing_extensions import ParamSpec
from foolscap.api import eventually
from eliot.twisted import (
@ -225,7 +227,11 @@ def until(
break
def async_to_deferred(f):
P = ParamSpec("P")
R = TypeVar("R")
def async_to_deferred(f: Callable[P, Coroutine[defer.Deferred[R], None, R]]) -> Callable[P, Deferred[R]]:
"""
Wrap an async function to return a Deferred instead.
@ -233,7 +239,7 @@ def async_to_deferred(f):
"""
@wraps(f)
def not_async(*args, **kwargs):
def not_async(*args: P.args, **kwargs: P.kwargs) -> Deferred[R]:
return defer.Deferred.fromCoroutine(f(*args, **kwargs))
return not_async