diff --git a/.circleci/run-tests.sh b/.circleci/run-tests.sh index 764651c40..854013c32 100755 --- a/.circleci/run-tests.sh +++ b/.circleci/run-tests.sh @@ -52,7 +52,7 @@ fi # This is primarily aimed at catching hangs on the PyPy job which runs for # about 21 minutes and then gets killed by CircleCI in a way that fails the # job and bypasses our "allowed failure" logic. -TIMEOUT="timeout --kill-after 1m 15m" +TIMEOUT="timeout --kill-after 1m 25m" # Run the test suite as a non-root user. This is the expected usage some # small areas of the test suite assume non-root privileges (such as unreadable diff --git a/newsfragments/3783.minor b/newsfragments/3783.minor new file mode 100644 index 000000000..e69de29bb diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 417dffed8..1a158a1aa 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -104,6 +104,7 @@ _client_config = configutil.ValidConfiguration( "reserved_space", "storage_dir", "plugins", + "force_foolscap", ), "sftpd": ( "accounts.file", @@ -823,9 +824,10 @@ class _Client(node.Node, pollmixin.PollMixin): furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding()) furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file) (_, _, swissnum) = decode_furl(furl) - self.storage_nurls = self.tub.negotiationClass.add_storage_server( - ss, swissnum.encode("ascii") - ) + if hasattr(self.tub.negotiationClass, "add_storage_server"): + nurls = self.tub.negotiationClass.add_storage_server(ss, swissnum.encode("ascii")) + self.storage_nurls = nurls + announcement[storage_client.ANONYMOUS_STORAGE_NURLS] = [n.to_text() for n in nurls] announcement["anonymous-storage-FURL"] = furl enabled_storage_servers = self._enable_storage_servers( diff --git a/src/allmydata/node.py b/src/allmydata/node.py index d6cbc9e36..8266fe3fb 100644 --- a/src/allmydata/node.py +++ b/src/allmydata/node.py @@ -697,7 +697,7 @@ def create_connection_handlers(config, i2p_provider, tor_provider): def create_tub(tub_options, default_connection_handlers, foolscap_connection_handlers, - handler_overrides={}, **kwargs): + handler_overrides={}, force_foolscap=False, **kwargs): """ Create a Tub with the right options and handlers. It will be ephemeral unless the caller provides certFile= in kwargs @@ -707,10 +707,16 @@ def create_tub(tub_options, default_connection_handlers, foolscap_connection_han :param dict tub_options: every key-value pair in here will be set in the new Tub via `Tub.setOption` + + :param bool force_foolscap: If True, only allow Foolscap, not just HTTPS + storage protocol. """ - # We listen simulataneously for both Foolscap and HTTPS on the same port, + # We listen simultaneously for both Foolscap and HTTPS on the same port, # so we have to create a special Foolscap Tub for that to work: - tub = create_tub_with_https_support(**kwargs) + if force_foolscap: + tub = Tub(**kwargs) + else: + tub = create_tub_with_https_support(**kwargs) for (name, value) in list(tub_options.items()): tub.setOption(name, value) @@ -901,14 +907,20 @@ def create_main_tub(config, tub_options, # FIXME? "node.pem" was the CERTFILE option/thing certfile = config.get_private_path("node.pem") - tub = create_tub( tub_options, default_connection_handlers, foolscap_connection_handlers, + # TODO eventually we will want the default to be False, but for now we + # don't want to enable HTTP by default. + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3934 + force_foolscap=config.get_config( + "storage", "force_foolscap", default=True, boolean=True + ), handler_overrides=handler_overrides, certFile=certfile, ) + if portlocation is None: log.msg("Tub is not listening") else: diff --git a/src/allmydata/scripts/tahoe_run.py b/src/allmydata/scripts/tahoe_run.py index e22e8c307..65d520f57 100644 --- a/src/allmydata/scripts/tahoe_run.py +++ b/src/allmydata/scripts/tahoe_run.py @@ -186,7 +186,7 @@ class DaemonizeTheRealService(Service, HookMixin): ) ) else: - self.stderr.write("\nUnknown error\n") + self.stderr.write("\nUnknown error, here's the traceback:\n") reason.printTraceback(self.stderr) reactor.stop() diff --git a/src/allmydata/storage/http_client.py b/src/allmydata/storage/http_client.py index 420d3610f..fed66bb75 100644 --- a/src/allmydata/storage/http_client.py +++ b/src/allmydata/storage/http_client.py @@ -20,7 +20,7 @@ from twisted.web.http_headers import Headers from twisted.web import http from twisted.web.iweb import IPolicyForHTTPS from twisted.internet.defer import inlineCallbacks, returnValue, fail, Deferred, succeed -from twisted.internet.interfaces import IOpenSSLClientConnectionCreator +from twisted.internet.interfaces import IOpenSSLClientConnectionCreator, IReactorTime from twisted.internet.ssl import CertificateOptions from twisted.web.client import Agent, HTTPConnectionPool from zope.interface import implementer @@ -276,42 +276,67 @@ class _StorageClientHTTPSPolicy: ) -@define +@define(hash=True) class StorageClient(object): """ Low-level HTTP client that talks to the HTTP storage server. """ + # If set, we're doing unit testing and we should call this with + # HTTPConnectionPool we create. + TEST_MODE_REGISTER_HTTP_POOL = None + + @classmethod + def start_test_mode(cls, callback): + """Switch to testing mode. + + In testing mode we register the pool with test system using the given + callback so it can Do Things, most notably killing off idle HTTP + connections at test shutdown and, in some tests, in the midddle of the + test. + """ + cls.TEST_MODE_REGISTER_HTTP_POOL = callback + + @classmethod + def stop_test_mode(cls): + """Stop testing mode.""" + cls.TEST_MODE_REGISTER_HTTP_POOL = None + # The URL is a HTTPS URL ("https://..."). To construct from a NURL, use # ``StorageClient.from_nurl()``. _base_url: DecodedURL _swissnum: bytes _treq: Union[treq, StubTreq, HTTPClient] + _clock: IReactorTime @classmethod def from_nurl( - cls, nurl: DecodedURL, reactor, persistent: bool = True + cls, + nurl: DecodedURL, + reactor, ) -> StorageClient: """ Create a ``StorageClient`` for the given NURL. - - ``persistent`` indicates whether to use persistent HTTP connections. """ assert nurl.fragment == "v=1" assert nurl.scheme == "pb" swissnum = nurl.path[0].encode("ascii") certificate_hash = nurl.user.encode("ascii") + pool = HTTPConnectionPool(reactor) + + if cls.TEST_MODE_REGISTER_HTTP_POOL is not None: + cls.TEST_MODE_REGISTER_HTTP_POOL(pool) treq_client = HTTPClient( Agent( reactor, _StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash), - pool=HTTPConnectionPool(reactor, persistent=persistent), + pool=pool, ) ) https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port) - return cls(https_url, swissnum, treq_client) + return cls(https_url, swissnum, treq_client, reactor) def relative_url(self, path): """Get a URL relative to the base URL.""" @@ -379,13 +404,13 @@ class StorageClient(object): return self._treq.request(method, url, headers=headers, **kwargs) +@define(hash=True) class StorageClientGeneral(object): """ High-level HTTP APIs that aren't immutable- or mutable-specific. """ - def __init__(self, client): # type: (StorageClient) -> None - self._client = client + _client: StorageClient @inlineCallbacks def get_version(self): @@ -534,7 +559,7 @@ async def advise_corrupt_share( ) -@define +@define(hash=True) class StorageClientImmutables(object): """ APIs for interacting with immutables. diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index c63bfccff..8e9ad3656 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -30,6 +30,8 @@ Ported to Python 3. # # 6: implement other sorts of IStorageClient classes: S3, etc +from __future__ import annotations + from six import ensure_text from typing import Union import re, time, hashlib @@ -37,13 +39,16 @@ from os import urandom from configparser import NoSectionError import attr +from hyperlink import DecodedURL from zope.interface import ( Attribute, Interface, implementer, ) +from twisted.python.failure import Failure from twisted.web import http -from twisted.internet import defer +from twisted.internet.task import LoopingCall +from twisted.internet import defer, reactor from twisted.application import service from twisted.plugin import ( getPlugins, @@ -75,6 +80,8 @@ from allmydata.storage.http_client import ( ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException ) +ANONYMOUS_STORAGE_NURLS = "anonymous-storage-NURLs" + # who is responsible for de-duplication? # both? @@ -99,8 +106,8 @@ class StorageClientConfig(object): :ivar preferred_peers: An iterable of the server-ids (``bytes``) of the storage servers where share placement is preferred, in order of - decreasing preference. See the *[client]peers.preferred* - documentation for details. + decreasing preference. See the *[client]peers.preferred* documentation + for details. :ivar dict[unicode, dict[unicode, unicode]] storage_plugins: A mapping from names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the @@ -262,6 +269,10 @@ class StorageFarmBroker(service.MultiService): by the given announcement. """ assert isinstance(server_id, bytes) + if len(server["ann"].get(ANONYMOUS_STORAGE_NURLS, [])) > 0: + s = HTTPNativeStorageServer(server_id, server["ann"]) + s.on_status_changed(lambda _: self._got_connection()) + return s handler_overrides = server.get("connections", {}) s = NativeStorageServer( server_id, @@ -523,6 +534,45 @@ class IFoolscapStorageServer(Interface): """ +def _parse_announcement(server_id: bytes, furl: bytes, ann: dict) -> tuple[str, bytes, bytes, bytes, bytes]: + """ + Parse the furl and announcement, return: + + (nickname, permutation_seed, tubid, short_description, long_description) + """ + m = re.match(br'pb://(\w+)@', furl) + assert m, furl + tubid_s = m.group(1).lower() + tubid = base32.a2b(tubid_s) + if "permutation-seed-base32" in ann: + seed = ann["permutation-seed-base32"] + if isinstance(seed, str): + seed = seed.encode("utf-8") + ps = base32.a2b(seed) + elif re.search(br'^v0-[0-9a-zA-Z]{52}$', server_id): + ps = base32.a2b(server_id[3:]) + else: + log.msg("unable to parse serverid '%(server_id)s as pubkey, " + "hashing it to get permutation-seed, " + "may not converge with other clients", + server_id=server_id, + facility="tahoe.storage_broker", + level=log.UNUSUAL, umid="qu86tw") + ps = hashlib.sha256(server_id).digest() + permutation_seed = ps + + assert server_id + long_description = server_id + if server_id.startswith(b"v0-"): + # remove v0- prefix from abbreviated name + short_description = server_id[3:3+8] + else: + short_description = server_id[:8] + nickname = ann.get("nickname", "") + + return (nickname, permutation_seed, tubid, short_description, long_description) + + @implementer(IFoolscapStorageServer) @attr.s(frozen=True) class _FoolscapStorage(object): @@ -566,43 +616,13 @@ class _FoolscapStorage(object): The furl will be a Unicode string on Python 3; on Python 2 it will be either a native (bytes) string or a Unicode string. """ - furl = furl.encode("utf-8") - m = re.match(br'pb://(\w+)@', furl) - assert m, furl - tubid_s = m.group(1).lower() - tubid = base32.a2b(tubid_s) - if "permutation-seed-base32" in ann: - seed = ann["permutation-seed-base32"] - if isinstance(seed, str): - seed = seed.encode("utf-8") - ps = base32.a2b(seed) - elif re.search(br'^v0-[0-9a-zA-Z]{52}$', server_id): - ps = base32.a2b(server_id[3:]) - else: - log.msg("unable to parse serverid '%(server_id)s as pubkey, " - "hashing it to get permutation-seed, " - "may not converge with other clients", - server_id=server_id, - facility="tahoe.storage_broker", - level=log.UNUSUAL, umid="qu86tw") - ps = hashlib.sha256(server_id).digest() - permutation_seed = ps - - assert server_id - long_description = server_id - if server_id.startswith(b"v0-"): - # remove v0- prefix from abbreviated name - short_description = server_id[3:3+8] - else: - short_description = server_id[:8] - nickname = ann.get("nickname", "") - + (nickname, permutation_seed, tubid, short_description, long_description) = _parse_announcement(server_id, furl.encode("utf-8"), ann) return cls( nickname=nickname, permutation_seed=permutation_seed, tubid=tubid, storage_server=storage_server, - furl=furl, + furl=furl.encode("utf-8"), short_description=short_description, long_description=long_description, ) @@ -684,6 +704,16 @@ def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref): raise AnnouncementNotMatched() +def _available_space_from_version(version): + if version is None: + return None + protocol_v1_version = version.get(b'http://allmydata.org/tahoe/protocols/storage/v1', BytesKeyDict()) + available_space = protocol_v1_version.get(b'available-space') + if available_space is None: + available_space = protocol_v1_version.get(b'maximum-immutable-share-size', None) + return available_space + + @implementer(IServer) class NativeStorageServer(service.MultiService): """I hold information about a storage server that we want to connect to. @@ -842,13 +872,7 @@ class NativeStorageServer(service.MultiService): def get_available_space(self): version = self.get_version() - if version is None: - return None - protocol_v1_version = version.get(b'http://allmydata.org/tahoe/protocols/storage/v1', BytesKeyDict()) - available_space = protocol_v1_version.get(b'available-space') - if available_space is None: - available_space = protocol_v1_version.get(b'maximum-immutable-share-size', None) - return available_space + return _available_space_from_version(version) def start_connecting(self, trigger_cb): self._tub = self._tub_maker(self._handler_overrides) @@ -910,6 +934,164 @@ class NativeStorageServer(service.MultiService): # used when the broker wants us to hurry up self._reconnector.reset() + +@implementer(IServer) +class HTTPNativeStorageServer(service.MultiService): + """ + Like ``NativeStorageServer``, but for HTTP clients. + + The notion of being "connected" is less meaningful for HTTP; we just poll + occasionally, and if we've succeeded at last poll, we assume we're + "connected". + """ + + def __init__(self, server_id: bytes, announcement, reactor=reactor): + service.MultiService.__init__(self) + assert isinstance(server_id, bytes) + self._server_id = server_id + self.announcement = announcement + self._on_status_changed = ObserverList() + self._reactor = reactor + furl = announcement["anonymous-storage-FURL"].encode("utf-8") + ( + self._nickname, + self._permutation_seed, + self._tubid, + self._short_description, + self._long_description + ) = _parse_announcement(server_id, furl, announcement) + # TODO need some way to do equivalent of Happy Eyeballs for multiple NURLs? + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3935 + nurl = DecodedURL.from_text(announcement[ANONYMOUS_STORAGE_NURLS][0]) + self._istorage_server = _HTTPStorageServer.from_http_client( + StorageClient.from_nurl(nurl, reactor) + ) + + self._connection_status = connection_status.ConnectionStatus.unstarted() + self._version = None + self._last_connect_time = None + self._connecting_deferred = None + + def get_permutation_seed(self): + return self._permutation_seed + + def get_name(self): + return self._short_description + + def get_longname(self): + return self._long_description + + def get_tubid(self): + return self._tubid + + def get_lease_seed(self): + # Apparently this is what Foolscap version above does?! + return self._tubid + + def get_foolscap_write_enabler_seed(self): + return self._tubid + + def get_nickname(self): + return self._nickname + + def on_status_changed(self, status_changed): + """ + :param status_changed: a callable taking a single arg (the + NativeStorageServer) that is notified when we become connected + """ + return self._on_status_changed.subscribe(status_changed) + + # Special methods used by copy.copy() and copy.deepcopy(). When those are + # used in allmydata.immutable.filenode to copy CheckResults during + # repair, we want it to treat the IServer instances as singletons, and + # not attempt to duplicate them.. + def __copy__(self): + return self + + def __deepcopy__(self, memodict): + return self + + def __repr__(self): + return "" % self.get_name() + + def get_serverid(self): + return self._server_id + + def get_version(self): + return self._version + + def get_announcement(self): + return self.announcement + + def get_connection_status(self): + return self._connection_status + + def is_connected(self): + return self._connection_status.connected + + def get_available_space(self): + version = self.get_version() + return _available_space_from_version(version) + + def start_connecting(self, trigger_cb): + self._lc = LoopingCall(self._connect) + self._lc.start(1, True) + + def _got_version(self, version): + self._last_connect_time = time.time() + self._version = version + self._connection_status = connection_status.ConnectionStatus( + True, "connected", [], self._last_connect_time, self._last_connect_time + ) + self._on_status_changed.notify(self) + + def _failed_to_connect(self, reason): + self._connection_status = connection_status.ConnectionStatus( + False, f"failure: {reason}", [], self._last_connect_time, self._last_connect_time + ) + self._on_status_changed.notify(self) + + def get_storage_server(self): + """ + See ``IServer.get_storage_server``. + """ + if self._connection_status.summary == "unstarted": + return None + return self._istorage_server + + def stop_connecting(self): + self._lc.stop() + if self._connecting_deferred is not None: + self._connecting_deferred.cancel() + + def try_to_connect(self): + self._connect() + + def _connect(self): + result = self._istorage_server.get_version() + + def remove_connecting_deferred(result): + self._connecting_deferred = None + return result + + # Set a short timeout since we're relying on this for server liveness. + self._connecting_deferred = result.addTimeout(5, self._reactor).addBoth( + remove_connecting_deferred).addCallbacks( + self._got_version, + self._failed_to_connect + ) + + def stopService(self): + if self._connecting_deferred is not None: + self._connecting_deferred.cancel() + + result = service.MultiService.stopService(self) + if self._lc.running: + self._lc.stop() + self._failed_to_connect("shut down") + return result + + class UnknownServerTypeError(Exception): pass @@ -1026,7 +1208,7 @@ class _StorageServer(object): -@attr.s +@attr.s(hash=True) class _FakeRemoteReference(object): """ Emulate a Foolscap RemoteReference, calling a local object instead. @@ -1051,7 +1233,7 @@ class _HTTPBucketWriter(object): storage_index = attr.ib(type=bytes) share_number = attr.ib(type=int) upload_secret = attr.ib(type=bytes) - finished = attr.ib(type=bool, default=False) + finished = attr.ib(type=defer.Deferred[bool], factory=defer.Deferred) def abort(self): return self.client.abort_upload(self.storage_index, self.share_number, @@ -1063,18 +1245,27 @@ class _HTTPBucketWriter(object): self.storage_index, self.share_number, self.upload_secret, offset, data ) if result.finished: - self.finished = True + self.finished.callback(True) defer.returnValue(None) def close(self): - # A no-op in HTTP protocol. - if not self.finished: - return defer.fail(RuntimeError("You didn't finish writing?!")) - return defer.succeed(None) + # We're not _really_ closed until all writes have succeeded and we + # finished writing all the data. + return self.finished +def _ignore_404(failure: Failure) -> Union[Failure, None]: + """ + Useful for advise_corrupt_share(), since it swallows unknown share numbers + in Foolscap. + """ + if failure.check(HTTPClientException) and failure.value.code == http.NOT_FOUND: + return None + else: + return failure -@attr.s + +@attr.s(hash=True) class _HTTPBucketReader(object): """ Emulate a ``RIBucketReader``, but use HTTP protocol underneath. @@ -1092,7 +1283,7 @@ class _HTTPBucketReader(object): return self.client.advise_corrupt_share( self.storage_index, self.share_number, str(reason, "utf-8", errors="backslashreplace") - ) + ).addErrback(_ignore_404) # WORK IN PROGRESS, for now it doesn't actually implement whole thing. @@ -1192,7 +1383,7 @@ class _HTTPStorageServer(object): raise ValueError("Unknown share type") return client.advise_corrupt_share( storage_index, shnum, str(reason, "utf-8", errors="backslashreplace") - ) + ).addErrback(_ignore_404) @defer.inlineCallbacks def slot_readv(self, storage_index, shares, readv): diff --git a/src/allmydata/test/common_system.py b/src/allmydata/test/common_system.py index 9851d2b91..8d3019935 100644 --- a/src/allmydata/test/common_system.py +++ b/src/allmydata/test/common_system.py @@ -5,22 +5,14 @@ in ``allmydata.test.test_system``. Ported to Python 3. """ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals - -from future.utils import PY2 -if PY2: - # Don't import bytes since it causes issues on (so far unported) modules on Python 2. - from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, dict, list, object, range, max, min, str # noqa: F401 - +from typing import Optional import os from functools import partial from twisted.internet import reactor from twisted.internet import defer from twisted.internet.defer import inlineCallbacks +from twisted.internet.task import deferLater from twisted.application import service from foolscap.api import flushEventualQueue @@ -28,6 +20,11 @@ from foolscap.api import flushEventualQueue from allmydata import client from allmydata.introducer.server import create_introducer from allmydata.util import fileutil, log, pollmixin +from allmydata.storage import http_client +from allmydata.storage_client import ( + NativeStorageServer, + HTTPNativeStorageServer, +) from twisted.python.filepath import ( FilePath, @@ -644,7 +641,15 @@ def _render_section_values(values): class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): + # If set to True, use Foolscap for storage protocol. If set to False, HTTP + # will be used when possible. If set to None, this suggests a bug in the + # test code. + FORCE_FOOLSCAP_FOR_STORAGE : Optional[bool] = None + def setUp(self): + self._http_client_pools = [] + http_client.StorageClient.start_test_mode(self._got_new_http_connection_pool) + self.addCleanup(http_client.StorageClient.stop_test_mode) self.port_assigner = SameProcessStreamEndpointAssigner() self.port_assigner.setUp() self.addCleanup(self.port_assigner.tearDown) @@ -652,10 +657,35 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): self.sparent = service.MultiService() self.sparent.startService() + def _got_new_http_connection_pool(self, pool): + # Register the pool for shutdown later: + self._http_client_pools.append(pool) + # Disable retries: + pool.retryAutomatically = False + # Make a much more aggressive timeout for connections, we're connecting + # locally after all... and also make sure it's lower than the delay we + # add in tearDown, to prevent dirty reactor issues. + getConnection = pool.getConnection + + def getConnectionWithTimeout(*args, **kwargs): + d = getConnection(*args, **kwargs) + d.addTimeout(1, reactor) + return d + + pool.getConnection = getConnectionWithTimeout + + def close_idle_http_connections(self): + """Close all HTTP client connections that are just hanging around.""" + return defer.gatherResults( + [pool.closeCachedConnections() for pool in self._http_client_pools] + ) + def tearDown(self): log.msg("shutting down SystemTest services") d = self.sparent.stopService() d.addBoth(flush_but_dont_ignore) + d.addBoth(lambda x: self.close_idle_http_connections().addCallback(lambda _: x)) + d.addBoth(lambda x: deferLater(reactor, 2, lambda: x)) return d def getdir(self, subdir): @@ -714,21 +744,31 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): :return: A ``Deferred`` that fires when the nodes have connected to each other. """ + self.assertIn( + self.FORCE_FOOLSCAP_FOR_STORAGE, (True, False), + "You forgot to set FORCE_FOOLSCAP_FOR_STORAGE on {}".format(self.__class__) + ) self.numclients = NUMCLIENTS self.introducer = yield self._create_introducer() self.add_service(self.introducer) self.introweb_url = self._get_introducer_web() - yield self._set_up_client_nodes() + yield self._set_up_client_nodes(self.FORCE_FOOLSCAP_FOR_STORAGE) + native_server = next(iter(self.clients[0].storage_broker.get_known_servers())) + if self.FORCE_FOOLSCAP_FOR_STORAGE: + expected_storage_server_class = NativeStorageServer + else: + expected_storage_server_class = HTTPNativeStorageServer + self.assertIsInstance(native_server, expected_storage_server_class) @inlineCallbacks - def _set_up_client_nodes(self): + def _set_up_client_nodes(self, force_foolscap): q = self.introducer self.introducer_furl = q.introducer_url self.clients = [] basedirs = [] for i in range(self.numclients): - basedirs.append((yield self._set_up_client_node(i))) + basedirs.append((yield self._set_up_client_node(i, force_foolscap))) # start clients[0], wait for it's tub to be ready (at which point it # will have registered the helper furl). @@ -761,7 +801,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): # and the helper-using webport self.helper_webish_url = self.clients[3].getServiceNamed("webish").getURL() - def _generate_config(self, which, basedir): + def _generate_config(self, which, basedir, force_foolscap=False): config = {} allclients = set(range(self.numclients)) @@ -791,6 +831,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): sethelper = partial(setconf, config, which, "helper") setnode("nickname", u"client %d \N{BLACK SMILING FACE}" % (which,)) + setconf(config, which, "storage", "force_foolscap", str(force_foolscap)) tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor) setnode("tub.port", tub_port_endpoint) @@ -808,17 +849,16 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): " furl: %s\n") % self.introducer_furl iyaml_fn = os.path.join(basedir, "private", "introducers.yaml") fileutil.write(iyaml_fn, iyaml) - return _render_config(config) - def _set_up_client_node(self, which): + def _set_up_client_node(self, which, force_foolscap): basedir = self.getdir("client%d" % (which,)) fileutil.make_dirs(os.path.join(basedir, "private")) if len(SYSTEM_TEST_CERTS) > (which + 1): f = open(os.path.join(basedir, "private", "node.pem"), "w") f.write(SYSTEM_TEST_CERTS[which + 1]) f.close() - config = self._generate_config(which, basedir) + config = self._generate_config(which, basedir, force_foolscap) fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config) return basedir diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index 3328ea598..9e7e7b6e1 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -15,9 +15,8 @@ from typing import Set from random import Random from unittest import SkipTest -from twisted.internet.defer import inlineCallbacks, returnValue, succeed +from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.task import Clock -from twisted.internet import reactor from foolscap.api import Referenceable, RemoteException # A better name for this would be IStorageClient... @@ -26,8 +25,6 @@ from allmydata.interfaces import IStorageServer from .common_system import SystemTestMixin from .common import AsyncTestCase from allmydata.storage.server import StorageServer # not a IStorageServer!! -from allmydata.storage.http_client import StorageClient -from allmydata.storage_client import _HTTPStorageServer # Use random generator with known seed, so results are reproducible if tests @@ -439,6 +436,17 @@ class IStorageServerImmutableAPIsTestsMixin(object): b"immutable", storage_index, 0, b"ono" ) + @inlineCallbacks + def test_advise_corrupt_share_unknown_share_number(self): + """ + Calling ``advise_corrupt_share()`` on an immutable share, with an + unknown share number, does not result in error. + """ + storage_index, _, _ = yield self.create_share() + yield self.storage_client.advise_corrupt_share( + b"immutable", storage_index, 999, b"ono" + ) + @inlineCallbacks def test_allocate_buckets_creates_lease(self): """ @@ -908,6 +916,19 @@ class IStorageServerMutableAPIsTestsMixin(object): b"mutable", storage_index, 0, b"ono" ) + @inlineCallbacks + def test_advise_corrupt_share_unknown_share_number(self): + """ + Calling ``advise_corrupt_share()`` on a mutable share with an unknown + share number does not result in error (other behavior is opaque at this + level of abstraction). + """ + secrets, storage_index = yield self.create_slot() + + yield self.storage_client.advise_corrupt_share( + b"mutable", storage_index, 999, b"ono" + ) + @inlineCallbacks def test_STARAW_create_lease(self): """ @@ -1023,7 +1044,10 @@ class _SharedMixin(SystemTestMixin): SKIP_TESTS = set() # type: Set[str] def _get_istorage_server(self): - raise NotImplementedError("implement in subclass") + native_server = next(iter(self.clients[0].storage_broker.get_known_servers())) + client = native_server.get_storage_server() + self.assertTrue(IStorageServer.providedBy(client)) + return client @inlineCallbacks def setUp(self): @@ -1046,7 +1070,7 @@ class _SharedMixin(SystemTestMixin): self._clock = Clock() self._clock.advance(123456) self.server._clock = self._clock - self.storage_client = yield self._get_istorage_server() + self.storage_client = self._get_istorage_server() def fake_time(self): """Return the current fake, test-controlled, time.""" @@ -1062,51 +1086,29 @@ class _SharedMixin(SystemTestMixin): yield SystemTestMixin.tearDown(self) -class _FoolscapMixin(_SharedMixin): - """Run tests on Foolscap version of ``IStorageServer``.""" - - def _get_native_server(self): - return next(iter(self.clients[0].storage_broker.get_known_servers())) - - def _get_istorage_server(self): - client = self._get_native_server().get_storage_server() - self.assertTrue(IStorageServer.providedBy(client)) - return succeed(client) - - -class _HTTPMixin(_SharedMixin): - """Run tests on the HTTP version of ``IStorageServer``.""" - - def _get_istorage_server(self): - nurl = list(self.clients[0].storage_nurls)[0] - - # Create HTTP client with non-persistent connections, so we don't leak - # state across tests: - client: IStorageServer = _HTTPStorageServer.from_http_client( - StorageClient.from_nurl(nurl, reactor, persistent=False) - ) - self.assertTrue(IStorageServer.providedBy(client)) - - return succeed(client) - - class FoolscapSharedAPIsTests( - _FoolscapMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase + _SharedMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase ): """Foolscap-specific tests for shared ``IStorageServer`` APIs.""" + FORCE_FOOLSCAP_FOR_STORAGE = True + class HTTPSharedAPIsTests( - _HTTPMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase + _SharedMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase ): """HTTP-specific tests for shared ``IStorageServer`` APIs.""" + FORCE_FOOLSCAP_FOR_STORAGE = False + class FoolscapImmutableAPIsTests( - _FoolscapMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase + _SharedMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase ): """Foolscap-specific tests for immutable ``IStorageServer`` APIs.""" + FORCE_FOOLSCAP_FOR_STORAGE = True + def test_disconnection(self): """ If we disconnect in the middle of writing to a bucket, all data is @@ -1129,23 +1131,29 @@ class FoolscapImmutableAPIsTests( """ current = self.storage_client yield self.bounce_client(0) - self.storage_client = self._get_native_server().get_storage_server() + self.storage_client = self._get_istorage_server() assert self.storage_client is not current class HTTPImmutableAPIsTests( - _HTTPMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase + _SharedMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase ): """HTTP-specific tests for immutable ``IStorageServer`` APIs.""" + FORCE_FOOLSCAP_FOR_STORAGE = False + class FoolscapMutableAPIsTests( - _FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase + _SharedMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase ): """Foolscap-specific tests for mutable ``IStorageServer`` APIs.""" + FORCE_FOOLSCAP_FOR_STORAGE = True + class HTTPMutableAPIsTests( - _HTTPMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase + _SharedMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase ): """HTTP-specific tests for mutable ``IStorageServer`` APIs.""" + + FORCE_FOOLSCAP_FOR_STORAGE = False diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index 4a912cf6c..87a6a2306 100644 --- a/src/allmydata/test/test_storage_http.py +++ b/src/allmydata/test/test_storage_http.py @@ -291,6 +291,10 @@ class CustomHTTPServerTests(SyncTestCase): def setUp(self): super(CustomHTTPServerTests, self).setUp() + StorageClient.start_test_mode( + lambda pool: self.addCleanup(pool.closeCachedConnections) + ) + self.addCleanup(StorageClient.stop_test_mode) # Could be a fixture, but will only be used in this test class so not # going to bother: self._http_server = TestApp() @@ -298,6 +302,7 @@ class CustomHTTPServerTests(SyncTestCase): DecodedURL.from_text("http://127.0.0.1"), SWISSNUM_FOR_TEST, treq=StubTreq(self._http_server._app.resource()), + clock=Clock(), ) def test_authorization_enforcement(self): @@ -375,6 +380,10 @@ class HttpTestFixture(Fixture): """ def _setUp(self): + StorageClient.start_test_mode( + lambda pool: self.addCleanup(pool.closeCachedConnections) + ) + self.addCleanup(StorageClient.stop_test_mode) self.clock = Clock() self.tempdir = self.useFixture(TempDir()) # The global Cooperator used by Twisted (a) used by pull producers in @@ -396,6 +405,7 @@ class HttpTestFixture(Fixture): DecodedURL.from_text("http://127.0.0.1"), SWISSNUM_FOR_TEST, treq=self.treq, + clock=self.clock, ) def result_of_with_flush(self, d): @@ -480,6 +490,7 @@ class GenericHTTPAPITests(SyncTestCase): DecodedURL.from_text("http://127.0.0.1"), b"something wrong", treq=StubTreq(self.http.http_server.get_resource()), + clock=self.http.clock, ) ) with assert_fails_with_http_code(self, http.UNAUTHORIZED): @@ -1441,7 +1452,9 @@ class SharedImmutableMutableTestsMixin: self.http.client.request( "GET", self.http.client.relative_url( - "/storage/v1/{}/{}/1".format(self.KIND, _encode_si(storage_index)) + "/storage/v1/{}/{}/1".format( + self.KIND, _encode_si(storage_index) + ) ), headers=headers, ) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index d859a0e00..670ac5868 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -117,11 +117,17 @@ class CountingDataUploadable(upload.Data): class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): - + """Foolscap integration-y tests.""" + FORCE_FOOLSCAP_FOR_STORAGE = True timeout = 180 + @property + def basedir(self): + return "system/SystemTest/{}-foolscap-{}".format( + self.id().split(".")[-1], self.FORCE_FOOLSCAP_FOR_STORAGE + ) + def test_connections(self): - self.basedir = "system/SystemTest/test_connections" d = self.set_up_nodes() self.extra_node = None d.addCallback(lambda res: self.add_extra_node(self.numclients)) @@ -149,11 +155,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): del test_connections def test_upload_and_download_random_key(self): - self.basedir = "system/SystemTest/test_upload_and_download_random_key" return self._test_upload_and_download(convergence=None) def test_upload_and_download_convergent(self): - self.basedir = "system/SystemTest/test_upload_and_download_convergent" return self._test_upload_and_download(convergence=b"some convergence string") def _test_upload_and_download(self, convergence): @@ -516,7 +520,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): def test_mutable(self): - self.basedir = "system/SystemTest/test_mutable" DATA = b"initial contents go here." # 25 bytes % 3 != 0 DATA_uploadable = MutableData(DATA) NEWDATA = b"new contents yay" @@ -746,7 +749,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): # plaintext_hash check. def test_filesystem(self): - self.basedir = "system/SystemTest/test_filesystem" self.data = LARGE_DATA d = self.set_up_nodes() def _new_happy_semantics(ign): @@ -1713,7 +1715,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): def test_filesystem_with_cli_in_subprocess(self): # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe. - self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess" d = self.set_up_nodes() def _new_happy_semantics(ign): for c in self.clients: @@ -1794,9 +1795,21 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): class Connections(SystemTestMixin, unittest.TestCase): + FORCE_FOOLSCAP_FOR_STORAGE = True def test_rref(self): - self.basedir = "system/Connections/rref" + # The way the listening port is created is via + # SameProcessStreamEndpointAssigner (allmydata.test.common), which then + # makes an endpoint string parsed by AdoptedServerPort. The latter does + # dup(fd), which results in the filedescriptor staying alive _until the + # test ends_. That means that when we disown the service, we still have + # the listening port there on the OS level! Just the resulting + # connections aren't handled. So this test relies on aggressive + # timeouts in the HTTP client and presumably some equivalent in + # Foolscap, since connection refused does _not_ happen. + self.basedir = "system/Connections/rref-foolscap-{}".format( + self.FORCE_FOOLSCAP_FOR_STORAGE + ) d = self.set_up_nodes(2) def _start(ign): self.c0 = self.clients[0] @@ -1812,9 +1825,13 @@ class Connections(SystemTestMixin, unittest.TestCase): # now shut down the server d.addCallback(lambda ign: self.clients[1].disownServiceParent()) + + # kill any persistent http connections that might continue to work + d.addCallback(lambda ign: self.close_idle_http_connections()) + # and wait for the client to notice def _poll(): - return len(self.c0.storage_broker.get_connected_servers()) < 2 + return len(self.c0.storage_broker.get_connected_servers()) == 1 d.addCallback(lambda ign: self.poll(_poll)) def _down(ign): @@ -1824,3 +1841,16 @@ class Connections(SystemTestMixin, unittest.TestCase): self.assertEqual(storage_server, self.s1_storage_server) d.addCallback(_down) return d + + +class HTTPSystemTest(SystemTest): + """HTTP storage protocol variant of the system tests.""" + + FORCE_FOOLSCAP_FOR_STORAGE = False + + + +class HTTPConnections(Connections): + """HTTP storage protocol variant of the connections tests.""" + FORCE_FOOLSCAP_FOR_STORAGE = False +