Merge branch '3783-storage-client-http' into 3940-http-timeouts

This commit is contained in:
Itamar Turner-Trauring 2022-11-07 11:20:54 -05:00
commit ce59ddc1ea
11 changed files with 422 additions and 137 deletions

View File

@ -52,7 +52,7 @@ fi
# This is primarily aimed at catching hangs on the PyPy job which runs for # This is primarily aimed at catching hangs on the PyPy job which runs for
# about 21 minutes and then gets killed by CircleCI in a way that fails the # about 21 minutes and then gets killed by CircleCI in a way that fails the
# job and bypasses our "allowed failure" logic. # job and bypasses our "allowed failure" logic.
TIMEOUT="timeout --kill-after 1m 15m" TIMEOUT="timeout --kill-after 1m 25m"
# Run the test suite as a non-root user. This is the expected usage some # Run the test suite as a non-root user. This is the expected usage some
# small areas of the test suite assume non-root privileges (such as unreadable # small areas of the test suite assume non-root privileges (such as unreadable

0
newsfragments/3783.minor Normal file
View File

View File

@ -823,9 +823,10 @@ class _Client(node.Node, pollmixin.PollMixin):
furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding()) furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file) furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file)
(_, _, swissnum) = decode_furl(furl) (_, _, swissnum) = decode_furl(furl)
self.storage_nurls = self.tub.negotiationClass.add_storage_server( if hasattr(self.tub.negotiationClass, "add_storage_server"):
ss, swissnum.encode("ascii") nurls = self.tub.negotiationClass.add_storage_server(ss, swissnum.encode("ascii"))
) self.storage_nurls = nurls
announcement["anonymous-storage-NURLs"] = [n.to_text() for n in nurls]
announcement["anonymous-storage-FURL"] = furl announcement["anonymous-storage-FURL"] = furl
enabled_storage_servers = self._enable_storage_servers( enabled_storage_servers = self._enable_storage_servers(

View File

@ -64,6 +64,7 @@ def _common_valid_config():
"tcp", "tcp",
), ),
"node": ( "node": (
"force_foolscap",
"log_gatherer.furl", "log_gatherer.furl",
"nickname", "nickname",
"reveal-ip-address", "reveal-ip-address",
@ -697,7 +698,7 @@ def create_connection_handlers(config, i2p_provider, tor_provider):
def create_tub(tub_options, default_connection_handlers, foolscap_connection_handlers, def create_tub(tub_options, default_connection_handlers, foolscap_connection_handlers,
handler_overrides={}, **kwargs): handler_overrides={}, force_foolscap=False, **kwargs):
""" """
Create a Tub with the right options and handlers. It will be Create a Tub with the right options and handlers. It will be
ephemeral unless the caller provides certFile= in kwargs ephemeral unless the caller provides certFile= in kwargs
@ -707,10 +708,16 @@ def create_tub(tub_options, default_connection_handlers, foolscap_connection_han
:param dict tub_options: every key-value pair in here will be set in :param dict tub_options: every key-value pair in here will be set in
the new Tub via `Tub.setOption` the new Tub via `Tub.setOption`
:param bool force_foolscap: If True, only allow Foolscap, not just HTTPS
storage protocol.
""" """
# We listen simulataneously for both Foolscap and HTTPS on the same port, # We listen simultaneously for both Foolscap and HTTPS on the same port,
# so we have to create a special Foolscap Tub for that to work: # so we have to create a special Foolscap Tub for that to work:
tub = create_tub_with_https_support(**kwargs) if force_foolscap:
tub = Tub(**kwargs)
else:
tub = create_tub_with_https_support(**kwargs)
for (name, value) in list(tub_options.items()): for (name, value) in list(tub_options.items()):
tub.setOption(name, value) tub.setOption(name, value)
@ -901,14 +908,20 @@ def create_main_tub(config, tub_options,
# FIXME? "node.pem" was the CERTFILE option/thing # FIXME? "node.pem" was the CERTFILE option/thing
certfile = config.get_private_path("node.pem") certfile = config.get_private_path("node.pem")
tub = create_tub( tub = create_tub(
tub_options, tub_options,
default_connection_handlers, default_connection_handlers,
foolscap_connection_handlers, foolscap_connection_handlers,
# TODO eventually we will want the default to be False, but for now we
# don't want to enable HTTP by default.
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3934
force_foolscap=config.get_config(
"node", "force_foolscap", default=True, boolean=True
),
handler_overrides=handler_overrides, handler_overrides=handler_overrides,
certFile=certfile, certFile=certfile,
) )
if portlocation is None: if portlocation is None:
log.msg("Tub is not listening") log.msg("Tub is not listening")
else: else:

View File

@ -186,7 +186,7 @@ class DaemonizeTheRealService(Service, HookMixin):
) )
) )
else: else:
self.stderr.write("\nUnknown error\n") self.stderr.write("\nUnknown error, here's the traceback:\n")
reason.printTraceback(self.stderr) reason.printTraceback(self.stderr)
reactor.stop() reactor.stop()

View File

@ -290,21 +290,39 @@ class _StorageClientHTTPSPolicy:
) )
@define @define(hash=True)
class StorageClient(object): class StorageClient(object):
""" """
Low-level HTTP client that talks to the HTTP storage server. Low-level HTTP client that talks to the HTTP storage server.
""" """
# If set, we're doing unit testing and we should call this with
# HTTPConnectionPool we create.
TEST_MODE_REGISTER_HTTP_POOL = None
@classmethod
def start_test_mode(cls, callback):
"""Switch to testing mode.
In testing mode we register the pool with test system using the given
callback so it can Do Things, most notably killing off idle HTTP
connections at test shutdown and, in some tests, in the midddle of the
test.
"""
cls.TEST_MODE_REGISTER_HTTP_POOL = callback
# The URL is a HTTPS URL ("https://..."). To construct from a NURL, use # The URL is a HTTPS URL ("https://..."). To construct from a NURL, use
# ``StorageClient.from_nurl()``. # ``StorageClient.from_nurl()``.
_base_url: DecodedURL _base_url: DecodedURL
_swissnum: bytes _swissnum: bytes
_treq: Union[treq, StubTreq, HTTPClient] _treq: Union[treq, StubTreq, HTTPClient] = field(eq=False)
_clock: IReactorTime = field(eq=False)
@classmethod @classmethod
def from_nurl( def from_nurl(
cls, nurl: DecodedURL, reactor, persistent: bool = True cls,
nurl: DecodedURL,
reactor,
) -> StorageClient: ) -> StorageClient:
""" """
Create a ``StorageClient`` for the given NURL. Create a ``StorageClient`` for the given NURL.
@ -315,19 +333,23 @@ class StorageClient(object):
assert nurl.scheme == "pb" assert nurl.scheme == "pb"
swissnum = nurl.path[0].encode("ascii") swissnum = nurl.path[0].encode("ascii")
certificate_hash = nurl.user.encode("ascii") certificate_hash = nurl.user.encode("ascii")
pool = HTTPConnectionPool(reactor)
if cls.TEST_MODE_REGISTER_HTTP_POOL is not None:
cls.TEST_MODE_REGISTER_HTTP_POOL(pool)
treq_client = HTTPClient( treq_client = HTTPClient(
Agent( Agent(
reactor, reactor,
_StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash), _StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash),
pool=HTTPConnectionPool(reactor, persistent=persistent),
# TCP-level connection timeout # TCP-level connection timeout
connectTimeout=5, connectTimeout=5,
pool=pool,
) )
) )
https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port) https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
return cls(https_url, swissnum, treq_client) return cls(https_url, swissnum, treq_client, reactor)
def relative_url(self, path): def relative_url(self, path):
"""Get a URL relative to the base URL.""" """Get a URL relative to the base URL."""
@ -398,13 +420,13 @@ class StorageClient(object):
) )
@define(hash=True)
class StorageClientGeneral(object): class StorageClientGeneral(object):
""" """
High-level HTTP APIs that aren't immutable- or mutable-specific. High-level HTTP APIs that aren't immutable- or mutable-specific.
""" """
def __init__(self, client): # type: (StorageClient) -> None _client: StorageClient
self._client = client
@inlineCallbacks @inlineCallbacks
def get_version(self): def get_version(self):
@ -412,7 +434,10 @@ class StorageClientGeneral(object):
Return the version metadata for the server. Return the version metadata for the server.
""" """
url = self._client.relative_url("/storage/v1/version") url = self._client.relative_url("/storage/v1/version")
response = yield self._client.request("GET", url) # 1. Getting the version should never take particularly long.
# 2. Clients rely on the version command for liveness checks of servers.
# Thus, a short timeout.
response = yield self._client.request("GET", url, timeout=5)
decoded_response = yield _decode_cbor(response, _SCHEMAS["get_version"]) decoded_response = yield _decode_cbor(response, _SCHEMAS["get_version"])
returnValue(decoded_response) returnValue(decoded_response)
@ -557,7 +582,7 @@ async def advise_corrupt_share(
) )
@define @define(hash=True)
class StorageClientImmutables(object): class StorageClientImmutables(object):
""" """
APIs for interacting with immutables. APIs for interacting with immutables.

View File

@ -30,6 +30,8 @@ Ported to Python 3.
# #
# 6: implement other sorts of IStorageClient classes: S3, etc # 6: implement other sorts of IStorageClient classes: S3, etc
from __future__ import annotations
from six import ensure_text from six import ensure_text
from typing import Union from typing import Union
import re, time, hashlib import re, time, hashlib
@ -37,13 +39,16 @@ from os import urandom
from configparser import NoSectionError from configparser import NoSectionError
import attr import attr
from hyperlink import DecodedURL
from zope.interface import ( from zope.interface import (
Attribute, Attribute,
Interface, Interface,
implementer, implementer,
) )
from twisted.python.failure import Failure
from twisted.web import http from twisted.web import http
from twisted.internet import defer from twisted.internet.task import LoopingCall
from twisted.internet import defer, reactor
from twisted.application import service from twisted.application import service
from twisted.plugin import ( from twisted.plugin import (
getPlugins, getPlugins,
@ -99,8 +104,8 @@ class StorageClientConfig(object):
:ivar preferred_peers: An iterable of the server-ids (``bytes``) of the :ivar preferred_peers: An iterable of the server-ids (``bytes``) of the
storage servers where share placement is preferred, in order of storage servers where share placement is preferred, in order of
decreasing preference. See the *[client]peers.preferred* decreasing preference. See the *[client]peers.preferred* documentation
documentation for details. for details.
:ivar dict[unicode, dict[unicode, unicode]] storage_plugins: A mapping from :ivar dict[unicode, dict[unicode, unicode]] storage_plugins: A mapping from
names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the
@ -262,6 +267,11 @@ class StorageFarmBroker(service.MultiService):
by the given announcement. by the given announcement.
""" """
assert isinstance(server_id, bytes) assert isinstance(server_id, bytes)
# TODO use constant for anonymous-storage-NURLs
if len(server["ann"].get("anonymous-storage-NURLs", [])) > 0:
s = HTTPNativeStorageServer(server_id, server["ann"])
s.on_status_changed(lambda _: self._got_connection())
return s
handler_overrides = server.get("connections", {}) handler_overrides = server.get("connections", {})
s = NativeStorageServer( s = NativeStorageServer(
server_id, server_id,
@ -523,6 +533,45 @@ class IFoolscapStorageServer(Interface):
""" """
def _parse_announcement(server_id: bytes, furl: bytes, ann: dict) -> tuple[str, bytes, bytes, bytes, bytes]:
"""
Parse the furl and announcement, return:
(nickname, permutation_seed, tubid, short_description, long_description)
"""
m = re.match(br'pb://(\w+)@', furl)
assert m, furl
tubid_s = m.group(1).lower()
tubid = base32.a2b(tubid_s)
if "permutation-seed-base32" in ann:
seed = ann["permutation-seed-base32"]
if isinstance(seed, str):
seed = seed.encode("utf-8")
ps = base32.a2b(seed)
elif re.search(br'^v0-[0-9a-zA-Z]{52}$', server_id):
ps = base32.a2b(server_id[3:])
else:
log.msg("unable to parse serverid '%(server_id)s as pubkey, "
"hashing it to get permutation-seed, "
"may not converge with other clients",
server_id=server_id,
facility="tahoe.storage_broker",
level=log.UNUSUAL, umid="qu86tw")
ps = hashlib.sha256(server_id).digest()
permutation_seed = ps
assert server_id
long_description = server_id
if server_id.startswith(b"v0-"):
# remove v0- prefix from abbreviated name
short_description = server_id[3:3+8]
else:
short_description = server_id[:8]
nickname = ann.get("nickname", "")
return (nickname, permutation_seed, tubid, short_description, long_description)
@implementer(IFoolscapStorageServer) @implementer(IFoolscapStorageServer)
@attr.s(frozen=True) @attr.s(frozen=True)
class _FoolscapStorage(object): class _FoolscapStorage(object):
@ -566,43 +615,13 @@ class _FoolscapStorage(object):
The furl will be a Unicode string on Python 3; on Python 2 it will be The furl will be a Unicode string on Python 3; on Python 2 it will be
either a native (bytes) string or a Unicode string. either a native (bytes) string or a Unicode string.
""" """
furl = furl.encode("utf-8") (nickname, permutation_seed, tubid, short_description, long_description) = _parse_announcement(server_id, furl.encode("utf-8"), ann)
m = re.match(br'pb://(\w+)@', furl)
assert m, furl
tubid_s = m.group(1).lower()
tubid = base32.a2b(tubid_s)
if "permutation-seed-base32" in ann:
seed = ann["permutation-seed-base32"]
if isinstance(seed, str):
seed = seed.encode("utf-8")
ps = base32.a2b(seed)
elif re.search(br'^v0-[0-9a-zA-Z]{52}$', server_id):
ps = base32.a2b(server_id[3:])
else:
log.msg("unable to parse serverid '%(server_id)s as pubkey, "
"hashing it to get permutation-seed, "
"may not converge with other clients",
server_id=server_id,
facility="tahoe.storage_broker",
level=log.UNUSUAL, umid="qu86tw")
ps = hashlib.sha256(server_id).digest()
permutation_seed = ps
assert server_id
long_description = server_id
if server_id.startswith(b"v0-"):
# remove v0- prefix from abbreviated name
short_description = server_id[3:3+8]
else:
short_description = server_id[:8]
nickname = ann.get("nickname", "")
return cls( return cls(
nickname=nickname, nickname=nickname,
permutation_seed=permutation_seed, permutation_seed=permutation_seed,
tubid=tubid, tubid=tubid,
storage_server=storage_server, storage_server=storage_server,
furl=furl, furl=furl.encode("utf-8"),
short_description=short_description, short_description=short_description,
long_description=long_description, long_description=long_description,
) )
@ -684,6 +703,16 @@ def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref):
raise AnnouncementNotMatched() raise AnnouncementNotMatched()
def _available_space_from_version(version):
if version is None:
return None
protocol_v1_version = version.get(b'http://allmydata.org/tahoe/protocols/storage/v1', BytesKeyDict())
available_space = protocol_v1_version.get(b'available-space')
if available_space is None:
available_space = protocol_v1_version.get(b'maximum-immutable-share-size', None)
return available_space
@implementer(IServer) @implementer(IServer)
class NativeStorageServer(service.MultiService): class NativeStorageServer(service.MultiService):
"""I hold information about a storage server that we want to connect to. """I hold information about a storage server that we want to connect to.
@ -842,13 +871,7 @@ class NativeStorageServer(service.MultiService):
def get_available_space(self): def get_available_space(self):
version = self.get_version() version = self.get_version()
if version is None: return _available_space_from_version(version)
return None
protocol_v1_version = version.get(b'http://allmydata.org/tahoe/protocols/storage/v1', BytesKeyDict())
available_space = protocol_v1_version.get(b'available-space')
if available_space is None:
available_space = protocol_v1_version.get(b'maximum-immutable-share-size', None)
return available_space
def start_connecting(self, trigger_cb): def start_connecting(self, trigger_cb):
self._tub = self._tub_maker(self._handler_overrides) self._tub = self._tub_maker(self._handler_overrides)
@ -910,6 +933,149 @@ class NativeStorageServer(service.MultiService):
# used when the broker wants us to hurry up # used when the broker wants us to hurry up
self._reconnector.reset() self._reconnector.reset()
@implementer(IServer)
class HTTPNativeStorageServer(service.MultiService):
"""
Like ``NativeStorageServer``, but for HTTP clients.
The notion of being "connected" is less meaningful for HTTP; we just poll
occasionally, and if we've succeeded at last poll, we assume we're
"connected".
"""
def __init__(self, server_id: bytes, announcement):
service.MultiService.__init__(self)
assert isinstance(server_id, bytes)
self._server_id = server_id
self.announcement = announcement
self._on_status_changed = ObserverList()
furl = announcement["anonymous-storage-FURL"].encode("utf-8")
(
self._nickname,
self._permutation_seed,
self._tubid,
self._short_description,
self._long_description
) = _parse_announcement(server_id, furl, announcement)
# TODO need some way to do equivalent of Happy Eyeballs for multiple NURLs?
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3935
nurl = DecodedURL.from_text(announcement["anonymous-storage-NURLs"][0])
self._istorage_server = _HTTPStorageServer.from_http_client(
StorageClient.from_nurl(nurl, reactor)
)
self._connection_status = connection_status.ConnectionStatus.unstarted()
self._version = None
self._last_connect_time = None
def get_permutation_seed(self):
return self._permutation_seed
def get_name(self):
return self._short_description
def get_longname(self):
return self._long_description
def get_tubid(self):
return self._tubid
def get_lease_seed(self):
# Apparently this is what Foolscap version above does?!
return self._tubid
def get_foolscap_write_enabler_seed(self):
return self._tubid
def get_nickname(self):
return self._nickname
def on_status_changed(self, status_changed):
"""
:param status_changed: a callable taking a single arg (the
NativeStorageServer) that is notified when we become connected
"""
return self._on_status_changed.subscribe(status_changed)
# Special methods used by copy.copy() and copy.deepcopy(). When those are
# used in allmydata.immutable.filenode to copy CheckResults during
# repair, we want it to treat the IServer instances as singletons, and
# not attempt to duplicate them..
def __copy__(self):
return self
def __deepcopy__(self, memodict):
return self
def __repr__(self):
return "<HTTPNativeStorageServer for %r>" % self.get_name()
def get_serverid(self):
return self._server_id
def get_version(self):
return self._version
def get_announcement(self):
return self.announcement
def get_connection_status(self):
return self._connection_status
def is_connected(self):
return self._connection_status.connected
def get_available_space(self):
version = self.get_version()
return _available_space_from_version(version)
def start_connecting(self, trigger_cb):
self._lc = LoopingCall(self._connect)
self._lc.start(1, True)
def _got_version(self, version):
self._last_connect_time = time.time()
self._version = version
self._connection_status = connection_status.ConnectionStatus(
True, "connected", [], self._last_connect_time, self._last_connect_time
)
self._on_status_changed.notify(self)
def _failed_to_connect(self, reason):
self._connection_status = connection_status.ConnectionStatus(
False, f"failure: {reason}", [], self._last_connect_time, self._last_connect_time
)
self._on_status_changed.notify(self)
def get_storage_server(self):
"""
See ``IServer.get_storage_server``.
"""
if self._connection_status.summary == "unstarted":
return None
return self._istorage_server
def stop_connecting(self):
self._lc.stop()
def try_to_connect(self):
self._connect()
def _connect(self):
return self._istorage_server.get_version().addCallbacks(
self._got_version,
self._failed_to_connect
)
def stopService(self):
result = service.MultiService.stopService(self)
if self._lc.running:
self._lc.stop()
self._failed_to_connect("shut down")
return result
class UnknownServerTypeError(Exception): class UnknownServerTypeError(Exception):
pass pass
@ -1026,7 +1192,7 @@ class _StorageServer(object):
@attr.s @attr.s(hash=True)
class _FakeRemoteReference(object): class _FakeRemoteReference(object):
""" """
Emulate a Foolscap RemoteReference, calling a local object instead. Emulate a Foolscap RemoteReference, calling a local object instead.
@ -1051,7 +1217,7 @@ class _HTTPBucketWriter(object):
storage_index = attr.ib(type=bytes) storage_index = attr.ib(type=bytes)
share_number = attr.ib(type=int) share_number = attr.ib(type=int)
upload_secret = attr.ib(type=bytes) upload_secret = attr.ib(type=bytes)
finished = attr.ib(type=bool, default=False) finished = attr.ib(type=defer.Deferred[bool], factory=defer.Deferred)
def abort(self): def abort(self):
return self.client.abort_upload(self.storage_index, self.share_number, return self.client.abort_upload(self.storage_index, self.share_number,
@ -1063,18 +1229,27 @@ class _HTTPBucketWriter(object):
self.storage_index, self.share_number, self.upload_secret, offset, data self.storage_index, self.share_number, self.upload_secret, offset, data
) )
if result.finished: if result.finished:
self.finished = True self.finished.callback(True)
defer.returnValue(None) defer.returnValue(None)
def close(self): def close(self):
# A no-op in HTTP protocol. # We're not _really_ closed until all writes have succeeded and we
if not self.finished: # finished writing all the data.
return defer.fail(RuntimeError("You didn't finish writing?!")) return self.finished
return defer.succeed(None)
def _ignore_404(failure: Failure) -> Union[Failure, None]:
"""
Useful for advise_corrupt_share(), since it swallows unknown share numbers
in Foolscap.
"""
if failure.check(HTTPClientException) and failure.value.code == http.NOT_FOUND:
return None
else:
return failure
@attr.s
@attr.s(hash=True)
class _HTTPBucketReader(object): class _HTTPBucketReader(object):
""" """
Emulate a ``RIBucketReader``, but use HTTP protocol underneath. Emulate a ``RIBucketReader``, but use HTTP protocol underneath.
@ -1092,7 +1267,7 @@ class _HTTPBucketReader(object):
return self.client.advise_corrupt_share( return self.client.advise_corrupt_share(
self.storage_index, self.share_number, self.storage_index, self.share_number,
str(reason, "utf-8", errors="backslashreplace") str(reason, "utf-8", errors="backslashreplace")
) ).addErrback(_ignore_404)
# WORK IN PROGRESS, for now it doesn't actually implement whole thing. # WORK IN PROGRESS, for now it doesn't actually implement whole thing.
@ -1192,7 +1367,7 @@ class _HTTPStorageServer(object):
raise ValueError("Unknown share type") raise ValueError("Unknown share type")
return client.advise_corrupt_share( return client.advise_corrupt_share(
storage_index, shnum, str(reason, "utf-8", errors="backslashreplace") storage_index, shnum, str(reason, "utf-8", errors="backslashreplace")
) ).addErrback(_ignore_404)
@defer.inlineCallbacks @defer.inlineCallbacks
def slot_readv(self, storage_index, shares, readv): def slot_readv(self, storage_index, shares, readv):

View File

@ -5,22 +5,14 @@ in ``allmydata.test.test_system``.
Ported to Python 3. Ported to Python 3.
""" """
from __future__ import absolute_import from typing import Optional
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
# Don't import bytes since it causes issues on (so far unported) modules on Python 2.
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, dict, list, object, range, max, min, str # noqa: F401
import os import os
from functools import partial from functools import partial
from twisted.internet import reactor from twisted.internet import reactor
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import deferLater
from twisted.application import service from twisted.application import service
from foolscap.api import flushEventualQueue from foolscap.api import flushEventualQueue
@ -28,6 +20,11 @@ from foolscap.api import flushEventualQueue
from allmydata import client from allmydata import client
from allmydata.introducer.server import create_introducer from allmydata.introducer.server import create_introducer
from allmydata.util import fileutil, log, pollmixin from allmydata.util import fileutil, log, pollmixin
from allmydata.storage import http_client
from allmydata.storage_client import (
NativeStorageServer,
HTTPNativeStorageServer,
)
from twisted.python.filepath import ( from twisted.python.filepath import (
FilePath, FilePath,
@ -644,7 +641,14 @@ def _render_section_values(values):
class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# If set to True, use Foolscap for storage protocol. If set to False, HTTP
# will be used when possible. If set to None, this suggests a bug in the
# test code.
FORCE_FOOLSCAP_FOR_STORAGE : Optional[bool] = None
def setUp(self): def setUp(self):
self._http_client_pools = []
http_client.StorageClient.start_test_mode(self._http_client_pools.append)
self.port_assigner = SameProcessStreamEndpointAssigner() self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp() self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown) self.addCleanup(self.port_assigner.tearDown)
@ -652,10 +656,18 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
self.sparent = service.MultiService() self.sparent = service.MultiService()
self.sparent.startService() self.sparent.startService()
def close_idle_http_connections(self):
"""Close all HTTP client connections that are just hanging around."""
return defer.gatherResults(
[pool.closeCachedConnections() for pool in self._http_client_pools]
)
def tearDown(self): def tearDown(self):
log.msg("shutting down SystemTest services") log.msg("shutting down SystemTest services")
d = self.sparent.stopService() d = self.sparent.stopService()
d.addBoth(flush_but_dont_ignore) d.addBoth(flush_but_dont_ignore)
d.addBoth(lambda x: self.close_idle_http_connections().addCallback(lambda _: x))
d.addBoth(lambda x: deferLater(reactor, 0.01, lambda: x))
return d return d
def getdir(self, subdir): def getdir(self, subdir):
@ -714,21 +726,31 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
:return: A ``Deferred`` that fires when the nodes have connected to :return: A ``Deferred`` that fires when the nodes have connected to
each other. each other.
""" """
self.assertIn(
self.FORCE_FOOLSCAP_FOR_STORAGE, (True, False),
"You forgot to set FORCE_FOOLSCAP_FOR_STORAGE on {}".format(self.__class__)
)
self.numclients = NUMCLIENTS self.numclients = NUMCLIENTS
self.introducer = yield self._create_introducer() self.introducer = yield self._create_introducer()
self.add_service(self.introducer) self.add_service(self.introducer)
self.introweb_url = self._get_introducer_web() self.introweb_url = self._get_introducer_web()
yield self._set_up_client_nodes() yield self._set_up_client_nodes(self.FORCE_FOOLSCAP_FOR_STORAGE)
native_server = next(iter(self.clients[0].storage_broker.get_known_servers()))
if self.FORCE_FOOLSCAP_FOR_STORAGE:
expected_storage_server_class = NativeStorageServer
else:
expected_storage_server_class = HTTPNativeStorageServer
self.assertIsInstance(native_server, expected_storage_server_class)
@inlineCallbacks @inlineCallbacks
def _set_up_client_nodes(self): def _set_up_client_nodes(self, force_foolscap):
q = self.introducer q = self.introducer
self.introducer_furl = q.introducer_url self.introducer_furl = q.introducer_url
self.clients = [] self.clients = []
basedirs = [] basedirs = []
for i in range(self.numclients): for i in range(self.numclients):
basedirs.append((yield self._set_up_client_node(i))) basedirs.append((yield self._set_up_client_node(i, force_foolscap)))
# start clients[0], wait for it's tub to be ready (at which point it # start clients[0], wait for it's tub to be ready (at which point it
# will have registered the helper furl). # will have registered the helper furl).
@ -761,7 +783,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# and the helper-using webport # and the helper-using webport
self.helper_webish_url = self.clients[3].getServiceNamed("webish").getURL() self.helper_webish_url = self.clients[3].getServiceNamed("webish").getURL()
def _generate_config(self, which, basedir): def _generate_config(self, which, basedir, force_foolscap=False):
config = {} config = {}
allclients = set(range(self.numclients)) allclients = set(range(self.numclients))
@ -787,10 +809,13 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
if which in feature_matrix.get((section, feature), {which}): if which in feature_matrix.get((section, feature), {which}):
config.setdefault(section, {})[feature] = value config.setdefault(section, {})[feature] = value
#config.setdefault("node", {})["force_foolscap"] = force_foolscap
setnode = partial(setconf, config, which, "node") setnode = partial(setconf, config, which, "node")
sethelper = partial(setconf, config, which, "helper") sethelper = partial(setconf, config, which, "helper")
setnode("nickname", u"client %d \N{BLACK SMILING FACE}" % (which,)) setnode("nickname", u"client %d \N{BLACK SMILING FACE}" % (which,))
setnode("force_foolscap", str(force_foolscap))
tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor) tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor)
setnode("tub.port", tub_port_endpoint) setnode("tub.port", tub_port_endpoint)
@ -808,17 +833,16 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
" furl: %s\n") % self.introducer_furl " furl: %s\n") % self.introducer_furl
iyaml_fn = os.path.join(basedir, "private", "introducers.yaml") iyaml_fn = os.path.join(basedir, "private", "introducers.yaml")
fileutil.write(iyaml_fn, iyaml) fileutil.write(iyaml_fn, iyaml)
return _render_config(config) return _render_config(config)
def _set_up_client_node(self, which): def _set_up_client_node(self, which, force_foolscap):
basedir = self.getdir("client%d" % (which,)) basedir = self.getdir("client%d" % (which,))
fileutil.make_dirs(os.path.join(basedir, "private")) fileutil.make_dirs(os.path.join(basedir, "private"))
if len(SYSTEM_TEST_CERTS) > (which + 1): if len(SYSTEM_TEST_CERTS) > (which + 1):
f = open(os.path.join(basedir, "private", "node.pem"), "w") f = open(os.path.join(basedir, "private", "node.pem"), "w")
f.write(SYSTEM_TEST_CERTS[which + 1]) f.write(SYSTEM_TEST_CERTS[which + 1])
f.close() f.close()
config = self._generate_config(which, basedir) config = self._generate_config(which, basedir, force_foolscap)
fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config) fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
return basedir return basedir

View File

@ -15,9 +15,8 @@ from typing import Set
from random import Random from random import Random
from unittest import SkipTest from unittest import SkipTest
from twisted.internet.defer import inlineCallbacks, returnValue, succeed from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import Clock from twisted.internet.task import Clock
from twisted.internet import reactor
from foolscap.api import Referenceable, RemoteException from foolscap.api import Referenceable, RemoteException
# A better name for this would be IStorageClient... # A better name for this would be IStorageClient...
@ -26,8 +25,6 @@ from allmydata.interfaces import IStorageServer
from .common_system import SystemTestMixin from .common_system import SystemTestMixin
from .common import AsyncTestCase from .common import AsyncTestCase
from allmydata.storage.server import StorageServer # not a IStorageServer!! from allmydata.storage.server import StorageServer # not a IStorageServer!!
from allmydata.storage.http_client import StorageClient
from allmydata.storage_client import _HTTPStorageServer
# Use random generator with known seed, so results are reproducible if tests # Use random generator with known seed, so results are reproducible if tests
@ -439,6 +436,17 @@ class IStorageServerImmutableAPIsTestsMixin(object):
b"immutable", storage_index, 0, b"ono" b"immutable", storage_index, 0, b"ono"
) )
@inlineCallbacks
def test_advise_corrupt_share_unknown_share_number(self):
"""
Calling ``advise_corrupt_share()`` on an immutable share, with an
unknown share number, does not result in error.
"""
storage_index, _, _ = yield self.create_share()
yield self.storage_client.advise_corrupt_share(
b"immutable", storage_index, 999, b"ono"
)
@inlineCallbacks @inlineCallbacks
def test_allocate_buckets_creates_lease(self): def test_allocate_buckets_creates_lease(self):
""" """
@ -908,6 +916,19 @@ class IStorageServerMutableAPIsTestsMixin(object):
b"mutable", storage_index, 0, b"ono" b"mutable", storage_index, 0, b"ono"
) )
@inlineCallbacks
def test_advise_corrupt_share_unknown_share_number(self):
"""
Calling ``advise_corrupt_share()`` on a mutable share with an unknown
share number does not result in error (other behavior is opaque at this
level of abstraction).
"""
secrets, storage_index = yield self.create_slot()
yield self.storage_client.advise_corrupt_share(
b"mutable", storage_index, 999, b"ono"
)
@inlineCallbacks @inlineCallbacks
def test_STARAW_create_lease(self): def test_STARAW_create_lease(self):
""" """
@ -1023,7 +1044,10 @@ class _SharedMixin(SystemTestMixin):
SKIP_TESTS = set() # type: Set[str] SKIP_TESTS = set() # type: Set[str]
def _get_istorage_server(self): def _get_istorage_server(self):
raise NotImplementedError("implement in subclass") native_server = next(iter(self.clients[0].storage_broker.get_known_servers()))
client = native_server.get_storage_server()
self.assertTrue(IStorageServer.providedBy(client))
return client
@inlineCallbacks @inlineCallbacks
def setUp(self): def setUp(self):
@ -1046,7 +1070,7 @@ class _SharedMixin(SystemTestMixin):
self._clock = Clock() self._clock = Clock()
self._clock.advance(123456) self._clock.advance(123456)
self.server._clock = self._clock self.server._clock = self._clock
self.storage_client = yield self._get_istorage_server() self.storage_client = self._get_istorage_server()
def fake_time(self): def fake_time(self):
"""Return the current fake, test-controlled, time.""" """Return the current fake, test-controlled, time."""
@ -1062,51 +1086,29 @@ class _SharedMixin(SystemTestMixin):
yield SystemTestMixin.tearDown(self) yield SystemTestMixin.tearDown(self)
class _FoolscapMixin(_SharedMixin):
"""Run tests on Foolscap version of ``IStorageServer``."""
def _get_native_server(self):
return next(iter(self.clients[0].storage_broker.get_known_servers()))
def _get_istorage_server(self):
client = self._get_native_server().get_storage_server()
self.assertTrue(IStorageServer.providedBy(client))
return succeed(client)
class _HTTPMixin(_SharedMixin):
"""Run tests on the HTTP version of ``IStorageServer``."""
def _get_istorage_server(self):
nurl = list(self.clients[0].storage_nurls)[0]
# Create HTTP client with non-persistent connections, so we don't leak
# state across tests:
client: IStorageServer = _HTTPStorageServer.from_http_client(
StorageClient.from_nurl(nurl, reactor, persistent=False)
)
self.assertTrue(IStorageServer.providedBy(client))
return succeed(client)
class FoolscapSharedAPIsTests( class FoolscapSharedAPIsTests(
_FoolscapMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase _SharedMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
): ):
"""Foolscap-specific tests for shared ``IStorageServer`` APIs.""" """Foolscap-specific tests for shared ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = True
class HTTPSharedAPIsTests( class HTTPSharedAPIsTests(
_HTTPMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase _SharedMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
): ):
"""HTTP-specific tests for shared ``IStorageServer`` APIs.""" """HTTP-specific tests for shared ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = False
class FoolscapImmutableAPIsTests( class FoolscapImmutableAPIsTests(
_FoolscapMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase _SharedMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
): ):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs.""" """Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = True
def test_disconnection(self): def test_disconnection(self):
""" """
If we disconnect in the middle of writing to a bucket, all data is If we disconnect in the middle of writing to a bucket, all data is
@ -1129,23 +1131,29 @@ class FoolscapImmutableAPIsTests(
""" """
current = self.storage_client current = self.storage_client
yield self.bounce_client(0) yield self.bounce_client(0)
self.storage_client = self._get_native_server().get_storage_server() self.storage_client = self._get_istorage_server()
assert self.storage_client is not current assert self.storage_client is not current
class HTTPImmutableAPIsTests( class HTTPImmutableAPIsTests(
_HTTPMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase _SharedMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
): ):
"""HTTP-specific tests for immutable ``IStorageServer`` APIs.""" """HTTP-specific tests for immutable ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = False
class FoolscapMutableAPIsTests( class FoolscapMutableAPIsTests(
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase _SharedMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
): ):
"""Foolscap-specific tests for mutable ``IStorageServer`` APIs.""" """Foolscap-specific tests for mutable ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = True
class HTTPMutableAPIsTests( class HTTPMutableAPIsTests(
_HTTPMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase _SharedMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
): ):
"""HTTP-specific tests for mutable ``IStorageServer`` APIs.""" """HTTP-specific tests for mutable ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = False

View File

@ -305,6 +305,9 @@ class CustomHTTPServerTests(SyncTestCase):
def setUp(self): def setUp(self):
super(CustomHTTPServerTests, self).setUp() super(CustomHTTPServerTests, self).setUp()
StorageClient.start_test_mode(
lambda pool: self.addCleanup(pool.closeCachedConnections)
)
# Could be a fixture, but will only be used in this test class so not # Could be a fixture, but will only be used in this test class so not
# going to bother: # going to bother:
self._http_server = TestApp() self._http_server = TestApp()
@ -312,6 +315,7 @@ class CustomHTTPServerTests(SyncTestCase):
DecodedURL.from_text("http://127.0.0.1"), DecodedURL.from_text("http://127.0.0.1"),
SWISSNUM_FOR_TEST, SWISSNUM_FOR_TEST,
treq=StubTreq(self._http_server._app.resource()), treq=StubTreq(self._http_server._app.resource()),
clock=Clock(),
) )
# We're using a Treq private API to get the reactor, alas, but only in # We're using a Treq private API to get the reactor, alas, but only in
# a test, so not going to worry about it too much. This would be fixed # a test, so not going to worry about it too much. This would be fixed
@ -422,6 +426,9 @@ class HttpTestFixture(Fixture):
""" """
def _setUp(self): def _setUp(self):
StorageClient.start_test_mode(
lambda pool: self.addCleanup(pool.closeCachedConnections)
)
self.clock = Clock() self.clock = Clock()
self.tempdir = self.useFixture(TempDir()) self.tempdir = self.useFixture(TempDir())
# The global Cooperator used by Twisted (a) used by pull producers in # The global Cooperator used by Twisted (a) used by pull producers in
@ -443,6 +450,7 @@ class HttpTestFixture(Fixture):
DecodedURL.from_text("http://127.0.0.1"), DecodedURL.from_text("http://127.0.0.1"),
SWISSNUM_FOR_TEST, SWISSNUM_FOR_TEST,
treq=self.treq, treq=self.treq,
clock=self.clock,
) )
def result_of_with_flush(self, d): def result_of_with_flush(self, d):
@ -527,6 +535,7 @@ class GenericHTTPAPITests(SyncTestCase):
DecodedURL.from_text("http://127.0.0.1"), DecodedURL.from_text("http://127.0.0.1"),
b"something wrong", b"something wrong",
treq=StubTreq(self.http.http_server.get_resource()), treq=StubTreq(self.http.http_server.get_resource()),
clock=self.http.clock,
) )
) )
with assert_fails_with_http_code(self, http.UNAUTHORIZED): with assert_fails_with_http_code(self, http.UNAUTHORIZED):

View File

@ -117,11 +117,17 @@ class CountingDataUploadable(upload.Data):
class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
"""Foolscap integration-y tests."""
FORCE_FOOLSCAP_FOR_STORAGE = True
timeout = 180 timeout = 180
@property
def basedir(self):
return "system/SystemTest/{}-foolscap-{}".format(
self.id().split(".")[-1], self.FORCE_FOOLSCAP_FOR_STORAGE
)
def test_connections(self): def test_connections(self):
self.basedir = "system/SystemTest/test_connections"
d = self.set_up_nodes() d = self.set_up_nodes()
self.extra_node = None self.extra_node = None
d.addCallback(lambda res: self.add_extra_node(self.numclients)) d.addCallback(lambda res: self.add_extra_node(self.numclients))
@ -149,11 +155,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
del test_connections del test_connections
def test_upload_and_download_random_key(self): def test_upload_and_download_random_key(self):
self.basedir = "system/SystemTest/test_upload_and_download_random_key"
return self._test_upload_and_download(convergence=None) return self._test_upload_and_download(convergence=None)
def test_upload_and_download_convergent(self): def test_upload_and_download_convergent(self):
self.basedir = "system/SystemTest/test_upload_and_download_convergent"
return self._test_upload_and_download(convergence=b"some convergence string") return self._test_upload_and_download(convergence=b"some convergence string")
def _test_upload_and_download(self, convergence): def _test_upload_and_download(self, convergence):
@ -516,7 +520,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
def test_mutable(self): def test_mutable(self):
self.basedir = "system/SystemTest/test_mutable"
DATA = b"initial contents go here." # 25 bytes % 3 != 0 DATA = b"initial contents go here." # 25 bytes % 3 != 0
DATA_uploadable = MutableData(DATA) DATA_uploadable = MutableData(DATA)
NEWDATA = b"new contents yay" NEWDATA = b"new contents yay"
@ -746,7 +749,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
# plaintext_hash check. # plaintext_hash check.
def test_filesystem(self): def test_filesystem(self):
self.basedir = "system/SystemTest/test_filesystem"
self.data = LARGE_DATA self.data = LARGE_DATA
d = self.set_up_nodes() d = self.set_up_nodes()
def _new_happy_semantics(ign): def _new_happy_semantics(ign):
@ -1713,7 +1715,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
def test_filesystem_with_cli_in_subprocess(self): def test_filesystem_with_cli_in_subprocess(self):
# We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe. # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess"
d = self.set_up_nodes() d = self.set_up_nodes()
def _new_happy_semantics(ign): def _new_happy_semantics(ign):
for c in self.clients: for c in self.clients:
@ -1794,9 +1795,21 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
class Connections(SystemTestMixin, unittest.TestCase): class Connections(SystemTestMixin, unittest.TestCase):
FORCE_FOOLSCAP_FOR_STORAGE = True
def test_rref(self): def test_rref(self):
self.basedir = "system/Connections/rref" # The way the listening port is created is via
# SameProcessStreamEndpointAssigner (allmydata.test.common), which then
# makes an endpoint string parsed by AdoptedServerPort. The latter does
# dup(fd), which results in the filedescriptor staying alive _until the
# test ends_. That means that when we disown the service, we still have
# the listening port there on the OS level! Just the resulting
# connections aren't handled. So this test relies on aggressive
# timeouts in the HTTP client and presumably some equivalent in
# Foolscap, since connection refused does _not_ happen.
self.basedir = "system/Connections/rref-foolscap-{}".format(
self.FORCE_FOOLSCAP_FOR_STORAGE
)
d = self.set_up_nodes(2) d = self.set_up_nodes(2)
def _start(ign): def _start(ign):
self.c0 = self.clients[0] self.c0 = self.clients[0]
@ -1812,9 +1825,13 @@ class Connections(SystemTestMixin, unittest.TestCase):
# now shut down the server # now shut down the server
d.addCallback(lambda ign: self.clients[1].disownServiceParent()) d.addCallback(lambda ign: self.clients[1].disownServiceParent())
# kill any persistent http connections that might continue to work
d.addCallback(lambda ign: self.close_idle_http_connections())
# and wait for the client to notice # and wait for the client to notice
def _poll(): def _poll():
return len(self.c0.storage_broker.get_connected_servers()) < 2 return len(self.c0.storage_broker.get_connected_servers()) == 1
d.addCallback(lambda ign: self.poll(_poll)) d.addCallback(lambda ign: self.poll(_poll))
def _down(ign): def _down(ign):
@ -1824,3 +1841,16 @@ class Connections(SystemTestMixin, unittest.TestCase):
self.assertEqual(storage_server, self.s1_storage_server) self.assertEqual(storage_server, self.s1_storage_server)
d.addCallback(_down) d.addCallback(_down)
return d return d
class HTTPSystemTest(SystemTest):
"""HTTP storage protocol variant of the system tests."""
FORCE_FOOLSCAP_FOR_STORAGE = False
class HTTPConnections(Connections):
"""HTTP storage protocol variant of the connections tests."""
FORCE_FOOLSCAP_FOR_STORAGE = False