diff --git a/.circleci/config.yml b/.circleci/config.yml index 0c831af04..d327ecbc7 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -24,6 +24,11 @@ version: 2.1 dockerhub-context-template: &DOCKERHUB_CONTEXT context: "dockerhub-auth" +# Required environment for using the coveralls tool to upload partial coverage +# reports and then finish the process. +coveralls-environment: &COVERALLS_ENVIRONMENT + COVERALLS_REPO_TOKEN: "JPf16rLB7T2yjgATIxFzTsEgMdN1UNq6o" + # Next is a Docker executor template that gets the credentials from the # environment and supplies them to the executor. dockerhub-auth-template: &DOCKERHUB_AUTH @@ -112,6 +117,21 @@ workflows: - "another-locale": {} + - "windows-server-2022": + name: "Windows Server 2022, CPython <>" + matrix: + parameters: + # Run the job for a number of CPython versions. These are the + # two versions installed on the version of the Windows VM image + # we specify (in the executor). This is handy since it means we + # don't have to do any Python installation work. We pin the + # Windows VM image so these shouldn't shuffle around beneath us + # but if we want to update that image or get different versions + # of Python, we probably have to do something here. + pythonVersion: + - "3.9" + - "3.11" + - "integration": # Run even the slow integration tests here. We need the `--` to # sneak past tox and get to pytest. @@ -126,6 +146,15 @@ workflows: - "docs": {} + - "finish-coverage-report": + requires: + # Referencing the job by "alias" (as CircleCI calls the mapping + # key) instead of the value of its "name" property causes us to + # require every instance of the job from its matrix expansion. So + # this requirement is enough to require every Windows Server 2022 + # job. + - "windows-server-2022" + images: <<: *IMAGES @@ -133,6 +162,20 @@ workflows: when: "<< pipeline.parameters.build-images >>" jobs: + finish-coverage-report: + docker: + - <<: *DOCKERHUB_AUTH + image: "python:3-slim" + + steps: + - run: + name: "Indicate completion to coveralls.io" + environment: + <<: *COVERALLS_ENVIRONMENT + command: | + pip install coveralls==3.3.1 + python -m coveralls --finish + codechecks: docker: - <<: *DOCKERHUB_AUTH @@ -151,6 +194,161 @@ jobs: command: | ~/.local/bin/tox -e codechecks + windows-server-2022: + parameters: + pythonVersion: + description: >- + An argument to pass to the `py` launcher to choose a Python version. + type: "string" + default: "" + + executor: "windows" + environment: + # Tweak Hypothesis to make its behavior more suitable for the CI + # environment. This should improve reproducibility and lessen the + # effects of variable compute resources. + TAHOE_LAFS_HYPOTHESIS_PROFILE: "ci" + + # Tell pip where its download cache lives. This must agree with the + # "save_cache" step below or caching won't really work right. + PIP_CACHE_DIR: "pip-cache" + + # And tell pip where it can find out cached wheelhouse for fast wheel + # installation, even for projects that don't distribute wheels. This + # must also agree with the "save_cache" step below. + PIP_FIND_LINKS: "wheelhouse" + + steps: + - "checkout" + + # If possible, restore a pip download cache to save us from having to + # download all our Python dependencies from PyPI. + - "restore_cache": + keys: + # The download cache and/or the wheelhouse may contain Python + # version-specific binary packages so include the Python version + # in this key, as well as the canonical source of our + # dependencies. 
+ - &CACHE_KEY "pip-packages-v1-<< parameters.pythonVersion >>-{{ checksum \"setup.py\" }}" + + - "run": + name: "Fix $env:PATH" + command: | + # The Python this job is parameterized is not necessarily the one + # at the front of $env:PATH. Modify $env:PATH so that it is so we + # can just say "python" in the rest of the steps. Also get the + # related Scripts directory so tools from packages we install are + # also available. + $p = py -<> -c "import sys; print(sys.prefix)" + $q = py -<> -c "import sysconfig; print(sysconfig.get_path('scripts'))" + + New-Item $Profile.CurrentUserAllHosts -Force + # $p gets "python" on PATH and $q gets tools from packages we + # install. Note we carefully construct the string so that + # $env:PATH is not substituted now but $p and $q are. ` is the + # PowerShell string escape character. + Add-Content -Path $Profile.CurrentUserAllHosts -Value "`$env:PATH = `"$p;$q;`$env:PATH`"" + + - "run": + name: "Display tool versions" + command: | + python misc/build_helpers/show-tool-versions.py + + - "run": + # It's faster to install a wheel than a source package. If we don't + # have a cached wheelhouse then build all of the wheels and dump + # them into a directory where they can become a cached wheelhouse. + # We would have built these wheels during installation anyway so it + # doesn't cost us anything extra and saves us effort next time. + name: "(Maybe) Build Wheels" + command: | + if ((Test-Path .\wheelhouse) -and (Test-Path .\wheelhouse\*)) { + echo "Found populated wheelhouse, skipping wheel building." + } else { + python -m pip install wheel + python -m pip wheel --wheel-dir $env:PIP_FIND_LINKS .[testenv] .[test] + } + + - "save_cache": + paths: + # Make sure this agrees with PIP_CACHE_DIR in the environment. + - "pip-cache" + - "wheelhouse" + key: *CACHE_KEY + + - "run": + name: "Install Dependencies" + environment: + # By this point we should no longer need an index. + PIP_NO_INDEX: "1" + command: | + python -m pip install .[testenv] .[test] + + - "run": + name: "Run Unit Tests" + environment: + # Configure the results location for the subunitv2-file reporter + # from subunitreporter + SUBUNITREPORTER_OUTPUT_PATH: "test-results.subunit2" + + # Try to get prompt output from the reporter to avoid no-output + # timeouts. + PYTHONUNBUFFERED: "1" + + command: | + # Run the test suite under coverage measurement using the + # parameterized version of Python, writing subunitv2-format + # results to the file given in the environment. + python -b -m coverage run -m twisted.trial --reporter=subunitv2-file --rterrors allmydata + + - "run": + name: "Upload Coverage" + environment: + <<: *COVERALLS_ENVIRONMENT + # Mark the data as just one piece of many because we have more + # than one instance of this job (two on Windows now, some on other + # platforms later) which collects and reports coverage. This is + # necessary to cause Coveralls to merge multiple coverage results + # into a single report. Note the merge only happens when we + # "finish" a particular build, as identified by its "build_num" + # (aka "service_number"). + COVERALLS_PARALLEL: "true" + command: | + python -m pip install coveralls==3.3.1 + + # .coveragerc sets parallel = True so we don't have a `.coverage` + # file but a `.coverage.` file (or maybe more than + # one, but probably not). coveralls can't work with these so + # merge them before invoking it. + python -m coverage combine + + # Now coveralls will be able to find the data, so have it do the + # upload. 
Also, have it strip the system config-specific prefix + # from all of the source paths. + $prefix = python -c "import sysconfig; print(sysconfig.get_path('purelib'))" + python -m coveralls --basedir $prefix + + - "run": + name: "Convert Result Log" + command: | + # subunit2junitxml exits with error if the result stream it is + # converting has test failures in it! So this step might fail. + # Since the step in which we actually _ran_ the tests won't fail + # even if there are test failures, this is a good thing for now. + subunit2junitxml.exe --output-to=test-results.xml test-results.subunit2 + + - "store_test_results": + path: "test-results.xml" + + - "store_artifacts": + path: "_trial_temp/test.log" + + - "store_artifacts": + path: "eliot.log" + + - "store_artifacts": + path: ".coverage" + pyinstaller: docker: - <<: *DOCKERHUB_AUTH @@ -527,6 +725,15 @@ jobs: # PYTHON_VERSION: "2" executors: + windows: + # Choose a Windows environment that closest matches our testing + # requirements and goals. + # https://circleci.com/developer/orbs/orb/circleci/windows#executors-server-2022 + machine: + image: "windows-server-2022-gui:2023.06.1" + shell: "powershell.exe -ExecutionPolicy Bypass" + resource_class: "windows.large" + nix: docker: # Run in a highly Nix-capable environment. diff --git a/.coveragerc b/.coveragerc index d09554cad..5b41f9ce3 100644 --- a/.coveragerc +++ b/.coveragerc @@ -19,7 +19,7 @@ skip_covered = True source = # It looks like this in the checkout src/ -# It looks like this in the Windows build environment +# It looks like this in the GitHub Actions Windows build environment D:/a/tahoe-lafs/tahoe-lafs/.tox/py*-coverage/Lib/site-packages/ # Although sometimes it looks like this instead. Also it looks like this on macOS. .tox/py*-coverage/lib/python*/site-packages/ diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d3862ffad..0f38b0291 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,13 +44,6 @@ jobs: strategy: fail-fast: false matrix: - os: - - windows-latest - python-version: - - "3.8" - - "3.9" - - "3.10" - - "3.11" include: # On macOS don't bother with 3.8, just to get faster builds. 
- os: macos-12 diff --git a/integration/conftest.py b/integration/conftest.py index c94c05429..313ff36c2 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -7,16 +7,11 @@ from __future__ import annotations import os import sys import shutil +from attr import frozen from time import sleep -from os import mkdir, listdir, environ +from os import mkdir, environ from os.path import join, exists -from tempfile import mkdtemp, mktemp -from functools import partial -from json import loads - -from foolscap.furl import ( - decode_furl, -) +from tempfile import mkdtemp from eliot import ( to_file, @@ -25,7 +20,7 @@ from eliot import ( from twisted.python.filepath import FilePath from twisted.python.procutils import which -from twisted.internet.defer import DeferredList +from twisted.internet.defer import DeferredList, succeed from twisted.internet.error import ( ProcessExitedAlready, ProcessTerminated, @@ -33,22 +28,23 @@ from twisted.internet.error import ( import pytest import pytest_twisted +from typing import Mapping from .util import ( - _CollectOutputProtocol, _MagicTextProtocol, _DumpOutputProtocol, _ProcessExitedProtocol, _create_node, - _cleanup_tahoe_process, _tahoe_runner_optional_coverage, await_client_ready, - TahoeProcess, - cli, - generate_ssh_key, block_with_timeout, ) +from .grid import ( + create_flog_gatherer, + create_grid, +) from allmydata.node import read_config +from allmydata.util.iputil import allocate_tcp_port # No reason for HTTP requests to take longer than four minutes in the # integration tests. See allmydata/scripts/common_http.py for usage. @@ -116,6 +112,18 @@ def reactor(): return _reactor +@pytest.fixture(scope='session') +@log_call(action_type=u"integration:port_allocator", include_result=False) +def port_allocator(reactor): + # these will appear basically random, which can make especially + # manual debugging harder but we're re-using code instead of + # writing our own...so, win? 
+ def allocate(): + port = allocate_tcp_port() + return succeed(port) + return allocate + + @pytest.fixture(scope='session') @log_call(action_type=u"integration:temp_dir", include_args=[]) def temp_dir(request) -> str: @@ -150,133 +158,36 @@ def flog_binary(): @pytest.fixture(scope='session') @log_call(action_type=u"integration:flog_gatherer", include_args=[]) def flog_gatherer(reactor, temp_dir, flog_binary, request): - out_protocol = _CollectOutputProtocol() - gather_dir = join(temp_dir, 'flog_gather') - reactor.spawnProcess( - out_protocol, - flog_binary, - ( - 'flogtool', 'create-gatherer', - '--location', 'tcp:localhost:3117', - '--port', '3117', - gather_dir, - ), - env=environ, + fg = pytest_twisted.blockon( + create_flog_gatherer(reactor, request, temp_dir, flog_binary) ) - pytest_twisted.blockon(out_protocol.done) - - twistd_protocol = _MagicTextProtocol("Gatherer waiting at", "gatherer") - twistd_process = reactor.spawnProcess( - twistd_protocol, - which('twistd')[0], - ( - 'twistd', '--nodaemon', '--python', - join(gather_dir, 'gatherer.tac'), - ), - path=gather_dir, - env=environ, - ) - pytest_twisted.blockon(twistd_protocol.magic_seen) - - def cleanup(): - _cleanup_tahoe_process(twistd_process, twistd_protocol.exited) - - flog_file = mktemp('.flog_dump') - flog_protocol = _DumpOutputProtocol(open(flog_file, 'w')) - flog_dir = join(temp_dir, 'flog_gather') - flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')] - - print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file)) - reactor.spawnProcess( - flog_protocol, - flog_binary, - ( - 'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0]) - ), - env=environ, - ) - print("Waiting for flogtool to complete") - try: - block_with_timeout(flog_protocol.done, reactor) - except ProcessTerminated as e: - print("flogtool exited unexpectedly: {}".format(str(e))) - print("Flogtool completed") - - request.addfinalizer(cleanup) - - with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f: - furl = f.read().strip() - return furl + return fg @pytest.fixture(scope='session') -@log_call( - action_type=u"integration:introducer", - include_args=["temp_dir", "flog_gatherer"], - include_result=False, -) -def introducer(reactor, temp_dir, flog_gatherer, request): - intro_dir = join(temp_dir, 'introducer') - print("making introducer", intro_dir) +@log_call(action_type=u"integration:grid", include_args=[]) +def grid(reactor, request, temp_dir, flog_gatherer, port_allocator): + """ + Provides a new Grid with a single Introducer and flog-gathering process. - if not exists(intro_dir): - mkdir(intro_dir) - done_proto = _ProcessExitedProtocol() - _tahoe_runner_optional_coverage( - done_proto, - reactor, - request, - ( - 'create-introducer', - '--listen=tcp', - '--hostname=localhost', - intro_dir, - ), - ) - pytest_twisted.blockon(done_proto.done) - - config = read_config(intro_dir, "tub.port") - config.set_config("node", "nickname", "introducer-tor") - config.set_config("node", "web.port", "4562") - config.set_config("node", "log_gatherer.furl", flog_gatherer) - - # "tahoe run" is consistent across Linux/macOS/Windows, unlike the old - # "start" command. - protocol = _MagicTextProtocol('introducer running', "introducer") - transport = _tahoe_runner_optional_coverage( - protocol, - reactor, - request, - ( - 'run', - intro_dir, - ), + Notably does _not_ provide storage servers; use the storage_nodes + fixture if your tests need a Grid that can be used for puts / gets. 
+ """ + g = pytest_twisted.blockon( + create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator) ) - request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited)) + return g - pytest_twisted.blockon(protocol.magic_seen) - return TahoeProcess(transport, intro_dir) + +@pytest.fixture(scope='session') +def introducer(grid): + return grid.introducer @pytest.fixture(scope='session') @log_call(action_type=u"integration:introducer:furl", include_args=["temp_dir"]) def introducer_furl(introducer, temp_dir): - furl_fname = join(temp_dir, 'introducer', 'private', 'introducer.furl') - while not exists(furl_fname): - print("Don't see {} yet".format(furl_fname)) - sleep(.1) - furl = open(furl_fname, 'r').read() - tubID, location_hints, name = decode_furl(furl) - if not location_hints: - # If there are no location hints then nothing can ever possibly - # connect to it and the only thing that can happen next is something - # will hang or time out. So just give up right now. - raise ValueError( - "Introducer ({!r}) fURL has no location hints!".format( - introducer_furl, - ), - ) - return furl + return introducer.furl @pytest.fixture @@ -285,7 +196,7 @@ def introducer_furl(introducer, temp_dir): include_args=["temp_dir", "flog_gatherer"], include_result=False, ) -def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_control_port): +def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_network): intro_dir = join(temp_dir, 'introducer_tor') print("making Tor introducer in {}".format(intro_dir)) print("(this can take tens of seconds to allocate Onion address)") @@ -299,7 +210,7 @@ def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_control_port): request, ( 'create-introducer', - '--tor-control-port', tor_control_port, + '--tor-control-port', tor_network.client_control_endpoint, '--hide-ip', '--listen=tor', intro_dir, @@ -311,7 +222,7 @@ def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_control_port): config = read_config(intro_dir, "tub.port") config.set_config("node", "nickname", "introducer-tor") config.set_config("node", "web.port", "4561") - config.set_config("node", "log_gatherer.furl", flog_gatherer) + config.set_config("node", "log_gatherer.furl", flog_gatherer.furl) # "tahoe run" is consistent across Linux/macOS/Windows, unlike the old # "start" command. 
@@ -354,87 +265,31 @@ def tor_introducer_furl(tor_introducer, temp_dir): @pytest.fixture(scope='session') @log_call( action_type=u"integration:storage_nodes", - include_args=["temp_dir", "introducer_furl", "flog_gatherer"], + include_args=["grid"], include_result=False, ) -def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, request): +def storage_nodes(grid): nodes_d = [] # start all 5 nodes in parallel for x in range(5): - name = 'node{}'.format(x) - web_port= 9990 + x - nodes_d.append( - _create_node( - reactor, request, temp_dir, introducer_furl, flog_gatherer, name, - web_port="tcp:{}:interface=localhost".format(web_port), - storage=True, - ) - ) - nodes_status = pytest_twisted.blockon(DeferredList(nodes_d)) - nodes = [] - for ok, process in nodes_status: - assert ok, "Storage node creation failed: {}".format(process) - nodes.append(process) - return nodes + nodes_d.append(grid.add_storage_node()) + + nodes_status = pytest_twisted.blockon(DeferredList(nodes_d)) + for ok, value in nodes_status: + assert ok, "Storage node creation failed: {}".format(value) + return grid.storage_servers -@pytest.fixture(scope="session") -def alice_sftp_client_key_path(temp_dir): - # The client SSH key path is typically going to be somewhere else (~/.ssh, - # typically), but for convenience sake for testing we'll put it inside node. - return join(temp_dir, "alice", "private", "ssh_client_rsa_key") @pytest.fixture(scope='session') @log_call(action_type=u"integration:alice", include_args=[], include_result=False) -def alice( - reactor, - temp_dir, - introducer_furl, - flog_gatherer, - storage_nodes, - alice_sftp_client_key_path, - request, -): - process = pytest_twisted.blockon( - _create_node( - reactor, request, temp_dir, introducer_furl, flog_gatherer, "alice", - web_port="tcp:9980:interface=localhost", - storage=False, - ) - ) - pytest_twisted.blockon(await_client_ready(process)) - - # 1. Create a new RW directory cap: - cli(process, "create-alias", "test") - rwcap = loads(cli(process, "list-aliases", "--json"))["test"]["readwrite"] - - # 2. Enable SFTP on the node: - host_ssh_key_path = join(process.node_dir, "private", "ssh_host_rsa_key") - accounts_path = join(process.node_dir, "private", "accounts") - with open(join(process.node_dir, "tahoe.cfg"), "a") as f: - f.write("""\ -[sftpd] -enabled = true -port = tcp:8022:interface=127.0.0.1 -host_pubkey_file = {ssh_key_path}.pub -host_privkey_file = {ssh_key_path} -accounts.file = {accounts_path} -""".format(ssh_key_path=host_ssh_key_path, accounts_path=accounts_path)) - generate_ssh_key(host_ssh_key_path) - - # 3. Add a SFTP access file with an SSH key for auth. - generate_ssh_key(alice_sftp_client_key_path) - # Pub key format is "ssh-rsa ". We want the key. - ssh_public_key = open(alice_sftp_client_key_path + ".pub").read().strip().split()[1] - with open(accounts_path, "w") as f: - f.write("""\ -alice-key ssh-rsa {ssh_public_key} {rwcap} -""".format(rwcap=rwcap, ssh_public_key=ssh_public_key)) - - # 4. Restart the node with new SFTP config. 
- pytest_twisted.blockon(process.restart_async(reactor, request)) - pytest_twisted.blockon(await_client_ready(process)) - print(f"Alice pid: {process.transport.pid}") - return process +def alice(reactor, request, grid, storage_nodes): + """ + :returns grid.Client: the associated instance for Alice + """ + alice = pytest_twisted.blockon(grid.add_client("alice")) + pytest_twisted.blockon(alice.add_sftp(reactor, request)) + print(f"Alice pid: {alice.process.transport.pid}") + return alice @pytest.fixture(scope='session') @@ -455,6 +310,12 @@ def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, reques @pytest.mark.skipif(sys.platform.startswith('win'), 'Tor tests are unstable on Windows') def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]: + """ + Install the Chutney software that is required to run a small local Tor grid. + + (Chutney lacks the normal "python stuff" so we can't just declare + it in Tox or similar dependencies) + """ # Try to find Chutney already installed in the environment. try: import chutney @@ -512,19 +373,23 @@ def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]: ) pytest_twisted.blockon(proto.done) - return (chutney_dir, {"PYTHONPATH": join(chutney_dir, "lib")}) + return chutney_dir, {"PYTHONPATH": join(chutney_dir, "lib")} -@pytest.fixture(scope='session') -def tor_control_port(tor_network): + +@frozen +class ChutneyTorNetwork: """ - Get an endpoint description for the Tor control port for the local Tor - network we run.. + Represents a running Chutney (tor) network. Returned by the + "tor_network" fixture. """ - # We ignore tor_network because it can't tell us the control port. But - # asking for it forces the Tor network to be built before we run - so if - # we get the hard-coded control port value correct, there should be - # something listening at that address. - return 'tcp:localhost:8007' + dir: FilePath + environ: Mapping[str, str] + client_control_port: int + + @property + def client_control_endpoint(self) -> str: + return "tcp:localhost:{}".format(self.client_control_port) + @pytest.fixture(scope='session') @pytest.mark.skipif(sys.platform.startswith('win'), @@ -533,6 +398,20 @@ def tor_network(reactor, temp_dir, chutney, request): """ Build a basic Tor network. + Instantiate the "networks/basic" Chutney configuration for a local + Tor network. + + This provides a small, local Tor network that can run v3 Onion + Services. It has 3 authorities, 5 relays and 2 clients. + + The 'chutney' fixture pins a Chutney git qrevision, so things + shouldn't change. This network has two clients which are the only + nodes with valid SocksPort configuration ("008c" and "009c" 9008 + and 9009) + + The control ports start at 8000 (so the ControlPort for the client + nodes are 8008 and 8009). + :param chutney: The root directory of a Chutney checkout and a dict of additional environment variables to set so a Python process can use it. @@ -575,6 +454,32 @@ def tor_network(reactor, temp_dir, chutney, request): request.addfinalizer(cleanup) pytest_twisted.blockon(chutney(("start", basic_network))) + + # Wait for the nodes to "bootstrap" - ie, form a network among themselves. 
+ # Successful bootstrap is reported with a message something like: + # + # Everything bootstrapped after 151 sec + # Bootstrap finished: 151 seconds + # Node status: + # test000a : 100, done , Done + # test001a : 100, done , Done + # test002a : 100, done , Done + # test003r : 100, done , Done + # test004r : 100, done , Done + # test005r : 100, done , Done + # test006r : 100, done , Done + # test007r : 100, done , Done + # test008c : 100, done , Done + # test009c : 100, done , Done + # Published dir info: + # test000a : 100, all nodes , desc md md_cons ns_cons , Dir info cached + # test001a : 100, all nodes , desc md md_cons ns_cons , Dir info cached + # test002a : 100, all nodes , desc md md_cons ns_cons , Dir info cached + # test003r : 100, all nodes , desc md md_cons ns_cons , Dir info cached + # test004r : 100, all nodes , desc md md_cons ns_cons , Dir info cached + # test005r : 100, all nodes , desc md md_cons ns_cons , Dir info cached + # test006r : 100, all nodes , desc md md_cons ns_cons , Dir info cached + # test007r : 100, all nodes , desc md md_cons ns_cons , Dir info cached pytest_twisted.blockon(chutney(("wait_for_bootstrap", basic_network))) # print some useful stuff @@ -582,3 +487,11 @@ def tor_network(reactor, temp_dir, chutney, request): pytest_twisted.blockon(chutney(("status", basic_network))) except ProcessTerminated: print("Chutney.TorNet status failed (continuing)") + + # the "8008" comes from configuring "networks/basic" in chutney + # and then examining "net/nodes/008c/torrc" for ControlPort value + return ChutneyTorNetwork( + chutney_root, + chutney_env, + 8008, + ) diff --git a/integration/grid.py b/integration/grid.py new file mode 100644 index 000000000..b97c22bf7 --- /dev/null +++ b/integration/grid.py @@ -0,0 +1,529 @@ +""" +Classes which directly represent various kinds of Tahoe processes +that co-operate to for "a Grid". + +These methods and objects are used by conftest.py fixtures but may +also be used as direct helpers for tests that don't want to (or can't) +rely on 'the' global grid as provided by fixtures like 'alice' or +'storage_servers'. +""" + +from os import mkdir, listdir +from os.path import join, exists +from json import loads +from tempfile import mktemp +from time import sleep + +from eliot import ( + log_call, +) + +from foolscap.furl import ( + decode_furl, +) + +from twisted.python.procutils import which +from twisted.internet.defer import ( + inlineCallbacks, + returnValue, + Deferred, +) +from twisted.internet.task import ( + deferLater, +) +from twisted.internet.interfaces import ( + IProcessTransport, + IProcessProtocol, +) +from twisted.internet.error import ProcessTerminated + +from allmydata.util.attrs_provides import ( + provides, +) +from allmydata.node import read_config +from .util import ( + _CollectOutputProtocol, + _MagicTextProtocol, + _DumpOutputProtocol, + _ProcessExitedProtocol, + _run_node, + _cleanup_tahoe_process, + _tahoe_runner_optional_coverage, + TahoeProcess, + await_client_ready, + generate_ssh_key, + cli, + reconfigure, + _create_node, +) + +import attr +import pytest_twisted + + +# currently, we pass a "request" around a bunch but it seems to only +# be for addfinalizer() calls. +# - is "keeping" a request like that okay? What if it's a session-scoped one? +# (i.e. in Grid etc) +# - maybe limit to "a callback to hang your cleanup off of" (instead of request)? + + +@attr.s +class FlogGatherer(object): + """ + Flog Gatherer process. 
+ """ + process = attr.ib( + validator=provides(IProcessTransport) + ) + protocol = attr.ib( + validator=provides(IProcessProtocol) + ) + furl = attr.ib() + + +@inlineCallbacks +def create_flog_gatherer(reactor, request, temp_dir, flog_binary): + out_protocol = _CollectOutputProtocol() + gather_dir = join(temp_dir, 'flog_gather') + reactor.spawnProcess( + out_protocol, + flog_binary, + ( + 'flogtool', 'create-gatherer', + '--location', 'tcp:localhost:3117', + '--port', '3117', + gather_dir, + ) + ) + yield out_protocol.done + + twistd_protocol = _MagicTextProtocol("Gatherer waiting at", "gatherer") + twistd_process = reactor.spawnProcess( + twistd_protocol, + which('twistd')[0], + ( + 'twistd', '--nodaemon', '--python', + join(gather_dir, 'gatherer.tac'), + ), + path=gather_dir, + ) + yield twistd_protocol.magic_seen + + def cleanup(): + _cleanup_tahoe_process(twistd_process, twistd_protocol.exited) + + flog_file = mktemp('.flog_dump') + flog_protocol = _DumpOutputProtocol(open(flog_file, 'w')) + flog_dir = join(temp_dir, 'flog_gather') + flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')] + + print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file)) + for flog_path in flogs: + reactor.spawnProcess( + flog_protocol, + flog_binary, + ( + 'flogtool', 'dump', join(temp_dir, 'flog_gather', flog_path) + ), + ) + print("Waiting for flogtool to complete") + try: + pytest_twisted.blockon(flog_protocol.done) + except ProcessTerminated as e: + print("flogtool exited unexpectedly: {}".format(str(e))) + print("Flogtool completed") + + request.addfinalizer(cleanup) + + with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f: + furl = f.read().strip() + returnValue( + FlogGatherer( + protocol=twistd_protocol, + process=twistd_process, + furl=furl, + ) + ) + + +@attr.s +class StorageServer(object): + """ + Represents a Tahoe Storage Server + """ + + process = attr.ib( + validator=attr.validators.instance_of(TahoeProcess) + ) + protocol = attr.ib( + validator=provides(IProcessProtocol) + ) + + @inlineCallbacks + def restart(self, reactor, request): + """ + re-start our underlying process by issuing a TERM, waiting and + then running again. await_client_ready() will be done as well + + Note that self.process and self.protocol will be new instances + after this. + """ + self.process.transport.signalProcess('TERM') + yield self.protocol.exited + self.process = yield _run_node( + reactor, self.process.node_dir, request, None, + ) + self.protocol = self.process.transport.proto + yield await_client_ready(self.process) + + +@inlineCallbacks +def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port, + needed=2, happy=3, total=4): + """ + Create a new storage server + """ + node_process = yield _create_node( + reactor, request, temp_dir, introducer.furl, flog_gatherer, + name, web_port, storage=True, needed=needed, happy=happy, total=total, + ) + storage = StorageServer( + process=node_process, + # node_process is a TahoeProcess. its transport is an + # IProcessTransport. in practice, this means it is a + # twisted.internet._baseprocess.BaseProcess. BaseProcess records the + # process protocol as its proto attribute. 
+ protocol=node_process.transport.proto, + ) + returnValue(storage) + + +@attr.s +class Client(object): + """ + Represents a Tahoe client + """ + + process = attr.ib( + validator=attr.validators.instance_of(TahoeProcess) + ) + protocol = attr.ib( + validator=provides(IProcessProtocol) + ) + request = attr.ib() # original request, for addfinalizer() + +## XXX convenience? or confusion? +# @property +# def node_dir(self): +# return self.process.node_dir + + @inlineCallbacks + def reconfigure_zfec(self, reactor, zfec_params, convergence=None, max_segment_size=None): + """ + Reconfigure the ZFEC parameters for this node + """ + # XXX this is a stop-gap to keep tests running "as is" + # -> we should fix the tests so that they create a new client + # in the grid with the required parameters, instead of + # re-configuring Alice (or whomever) + + rtn = yield Deferred.fromCoroutine( + reconfigure(reactor, self.request, self.process, zfec_params, convergence, max_segment_size) + ) + return rtn + + @inlineCallbacks + def restart(self, reactor, request, servers=1): + """ + re-start our underlying process by issuing a TERM, waiting and + then running again. + + :param int servers: number of server connections we will wait + for before being 'ready' + + Note that self.process and self.protocol will be new instances + after this. + """ + # XXX similar to above, can we make this return a new instance + # instead of mutating? + self.process.transport.signalProcess('TERM') + yield self.protocol.exited + process = yield _run_node( + reactor, self.process.node_dir, request, None, + ) + self.process = process + self.protocol = self.process.transport.proto + yield await_client_ready(self.process, minimum_number_of_servers=servers) + + @inlineCallbacks + def add_sftp(self, reactor, request): + """ + """ + # if other things need to add or change configuration, further + # refactoring could be useful here (i.e. move reconfigure + # parts to their own functions) + + # XXX why do we need an alias? + # 1. Create a new RW directory cap: + cli(self.process, "create-alias", "test") + rwcap = loads(cli(self.process, "list-aliases", "--json"))["test"]["readwrite"] + + # 2. Enable SFTP on the node: + host_ssh_key_path = join(self.process.node_dir, "private", "ssh_host_rsa_key") + sftp_client_key_path = join(self.process.node_dir, "private", "ssh_client_rsa_key") + accounts_path = join(self.process.node_dir, "private", "accounts") + with open(join(self.process.node_dir, "tahoe.cfg"), "a") as f: + f.write( + ("\n\n[sftpd]\n" + "enabled = true\n" + "port = tcp:8022:interface=127.0.0.1\n" + "host_pubkey_file = {ssh_key_path}.pub\n" + "host_privkey_file = {ssh_key_path}\n" + "accounts.file = {accounts_path}\n").format( + ssh_key_path=host_ssh_key_path, + accounts_path=accounts_path, + ) + ) + generate_ssh_key(host_ssh_key_path) + + # 3. Add a SFTP access file with an SSH key for auth. + generate_ssh_key(sftp_client_key_path) + # Pub key format is "ssh-rsa ". We want the key. + with open(sftp_client_key_path + ".pub") as pubkey_file: + ssh_public_key = pubkey_file.read().strip().split()[1] + with open(accounts_path, "w") as f: + f.write( + "alice-key ssh-rsa {ssh_public_key} {rwcap}\n".format( + rwcap=rwcap, + ssh_public_key=ssh_public_key, + ) + ) + + # 4. Restart the node with new SFTP config. + print("restarting for SFTP") + yield self.restart(reactor, request) + print("restart done") + # XXX i think this is broken because we're "waiting for ready" during first bootstrap? or something? 
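# --- A minimal sketch, assuming paramiko (which test_sftp.py below already uses),
# of what the configuration written by add_sftp() enables: an SFTP connection
# authenticated with the generated client key.  The port (8022), interface
# (127.0.0.1), account name ("alice-key") and key path all come from the
# tahoe.cfg / accounts file written above; the helper function itself is
# hypothetical and not part of the patch.

import os.path

import paramiko


def sftp_listing(client):
    # Load the RSA client key that add_sftp() generated for this grid.Client.
    key = paramiko.RSAKey(
        filename=os.path.join(client.process.node_dir, "private", "ssh_client_rsa_key"),
    )
    transport = paramiko.Transport(("127.0.0.1", 8022))
    try:
        # Authenticate as the account declared in private/accounts.
        transport.connect(username="alice-key", pkey=key)
        sftp = paramiko.SFTPClient.from_transport(transport)
        return sftp.listdir(".")
    finally:
        transport.close()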
+ + +@inlineCallbacks +def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port, + needed=2, happy=3, total=4): + """ + Create a new storage server + """ + from .util import _create_node + node_process = yield _create_node( + reactor, request, temp_dir, introducer.furl, flog_gatherer, + name, web_port, storage=False, needed=needed, happy=happy, total=total, + ) + returnValue( + Client( + process=node_process, + protocol=node_process.transport.proto, + request=request, + ) + ) + + +@attr.s +class Introducer(object): + """ + Reprsents a running introducer + """ + + process = attr.ib( + validator=attr.validators.instance_of(TahoeProcess) + ) + protocol = attr.ib( + validator=provides(IProcessProtocol) + ) + furl = attr.ib() + + +def _validate_furl(furl_fname): + """ + Opens and validates a fURL, ensuring location hints. + :returns: the furl + :raises: ValueError if no location hints + """ + while not exists(furl_fname): + print("Don't see {} yet".format(furl_fname)) + sleep(.1) + furl = open(furl_fname, 'r').read() + tubID, location_hints, name = decode_furl(furl) + if not location_hints: + # If there are no location hints then nothing can ever possibly + # connect to it and the only thing that can happen next is something + # will hang or time out. So just give up right now. + raise ValueError( + "Introducer ({!r}) fURL has no location hints!".format( + furl, + ), + ) + return furl + + +@inlineCallbacks +@log_call( + action_type=u"integration:introducer", + include_args=["temp_dir", "flog_gatherer"], + include_result=False, +) +def create_introducer(reactor, request, temp_dir, flog_gatherer, port): + """ + Run a new Introducer and return an Introducer instance. + """ + intro_dir = join(temp_dir, 'introducer{}'.format(port)) + + if not exists(intro_dir): + mkdir(intro_dir) + done_proto = _ProcessExitedProtocol() + _tahoe_runner_optional_coverage( + done_proto, + reactor, + request, + ( + 'create-introducer', + '--listen=tcp', + '--hostname=localhost', + intro_dir, + ), + ) + yield done_proto.done + + config = read_config(intro_dir, "tub.port") + config.set_config("node", "nickname", f"introducer-{port}") + config.set_config("node", "web.port", f"{port}") + config.set_config("node", "log_gatherer.furl", flog_gatherer.furl) + + # on windows, "tahoe start" means: run forever in the foreground, + # but on linux it means daemonize. "tahoe run" is consistent + # between platforms. + protocol = _MagicTextProtocol('introducer running', "introducer") + transport = _tahoe_runner_optional_coverage( + protocol, + reactor, + request, + ( + 'run', + intro_dir, + ), + ) + + def clean(): + return _cleanup_tahoe_process(transport, protocol.exited) + request.addfinalizer(clean) + + yield protocol.magic_seen + + furl_fname = join(intro_dir, 'private', 'introducer.furl') + while not exists(furl_fname): + print("Don't see {} yet".format(furl_fname)) + yield deferLater(reactor, .1, lambda: None) + furl = _validate_furl(furl_fname) + + returnValue( + Introducer( + process=TahoeProcess(transport, intro_dir), + protocol=protocol, + furl=furl, + ) + ) + + +@attr.s +class Grid(object): + """ + Represents an entire Tahoe Grid setup + + A Grid includes an Introducer, Flog Gatherer and some number of + Storage Servers. Optionally includes Clients. 
+ """ + + _reactor = attr.ib() + _request = attr.ib() + _temp_dir = attr.ib() + _port_allocator = attr.ib() + introducer = attr.ib() + flog_gatherer = attr.ib() + storage_servers = attr.ib(factory=list) + clients = attr.ib(factory=dict) + + @storage_servers.validator + def check(self, attribute, value): + for server in value: + if not isinstance(server, StorageServer): + raise ValueError( + "storage_servers must be StorageServer" + ) + + @inlineCallbacks + def add_storage_node(self): + """ + Creates a new storage node, returns a StorageServer instance + (which will already be added to our .storage_servers list) + """ + port = yield self._port_allocator() + print("make {}".format(port)) + name = 'node{}'.format(port) + web_port = 'tcp:{}:interface=localhost'.format(port) + server = yield create_storage_server( + self._reactor, + self._request, + self._temp_dir, + self.introducer, + self.flog_gatherer, + name, + web_port, + ) + self.storage_servers.append(server) + returnValue(server) + + @inlineCallbacks + def add_client(self, name, needed=2, happy=3, total=4): + """ + Create a new client node + """ + port = yield self._port_allocator() + web_port = 'tcp:{}:interface=localhost'.format(port) + client = yield create_client( + self._reactor, + self._request, + self._temp_dir, + self.introducer, + self.flog_gatherer, + name, + web_port, + needed=needed, + happy=happy, + total=total, + ) + self.clients[name] = client + yield await_client_ready(client.process) + returnValue(client) + + +# A grid is now forever tied to its original 'request' which is where +# it must hang finalizers off of. The "main" one is a session-level +# fixture so it'll live the life of the tests but it could be +# per-function Grid too. +@inlineCallbacks +def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator): + """ + Create a new grid. This will have one Introducer but zero + storage-servers or clients; those must be added by a test or + subsequent fixtures. 
+ """ + intro_port = yield port_allocator() + introducer = yield create_introducer(reactor, request, temp_dir, flog_gatherer, intro_port) + grid = Grid( + reactor, + request, + temp_dir, + port_allocator, + introducer, + flog_gatherer, + ) + returnValue(grid) diff --git a/integration/test_get_put.py b/integration/test_get_put.py index e30a34f97..2f6642493 100644 --- a/integration/test_get_put.py +++ b/integration/test_get_put.py @@ -8,9 +8,8 @@ from subprocess import Popen, PIPE, check_output, check_call import pytest from twisted.internet import reactor from twisted.internet.threads import blockingCallFromThread -from twisted.internet.defer import Deferred -from .util import run_in_thread, cli, reconfigure +from .util import run_in_thread, cli DATA = b"abc123 this is not utf-8 decodable \xff\x00\x33 \x11" try: @@ -23,7 +22,7 @@ else: @pytest.fixture(scope="session") def get_put_alias(alice): - cli(alice, "create-alias", "getput") + cli(alice.process, "create-alias", "getput") def read_bytes(path): @@ -39,14 +38,14 @@ def test_put_from_stdin(alice, get_put_alias, tmpdir): """ tempfile = str(tmpdir.join("file")) p = Popen( - ["tahoe", "--node-directory", alice.node_dir, "put", "-", "getput:fromstdin"], + ["tahoe", "--node-directory", alice.process.node_dir, "put", "-", "getput:fromstdin"], stdin=PIPE ) p.stdin.write(DATA) p.stdin.close() assert p.wait() == 0 - cli(alice, "get", "getput:fromstdin", tempfile) + cli(alice.process, "get", "getput:fromstdin", tempfile) assert read_bytes(tempfile) == DATA @@ -58,10 +57,10 @@ def test_get_to_stdout(alice, get_put_alias, tmpdir): tempfile = tmpdir.join("file") with tempfile.open("wb") as f: f.write(DATA) - cli(alice, "put", str(tempfile), "getput:tostdout") + cli(alice.process, "put", str(tempfile), "getput:tostdout") p = Popen( - ["tahoe", "--node-directory", alice.node_dir, "get", "getput:tostdout", "-"], + ["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:tostdout", "-"], stdout=PIPE ) assert p.stdout.read() == DATA @@ -78,11 +77,11 @@ def test_large_file(alice, get_put_alias, tmp_path): tempfile = tmp_path / "file" with tempfile.open("wb") as f: f.write(DATA * 1_000_000) - cli(alice, "put", str(tempfile), "getput:largefile") + cli(alice.process, "put", str(tempfile), "getput:largefile") outfile = tmp_path / "out" check_call( - ["tahoe", "--node-directory", alice.node_dir, "get", "getput:largefile", str(outfile)], + ["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:largefile", str(outfile)], ) assert outfile.read_bytes() == tempfile.read_bytes() @@ -104,31 +103,29 @@ def test_upload_download_immutable_different_default_max_segment_size(alice, get def set_segment_size(segment_size): return blockingCallFromThread( reactor, - lambda: Deferred.fromCoroutine(reconfigure( + lambda: alice.reconfigure_zfec( reactor, - request, - alice, (1, 1, 1), None, max_segment_size=segment_size - )) + ) ) # 1. Upload file 1 with default segment size set to 1MB set_segment_size(1024 * 1024) - cli(alice, "put", str(tempfile), "getput:seg1024kb") + cli(alice.process, "put", str(tempfile), "getput:seg1024kb") # 2. Download file 1 with default segment size set to 128KB set_segment_size(128 * 1024) assert large_data == check_output( - ["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg1024kb", "-"] + ["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:seg1024kb", "-"] ) # 3. 
Upload file 2 with default segment size set to 128KB - cli(alice, "put", str(tempfile), "getput:seg128kb") + cli(alice.process, "put", str(tempfile), "getput:seg128kb") # 4. Download file 2 with default segment size set to 1MB set_segment_size(1024 * 1024) assert large_data == check_output( - ["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg128kb", "-"] + ["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:seg128kb", "-"] ) diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py new file mode 100644 index 000000000..437fe7455 --- /dev/null +++ b/integration/test_grid_manager.py @@ -0,0 +1,351 @@ +import sys +import json +from os.path import join + +from cryptography.hazmat.primitives.serialization import ( + Encoding, + PublicFormat, +) + +from twisted.internet.utils import ( + getProcessOutputAndValue, +) +from twisted.internet.defer import ( + inlineCallbacks, + returnValue, +) + +from allmydata.crypto import ed25519 +from allmydata.util import base32 +from allmydata.util import configutil + +from . import util +from .grid import ( + create_grid, +) + +import pytest_twisted + + +@inlineCallbacks +def _run_gm(reactor, request, *args, **kwargs): + """ + Run the grid-manager process, passing all arguments as extra CLI + args. + + :returns: all process output + """ + if request.config.getoption('coverage'): + base_args = ("-b", "-m", "coverage", "run", "-m", "allmydata.cli.grid_manager") + else: + base_args = ("-m", "allmydata.cli.grid_manager") + + output, errput, exit_code = yield getProcessOutputAndValue( + sys.executable, + base_args + args, + reactor=reactor, + **kwargs + ) + if exit_code != 0: + raise util.ProcessFailed( + RuntimeError("Exit code {}".format(exit_code)), + output + errput, + ) + returnValue(output) + + +@pytest_twisted.inlineCallbacks +def test_create_certificate(reactor, request): + """ + The Grid Manager produces a valid, correctly-signed certificate. + """ + gm_config = yield _run_gm(reactor, request, "--config", "-", "create") + privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii') + privkey, pubkey = ed25519.signing_keypair_from_string(privkey_bytes) + + # Note that zara + her key here are arbitrary and don't match any + # "actual" clients in the test-grid; we're just checking that the + # Grid Manager signs this properly. 
+ gm_config = yield _run_gm( + reactor, request, "--config", "-", "add", + "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", + stdinBytes=gm_config, + ) + zara_cert_bytes = yield _run_gm( + reactor, request, "--config", "-", "sign", "zara", "1", + stdinBytes=gm_config, + ) + zara_cert = json.loads(zara_cert_bytes) + + # confirm that zara's certificate is made by the Grid Manager + # (.verify returns None on success, raises exception on error) + pubkey.verify( + base32.a2b(zara_cert['signature'].encode('ascii')), + zara_cert['certificate'].encode('ascii'), + ) + + +@pytest_twisted.inlineCallbacks +def test_remove_client(reactor, request): + """ + A Grid Manager can add and successfully remove a client + """ + gm_config = yield _run_gm( + reactor, request, "--config", "-", "create", + ) + + gm_config = yield _run_gm( + reactor, request, "--config", "-", "add", + "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", + stdinBytes=gm_config, + ) + gm_config = yield _run_gm( + reactor, request, "--config", "-", "add", + "yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq", + stdinBytes=gm_config, + ) + assert "zara" in json.loads(gm_config)['storage_servers'] + assert "yakov" in json.loads(gm_config)['storage_servers'] + + gm_config = yield _run_gm( + reactor, request, "--config", "-", "remove", + "zara", + stdinBytes=gm_config, + ) + assert "zara" not in json.loads(gm_config)['storage_servers'] + assert "yakov" in json.loads(gm_config)['storage_servers'] + + +@pytest_twisted.inlineCallbacks +def test_remove_last_client(reactor, request): + """ + A Grid Manager can remove all clients + """ + gm_config = yield _run_gm( + reactor, request, "--config", "-", "create", + ) + + gm_config = yield _run_gm( + reactor, request, "--config", "-", "add", + "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", + stdinBytes=gm_config, + ) + assert "zara" in json.loads(gm_config)['storage_servers'] + + gm_config = yield _run_gm( + reactor, request, "--config", "-", "remove", + "zara", + stdinBytes=gm_config, + ) + # there are no storage servers left at all now + assert "storage_servers" not in json.loads(gm_config) + + +@pytest_twisted.inlineCallbacks +def test_add_remove_client_file(reactor, request, temp_dir): + """ + A Grid Manager can add and successfully remove a client (when + keeping data on disk) + """ + gmconfig = join(temp_dir, "gmtest") + gmconfig_file = join(temp_dir, "gmtest", "config.json") + yield _run_gm( + reactor, request, "--config", gmconfig, "create", + ) + + yield _run_gm( + reactor, request, "--config", gmconfig, "add", + "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", + ) + yield _run_gm( + reactor, request, "--config", gmconfig, "add", + "yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq", + ) + assert "zara" in json.load(open(gmconfig_file, "r"))['storage_servers'] + assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers'] + + yield _run_gm( + reactor, request, "--config", gmconfig, "remove", + "zara", + ) + assert "zara" not in json.load(open(gmconfig_file, "r"))['storage_servers'] + assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers'] + + +@pytest_twisted.inlineCallbacks +def _test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator): + """ + A client with happines=2 fails to upload to a Grid when it is + using Grid Manager and there is only 1 storage server with a valid + certificate. 
+ """ + grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator) + storage0 = yield grid.add_storage_node() + _ = yield grid.add_storage_node() + + gm_config = yield _run_gm( + reactor, request, "--config", "-", "create", + ) + gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii') + gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes) + + # create certificate for the first storage-server + pubkey_fname = join(storage0.process.node_dir, "node.pubkey") + with open(pubkey_fname, 'r') as f: + pubkey_str = f.read().strip() + + gm_config = yield _run_gm( + reactor, request, "--config", "-", "add", + "storage0", pubkey_str, + stdinBytes=gm_config, + ) + assert json.loads(gm_config)['storage_servers'].keys() == {'storage0'} + + print("inserting certificate") + cert = yield _run_gm( + reactor, request, "--config", "-", "sign", "storage0", "1", + stdinBytes=gm_config, + ) + print(cert) + + yield util.run_tahoe( + reactor, request, "--node-directory", storage0.process.node_dir, + "admin", "add-grid-manager-cert", + "--name", "default", + "--filename", "-", + stdin=cert, + ) + + # re-start this storage server + yield storage0.restart(reactor, request) + + # now only one storage-server has the certificate .. configure + # diana to have the grid-manager certificate + + diana = yield grid.add_client("diana", needed=2, happy=2, total=2) + + config = configutil.get_config(join(diana.process.node_dir, "tahoe.cfg")) + config.add_section("grid_managers") + config.set("grid_managers", "test", str(ed25519.string_from_verifying_key(gm_pubkey), "ascii")) + with open(join(diana.process.node_dir, "tahoe.cfg"), "w") as f: + config.write(f) + + yield diana.restart(reactor, request, servers=2) + + # try to put something into the grid, which should fail (because + # diana has happy=2 but should only find storage0 to be acceptable + # to upload to) + + try: + yield util.run_tahoe( + reactor, request, "--node-directory", diana.process.node_dir, + "put", "-", + stdin=b"some content\n" * 200, + ) + assert False, "Should get a failure" + except util.ProcessFailed as e: + if b'UploadUnhappinessError' in e.output: + # We're done! We've succeeded. + return + + assert False, "Failed to see one of out of two servers" + + +@pytest_twisted.inlineCallbacks +def _test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator): + """ + Successfully upload to a Grid Manager enabled Grid. 
+ """ + grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator) + happy0 = yield grid.add_storage_node() + happy1 = yield grid.add_storage_node() + + gm_config = yield _run_gm( + reactor, request, "--config", "-", "create", + ) + gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii') + gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes) + + # create certificates for all storage-servers + servers = ( + ("happy0", happy0), + ("happy1", happy1), + ) + for st_name, st in servers: + pubkey_fname = join(st.process.node_dir, "node.pubkey") + with open(pubkey_fname, 'r') as f: + pubkey_str = f.read().strip() + + gm_config = yield _run_gm( + reactor, request, "--config", "-", "add", + st_name, pubkey_str, + stdinBytes=gm_config, + ) + assert json.loads(gm_config)['storage_servers'].keys() == {'happy0', 'happy1'} + + # add the certificates from the grid-manager to the storage servers + print("inserting storage-server certificates") + for st_name, st in servers: + cert = yield _run_gm( + reactor, request, "--config", "-", "sign", st_name, "1", + stdinBytes=gm_config, + ) + + yield util.run_tahoe( + reactor, request, "--node-directory", st.process.node_dir, + "admin", "add-grid-manager-cert", + "--name", "default", + "--filename", "-", + stdin=cert, + ) + + # re-start the storage servers + yield happy0.restart(reactor, request) + yield happy1.restart(reactor, request) + + # configure freya (a client) to have the grid-manager certificate + freya = yield grid.add_client("freya", needed=2, happy=2, total=2) + + config = configutil.get_config(join(freya.process.node_dir, "tahoe.cfg")) + config.add_section("grid_managers") + config.set("grid_managers", "test", str(ed25519.string_from_verifying_key(gm_pubkey), "ascii")) + with open(join(freya.process.node_dir, "tahoe.cfg"), "w") as f: + config.write(f) + + yield freya.restart(reactor, request, servers=2) + + # confirm that Freya will upload to the GridManager-enabled Grid + yield util.run_tahoe( + reactor, request, "--node-directory", freya.process.node_dir, + "put", "-", + stdin=b"some content\n" * 200, + ) + + +@pytest_twisted.inlineCallbacks +def test_identity(reactor, request, temp_dir): + """ + Dump public key to CLI + """ + gm_config = join(temp_dir, "test_identity") + yield _run_gm( + reactor, request, "--config", gm_config, "create", + ) + + # ask the CLI for the grid-manager pubkey + pubkey = yield _run_gm( + reactor, request, "--config", gm_config, "public-identity", + ) + alleged_pubkey = ed25519.verifying_key_from_string(pubkey.strip()) + + # load the grid-manager pubkey "ourselves" + with open(join(gm_config, "config.json"), "r") as f: + real_config = json.load(f) + real_privkey, real_pubkey = ed25519.signing_keypair_from_string( + real_config["private_key"].encode("ascii"), + ) + + # confirm the CLI told us the correct thing + alleged_bytes = alleged_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw) + real_bytes = real_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw) + assert alleged_bytes == real_bytes, "Keys don't match" diff --git a/integration/test_i2p.py b/integration/test_i2p.py index 2ee603573..c99c469fa 100644 --- a/integration/test_i2p.py +++ b/integration/test_i2p.py @@ -24,6 +24,7 @@ from allmydata.test.common import ( write_introducer, ) from allmydata.node import read_config +from allmydata.util.iputil import allocate_tcp_port if which("docker") is None: @@ -132,8 +133,10 @@ def i2p_introducer_furl(i2p_introducer, temp_dir): @pytest_twisted.inlineCallbacks 
@pytest.mark.skip("I2P tests are not functioning at all, for unknown reasons") def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl): - yield _create_anonymous_node(reactor, 'carol_i2p', 8008, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl) - yield _create_anonymous_node(reactor, 'dave_i2p', 8009, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl) + web_port0 = allocate_tcp_port() + web_port1 = allocate_tcp_port() + yield _create_anonymous_node(reactor, 'carol_i2p', web_port0, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl) + yield _create_anonymous_node(reactor, 'dave_i2p', web_port1, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl) # ensure both nodes are connected to "a grid" by uploading # something via carol, and retrieve it using dave. gold_path = join(temp_dir, "gold") @@ -179,9 +182,8 @@ def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_netw @pytest_twisted.inlineCallbacks -def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, i2p_network, introducer_furl): +def _create_anonymous_node(reactor, name, web_port, request, temp_dir, flog_gatherer, i2p_network, introducer_furl): node_dir = FilePath(temp_dir).child(name) - web_port = "tcp:{}:interface=localhost".format(control_port + 2000) print("creating", node_dir.path) node_dir.makedirs() diff --git a/integration/test_servers_of_happiness.py b/integration/test_servers_of_happiness.py index 8363edb35..8f64696a8 100644 --- a/integration/test_servers_of_happiness.py +++ b/integration/test_servers_of_happiness.py @@ -6,8 +6,6 @@ import sys from os.path import join from os import environ -from twisted.internet.error import ProcessTerminated - from . import util import pytest_twisted @@ -44,8 +42,8 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto try: yield proto.done assert False, "should raise exception" - except Exception as e: - assert isinstance(e, ProcessTerminated) + except util.ProcessFailed as e: + assert b"UploadUnhappinessError" in e.output output = proto.output.getvalue() assert b"shares could be placed on only" in output diff --git a/integration/test_sftp.py b/integration/test_sftp.py index 3fdbb56d7..01ddfdf8a 100644 --- a/integration/test_sftp.py +++ b/integration/test_sftp.py @@ -72,7 +72,7 @@ def test_bad_account_password_ssh_key(alice, tmpdir): another_key = os.path.join(str(tmpdir), "ssh_key") generate_ssh_key(another_key) - good_key = RSAKey(filename=os.path.join(alice.node_dir, "private", "ssh_client_rsa_key")) + good_key = RSAKey(filename=os.path.join(alice.process.node_dir, "private", "ssh_client_rsa_key")) bad_key = RSAKey(filename=another_key) # Wrong key: @@ -87,17 +87,16 @@ def test_bad_account_password_ssh_key(alice, tmpdir): "username": "someoneelse", "pkey": good_key, }) -def sftp_client_key(node): + +def sftp_client_key(client): + """ + :return RSAKey: the RSA client key associated with this grid.Client + """ + # XXX move to Client / grid.py? return RSAKey( - filename=os.path.join(node.node_dir, "private", "ssh_client_rsa_key"), + filename=os.path.join(client.process.node_dir, "private", "ssh_client_rsa_key"), ) -def test_sftp_client_key_exists(alice, alice_sftp_client_key_path): - """ - Weakly validate the sftp client key fixture by asserting that *something* - exists at the supposed key path. 
- """ - assert os.path.exists(alice_sftp_client_key_path) @run_in_thread def test_ssh_key_auth(alice): diff --git a/integration/test_tor.py b/integration/test_tor.py index af83e2ba1..d7fed5790 100644 --- a/integration/test_tor.py +++ b/integration/test_tor.py @@ -38,8 +38,8 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne The two nodes can talk to the introducer and each other: we upload to one node, read from the other. """ - carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2) - dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2) + carol = yield _create_anonymous_node(reactor, 'carol', 8100, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2) + dave = yield _create_anonymous_node(reactor, 'dave', 8101, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2) yield util.await_client_ready(carol, minimum_number_of_servers=2, timeout=600) yield util.await_client_ready(dave, minimum_number_of_servers=2, timeout=600) yield upload_to_one_download_from_the_other(reactor, temp_dir, carol, dave) @@ -94,44 +94,45 @@ async def upload_to_one_download_from_the_other(reactor, temp_dir, upload_to: ut @pytest_twisted.inlineCallbacks -def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl, shares_total: int) -> util.TahoeProcess: +def _create_anonymous_node(reactor, name, web_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl, shares_total: int) -> util.TahoeProcess: node_dir = FilePath(temp_dir).child(name) - web_port = "tcp:{}:interface=localhost".format(control_port + 2000) - - if True: - print(f"creating {node_dir.path} with introducer {introducer_furl}") - node_dir.makedirs() - proto = util._DumpOutputProtocol(None) - reactor.spawnProcess( - proto, - sys.executable, - ( - sys.executable, '-b', '-m', 'allmydata.scripts.runner', - 'create-node', - '--nickname', name, - '--webport', web_port, - '--introducer', introducer_furl, - '--hide-ip', - '--tor-control-port', 'tcp:localhost:{}'.format(control_port), - '--listen', 'tor', - '--shares-needed', '1', - '--shares-happy', '1', - '--shares-total', str(shares_total), - node_dir.path, - ), - env=environ, + if node_dir.exists(): + raise RuntimeError( + "A node already exists in '{}'".format(node_dir) ) - yield proto.done + print(f"creating {node_dir.path} with introducer {introducer_furl}") + node_dir.makedirs() + proto = util._DumpOutputProtocol(None) + reactor.spawnProcess( + proto, + sys.executable, + ( + sys.executable, '-b', '-m', 'allmydata.scripts.runner', + 'create-node', + '--nickname', name, + '--webport', str(web_port), + '--introducer', introducer_furl, + '--hide-ip', + '--tor-control-port', tor_network.client_control_endpoint, + '--listen', 'tor', + '--shares-needed', '1', + '--shares-happy', '1', + '--shares-total', str(shares_total), + node_dir.path, + ), + env=environ, + ) + yield proto.done # Which services should this client connect to? 
write_introducer(node_dir, "default", introducer_furl) - util.basic_node_configuration(request, flog_gatherer, node_dir.path) + util.basic_node_configuration(request, flog_gatherer.furl, node_dir.path) config = read_config(node_dir.path, "tub.port") config.set_config("tor", "onion", "true") config.set_config("tor", "onion.external_port", "3457") - config.set_config("tor", "control.port", f"tcp:port={control_port}:host=127.0.0.1") + config.set_config("tor", "control.port", tor_network.client_control_endpoint) config.set_config("tor", "onion.private_key_file", "private/tor_onion.privkey") print("running") @@ -157,7 +158,7 @@ def test_anonymous_client(reactor, request, temp_dir, flog_gatherer, tor_network ) yield util.await_client_ready(normie) - anonymoose = yield _create_anonymous_node(reactor, 'anonymoose', 8008, request, temp_dir, flog_gatherer, tor_network, introducer_furl, 1) + anonymoose = yield _create_anonymous_node(reactor, 'anonymoose', 8102, request, temp_dir, flog_gatherer, tor_network, introducer_furl, 1) yield util.await_client_ready(anonymoose, minimum_number_of_servers=1, timeout=600) yield upload_to_one_download_from_the_other(reactor, temp_dir, normie, anonymoose) diff --git a/integration/test_vectors.py b/integration/test_vectors.py index 6e7b5746a..f53ec1741 100644 --- a/integration/test_vectors.py +++ b/integration/test_vectors.py @@ -15,7 +15,8 @@ from pytest_twisted import ensureDeferred from . import vectors from .vectors import parameters -from .util import reconfigure, upload, TahoeProcess +from .util import upload +from .grid import Client @mark.parametrize('convergence', parameters.CONVERGENCE_SECRETS) def test_convergence(convergence): @@ -36,8 +37,8 @@ async def test_capability(reactor, request, alice, case, expected): computed value. """ # rewrite alice's config to match params and convergence - await reconfigure( - reactor, request, alice, (1, case.params.required, case.params.total), case.convergence, case.segment_size) + await alice.reconfigure_zfec( + reactor, (1, case.params.required, case.params.total), case.convergence, case.segment_size) # upload data in the correct format actual = upload(alice, case.fmt, case.data) @@ -82,7 +83,7 @@ async def skiptest_generate(reactor, request, alice): async def generate( reactor, request, - alice: TahoeProcess, + alice: Client, cases: Iterator[vectors.Case], ) -> AsyncGenerator[[vectors.Case, str], None]: """ @@ -106,10 +107,8 @@ async def generate( # reliability of this generator, be happy if we can put shares anywhere happy = 1 for case in cases: - await reconfigure( + await alice.reconfigure_zfec( reactor, - request, - alice, (happy, case.params.required, case.params.total), case.convergence, case.segment_size @@ -117,5 +116,5 @@ async def generate( # Give the format a chance to make an RSA key if it needs it. 
case = evolve(case, fmt=case.fmt.customize()) - cap = upload(alice, case.fmt, case.data) + cap = upload(alice.process, case.fmt, case.data) yield case, cap diff --git a/integration/test_web.py b/integration/test_web.py index fd29504f8..08c6e6217 100644 --- a/integration/test_web.py +++ b/integration/test_web.py @@ -33,7 +33,7 @@ def test_index(alice): """ we can download the index file """ - util.web_get(alice, u"") + util.web_get(alice.process, u"") @run_in_thread @@ -41,7 +41,7 @@ def test_index_json(alice): """ we can download the index file as json """ - data = util.web_get(alice, u"", params={u"t": u"json"}) + data = util.web_get(alice.process, u"", params={u"t": u"json"}) # it should be valid json json.loads(data) @@ -55,7 +55,7 @@ def test_upload_download(alice): FILE_CONTENTS = u"some contents" readcap = util.web_post( - alice, u"uri", + alice.process, u"uri", data={ u"t": u"upload", u"format": u"mdmf", @@ -67,7 +67,7 @@ def test_upload_download(alice): readcap = readcap.strip() data = util.web_get( - alice, u"uri", + alice.process, u"uri", params={ u"uri": readcap, u"filename": u"boom", @@ -85,11 +85,11 @@ def test_put(alice): FILE_CONTENTS = b"added via PUT" * 20 resp = requests.put( - util.node_url(alice.node_dir, u"uri"), + util.node_url(alice.process.node_dir, u"uri"), data=FILE_CONTENTS, ) cap = allmydata.uri.from_string(resp.text.strip().encode('ascii')) - cfg = alice.get_config() + cfg = alice.process.get_config() assert isinstance(cap, allmydata.uri.CHKFileURI) assert cap.size == len(FILE_CONTENTS) assert cap.total_shares == int(cfg.get_config("client", "shares.total")) @@ -102,7 +102,7 @@ def test_helper_status(storage_nodes): successfully GET the /helper_status page """ - url = util.node_url(storage_nodes[0].node_dir, "helper_status") + url = util.node_url(storage_nodes[0].process.node_dir, "helper_status") resp = requests.get(url) assert resp.status_code >= 200 and resp.status_code < 300 dom = BeautifulSoup(resp.content, "html5lib") @@ -116,7 +116,7 @@ def test_deep_stats(alice): URIs work """ resp = requests.post( - util.node_url(alice.node_dir, "uri"), + util.node_url(alice.process.node_dir, "uri"), params={ "format": "sdmf", "t": "mkdir", @@ -130,7 +130,7 @@ def test_deep_stats(alice): uri = url_unquote(resp.url) assert 'URI:DIR2:' in uri dircap = uri[uri.find("URI:DIR2:"):].rstrip('/') - dircap_uri = util.node_url(alice.node_dir, "uri/{}".format(url_quote(dircap))) + dircap_uri = util.node_url(alice.process.node_dir, "uri/{}".format(url_quote(dircap))) # POST a file into this directory FILE_CONTENTS = u"a file in a directory" @@ -176,7 +176,7 @@ def test_deep_stats(alice): while tries > 0: tries -= 1 resp = requests.get( - util.node_url(alice.node_dir, u"operations/something_random"), + util.node_url(alice.process.node_dir, u"operations/something_random"), ) d = json.loads(resp.content) if d['size-literal-files'] == len(FILE_CONTENTS): @@ -201,21 +201,21 @@ def test_status(alice): FILE_CONTENTS = u"all the Important Data of alice\n" * 1200 resp = requests.put( - util.node_url(alice.node_dir, u"uri"), + util.node_url(alice.process.node_dir, u"uri"), data=FILE_CONTENTS, ) cap = resp.text.strip() print("Uploaded data, cap={}".format(cap)) resp = requests.get( - util.node_url(alice.node_dir, u"uri/{}".format(url_quote(cap))), + util.node_url(alice.process.node_dir, u"uri/{}".format(url_quote(cap))), ) print("Downloaded {} bytes of data".format(len(resp.content))) assert str(resp.content, "ascii") == FILE_CONTENTS resp = requests.get( - util.node_url(alice.node_dir, 
"status"), + util.node_url(alice.process.node_dir, "status"), ) dom = html5lib.parse(resp.content) @@ -229,7 +229,7 @@ def test_status(alice): for href in hrefs: if href == u"/" or not href: continue - resp = requests.get(util.node_url(alice.node_dir, href)) + resp = requests.get(util.node_url(alice.process.node_dir, href)) if href.startswith(u"/status/up"): assert b"File Upload Status" in resp.content if b"Total Size: %d" % (len(FILE_CONTENTS),) in resp.content: @@ -241,7 +241,7 @@ def test_status(alice): # download the specialized event information resp = requests.get( - util.node_url(alice.node_dir, u"{}/event_json".format(href)), + util.node_url(alice.process.node_dir, u"{}/event_json".format(href)), ) js = json.loads(resp.content) # there's usually just one "read" operation, but this can handle many .. @@ -264,14 +264,14 @@ async def test_directory_deep_check(reactor, request, alice): required = 2 total = 4 - await util.reconfigure(reactor, request, alice, (happy, required, total), convergence=None) + await alice.reconfigure_zfec(reactor, (happy, required, total), convergence=None) await deferToThread(_test_directory_deep_check_blocking, alice) def _test_directory_deep_check_blocking(alice): # create a directory resp = requests.post( - util.node_url(alice.node_dir, u"uri"), + util.node_url(alice.process.node_dir, u"uri"), params={ u"t": u"mkdir", u"redirect_to_result": u"true", @@ -320,7 +320,7 @@ def _test_directory_deep_check_blocking(alice): print("Uploaded data1, cap={}".format(cap1)) resp = requests.get( - util.node_url(alice.node_dir, u"uri/{}".format(url_quote(cap0))), + util.node_url(alice.process.node_dir, u"uri/{}".format(url_quote(cap0))), params={u"t": u"info"}, ) @@ -437,7 +437,7 @@ def test_storage_info(storage_nodes): storage0 = storage_nodes[0] requests.get( - util.node_url(storage0.node_dir, u"storage"), + util.node_url(storage0.process.node_dir, u"storage"), ) @@ -449,7 +449,7 @@ def test_storage_info_json(storage_nodes): storage0 = storage_nodes[0] resp = requests.get( - util.node_url(storage0.node_dir, u"storage"), + util.node_url(storage0.process.node_dir, u"storage"), params={u"t": u"json"}, ) data = json.loads(resp.content) @@ -462,12 +462,12 @@ def test_introducer_info(introducer): retrieve and confirm /introducer URI for the introducer """ resp = requests.get( - util.node_url(introducer.node_dir, u""), + util.node_url(introducer.process.node_dir, u""), ) assert b"Introducer" in resp.content resp = requests.get( - util.node_url(introducer.node_dir, u""), + util.node_url(introducer.process.node_dir, u""), params={u"t": u"json"}, ) data = json.loads(resp.content) @@ -484,14 +484,14 @@ def test_mkdir_with_children(alice): # create a file to put in our directory FILE_CONTENTS = u"some file contents\n" * 500 resp = requests.put( - util.node_url(alice.node_dir, u"uri"), + util.node_url(alice.process.node_dir, u"uri"), data=FILE_CONTENTS, ) filecap = resp.content.strip() # create a (sub) directory to put in our directory resp = requests.post( - util.node_url(alice.node_dir, u"uri"), + util.node_url(alice.process.node_dir, u"uri"), params={ u"t": u"mkdir", } @@ -534,7 +534,7 @@ def test_mkdir_with_children(alice): # create a new directory with one file and one sub-dir (all-at-once) resp = util.web_post( - alice, u"uri", + alice.process, u"uri", params={u"t": "mkdir-with-children"}, data=json.dumps(meta), ) diff --git a/integration/util.py b/integration/util.py index 31d351bc1..85a2fc3ee 100644 --- a/integration/util.py +++ b/integration/util.py @@ -70,16 +70,40 @@ 
class _ProcessExitedProtocol(ProcessProtocol): self.done.callback(None) +class ProcessFailed(Exception): + """ + A subprocess has failed. + + :ivar ProcessTerminated reason: the original reason from .processExited + + :ivar StringIO output: all stdout and stderr collected to this point. + """ + + def __init__(self, reason, output): + self.reason = reason + self.output = output + + def __str__(self): + return ":\n{}".format(self.reason, self.output) + + class _CollectOutputProtocol(ProcessProtocol): """ Internal helper. Collects all output (stdout + stderr) into self.output, and callback's on done with all of it after the process exits (for any reason). """ - def __init__(self, capture_stderr=True): + + def __init__(self, capture_stderr=True, stdin=None): self.done = Deferred() self.output = BytesIO() self.capture_stderr = capture_stderr + self._stdin = stdin + + def connectionMade(self): + if self._stdin is not None: + self.transport.write(self._stdin) + self.transport.closeStdin() def processEnded(self, reason): if not self.done.called: @@ -87,7 +111,7 @@ class _CollectOutputProtocol(ProcessProtocol): def processExited(self, reason): if not isinstance(reason.value, ProcessDone): - self.done.errback(reason) + self.done.errback(ProcessFailed(reason, self.output.getvalue())) def outReceived(self, data): self.output.write(data) @@ -153,38 +177,33 @@ class _MagicTextProtocol(ProcessProtocol): sys.stdout.write(self.name + line + "\n") -def _cleanup_process_async(transport: IProcessTransport, allow_missing: bool) -> None: +def _cleanup_process_async(transport: IProcessTransport) -> None: """ If the given process transport seems to still be associated with a running process, send a SIGTERM to that process. :param transport: The transport to use. - :param allow_missing: If ``True`` then it is not an error for the - transport to have no associated process. Otherwise, an exception will - be raised in that case. - :raise: ``ValueError`` if ``allow_missing`` is ``False`` and the transport has no process. """ if transport.pid is None: - if allow_missing: - print("Process already cleaned up and that's okay.") - return - else: - raise ValueError("Process is not running") + # in cases of "restart", we will have registered a finalizer + # that will kill the process -- but already explicitly killed + # it (and then ran again) due to the "restart". So, if the + # process is already killed, our job is done. + print("Process already cleaned up and that's okay.") + return print("signaling {} with TERM".format(transport.pid)) try: transport.signalProcess('TERM') except ProcessExitedAlready: # The transport object thought it still had a process but the real OS # process has already exited. That's fine. We accomplished what we - # wanted to. We don't care about ``allow_missing`` here because - # there's no way we could have known the real OS process already - # exited. + # wanted to. pass -def _cleanup_tahoe_process(tahoe_transport, exited, allow_missing=False): +def _cleanup_tahoe_process(tahoe_transport, exited): """ Terminate the given process with a kill signal (SIGTERM on POSIX, TerminateProcess on Windows). @@ -195,12 +214,26 @@ def _cleanup_tahoe_process(tahoe_transport, exited, allow_missing=False): :return: After the process has exited. 
""" from twisted.internet import reactor - _cleanup_process_async(tahoe_transport, allow_missing=allow_missing) - print("signaled, blocking on exit") + _cleanup_process_async(tahoe_transport) + print(f"signaled, blocking on exit {exited}") block_with_timeout(exited, reactor) print("exited, goodbye") +def run_tahoe(reactor, request, *args, **kwargs): + """ + Helper to run tahoe with optional coverage. + + :returns: a Deferred that fires when the command is done (or a + ProcessFailed exception if it exits non-zero) + """ + stdin = kwargs.get("stdin", None) + protocol = _CollectOutputProtocol(stdin=stdin) + process = _tahoe_runner_optional_coverage(protocol, reactor, request, args) + process.exited = protocol.done + return protocol.done + + def _tahoe_runner_optional_coverage(proto, reactor, request, other_args): """ Internal helper. Calls spawnProcess with `-m @@ -244,16 +277,20 @@ class TahoeProcess(object): ) def kill(self): - """Kill the process, block until it's done.""" + """ + Kill the process, block until it's done. + Does nothing if the process is already stopped (or never started). + """ print(f"TahoeProcess.kill({self.transport.pid} / {self.node_dir})") _cleanup_tahoe_process(self.transport, self.transport.exited) def kill_async(self): """ Kill the process, return a Deferred that fires when it's done. + Does nothing if the process is already stopped (or never started). """ print(f"TahoeProcess.kill_async({self.transport.pid} / {self.node_dir})") - _cleanup_process_async(self.transport, allow_missing=False) + _cleanup_process_async(self.transport) return self.transport.exited def restart_async(self, reactor: IReactorProcess, request: Any) -> Deferred: @@ -264,7 +301,7 @@ class TahoeProcess(object): handle requests. """ d = self.kill_async() - d.addCallback(lambda ignored: _run_node(reactor, self.node_dir, request, None, finalize=False)) + d.addCallback(lambda ignored: _run_node(reactor, self.node_dir, request, None)) def got_new_process(proc): # Grab the new transport since the one we had before is no longer # valid after the stop/start cycle. @@ -276,7 +313,7 @@ class TahoeProcess(object): return "".format(self._node_dir) -def _run_node(reactor, node_dir, request, magic_text, finalize=True): +def _run_node(reactor, node_dir, request, magic_text): """ Run a tahoe process from its node_dir. 
@@ -305,8 +342,7 @@ def _run_node(reactor, node_dir, request, magic_text, finalize=True): node_dir, ) - if finalize: - request.addfinalizer(tahoe_process.kill) + request.addfinalizer(tahoe_process.kill) d = protocol.magic_seen d.addCallback(lambda ignored: tahoe_process) @@ -348,8 +384,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam magic_text=None, needed=2, happy=3, - total=4, - finalize=True): + total=4): """ Helper to create a single node, run it and return the instance spawnProcess returned (ITransport) @@ -360,7 +395,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam if exists(node_dir): created_d = succeed(None) else: - print("creating", node_dir) + print("creating: {}".format(node_dir)) mkdir(node_dir) done_proto = _ProcessExitedProtocol() args = [ @@ -383,13 +418,13 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam created_d = done_proto.done def created(_): - basic_node_configuration(request, flog_gatherer, node_dir) + basic_node_configuration(request, flog_gatherer.furl, node_dir) created_d.addCallback(created) d = Deferred() d.callback(None) d.addCallback(lambda _: created_d) - d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text, finalize=finalize)) + d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text)) return d @@ -622,19 +657,28 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_serve time.sleep(1) continue - print( - f"Now: {time.ctime()}\n" - f"Server last-received-data: {[time.ctime(s['last_received_data']) for s in servers]}" - ) - + now = time.time() server_times = [ server['last_received_data'] - for server in servers + for server + in servers + if server['last_received_data'] is not None ] + print( + f"Now: {time.ctime(now)}\n" + f"Liveness required: {liveness}\n" + f"Server last-received-data: {[time.ctime(s) for s in server_times]}\n" + f"Server ages: {[now - s for s in server_times]}\n" + ) + # check that all times are 'recent enough' (it's OK if _some_ servers # are down, we just want to make sure a sufficient number are up) - if len([time.time() - t <= liveness for t in server_times if t is not None]) < minimum_number_of_servers: - print("waiting because at least one server too old") + alive = [t for t in server_times if now - t <= liveness] + if len(alive) < minimum_number_of_servers: + print( + f"waiting because we found {len(alive)} servers " + f"and want {minimum_number_of_servers}" + ) time.sleep(1) continue @@ -706,7 +750,6 @@ class SSK: def load(cls, params: dict) -> SSK: assert params.keys() == {"format", "mutable", "key"} return cls(params["format"], params["key"].encode("ascii")) - def customize(self) -> SSK: """ Return an SSK with a newly generated random RSA key. @@ -745,7 +788,7 @@ def upload(alice: TahoeProcess, fmt: CHK | SSK, data: bytes) -> str: f.write(data) f.flush() with fmt.to_argv() as fmt_argv: - argv = [alice, "put"] + fmt_argv + [f.name] + argv = [alice.process, "put"] + fmt_argv + [f.name] return cli(*argv).decode("utf-8").strip() diff --git a/newsfragments/3508.minor b/newsfragments/3508.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3899.bugfix b/newsfragments/3899.bugfix new file mode 100644 index 000000000..55d4fabd4 --- /dev/null +++ b/newsfragments/3899.bugfix @@ -0,0 +1,4 @@ +Provide better feedback from plugin configuration errors + +Local errors now print a useful message and exit. 
+Announcements that only contain invalid / unusable plugins now show a message in the Welcome page. diff --git a/newsfragments/4052.minor b/newsfragments/4052.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/4055.minor b/newsfragments/4055.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/4056.bugfix b/newsfragments/4056.bugfix new file mode 100644 index 000000000..7e637b48c --- /dev/null +++ b/newsfragments/4056.bugfix @@ -0,0 +1,3 @@ +Provide our own copy of attrs' "provides()" validator + +This validator is deprecated and slated for removal; that project's suggestion is to copy the code to our project. diff --git a/newsfragments/4059.minor b/newsfragments/4059.minor new file mode 100644 index 000000000..e69de29bb diff --git a/setup.py b/setup.py index 7ca2650d5..6f807a2d2 100644 --- a/setup.py +++ b/setup.py @@ -118,10 +118,10 @@ install_requires = [ "pyrsistent", # A great way to define types of values. - "attrs >= 18.2.0", + "attrs >= 20.1.0", # WebSocket library for twisted and asyncio - "autobahn", + "autobahn >= 22.4.3", # Support for Python 3 transition "future >= 0.18.2", @@ -151,7 +151,7 @@ install_requires = [ "pycddl >= 0.4", # Command-line parsing - "click >= 7.0", + "click >= 8.1.1", # for pid-file support "psutil", @@ -413,7 +413,7 @@ setup(name="tahoe-lafs", # also set in __init__.py "pip==22.0.3", "wheel==0.37.1", "setuptools==60.9.1", - "subunitreporter==22.2.0", + "subunitreporter==23.8.0", "python-subunit==1.4.2", "junitxml==0.7", "coverage==7.2.5", @@ -435,7 +435,7 @@ setup(name="tahoe-lafs", # also set in __init__.py "paramiko < 2.9", "pytest-timeout", # Does our OpenMetrics endpoint adhere to the spec: - "prometheus-client == 0.11.0", + "prometheus-client == 0.11.0" ] + tor_requires + i2p_requires, "tor": tor_requires, "i2p": i2p_requires, diff --git a/src/allmydata/cli/grid_manager.py b/src/allmydata/cli/grid_manager.py index 3110a072e..ddb1f2f37 100644 --- a/src/allmydata/cli/grid_manager.py +++ b/src/allmydata/cli/grid_manager.py @@ -222,3 +222,7 @@ def _config_path_from_option(config: str) -> Optional[FilePath]: if config == "-": return None return FilePath(config) + + +if __name__ == '__main__': + grid_manager() # type: ignore diff --git a/src/allmydata/client.py b/src/allmydata/client.py index aff2d5815..cfc0977a1 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -483,6 +483,11 @@ def create_storage_farm_broker(config: _Config, default_connection_handlers, foo storage_client_config = storage_client.StorageClientConfig.from_node_config( config, ) + # ensure that we can at least load all plugins that the + # configuration mentions; doing this early (i.e. before creating + # storage-clients themselves) allows us to exit in case of a + # problem. 
+ storage_client_config.get_configured_storage_plugins() def tub_creator(handler_overrides=None, **kwargs): return node.create_tub( diff --git a/src/allmydata/grid_manager.py b/src/allmydata/grid_manager.py index e264734b2..f366391fc 100644 --- a/src/allmydata/grid_manager.py +++ b/src/allmydata/grid_manager.py @@ -486,7 +486,9 @@ def create_grid_manager_verifier(keys, certs, public_key, now_fn=None, bad_cert= now = now_fn() for cert in valid_certs: expires = datetime.fromisoformat(cert["expires"]) - if cert['public_key'].encode("ascii") == public_key: + pc = cert['public_key'].encode('ascii') + assert type(pc) == type(public_key), "{} isn't {}".format(type(pc), type(public_key)) + if pc == public_key: if expires > now: # not-expired return True diff --git a/src/allmydata/scripts/tahoe_run.py b/src/allmydata/scripts/tahoe_run.py index ff3ff9efd..eba5ae329 100644 --- a/src/allmydata/scripts/tahoe_run.py +++ b/src/allmydata/scripts/tahoe_run.py @@ -42,6 +42,9 @@ from allmydata.util.pid import ( from allmydata.storage.crawler import ( MigratePickleFileError, ) +from allmydata.storage_client import ( + MissingPlugin, +) from allmydata.node import ( PortAssignmentRequired, PrivacyError, @@ -197,6 +200,17 @@ class DaemonizeTheRealService(Service, HookMixin): self.basedir, ) ) + elif reason.check(MissingPlugin): + self.stderr.write( + "Missing Plugin\n" + "The configuration requests a plugin:\n" + "\n {}\n\n" + "...which cannot be found.\n" + "This typically means that some software hasn't been installed or the plugin couldn't be instantiated.\n\n" + .format( + reason.value.plugin_name, + ) + ) else: self.stderr.write("\nUnknown error, here's the traceback:\n") reason.printTraceback(self.stderr) diff --git a/src/allmydata/storage/http_client.py b/src/allmydata/storage/http_client.py index 765e94319..b508c07fd 100644 --- a/src/allmydata/storage/http_client.py +++ b/src/allmydata/storage/http_client.py @@ -41,6 +41,7 @@ from twisted.internet.interfaces import ( IDelayedCall, ) from twisted.internet.ssl import CertificateOptions +from twisted.protocols.tls import TLSMemoryBIOProtocol from twisted.web.client import Agent, HTTPConnectionPool from zope.interface import implementer from hyperlink import DecodedURL @@ -72,7 +73,7 @@ except ImportError: pass -def _encode_si(si): # type: (bytes) -> str +def _encode_si(si: bytes) -> str: """Encode the storage index into Unicode string.""" return str(si_b2a(si), "ascii") @@ -80,9 +81,13 @@ def _encode_si(si): # type: (bytes) -> str class ClientException(Exception): """An unexpected response code from the server.""" - def __init__(self, code, *additional_args): - Exception.__init__(self, code, *additional_args) + def __init__( + self, code: int, message: Optional[str] = None, body: Optional[bytes] = None + ): + Exception.__init__(self, code, message, body) self.code = code + self.message = message + self.body = body register_exception_extractor(ClientException, lambda e: {"response_code": e.code}) @@ -93,7 +98,7 @@ register_exception_extractor(ClientException, lambda e: {"response_code": e.code # Tags are of the form #6.nnn, where the number is documented at # https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258 # indicates a set. -_SCHEMAS = { +_SCHEMAS: Mapping[str, Schema] = { "get_version": Schema( # Note that the single-quoted (`'`) string keys in this schema # represent *byte* strings - per the CDDL specification. 
Text strings @@ -155,7 +160,7 @@ class _LengthLimitedCollector: timeout_on_silence: IDelayedCall f: BytesIO = field(factory=BytesIO) - def __call__(self, data: bytes): + def __call__(self, data: bytes) -> None: self.timeout_on_silence.reset(60) self.remaining_length -= len(data) if self.remaining_length < 0: @@ -164,7 +169,7 @@ class _LengthLimitedCollector: def limited_content( - response, + response: IResponse, clock: IReactorTime, max_length: int = 30 * 1024 * 1024, ) -> Deferred[BinaryIO]: @@ -300,11 +305,13 @@ class _StorageClientHTTPSPolicy: expected_spki_hash: bytes # IPolicyForHTTPS - def creatorForNetloc(self, hostname, port): + def creatorForNetloc(self, hostname: str, port: int) -> _StorageClientHTTPSPolicy: return self # IOpenSSLClientConnectionCreator - def clientConnectionForTLS(self, tlsProtocol): + def clientConnectionForTLS( + self, tlsProtocol: TLSMemoryBIOProtocol + ) -> SSL.Connection: return SSL.Connection( _TLSContextFactory(self.expected_spki_hash).getContext(), None ) @@ -344,7 +351,7 @@ class StorageClientFactory: cls.TEST_MODE_REGISTER_HTTP_POOL = callback @classmethod - def stop_test_mode(cls): + def stop_test_mode(cls) -> None: """Stop testing mode.""" cls.TEST_MODE_REGISTER_HTTP_POOL = None @@ -437,7 +444,7 @@ class StorageClient(object): """Get a URL relative to the base URL.""" return self._base_url.click(path) - def _get_headers(self, headers): # type: (Optional[Headers]) -> Headers + def _get_headers(self, headers: Optional[Headers]) -> Headers: """Return the basic headers to be used by default.""" if headers is None: headers = Headers() @@ -565,7 +572,7 @@ class StorageClient(object): ).read() raise ClientException(response.code, response.phrase, data) - def shutdown(self) -> Deferred: + def shutdown(self) -> Deferred[object]: """Shutdown any connections.""" return self._pool.closeCachedConnections() diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index 3ff98e933..66b0dd6de 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -4,7 +4,18 @@ HTTP server for storage. 
from __future__ import annotations -from typing import Any, Callable, Union, cast, Optional +from typing import ( + Any, + Callable, + Union, + cast, + Optional, + TypeVar, + Sequence, + Protocol, + Dict, +) +from typing_extensions import ParamSpec, Concatenate from functools import wraps from base64 import b64decode import binascii @@ -15,20 +26,24 @@ import mmap from eliot import start_action from cryptography.x509 import Certificate as CryptoCertificate from zope.interface import implementer -from klein import Klein +from klein import Klein, KleinRenderable +from klein.resource import KleinResource from twisted.web import http from twisted.internet.interfaces import ( IListeningPort, IStreamServerEndpoint, IPullProducer, + IProtocolFactory, ) from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.defer import Deferred from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate from twisted.internet.interfaces import IReactorFromThreads from twisted.web.server import Site, Request +from twisted.web.iweb import IRequest from twisted.protocols.tls import TLSMemoryBIOFactory from twisted.python.filepath import FilePath +from twisted.python.failure import Failure from attrs import define, field, Factory from werkzeug.http import ( @@ -68,7 +83,7 @@ class ClientSecretsException(Exception): def _extract_secrets( - header_values: list[str], required_secrets: set[Secrets] + header_values: Sequence[str], required_secrets: set[Secrets] ) -> dict[Secrets, bytes]: """ Given list of values of ``X-Tahoe-Authorization`` headers, and required @@ -102,18 +117,43 @@ def _extract_secrets( return result -def _authorization_decorator(required_secrets): +class BaseApp(Protocol): + """Protocol for ``HTTPServer`` and testing equivalent.""" + + _swissnum: bytes + + +P = ParamSpec("P") +T = TypeVar("T") +SecretsDict = Dict[Secrets, bytes] +App = TypeVar("App", bound=BaseApp) + + +def _authorization_decorator( + required_secrets: set[Secrets], +) -> Callable[ + [Callable[Concatenate[App, Request, SecretsDict, P], T]], + Callable[Concatenate[App, Request, P], T], +]: """ 1. Check the ``Authorization`` header matches server swissnum. 2. Extract ``X-Tahoe-Authorization`` headers and pass them in. 3. Log the request and response. """ - def decorator(f): + def decorator( + f: Callable[Concatenate[App, Request, SecretsDict, P], T] + ) -> Callable[Concatenate[App, Request, P], T]: @wraps(f) - def route(self, request, *args, **kwargs): - # Don't set text/html content type by default: - request.defaultContentType = None + def route( + self: App, + request: Request, + *args: P.args, + **kwargs: P.kwargs, + ) -> T: + # Don't set text/html content type by default. 
+ # None is actually supported, see https://github.com/twisted/twisted/issues/11902 + request.defaultContentType = None # type: ignore[assignment] with start_action( action_type="allmydata:storage:http-server:handle-request", @@ -163,7 +203,22 @@ def _authorization_decorator(required_secrets): return decorator -def _authorized_route(app, required_secrets, *route_args, **route_kwargs): +def _authorized_route( + klein_app: Klein, + required_secrets: set[Secrets], + url: str, + *route_args: Any, + branch: bool = False, + **route_kwargs: Any, +) -> Callable[ + [ + Callable[ + Concatenate[App, Request, SecretsDict, P], + KleinRenderable, + ] + ], + Callable[..., KleinRenderable], +]: """ Like Klein's @route, but with additional support for checking the ``Authorization`` header as well as ``X-Tahoe-Authorization`` headers. The @@ -173,12 +228,23 @@ def _authorized_route(app, required_secrets, *route_args, **route_kwargs): :param required_secrets: Set of required ``Secret`` types. """ - def decorator(f): - @app.route(*route_args, **route_kwargs) + def decorator( + f: Callable[ + Concatenate[App, Request, SecretsDict, P], + KleinRenderable, + ] + ) -> Callable[..., KleinRenderable]: + @klein_app.route(url, *route_args, branch=branch, **route_kwargs) # type: ignore[arg-type] @_authorization_decorator(required_secrets) @wraps(f) - def handle_route(*args, **kwargs): - return f(*args, **kwargs) + def handle_route( + app: App, + request: Request, + secrets: SecretsDict, + *args: P.args, + **kwargs: P.kwargs, + ) -> KleinRenderable: + return f(app, request, secrets, *args, **kwargs) return handle_route @@ -234,7 +300,7 @@ class UploadsInProgress(object): except (KeyError, IndexError): raise _HTTPError(http.NOT_FOUND) - def remove_write_bucket(self, bucket: BucketWriter): + def remove_write_bucket(self, bucket: BucketWriter) -> None: """Stop tracking the given ``BucketWriter``.""" try: storage_index, share_number = self._bucketwriters.pop(bucket) @@ -250,7 +316,7 @@ class UploadsInProgress(object): def validate_upload_secret( self, storage_index: bytes, share_number: int, upload_secret: bytes - ): + ) -> None: """ Raise an unauthorized-HTTP-response exception if the given storage_index+share_number have a different upload secret than the @@ -272,7 +338,7 @@ class StorageIndexConverter(BaseConverter): regex = "[" + str(rfc3548_alphabet, "ascii") + "]{26}" - def to_python(self, value): + def to_python(self, value: str) -> bytes: try: return si_a2b(value.encode("ascii")) except (AssertionError, binascii.Error, ValueError): @@ -351,7 +417,7 @@ class _ReadAllProducer: start: int = field(default=0) @classmethod - def produce_to(cls, request: Request, read_data: ReadData) -> Deferred: + def produce_to(cls, request: Request, read_data: ReadData) -> Deferred[bytes]: """ Create and register the producer, returning ``Deferred`` that should be returned from a HTTP server endpoint. 
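# The typing added to _authorization_decorator above relies on ParamSpec and
# Concatenate so that a handler written as f(app, request, secrets, ...) can be
# exposed as route(app, request, ...) without losing the remaining parameter
# types. A minimal self-contained sketch of that pattern (all names below are
# invented for illustration, not part of this patch):
from typing import Callable, TypeVar
from typing_extensions import Concatenate, ParamSpec

P = ParamSpec("P")
R = TypeVar("R")

def with_secret(secret: str) -> Callable[
    [Callable[Concatenate[str, P], R]],  # wraps f(secret, *args, **kwargs)
    Callable[P, R],                      # exposes g(*args, **kwargs)
]:
    def decorator(f: Callable[Concatenate[str, P], R]) -> Callable[P, R]:
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            # Supply the extra leading argument, hiding it from callers.
            return f(secret, *args, **kwargs)
        return wrapper
    return decorator

@with_secret("swissnum")
def handler(secret: str, name: str) -> str:
    return "{} / {}".format(secret, name)

# handler("alice") now type-checks (and runs) as (name: str) -> str.
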
@@ -360,7 +426,7 @@ class _ReadAllProducer: request.registerProducer(producer, False) return producer.result - def resumeProducing(self): + def resumeProducing(self) -> None: data = self.read_data(self.start, 65536) if not data: self.request.unregisterProducer() @@ -371,10 +437,10 @@ class _ReadAllProducer: self.request.write(data) self.start += len(data) - def pauseProducing(self): + def pauseProducing(self) -> None: pass - def stopProducing(self): + def stopProducing(self) -> None: pass @@ -392,7 +458,7 @@ class _ReadRangeProducer: start: int remaining: int - def resumeProducing(self): + def resumeProducing(self) -> None: if self.result is None or self.request is None: return @@ -429,10 +495,10 @@ class _ReadRangeProducer: if self.remaining == 0: self.stopProducing() - def pauseProducing(self): + def pauseProducing(self) -> None: pass - def stopProducing(self): + def stopProducing(self) -> None: if self.request is not None: self.request.unregisterProducer() self.request = None @@ -511,12 +577,13 @@ def read_range( return d -def _add_error_handling(app: Klein): +def _add_error_handling(app: Klein) -> None: """Add exception handlers to a Klein app.""" @app.handle_errors(_HTTPError) - def _http_error(_, request, failure): + def _http_error(self: Any, request: IRequest, failure: Failure) -> KleinRenderable: """Handle ``_HTTPError`` exceptions.""" + assert isinstance(failure.value, _HTTPError) request.setResponseCode(failure.value.code) if failure.value.body is not None: return failure.value.body @@ -524,7 +591,9 @@ def _add_error_handling(app: Klein): return b"" @app.handle_errors(CDDLValidationError) - def _cddl_validation_error(_, request, failure): + def _cddl_validation_error( + self: Any, request: IRequest, failure: Failure + ) -> KleinRenderable: """Handle CDDL validation errors.""" request.setResponseCode(http.BAD_REQUEST) return str(failure.value).encode("utf-8") @@ -584,7 +653,7 @@ async def read_encoded( return cbor2.load(request.content) -class HTTPServer(object): +class HTTPServer(BaseApp): """ A HTTP interface to the storage server. """ @@ -611,11 +680,11 @@ class HTTPServer(object): self._uploads.remove_write_bucket ) - def get_resource(self): + def get_resource(self) -> KleinResource: """Return twisted.web ``Resource`` for this object.""" return self._app.resource() - def _send_encoded(self, request, data): + def _send_encoded(self, request: Request, data: object) -> Deferred[bytes]: """ Return encoded data suitable for writing as the HTTP body response, by default using CBOR. @@ -641,11 +710,10 @@ class HTTPServer(object): # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861 raise _HTTPError(http.NOT_ACCEPTABLE) - ##### Generic APIs ##### @_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"]) - def version(self, request, authorization): + def version(self, request: Request, authorization: SecretsDict) -> KleinRenderable: """Return version information.""" return self._send_encoded(request, self._get_version()) @@ -677,7 +745,9 @@ class HTTPServer(object): methods=["POST"], ) @async_to_deferred - async def allocate_buckets(self, request, authorization, storage_index): + async def allocate_buckets( + self, request: Request, authorization: SecretsDict, storage_index: bytes + ) -> KleinRenderable: """Allocate buckets.""" upload_secret = authorization[Secrets.UPLOAD] # It's just a list of up to ~256 shares, shouldn't use many bytes. 
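# For context on the _add_error_handling() hook annotated above: Klein lets an
# application register per-exception handlers, which is how _HTTPError and
# CDDLValidationError are mapped onto HTTP status codes. A minimal stand-alone
# sketch of that mechanism (the exception class and route are made up for
# illustration):
from klein import Klein
from twisted.web import http

class _DemoError(Exception):
    def __init__(self, code):
        self.code = code

app = Klein()

@app.handle_errors(_DemoError)
def _demo_error(request, failure):
    # Turn the exception into a response code, mirroring _http_error() above.
    request.setResponseCode(failure.value.code)
    return b""

@app.route("/boom")
def boom(request):
    raise _DemoError(http.BAD_REQUEST)

# app.run("localhost", 8080) would serve /boom and answer it with a 400.
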
@@ -716,7 +786,13 @@ class HTTPServer(object): "/storage/v1/immutable///abort", methods=["PUT"], ) - def abort_share_upload(self, request, authorization, storage_index, share_number): + def abort_share_upload( + self, + request: Request, + authorization: SecretsDict, + storage_index: bytes, + share_number: int, + ) -> KleinRenderable: """Abort an in-progress immutable share upload.""" try: bucket = self._uploads.get_write_bucket( @@ -747,7 +823,13 @@ class HTTPServer(object): "/storage/v1/immutable//", methods=["PATCH"], ) - def write_share_data(self, request, authorization, storage_index, share_number): + def write_share_data( + self, + request: Request, + authorization: SecretsDict, + storage_index: bytes, + share_number: int, + ) -> KleinRenderable: """Write data to an in-progress immutable upload.""" content_range = parse_content_range_header(request.getHeader("content-range")) if content_range is None or content_range.units != "bytes": @@ -757,14 +839,17 @@ class HTTPServer(object): bucket = self._uploads.get_write_bucket( storage_index, share_number, authorization[Secrets.UPLOAD] ) - offset = content_range.start - remaining = content_range.stop - content_range.start + offset = content_range.start or 0 + # We don't support an unspecified stop for the range: + assert content_range.stop is not None + # Missing body makes no sense: + assert request.content is not None + remaining = content_range.stop - offset finished = False while remaining > 0: data = request.content.read(min(remaining, 65536)) assert data, "uploaded data length doesn't match range" - try: finished = bucket.write(offset, data) except ConflictingWriteError: @@ -790,7 +875,9 @@ class HTTPServer(object): "/storage/v1/immutable//shares", methods=["GET"], ) - def list_shares(self, request, authorization, storage_index): + def list_shares( + self, request: Request, authorization: SecretsDict, storage_index: bytes + ) -> KleinRenderable: """ List shares for the given storage index. 
""" @@ -803,7 +890,13 @@ class HTTPServer(object): "/storage/v1/immutable//", methods=["GET"], ) - def read_share_chunk(self, request, authorization, storage_index, share_number): + def read_share_chunk( + self, + request: Request, + authorization: SecretsDict, + storage_index: bytes, + share_number: int, + ) -> KleinRenderable: """Read a chunk for an already uploaded immutable.""" request.setHeader("content-type", "application/octet-stream") try: @@ -820,7 +913,9 @@ class HTTPServer(object): "/storage/v1/lease/", methods=["PUT"], ) - def add_or_renew_lease(self, request, authorization, storage_index): + def add_or_renew_lease( + self, request: Request, authorization: SecretsDict, storage_index: bytes + ) -> KleinRenderable: """Update the lease for an immutable or mutable share.""" if not list(self._storage_server.get_shares(storage_index)): raise _HTTPError(http.NOT_FOUND) @@ -843,8 +938,12 @@ class HTTPServer(object): ) @async_to_deferred async def advise_corrupt_share_immutable( - self, request, authorization, storage_index, share_number - ): + self, + request: Request, + authorization: SecretsDict, + storage_index: bytes, + share_number: int, + ) -> KleinRenderable: """Indicate that given share is corrupt, with a text reason.""" try: bucket = self._storage_server.get_buckets(storage_index)[share_number] @@ -871,10 +970,15 @@ class HTTPServer(object): methods=["POST"], ) @async_to_deferred - async def mutable_read_test_write(self, request, authorization, storage_index): + async def mutable_read_test_write( + self, request: Request, authorization: SecretsDict, storage_index: bytes + ) -> KleinRenderable: """Read/test/write combined operation for mutables.""" rtw_request = await read_encoded( - self._reactor, request, _SCHEMAS["mutable_read_test_write"], max_size=2**48 + self._reactor, + request, + _SCHEMAS["mutable_read_test_write"], + max_size=2**48, ) secrets = ( authorization[Secrets.WRITE_ENABLER], @@ -910,7 +1014,13 @@ class HTTPServer(object): "/storage/v1/mutable//", methods=["GET"], ) - def read_mutable_chunk(self, request, authorization, storage_index, share_number): + def read_mutable_chunk( + self, + request: Request, + authorization: SecretsDict, + storage_index: bytes, + share_number: int, + ) -> KleinRenderable: """Read a chunk from a mutable.""" request.setHeader("content-type", "application/octet-stream") @@ -950,8 +1060,12 @@ class HTTPServer(object): ) @async_to_deferred async def advise_corrupt_share_mutable( - self, request, authorization, storage_index, share_number - ): + self, + request: Request, + authorization: SecretsDict, + storage_index: bytes, + share_number: int, + ) -> KleinRenderable: """Indicate that given share is corrupt, with a text reason.""" if share_number not in { shnum for (shnum, _) in self._storage_server.get_shares(storage_index) @@ -983,7 +1097,10 @@ class _TLSEndpointWrapper(object): @classmethod def from_paths( - cls, endpoint, private_key_path: FilePath, cert_path: FilePath + cls: type[_TLSEndpointWrapper], + endpoint: IStreamServerEndpoint, + private_key_path: FilePath, + cert_path: FilePath, ) -> "_TLSEndpointWrapper": """ Create an endpoint with the given private key and certificate paths on @@ -998,7 +1115,7 @@ class _TLSEndpointWrapper(object): ) return cls(endpoint=endpoint, context_factory=certificate_options) - def listen(self, factory): + def listen(self, factory: IProtocolFactory) -> Deferred[IListeningPort]: return self.endpoint.listen( TLSMemoryBIOFactory(self.context_factory, False, factory) ) diff --git 
a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index b58131837..ae7ea7ca1 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -33,15 +33,17 @@ Ported to Python 3. from __future__ import annotations from six import ensure_text -from typing import Union, Callable, Any, Optional, cast +from typing import Union, Callable, Any, Optional, cast, Dict from os import urandom import re import time import hashlib - +from io import StringIO from configparser import NoSectionError +import json import attr +from attr import define from hyperlink import DecodedURL from twisted.web.client import HTTPConnectionPool from zope.interface import ( @@ -55,12 +57,14 @@ from twisted.internet.task import LoopingCall from twisted.internet import defer, reactor from twisted.internet.interfaces import IReactorTime from twisted.application import service +from twisted.logger import Logger from twisted.plugin import ( getPlugins, ) from eliot import ( log_call, ) +from foolscap.ipb import IRemoteReference from foolscap.api import eventually, RemoteException from foolscap.reconnector import ( ReconnectionInfo, @@ -74,7 +78,7 @@ from allmydata.interfaces import ( VersionMessage ) from allmydata.grid_manager import ( - create_grid_manager_verifier, + create_grid_manager_verifier, SignedCertificate ) from allmydata.crypto import ( ed25519, @@ -87,6 +91,7 @@ from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.hashutil import permute_server_hash from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict from allmydata.util.deferredutil import async_to_deferred, race +from allmydata.util.attrs_provides import provides from allmydata.storage.http_client import ( StorageClient, StorageClientImmutables, StorageClientGeneral, ClientException as HTTPClientException, StorageClientMutables, @@ -95,6 +100,8 @@ from allmydata.storage.http_client import ( ) from .node import _Config +_log = Logger() + ANONYMOUS_STORAGE_NURLS = "anonymous-storage-NURLs" @@ -180,6 +187,31 @@ class StorageClientConfig(object): grid_manager_keys, ) + def get_configured_storage_plugins(self) -> dict[str, IFoolscapStoragePlugin]: + """ + :returns: a mapping from names to instances for all available + plugins + + :raises MissingPlugin: if the configuration asks for a plugin + for which there is no corresponding instance (e.g. it is + not installed). + """ + plugins = { + plugin.name: plugin + for plugin + in getPlugins(IFoolscapStoragePlugin) + } + + # mypy doesn't like "str" in place of Any ... + configured: Dict[Any, IFoolscapStoragePlugin] = dict() + for plugin_name in self.storage_plugins: + try: + plugin = plugins[plugin_name] + except KeyError: + raise MissingPlugin(plugin_name) + configured[plugin_name] = plugin + return configured + @implementer(IStorageBroker) class StorageFarmBroker(service.MultiService): @@ -317,8 +349,8 @@ class StorageFarmBroker(service.MultiService): assert isinstance(server_id, bytes) gm_verifier = create_grid_manager_verifier( self.storage_client_config.grid_manager_keys, - server["ann"].get("grid-manager-certificates", []), - "pub-{}".format(str(server_id, "ascii")), # server_id is v0- not pub-v0-key .. for reasons? + [SignedCertificate.load(StringIO(json.dumps(data))) for data in server["ann"].get("grid-manager-certificates", [])], + "pub-{}".format(str(server_id, "ascii")).encode("ascii"), # server_id is v0- not pub-v0-key .. for reasons? 
) if self._should_we_use_http(self.node_config, server["ann"]): @@ -658,7 +690,7 @@ class _FoolscapStorage(object): permutation_seed = attr.ib() tubid = attr.ib() - storage_server = attr.ib(validator=attr.validators.provides(IStorageServer)) + storage_server = attr.ib(validator=provides(IStorageServer)) _furl = attr.ib() _short_description = attr.ib() @@ -708,6 +740,7 @@ class _FoolscapStorage(object): @implementer(IFoolscapStorageServer) +@define class _NullStorage(object): """ Abstraction for *not* communicating with a storage server of a type with @@ -721,7 +754,7 @@ class _NullStorage(object): lease_seed = hashlib.sha256(b"").digest() name = "" - longname = "" + longname: str = "" def connect_to(self, tub, got_connection): return NonReconnector() @@ -740,8 +773,6 @@ class NonReconnector(object): def getReconnectionInfo(self): return ReconnectionInfo() -_null_storage = _NullStorage() - class AnnouncementNotMatched(Exception): """ @@ -750,6 +781,18 @@ class AnnouncementNotMatched(Exception): """ +@attr.s(auto_exc=True) +class MissingPlugin(Exception): + """ + A particular plugin was requested but is missing + """ + + plugin_name = attr.ib() + + def __str__(self): + return "Missing plugin '{}'".format(self.plugin_name) + + def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref): """ Construct an ``IStorageServer`` from the most locally-preferred plugin @@ -757,27 +800,37 @@ def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref): :param allmydata.node._Config node_config: The node configuration to pass to the plugin. + + :param dict announcement: The storage announcement for the storage + server we should build """ - plugins = { - plugin.name: plugin - for plugin - in getPlugins(IFoolscapStoragePlugin) - } storage_options = announcement.get(u"storage-options", []) - for plugin_name, plugin_config in list(config.storage_plugins.items()): + plugins = config.get_configured_storage_plugins() + + # for every storage-option that we have enabled locally (in order + # of preference), see if the announcement asks for such a thing. + # if it does, great: we return that storage-client + # otherwise we've run out of options... + + for options in storage_options: try: - plugin = plugins[plugin_name] + plugin = plugins[options[u"name"]] except KeyError: - raise ValueError("{} not installed".format(plugin_name)) - for option in storage_options: - if plugin_name == option[u"name"]: - furl = option[u"storage-server-FURL"] - return furl, plugin.get_storage_client( - node_config, - option, - get_rref, - ) - raise AnnouncementNotMatched() + # we didn't configure this kind of plugin locally, so + # consider the next announced option + continue + + furl = options[u"storage-server-FURL"] + return furl, plugin.get_storage_client( + node_config, + options, + get_rref, + ) + + # none of the storage options in the announcement are configured + # locally; we can't make a storage-client. + plugin_names = ", ".join(sorted(option["name"] for option in storage_options)) + raise AnnouncementNotMatched(plugin_names) def _available_space_from_version(version): @@ -790,6 +843,83 @@ def _available_space_from_version(version): return available_space +def _make_storage_system( + node_config: _Config, + config: StorageClientConfig, + ann: dict, + server_id: bytes, + get_rref: Callable[[], Optional[IRemoteReference]], +) -> IFoolscapStorageServer: + """ + Create an object for interacting with the storage server described by + the given announcement. 
+ + :param node_config: The node configuration to pass to any configured + storage plugins. + + :param config: Configuration specifying desired storage client behavior. + + :param ann: The storage announcement from the storage server we are meant + to communicate with. + + :param server_id: The unique identifier for the server. + + :param get_rref: A function which returns a remote reference to the + server-side object which implements this storage system, if one is + available (otherwise None). + + :return: An object enabling communication via Foolscap with the server + which generated the announcement. + """ + unmatched = None + # Try to match the announcement against a plugin. + try: + furl, storage_server = _storage_from_foolscap_plugin( + node_config, + config, + ann, + # Pass in an accessor for our _rref attribute. The value of + # the attribute may change over time as connections are lost + # and re-established. The _StorageServer should always be + # able to get the most up-to-date value. + get_rref, + ) + except AnnouncementNotMatched as e: + # show a more-specific error to the user for this server + # (Note this will only be shown if the server _doesn't_ offer + # anonymous service, which will match below) + unmatched = _NullStorage('{}: missing plugin "{}"'.format(server_id.decode("utf8"), str(e))) + else: + return _FoolscapStorage.from_announcement( + server_id, + furl, + ann, + storage_server, + ) + + # Try to match the announcement against the anonymous access scheme. + try: + furl = ann[u"anonymous-storage-FURL"] + except KeyError: + # Nope + pass + else: + # See comment above for the _storage_from_foolscap_plugin case + # about passing in get_rref. + storage_server = _StorageServer(get_rref=get_rref) + return _FoolscapStorage.from_announcement( + server_id, + furl, + ann, + storage_server, + ) + + # Nothing matched so we can't talk to this server. (There should + # not be a way to get here without this local being valid) + assert unmatched is not None, "Expected unmatched plugin error" + return unmatched + + @implementer(IServer) class NativeStorageServer(service.MultiService): """I hold information about a storage server that we want to connect to. @@ -831,7 +961,7 @@ class NativeStorageServer(service.MultiService): self._grid_manager_verifier = grid_manager_verifier - self._storage = self._make_storage_system(node_config, config, ann) + self._storage = _make_storage_system(node_config, config, ann, self._server_id, self.get_rref) self.last_connect_time = None self.last_loss_time = None @@ -856,63 +986,6 @@ class NativeStorageServer(service.MultiService): return True return self._grid_manager_verifier() - def _make_storage_system(self, node_config, config, ann): - """ - :param allmydata.node._Config node_config: The node configuration to pass - to any configured storage plugins. - - :param StorageClientConfig config: Configuration specifying desired - storage client behavior. - - :param dict ann: The storage announcement from the storage server we - are meant to communicate with. - - :return IFoolscapStorageServer: An object enabling communication via - Foolscap with the server which generated the announcement. - """ - # Try to match the announcement against a plugin. - try: - furl, storage_server = _storage_from_foolscap_plugin( - node_config, - config, - ann, - # Pass in an accessor for our _rref attribute. The value of - # the attribute may change over time as connections are lost - # and re-established. 
The _StorageServer should always be - # able to get the most up-to-date value. - self.get_rref, - ) - except AnnouncementNotMatched: - # Nope. - pass - else: - return _FoolscapStorage.from_announcement( - self._server_id, - furl, - ann, - storage_server, - ) - - # Try to match the announcement against the anonymous access scheme. - try: - furl = ann[u"anonymous-storage-FURL"] - except KeyError: - # Nope - pass - else: - # See comment above for the _storage_from_foolscap_plugin case - # about passing in get_rref. - storage_server = _StorageServer(get_rref=self.get_rref) - return _FoolscapStorage.from_announcement( - self._server_id, - furl, - ann, - storage_server, - ) - - # Nothing matched so we can't talk to this server. - return _null_storage - def get_permutation_seed(self): return self._storage.permutation_seed def get_name(self): # keep methodname short @@ -1428,7 +1501,7 @@ class _FakeRemoteReference(object): result = yield getattr(self.local_object, action)(*args, **kwargs) defer.returnValue(result) except HTTPClientException as e: - raise RemoteException(e.args) + raise RemoteException((e.code, e.message, e.body)) @attr.s diff --git a/src/allmydata/test/cli/test_grid_manager.py b/src/allmydata/test/cli/test_grid_manager.py index 604cd6b7b..b44b322d2 100644 --- a/src/allmydata/test/cli/test_grid_manager.py +++ b/src/allmydata/test/cli/test_grid_manager.py @@ -23,6 +23,9 @@ import click.testing from ..common_util import ( run_cli, ) +from ..common import ( + superuser, +) from twisted.internet.defer import ( inlineCallbacks, ) @@ -34,7 +37,6 @@ from twisted.python.runtime import ( ) from allmydata.util import jsonbytes as json - class GridManagerCommandLine(TestCase): """ Test the mechanics of the `grid-manager` command @@ -223,7 +225,7 @@ class GridManagerCommandLine(TestCase): ) @skipIf(platform.isWindows(), "We don't know how to set permissions on Windows.") - @skipIf(os.getuid() == 0, "cannot test as superuser with all permissions") + @skipIf(superuser, "cannot test as superuser with all permissions") def test_sign_bad_perms(self): """ Error reported if we can't create certificate file diff --git a/src/allmydata/test/cli/test_run.py b/src/allmydata/test/cli/test_run.py index 2adcfea19..ae0f92131 100644 --- a/src/allmydata/test/cli/test_run.py +++ b/src/allmydata/test/cli/test_run.py @@ -264,7 +264,7 @@ class RunTests(SyncTestCase): self.assertThat(runs, Equals([])) self.assertThat(result_code, Equals(1)) - good_file_content_re = re.compile(r"\w[0-9]*\w[0-9]*\w") + good_file_content_re = re.compile(r"\s[0-9]*\s[0-9]*\s", re.M) @given(text()) def test_pidfile_contents(self, content): diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index db2921e86..1186bd540 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -117,6 +117,10 @@ from subprocess import ( PIPE, ) +# Is the process running as an OS user with elevated privileges (ie, root)? +# We only know how to determine this for POSIX systems. 
+superuser = getattr(os, "getuid", lambda: -1)() == 0 + EMPTY_CLIENT_CONFIG = config_from_string( "/dev/null", "tub.port", @@ -303,13 +307,17 @@ class UseNode(object): if self.plugin_config is None: plugin_config_section = "" else: - plugin_config_section = """ -[storageclient.plugins.{storage_plugin}] -{config} -""".format( - storage_plugin=self.storage_plugin, - config=format_config_items(self.plugin_config), -) + plugin_config_section = ( + "[storageclient.plugins.{storage_plugin}]\n" + "{config}\n").format( + storage_plugin=self.storage_plugin, + config=format_config_items(self.plugin_config), + ) + + if self.storage_plugin is None: + plugins = "" + else: + plugins = "storage.plugins = {}".format(self.storage_plugin) write_introducer( self.basedir, @@ -336,18 +344,17 @@ class UseNode(object): self.config = config_from_string( self.basedir.asTextMode().path, "tub.port", -""" -[node] -{node_config} - -[client] -storage.plugins = {storage_plugin} -{plugin_config_section} -""".format( - storage_plugin=self.storage_plugin, - node_config=format_config_items(node_config), - plugin_config_section=plugin_config_section, -) + "[node]\n" + "{node_config}\n" + "\n" + "[client]\n" + "{plugins}\n" + "{plugin_config_section}\n" + .format( + plugins=plugins, + node_config=format_config_items(node_config), + plugin_config_section=plugin_config_section, + ) ) def create_node(self): diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index 86c95a310..c0cce2809 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -77,6 +77,7 @@ from allmydata.scripts.common import ( from foolscap.api import flushEventualQueue import allmydata.test.common_util as testutil from .common import ( + superuser, EMPTY_CLIENT_CONFIG, SyncTestCase, AsyncBrokenTestCase, @@ -151,7 +152,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase): # EnvironmentError when reading a file that really exists), on # windows, please fix this @skipIf(platform.isWindows(), "We don't know how to set permissions on Windows.") - @skipIf(os.getuid() == 0, "cannot test as superuser with all permissions") + @skipIf(superuser, "cannot test as superuser with all permissions") def test_unreadable_config(self): basedir = "test_client.Basic.test_unreadable_config" os.mkdir(basedir) diff --git a/src/allmydata/test/test_node.py b/src/allmydata/test/test_node.py index 1469ec5b2..90da877fb 100644 --- a/src/allmydata/test/test_node.py +++ b/src/allmydata/test/test_node.py @@ -62,6 +62,7 @@ from .common import ( ConstantAddresses, SameProcessStreamEndpointAssigner, UseNode, + superuser, ) def port_numbers(): @@ -325,7 +326,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase): self.assertEqual(config.items("nosuch", default), default) @skipIf(platform.isWindows(), "We don't know how to set permissions on Windows.") - @skipIf(os.getuid() == 0, "cannot test as superuser with all permissions") + @skipIf(superuser, "cannot test as superuser with all permissions") def test_private_config_unreadable(self): """ Asking for inaccessible private config is an error @@ -341,7 +342,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase): config.get_or_create_private_config("foo") @skipIf(platform.isWindows(), "We don't know how to set permissions on Windows.") - @skipIf(os.getuid() == 0, "cannot test as superuser with all permissions") + @skipIf(superuser, "cannot test as superuser with all permissions") def test_private_config_unreadable_preexisting(self): """ error if reading private 
config data fails @@ -398,7 +399,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase): self.assertEqual(len(counter), 1) # don't call unless necessary self.assertEqual(value, "newer") - @skipIf(os.getuid() == 0, "cannot test as superuser with all permissions") + @skipIf(superuser, "cannot test as superuser with all permissions") def test_write_config_unwritable_file(self): """ Existing behavior merely logs any errors upon writing diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index 514c4ef78..6e73281f6 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -8,7 +8,7 @@ from json import ( loads, ) import hashlib -from typing import Union, Any +from typing import Union, Any, Optional from hyperlink import DecodedURL from fixtures import ( @@ -89,6 +89,8 @@ from allmydata.storage_client import ( IFoolscapStorageServer, NativeStorageServer, StorageFarmBroker, + StorageClientConfig, + MissingPlugin, _FoolscapStorage, _NullStorage, _pick_a_http_server, @@ -170,16 +172,21 @@ class UnrecognizedAnnouncement(unittest.TestCase): an announcement generated by a storage server plugin which is not loaded in the client. """ + plugin_name = u"tahoe-lafs-testing-v1" ann = { - u"name": u"tahoe-lafs-testing-v1", - u"any-parameter": 12345, + u"storage-options": [ + { + u"name": plugin_name, + u"any-parameter": 12345, + }, + ], } server_id = b"abc" def _tub_maker(self, overrides): return Service() - def native_storage_server(self): + def native_storage_server(self, config: Optional[StorageClientConfig] = None) -> NativeStorageServer: """ Make a ``NativeStorageServer`` out of an unrecognizable announcement. """ @@ -188,7 +195,8 @@ class UnrecognizedAnnouncement(unittest.TestCase): self.ann, self._tub_maker, {}, - EMPTY_CLIENT_CONFIG, + node_config=EMPTY_CLIENT_CONFIG, + config=config if config is not None else StorageClientConfig(), ) def test_no_exceptions(self): @@ -235,6 +243,18 @@ class UnrecognizedAnnouncement(unittest.TestCase): server.get_foolscap_write_enabler_seed() server.get_nickname() + def test_missing_plugin(self) -> None: + """ + An exception is produced if the plugin is missing + """ + with self.assertRaises(MissingPlugin): + self.native_storage_server( + StorageClientConfig( + storage_plugins={ + "missing-plugin-name": {} + } + ) + ) class PluginMatchedAnnouncement(SyncTestCase): diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index 1f17621d7..bc266e824 100644 --- a/src/allmydata/test/test_storage_http.py +++ b/src/allmydata/test/test_storage_http.py @@ -62,6 +62,7 @@ from ..storage.http_server import ( _add_error_handling, read_encoded, _SCHEMAS as SERVER_SCHEMAS, + BaseApp, ) from ..storage.http_client import ( StorageClient, @@ -257,7 +258,7 @@ def gen_bytes(length: int) -> bytes: return result -class TestApp(object): +class TestApp(BaseApp): """HTTP API for testing purposes.""" clock: IReactorTime @@ -265,7 +266,7 @@ class TestApp(object): _add_error_handling(_app) _swissnum = SWISSNUM_FOR_TEST # Match what the test client is using - @_authorized_route(_app, {}, "/noop", methods=["GET"]) + @_authorized_route(_app, set(), "/noop", methods=["GET"]) def noop(self, request, authorization): return "noop" diff --git a/src/allmydata/test/test_storage_https.py b/src/allmydata/test/test_storage_https.py index a11b0eed5..0e0bbcc95 100644 --- a/src/allmydata/test/test_storage_https.py +++ b/src/allmydata/test/test_storage_https.py @@ -109,9 
+109,11 @@ class PinningHTTPSValidation(AsyncTestCase):
         root.isLeaf = True
         listening_port = await endpoint.listen(Site(root))
         try:
-            yield f"https://127.0.0.1:{listening_port.getHost().port}/"
+            yield f"https://127.0.0.1:{listening_port.getHost().port}/"  # type: ignore[attr-defined]
         finally:
-            await listening_port.stopListening()
+            result = listening_port.stopListening()
+            if result is not None:
+                await result
 
     def request(self, url: str, expected_certificate: x509.Certificate):
         """
diff --git a/src/allmydata/util/attrs_provides.py b/src/allmydata/util/attrs_provides.py
new file mode 100644
index 000000000..4282c3d38
--- /dev/null
+++ b/src/allmydata/util/attrs_provides.py
@@ -0,0 +1,50 @@
+"""
+Utilities related to attrs.
+
+Handling for zope.interface is deprecated in attrs, so we copy the
+relevant support method here since we depend on zope.interface anyway.
+"""
+
+from attr._make import attrs, attrib
+
+
+@attrs(repr=False, slots=True, hash=True)
+class _ProvidesValidator:
+    interface = attrib()
+
+    def __call__(self, inst, attr, value):
+        """
+        We use a callable class to be able to change the ``__repr__``.
+        """
+        if not self.interface.providedBy(value):
+            raise TypeError(
+                "'{name}' must provide {interface!r} which {value!r} "
+                "doesn't.".format(
+                    name=attr.name, interface=self.interface, value=value
+                ),
+                attr,
+                self.interface,
+                value,
+            )
+
+    def __repr__(self):
+        return "<provides validator for interface {interface!r}>".format(
+            interface=self.interface
+        )
+
+
+def provides(interface):
+    """
+    A validator that raises a `TypeError` if the initializer is called
+    with an object that does not provide the requested *interface* (checks
+    are performed using ``interface.providedBy(value)``; see `zope.interface
+    <https://zopeinterface.readthedocs.io/en/latest/>`_).
+
+    :param interface: The interface to check for.
+    :type interface: ``zope.interface.Interface``
+
+    :raises TypeError: With a human readable error message, the attribute
+        (of type `attrs.Attribute`), the expected interface, and the
+        value it got.
+    """
+    return _ProvidesValidator(interface)
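
For reference, a minimal sketch of how the copied `provides` validator is expected to be used with attrs and zope.interface. The `IThing`, `Thing`, and `Holder` names below are hypothetical and exist only for illustration; the real call sites are the attrs classes in allmydata that relied on the now-deprecated validator support in attrs itself.

    from attr import attrs, attrib
    from zope.interface import Interface, implementer

    from allmydata.util.attrs_provides import provides

    class IThing(Interface):
        """A stand-in interface used only for this illustration."""

    @implementer(IThing)
    class Thing(object):
        pass

    @attrs
    class Holder(object):
        # The validator runs when Holder(...) is constructed and raises
        # TypeError if the supplied value does not provide IThing.
        thing = attrib(validator=provides(IThing))

    Holder(thing=Thing())   # accepted
    Holder(thing=object())  # raises TypeError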