mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-21 18:06:46 +00:00
Merge branch '3783-storage-client-http' into 3940-http-timeouts
This commit is contained in:
commit
30a9877236
@ -104,6 +104,7 @@ _client_config = configutil.ValidConfiguration(
|
|||||||
"reserved_space",
|
"reserved_space",
|
||||||
"storage_dir",
|
"storage_dir",
|
||||||
"plugins",
|
"plugins",
|
||||||
|
"force_foolscap",
|
||||||
),
|
),
|
||||||
"sftpd": (
|
"sftpd": (
|
||||||
"accounts.file",
|
"accounts.file",
|
||||||
@ -826,7 +827,7 @@ class _Client(node.Node, pollmixin.PollMixin):
|
|||||||
if hasattr(self.tub.negotiationClass, "add_storage_server"):
|
if hasattr(self.tub.negotiationClass, "add_storage_server"):
|
||||||
nurls = self.tub.negotiationClass.add_storage_server(ss, swissnum.encode("ascii"))
|
nurls = self.tub.negotiationClass.add_storage_server(ss, swissnum.encode("ascii"))
|
||||||
self.storage_nurls = nurls
|
self.storage_nurls = nurls
|
||||||
announcement["anonymous-storage-NURLs"] = [n.to_text() for n in nurls]
|
announcement[storage_client.ANONYMOUS_STORAGE_NURLS] = [n.to_text() for n in nurls]
|
||||||
announcement["anonymous-storage-FURL"] = furl
|
announcement["anonymous-storage-FURL"] = furl
|
||||||
|
|
||||||
enabled_storage_servers = self._enable_storage_servers(
|
enabled_storage_servers = self._enable_storage_servers(
|
||||||
|
@ -64,7 +64,6 @@ def _common_valid_config():
|
|||||||
"tcp",
|
"tcp",
|
||||||
),
|
),
|
||||||
"node": (
|
"node": (
|
||||||
"force_foolscap",
|
|
||||||
"log_gatherer.furl",
|
"log_gatherer.furl",
|
||||||
"nickname",
|
"nickname",
|
||||||
"reveal-ip-address",
|
"reveal-ip-address",
|
||||||
@ -916,7 +915,7 @@ def create_main_tub(config, tub_options,
|
|||||||
# don't want to enable HTTP by default.
|
# don't want to enable HTTP by default.
|
||||||
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3934
|
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3934
|
||||||
force_foolscap=config.get_config(
|
force_foolscap=config.get_config(
|
||||||
"node", "force_foolscap", default=True, boolean=True
|
"storage", "force_foolscap", default=True, boolean=True
|
||||||
),
|
),
|
||||||
handler_overrides=handler_overrides,
|
handler_overrides=handler_overrides,
|
||||||
certFile=certfile,
|
certFile=certfile,
|
||||||
|
@ -312,12 +312,17 @@ class StorageClient(object):
|
|||||||
"""
|
"""
|
||||||
cls.TEST_MODE_REGISTER_HTTP_POOL = callback
|
cls.TEST_MODE_REGISTER_HTTP_POOL = callback
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def stop_test_mode(cls):
|
||||||
|
"""Stop testing mode."""
|
||||||
|
cls.TEST_MODE_REGISTER_HTTP_POOL = None
|
||||||
|
|
||||||
# The URL is a HTTPS URL ("https://..."). To construct from a NURL, use
|
# The URL is a HTTPS URL ("https://..."). To construct from a NURL, use
|
||||||
# ``StorageClient.from_nurl()``.
|
# ``StorageClient.from_nurl()``.
|
||||||
_base_url: DecodedURL
|
_base_url: DecodedURL
|
||||||
_swissnum: bytes
|
_swissnum: bytes
|
||||||
_treq: Union[treq, StubTreq, HTTPClient] = field(eq=False)
|
_treq: Union[treq, StubTreq, HTTPClient]
|
||||||
_clock: IReactorTime = field(eq=False)
|
_clock: IReactorTime
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_nurl(
|
def from_nurl(
|
||||||
@ -327,8 +332,6 @@ class StorageClient(object):
|
|||||||
) -> StorageClient:
|
) -> StorageClient:
|
||||||
"""
|
"""
|
||||||
Create a ``StorageClient`` for the given NURL.
|
Create a ``StorageClient`` for the given NURL.
|
||||||
|
|
||||||
``persistent`` indicates whether to use persistent HTTP connections.
|
|
||||||
"""
|
"""
|
||||||
assert nurl.fragment == "v=1"
|
assert nurl.fragment == "v=1"
|
||||||
assert nurl.scheme == "pb"
|
assert nurl.scheme == "pb"
|
||||||
@ -435,10 +438,7 @@ class StorageClientGeneral(object):
|
|||||||
Return the version metadata for the server.
|
Return the version metadata for the server.
|
||||||
"""
|
"""
|
||||||
url = self._client.relative_url("/storage/v1/version")
|
url = self._client.relative_url("/storage/v1/version")
|
||||||
# 1. Getting the version should never take particularly long.
|
response = yield self._client.request("GET", url)
|
||||||
# 2. Clients rely on the version command for liveness checks of servers.
|
|
||||||
# Thus, a short timeout.
|
|
||||||
response = yield self._client.request("GET", url, timeout=5)
|
|
||||||
decoded_response = yield _decode_cbor(response, _SCHEMAS["get_version"])
|
decoded_response = yield _decode_cbor(response, _SCHEMAS["get_version"])
|
||||||
returnValue(decoded_response)
|
returnValue(decoded_response)
|
||||||
|
|
||||||
|
@ -80,6 +80,8 @@ from allmydata.storage.http_client import (
|
|||||||
ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException
|
ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ANONYMOUS_STORAGE_NURLS = "anonymous-storage-NURLs"
|
||||||
|
|
||||||
|
|
||||||
# who is responsible for de-duplication?
|
# who is responsible for de-duplication?
|
||||||
# both?
|
# both?
|
||||||
@ -267,8 +269,7 @@ class StorageFarmBroker(service.MultiService):
|
|||||||
by the given announcement.
|
by the given announcement.
|
||||||
"""
|
"""
|
||||||
assert isinstance(server_id, bytes)
|
assert isinstance(server_id, bytes)
|
||||||
# TODO use constant for anonymous-storage-NURLs
|
if len(server["ann"].get(ANONYMOUS_STORAGE_NURLS, [])) > 0:
|
||||||
if len(server["ann"].get("anonymous-storage-NURLs", [])) > 0:
|
|
||||||
s = HTTPNativeStorageServer(server_id, server["ann"])
|
s = HTTPNativeStorageServer(server_id, server["ann"])
|
||||||
s.on_status_changed(lambda _: self._got_connection())
|
s.on_status_changed(lambda _: self._got_connection())
|
||||||
return s
|
return s
|
||||||
@ -944,12 +945,13 @@ class HTTPNativeStorageServer(service.MultiService):
|
|||||||
"connected".
|
"connected".
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, server_id: bytes, announcement):
|
def __init__(self, server_id: bytes, announcement, reactor=reactor):
|
||||||
service.MultiService.__init__(self)
|
service.MultiService.__init__(self)
|
||||||
assert isinstance(server_id, bytes)
|
assert isinstance(server_id, bytes)
|
||||||
self._server_id = server_id
|
self._server_id = server_id
|
||||||
self.announcement = announcement
|
self.announcement = announcement
|
||||||
self._on_status_changed = ObserverList()
|
self._on_status_changed = ObserverList()
|
||||||
|
self._reactor = reactor
|
||||||
furl = announcement["anonymous-storage-FURL"].encode("utf-8")
|
furl = announcement["anonymous-storage-FURL"].encode("utf-8")
|
||||||
(
|
(
|
||||||
self._nickname,
|
self._nickname,
|
||||||
@ -960,7 +962,7 @@ class HTTPNativeStorageServer(service.MultiService):
|
|||||||
) = _parse_announcement(server_id, furl, announcement)
|
) = _parse_announcement(server_id, furl, announcement)
|
||||||
# TODO need some way to do equivalent of Happy Eyeballs for multiple NURLs?
|
# TODO need some way to do equivalent of Happy Eyeballs for multiple NURLs?
|
||||||
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3935
|
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3935
|
||||||
nurl = DecodedURL.from_text(announcement["anonymous-storage-NURLs"][0])
|
nurl = DecodedURL.from_text(announcement[ANONYMOUS_STORAGE_NURLS][0])
|
||||||
self._istorage_server = _HTTPStorageServer.from_http_client(
|
self._istorage_server = _HTTPStorageServer.from_http_client(
|
||||||
StorageClient.from_nurl(nurl, reactor)
|
StorageClient.from_nurl(nurl, reactor)
|
||||||
)
|
)
|
||||||
@ -1063,7 +1065,10 @@ class HTTPNativeStorageServer(service.MultiService):
|
|||||||
self._connect()
|
self._connect()
|
||||||
|
|
||||||
def _connect(self):
|
def _connect(self):
|
||||||
return self._istorage_server.get_version().addCallbacks(
|
result = self._istorage_server.get_version()
|
||||||
|
# Set a short timeout since we're relying on this for server liveness.
|
||||||
|
result.addTimeout(5, self._reactor)
|
||||||
|
result.addCallbacks(
|
||||||
self._got_version,
|
self._got_version,
|
||||||
self._failed_to_connect
|
self._failed_to_connect
|
||||||
)
|
)
|
||||||
|
@ -649,6 +649,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
self._http_client_pools = []
|
self._http_client_pools = []
|
||||||
http_client.StorageClient.start_test_mode(self._http_client_pools.append)
|
http_client.StorageClient.start_test_mode(self._http_client_pools.append)
|
||||||
|
self.addCleanup(http_client.StorageClient.stop_test_mode)
|
||||||
self.port_assigner = SameProcessStreamEndpointAssigner()
|
self.port_assigner = SameProcessStreamEndpointAssigner()
|
||||||
self.port_assigner.setUp()
|
self.port_assigner.setUp()
|
||||||
self.addCleanup(self.port_assigner.tearDown)
|
self.addCleanup(self.port_assigner.tearDown)
|
||||||
@ -667,7 +668,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
|
|||||||
d = self.sparent.stopService()
|
d = self.sparent.stopService()
|
||||||
d.addBoth(flush_but_dont_ignore)
|
d.addBoth(flush_but_dont_ignore)
|
||||||
d.addBoth(lambda x: self.close_idle_http_connections().addCallback(lambda _: x))
|
d.addBoth(lambda x: self.close_idle_http_connections().addCallback(lambda _: x))
|
||||||
d.addBoth(lambda x: deferLater(reactor, 0.01, lambda: x))
|
d.addBoth(lambda x: deferLater(reactor, 0.02, lambda: x))
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def getdir(self, subdir):
|
def getdir(self, subdir):
|
||||||
@ -809,13 +810,11 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
|
|||||||
if which in feature_matrix.get((section, feature), {which}):
|
if which in feature_matrix.get((section, feature), {which}):
|
||||||
config.setdefault(section, {})[feature] = value
|
config.setdefault(section, {})[feature] = value
|
||||||
|
|
||||||
#config.setdefault("node", {})["force_foolscap"] = force_foolscap
|
|
||||||
|
|
||||||
setnode = partial(setconf, config, which, "node")
|
setnode = partial(setconf, config, which, "node")
|
||||||
sethelper = partial(setconf, config, which, "helper")
|
sethelper = partial(setconf, config, which, "helper")
|
||||||
|
|
||||||
setnode("nickname", u"client %d \N{BLACK SMILING FACE}" % (which,))
|
setnode("nickname", u"client %d \N{BLACK SMILING FACE}" % (which,))
|
||||||
setnode("force_foolscap", str(force_foolscap))
|
setconf(config, which, "storage", "force_foolscap", str(force_foolscap))
|
||||||
|
|
||||||
tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor)
|
tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor)
|
||||||
setnode("tub.port", tub_port_endpoint)
|
setnode("tub.port", tub_port_endpoint)
|
||||||
|
@ -308,6 +308,7 @@ class CustomHTTPServerTests(SyncTestCase):
|
|||||||
StorageClient.start_test_mode(
|
StorageClient.start_test_mode(
|
||||||
lambda pool: self.addCleanup(pool.closeCachedConnections)
|
lambda pool: self.addCleanup(pool.closeCachedConnections)
|
||||||
)
|
)
|
||||||
|
self.addCleanup(StorageClient.stop_test_mode)
|
||||||
# Could be a fixture, but will only be used in this test class so not
|
# Could be a fixture, but will only be used in this test class so not
|
||||||
# going to bother:
|
# going to bother:
|
||||||
self._http_server = TestApp()
|
self._http_server = TestApp()
|
||||||
@ -431,6 +432,7 @@ class HttpTestFixture(Fixture):
|
|||||||
StorageClient.start_test_mode(
|
StorageClient.start_test_mode(
|
||||||
lambda pool: self.addCleanup(pool.closeCachedConnections)
|
lambda pool: self.addCleanup(pool.closeCachedConnections)
|
||||||
)
|
)
|
||||||
|
self.addCleanup(StorageClient.stop_test_mode)
|
||||||
self.clock = Clock()
|
self.clock = Clock()
|
||||||
self.tempdir = self.useFixture(TempDir())
|
self.tempdir = self.useFixture(TempDir())
|
||||||
# The global Cooperator used by Twisted (a) used by pull producers in
|
# The global Cooperator used by Twisted (a) used by pull producers in
|
||||||
|
Loading…
x
Reference in New Issue
Block a user