Merge remote-tracking branch 'origin/master' into 4041

This commit is contained in:
Itamar Turner-Trauring 2023-08-16 10:11:01 -04:00
commit 28f6902b1f
37 changed files with 1890 additions and 545 deletions

View File

@ -24,6 +24,11 @@ version: 2.1
dockerhub-context-template: &DOCKERHUB_CONTEXT
context: "dockerhub-auth"
# Required environment for using the coveralls tool to upload partial coverage
# reports and then finish the process.
coveralls-environment: &COVERALLS_ENVIRONMENT
COVERALLS_REPO_TOKEN: "JPf16rLB7T2yjgATIxFzTsEgMdN1UNq6o"
# Next is a Docker executor template that gets the credentials from the
# environment and supplies them to the executor.
dockerhub-auth-template: &DOCKERHUB_AUTH
@ -112,6 +117,21 @@ workflows:
- "another-locale":
{}
- "windows-server-2022":
name: "Windows Server 2022, CPython <<matrix.pythonVersion>>"
matrix:
parameters:
# Run the job for a number of CPython versions. These are the
# two versions installed on the version of the Windows VM image
# we specify (in the executor). This is handy since it means we
# don't have to do any Python installation work. We pin the
# Windows VM image so these shouldn't shuffle around beneath us
# but if we want to update that image or get different versions
# of Python, we probably have to do something here.
pythonVersion:
- "3.9"
- "3.11"
- "integration":
# Run even the slow integration tests here. We need the `--` to
# sneak past tox and get to pytest.
@ -126,6 +146,15 @@ workflows:
- "docs":
{}
- "finish-coverage-report":
requires:
# Referencing the job by "alias" (as CircleCI calls the mapping
# key) instead of the value of its "name" property causes us to
# require every instance of the job from its matrix expansion. So
# this requirement is enough to require every Windows Server 2022
# job.
- "windows-server-2022"
images:
<<: *IMAGES
@ -133,6 +162,20 @@ workflows:
when: "<< pipeline.parameters.build-images >>"
jobs:
finish-coverage-report:
docker:
- <<: *DOCKERHUB_AUTH
image: "python:3-slim"
steps:
- run:
name: "Indicate completion to coveralls.io"
environment:
<<: *COVERALLS_ENVIRONMENT
command: |
pip install coveralls==3.3.1
python -m coveralls --finish
codechecks:
docker:
- <<: *DOCKERHUB_AUTH
@ -151,6 +194,161 @@ jobs:
command: |
~/.local/bin/tox -e codechecks
windows-server-2022:
parameters:
pythonVersion:
description: >-
An argument to pass to the `py` launcher to choose a Python version.
type: "string"
default: ""
executor: "windows"
environment:
# Tweak Hypothesis to make its behavior more suitable for the CI
# environment. This should improve reproducibility and lessen the
# effects of variable compute resources.
TAHOE_LAFS_HYPOTHESIS_PROFILE: "ci"
# Tell pip where its download cache lives. This must agree with the
# "save_cache" step below or caching won't really work right.
PIP_CACHE_DIR: "pip-cache"
# And tell pip where it can find our cached wheelhouse for fast wheel
# installation, even for projects that don't distribute wheels. This
# must also agree with the "save_cache" step below.
PIP_FIND_LINKS: "wheelhouse"
steps:
- "checkout"
# If possible, restore a pip download cache to save us from having to
# download all our Python dependencies from PyPI.
- "restore_cache":
keys:
# The download cache and/or the wheelhouse may contain Python
# version-specific binary packages so include the Python version
# in this key, as well as the canonical source of our
# dependencies.
- &CACHE_KEY "pip-packages-v1-<< parameters.pythonVersion >>-{{ checksum \"setup.py\" }}"
- "run":
name: "Fix $env:PATH"
command: |
# The Python version this job is parameterized on is not necessarily the
# one at the front of $env:PATH. Modify $env:PATH so that it is, so we
# can just say "python" in the rest of the steps. Also get the
# related Scripts directory so tools from packages we install are
# also available.
$p = py -<<parameters.pythonVersion>> -c "import sys; print(sys.prefix)"
$q = py -<<parameters.pythonVersion>> -c "import sysconfig; print(sysconfig.get_path('scripts'))"
New-Item $Profile.CurrentUserAllHosts -Force
# $p gets "python" on PATH and $q gets tools from packages we
# install. Note we carefully construct the string so that
# $env:PATH is not substituted now but $p and $q are. ` is the
# PowerShell string escape character.
Add-Content -Path $Profile.CurrentUserAllHosts -Value "`$env:PATH = `"$p;$q;`$env:PATH`""
- "run":
name: "Display tool versions"
command: |
python misc/build_helpers/show-tool-versions.py
- "run":
# It's faster to install a wheel than a source package. If we don't
# have a cached wheelhouse then build all of the wheels and dump
# them into a directory where they can become a cached wheelhouse.
# We would have built these wheels during installation anyway so it
# doesn't cost us anything extra and saves us effort next time.
name: "(Maybe) Build Wheels"
command: |
if ((Test-Path .\wheelhouse) -and (Test-Path .\wheelhouse\*)) {
echo "Found populated wheelhouse, skipping wheel building."
} else {
python -m pip install wheel
python -m pip wheel --wheel-dir $env:PIP_FIND_LINKS .[testenv] .[test]
}
- "save_cache":
paths:
# Make sure this agrees with PIP_CACHE_DIR in the environment.
- "pip-cache"
- "wheelhouse"
key: *CACHE_KEY
- "run":
name: "Install Dependencies"
environment:
# By this point we should no longer need an index.
PIP_NO_INDEX: "1"
command: |
python -m pip install .[testenv] .[test]
- "run":
name: "Run Unit Tests"
environment:
# Configure the results location for the subunitv2-file reporter
# from subunitreporter
SUBUNITREPORTER_OUTPUT_PATH: "test-results.subunit2"
# Try to get prompt output from the reporter to avoid no-output
# timeouts.
PYTHONUNBUFFERED: "1"
command: |
# Run the test suite under coverage measurement using the
# parameterized version of Python, writing subunitv2-format
# results to the file given in the environment.
python -b -m coverage run -m twisted.trial --reporter=subunitv2-file --rterrors allmydata
- "run":
name: "Upload Coverage"
environment:
<<: *COVERALLS_ENVIRONMENT
# Mark the data as just one piece of many because we have more
# than one instance of this job (two on Windows now, some on other
# platforms later) which collects and reports coverage. This is
# necessary to cause Coveralls to merge multiple coverage results
# into a single report. Note the merge only happens when we
# "finish" a particular build, as identified by its "build_num"
# (aka "service_number").
COVERALLS_PARALLEL: "true"
command: |
python -m pip install coveralls==3.3.1
# .coveragerc sets parallel = True so we don't have a `.coverage`
# file but a `.coverage.<unique stuff>` file (or maybe more than
# one, but probably not). coveralls can't work with these so
# merge them before invoking it.
python -m coverage combine
# Now coveralls will be able to find the data, so have it do the
# upload. Also, have it strip the system config-specific prefix
# from all of the source paths.
$prefix = python -c "import sysconfig; print(sysconfig.get_path('purelib'))"
python -m coveralls --basedir $prefix
- "run":
name: "Convert Result Log"
command: |
# subunit2junitxml exits with error if the result stream it is
# converting has test failures in it! So this step might fail.
# Since the step in which we actually _ran_ the tests won't fail
# even if there are test failures, this is a good thing for now.
subunit2junitxml.exe --output-to=test-results.xml test-results.subunit2
- "store_test_results":
path: "test-results.xml"
- "store_artifacts":
path: "_trial_temp/test.log"
- "store_artifacts":
path: "eliot.log"
- "store_artifacts":
path: ".coverage"
pyinstaller:
docker:
- <<: *DOCKERHUB_AUTH
@ -527,6 +725,15 @@ jobs:
# PYTHON_VERSION: "2"
executors:
windows:
# Choose a Windows environment that most closely matches our testing
# requirements and goals.
# https://circleci.com/developer/orbs/orb/circleci/windows#executors-server-2022
machine:
image: "windows-server-2022-gui:2023.06.1"
shell: "powershell.exe -ExecutionPolicy Bypass"
resource_class: "windows.large"
nix:
docker:
# Run in a highly Nix-capable environment.

View File

@ -19,7 +19,7 @@ skip_covered = True
source =
# It looks like this in the checkout
src/
# It looks like this in the Windows build environment
# It looks like this in the GitHub Actions Windows build environment
D:/a/tahoe-lafs/tahoe-lafs/.tox/py*-coverage/Lib/site-packages/
# Although sometimes it looks like this instead. Also it looks like this on macOS.
.tox/py*-coverage/lib/python*/site-packages/

View File

@ -44,13 +44,6 @@ jobs:
strategy:
fail-fast: false
matrix:
os:
- windows-latest
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
include:
# On macOS don't bother with 3.8, just to get faster builds.
- os: macos-12

View File

@ -7,16 +7,11 @@ from __future__ import annotations
import os
import sys
import shutil
from attr import frozen
from time import sleep
from os import mkdir, listdir, environ
from os import mkdir, environ
from os.path import join, exists
from tempfile import mkdtemp, mktemp
from functools import partial
from json import loads
from foolscap.furl import (
decode_furl,
)
from tempfile import mkdtemp
from eliot import (
to_file,
@ -25,7 +20,7 @@ from eliot import (
from twisted.python.filepath import FilePath
from twisted.python.procutils import which
from twisted.internet.defer import DeferredList
from twisted.internet.defer import DeferredList, succeed
from twisted.internet.error import (
ProcessExitedAlready,
ProcessTerminated,
@ -33,22 +28,23 @@ from twisted.internet.error import (
import pytest
import pytest_twisted
from typing import Mapping
from .util import (
_CollectOutputProtocol,
_MagicTextProtocol,
_DumpOutputProtocol,
_ProcessExitedProtocol,
_create_node,
_cleanup_tahoe_process,
_tahoe_runner_optional_coverage,
await_client_ready,
TahoeProcess,
cli,
generate_ssh_key,
block_with_timeout,
)
from .grid import (
create_flog_gatherer,
create_grid,
)
from allmydata.node import read_config
from allmydata.util.iputil import allocate_tcp_port
# No reason for HTTP requests to take longer than four minutes in the
# integration tests. See allmydata/scripts/common_http.py for usage.
@ -116,6 +112,18 @@ def reactor():
return _reactor
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:port_allocator", include_result=False)
def port_allocator(reactor):
# these will appear basically random, which can make manual debugging
# harder, but we're re-using existing code instead of writing our
# own... so, win?
def allocate():
port = allocate_tcp_port()
return succeed(port)
return allocate
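For orientation, a minimal sketch of how this allocator gets used (it mirrors Grid.add_storage_node in integration/grid.py further below; the surrounding @inlineCallbacks context is assumed):

# inside an @inlineCallbacks function that was handed the session's allocator:
port = yield port_allocator()
# build a localhost endpoint string for the new node's web port
web_port = "tcp:{}:interface=localhost".format(port)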
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:temp_dir", include_args=[])
def temp_dir(request) -> str:
@ -150,133 +158,36 @@ def flog_binary():
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:flog_gatherer", include_args=[])
def flog_gatherer(reactor, temp_dir, flog_binary, request):
out_protocol = _CollectOutputProtocol()
gather_dir = join(temp_dir, 'flog_gather')
reactor.spawnProcess(
out_protocol,
flog_binary,
(
'flogtool', 'create-gatherer',
'--location', 'tcp:localhost:3117',
'--port', '3117',
gather_dir,
),
env=environ,
fg = pytest_twisted.blockon(
create_flog_gatherer(reactor, request, temp_dir, flog_binary)
)
pytest_twisted.blockon(out_protocol.done)
twistd_protocol = _MagicTextProtocol("Gatherer waiting at", "gatherer")
twistd_process = reactor.spawnProcess(
twistd_protocol,
which('twistd')[0],
(
'twistd', '--nodaemon', '--python',
join(gather_dir, 'gatherer.tac'),
),
path=gather_dir,
env=environ,
)
pytest_twisted.blockon(twistd_protocol.magic_seen)
def cleanup():
_cleanup_tahoe_process(twistd_process, twistd_protocol.exited)
flog_file = mktemp('.flog_dump')
flog_protocol = _DumpOutputProtocol(open(flog_file, 'w'))
flog_dir = join(temp_dir, 'flog_gather')
flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')]
print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file))
reactor.spawnProcess(
flog_protocol,
flog_binary,
(
'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0])
),
env=environ,
)
print("Waiting for flogtool to complete")
try:
block_with_timeout(flog_protocol.done, reactor)
except ProcessTerminated as e:
print("flogtool exited unexpectedly: {}".format(str(e)))
print("Flogtool completed")
request.addfinalizer(cleanup)
with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f:
furl = f.read().strip()
return furl
return fg
@pytest.fixture(scope='session')
@log_call(
action_type=u"integration:introducer",
include_args=["temp_dir", "flog_gatherer"],
include_result=False,
)
def introducer(reactor, temp_dir, flog_gatherer, request):
intro_dir = join(temp_dir, 'introducer')
print("making introducer", intro_dir)
@log_call(action_type=u"integration:grid", include_args=[])
def grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
Provides a new Grid with a single Introducer and flog-gathering process.
if not exists(intro_dir):
mkdir(intro_dir)
done_proto = _ProcessExitedProtocol()
_tahoe_runner_optional_coverage(
done_proto,
reactor,
request,
(
'create-introducer',
'--listen=tcp',
'--hostname=localhost',
intro_dir,
),
)
pytest_twisted.blockon(done_proto.done)
config = read_config(intro_dir, "tub.port")
config.set_config("node", "nickname", "introducer-tor")
config.set_config("node", "web.port", "4562")
config.set_config("node", "log_gatherer.furl", flog_gatherer)
# "tahoe run" is consistent across Linux/macOS/Windows, unlike the old
# "start" command.
protocol = _MagicTextProtocol('introducer running', "introducer")
transport = _tahoe_runner_optional_coverage(
protocol,
reactor,
request,
(
'run',
intro_dir,
),
Notably does _not_ provide storage servers; use the storage_nodes
fixture if your tests need a Grid that can be used for puts / gets.
"""
g = pytest_twisted.blockon(
create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
)
request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited))
return g
pytest_twisted.blockon(protocol.magic_seen)
return TahoeProcess(transport, intro_dir)
@pytest.fixture(scope='session')
def introducer(grid):
return grid.introducer
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:introducer:furl", include_args=["temp_dir"])
def introducer_furl(introducer, temp_dir):
furl_fname = join(temp_dir, 'introducer', 'private', 'introducer.furl')
while not exists(furl_fname):
print("Don't see {} yet".format(furl_fname))
sleep(.1)
furl = open(furl_fname, 'r').read()
tubID, location_hints, name = decode_furl(furl)
if not location_hints:
# If there are no location hints then nothing can ever possibly
# connect to it and the only thing that can happen next is something
# will hang or time out. So just give up right now.
raise ValueError(
"Introducer ({!r}) fURL has no location hints!".format(
introducer_furl,
),
)
return furl
return introducer.furl
@pytest.fixture
@ -285,7 +196,7 @@ def introducer_furl(introducer, temp_dir):
include_args=["temp_dir", "flog_gatherer"],
include_result=False,
)
def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_control_port):
def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_network):
intro_dir = join(temp_dir, 'introducer_tor')
print("making Tor introducer in {}".format(intro_dir))
print("(this can take tens of seconds to allocate Onion address)")
@ -299,7 +210,7 @@ def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_control_port):
request,
(
'create-introducer',
'--tor-control-port', tor_control_port,
'--tor-control-port', tor_network.client_control_endpoint,
'--hide-ip',
'--listen=tor',
intro_dir,
@ -311,7 +222,7 @@ def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_control_port):
config = read_config(intro_dir, "tub.port")
config.set_config("node", "nickname", "introducer-tor")
config.set_config("node", "web.port", "4561")
config.set_config("node", "log_gatherer.furl", flog_gatherer)
config.set_config("node", "log_gatherer.furl", flog_gatherer.furl)
# "tahoe run" is consistent across Linux/macOS/Windows, unlike the old
# "start" command.
@ -354,87 +265,31 @@ def tor_introducer_furl(tor_introducer, temp_dir):
@pytest.fixture(scope='session')
@log_call(
action_type=u"integration:storage_nodes",
include_args=["temp_dir", "introducer_furl", "flog_gatherer"],
include_args=["grid"],
include_result=False,
)
def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, request):
def storage_nodes(grid):
nodes_d = []
# start all 5 nodes in parallel
for x in range(5):
name = 'node{}'.format(x)
web_port= 9990 + x
nodes_d.append(
_create_node(
reactor, request, temp_dir, introducer_furl, flog_gatherer, name,
web_port="tcp:{}:interface=localhost".format(web_port),
storage=True,
)
)
nodes_status = pytest_twisted.blockon(DeferredList(nodes_d))
nodes = []
for ok, process in nodes_status:
assert ok, "Storage node creation failed: {}".format(process)
nodes.append(process)
return nodes
nodes_d.append(grid.add_storage_node())
nodes_status = pytest_twisted.blockon(DeferredList(nodes_d))
for ok, value in nodes_status:
assert ok, "Storage node creation failed: {}".format(value)
return grid.storage_servers
@pytest.fixture(scope="session")
def alice_sftp_client_key_path(temp_dir):
# The client SSH key path is typically going to be somewhere else (~/.ssh,
# typically), but for convenience sake for testing we'll put it inside node.
return join(temp_dir, "alice", "private", "ssh_client_rsa_key")
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:alice", include_args=[], include_result=False)
def alice(
reactor,
temp_dir,
introducer_furl,
flog_gatherer,
storage_nodes,
alice_sftp_client_key_path,
request,
):
process = pytest_twisted.blockon(
_create_node(
reactor, request, temp_dir, introducer_furl, flog_gatherer, "alice",
web_port="tcp:9980:interface=localhost",
storage=False,
)
)
pytest_twisted.blockon(await_client_ready(process))
# 1. Create a new RW directory cap:
cli(process, "create-alias", "test")
rwcap = loads(cli(process, "list-aliases", "--json"))["test"]["readwrite"]
# 2. Enable SFTP on the node:
host_ssh_key_path = join(process.node_dir, "private", "ssh_host_rsa_key")
accounts_path = join(process.node_dir, "private", "accounts")
with open(join(process.node_dir, "tahoe.cfg"), "a") as f:
f.write("""\
[sftpd]
enabled = true
port = tcp:8022:interface=127.0.0.1
host_pubkey_file = {ssh_key_path}.pub
host_privkey_file = {ssh_key_path}
accounts.file = {accounts_path}
""".format(ssh_key_path=host_ssh_key_path, accounts_path=accounts_path))
generate_ssh_key(host_ssh_key_path)
# 3. Add a SFTP access file with an SSH key for auth.
generate_ssh_key(alice_sftp_client_key_path)
# Pub key format is "ssh-rsa <thekey> <username>". We want the key.
ssh_public_key = open(alice_sftp_client_key_path + ".pub").read().strip().split()[1]
with open(accounts_path, "w") as f:
f.write("""\
alice-key ssh-rsa {ssh_public_key} {rwcap}
""".format(rwcap=rwcap, ssh_public_key=ssh_public_key))
# 4. Restart the node with new SFTP config.
pytest_twisted.blockon(process.restart_async(reactor, request))
pytest_twisted.blockon(await_client_ready(process))
print(f"Alice pid: {process.transport.pid}")
return process
def alice(reactor, request, grid, storage_nodes):
"""
:returns grid.Client: the associated instance for Alice
"""
alice = pytest_twisted.blockon(grid.add_client("alice"))
pytest_twisted.blockon(alice.add_sftp(reactor, request))
print(f"Alice pid: {alice.process.transport.pid}")
return alice
@pytest.fixture(scope='session')
@ -455,6 +310,12 @@ def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, reques
@pytest.mark.skipif(sys.platform.startswith('win'),
'Tor tests are unstable on Windows')
def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]:
"""
Install the Chutney software that is required to run a small local Tor grid.
(Chutney lacks the normal Python packaging metadata, so we can't just
declare it as a dependency in tox or similar.)
"""
# Try to find Chutney already installed in the environment.
try:
import chutney
@ -512,19 +373,23 @@ def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]:
)
pytest_twisted.blockon(proto.done)
return (chutney_dir, {"PYTHONPATH": join(chutney_dir, "lib")})
return chutney_dir, {"PYTHONPATH": join(chutney_dir, "lib")}
@pytest.fixture(scope='session')
def tor_control_port(tor_network):
@frozen
class ChutneyTorNetwork:
"""
Get an endpoint description for the Tor control port for the local Tor
network we run..
Represents a running Chutney (tor) network. Returned by the
"tor_network" fixture.
"""
# We ignore tor_network because it can't tell us the control port. But
# asking for it forces the Tor network to be built before we run - so if
# we get the hard-coded control port value correct, there should be
# something listening at that address.
return 'tcp:localhost:8007'
dir: FilePath
environ: Mapping[str, str]
client_control_port: int
@property
def client_control_endpoint(self) -> str:
return "tcp:localhost:{}".format(self.client_control_port)
@pytest.fixture(scope='session')
@pytest.mark.skipif(sys.platform.startswith('win'),
@ -533,6 +398,20 @@ def tor_network(reactor, temp_dir, chutney, request):
"""
Build a basic Tor network.
Instantiate the "networks/basic" Chutney configuration for a local
Tor network.
This provides a small, local Tor network that can run v3 Onion
Services. It has 3 authorities, 5 relays and 2 clients.
The 'chutney' fixture pins a Chutney git revision, so things
shouldn't change. This network has two clients, which are the only
nodes with a valid SocksPort configuration ("008c" and "009c", on
ports 9008 and 9009).
The control ports start at 8000 (so the ControlPorts for the client
nodes are 8008 and 8009).
:param chutney: The root directory of a Chutney checkout and a dict of
additional environment variables to set so a Python process can use
it.
@ -575,6 +454,32 @@ def tor_network(reactor, temp_dir, chutney, request):
request.addfinalizer(cleanup)
pytest_twisted.blockon(chutney(("start", basic_network)))
# Wait for the nodes to "bootstrap" - ie, form a network among themselves.
# Successful bootstrap is reported with a message something like:
#
# Everything bootstrapped after 151 sec
# Bootstrap finished: 151 seconds
# Node status:
# test000a : 100, done , Done
# test001a : 100, done , Done
# test002a : 100, done , Done
# test003r : 100, done , Done
# test004r : 100, done , Done
# test005r : 100, done , Done
# test006r : 100, done , Done
# test007r : 100, done , Done
# test008c : 100, done , Done
# test009c : 100, done , Done
# Published dir info:
# test000a : 100, all nodes , desc md md_cons ns_cons , Dir info cached
# test001a : 100, all nodes , desc md md_cons ns_cons , Dir info cached
# test002a : 100, all nodes , desc md md_cons ns_cons , Dir info cached
# test003r : 100, all nodes , desc md md_cons ns_cons , Dir info cached
# test004r : 100, all nodes , desc md md_cons ns_cons , Dir info cached
# test005r : 100, all nodes , desc md md_cons ns_cons , Dir info cached
# test006r : 100, all nodes , desc md md_cons ns_cons , Dir info cached
# test007r : 100, all nodes , desc md md_cons ns_cons , Dir info cached
pytest_twisted.blockon(chutney(("wait_for_bootstrap", basic_network)))
# print some useful stuff
@ -582,3 +487,11 @@ def tor_network(reactor, temp_dir, chutney, request):
pytest_twisted.blockon(chutney(("status", basic_network)))
except ProcessTerminated:
print("Chutney.TorNet status failed (continuing)")
# the "8008" comes from configuring "networks/basic" in chutney
# and then examining "net/nodes/008c/torrc" for ControlPort value
return ChutneyTorNetwork(
chutney_root,
chutney_env,
8008,
)
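For orientation, a minimal sketch (using only names that appear above) of what this return value gives its consumers, such as the tor_introducer fixture:

network = ChutneyTorNetwork(chutney_root, chutney_env, 8008)
# The endpoint string is derived from the client control port...
assert network.client_control_endpoint == "tcp:localhost:8008"
# ...and is what tor_introducer passes as --tor-control-port.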

529
integration/grid.py Normal file
View File

@ -0,0 +1,529 @@
"""
Classes which directly represent various kinds of Tahoe processes
that co-operate to form "a Grid".
These methods and objects are used by conftest.py fixtures but may
also be used as direct helpers for tests that don't want to (or can't)
rely on 'the' global grid as provided by fixtures like 'alice' or
'storage_servers'.
"""
from os import mkdir, listdir
from os.path import join, exists
from json import loads
from tempfile import mktemp
from time import sleep
from eliot import (
log_call,
)
from foolscap.furl import (
decode_furl,
)
from twisted.python.procutils import which
from twisted.internet.defer import (
inlineCallbacks,
returnValue,
Deferred,
)
from twisted.internet.task import (
deferLater,
)
from twisted.internet.interfaces import (
IProcessTransport,
IProcessProtocol,
)
from twisted.internet.error import ProcessTerminated
from allmydata.util.attrs_provides import (
provides,
)
from allmydata.node import read_config
from .util import (
_CollectOutputProtocol,
_MagicTextProtocol,
_DumpOutputProtocol,
_ProcessExitedProtocol,
_run_node,
_cleanup_tahoe_process,
_tahoe_runner_optional_coverage,
TahoeProcess,
await_client_ready,
generate_ssh_key,
cli,
reconfigure,
_create_node,
)
import attr
import pytest_twisted
# currently, we pass a "request" around a bunch but it seems to only
# be for addfinalizer() calls.
# - is "keeping" a request like that okay? What if it's a session-scoped one?
# (i.e. in Grid etc)
# - maybe limit to "a callback to hang your cleanup off of" (instead of request)?
@attr.s
class FlogGatherer(object):
"""
Flog Gatherer process.
"""
process = attr.ib(
validator=provides(IProcessTransport)
)
protocol = attr.ib(
validator=provides(IProcessProtocol)
)
furl = attr.ib()
@inlineCallbacks
def create_flog_gatherer(reactor, request, temp_dir, flog_binary):
out_protocol = _CollectOutputProtocol()
gather_dir = join(temp_dir, 'flog_gather')
reactor.spawnProcess(
out_protocol,
flog_binary,
(
'flogtool', 'create-gatherer',
'--location', 'tcp:localhost:3117',
'--port', '3117',
gather_dir,
)
)
yield out_protocol.done
twistd_protocol = _MagicTextProtocol("Gatherer waiting at", "gatherer")
twistd_process = reactor.spawnProcess(
twistd_protocol,
which('twistd')[0],
(
'twistd', '--nodaemon', '--python',
join(gather_dir, 'gatherer.tac'),
),
path=gather_dir,
)
yield twistd_protocol.magic_seen
def cleanup():
_cleanup_tahoe_process(twistd_process, twistd_protocol.exited)
flog_file = mktemp('.flog_dump')
flog_protocol = _DumpOutputProtocol(open(flog_file, 'w'))
flog_dir = join(temp_dir, 'flog_gather')
flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')]
print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file))
for flog_path in flogs:
reactor.spawnProcess(
flog_protocol,
flog_binary,
(
'flogtool', 'dump', join(temp_dir, 'flog_gather', flog_path)
),
)
print("Waiting for flogtool to complete")
try:
pytest_twisted.blockon(flog_protocol.done)
except ProcessTerminated as e:
print("flogtool exited unexpectedly: {}".format(str(e)))
print("Flogtool completed")
request.addfinalizer(cleanup)
with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f:
furl = f.read().strip()
returnValue(
FlogGatherer(
protocol=twistd_protocol,
process=twistd_process,
furl=furl,
)
)
@attr.s
class StorageServer(object):
"""
Represents a Tahoe Storage Server
"""
process = attr.ib(
validator=attr.validators.instance_of(TahoeProcess)
)
protocol = attr.ib(
validator=provides(IProcessProtocol)
)
@inlineCallbacks
def restart(self, reactor, request):
"""
re-start our underlying process by issuing a TERM, waiting, and
then running it again. await_client_ready() is run as well.
Note that self.process and self.protocol will be new instances
after this.
"""
self.process.transport.signalProcess('TERM')
yield self.protocol.exited
self.process = yield _run_node(
reactor, self.process.node_dir, request, None,
)
self.protocol = self.process.transport.proto
yield await_client_ready(self.process)
@inlineCallbacks
def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
needed=2, happy=3, total=4):
"""
Create a new storage server
"""
node_process = yield _create_node(
reactor, request, temp_dir, introducer.furl, flog_gatherer,
name, web_port, storage=True, needed=needed, happy=happy, total=total,
)
storage = StorageServer(
process=node_process,
# node_process is a TahoeProcess. its transport is an
# IProcessTransport. in practice, this means it is a
# twisted.internet._baseprocess.BaseProcess. BaseProcess records the
# process protocol as its proto attribute.
protocol=node_process.transport.proto,
)
returnValue(storage)
@attr.s
class Client(object):
"""
Represents a Tahoe client
"""
process = attr.ib(
validator=attr.validators.instance_of(TahoeProcess)
)
protocol = attr.ib(
validator=provides(IProcessProtocol)
)
request = attr.ib() # original request, for addfinalizer()
## XXX convenience? or confusion?
# @property
# def node_dir(self):
# return self.process.node_dir
@inlineCallbacks
def reconfigure_zfec(self, reactor, zfec_params, convergence=None, max_segment_size=None):
"""
Reconfigure the ZFEC parameters for this node
"""
# XXX this is a stop-gap to keep tests running "as is"
# -> we should fix the tests so that they create a new client
# in the grid with the required parameters, instead of
# re-configuring Alice (or whomever)
rtn = yield Deferred.fromCoroutine(
reconfigure(reactor, self.request, self.process, zfec_params, convergence, max_segment_size)
)
return rtn
@inlineCallbacks
def restart(self, reactor, request, servers=1):
"""
re-start our underlying process by issuing a TERM, waiting and
then running again.
:param int servers: number of server connections we will wait
for before being 'ready'
Note that self.process and self.protocol will be new instances
after this.
"""
# XXX similar to above, can we make this return a new instance
# instead of mutating?
self.process.transport.signalProcess('TERM')
yield self.protocol.exited
process = yield _run_node(
reactor, self.process.node_dir, request, None,
)
self.process = process
self.protocol = self.process.transport.proto
yield await_client_ready(self.process, minimum_number_of_servers=servers)
@inlineCallbacks
def add_sftp(self, reactor, request):
"""
"""
# if other things need to add or change configuration, further
# refactoring could be useful here (i.e. move reconfigure
# parts to their own functions)
# XXX why do we need an alias?
# 1. Create a new RW directory cap:
cli(self.process, "create-alias", "test")
rwcap = loads(cli(self.process, "list-aliases", "--json"))["test"]["readwrite"]
# 2. Enable SFTP on the node:
host_ssh_key_path = join(self.process.node_dir, "private", "ssh_host_rsa_key")
sftp_client_key_path = join(self.process.node_dir, "private", "ssh_client_rsa_key")
accounts_path = join(self.process.node_dir, "private", "accounts")
with open(join(self.process.node_dir, "tahoe.cfg"), "a") as f:
f.write(
("\n\n[sftpd]\n"
"enabled = true\n"
"port = tcp:8022:interface=127.0.0.1\n"
"host_pubkey_file = {ssh_key_path}.pub\n"
"host_privkey_file = {ssh_key_path}\n"
"accounts.file = {accounts_path}\n").format(
ssh_key_path=host_ssh_key_path,
accounts_path=accounts_path,
)
)
generate_ssh_key(host_ssh_key_path)
# 3. Add a SFTP access file with an SSH key for auth.
generate_ssh_key(sftp_client_key_path)
# Pub key format is "ssh-rsa <thekey> <username>". We want the key.
with open(sftp_client_key_path + ".pub") as pubkey_file:
ssh_public_key = pubkey_file.read().strip().split()[1]
with open(accounts_path, "w") as f:
f.write(
"alice-key ssh-rsa {ssh_public_key} {rwcap}\n".format(
rwcap=rwcap,
ssh_public_key=ssh_public_key,
)
)
# 4. Restart the node with new SFTP config.
print("restarting for SFTP")
yield self.restart(reactor, request)
print("restart done")
# XXX i think this is broken because we're "waiting for ready" during first bootstrap? or something?
@inlineCallbacks
def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
needed=2, happy=3, total=4):
"""
Create a new client node
"""
from .util import _create_node
node_process = yield _create_node(
reactor, request, temp_dir, introducer.furl, flog_gatherer,
name, web_port, storage=False, needed=needed, happy=happy, total=total,
)
returnValue(
Client(
process=node_process,
protocol=node_process.transport.proto,
request=request,
)
)
@attr.s
class Introducer(object):
"""
Represents a running introducer
"""
process = attr.ib(
validator=attr.validators.instance_of(TahoeProcess)
)
protocol = attr.ib(
validator=provides(IProcessProtocol)
)
furl = attr.ib()
def _validate_furl(furl_fname):
"""
Opens and validates a fURL, ensuring location hints.
:returns: the furl
:raises: ValueError if no location hints
"""
while not exists(furl_fname):
print("Don't see {} yet".format(furl_fname))
sleep(.1)
furl = open(furl_fname, 'r').read()
tubID, location_hints, name = decode_furl(furl)
if not location_hints:
# If there are no location hints then nothing can ever possibly
# connect to it and the only thing that can happen next is something
# will hang or time out. So just give up right now.
raise ValueError(
"Introducer ({!r}) fURL has no location hints!".format(
furl,
),
)
return furl
@inlineCallbacks
@log_call(
action_type=u"integration:introducer",
include_args=["temp_dir", "flog_gatherer"],
include_result=False,
)
def create_introducer(reactor, request, temp_dir, flog_gatherer, port):
"""
Run a new Introducer and return an Introducer instance.
"""
intro_dir = join(temp_dir, 'introducer{}'.format(port))
if not exists(intro_dir):
mkdir(intro_dir)
done_proto = _ProcessExitedProtocol()
_tahoe_runner_optional_coverage(
done_proto,
reactor,
request,
(
'create-introducer',
'--listen=tcp',
'--hostname=localhost',
intro_dir,
),
)
yield done_proto.done
config = read_config(intro_dir, "tub.port")
config.set_config("node", "nickname", f"introducer-{port}")
config.set_config("node", "web.port", f"{port}")
config.set_config("node", "log_gatherer.furl", flog_gatherer.furl)
# on windows, "tahoe start" means: run forever in the foreground,
# but on linux it means daemonize. "tahoe run" is consistent
# between platforms.
protocol = _MagicTextProtocol('introducer running', "introducer")
transport = _tahoe_runner_optional_coverage(
protocol,
reactor,
request,
(
'run',
intro_dir,
),
)
def clean():
return _cleanup_tahoe_process(transport, protocol.exited)
request.addfinalizer(clean)
yield protocol.magic_seen
furl_fname = join(intro_dir, 'private', 'introducer.furl')
while not exists(furl_fname):
print("Don't see {} yet".format(furl_fname))
yield deferLater(reactor, .1, lambda: None)
furl = _validate_furl(furl_fname)
returnValue(
Introducer(
process=TahoeProcess(transport, intro_dir),
protocol=protocol,
furl=furl,
)
)
@attr.s
class Grid(object):
"""
Represents an entire Tahoe Grid setup
A Grid includes an Introducer, Flog Gatherer and some number of
Storage Servers. Optionally includes Clients.
"""
_reactor = attr.ib()
_request = attr.ib()
_temp_dir = attr.ib()
_port_allocator = attr.ib()
introducer = attr.ib()
flog_gatherer = attr.ib()
storage_servers = attr.ib(factory=list)
clients = attr.ib(factory=dict)
@storage_servers.validator
def check(self, attribute, value):
for server in value:
if not isinstance(server, StorageServer):
raise ValueError(
"storage_servers must be StorageServer"
)
@inlineCallbacks
def add_storage_node(self):
"""
Creates a new storage node, returns a StorageServer instance
(which will already be added to our .storage_servers list)
"""
port = yield self._port_allocator()
print("make {}".format(port))
name = 'node{}'.format(port)
web_port = 'tcp:{}:interface=localhost'.format(port)
server = yield create_storage_server(
self._reactor,
self._request,
self._temp_dir,
self.introducer,
self.flog_gatherer,
name,
web_port,
)
self.storage_servers.append(server)
returnValue(server)
@inlineCallbacks
def add_client(self, name, needed=2, happy=3, total=4):
"""
Create a new client node
"""
port = yield self._port_allocator()
web_port = 'tcp:{}:interface=localhost'.format(port)
client = yield create_client(
self._reactor,
self._request,
self._temp_dir,
self.introducer,
self.flog_gatherer,
name,
web_port,
needed=needed,
happy=happy,
total=total,
)
self.clients[name] = client
yield await_client_ready(client.process)
returnValue(client)
# A grid is now forever tied to its original 'request' which is where
# it must hang finalizers off of. The "main" one is a session-level
# fixture so it'll live the life of the tests but it could be
# per-function Grid too.
@inlineCallbacks
def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
Create a new grid. This will have one Introducer but zero
storage-servers or clients; those must be added by a test or
subsequent fixtures.
"""
intro_port = yield port_allocator()
introducer = yield create_introducer(reactor, request, temp_dir, flog_gatherer, intro_port)
grid = Grid(
reactor,
request,
temp_dir,
port_allocator,
introducer,
flog_gatherer,
)
returnValue(grid)
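Since the module docstring says these helpers can be used directly, here is a minimal sketch (fixture names assumed from conftest.py) of a test that builds its own throw-away grid instead of relying on the session-scoped fixtures:

@pytest_twisted.inlineCallbacks
def test_with_private_grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
    # A fresh Grid with its own Introducer; storage and clients added explicitly.
    grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
    yield grid.add_storage_node()
    yield grid.add_storage_node()
    carol = yield grid.add_client("carol", needed=2, happy=2, total=2)
    # ... exercise 'carol' here; cleanup is hung off the 'request' finalizers.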

View File

@ -8,9 +8,8 @@ from subprocess import Popen, PIPE, check_output, check_call
import pytest
from twisted.internet import reactor
from twisted.internet.threads import blockingCallFromThread
from twisted.internet.defer import Deferred
from .util import run_in_thread, cli, reconfigure
from .util import run_in_thread, cli
DATA = b"abc123 this is not utf-8 decodable \xff\x00\x33 \x11"
try:
@ -23,7 +22,7 @@ else:
@pytest.fixture(scope="session")
def get_put_alias(alice):
cli(alice, "create-alias", "getput")
cli(alice.process, "create-alias", "getput")
def read_bytes(path):
@ -39,14 +38,14 @@ def test_put_from_stdin(alice, get_put_alias, tmpdir):
"""
tempfile = str(tmpdir.join("file"))
p = Popen(
["tahoe", "--node-directory", alice.node_dir, "put", "-", "getput:fromstdin"],
["tahoe", "--node-directory", alice.process.node_dir, "put", "-", "getput:fromstdin"],
stdin=PIPE
)
p.stdin.write(DATA)
p.stdin.close()
assert p.wait() == 0
cli(alice, "get", "getput:fromstdin", tempfile)
cli(alice.process, "get", "getput:fromstdin", tempfile)
assert read_bytes(tempfile) == DATA
@ -58,10 +57,10 @@ def test_get_to_stdout(alice, get_put_alias, tmpdir):
tempfile = tmpdir.join("file")
with tempfile.open("wb") as f:
f.write(DATA)
cli(alice, "put", str(tempfile), "getput:tostdout")
cli(alice.process, "put", str(tempfile), "getput:tostdout")
p = Popen(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:tostdout", "-"],
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:tostdout", "-"],
stdout=PIPE
)
assert p.stdout.read() == DATA
@ -78,11 +77,11 @@ def test_large_file(alice, get_put_alias, tmp_path):
tempfile = tmp_path / "file"
with tempfile.open("wb") as f:
f.write(DATA * 1_000_000)
cli(alice, "put", str(tempfile), "getput:largefile")
cli(alice.process, "put", str(tempfile), "getput:largefile")
outfile = tmp_path / "out"
check_call(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:largefile", str(outfile)],
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:largefile", str(outfile)],
)
assert outfile.read_bytes() == tempfile.read_bytes()
@ -104,31 +103,29 @@ def test_upload_download_immutable_different_default_max_segment_size(alice, get
def set_segment_size(segment_size):
return blockingCallFromThread(
reactor,
lambda: Deferred.fromCoroutine(reconfigure(
lambda: alice.reconfigure_zfec(
reactor,
request,
alice,
(1, 1, 1),
None,
max_segment_size=segment_size
))
)
)
# 1. Upload file 1 with default segment size set to 1MB
set_segment_size(1024 * 1024)
cli(alice, "put", str(tempfile), "getput:seg1024kb")
cli(alice.process, "put", str(tempfile), "getput:seg1024kb")
# 2. Download file 1 with default segment size set to 128KB
set_segment_size(128 * 1024)
assert large_data == check_output(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg1024kb", "-"]
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:seg1024kb", "-"]
)
# 3. Upload file 2 with default segment size set to 128KB
cli(alice, "put", str(tempfile), "getput:seg128kb")
cli(alice.process, "put", str(tempfile), "getput:seg128kb")
# 4. Download file 2 with default segment size set to 1MB
set_segment_size(1024 * 1024)
assert large_data == check_output(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg128kb", "-"]
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:seg128kb", "-"]
)

View File

@ -0,0 +1,351 @@
import sys
import json
from os.path import join
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
)
from twisted.internet.utils import (
getProcessOutputAndValue,
)
from twisted.internet.defer import (
inlineCallbacks,
returnValue,
)
from allmydata.crypto import ed25519
from allmydata.util import base32
from allmydata.util import configutil
from . import util
from .grid import (
create_grid,
)
import pytest_twisted
@inlineCallbacks
def _run_gm(reactor, request, *args, **kwargs):
"""
Run the grid-manager process, passing all arguments as extra CLI
args.
:returns: all process output
"""
if request.config.getoption('coverage'):
base_args = ("-b", "-m", "coverage", "run", "-m", "allmydata.cli.grid_manager")
else:
base_args = ("-m", "allmydata.cli.grid_manager")
output, errput, exit_code = yield getProcessOutputAndValue(
sys.executable,
base_args + args,
reactor=reactor,
**kwargs
)
if exit_code != 0:
raise util.ProcessFailed(
RuntimeError("Exit code {}".format(exit_code)),
output + errput,
)
returnValue(output)
@pytest_twisted.inlineCallbacks
def test_create_certificate(reactor, request):
"""
The Grid Manager produces a valid, correctly-signed certificate.
"""
gm_config = yield _run_gm(reactor, request, "--config", "-", "create")
privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
privkey, pubkey = ed25519.signing_keypair_from_string(privkey_bytes)
# Note that zara + her key here are arbitrary and don't match any
# "actual" clients in the test-grid; we're just checking that the
# Grid Manager signs this properly.
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
stdinBytes=gm_config,
)
zara_cert_bytes = yield _run_gm(
reactor, request, "--config", "-", "sign", "zara", "1",
stdinBytes=gm_config,
)
zara_cert = json.loads(zara_cert_bytes)
# confirm that zara's certificate is made by the Grid Manager
# (.verify returns None on success, raises exception on error)
pubkey.verify(
base32.a2b(zara_cert['signature'].encode('ascii')),
zara_cert['certificate'].encode('ascii'),
)
@pytest_twisted.inlineCallbacks
def test_remove_client(reactor, request):
"""
A Grid Manager can add and successfully remove a client
"""
gm_config = yield _run_gm(
reactor, request, "--config", "-", "create",
)
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
stdinBytes=gm_config,
)
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
"yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq",
stdinBytes=gm_config,
)
assert "zara" in json.loads(gm_config)['storage_servers']
assert "yakov" in json.loads(gm_config)['storage_servers']
gm_config = yield _run_gm(
reactor, request, "--config", "-", "remove",
"zara",
stdinBytes=gm_config,
)
assert "zara" not in json.loads(gm_config)['storage_servers']
assert "yakov" in json.loads(gm_config)['storage_servers']
@pytest_twisted.inlineCallbacks
def test_remove_last_client(reactor, request):
"""
A Grid Manager can remove all clients
"""
gm_config = yield _run_gm(
reactor, request, "--config", "-", "create",
)
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
stdinBytes=gm_config,
)
assert "zara" in json.loads(gm_config)['storage_servers']
gm_config = yield _run_gm(
reactor, request, "--config", "-", "remove",
"zara",
stdinBytes=gm_config,
)
# there are no storage servers left at all now
assert "storage_servers" not in json.loads(gm_config)
@pytest_twisted.inlineCallbacks
def test_add_remove_client_file(reactor, request, temp_dir):
"""
A Grid Manager can add and successfully remove a client (when
keeping data on disk)
"""
gmconfig = join(temp_dir, "gmtest")
gmconfig_file = join(temp_dir, "gmtest", "config.json")
yield _run_gm(
reactor, request, "--config", gmconfig, "create",
)
yield _run_gm(
reactor, request, "--config", gmconfig, "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
)
yield _run_gm(
reactor, request, "--config", gmconfig, "add",
"yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq",
)
assert "zara" in json.load(open(gmconfig_file, "r"))['storage_servers']
assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers']
yield _run_gm(
reactor, request, "--config", gmconfig, "remove",
"zara",
)
assert "zara" not in json.load(open(gmconfig_file, "r"))['storage_servers']
assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers']
@pytest_twisted.inlineCallbacks
def _test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
A client with happiness=2 fails to upload to a Grid when it is
using Grid Manager and there is only 1 storage server with a valid
certificate.
"""
grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
storage0 = yield grid.add_storage_node()
_ = yield grid.add_storage_node()
gm_config = yield _run_gm(
reactor, request, "--config", "-", "create",
)
gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes)
# create certificate for the first storage-server
pubkey_fname = join(storage0.process.node_dir, "node.pubkey")
with open(pubkey_fname, 'r') as f:
pubkey_str = f.read().strip()
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
"storage0", pubkey_str,
stdinBytes=gm_config,
)
assert json.loads(gm_config)['storage_servers'].keys() == {'storage0'}
print("inserting certificate")
cert = yield _run_gm(
reactor, request, "--config", "-", "sign", "storage0", "1",
stdinBytes=gm_config,
)
print(cert)
yield util.run_tahoe(
reactor, request, "--node-directory", storage0.process.node_dir,
"admin", "add-grid-manager-cert",
"--name", "default",
"--filename", "-",
stdin=cert,
)
# re-start this storage server
yield storage0.restart(reactor, request)
# now only one storage-server has the certificate .. configure
# diana to have the grid-manager certificate
diana = yield grid.add_client("diana", needed=2, happy=2, total=2)
config = configutil.get_config(join(diana.process.node_dir, "tahoe.cfg"))
config.add_section("grid_managers")
config.set("grid_managers", "test", str(ed25519.string_from_verifying_key(gm_pubkey), "ascii"))
with open(join(diana.process.node_dir, "tahoe.cfg"), "w") as f:
config.write(f)
yield diana.restart(reactor, request, servers=2)
# try to put something into the grid, which should fail (because
# diana has happy=2 but should only find storage0 to be acceptable
# to upload to)
try:
yield util.run_tahoe(
reactor, request, "--node-directory", diana.process.node_dir,
"put", "-",
stdin=b"some content\n" * 200,
)
assert False, "Should get a failure"
except util.ProcessFailed as e:
if b'UploadUnhappinessError' in e.output:
# We're done! We've succeeded.
return
assert False, "Failed to see one of out of two servers"
@pytest_twisted.inlineCallbacks
def _test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
Successfully upload to a Grid Manager enabled Grid.
"""
grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
happy0 = yield grid.add_storage_node()
happy1 = yield grid.add_storage_node()
gm_config = yield _run_gm(
reactor, request, "--config", "-", "create",
)
gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes)
# create certificates for all storage-servers
servers = (
("happy0", happy0),
("happy1", happy1),
)
for st_name, st in servers:
pubkey_fname = join(st.process.node_dir, "node.pubkey")
with open(pubkey_fname, 'r') as f:
pubkey_str = f.read().strip()
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
st_name, pubkey_str,
stdinBytes=gm_config,
)
assert json.loads(gm_config)['storage_servers'].keys() == {'happy0', 'happy1'}
# add the certificates from the grid-manager to the storage servers
print("inserting storage-server certificates")
for st_name, st in servers:
cert = yield _run_gm(
reactor, request, "--config", "-", "sign", st_name, "1",
stdinBytes=gm_config,
)
yield util.run_tahoe(
reactor, request, "--node-directory", st.process.node_dir,
"admin", "add-grid-manager-cert",
"--name", "default",
"--filename", "-",
stdin=cert,
)
# re-start the storage servers
yield happy0.restart(reactor, request)
yield happy1.restart(reactor, request)
# configure freya (a client) to have the grid-manager certificate
freya = yield grid.add_client("freya", needed=2, happy=2, total=2)
config = configutil.get_config(join(freya.process.node_dir, "tahoe.cfg"))
config.add_section("grid_managers")
config.set("grid_managers", "test", str(ed25519.string_from_verifying_key(gm_pubkey), "ascii"))
with open(join(freya.process.node_dir, "tahoe.cfg"), "w") as f:
config.write(f)
yield freya.restart(reactor, request, servers=2)
# confirm that Freya will upload to the GridManager-enabled Grid
yield util.run_tahoe(
reactor, request, "--node-directory", freya.process.node_dir,
"put", "-",
stdin=b"some content\n" * 200,
)
@pytest_twisted.inlineCallbacks
def test_identity(reactor, request, temp_dir):
"""
Dump public key to CLI
"""
gm_config = join(temp_dir, "test_identity")
yield _run_gm(
reactor, request, "--config", gm_config, "create",
)
# ask the CLI for the grid-manager pubkey
pubkey = yield _run_gm(
reactor, request, "--config", gm_config, "public-identity",
)
alleged_pubkey = ed25519.verifying_key_from_string(pubkey.strip())
# load the grid-manager pubkey "ourselves"
with open(join(gm_config, "config.json"), "r") as f:
real_config = json.load(f)
real_privkey, real_pubkey = ed25519.signing_keypair_from_string(
real_config["private_key"].encode("ascii"),
)
# confirm the CLI told us the correct thing
alleged_bytes = alleged_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw)
real_bytes = real_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw)
assert alleged_bytes == real_bytes, "Keys don't match"

View File

@ -24,6 +24,7 @@ from allmydata.test.common import (
write_introducer,
)
from allmydata.node import read_config
from allmydata.util.iputil import allocate_tcp_port
if which("docker") is None:
@ -132,8 +133,10 @@ def i2p_introducer_furl(i2p_introducer, temp_dir):
@pytest_twisted.inlineCallbacks
@pytest.mark.skip("I2P tests are not functioning at all, for unknown reasons")
def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl):
yield _create_anonymous_node(reactor, 'carol_i2p', 8008, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
yield _create_anonymous_node(reactor, 'dave_i2p', 8009, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
web_port0 = allocate_tcp_port()
web_port1 = allocate_tcp_port()
yield _create_anonymous_node(reactor, 'carol_i2p', web_port0, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
yield _create_anonymous_node(reactor, 'dave_i2p', web_port1, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
# ensure both nodes are connected to "a grid" by uploading
# something via carol, and retrieve it using dave.
gold_path = join(temp_dir, "gold")
@ -179,9 +182,8 @@ def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_netw
@pytest_twisted.inlineCallbacks
def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, i2p_network, introducer_furl):
def _create_anonymous_node(reactor, name, web_port, request, temp_dir, flog_gatherer, i2p_network, introducer_furl):
node_dir = FilePath(temp_dir).child(name)
web_port = "tcp:{}:interface=localhost".format(control_port + 2000)
print("creating", node_dir.path)
node_dir.makedirs()

View File

@ -6,8 +6,6 @@ import sys
from os.path import join
from os import environ
from twisted.internet.error import ProcessTerminated
from . import util
import pytest_twisted
@ -44,8 +42,8 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto
try:
yield proto.done
assert False, "should raise exception"
except Exception as e:
assert isinstance(e, ProcessTerminated)
except util.ProcessFailed as e:
assert b"UploadUnhappinessError" in e.output
output = proto.output.getvalue()
assert b"shares could be placed on only" in output

View File

@ -72,7 +72,7 @@ def test_bad_account_password_ssh_key(alice, tmpdir):
another_key = os.path.join(str(tmpdir), "ssh_key")
generate_ssh_key(another_key)
good_key = RSAKey(filename=os.path.join(alice.node_dir, "private", "ssh_client_rsa_key"))
good_key = RSAKey(filename=os.path.join(alice.process.node_dir, "private", "ssh_client_rsa_key"))
bad_key = RSAKey(filename=another_key)
# Wrong key:
@ -87,17 +87,16 @@ def test_bad_account_password_ssh_key(alice, tmpdir):
"username": "someoneelse", "pkey": good_key,
})
def sftp_client_key(node):
def sftp_client_key(client):
"""
:return RSAKey: the RSA client key associated with this grid.Client
"""
# XXX move to Client / grid.py?
return RSAKey(
filename=os.path.join(node.node_dir, "private", "ssh_client_rsa_key"),
filename=os.path.join(client.process.node_dir, "private", "ssh_client_rsa_key"),
)
def test_sftp_client_key_exists(alice, alice_sftp_client_key_path):
"""
Weakly validate the sftp client key fixture by asserting that *something*
exists at the supposed key path.
"""
assert os.path.exists(alice_sftp_client_key_path)
@run_in_thread
def test_ssh_key_auth(alice):

View File

@ -38,8 +38,8 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne
The two nodes can talk to the introducer and each other: we upload to one
node, read from the other.
"""
carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
carol = yield _create_anonymous_node(reactor, 'carol', 8100, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
dave = yield _create_anonymous_node(reactor, 'dave', 8101, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
yield util.await_client_ready(carol, minimum_number_of_servers=2, timeout=600)
yield util.await_client_ready(dave, minimum_number_of_servers=2, timeout=600)
yield upload_to_one_download_from_the_other(reactor, temp_dir, carol, dave)
@ -94,44 +94,45 @@ async def upload_to_one_download_from_the_other(reactor, temp_dir, upload_to: ut
@pytest_twisted.inlineCallbacks
def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl, shares_total: int) -> util.TahoeProcess:
def _create_anonymous_node(reactor, name, web_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl, shares_total: int) -> util.TahoeProcess:
node_dir = FilePath(temp_dir).child(name)
web_port = "tcp:{}:interface=localhost".format(control_port + 2000)
if True:
print(f"creating {node_dir.path} with introducer {introducer_furl}")
node_dir.makedirs()
proto = util._DumpOutputProtocol(None)
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
'create-node',
'--nickname', name,
'--webport', web_port,
'--introducer', introducer_furl,
'--hide-ip',
'--tor-control-port', 'tcp:localhost:{}'.format(control_port),
'--listen', 'tor',
'--shares-needed', '1',
'--shares-happy', '1',
'--shares-total', str(shares_total),
node_dir.path,
),
env=environ,
if node_dir.exists():
raise RuntimeError(
"A node already exists in '{}'".format(node_dir)
)
yield proto.done
print(f"creating {node_dir.path} with introducer {introducer_furl}")
node_dir.makedirs()
proto = util._DumpOutputProtocol(None)
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
'create-node',
'--nickname', name,
'--webport', str(web_port),
'--introducer', introducer_furl,
'--hide-ip',
'--tor-control-port', tor_network.client_control_endpoint,
'--listen', 'tor',
'--shares-needed', '1',
'--shares-happy', '1',
'--shares-total', str(shares_total),
node_dir.path,
),
env=environ,
)
yield proto.done
# Which services should this client connect to?
write_introducer(node_dir, "default", introducer_furl)
util.basic_node_configuration(request, flog_gatherer, node_dir.path)
util.basic_node_configuration(request, flog_gatherer.furl, node_dir.path)
config = read_config(node_dir.path, "tub.port")
config.set_config("tor", "onion", "true")
config.set_config("tor", "onion.external_port", "3457")
config.set_config("tor", "control.port", f"tcp:port={control_port}:host=127.0.0.1")
config.set_config("tor", "control.port", tor_network.client_control_endpoint)
config.set_config("tor", "onion.private_key_file", "private/tor_onion.privkey")
print("running")
@ -157,7 +158,7 @@ def test_anonymous_client(reactor, request, temp_dir, flog_gatherer, tor_network
)
yield util.await_client_ready(normie)
anonymoose = yield _create_anonymous_node(reactor, 'anonymoose', 8008, request, temp_dir, flog_gatherer, tor_network, introducer_furl, 1)
anonymoose = yield _create_anonymous_node(reactor, 'anonymoose', 8102, request, temp_dir, flog_gatherer, tor_network, introducer_furl, 1)
yield util.await_client_ready(anonymoose, minimum_number_of_servers=1, timeout=600)
yield upload_to_one_download_from_the_other(reactor, temp_dir, normie, anonymoose)


@ -15,7 +15,8 @@ from pytest_twisted import ensureDeferred
from . import vectors
from .vectors import parameters
from .util import reconfigure, upload, TahoeProcess
from .util import upload
from .grid import Client
@mark.parametrize('convergence', parameters.CONVERGENCE_SECRETS)
def test_convergence(convergence):
@ -36,8 +37,8 @@ async def test_capability(reactor, request, alice, case, expected):
computed value.
"""
# rewrite alice's config to match params and convergence
await reconfigure(
reactor, request, alice, (1, case.params.required, case.params.total), case.convergence, case.segment_size)
await alice.reconfigure_zfec(
reactor, (1, case.params.required, case.params.total), case.convergence, case.segment_size)
# upload data in the correct format
actual = upload(alice, case.fmt, case.data)
@ -82,7 +83,7 @@ async def skiptest_generate(reactor, request, alice):
async def generate(
reactor,
request,
alice: TahoeProcess,
alice: Client,
cases: Iterator[vectors.Case],
) -> AsyncGenerator[[vectors.Case, str], None]:
"""
@ -106,10 +107,8 @@ async def generate(
# reliability of this generator, be happy if we can put shares anywhere
happy = 1
for case in cases:
await reconfigure(
await alice.reconfigure_zfec(
reactor,
request,
alice,
(happy, case.params.required, case.params.total),
case.convergence,
case.segment_size
@ -117,5 +116,5 @@ async def generate(
# Give the format a chance to make an RSA key if it needs it.
case = evolve(case, fmt=case.fmt.customize())
cap = upload(alice, case.fmt, case.data)
cap = upload(alice.process, case.fmt, case.data)
yield case, cap


@ -33,7 +33,7 @@ def test_index(alice):
"""
we can download the index file
"""
util.web_get(alice, u"")
util.web_get(alice.process, u"")
@run_in_thread
@ -41,7 +41,7 @@ def test_index_json(alice):
"""
we can download the index file as json
"""
data = util.web_get(alice, u"", params={u"t": u"json"})
data = util.web_get(alice.process, u"", params={u"t": u"json"})
# it should be valid json
json.loads(data)
@ -55,7 +55,7 @@ def test_upload_download(alice):
FILE_CONTENTS = u"some contents"
readcap = util.web_post(
alice, u"uri",
alice.process, u"uri",
data={
u"t": u"upload",
u"format": u"mdmf",
@ -67,7 +67,7 @@ def test_upload_download(alice):
readcap = readcap.strip()
data = util.web_get(
alice, u"uri",
alice.process, u"uri",
params={
u"uri": readcap,
u"filename": u"boom",
@ -85,11 +85,11 @@ def test_put(alice):
FILE_CONTENTS = b"added via PUT" * 20
resp = requests.put(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
data=FILE_CONTENTS,
)
cap = allmydata.uri.from_string(resp.text.strip().encode('ascii'))
cfg = alice.get_config()
cfg = alice.process.get_config()
assert isinstance(cap, allmydata.uri.CHKFileURI)
assert cap.size == len(FILE_CONTENTS)
assert cap.total_shares == int(cfg.get_config("client", "shares.total"))
@ -102,7 +102,7 @@ def test_helper_status(storage_nodes):
successfully GET the /helper_status page
"""
url = util.node_url(storage_nodes[0].node_dir, "helper_status")
url = util.node_url(storage_nodes[0].process.node_dir, "helper_status")
resp = requests.get(url)
assert resp.status_code >= 200 and resp.status_code < 300
dom = BeautifulSoup(resp.content, "html5lib")
@ -116,7 +116,7 @@ def test_deep_stats(alice):
URIs work
"""
resp = requests.post(
util.node_url(alice.node_dir, "uri"),
util.node_url(alice.process.node_dir, "uri"),
params={
"format": "sdmf",
"t": "mkdir",
@ -130,7 +130,7 @@ def test_deep_stats(alice):
uri = url_unquote(resp.url)
assert 'URI:DIR2:' in uri
dircap = uri[uri.find("URI:DIR2:"):].rstrip('/')
dircap_uri = util.node_url(alice.node_dir, "uri/{}".format(url_quote(dircap)))
dircap_uri = util.node_url(alice.process.node_dir, "uri/{}".format(url_quote(dircap)))
# POST a file into this directory
FILE_CONTENTS = u"a file in a directory"
@ -176,7 +176,7 @@ def test_deep_stats(alice):
while tries > 0:
tries -= 1
resp = requests.get(
util.node_url(alice.node_dir, u"operations/something_random"),
util.node_url(alice.process.node_dir, u"operations/something_random"),
)
d = json.loads(resp.content)
if d['size-literal-files'] == len(FILE_CONTENTS):
@ -201,21 +201,21 @@ def test_status(alice):
FILE_CONTENTS = u"all the Important Data of alice\n" * 1200
resp = requests.put(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
data=FILE_CONTENTS,
)
cap = resp.text.strip()
print("Uploaded data, cap={}".format(cap))
resp = requests.get(
util.node_url(alice.node_dir, u"uri/{}".format(url_quote(cap))),
util.node_url(alice.process.node_dir, u"uri/{}".format(url_quote(cap))),
)
print("Downloaded {} bytes of data".format(len(resp.content)))
assert str(resp.content, "ascii") == FILE_CONTENTS
resp = requests.get(
util.node_url(alice.node_dir, "status"),
util.node_url(alice.process.node_dir, "status"),
)
dom = html5lib.parse(resp.content)
@ -229,7 +229,7 @@ def test_status(alice):
for href in hrefs:
if href == u"/" or not href:
continue
resp = requests.get(util.node_url(alice.node_dir, href))
resp = requests.get(util.node_url(alice.process.node_dir, href))
if href.startswith(u"/status/up"):
assert b"File Upload Status" in resp.content
if b"Total Size: %d" % (len(FILE_CONTENTS),) in resp.content:
@ -241,7 +241,7 @@ def test_status(alice):
# download the specialized event information
resp = requests.get(
util.node_url(alice.node_dir, u"{}/event_json".format(href)),
util.node_url(alice.process.node_dir, u"{}/event_json".format(href)),
)
js = json.loads(resp.content)
# there's usually just one "read" operation, but this can handle many ..
@ -264,14 +264,14 @@ async def test_directory_deep_check(reactor, request, alice):
required = 2
total = 4
await util.reconfigure(reactor, request, alice, (happy, required, total), convergence=None)
await alice.reconfigure_zfec(reactor, (happy, required, total), convergence=None)
await deferToThread(_test_directory_deep_check_blocking, alice)
def _test_directory_deep_check_blocking(alice):
# create a directory
resp = requests.post(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
params={
u"t": u"mkdir",
u"redirect_to_result": u"true",
@ -320,7 +320,7 @@ def _test_directory_deep_check_blocking(alice):
print("Uploaded data1, cap={}".format(cap1))
resp = requests.get(
util.node_url(alice.node_dir, u"uri/{}".format(url_quote(cap0))),
util.node_url(alice.process.node_dir, u"uri/{}".format(url_quote(cap0))),
params={u"t": u"info"},
)
@ -437,7 +437,7 @@ def test_storage_info(storage_nodes):
storage0 = storage_nodes[0]
requests.get(
util.node_url(storage0.node_dir, u"storage"),
util.node_url(storage0.process.node_dir, u"storage"),
)
@ -449,7 +449,7 @@ def test_storage_info_json(storage_nodes):
storage0 = storage_nodes[0]
resp = requests.get(
util.node_url(storage0.node_dir, u"storage"),
util.node_url(storage0.process.node_dir, u"storage"),
params={u"t": u"json"},
)
data = json.loads(resp.content)
@ -462,12 +462,12 @@ def test_introducer_info(introducer):
retrieve and confirm /introducer URI for the introducer
"""
resp = requests.get(
util.node_url(introducer.node_dir, u""),
util.node_url(introducer.process.node_dir, u""),
)
assert b"Introducer" in resp.content
resp = requests.get(
util.node_url(introducer.node_dir, u""),
util.node_url(introducer.process.node_dir, u""),
params={u"t": u"json"},
)
data = json.loads(resp.content)
@ -484,14 +484,14 @@ def test_mkdir_with_children(alice):
# create a file to put in our directory
FILE_CONTENTS = u"some file contents\n" * 500
resp = requests.put(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
data=FILE_CONTENTS,
)
filecap = resp.content.strip()
# create a (sub) directory to put in our directory
resp = requests.post(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
params={
u"t": u"mkdir",
}
@ -534,7 +534,7 @@ def test_mkdir_with_children(alice):
# create a new directory with one file and one sub-dir (all-at-once)
resp = util.web_post(
alice, u"uri",
alice.process, u"uri",
params={u"t": "mkdir-with-children"},
data=json.dumps(meta),
)


@ -70,16 +70,40 @@ class _ProcessExitedProtocol(ProcessProtocol):
self.done.callback(None)
class ProcessFailed(Exception):
"""
A subprocess has failed.
:ivar ProcessTerminated reason: the original reason from .processExited
:ivar StringIO output: all stdout and stderr collected to this point.
"""
def __init__(self, reason, output):
self.reason = reason
self.output = output
def __str__(self):
return "<ProcessFailed: {}>:\n{}".format(self.reason, self.output)
class _CollectOutputProtocol(ProcessProtocol):
"""
Internal helper. Collects all output (stdout + stderr) into
self.output, and fires ``done`` with all of it after the
process exits (for any reason).
"""
def __init__(self, capture_stderr=True):
def __init__(self, capture_stderr=True, stdin=None):
self.done = Deferred()
self.output = BytesIO()
self.capture_stderr = capture_stderr
self._stdin = stdin
def connectionMade(self):
if self._stdin is not None:
self.transport.write(self._stdin)
self.transport.closeStdin()
def processEnded(self, reason):
if not self.done.called:
@ -87,7 +111,7 @@ class _CollectOutputProtocol(ProcessProtocol):
def processExited(self, reason):
if not isinstance(reason.value, ProcessDone):
self.done.errback(reason)
self.done.errback(ProcessFailed(reason, self.output.getvalue()))
def outReceived(self, data):
self.output.write(data)
@ -153,38 +177,33 @@ class _MagicTextProtocol(ProcessProtocol):
sys.stdout.write(self.name + line + "\n")
def _cleanup_process_async(transport: IProcessTransport, allow_missing: bool) -> None:
def _cleanup_process_async(transport: IProcessTransport) -> None:
"""
If the given process transport seems to still be associated with a
running process, send a SIGTERM to that process.
:param transport: The transport to use.
:param allow_missing: If ``True`` then it is not an error for the
transport to have no associated process. Otherwise, an exception will
be raised in that case.
:raise: ``ValueError`` if ``allow_missing`` is ``False`` and the transport
has no process.
"""
if transport.pid is None:
if allow_missing:
print("Process already cleaned up and that's okay.")
return
else:
raise ValueError("Process is not running")
# in cases of "restart", we will have registered a finalizer
# that will kill the process -- but already explicitly killed
# it (and then ran again) due to the "restart". So, if the
# process is already killed, our job is done.
print("Process already cleaned up and that's okay.")
return
print("signaling {} with TERM".format(transport.pid))
try:
transport.signalProcess('TERM')
except ProcessExitedAlready:
# The transport object thought it still had a process but the real OS
# process has already exited. That's fine. We accomplished what we
# wanted to. We don't care about ``allow_missing`` here because
# there's no way we could have known the real OS process already
# exited.
# wanted to.
pass
def _cleanup_tahoe_process(tahoe_transport, exited, allow_missing=False):
def _cleanup_tahoe_process(tahoe_transport, exited):
"""
Terminate the given process with a kill signal (SIGTERM on POSIX,
TerminateProcess on Windows).
@ -195,12 +214,26 @@ def _cleanup_tahoe_process(tahoe_transport, exited, allow_missing=False):
:return: After the process has exited.
"""
from twisted.internet import reactor
_cleanup_process_async(tahoe_transport, allow_missing=allow_missing)
print("signaled, blocking on exit")
_cleanup_process_async(tahoe_transport)
print(f"signaled, blocking on exit {exited}")
block_with_timeout(exited, reactor)
print("exited, goodbye")
def run_tahoe(reactor, request, *args, **kwargs):
"""
Helper to run tahoe with optional coverage.
:returns: a Deferred that fires when the command is done (or errbacks with a
ProcessFailed exception if it exits non-zero)
"""
stdin = kwargs.get("stdin", None)
protocol = _CollectOutputProtocol(stdin=stdin)
process = _tahoe_runner_optional_coverage(protocol, reactor, request, args)
process.exited = protocol.done
return protocol.done
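A minimal usage sketch (hypothetical test; assumes the integration suite's ``reactor`` fixture and pytest's built-in ``request`` fixture) of how ``run_tahoe`` is meant to be driven:

import pytest_twisted
from .util import run_tahoe, ProcessFailed  # assuming a test module inside integration/

@pytest_twisted.inlineCallbacks
def test_tahoe_version(reactor, request):
    # Run "tahoe --version" through the runner; the returned Deferred fires
    # when the process exits, and errbacks with ProcessFailed (carrying the
    # collected stdout/stderr) if it exits non-zero.
    try:
        yield run_tahoe(reactor, request, "--version")
    except ProcessFailed as e:
        raise AssertionError("tahoe exited non-zero: {}".format(e))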
def _tahoe_runner_optional_coverage(proto, reactor, request, other_args):
"""
Internal helper. Calls spawnProcess with `-m
@ -244,16 +277,20 @@ class TahoeProcess(object):
)
def kill(self):
"""Kill the process, block until it's done."""
"""
Kill the process, block until it's done.
Does nothing if the process is already stopped (or never started).
"""
print(f"TahoeProcess.kill({self.transport.pid} / {self.node_dir})")
_cleanup_tahoe_process(self.transport, self.transport.exited)
def kill_async(self):
"""
Kill the process, return a Deferred that fires when it's done.
Does nothing if the process is already stopped (or never started).
"""
print(f"TahoeProcess.kill_async({self.transport.pid} / {self.node_dir})")
_cleanup_process_async(self.transport, allow_missing=False)
_cleanup_process_async(self.transport)
return self.transport.exited
def restart_async(self, reactor: IReactorProcess, request: Any) -> Deferred:
@ -264,7 +301,7 @@ class TahoeProcess(object):
handle requests.
"""
d = self.kill_async()
d.addCallback(lambda ignored: _run_node(reactor, self.node_dir, request, None, finalize=False))
d.addCallback(lambda ignored: _run_node(reactor, self.node_dir, request, None))
def got_new_process(proc):
# Grab the new transport since the one we had before is no longer
# valid after the stop/start cycle.
@ -276,7 +313,7 @@ class TahoeProcess(object):
return "<TahoeProcess in '{}'>".format(self._node_dir)
def _run_node(reactor, node_dir, request, magic_text, finalize=True):
def _run_node(reactor, node_dir, request, magic_text):
"""
Run a tahoe process from its node_dir.
@ -305,8 +342,7 @@ def _run_node(reactor, node_dir, request, magic_text, finalize=True):
node_dir,
)
if finalize:
request.addfinalizer(tahoe_process.kill)
request.addfinalizer(tahoe_process.kill)
d = protocol.magic_seen
d.addCallback(lambda ignored: tahoe_process)
@ -348,8 +384,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
magic_text=None,
needed=2,
happy=3,
total=4,
finalize=True):
total=4):
"""
Helper to create a single node, run it and return the instance
spawnProcess returned (ITransport)
@ -360,7 +395,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
if exists(node_dir):
created_d = succeed(None)
else:
print("creating", node_dir)
print("creating: {}".format(node_dir))
mkdir(node_dir)
done_proto = _ProcessExitedProtocol()
args = [
@ -383,13 +418,13 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
created_d = done_proto.done
def created(_):
basic_node_configuration(request, flog_gatherer, node_dir)
basic_node_configuration(request, flog_gatherer.furl, node_dir)
created_d.addCallback(created)
d = Deferred()
d.callback(None)
d.addCallback(lambda _: created_d)
d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text, finalize=finalize))
d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text))
return d
@ -622,19 +657,28 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_serve
time.sleep(1)
continue
print(
f"Now: {time.ctime()}\n"
f"Server last-received-data: {[time.ctime(s['last_received_data']) for s in servers]}"
)
now = time.time()
server_times = [
server['last_received_data']
for server in servers
for server
in servers
if server['last_received_data'] is not None
]
print(
f"Now: {time.ctime(now)}\n"
f"Liveness required: {liveness}\n"
f"Server last-received-data: {[time.ctime(s) for s in server_times]}\n"
f"Server ages: {[now - s for s in server_times]}\n"
)
# check that all times are 'recent enough' (it's OK if _some_ servers
# are down, we just want to make sure a sufficient number are up)
if len([time.time() - t <= liveness for t in server_times if t is not None]) < minimum_number_of_servers:
print("waiting because at least one server too old")
alive = [t for t in server_times if now - t <= liveness]
if len(alive) < minimum_number_of_servers:
print(
f"waiting because we found {len(alive)} servers "
f"and want {minimum_number_of_servers}"
)
time.sleep(1)
continue
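Read on its own, the liveness filter above amounts to the following (a small worked example with made-up timestamps):

import time

now = time.time()
liveness = 60 * 2              # the default used by await_client_ready
minimum_number_of_servers = 2
# hypothetical last_received_data values for three connected servers
server_times = [now - 5, now - 30, now - 600]
alive = [t for t in server_times if now - t <= liveness]
# two servers were heard from recently enough, so the wait loop would stop here
assert len(alive) >= minimum_number_of_servers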
@ -706,7 +750,6 @@ class SSK:
def load(cls, params: dict) -> SSK:
assert params.keys() == {"format", "mutable", "key"}
return cls(params["format"], params["key"].encode("ascii"))
def customize(self) -> SSK:
"""
Return an SSK with a newly generated random RSA key.
@ -745,7 +788,7 @@ def upload(alice: TahoeProcess, fmt: CHK | SSK, data: bytes) -> str:
f.write(data)
f.flush()
with fmt.to_argv() as fmt_argv:
argv = [alice, "put"] + fmt_argv + [f.name]
argv = [alice.process, "put"] + fmt_argv + [f.name]
return cli(*argv).decode("utf-8").strip()

newsfragments/3508.minor (new, empty file)

@ -0,0 +1,4 @@
Provide better feedback from plugin configuration errors
Local errors now print a useful message and exit.
Announcements that only contain invalid / unusable plugins now show a message in the Welcome page.

newsfragments/4052.minor (new, empty file)

newsfragments/4055.minor (new, empty file)

@ -0,0 +1,3 @@
Provide our own copy of attrs' "provides()" validator
This validator is deprecated and slated for removal; that project's suggestion is to copy the code to our project.

newsfragments/4059.minor (new, empty file)

@ -118,10 +118,10 @@ install_requires = [
"pyrsistent",
# A great way to define types of values.
"attrs >= 18.2.0",
"attrs >= 20.1.0",
# WebSocket library for twisted and asyncio
"autobahn",
"autobahn >= 22.4.3",
# Support for Python 3 transition
"future >= 0.18.2",
@ -151,7 +151,7 @@ install_requires = [
"pycddl >= 0.4",
# Command-line parsing
"click >= 7.0",
"click >= 8.1.1",
# for pid-file support
"psutil",
@ -413,7 +413,7 @@ setup(name="tahoe-lafs", # also set in __init__.py
"pip==22.0.3",
"wheel==0.37.1",
"setuptools==60.9.1",
"subunitreporter==22.2.0",
"subunitreporter==23.8.0",
"python-subunit==1.4.2",
"junitxml==0.7",
"coverage==7.2.5",
@ -435,7 +435,7 @@ setup(name="tahoe-lafs", # also set in __init__.py
"paramiko < 2.9",
"pytest-timeout",
# Does our OpenMetrics endpoint adhere to the spec:
"prometheus-client == 0.11.0",
"prometheus-client == 0.11.0"
] + tor_requires + i2p_requires,
"tor": tor_requires,
"i2p": i2p_requires,


@ -222,3 +222,7 @@ def _config_path_from_option(config: str) -> Optional[FilePath]:
if config == "-":
return None
return FilePath(config)
if __name__ == '__main__':
grid_manager() # type: ignore


@ -483,6 +483,11 @@ def create_storage_farm_broker(config: _Config, default_connection_handlers, foo
storage_client_config = storage_client.StorageClientConfig.from_node_config(
config,
)
# ensure that we can at least load all plugins that the
# configuration mentions; doing this early (i.e. before creating
# storage-clients themselves) allows us to exit in case of a
# problem.
storage_client_config.get_configured_storage_plugins()
def tub_creator(handler_overrides=None, **kwargs):
return node.create_tub(


@ -486,7 +486,9 @@ def create_grid_manager_verifier(keys, certs, public_key, now_fn=None, bad_cert=
now = now_fn()
for cert in valid_certs:
expires = datetime.fromisoformat(cert["expires"])
if cert['public_key'].encode("ascii") == public_key:
pc = cert['public_key'].encode('ascii')
assert type(pc) == type(public_key), "{} isn't {}".format(type(pc), type(public_key))
if pc == public_key:
if expires > now:
# not-expired
return True


@ -42,6 +42,9 @@ from allmydata.util.pid import (
from allmydata.storage.crawler import (
MigratePickleFileError,
)
from allmydata.storage_client import (
MissingPlugin,
)
from allmydata.node import (
PortAssignmentRequired,
PrivacyError,
@ -197,6 +200,17 @@ class DaemonizeTheRealService(Service, HookMixin):
self.basedir,
)
)
elif reason.check(MissingPlugin):
self.stderr.write(
"Missing Plugin\n"
"The configuration requests a plugin:\n"
"\n {}\n\n"
"...which cannot be found.\n"
"This typically means that some software hasn't been installed or the plugin couldn't be instantiated.\n\n"
.format(
reason.value.plugin_name,
)
)
else:
self.stderr.write("\nUnknown error, here's the traceback:\n")
reason.printTraceback(self.stderr)


@ -41,6 +41,7 @@ from twisted.internet.interfaces import (
IDelayedCall,
)
from twisted.internet.ssl import CertificateOptions
from twisted.protocols.tls import TLSMemoryBIOProtocol
from twisted.web.client import Agent, HTTPConnectionPool
from zope.interface import implementer
from hyperlink import DecodedURL
@ -72,7 +73,7 @@ except ImportError:
pass
def _encode_si(si): # type: (bytes) -> str
def _encode_si(si: bytes) -> str:
"""Encode the storage index into Unicode string."""
return str(si_b2a(si), "ascii")
@ -80,9 +81,13 @@ def _encode_si(si): # type: (bytes) -> str
class ClientException(Exception):
"""An unexpected response code from the server."""
def __init__(self, code, *additional_args):
Exception.__init__(self, code, *additional_args)
def __init__(
self, code: int, message: Optional[str] = None, body: Optional[bytes] = None
):
Exception.__init__(self, code, message, body)
self.code = code
self.message = message
self.body = body
register_exception_extractor(ClientException, lambda e: {"response_code": e.code})
@ -93,7 +98,7 @@ register_exception_extractor(ClientException, lambda e: {"response_code": e.code
# Tags are of the form #6.nnn, where the number is documented at
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258
# indicates a set.
_SCHEMAS = {
_SCHEMAS: Mapping[str, Schema] = {
"get_version": Schema(
# Note that the single-quoted (`'`) string keys in this schema
# represent *byte* strings - per the CDDL specification. Text strings
@ -155,7 +160,7 @@ class _LengthLimitedCollector:
timeout_on_silence: IDelayedCall
f: BytesIO = field(factory=BytesIO)
def __call__(self, data: bytes):
def __call__(self, data: bytes) -> None:
self.timeout_on_silence.reset(60)
self.remaining_length -= len(data)
if self.remaining_length < 0:
@ -164,7 +169,7 @@ class _LengthLimitedCollector:
def limited_content(
response,
response: IResponse,
clock: IReactorTime,
max_length: int = 30 * 1024 * 1024,
) -> Deferred[BinaryIO]:
@ -300,11 +305,13 @@ class _StorageClientHTTPSPolicy:
expected_spki_hash: bytes
# IPolicyForHTTPS
def creatorForNetloc(self, hostname, port):
def creatorForNetloc(self, hostname: str, port: int) -> _StorageClientHTTPSPolicy:
return self
# IOpenSSLClientConnectionCreator
def clientConnectionForTLS(self, tlsProtocol):
def clientConnectionForTLS(
self, tlsProtocol: TLSMemoryBIOProtocol
) -> SSL.Connection:
return SSL.Connection(
_TLSContextFactory(self.expected_spki_hash).getContext(), None
)
@ -344,7 +351,7 @@ class StorageClientFactory:
cls.TEST_MODE_REGISTER_HTTP_POOL = callback
@classmethod
def stop_test_mode(cls):
def stop_test_mode(cls) -> None:
"""Stop testing mode."""
cls.TEST_MODE_REGISTER_HTTP_POOL = None
@ -437,7 +444,7 @@ class StorageClient(object):
"""Get a URL relative to the base URL."""
return self._base_url.click(path)
def _get_headers(self, headers): # type: (Optional[Headers]) -> Headers
def _get_headers(self, headers: Optional[Headers]) -> Headers:
"""Return the basic headers to be used by default."""
if headers is None:
headers = Headers()
@ -565,7 +572,7 @@ class StorageClient(object):
).read()
raise ClientException(response.code, response.phrase, data)
def shutdown(self) -> Deferred:
def shutdown(self) -> Deferred[object]:
"""Shutdown any connections."""
return self._pool.closeCachedConnections()


@ -4,7 +4,18 @@ HTTP server for storage.
from __future__ import annotations
from typing import Any, Callable, Union, cast, Optional
from typing import (
Any,
Callable,
Union,
cast,
Optional,
TypeVar,
Sequence,
Protocol,
Dict,
)
from typing_extensions import ParamSpec, Concatenate
from functools import wraps
from base64 import b64decode
import binascii
@ -15,20 +26,24 @@ import mmap
from eliot import start_action
from cryptography.x509 import Certificate as CryptoCertificate
from zope.interface import implementer
from klein import Klein
from klein import Klein, KleinRenderable
from klein.resource import KleinResource
from twisted.web import http
from twisted.internet.interfaces import (
IListeningPort,
IStreamServerEndpoint,
IPullProducer,
IProtocolFactory,
)
from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import Deferred
from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate
from twisted.internet.interfaces import IReactorFromThreads
from twisted.web.server import Site, Request
from twisted.web.iweb import IRequest
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.filepath import FilePath
from twisted.python.failure import Failure
from attrs import define, field, Factory
from werkzeug.http import (
@ -68,7 +83,7 @@ class ClientSecretsException(Exception):
def _extract_secrets(
header_values: list[str], required_secrets: set[Secrets]
header_values: Sequence[str], required_secrets: set[Secrets]
) -> dict[Secrets, bytes]:
"""
Given list of values of ``X-Tahoe-Authorization`` headers, and required
@ -102,18 +117,43 @@ def _extract_secrets(
return result
def _authorization_decorator(required_secrets):
class BaseApp(Protocol):
"""Protocol for ``HTTPServer`` and testing equivalent."""
_swissnum: bytes
P = ParamSpec("P")
T = TypeVar("T")
SecretsDict = Dict[Secrets, bytes]
App = TypeVar("App", bound=BaseApp)
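As an aside (not part of the change): ``BaseApp`` is a structural ``Protocol``, so any object carrying a bytes-valued ``_swissnum`` attribute satisfies it, which is how the test suite's ``TestApp`` (later in this diff) can stand in for ``HTTPServer``. A hypothetical sketch:

from allmydata.storage.http_server import BaseApp

class FakeApp:
    """Hypothetical stand-in that structurally satisfies BaseApp."""
    _swissnum: bytes = b"1234"

app: BaseApp = FakeApp()  # accepted by a type checker; no subclassing required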
def _authorization_decorator(
required_secrets: set[Secrets],
) -> Callable[
[Callable[Concatenate[App, Request, SecretsDict, P], T]],
Callable[Concatenate[App, Request, P], T],
]:
"""
1. Check the ``Authorization`` header matches server swissnum.
2. Extract ``X-Tahoe-Authorization`` headers and pass them in.
3. Log the request and response.
"""
def decorator(f):
def decorator(
f: Callable[Concatenate[App, Request, SecretsDict, P], T]
) -> Callable[Concatenate[App, Request, P], T]:
@wraps(f)
def route(self, request, *args, **kwargs):
# Don't set text/html content type by default:
request.defaultContentType = None
def route(
self: App,
request: Request,
*args: P.args,
**kwargs: P.kwargs,
) -> T:
# Don't set text/html content type by default.
# None is actually supported, see https://github.com/twisted/twisted/issues/11902
request.defaultContentType = None # type: ignore[assignment]
with start_action(
action_type="allmydata:storage:http-server:handle-request",
@ -163,7 +203,22 @@ def _authorization_decorator(required_secrets):
return decorator
def _authorized_route(app, required_secrets, *route_args, **route_kwargs):
def _authorized_route(
klein_app: Klein,
required_secrets: set[Secrets],
url: str,
*route_args: Any,
branch: bool = False,
**route_kwargs: Any,
) -> Callable[
[
Callable[
Concatenate[App, Request, SecretsDict, P],
KleinRenderable,
]
],
Callable[..., KleinRenderable],
]:
"""
Like Klein's @route, but with additional support for checking the
``Authorization`` header as well as ``X-Tahoe-Authorization`` headers. The
@ -173,12 +228,23 @@ def _authorized_route(app, required_secrets, *route_args, **route_kwargs):
:param required_secrets: Set of required ``Secret`` types.
"""
def decorator(f):
@app.route(*route_args, **route_kwargs)
def decorator(
f: Callable[
Concatenate[App, Request, SecretsDict, P],
KleinRenderable,
]
) -> Callable[..., KleinRenderable]:
@klein_app.route(url, *route_args, branch=branch, **route_kwargs) # type: ignore[arg-type]
@_authorization_decorator(required_secrets)
@wraps(f)
def handle_route(*args, **kwargs):
return f(*args, **kwargs)
def handle_route(
app: App,
request: Request,
secrets: SecretsDict,
*args: P.args,
**kwargs: P.kwargs,
) -> KleinRenderable:
return f(app, request, secrets, *args, **kwargs)
return handle_route
@ -234,7 +300,7 @@ class UploadsInProgress(object):
except (KeyError, IndexError):
raise _HTTPError(http.NOT_FOUND)
def remove_write_bucket(self, bucket: BucketWriter):
def remove_write_bucket(self, bucket: BucketWriter) -> None:
"""Stop tracking the given ``BucketWriter``."""
try:
storage_index, share_number = self._bucketwriters.pop(bucket)
@ -250,7 +316,7 @@ class UploadsInProgress(object):
def validate_upload_secret(
self, storage_index: bytes, share_number: int, upload_secret: bytes
):
) -> None:
"""
Raise an unauthorized-HTTP-response exception if the given
storage_index+share_number have a different upload secret than the
@ -272,7 +338,7 @@ class StorageIndexConverter(BaseConverter):
regex = "[" + str(rfc3548_alphabet, "ascii") + "]{26}"
def to_python(self, value):
def to_python(self, value: str) -> bytes:
try:
return si_a2b(value.encode("ascii"))
except (AssertionError, binascii.Error, ValueError):
@ -351,7 +417,7 @@ class _ReadAllProducer:
start: int = field(default=0)
@classmethod
def produce_to(cls, request: Request, read_data: ReadData) -> Deferred:
def produce_to(cls, request: Request, read_data: ReadData) -> Deferred[bytes]:
"""
Create and register the producer, returning ``Deferred`` that should be
returned from a HTTP server endpoint.
@ -360,7 +426,7 @@ class _ReadAllProducer:
request.registerProducer(producer, False)
return producer.result
def resumeProducing(self):
def resumeProducing(self) -> None:
data = self.read_data(self.start, 65536)
if not data:
self.request.unregisterProducer()
@ -371,10 +437,10 @@ class _ReadAllProducer:
self.request.write(data)
self.start += len(data)
def pauseProducing(self):
def pauseProducing(self) -> None:
pass
def stopProducing(self):
def stopProducing(self) -> None:
pass
@ -392,7 +458,7 @@ class _ReadRangeProducer:
start: int
remaining: int
def resumeProducing(self):
def resumeProducing(self) -> None:
if self.result is None or self.request is None:
return
@ -429,10 +495,10 @@ class _ReadRangeProducer:
if self.remaining == 0:
self.stopProducing()
def pauseProducing(self):
def pauseProducing(self) -> None:
pass
def stopProducing(self):
def stopProducing(self) -> None:
if self.request is not None:
self.request.unregisterProducer()
self.request = None
@ -511,12 +577,13 @@ def read_range(
return d
def _add_error_handling(app: Klein):
def _add_error_handling(app: Klein) -> None:
"""Add exception handlers to a Klein app."""
@app.handle_errors(_HTTPError)
def _http_error(_, request, failure):
def _http_error(self: Any, request: IRequest, failure: Failure) -> KleinRenderable:
"""Handle ``_HTTPError`` exceptions."""
assert isinstance(failure.value, _HTTPError)
request.setResponseCode(failure.value.code)
if failure.value.body is not None:
return failure.value.body
@ -524,7 +591,9 @@ def _add_error_handling(app: Klein):
return b""
@app.handle_errors(CDDLValidationError)
def _cddl_validation_error(_, request, failure):
def _cddl_validation_error(
self: Any, request: IRequest, failure: Failure
) -> KleinRenderable:
"""Handle CDDL validation errors."""
request.setResponseCode(http.BAD_REQUEST)
return str(failure.value).encode("utf-8")
@ -584,7 +653,7 @@ async def read_encoded(
return cbor2.load(request.content)
class HTTPServer(object):
class HTTPServer(BaseApp):
"""
A HTTP interface to the storage server.
"""
@ -611,11 +680,11 @@ class HTTPServer(object):
self._uploads.remove_write_bucket
)
def get_resource(self):
def get_resource(self) -> KleinResource:
"""Return twisted.web ``Resource`` for this object."""
return self._app.resource()
def _send_encoded(self, request, data):
def _send_encoded(self, request: Request, data: object) -> Deferred[bytes]:
"""
Return encoded data suitable for writing as the HTTP body response, by
default using CBOR.
@ -641,11 +710,10 @@ class HTTPServer(object):
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
raise _HTTPError(http.NOT_ACCEPTABLE)
##### Generic APIs #####
@_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"])
def version(self, request, authorization):
def version(self, request: Request, authorization: SecretsDict) -> KleinRenderable:
"""Return version information."""
return self._send_encoded(request, self._get_version())
@ -677,7 +745,9 @@ class HTTPServer(object):
methods=["POST"],
)
@async_to_deferred
async def allocate_buckets(self, request, authorization, storage_index):
async def allocate_buckets(
self, request: Request, authorization: SecretsDict, storage_index: bytes
) -> KleinRenderable:
"""Allocate buckets."""
upload_secret = authorization[Secrets.UPLOAD]
# It's just a list of up to ~256 shares, shouldn't use many bytes.
@ -716,7 +786,13 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/abort",
methods=["PUT"],
)
def abort_share_upload(self, request, authorization, storage_index, share_number):
def abort_share_upload(
self,
request: Request,
authorization: SecretsDict,
storage_index: bytes,
share_number: int,
) -> KleinRenderable:
"""Abort an in-progress immutable share upload."""
try:
bucket = self._uploads.get_write_bucket(
@ -747,7 +823,13 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
methods=["PATCH"],
)
def write_share_data(self, request, authorization, storage_index, share_number):
def write_share_data(
self,
request: Request,
authorization: SecretsDict,
storage_index: bytes,
share_number: int,
) -> KleinRenderable:
"""Write data to an in-progress immutable upload."""
content_range = parse_content_range_header(request.getHeader("content-range"))
if content_range is None or content_range.units != "bytes":
@ -757,14 +839,17 @@ class HTTPServer(object):
bucket = self._uploads.get_write_bucket(
storage_index, share_number, authorization[Secrets.UPLOAD]
)
offset = content_range.start
remaining = content_range.stop - content_range.start
offset = content_range.start or 0
# We don't support an unspecified stop for the range:
assert content_range.stop is not None
# Missing body makes no sense:
assert request.content is not None
remaining = content_range.stop - offset
finished = False
while remaining > 0:
data = request.content.read(min(remaining, 65536))
assert data, "uploaded data length doesn't match range"
try:
finished = bucket.write(offset, data)
except ConflictingWriteError:
@ -790,7 +875,9 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>/shares",
methods=["GET"],
)
def list_shares(self, request, authorization, storage_index):
def list_shares(
self, request: Request, authorization: SecretsDict, storage_index: bytes
) -> KleinRenderable:
"""
List shares for the given storage index.
"""
@ -803,7 +890,13 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
methods=["GET"],
)
def read_share_chunk(self, request, authorization, storage_index, share_number):
def read_share_chunk(
self,
request: Request,
authorization: SecretsDict,
storage_index: bytes,
share_number: int,
) -> KleinRenderable:
"""Read a chunk for an already uploaded immutable."""
request.setHeader("content-type", "application/octet-stream")
try:
@ -820,7 +913,9 @@ class HTTPServer(object):
"/storage/v1/lease/<storage_index:storage_index>",
methods=["PUT"],
)
def add_or_renew_lease(self, request, authorization, storage_index):
def add_or_renew_lease(
self, request: Request, authorization: SecretsDict, storage_index: bytes
) -> KleinRenderable:
"""Update the lease for an immutable or mutable share."""
if not list(self._storage_server.get_shares(storage_index)):
raise _HTTPError(http.NOT_FOUND)
@ -843,8 +938,12 @@ class HTTPServer(object):
)
@async_to_deferred
async def advise_corrupt_share_immutable(
self, request, authorization, storage_index, share_number
):
self,
request: Request,
authorization: SecretsDict,
storage_index: bytes,
share_number: int,
) -> KleinRenderable:
"""Indicate that given share is corrupt, with a text reason."""
try:
bucket = self._storage_server.get_buckets(storage_index)[share_number]
@ -871,10 +970,15 @@ class HTTPServer(object):
methods=["POST"],
)
@async_to_deferred
async def mutable_read_test_write(self, request, authorization, storage_index):
async def mutable_read_test_write(
self, request: Request, authorization: SecretsDict, storage_index: bytes
) -> KleinRenderable:
"""Read/test/write combined operation for mutables."""
rtw_request = await read_encoded(
self._reactor, request, _SCHEMAS["mutable_read_test_write"], max_size=2**48
self._reactor,
request,
_SCHEMAS["mutable_read_test_write"],
max_size=2**48,
)
secrets = (
authorization[Secrets.WRITE_ENABLER],
@ -910,7 +1014,13 @@ class HTTPServer(object):
"/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>",
methods=["GET"],
)
def read_mutable_chunk(self, request, authorization, storage_index, share_number):
def read_mutable_chunk(
self,
request: Request,
authorization: SecretsDict,
storage_index: bytes,
share_number: int,
) -> KleinRenderable:
"""Read a chunk from a mutable."""
request.setHeader("content-type", "application/octet-stream")
@ -950,8 +1060,12 @@ class HTTPServer(object):
)
@async_to_deferred
async def advise_corrupt_share_mutable(
self, request, authorization, storage_index, share_number
):
self,
request: Request,
authorization: SecretsDict,
storage_index: bytes,
share_number: int,
) -> KleinRenderable:
"""Indicate that given share is corrupt, with a text reason."""
if share_number not in {
shnum for (shnum, _) in self._storage_server.get_shares(storage_index)
@ -983,7 +1097,10 @@ class _TLSEndpointWrapper(object):
@classmethod
def from_paths(
cls, endpoint, private_key_path: FilePath, cert_path: FilePath
cls: type[_TLSEndpointWrapper],
endpoint: IStreamServerEndpoint,
private_key_path: FilePath,
cert_path: FilePath,
) -> "_TLSEndpointWrapper":
"""
Create an endpoint with the given private key and certificate paths on
@ -998,7 +1115,7 @@ class _TLSEndpointWrapper(object):
)
return cls(endpoint=endpoint, context_factory=certificate_options)
def listen(self, factory):
def listen(self, factory: IProtocolFactory) -> Deferred[IListeningPort]:
return self.endpoint.listen(
TLSMemoryBIOFactory(self.context_factory, False, factory)
)


@ -33,15 +33,17 @@ Ported to Python 3.
from __future__ import annotations
from six import ensure_text
from typing import Union, Callable, Any, Optional, cast
from typing import Union, Callable, Any, Optional, cast, Dict
from os import urandom
import re
import time
import hashlib
from io import StringIO
from configparser import NoSectionError
import json
import attr
from attr import define
from hyperlink import DecodedURL
from twisted.web.client import HTTPConnectionPool
from zope.interface import (
@ -55,12 +57,14 @@ from twisted.internet.task import LoopingCall
from twisted.internet import defer, reactor
from twisted.internet.interfaces import IReactorTime
from twisted.application import service
from twisted.logger import Logger
from twisted.plugin import (
getPlugins,
)
from eliot import (
log_call,
)
from foolscap.ipb import IRemoteReference
from foolscap.api import eventually, RemoteException
from foolscap.reconnector import (
ReconnectionInfo,
@ -74,7 +78,7 @@ from allmydata.interfaces import (
VersionMessage
)
from allmydata.grid_manager import (
create_grid_manager_verifier,
create_grid_manager_verifier, SignedCertificate
)
from allmydata.crypto import (
ed25519,
@ -87,6 +91,7 @@ from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import permute_server_hash
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
from allmydata.util.deferredutil import async_to_deferred, race
from allmydata.util.attrs_provides import provides
from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException, StorageClientMutables,
@ -95,6 +100,8 @@ from allmydata.storage.http_client import (
)
from .node import _Config
_log = Logger()
ANONYMOUS_STORAGE_NURLS = "anonymous-storage-NURLs"
@ -180,6 +187,31 @@ class StorageClientConfig(object):
grid_manager_keys,
)
def get_configured_storage_plugins(self) -> dict[str, IFoolscapStoragePlugin]:
"""
:returns: a mapping from plugin names to plugin instances for every
plugin named in the configuration
:raises MissingPlugin: if the configuration asks for a plugin
for which there is no corresponding instance (e.g. it is
not installed).
"""
plugins = {
plugin.name: plugin
for plugin
in getPlugins(IFoolscapStoragePlugin)
}
# mypy doesn't like "str" in place of Any ...
configured: Dict[Any, IFoolscapStoragePlugin] = dict()
for plugin_name in self.storage_plugins:
try:
plugin = plugins[plugin_name]
except KeyError:
raise MissingPlugin(plugin_name)
configured[plugin_name] = plugin
return configured
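A short sketch of the failure mode this enables, mirroring the new test_missing_plugin test later in this diff (the plugin name here is made up):

from allmydata.storage_client import StorageClientConfig, MissingPlugin

config = StorageClientConfig(storage_plugins={"no-such-plugin": {}})
try:
    config.get_configured_storage_plugins()
except MissingPlugin as e:
    print(e)  # Missing plugin 'no-such-plugin'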
@implementer(IStorageBroker)
class StorageFarmBroker(service.MultiService):
@ -317,8 +349,8 @@ class StorageFarmBroker(service.MultiService):
assert isinstance(server_id, bytes)
gm_verifier = create_grid_manager_verifier(
self.storage_client_config.grid_manager_keys,
server["ann"].get("grid-manager-certificates", []),
"pub-{}".format(str(server_id, "ascii")), # server_id is v0-<key> not pub-v0-key .. for reasons?
[SignedCertificate.load(StringIO(json.dumps(data))) for data in server["ann"].get("grid-manager-certificates", [])],
"pub-{}".format(str(server_id, "ascii")).encode("ascii"), # server_id is v0-<key> not pub-v0-key .. for reasons?
)
if self._should_we_use_http(self.node_config, server["ann"]):
@ -658,7 +690,7 @@ class _FoolscapStorage(object):
permutation_seed = attr.ib()
tubid = attr.ib()
storage_server = attr.ib(validator=attr.validators.provides(IStorageServer))
storage_server = attr.ib(validator=provides(IStorageServer))
_furl = attr.ib()
_short_description = attr.ib()
@ -708,6 +740,7 @@ class _FoolscapStorage(object):
@implementer(IFoolscapStorageServer)
@define
class _NullStorage(object):
"""
Abstraction for *not* communicating with a storage server of a type with
@ -721,7 +754,7 @@ class _NullStorage(object):
lease_seed = hashlib.sha256(b"").digest()
name = "<unsupported>"
longname = "<storage with unsupported protocol>"
longname: str = "<storage with unsupported protocol>"
def connect_to(self, tub, got_connection):
return NonReconnector()
@ -740,8 +773,6 @@ class NonReconnector(object):
def getReconnectionInfo(self):
return ReconnectionInfo()
_null_storage = _NullStorage()
class AnnouncementNotMatched(Exception):
"""
@ -750,6 +781,18 @@ class AnnouncementNotMatched(Exception):
"""
@attr.s(auto_exc=True)
class MissingPlugin(Exception):
"""
A particular plugin was requested but is missing
"""
plugin_name = attr.ib()
def __str__(self):
return "Missing plugin '{}'".format(self.plugin_name)
def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref):
"""
Construct an ``IStorageServer`` from the most locally-preferred plugin
@ -757,27 +800,37 @@ def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref):
:param allmydata.node._Config node_config: The node configuration to
pass to the plugin.
:param dict announcement: The storage announcement for the storage
server we should build
"""
plugins = {
plugin.name: plugin
for plugin
in getPlugins(IFoolscapStoragePlugin)
}
storage_options = announcement.get(u"storage-options", [])
for plugin_name, plugin_config in list(config.storage_plugins.items()):
plugins = config.get_configured_storage_plugins()
# for every storage option in the announcement, see whether we have
# enabled the corresponding plugin locally. if we have, great: we return
# that storage-client; otherwise we move on to the next announced option
# until we've run out of options...
for options in storage_options:
try:
plugin = plugins[plugin_name]
plugin = plugins[options[u"name"]]
except KeyError:
raise ValueError("{} not installed".format(plugin_name))
for option in storage_options:
if plugin_name == option[u"name"]:
furl = option[u"storage-server-FURL"]
return furl, plugin.get_storage_client(
node_config,
option,
get_rref,
)
raise AnnouncementNotMatched()
# we didn't configure this kind of plugin locally, so
# consider the next announced option
continue
furl = options[u"storage-server-FURL"]
return furl, plugin.get_storage_client(
node_config,
options,
get_rref,
)
# none of the storage options in the announcement are configured
# locally; we can't make a storage-client.
plugin_names = ", ".join(sorted(option["name"] for option in storage_options))
raise AnnouncementNotMatched(plugin_names)
def _available_space_from_version(version):
@ -790,6 +843,83 @@ def _available_space_from_version(version):
return available_space
def _make_storage_system(
node_config: _Config,
config: StorageClientConfig,
ann: dict,
server_id: bytes,
get_rref: Callable[[], Optional[IRemoteReference]],
) -> IFoolscapStorageServer:
"""
Create an object for interacting with the storage server described by
the given announcement.
:param node_config: The node configuration to pass to any configured
storage plugins.
:param config: Configuration specifying desired storage client behavior.
:param ann: The storage announcement from the storage server we are meant
to communicate with.
:param server_id: The unique identifier for the server.
:param get_rref: A function which returns a remote reference to the
server-side object which implements this storage system, if one is
available (otherwise None).
:return: An object enabling communication via Foolscap with the server
which generated the announcement.
"""
unmatched = None
# Try to match the announcement against a plugin.
try:
furl, storage_server = _storage_from_foolscap_plugin(
node_config,
config,
ann,
# Pass in an accessor for our _rref attribute. The value of
# the attribute may change over time as connections are lost
# and re-established. The _StorageServer should always be
# able to get the most up-to-date value.
get_rref,
)
except AnnouncementNotMatched as e:
# show a more-specific error to the user for this server
# (Note this will only be shown if the server _doesn't_ offer
# anonymous service, which will match below)
unmatched = _NullStorage('{}: missing plugin "{}"'.format(server_id.decode("utf8"), str(e)))
else:
return _FoolscapStorage.from_announcement(
server_id,
furl,
ann,
storage_server,
)
# Try to match the announcement against the anonymous access scheme.
try:
furl = ann[u"anonymous-storage-FURL"]
except KeyError:
# Nope
pass
else:
# See comment above for the _storage_from_foolscap_plugin case
# about passing in get_rref.
storage_server = _StorageServer(get_rref=get_rref)
return _FoolscapStorage.from_announcement(
server_id,
furl,
ann,
storage_server,
)
# Nothing matched so we can't talk to this server. (There should
# not be a way to get here without this local being valid)
assert unmatched is not None, "Expected unmatched plugin error"
return unmatched
@implementer(IServer)
class NativeStorageServer(service.MultiService):
"""I hold information about a storage server that we want to connect to.
@ -831,7 +961,7 @@ class NativeStorageServer(service.MultiService):
self._grid_manager_verifier = grid_manager_verifier
self._storage = self._make_storage_system(node_config, config, ann)
self._storage = _make_storage_system(node_config, config, ann, self._server_id, self.get_rref)
self.last_connect_time = None
self.last_loss_time = None
@ -856,63 +986,6 @@ class NativeStorageServer(service.MultiService):
return True
return self._grid_manager_verifier()
def _make_storage_system(self, node_config, config, ann):
"""
:param allmydata.node._Config node_config: The node configuration to pass
to any configured storage plugins.
:param StorageClientConfig config: Configuration specifying desired
storage client behavior.
:param dict ann: The storage announcement from the storage server we
are meant to communicate with.
:return IFoolscapStorageServer: An object enabling communication via
Foolscap with the server which generated the announcement.
"""
# Try to match the announcement against a plugin.
try:
furl, storage_server = _storage_from_foolscap_plugin(
node_config,
config,
ann,
# Pass in an accessor for our _rref attribute. The value of
# the attribute may change over time as connections are lost
# and re-established. The _StorageServer should always be
# able to get the most up-to-date value.
self.get_rref,
)
except AnnouncementNotMatched:
# Nope.
pass
else:
return _FoolscapStorage.from_announcement(
self._server_id,
furl,
ann,
storage_server,
)
# Try to match the announcement against the anonymous access scheme.
try:
furl = ann[u"anonymous-storage-FURL"]
except KeyError:
# Nope
pass
else:
# See comment above for the _storage_from_foolscap_plugin case
# about passing in get_rref.
storage_server = _StorageServer(get_rref=self.get_rref)
return _FoolscapStorage.from_announcement(
self._server_id,
furl,
ann,
storage_server,
)
# Nothing matched so we can't talk to this server.
return _null_storage
def get_permutation_seed(self):
return self._storage.permutation_seed
def get_name(self): # keep methodname short
@ -1428,7 +1501,7 @@ class _FakeRemoteReference(object):
result = yield getattr(self.local_object, action)(*args, **kwargs)
defer.returnValue(result)
except HTTPClientException as e:
raise RemoteException(e.args)
raise RemoteException((e.code, e.message, e.body))
@attr.s


@ -23,6 +23,9 @@ import click.testing
from ..common_util import (
run_cli,
)
from ..common import (
superuser,
)
from twisted.internet.defer import (
inlineCallbacks,
)
@ -34,7 +37,6 @@ from twisted.python.runtime import (
)
from allmydata.util import jsonbytes as json
class GridManagerCommandLine(TestCase):
"""
Test the mechanics of the `grid-manager` command
@ -223,7 +225,7 @@ class GridManagerCommandLine(TestCase):
)
@skipIf(platform.isWindows(), "We don't know how to set permissions on Windows.")
@skipIf(os.getuid() == 0, "cannot test as superuser with all permissions")
@skipIf(superuser, "cannot test as superuser with all permissions")
def test_sign_bad_perms(self):
"""
Error reported if we can't create certificate file


@ -264,7 +264,7 @@ class RunTests(SyncTestCase):
self.assertThat(runs, Equals([]))
self.assertThat(result_code, Equals(1))
good_file_content_re = re.compile(r"\w[0-9]*\w[0-9]*\w")
good_file_content_re = re.compile(r"\s[0-9]*\s[0-9]*\s", re.M)
@given(text())
def test_pidfile_contents(self, content):


@ -117,6 +117,10 @@ from subprocess import (
PIPE,
)
# Is the process running as an OS user with elevated privileges (ie, root)?
# We only know how to determine this for POSIX systems.
superuser = getattr(os, "getuid", lambda: -1)() == 0
EMPTY_CLIENT_CONFIG = config_from_string(
"/dev/null",
"tub.port",
@ -303,13 +307,17 @@ class UseNode(object):
if self.plugin_config is None:
plugin_config_section = ""
else:
plugin_config_section = """
[storageclient.plugins.{storage_plugin}]
{config}
""".format(
storage_plugin=self.storage_plugin,
config=format_config_items(self.plugin_config),
)
plugin_config_section = (
"[storageclient.plugins.{storage_plugin}]\n"
"{config}\n").format(
storage_plugin=self.storage_plugin,
config=format_config_items(self.plugin_config),
)
if self.storage_plugin is None:
plugins = ""
else:
plugins = "storage.plugins = {}".format(self.storage_plugin)
write_introducer(
self.basedir,
@ -336,18 +344,17 @@ class UseNode(object):
self.config = config_from_string(
self.basedir.asTextMode().path,
"tub.port",
"""
[node]
{node_config}
[client]
storage.plugins = {storage_plugin}
{plugin_config_section}
""".format(
storage_plugin=self.storage_plugin,
node_config=format_config_items(node_config),
plugin_config_section=plugin_config_section,
)
"[node]\n"
"{node_config}\n"
"\n"
"[client]\n"
"{plugins}\n"
"{plugin_config_section}\n"
.format(
plugins=plugins,
node_config=format_config_items(node_config),
plugin_config_section=plugin_config_section,
)
)
def create_node(self):


@ -77,6 +77,7 @@ from allmydata.scripts.common import (
from foolscap.api import flushEventualQueue
import allmydata.test.common_util as testutil
from .common import (
superuser,
EMPTY_CLIENT_CONFIG,
SyncTestCase,
AsyncBrokenTestCase,
@ -151,7 +152,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
# EnvironmentError when reading a file that really exists), on
# windows, please fix this
@skipIf(platform.isWindows(), "We don't know how to set permissions on Windows.")
@skipIf(os.getuid() == 0, "cannot test as superuser with all permissions")
@skipIf(superuser, "cannot test as superuser with all permissions")
def test_unreadable_config(self):
basedir = "test_client.Basic.test_unreadable_config"
os.mkdir(basedir)


@ -62,6 +62,7 @@ from .common import (
ConstantAddresses,
SameProcessStreamEndpointAssigner,
UseNode,
superuser,
)
def port_numbers():
@ -325,7 +326,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
self.assertEqual(config.items("nosuch", default), default)
@skipIf(platform.isWindows(), "We don't know how to set permissions on Windows.")
@skipIf(os.getuid() == 0, "cannot test as superuser with all permissions")
@skipIf(superuser, "cannot test as superuser with all permissions")
def test_private_config_unreadable(self):
"""
Asking for inaccessible private config is an error
@ -341,7 +342,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
config.get_or_create_private_config("foo")
@skipIf(platform.isWindows(), "We don't know how to set permissions on Windows.")
@skipIf(os.getuid() == 0, "cannot test as superuser with all permissions")
@skipIf(superuser, "cannot test as superuser with all permissions")
def test_private_config_unreadable_preexisting(self):
"""
error if reading private config data fails
@ -398,7 +399,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
self.assertEqual(len(counter), 1) # don't call unless necessary
self.assertEqual(value, "newer")
@skipIf(os.getuid() == 0, "cannot test as superuser with all permissions")
@skipIf(superuser, "cannot test as superuser with all permissions")
def test_write_config_unwritable_file(self):
"""
Existing behavior merely logs any errors upon writing


@ -8,7 +8,7 @@ from json import (
loads,
)
import hashlib
from typing import Union, Any
from typing import Union, Any, Optional
from hyperlink import DecodedURL
from fixtures import (
@ -89,6 +89,8 @@ from allmydata.storage_client import (
IFoolscapStorageServer,
NativeStorageServer,
StorageFarmBroker,
StorageClientConfig,
MissingPlugin,
_FoolscapStorage,
_NullStorage,
_pick_a_http_server,
@ -170,16 +172,21 @@ class UnrecognizedAnnouncement(unittest.TestCase):
an announcement generated by a storage server plugin which is not loaded
in the client.
"""
plugin_name = u"tahoe-lafs-testing-v1"
ann = {
u"name": u"tahoe-lafs-testing-v1",
u"any-parameter": 12345,
u"storage-options": [
{
u"name": plugin_name,
u"any-parameter": 12345,
},
],
}
server_id = b"abc"
def _tub_maker(self, overrides):
return Service()
def native_storage_server(self):
def native_storage_server(self, config: Optional[StorageClientConfig] = None) -> NativeStorageServer:
"""
Make a ``NativeStorageServer`` out of an unrecognizable announcement.
"""
@ -188,7 +195,8 @@ class UnrecognizedAnnouncement(unittest.TestCase):
self.ann,
self._tub_maker,
{},
EMPTY_CLIENT_CONFIG,
node_config=EMPTY_CLIENT_CONFIG,
config=config if config is not None else StorageClientConfig(),
)
def test_no_exceptions(self):
@ -235,6 +243,18 @@ class UnrecognizedAnnouncement(unittest.TestCase):
server.get_foolscap_write_enabler_seed()
server.get_nickname()
def test_missing_plugin(self) -> None:
"""
An exception is produced if the plugin is missing
"""
with self.assertRaises(MissingPlugin):
self.native_storage_server(
StorageClientConfig(
storage_plugins={
"missing-plugin-name": {}
}
)
)
class PluginMatchedAnnouncement(SyncTestCase):


@ -62,6 +62,7 @@ from ..storage.http_server import (
_add_error_handling,
read_encoded,
_SCHEMAS as SERVER_SCHEMAS,
BaseApp,
)
from ..storage.http_client import (
StorageClient,
@ -257,7 +258,7 @@ def gen_bytes(length: int) -> bytes:
return result
class TestApp(object):
class TestApp(BaseApp):
"""HTTP API for testing purposes."""
clock: IReactorTime
@ -265,7 +266,7 @@ class TestApp(object):
_add_error_handling(_app)
_swissnum = SWISSNUM_FOR_TEST # Match what the test client is using
@_authorized_route(_app, {}, "/noop", methods=["GET"])
@_authorized_route(_app, set(), "/noop", methods=["GET"])
def noop(self, request, authorization):
return "noop"


@ -109,9 +109,11 @@ class PinningHTTPSValidation(AsyncTestCase):
root.isLeaf = True
listening_port = await endpoint.listen(Site(root))
try:
yield f"https://127.0.0.1:{listening_port.getHost().port}/"
yield f"https://127.0.0.1:{listening_port.getHost().port}/" # type: ignore[attr-defined]
finally:
await listening_port.stopListening()
result = listening_port.stopListening()
if result is not None:
await result
def request(self, url: str, expected_certificate: x509.Certificate):
"""


@ -0,0 +1,50 @@
"""
Utilities related to attrs
Handling for zope.interface is deprecated in attrs so we copy the
relevant support method here since we depend on zope.interface anyway
"""
from attr._make import attrs, attrib
@attrs(repr=False, slots=True, hash=True)
class _ProvidesValidator:
interface = attrib()
def __call__(self, inst, attr, value):
"""
We use a callable class to be able to change the ``__repr__``.
"""
if not self.interface.providedBy(value):
raise TypeError(
"'{name}' must provide {interface!r} which {value!r} "
"doesn't.".format(
name=attr.name, interface=self.interface, value=value
),
attr,
self.interface,
value,
)
def __repr__(self):
return "<provides validator for interface {interface!r}>".format(
interface=self.interface
)
def provides(interface):
"""
A validator that raises a `TypeError` if the initializer is called
with an object that does not provide the requested *interface* (checks are
performed using ``interface.providedBy(value)`` (see `zope.interface
<https://zopeinterface.readthedocs.io/en/latest/>`_).
:param interface: The interface to check for.
:type interface: ``zope.interface.Interface``
:raises TypeError: With a human readable error message, the attribute
(of type `attrs.Attribute`), the expected interface, and the
value it got.
"""
return _ProvidesValidator(interface)
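For orientation, a minimal usage sketch of the copied validator (hypothetical classes; this mirrors how storage_client.py applies it to IStorageServer above):

from zope.interface import Interface, implementer
import attr

from allmydata.util.attrs_provides import provides

class IWidget(Interface):
    """Marker interface for the sketch."""

@implementer(IWidget)
class Widget(object):
    pass

@attr.s
class Holder(object):
    widget = attr.ib(validator=provides(IWidget))

Holder(Widget())    # passes validation
# Holder(object())  # would raise TypeError: 'widget' must provide IWidget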