mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-31 08:25:35 +00:00
Split up the state management logic from the server pinging logic.
This commit is contained in:
parent
7838f25bf8
commit
bd7c61cc5c
@ -1186,8 +1186,46 @@ class HTTPNativeStorageServer(service.MultiService):
|
|||||||
def try_to_connect(self):
|
def try_to_connect(self):
|
||||||
self._connect()
|
self._connect()
|
||||||
|
|
||||||
|
def _connect(self) -> defer.Deferred[object]:
|
||||||
|
"""
|
||||||
|
Try to connect to a working storage server.
|
||||||
|
|
||||||
|
If called while a previous ``_connect()`` is already running, it will
|
||||||
|
just return the same ``Deferred``.
|
||||||
|
|
||||||
|
``LoopingCall.stop()`` doesn't cancel ``Deferred``s, unfortunately:
|
||||||
|
https://github.com/twisted/twisted/issues/11814. Thus we want to store
|
||||||
|
the ``Deferred`` so we can cancel it when necessary.
|
||||||
|
|
||||||
|
We also want to return it so that loop iterations take it into account,
|
||||||
|
and a new iteration doesn't start while we're in the middle of the
|
||||||
|
previous one.
|
||||||
|
"""
|
||||||
|
# Conceivably try_to_connect() was called on this before, in which case
|
||||||
|
# we already are in the middle of connecting. So in that case just
|
||||||
|
# return whatever is in progress:
|
||||||
|
if self._connecting_deferred is not None:
|
||||||
|
return self._connecting_deferred
|
||||||
|
|
||||||
|
def done(_):
|
||||||
|
self._connecting_deferred = None
|
||||||
|
|
||||||
|
connecting = self._pick_server_and_get_version()
|
||||||
|
# Set a short timeout since we're relying on this for server liveness.
|
||||||
|
connecting = connecting.addTimeout(5, self._reactor).addCallbacks(
|
||||||
|
self._got_version, self._failed_to_connect
|
||||||
|
).addBoth(done)
|
||||||
|
self._connecting_deferred = connecting
|
||||||
|
return connecting
|
||||||
|
|
||||||
@async_to_deferred
|
@async_to_deferred
|
||||||
async def _connect(self):
|
async def _pick_server_and_get_version(self):
|
||||||
|
"""
|
||||||
|
Minimal implementation of connection logic: pick a server, get its
|
||||||
|
version. This doesn't deal with errors much, so as to minimize
|
||||||
|
statefulness. It does change ``self._istorage_server``, so possibly
|
||||||
|
more refactoring would be useful to remove even that much statefulness.
|
||||||
|
"""
|
||||||
async def get_istorage_server() -> _HTTPStorageServer:
|
async def get_istorage_server() -> _HTTPStorageServer:
|
||||||
if self._istorage_server is not None:
|
if self._istorage_server is not None:
|
||||||
return self._istorage_server
|
return self._istorage_server
|
||||||
@ -1207,15 +1245,7 @@ class HTTPNativeStorageServer(service.MultiService):
|
|||||||
StorageClient.from_nurl(nurl, reactor, pool)
|
StorageClient.from_nurl(nurl, reactor, pool)
|
||||||
).get_version()
|
).get_version()
|
||||||
|
|
||||||
# LoopingCall.stop() doesn't cancel Deferreds, unfortunately:
|
nurl = await _pick_a_http_server(reactor, self._nurls, request)
|
||||||
# https://github.com/twisted/twisted/issues/11814 Thus we want
|
|
||||||
# store the Deferred so it gets cancelled.
|
|
||||||
picking = _pick_a_http_server(reactor, self._nurls, request)
|
|
||||||
self._connecting_deferred = picking
|
|
||||||
try:
|
|
||||||
nurl = await picking
|
|
||||||
finally:
|
|
||||||
self._connecting_deferred = None
|
|
||||||
|
|
||||||
# If we've gotten this far, we've found a working NURL.
|
# If we've gotten this far, we've found a working NURL.
|
||||||
self._istorage_server = _HTTPStorageServer.from_http_client(
|
self._istorage_server = _HTTPStorageServer.from_http_client(
|
||||||
@ -1226,19 +1256,12 @@ class HTTPNativeStorageServer(service.MultiService):
|
|||||||
try:
|
try:
|
||||||
storage_server = await get_istorage_server()
|
storage_server = await get_istorage_server()
|
||||||
|
|
||||||
# Get the version from the remote server. Set a short timeout since
|
# Get the version from the remote server.
|
||||||
# we're relying on this for server liveness.
|
version = await storage_server.get_version()
|
||||||
self._connecting_deferred = storage_server.get_version().addTimeout(
|
return version
|
||||||
5, self._reactor)
|
|
||||||
# We don't want to do another iteration of the loop until this
|
|
||||||
# iteration has finished, so wait here:
|
|
||||||
version = await self._connecting_deferred
|
|
||||||
self._got_version(version)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS)
|
log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS)
|
||||||
self._failed_to_connect(Failure(e))
|
raise
|
||||||
finally:
|
|
||||||
self._connecting_deferred = None
|
|
||||||
|
|
||||||
def stopService(self):
|
def stopService(self):
|
||||||
if self._connecting_deferred is not None:
|
if self._connecting_deferred is not None:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user