Merge pull request #1208 from tahoe-lafs/2916.grid-manager-integration-tests.2

Grid manager integration tests

Also some re-factoring of the integration test infrastructure to facilitate tests running their own grids.
meejah 2023-08-02 17:54:38 -06:00 committed by GitHub
commit fc413d49f9
16 changed files with 1153 additions and 350 deletions
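To sketch the new shape of things: the session-scoped fixtures below (grid, storage_nodes, alice) are now thin wrappers over the helpers in the new integration/grid.py, so a test can grow the shared Grid on demand. A minimal, illustrative example (the test body is not part of this diff; the fixture names are the ones defined below):

import pytest_twisted

@pytest_twisted.inlineCallbacks
def test_sketch(grid, storage_nodes):
    # "grid" provides an Introducer and flog-gatherer but no storage;
    # "storage_nodes" has already added five storage servers to it.
    carol = yield grid.add_client("carol")
    # carol is a grid.Client; the underlying TahoeProcess is .process
    print(carol.process.node_dir)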


@@ -7,16 +7,11 @@ from __future__ import annotations
import os
import sys
import shutil
from attr import frozen
from time import sleep
from os import mkdir, listdir, environ
from os import mkdir, environ
from os.path import join, exists
from tempfile import mkdtemp, mktemp
from functools import partial
from json import loads
from foolscap.furl import (
decode_furl,
)
from tempfile import mkdtemp
from eliot import (
to_file,
@@ -25,7 +20,7 @@ from eliot import (
from twisted.python.filepath import FilePath
from twisted.python.procutils import which
from twisted.internet.defer import DeferredList
from twisted.internet.defer import DeferredList, succeed
from twisted.internet.error import (
ProcessExitedAlready,
ProcessTerminated,
@@ -33,22 +28,23 @@ from twisted.internet.error import (
import pytest
import pytest_twisted
from typing import Mapping
from .util import (
_CollectOutputProtocol,
_MagicTextProtocol,
_DumpOutputProtocol,
_ProcessExitedProtocol,
_create_node,
_cleanup_tahoe_process,
_tahoe_runner_optional_coverage,
await_client_ready,
TahoeProcess,
cli,
generate_ssh_key,
block_with_timeout,
)
from .grid import (
create_flog_gatherer,
create_grid,
)
from allmydata.node import read_config
from allmydata.util.iputil import allocate_tcp_port
# No reason for HTTP requests to take longer than four minutes in the
# integration tests. See allmydata/scripts/common_http.py for usage.
@@ -116,6 +112,18 @@ def reactor():
return _reactor
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:port_allocator", include_result=False)
def port_allocator(reactor):
# these will appear basically random, which can make debugging
# (especially manual debugging) harder; but we're re-using code
# instead of writing our own... so, win?
def allocate():
port = allocate_tcp_port()
return succeed(port)
return allocate
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:temp_dir", include_args=[])
def temp_dir(request) -> str:
@@ -150,133 +158,36 @@ def flog_binary():
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:flog_gatherer", include_args=[])
def flog_gatherer(reactor, temp_dir, flog_binary, request):
out_protocol = _CollectOutputProtocol()
gather_dir = join(temp_dir, 'flog_gather')
reactor.spawnProcess(
out_protocol,
flog_binary,
(
'flogtool', 'create-gatherer',
'--location', 'tcp:localhost:3117',
'--port', '3117',
gather_dir,
),
env=environ,
fg = pytest_twisted.blockon(
create_flog_gatherer(reactor, request, temp_dir, flog_binary)
)
pytest_twisted.blockon(out_protocol.done)
twistd_protocol = _MagicTextProtocol("Gatherer waiting at", "gatherer")
twistd_process = reactor.spawnProcess(
twistd_protocol,
which('twistd')[0],
(
'twistd', '--nodaemon', '--python',
join(gather_dir, 'gatherer.tac'),
),
path=gather_dir,
env=environ,
)
pytest_twisted.blockon(twistd_protocol.magic_seen)
def cleanup():
_cleanup_tahoe_process(twistd_process, twistd_protocol.exited)
flog_file = mktemp('.flog_dump')
flog_protocol = _DumpOutputProtocol(open(flog_file, 'w'))
flog_dir = join(temp_dir, 'flog_gather')
flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')]
print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file))
reactor.spawnProcess(
flog_protocol,
flog_binary,
(
'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0])
),
env=environ,
)
print("Waiting for flogtool to complete")
try:
block_with_timeout(flog_protocol.done, reactor)
except ProcessTerminated as e:
print("flogtool exited unexpectedly: {}".format(str(e)))
print("Flogtool completed")
request.addfinalizer(cleanup)
with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f:
furl = f.read().strip()
return furl
return fg
@pytest.fixture(scope='session')
@log_call(
action_type=u"integration:introducer",
include_args=["temp_dir", "flog_gatherer"],
include_result=False,
)
def introducer(reactor, temp_dir, flog_gatherer, request):
intro_dir = join(temp_dir, 'introducer')
print("making introducer", intro_dir)
@log_call(action_type=u"integration:grid", include_args=[])
def grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
Provides a new Grid with a single Introducer and flog-gathering process.
if not exists(intro_dir):
mkdir(intro_dir)
done_proto = _ProcessExitedProtocol()
_tahoe_runner_optional_coverage(
done_proto,
reactor,
request,
(
'create-introducer',
'--listen=tcp',
'--hostname=localhost',
intro_dir,
),
)
pytest_twisted.blockon(done_proto.done)
config = read_config(intro_dir, "tub.port")
config.set_config("node", "nickname", "introducer-tor")
config.set_config("node", "web.port", "4562")
config.set_config("node", "log_gatherer.furl", flog_gatherer)
# "tahoe run" is consistent across Linux/macOS/Windows, unlike the old
# "start" command.
protocol = _MagicTextProtocol('introducer running', "introducer")
transport = _tahoe_runner_optional_coverage(
protocol,
reactor,
request,
(
'run',
intro_dir,
),
Notably does _not_ provide storage servers; use the storage_nodes
fixture if your tests need a Grid that can be used for puts / gets.
"""
g = pytest_twisted.blockon(
create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
)
request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited))
return g
pytest_twisted.blockon(protocol.magic_seen)
return TahoeProcess(transport, intro_dir)
@pytest.fixture(scope='session')
def introducer(grid):
return grid.introducer
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:introducer:furl", include_args=["temp_dir"])
def introducer_furl(introducer, temp_dir):
furl_fname = join(temp_dir, 'introducer', 'private', 'introducer.furl')
while not exists(furl_fname):
print("Don't see {} yet".format(furl_fname))
sleep(.1)
furl = open(furl_fname, 'r').read()
tubID, location_hints, name = decode_furl(furl)
if not location_hints:
# If there are no location hints then nothing can ever possibly
# connect to it and the only thing that can happen next is something
# will hang or time out. So just give up right now.
raise ValueError(
"Introducer ({!r}) fURL has no location hints!".format(
introducer_furl,
),
)
return furl
return introducer.furl
@pytest.fixture
@@ -285,7 +196,7 @@ def introducer_furl(introducer, temp_dir):
include_args=["temp_dir", "flog_gatherer"],
include_result=False,
)
def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_control_port):
def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_network):
intro_dir = join(temp_dir, 'introducer_tor')
print("making Tor introducer in {}".format(intro_dir))
print("(this can take tens of seconds to allocate Onion address)")
@@ -299,7 +210,7 @@ def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_control_port):
request,
(
'create-introducer',
'--tor-control-port', tor_control_port,
'--tor-control-port', tor_network.client_control_endpoint,
'--hide-ip',
'--listen=tor',
intro_dir,
@@ -311,7 +222,7 @@ def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_control_port):
config = read_config(intro_dir, "tub.port")
config.set_config("node", "nickname", "introducer-tor")
config.set_config("node", "web.port", "4561")
config.set_config("node", "log_gatherer.furl", flog_gatherer)
config.set_config("node", "log_gatherer.furl", flog_gatherer.furl)
# "tahoe run" is consistent across Linux/macOS/Windows, unlike the old
# "start" command.
@@ -354,87 +265,31 @@ def tor_introducer_furl(tor_introducer, temp_dir):
@pytest.fixture(scope='session')
@log_call(
action_type=u"integration:storage_nodes",
include_args=["temp_dir", "introducer_furl", "flog_gatherer"],
include_args=["grid"],
include_result=False,
)
def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, request):
def storage_nodes(grid):
nodes_d = []
# start all 5 nodes in parallel
for x in range(5):
name = 'node{}'.format(x)
web_port= 9990 + x
nodes_d.append(
_create_node(
reactor, request, temp_dir, introducer_furl, flog_gatherer, name,
web_port="tcp:{}:interface=localhost".format(web_port),
storage=True,
)
)
nodes_status = pytest_twisted.blockon(DeferredList(nodes_d))
nodes = []
for ok, process in nodes_status:
assert ok, "Storage node creation failed: {}".format(process)
nodes.append(process)
return nodes
nodes_d.append(grid.add_storage_node())
nodes_status = pytest_twisted.blockon(DeferredList(nodes_d))
for ok, value in nodes_status:
assert ok, "Storage node creation failed: {}".format(value)
return grid.storage_servers
@pytest.fixture(scope="session")
def alice_sftp_client_key_path(temp_dir):
# The client SSH key path is typically going to be somewhere else (~/.ssh,
# typically), but for convenience sake for testing we'll put it inside node.
return join(temp_dir, "alice", "private", "ssh_client_rsa_key")
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:alice", include_args=[], include_result=False)
def alice(
reactor,
temp_dir,
introducer_furl,
flog_gatherer,
storage_nodes,
alice_sftp_client_key_path,
request,
):
process = pytest_twisted.blockon(
_create_node(
reactor, request, temp_dir, introducer_furl, flog_gatherer, "alice",
web_port="tcp:9980:interface=localhost",
storage=False,
)
)
pytest_twisted.blockon(await_client_ready(process))
# 1. Create a new RW directory cap:
cli(process, "create-alias", "test")
rwcap = loads(cli(process, "list-aliases", "--json"))["test"]["readwrite"]
# 2. Enable SFTP on the node:
host_ssh_key_path = join(process.node_dir, "private", "ssh_host_rsa_key")
accounts_path = join(process.node_dir, "private", "accounts")
with open(join(process.node_dir, "tahoe.cfg"), "a") as f:
f.write("""\
[sftpd]
enabled = true
port = tcp:8022:interface=127.0.0.1
host_pubkey_file = {ssh_key_path}.pub
host_privkey_file = {ssh_key_path}
accounts.file = {accounts_path}
""".format(ssh_key_path=host_ssh_key_path, accounts_path=accounts_path))
generate_ssh_key(host_ssh_key_path)
# 3. Add a SFTP access file with an SSH key for auth.
generate_ssh_key(alice_sftp_client_key_path)
# Pub key format is "ssh-rsa <thekey> <username>". We want the key.
ssh_public_key = open(alice_sftp_client_key_path + ".pub").read().strip().split()[1]
with open(accounts_path, "w") as f:
f.write("""\
alice-key ssh-rsa {ssh_public_key} {rwcap}
""".format(rwcap=rwcap, ssh_public_key=ssh_public_key))
# 4. Restart the node with new SFTP config.
pytest_twisted.blockon(process.restart_async(reactor, request))
pytest_twisted.blockon(await_client_ready(process))
print(f"Alice pid: {process.transport.pid}")
return process
def alice(reactor, request, grid, storage_nodes):
"""
:returns grid.Client: the associated instance for Alice
"""
alice = pytest_twisted.blockon(grid.add_client("alice"))
pytest_twisted.blockon(alice.add_sftp(reactor, request))
print(f"Alice pid: {alice.process.transport.pid}")
return alice
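Since "alice" is now a grid.Client rather than a bare TahoeProcess, call-sites unwrap it via .process; that is the mechanical change repeated through the test hunks below. For example (before/after, taken from the get/put tests):

cli(alice, "create-alias", "getput")          # before: alice was a TahoeProcess
cli(alice.process, "create-alias", "getput")  # after: alice wraps the process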
@pytest.fixture(scope='session')
@@ -455,6 +310,12 @@ def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, reques
@pytest.mark.skipif(sys.platform.startswith('win'),
'Tor tests are unstable on Windows')
def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]:
"""
Install the Chutney software that is required to run a small local Tor grid.
(Chutney lacks the normal Python packaging metadata, so we can't
just declare it in Tox or similar dependency lists)
"""
# Try to find Chutney already installed in the environment.
try:
import chutney
@@ -512,19 +373,23 @@ def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]:
)
pytest_twisted.blockon(proto.done)
return (chutney_dir, {"PYTHONPATH": join(chutney_dir, "lib")})
return chutney_dir, {"PYTHONPATH": join(chutney_dir, "lib")}
@pytest.fixture(scope='session')
def tor_control_port(tor_network):
@frozen
class ChutneyTorNetwork:
"""
Get an endpoint description for the Tor control port for the local Tor
network we run..
Represents a running Chutney (tor) network. Returned by the
"tor_network" fixture.
"""
# We ignore tor_network because it can't tell us the control port. But
# asking for it forces the Tor network to be built before we run - so if
# we get the hard-coded control port value correct, there should be
# something listening at that address.
return 'tcp:localhost:8007'
dir: FilePath
environ: Mapping[str, str]
client_control_port: int
@property
def client_control_endpoint(self) -> str:
return "tcp:localhost:{}".format(self.client_control_port)
@pytest.fixture(scope='session')
@pytest.mark.skipif(sys.platform.startswith('win'),
@@ -533,6 +398,20 @@ def tor_network(reactor, temp_dir, chutney, request):
"""
Build a basic Tor network.
Instantiate the "networks/basic" Chutney configuration for a local
Tor network.
This provides a small, local Tor network that can run v3 Onion
Services. It has 3 authorities, 5 relays and 2 clients.
The 'chutney' fixture pins a Chutney git revision, so things
shouldn't change. This network has two clients, which are the only
nodes with a valid SocksPort configuration ("008c" and "009c", on
ports 9008 and 9009).
The control ports start at 8000 (so the ControlPorts for the two
client nodes are 8008 and 8009).
:param chutney: The root directory of a Chutney checkout and a dict of
additional environment variables to set so a Python process can use
it.
@@ -608,3 +487,11 @@ def tor_network(reactor, temp_dir, chutney, request):
pytest_twisted.blockon(chutney(("status", basic_network)))
except ProcessTerminated:
print("Chutney.TorNet status failed (continuing)")
# the "8008" comes from configuring "networks/basic" in chutney
# and then examining "net/nodes/008c/torrc" for ControlPort value
return ChutneyTorNetwork(
chutney_root,
chutney_env,
8008,
)

integration/grid.py (new file, 528 lines added)

@@ -0,0 +1,528 @@
"""
Classes which directly represent various kinds of Tahoe processes
that co-operate to form "a Grid".
These methods and objects are used by conftest.py fixtures but may
also be used as direct helpers for tests that don't want to (or can't)
rely on 'the' global grid as provided by fixtures like 'alice' or
'storage_servers'.
"""
from os import mkdir, listdir
from os.path import join, exists
from json import loads
from tempfile import mktemp
from time import sleep
from eliot import (
log_call,
)
from foolscap.furl import (
decode_furl,
)
from twisted.python.procutils import which
from twisted.internet.defer import (
inlineCallbacks,
returnValue,
Deferred,
)
from twisted.internet.task import (
deferLater,
)
from twisted.internet.protocol import ProcessProtocol # see ticket 4056
from twisted.internet.error import ProcessTerminated
from allmydata.node import read_config
from .util import (
_CollectOutputProtocol,
_MagicTextProtocol,
_DumpOutputProtocol,
_ProcessExitedProtocol,
_run_node,
_cleanup_tahoe_process,
_tahoe_runner_optional_coverage,
TahoeProcess,
await_client_ready,
generate_ssh_key,
cli,
reconfigure,
_create_node,
)
import attr
import pytest_twisted
# currently, we pass a "request" around a bunch but it seems to only
# be for addfinalizer() calls.
# - is "keeping" a request like that okay? What if it's a session-scoped one?
# (i.e. in Grid etc)
# - maybe limit to "a callback to hang your cleanup off of" (instead of request)?
@attr.s
class FlogGatherer(object):
"""
Flog Gatherer process.
"""
# it would be best to use attr.validators.provides() here with the
# corresponding Twisted interface (IProcessTransport,
# IProcessProtocol) but that is deprecated; please replace with
# our own "provides" as part of
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4056#ticket
# insisting on a subclass is narrower than necessary
process = attr.ib()
protocol = attr.ib(
validator=attr.validators.instance_of(ProcessProtocol)
)
furl = attr.ib()
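The home-grown "provides" validator that comment asks for might look something like this (a sketch assuming zope.interface, not part of this diff):

def provides(interface):
    """
    attrs-style validator: accept any value that provides the given
    zope.interface Interface, rather than insisting on a concrete subclass.
    """
    def validator(inst, attribute, value):
        if not interface.providedBy(value):
            raise TypeError(
                "{} must provide {}".format(attribute.name, interface.__name__)
            )
    return validator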
@inlineCallbacks
def create_flog_gatherer(reactor, request, temp_dir, flog_binary):
out_protocol = _CollectOutputProtocol()
gather_dir = join(temp_dir, 'flog_gather')
reactor.spawnProcess(
out_protocol,
flog_binary,
(
'flogtool', 'create-gatherer',
'--location', 'tcp:localhost:3117',
'--port', '3117',
gather_dir,
)
)
yield out_protocol.done
twistd_protocol = _MagicTextProtocol("Gatherer waiting at", "gatherer")
twistd_process = reactor.spawnProcess(
twistd_protocol,
which('twistd')[0],
(
'twistd', '--nodaemon', '--python',
join(gather_dir, 'gatherer.tac'),
),
path=gather_dir,
)
yield twistd_protocol.magic_seen
def cleanup():
_cleanup_tahoe_process(twistd_process, twistd_protocol.exited)
flog_file = mktemp('.flog_dump')
flog_protocol = _DumpOutputProtocol(open(flog_file, 'w'))
flog_dir = join(temp_dir, 'flog_gather')
flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')]
print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file))
for flog_path in flogs:
reactor.spawnProcess(
flog_protocol,
flog_binary,
(
'flogtool', 'dump', join(temp_dir, 'flog_gather', flog_path)
),
)
print("Waiting for flogtool to complete")
try:
pytest_twisted.blockon(flog_protocol.done)
except ProcessTerminated as e:
print("flogtool exited unexpectedly: {}".format(str(e)))
print("Flogtool completed")
request.addfinalizer(cleanup)
with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f:
furl = f.read().strip()
returnValue(
FlogGatherer(
protocol=twistd_protocol,
process=twistd_process,
furl=furl,
)
)
@attr.s
class StorageServer(object):
"""
Represents a Tahoe Storage Server
"""
process = attr.ib(
validator=attr.validators.instance_of(TahoeProcess)
)
protocol = attr.ib(
validator=attr.validators.instance_of(ProcessProtocol)
)
@inlineCallbacks
def restart(self, reactor, request):
"""
re-start our underlying process by issuing a TERM, waiting and
then running again. await_client_ready() will be called as well.
Note that self.process and self.protocol will be new instances
after this.
"""
self.process.transport.signalProcess('TERM')
yield self.protocol.exited
self.process = yield _run_node(
reactor, self.process.node_dir, request, None,
)
self.protocol = self.process.transport.proto
yield await_client_ready(self.process)
@inlineCallbacks
def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
needed=2, happy=3, total=4):
"""
Create a new storage server
"""
node_process = yield _create_node(
reactor, request, temp_dir, introducer.furl, flog_gatherer,
name, web_port, storage=True, needed=needed, happy=happy, total=total,
)
storage = StorageServer(
process=node_process,
# node_process is a TahoeProcess. its transport is an
# IProcessTransport. in practice, this means it is a
# twisted.internet._baseprocess.BaseProcess. BaseProcess records the
# process protocol as its proto attribute.
protocol=node_process.transport.proto,
)
returnValue(storage)
@attr.s
class Client(object):
"""
Represents a Tahoe client
"""
process = attr.ib(
validator=attr.validators.instance_of(TahoeProcess)
)
protocol = attr.ib(
validator=attr.validators.instance_of(ProcessProtocol)
)
request = attr.ib() # original request, for addfinalizer()
## XXX convenience? or confusion?
# @property
# def node_dir(self):
# return self.process.node_dir
@inlineCallbacks
def reconfigure_zfec(self, reactor, zfec_params, convergence=None, max_segment_size=None):
"""
Reconfigure the ZFEC parameters for this node
"""
# XXX this is a stop-gap to keep tests running "as is"
# -> we should fix the tests so that they create a new client
# in the grid with the required parameters, instead of
# re-configuring Alice (or whomever)
rtn = yield Deferred.fromCoroutine(
reconfigure(reactor, self.request, self.process, zfec_params, convergence, max_segment_size)
)
return rtn
@inlineCallbacks
def restart(self, reactor, request, servers=1):
"""
re-start our underlying process by issuing a TERM, waiting and
then running again.
:param int servers: number of server connections we will wait
for before being 'ready'
Note that self.process and self.protocol will be new instances
after this.
"""
# XXX similar to above, can we make this return a new instance
# instead of mutating?
self.process.transport.signalProcess('TERM')
yield self.protocol.exited
process = yield _run_node(
reactor, self.process.node_dir, request, None,
)
self.process = process
self.protocol = self.process.transport.proto
yield await_client_ready(self.process, minimum_number_of_servers=servers)
@inlineCallbacks
def add_sftp(self, reactor, request):
"""
"""
# if other things need to add or change configuration, further
# refactoring could be useful here (i.e. move reconfigure
# parts to their own functions)
# XXX why do we need an alias?
# 1. Create a new RW directory cap:
cli(self.process, "create-alias", "test")
rwcap = loads(cli(self.process, "list-aliases", "--json"))["test"]["readwrite"]
# 2. Enable SFTP on the node:
host_ssh_key_path = join(self.process.node_dir, "private", "ssh_host_rsa_key")
sftp_client_key_path = join(self.process.node_dir, "private", "ssh_client_rsa_key")
accounts_path = join(self.process.node_dir, "private", "accounts")
with open(join(self.process.node_dir, "tahoe.cfg"), "a") as f:
f.write(
("\n\n[sftpd]\n"
"enabled = true\n"
"port = tcp:8022:interface=127.0.0.1\n"
"host_pubkey_file = {ssh_key_path}.pub\n"
"host_privkey_file = {ssh_key_path}\n"
"accounts.file = {accounts_path}\n").format(
ssh_key_path=host_ssh_key_path,
accounts_path=accounts_path,
)
)
generate_ssh_key(host_ssh_key_path)
# 3. Add a SFTP access file with an SSH key for auth.
generate_ssh_key(sftp_client_key_path)
# Pub key format is "ssh-rsa <thekey> <username>". We want the key.
with open(sftp_client_key_path + ".pub") as pubkey_file:
ssh_public_key = pubkey_file.read().strip().split()[1]
with open(accounts_path, "w") as f:
f.write(
"alice-key ssh-rsa {ssh_public_key} {rwcap}\n".format(
rwcap=rwcap,
ssh_public_key=ssh_public_key,
)
)
# 4. Restart the node with new SFTP config.
print("restarting for SFTP")
yield self.restart(reactor, request)
print("restart done")
# XXX i think this is broken because we're "waiting for ready" during first bootstrap? or something?
@inlineCallbacks
def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
needed=2, happy=3, total=4):
"""
Create a new client node
"""
node_process = yield _create_node(
reactor, request, temp_dir, introducer.furl, flog_gatherer,
name, web_port, storage=False, needed=needed, happy=happy, total=total,
)
returnValue(
Client(
process=node_process,
protocol=node_process.transport.proto,
request=request,
)
)
@attr.s
class Introducer(object):
"""
Represents a running introducer
"""
process = attr.ib(
validator=attr.validators.instance_of(TahoeProcess)
)
protocol = attr.ib(
validator=attr.validators.instance_of(ProcessProtocol)
)
furl = attr.ib()
def _validate_furl(furl_fname):
"""
Opens and validates a fURL, ensuring location hints.
:returns: the furl
:raises: ValueError if no location hints
"""
while not exists(furl_fname):
print("Don't see {} yet".format(furl_fname))
sleep(.1)
furl = open(furl_fname, 'r').read()
tubID, location_hints, name = decode_furl(furl)
if not location_hints:
# If there are no location hints then nothing can ever possibly
# connect to it and the only thing that can happen next is something
# will hang or time out. So just give up right now.
raise ValueError(
"Introducer ({!r}) fURL has no location hints!".format(
furl,
),
)
return furl
@inlineCallbacks
@log_call(
action_type=u"integration:introducer",
include_args=["temp_dir", "flog_gatherer"],
include_result=False,
)
def create_introducer(reactor, request, temp_dir, flog_gatherer, port):
"""
Run a new Introducer and return an Introducer instance.
"""
intro_dir = join(temp_dir, 'introducer{}'.format(port))
if not exists(intro_dir):
mkdir(intro_dir)
done_proto = _ProcessExitedProtocol()
_tahoe_runner_optional_coverage(
done_proto,
reactor,
request,
(
'create-introducer',
'--listen=tcp',
'--hostname=localhost',
intro_dir,
),
)
yield done_proto.done
config = read_config(intro_dir, "tub.port")
config.set_config("node", "nickname", f"introducer-{port}")
config.set_config("node", "web.port", f"{port}")
config.set_config("node", "log_gatherer.furl", flog_gatherer.furl)
# on windows, "tahoe start" means: run forever in the foreground,
# but on linux it means daemonize. "tahoe run" is consistent
# between platforms.
protocol = _MagicTextProtocol('introducer running', "introducer")
transport = _tahoe_runner_optional_coverage(
protocol,
reactor,
request,
(
'run',
intro_dir,
),
)
def clean():
return _cleanup_tahoe_process(transport, protocol.exited)
request.addfinalizer(clean)
yield protocol.magic_seen
furl_fname = join(intro_dir, 'private', 'introducer.furl')
while not exists(furl_fname):
print("Don't see {} yet".format(furl_fname))
yield deferLater(reactor, .1, lambda: None)
furl = _validate_furl(furl_fname)
returnValue(
Introducer(
process=TahoeProcess(transport, intro_dir),
protocol=protocol,
furl=furl,
)
)
@attr.s
class Grid(object):
"""
Represents an entire Tahoe Grid setup
A Grid includes an Introducer, Flog Gatherer and some number of
Storage Servers. Optionally includes Clients.
"""
_reactor = attr.ib()
_request = attr.ib()
_temp_dir = attr.ib()
_port_allocator = attr.ib()
introducer = attr.ib()
flog_gatherer = attr.ib()
storage_servers = attr.ib(factory=list)
clients = attr.ib(factory=dict)
@storage_servers.validator
def check(self, attribute, value):
for server in value:
if not isinstance(server, StorageServer):
raise ValueError(
"storage_servers must be StorageServer"
)
@inlineCallbacks
def add_storage_node(self):
"""
Creates a new storage node, returns a StorageServer instance
(which will already be added to our .storage_servers list)
"""
port = yield self._port_allocator()
print("make {}".format(port))
name = 'node{}'.format(port)
web_port = 'tcp:{}:interface=localhost'.format(port)
server = yield create_storage_server(
self._reactor,
self._request,
self._temp_dir,
self.introducer,
self.flog_gatherer,
name,
web_port,
)
self.storage_servers.append(server)
returnValue(server)
@inlineCallbacks
def add_client(self, name, needed=2, happy=3, total=4):
"""
Create a new client node
"""
port = yield self._port_allocator()
web_port = 'tcp:{}:interface=localhost'.format(port)
client = yield create_client(
self._reactor,
self._request,
self._temp_dir,
self.introducer,
self.flog_gatherer,
name,
web_port,
needed=needed,
happy=happy,
total=total,
)
self.clients[name] = client
yield await_client_ready(client.process)
returnValue(client)
# A grid is now forever tied to its original 'request' which is where
# it must hang finalizers off of. The "main" one is a session-level
# fixture so it'll live the life of the tests but it could be
# per-function Grid too.
@inlineCallbacks
def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
Create a new grid. This will have one Introducer but zero
storage-servers or clients; those must be added by a test or
subsequent fixtures.
"""
intro_port = yield port_allocator()
introducer = yield create_introducer(reactor, request, temp_dir, flog_gatherer, intro_port)
grid = Grid(
reactor,
request,
temp_dir,
port_allocator,
introducer,
flog_gatherer,
)
returnValue(grid)
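Per the module docstring, a test can also build its own isolated grid instead of using the session fixtures; the grid-manager tests below do exactly this. A condensed sketch (assuming the standard reactor/request/temp_dir/flog_gatherer/port_allocator fixtures):

@pytest_twisted.inlineCallbacks
def test_isolated_grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
    grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
    storage0 = yield grid.add_storage_node()
    diana = yield grid.add_client("diana", needed=2, happy=2, total=2)
    # cleanup is hung off "request" via finalizers, so nothing to tear down here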


@@ -8,9 +8,8 @@ from subprocess import Popen, PIPE, check_output, check_call
import pytest
from twisted.internet import reactor
from twisted.internet.threads import blockingCallFromThread
from twisted.internet.defer import Deferred
from .util import run_in_thread, cli, reconfigure
from .util import run_in_thread, cli
DATA = b"abc123 this is not utf-8 decodable \xff\x00\x33 \x11"
try:
@@ -23,7 +22,7 @@ else:
@pytest.fixture(scope="session")
def get_put_alias(alice):
cli(alice, "create-alias", "getput")
cli(alice.process, "create-alias", "getput")
def read_bytes(path):
@@ -39,14 +38,14 @@ def test_put_from_stdin(alice, get_put_alias, tmpdir):
"""
tempfile = str(tmpdir.join("file"))
p = Popen(
["tahoe", "--node-directory", alice.node_dir, "put", "-", "getput:fromstdin"],
["tahoe", "--node-directory", alice.process.node_dir, "put", "-", "getput:fromstdin"],
stdin=PIPE
)
p.stdin.write(DATA)
p.stdin.close()
assert p.wait() == 0
cli(alice, "get", "getput:fromstdin", tempfile)
cli(alice.process, "get", "getput:fromstdin", tempfile)
assert read_bytes(tempfile) == DATA
@@ -58,10 +57,10 @@ def test_get_to_stdout(alice, get_put_alias, tmpdir):
tempfile = tmpdir.join("file")
with tempfile.open("wb") as f:
f.write(DATA)
cli(alice, "put", str(tempfile), "getput:tostdout")
cli(alice.process, "put", str(tempfile), "getput:tostdout")
p = Popen(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:tostdout", "-"],
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:tostdout", "-"],
stdout=PIPE
)
assert p.stdout.read() == DATA
@@ -78,11 +77,11 @@ def test_large_file(alice, get_put_alias, tmp_path):
tempfile = tmp_path / "file"
with tempfile.open("wb") as f:
f.write(DATA * 1_000_000)
cli(alice, "put", str(tempfile), "getput:largefile")
cli(alice.process, "put", str(tempfile), "getput:largefile")
outfile = tmp_path / "out"
check_call(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:largefile", str(outfile)],
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:largefile", str(outfile)],
)
assert outfile.read_bytes() == tempfile.read_bytes()
@@ -104,31 +103,29 @@ def test_upload_download_immutable_different_default_max_segment_size(alice, get
def set_segment_size(segment_size):
return blockingCallFromThread(
reactor,
lambda: Deferred.fromCoroutine(reconfigure(
lambda: alice.reconfigure_zfec(
reactor,
request,
alice,
(1, 1, 1),
None,
max_segment_size=segment_size
))
)
)
# 1. Upload file 1 with default segment size set to 1MB
set_segment_size(1024 * 1024)
cli(alice, "put", str(tempfile), "getput:seg1024kb")
cli(alice.process, "put", str(tempfile), "getput:seg1024kb")
# 2. Download file 1 with default segment size set to 128KB
set_segment_size(128 * 1024)
assert large_data == check_output(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg1024kb", "-"]
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:seg1024kb", "-"]
)
# 3. Upload file 2 with default segment size set to 128KB
cli(alice, "put", str(tempfile), "getput:seg128kb")
cli(alice.process, "put", str(tempfile), "getput:seg128kb")
# 4. Download file 2 with default segment size set to 1MB
set_segment_size(1024 * 1024)
assert large_data == check_output(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg128kb", "-"]
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:seg128kb", "-"]
)


@@ -0,0 +1,351 @@
import sys
import json
from os.path import join
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
)
from twisted.internet.utils import (
getProcessOutputAndValue,
)
from twisted.internet.defer import (
inlineCallbacks,
returnValue,
)
from allmydata.crypto import ed25519
from allmydata.util import base32
from allmydata.util import configutil
from . import util
from .grid import (
create_grid,
)
import pytest_twisted
@inlineCallbacks
def _run_gm(reactor, request, *args, **kwargs):
"""
Run the grid-manager process, passing all arguments as extra CLI
args.
:returns: all process output
"""
if request.config.getoption('coverage'):
base_args = ("-b", "-m", "coverage", "run", "-m", "allmydata.cli.grid_manager")
else:
base_args = ("-m", "allmydata.cli.grid_manager")
output, errput, exit_code = yield getProcessOutputAndValue(
sys.executable,
base_args + args,
reactor=reactor,
**kwargs
)
if exit_code != 0:
raise util.ProcessFailed(
RuntimeError("Exit code {}".format(exit_code)),
output + errput,
)
returnValue(output)
@pytest_twisted.inlineCallbacks
def test_create_certificate(reactor, request):
"""
The Grid Manager produces a valid, correctly-signed certificate.
"""
gm_config = yield _run_gm(reactor, request, "--config", "-", "create")
privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
privkey, pubkey = ed25519.signing_keypair_from_string(privkey_bytes)
# Note that zara + her key here are arbitrary and don't match any
# "actual" clients in the test-grid; we're just checking that the
# Grid Manager signs this properly.
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
stdinBytes=gm_config,
)
zara_cert_bytes = yield _run_gm(
reactor, request, "--config", "-", "sign", "zara", "1",
stdinBytes=gm_config,
)
zara_cert = json.loads(zara_cert_bytes)
# confirm that zara's certificate is made by the Grid Manager
# (.verify returns None on success, raises exception on error)
pubkey.verify(
base32.a2b(zara_cert['signature'].encode('ascii')),
zara_cert['certificate'].encode('ascii'),
)
@pytest_twisted.inlineCallbacks
def test_remove_client(reactor, request):
"""
A Grid Manager can add and successfully remove a client
"""
gm_config = yield _run_gm(
reactor, request, "--config", "-", "create",
)
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
stdinBytes=gm_config,
)
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
"yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq",
stdinBytes=gm_config,
)
assert "zara" in json.loads(gm_config)['storage_servers']
assert "yakov" in json.loads(gm_config)['storage_servers']
gm_config = yield _run_gm(
reactor, request, "--config", "-", "remove",
"zara",
stdinBytes=gm_config,
)
assert "zara" not in json.loads(gm_config)['storage_servers']
assert "yakov" in json.loads(gm_config)['storage_servers']
@pytest_twisted.inlineCallbacks
def test_remove_last_client(reactor, request):
"""
A Grid Manager can remove all clients
"""
gm_config = yield _run_gm(
reactor, request, "--config", "-", "create",
)
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
stdinBytes=gm_config,
)
assert "zara" in json.loads(gm_config)['storage_servers']
gm_config = yield _run_gm(
reactor, request, "--config", "-", "remove",
"zara",
stdinBytes=gm_config,
)
# there are no storage servers left at all now
assert "storage_servers" not in json.loads(gm_config)
@pytest_twisted.inlineCallbacks
def test_add_remove_client_file(reactor, request, temp_dir):
"""
A Grid Manager can add and successfully remove a client (when
keeping data on disk)
"""
gmconfig = join(temp_dir, "gmtest")
gmconfig_file = join(temp_dir, "gmtest", "config.json")
yield _run_gm(
reactor, request, "--config", gmconfig, "create",
)
yield _run_gm(
reactor, request, "--config", gmconfig, "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
)
yield _run_gm(
reactor, request, "--config", gmconfig, "add",
"yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq",
)
assert "zara" in json.load(open(gmconfig_file, "r"))['storage_servers']
assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers']
yield _run_gm(
reactor, request, "--config", gmconfig, "remove",
"zara",
)
assert "zara" not in json.load(open(gmconfig_file, "r"))['storage_servers']
assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers']
@pytest_twisted.inlineCallbacks
def _test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
A client with happiness=2 fails to upload to a Grid when it is
using Grid Manager and there is only 1 storage server with a valid
certificate.
"""
grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
storage0 = yield grid.add_storage_node()
_ = yield grid.add_storage_node()
gm_config = yield _run_gm(
reactor, request, "--config", "-", "create",
)
gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes)
# create certificate for the first storage-server
pubkey_fname = join(storage0.process.node_dir, "node.pubkey")
with open(pubkey_fname, 'r') as f:
pubkey_str = f.read().strip()
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
"storage0", pubkey_str,
stdinBytes=gm_config,
)
assert json.loads(gm_config)['storage_servers'].keys() == {'storage0'}
print("inserting certificate")
cert = yield _run_gm(
reactor, request, "--config", "-", "sign", "storage0", "1",
stdinBytes=gm_config,
)
print(cert)
yield util.run_tahoe(
reactor, request, "--node-directory", storage0.process.node_dir,
"admin", "add-grid-manager-cert",
"--name", "default",
"--filename", "-",
stdin=cert,
)
# re-start this storage server
yield storage0.restart(reactor, request)
# now only one storage-server has the certificate .. configure
# diana to have the grid-manager certificate
diana = yield grid.add_client("diana", needed=2, happy=2, total=2)
config = configutil.get_config(join(diana.process.node_dir, "tahoe.cfg"))
config.add_section("grid_managers")
config.set("grid_managers", "test", str(ed25519.string_from_verifying_key(gm_pubkey), "ascii"))
with open(join(diana.process.node_dir, "tahoe.cfg"), "w") as f:
config.write(f)
yield diana.restart(reactor, request, servers=2)
# try to put something into the grid, which should fail (because
# diana has happy=2 but should only find storage0 to be acceptable
# to upload to)
try:
yield util.run_tahoe(
reactor, request, "--node-directory", diana.process.node_dir,
"put", "-",
stdin=b"some content\n" * 200,
)
assert False, "Should get a failure"
except util.ProcessFailed as e:
if b'UploadUnhappinessError' in e.output:
# We're done! We've succeeded.
return
assert False, "Failed to see one of out of two servers"
@pytest_twisted.inlineCallbacks
def _test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
Successfully upload to a Grid Manager enabled Grid.
"""
grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
happy0 = yield grid.add_storage_node()
happy1 = yield grid.add_storage_node()
gm_config = yield _run_gm(
reactor, request, "--config", "-", "create",
)
gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes)
# create certificates for all storage-servers
servers = (
("happy0", happy0),
("happy1", happy1),
)
for st_name, st in servers:
pubkey_fname = join(st.process.node_dir, "node.pubkey")
with open(pubkey_fname, 'r') as f:
pubkey_str = f.read().strip()
gm_config = yield _run_gm(
reactor, request, "--config", "-", "add",
st_name, pubkey_str,
stdinBytes=gm_config,
)
assert json.loads(gm_config)['storage_servers'].keys() == {'happy0', 'happy1'}
# add the certificates from the grid-manager to the storage servers
print("inserting storage-server certificates")
for st_name, st in servers:
cert = yield _run_gm(
reactor, request, "--config", "-", "sign", st_name, "1",
stdinBytes=gm_config,
)
yield util.run_tahoe(
reactor, request, "--node-directory", st.process.node_dir,
"admin", "add-grid-manager-cert",
"--name", "default",
"--filename", "-",
stdin=cert,
)
# re-start the storage servers
yield happy0.restart(reactor, request)
yield happy1.restart(reactor, request)
# configure freya (a client) to have the grid-manager certificate
freya = yield grid.add_client("freya", needed=2, happy=2, total=2)
config = configutil.get_config(join(freya.process.node_dir, "tahoe.cfg"))
config.add_section("grid_managers")
config.set("grid_managers", "test", str(ed25519.string_from_verifying_key(gm_pubkey), "ascii"))
with open(join(freya.process.node_dir, "tahoe.cfg"), "w") as f:
config.write(f)
yield freya.restart(reactor, request, servers=2)
# confirm that Freya will upload to the GridManager-enabled Grid
yield util.run_tahoe(
reactor, request, "--node-directory", freya.process.node_dir,
"put", "-",
stdin=b"some content\n" * 200,
)
@pytest_twisted.inlineCallbacks
def test_identity(reactor, request, temp_dir):
"""
Dump public key to CLI
"""
gm_config = join(temp_dir, "test_identity")
yield _run_gm(
reactor, request, "--config", gm_config, "create",
)
# ask the CLI for the grid-manager pubkey
pubkey = yield _run_gm(
reactor, request, "--config", gm_config, "public-identity",
)
alleged_pubkey = ed25519.verifying_key_from_string(pubkey.strip())
# load the grid-manager pubkey "ourselves"
with open(join(gm_config, "config.json"), "r") as f:
real_config = json.load(f)
real_privkey, real_pubkey = ed25519.signing_keypair_from_string(
real_config["private_key"].encode("ascii"),
)
# confirm the CLI told us the correct thing
alleged_bytes = alleged_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw)
real_bytes = real_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw)
assert alleged_bytes == real_bytes, "Keys don't match"


@@ -24,6 +24,7 @@ from allmydata.test.common import (
write_introducer,
)
from allmydata.node import read_config
from allmydata.util.iputil import allocate_tcp_port
if which("docker") is None:
@@ -132,8 +133,10 @@ def i2p_introducer_furl(i2p_introducer, temp_dir):
@pytest_twisted.inlineCallbacks
@pytest.mark.skip("I2P tests are not functioning at all, for unknown reasons")
def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl):
yield _create_anonymous_node(reactor, 'carol_i2p', 8008, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
yield _create_anonymous_node(reactor, 'dave_i2p', 8009, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
web_port0 = allocate_tcp_port()
web_port1 = allocate_tcp_port()
yield _create_anonymous_node(reactor, 'carol_i2p', web_port0, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
yield _create_anonymous_node(reactor, 'dave_i2p', web_port1, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
# ensure both nodes are connected to "a grid" by uploading
# something via carol, and retrieve it using dave.
gold_path = join(temp_dir, "gold")
@@ -179,9 +182,8 @@ def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_netw
@pytest_twisted.inlineCallbacks
def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, i2p_network, introducer_furl):
def _create_anonymous_node(reactor, name, web_port, request, temp_dir, flog_gatherer, i2p_network, introducer_furl):
node_dir = FilePath(temp_dir).child(name)
web_port = "tcp:{}:interface=localhost".format(control_port + 2000)
print("creating", node_dir.path)
node_dir.makedirs()


@@ -6,8 +6,6 @@ import sys
from os.path import join
from os import environ
from twisted.internet.error import ProcessTerminated
from . import util
import pytest_twisted
@@ -44,8 +42,8 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto
try:
yield proto.done
assert False, "should raise exception"
except Exception as e:
assert isinstance(e, ProcessTerminated)
except util.ProcessFailed as e:
assert b"UploadUnhappinessError" in e.output
output = proto.output.getvalue()
assert b"shares could be placed on only" in output


@@ -72,7 +72,7 @@ def test_bad_account_password_ssh_key(alice, tmpdir):
another_key = os.path.join(str(tmpdir), "ssh_key")
generate_ssh_key(another_key)
good_key = RSAKey(filename=os.path.join(alice.node_dir, "private", "ssh_client_rsa_key"))
good_key = RSAKey(filename=os.path.join(alice.process.node_dir, "private", "ssh_client_rsa_key"))
bad_key = RSAKey(filename=another_key)
# Wrong key:
@@ -87,17 +87,16 @@ def test_bad_account_password_ssh_key(alice, tmpdir):
"username": "someoneelse", "pkey": good_key,
})
def sftp_client_key(node):
def sftp_client_key(client):
"""
:return RSAKey: the RSA client key associated with this grid.Client
"""
# XXX move to Client / grid.py?
return RSAKey(
filename=os.path.join(node.node_dir, "private", "ssh_client_rsa_key"),
filename=os.path.join(client.process.node_dir, "private", "ssh_client_rsa_key"),
)
def test_sftp_client_key_exists(alice, alice_sftp_client_key_path):
"""
Weakly validate the sftp client key fixture by asserting that *something*
exists at the supposed key path.
"""
assert os.path.exists(alice_sftp_client_key_path)
@run_in_thread
def test_ssh_key_auth(alice):


@@ -38,8 +38,8 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne
The two nodes can talk to the introducer and each other: we upload to one
node, read from the other.
"""
carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
carol = yield _create_anonymous_node(reactor, 'carol', 8100, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
dave = yield _create_anonymous_node(reactor, 'dave', 8101, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
yield util.await_client_ready(carol, minimum_number_of_servers=2, timeout=600)
yield util.await_client_ready(dave, minimum_number_of_servers=2, timeout=600)
yield upload_to_one_download_from_the_other(reactor, temp_dir, carol, dave)
@@ -94,44 +94,45 @@ async def upload_to_one_download_from_the_other(reactor, temp_dir, upload_to: ut
@pytest_twisted.inlineCallbacks
def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl, shares_total: int) -> util.TahoeProcess:
def _create_anonymous_node(reactor, name, web_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl, shares_total: int) -> util.TahoeProcess:
node_dir = FilePath(temp_dir).child(name)
web_port = "tcp:{}:interface=localhost".format(control_port + 2000)
if True:
print(f"creating {node_dir.path} with introducer {introducer_furl}")
node_dir.makedirs()
proto = util._DumpOutputProtocol(None)
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
'create-node',
'--nickname', name,
'--webport', web_port,
'--introducer', introducer_furl,
'--hide-ip',
'--tor-control-port', 'tcp:localhost:{}'.format(control_port),
'--listen', 'tor',
'--shares-needed', '1',
'--shares-happy', '1',
'--shares-total', str(shares_total),
node_dir.path,
),
env=environ,
if node_dir.exists():
raise RuntimeError(
"A node already exists in '{}'".format(node_dir)
)
yield proto.done
print(f"creating {node_dir.path} with introducer {introducer_furl}")
node_dir.makedirs()
proto = util._DumpOutputProtocol(None)
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
'create-node',
'--nickname', name,
'--webport', str(web_port),
'--introducer', introducer_furl,
'--hide-ip',
'--tor-control-port', tor_network.client_control_endpoint,
'--listen', 'tor',
'--shares-needed', '1',
'--shares-happy', '1',
'--shares-total', str(shares_total),
node_dir.path,
),
env=environ,
)
yield proto.done
# Which services should this client connect to?
write_introducer(node_dir, "default", introducer_furl)
util.basic_node_configuration(request, flog_gatherer, node_dir.path)
util.basic_node_configuration(request, flog_gatherer.furl, node_dir.path)
config = read_config(node_dir.path, "tub.port")
config.set_config("tor", "onion", "true")
config.set_config("tor", "onion.external_port", "3457")
config.set_config("tor", "control.port", f"tcp:port={control_port}:host=127.0.0.1")
config.set_config("tor", "control.port", tor_network.client_control_endpoint)
config.set_config("tor", "onion.private_key_file", "private/tor_onion.privkey")
print("running")
@@ -157,7 +158,7 @@ def test_anonymous_client(reactor, request, temp_dir, flog_gatherer, tor_network
)
yield util.await_client_ready(normie)
anonymoose = yield _create_anonymous_node(reactor, 'anonymoose', 8008, request, temp_dir, flog_gatherer, tor_network, introducer_furl, 1)
anonymoose = yield _create_anonymous_node(reactor, 'anonymoose', 8102, request, temp_dir, flog_gatherer, tor_network, introducer_furl, 1)
yield util.await_client_ready(anonymoose, minimum_number_of_servers=1, timeout=600)
yield upload_to_one_download_from_the_other(reactor, temp_dir, normie, anonymoose)


@@ -15,7 +15,8 @@ from pytest_twisted import ensureDeferred
from . import vectors
from .vectors import parameters
from .util import reconfigure, upload, TahoeProcess
from .util import upload
from .grid import Client
@mark.parametrize('convergence', parameters.CONVERGENCE_SECRETS)
def test_convergence(convergence):
@@ -36,8 +37,8 @@ async def test_capability(reactor, request, alice, case, expected):
computed value.
"""
# rewrite alice's config to match params and convergence
await reconfigure(
reactor, request, alice, (1, case.params.required, case.params.total), case.convergence, case.segment_size)
await alice.reconfigure_zfec(
reactor, (1, case.params.required, case.params.total), case.convergence, case.segment_size)
# upload data in the correct format
actual = upload(alice, case.fmt, case.data)
@@ -82,7 +83,7 @@ async def skiptest_generate(reactor, request, alice):
async def generate(
reactor,
request,
alice: TahoeProcess,
alice: Client,
cases: Iterator[vectors.Case],
) -> AsyncGenerator[[vectors.Case, str], None]:
"""
@@ -106,10 +107,8 @@ async def generate(
# reliability of this generator, be happy if we can put shares anywhere
happy = 1
for case in cases:
await reconfigure(
await alice.reconfigure_zfec(
reactor,
request,
alice,
(happy, case.params.required, case.params.total),
case.convergence,
case.segment_size
@@ -117,5 +116,5 @@ async def generate(
# Give the format a chance to make an RSA key if it needs it.
case = evolve(case, fmt=case.fmt.customize())
cap = upload(alice, case.fmt, case.data)
cap = upload(alice.process, case.fmt, case.data)
yield case, cap


@@ -33,7 +33,7 @@ def test_index(alice):
"""
we can download the index file
"""
util.web_get(alice, u"")
util.web_get(alice.process, u"")
@run_in_thread
@@ -41,7 +41,7 @@ def test_index_json(alice):
"""
we can download the index file as json
"""
data = util.web_get(alice, u"", params={u"t": u"json"})
data = util.web_get(alice.process, u"", params={u"t": u"json"})
# it should be valid json
json.loads(data)
@@ -55,7 +55,7 @@ def test_upload_download(alice):
FILE_CONTENTS = u"some contents"
readcap = util.web_post(
alice, u"uri",
alice.process, u"uri",
data={
u"t": u"upload",
u"format": u"mdmf",
@@ -67,7 +67,7 @@ def test_upload_download(alice):
readcap = readcap.strip()
data = util.web_get(
alice, u"uri",
alice.process, u"uri",
params={
u"uri": readcap,
u"filename": u"boom",
@@ -85,11 +85,11 @@ def test_put(alice):
FILE_CONTENTS = b"added via PUT" * 20
resp = requests.put(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
data=FILE_CONTENTS,
)
cap = allmydata.uri.from_string(resp.text.strip().encode('ascii'))
cfg = alice.get_config()
cfg = alice.process.get_config()
assert isinstance(cap, allmydata.uri.CHKFileURI)
assert cap.size == len(FILE_CONTENTS)
assert cap.total_shares == int(cfg.get_config("client", "shares.total"))
@@ -102,7 +102,7 @@ def test_helper_status(storage_nodes):
successfully GET the /helper_status page
"""
url = util.node_url(storage_nodes[0].node_dir, "helper_status")
url = util.node_url(storage_nodes[0].process.node_dir, "helper_status")
resp = requests.get(url)
assert resp.status_code >= 200 and resp.status_code < 300
dom = BeautifulSoup(resp.content, "html5lib")
@@ -116,7 +116,7 @@ def test_deep_stats(alice):
URIs work
"""
resp = requests.post(
util.node_url(alice.node_dir, "uri"),
util.node_url(alice.process.node_dir, "uri"),
params={
"format": "sdmf",
"t": "mkdir",
@@ -130,7 +130,7 @@ def test_deep_stats(alice):
uri = url_unquote(resp.url)
assert 'URI:DIR2:' in uri
dircap = uri[uri.find("URI:DIR2:"):].rstrip('/')
dircap_uri = util.node_url(alice.node_dir, "uri/{}".format(url_quote(dircap)))
dircap_uri = util.node_url(alice.process.node_dir, "uri/{}".format(url_quote(dircap)))
# POST a file into this directory
FILE_CONTENTS = u"a file in a directory"
@@ -176,7 +176,7 @@ def test_deep_stats(alice):
while tries > 0:
tries -= 1
resp = requests.get(
util.node_url(alice.node_dir, u"operations/something_random"),
util.node_url(alice.process.node_dir, u"operations/something_random"),
)
d = json.loads(resp.content)
if d['size-literal-files'] == len(FILE_CONTENTS):
@@ -201,21 +201,21 @@ def test_status(alice):
FILE_CONTENTS = u"all the Important Data of alice\n" * 1200
resp = requests.put(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
data=FILE_CONTENTS,
)
cap = resp.text.strip()
print("Uploaded data, cap={}".format(cap))
resp = requests.get(
util.node_url(alice.node_dir, u"uri/{}".format(url_quote(cap))),
util.node_url(alice.process.node_dir, u"uri/{}".format(url_quote(cap))),
)
print("Downloaded {} bytes of data".format(len(resp.content)))
assert str(resp.content, "ascii") == FILE_CONTENTS
resp = requests.get(
util.node_url(alice.node_dir, "status"),
util.node_url(alice.process.node_dir, "status"),
)
dom = html5lib.parse(resp.content)
@@ -229,7 +229,7 @@ def test_status(alice):
for href in hrefs:
if href == u"/" or not href:
continue
resp = requests.get(util.node_url(alice.node_dir, href))
resp = requests.get(util.node_url(alice.process.node_dir, href))
if href.startswith(u"/status/up"):
assert b"File Upload Status" in resp.content
if b"Total Size: %d" % (len(FILE_CONTENTS),) in resp.content:
@@ -241,7 +241,7 @@ def test_status(alice):
# download the specialized event information
resp = requests.get(
util.node_url(alice.node_dir, u"{}/event_json".format(href)),
util.node_url(alice.process.node_dir, u"{}/event_json".format(href)),
)
js = json.loads(resp.content)
# there's usually just one "read" operation, but this can handle many ..
@@ -264,14 +264,14 @@ async def test_directory_deep_check(reactor, request, alice):
required = 2
total = 4
await util.reconfigure(reactor, request, alice, (happy, required, total), convergence=None)
await alice.reconfigure_zfec(reactor, (happy, required, total), convergence=None)
await deferToThread(_test_directory_deep_check_blocking, alice)
def _test_directory_deep_check_blocking(alice):
# create a directory
resp = requests.post(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
params={
u"t": u"mkdir",
u"redirect_to_result": u"true",
@@ -320,7 +320,7 @@ def _test_directory_deep_check_blocking(alice):
print("Uploaded data1, cap={}".format(cap1))
resp = requests.get(
util.node_url(alice.node_dir, u"uri/{}".format(url_quote(cap0))),
util.node_url(alice.process.node_dir, u"uri/{}".format(url_quote(cap0))),
params={u"t": u"info"},
)
@@ -437,7 +437,7 @@ def test_storage_info(storage_nodes):
storage0 = storage_nodes[0]
requests.get(
util.node_url(storage0.node_dir, u"storage"),
util.node_url(storage0.process.node_dir, u"storage"),
)
@@ -449,7 +449,7 @@ def test_storage_info_json(storage_nodes):
storage0 = storage_nodes[0]
resp = requests.get(
util.node_url(storage0.node_dir, u"storage"),
util.node_url(storage0.process.node_dir, u"storage"),
params={u"t": u"json"},
)
data = json.loads(resp.content)
@@ -462,12 +462,12 @@ def test_introducer_info(introducer):
retrieve and confirm /introducer URI for the introducer
"""
resp = requests.get(
util.node_url(introducer.node_dir, u""),
util.node_url(introducer.process.node_dir, u""),
)
assert b"Introducer" in resp.content
resp = requests.get(
util.node_url(introducer.node_dir, u""),
util.node_url(introducer.process.node_dir, u""),
params={u"t": u"json"},
)
data = json.loads(resp.content)
@@ -484,14 +484,14 @@ def test_mkdir_with_children(alice):
# create a file to put in our directory
FILE_CONTENTS = u"some file contents\n" * 500
resp = requests.put(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
data=FILE_CONTENTS,
)
filecap = resp.content.strip()
# create a (sub) directory to put in our directory
resp = requests.post(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
params={
u"t": u"mkdir",
}
@@ -534,7 +534,7 @@ def test_mkdir_with_children(alice):
# create a new directory with one file and one sub-dir (all-at-once)
resp = util.web_post(
alice, u"uri",
alice.process, u"uri",
params={u"t": "mkdir-with-children"},
data=json.dumps(meta),
)


@@ -70,16 +70,40 @@ class _ProcessExitedProtocol(ProcessProtocol):
self.done.callback(None)
class ProcessFailed(Exception):
"""
A subprocess has failed.
:ivar ProcessTerminated reason: the original reason from .processExited
:ivar StringIO output: all stdout and stderr collected to this point.
"""
def __init__(self, reason, output):
self.reason = reason
self.output = output
def __str__(self):
return "<ProcessFailed: {}>:\n{}".format(self.reason, self.output)
class _CollectOutputProtocol(ProcessProtocol):
"""
Internal helper. Collects all output (stdout + stderr) into
self.output, and fires .done with all of it after the
process exits (for any reason).
"""
def __init__(self, capture_stderr=True):
def __init__(self, capture_stderr=True, stdin=None):
self.done = Deferred()
self.output = BytesIO()
self.capture_stderr = capture_stderr
self._stdin = stdin
def connectionMade(self):
if self._stdin is not None:
self.transport.write(self._stdin)
self.transport.closeStdin()
def processEnded(self, reason):
if not self.done.called:
@@ -87,7 +111,7 @@ class _CollectOutputProtocol(ProcessProtocol):
def processExited(self, reason):
if not isinstance(reason.value, ProcessDone):
self.done.errback(reason)
self.done.errback(ProcessFailed(reason, self.output.getvalue()))
def outReceived(self, data):
self.output.write(data)
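Taken together, these two hunks give _CollectOutputProtocol two new behaviours: optional bytes written to the child's stdin as soon as the connection is made, and an errback carrying a ProcessFailed (with the collected output) when the child exits non-zero. A sketch of the resulting calling pattern; /bin/cat and the callbacks are illustrative:

from os import environ
from twisted.internet import reactor

def collect_with_input(argv, stdin_bytes):
    # stdin_bytes is written and stdin closed when the child starts;
    # .done fires with all output, or errbacks with ProcessFailed
    proto = _CollectOutputProtocol(stdin=stdin_bytes)
    reactor.spawnProcess(proto, argv[0], argv, env=environ)
    return proto.done

d = collect_with_input(["/bin/cat"], b"hello\n")
d.addCallbacks(
    lambda output: print("child said:", output),
    lambda failure: print("child failed:", failure.value),
)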
@@ -153,38 +177,33 @@ class _MagicTextProtocol(ProcessProtocol):
sys.stdout.write(self.name + line + "\n")
def _cleanup_process_async(transport: IProcessTransport, allow_missing: bool) -> None:
def _cleanup_process_async(transport: IProcessTransport) -> None:
"""
If the given process transport seems to still be associated with a
running process, send a SIGTERM to that process.
:param transport: The transport to use.
:param allow_missing: If ``True`` then it is not an error for the
transport to have no associated process. Otherwise, an exception will
be raised in that case.
:raise: ``ValueError`` if ``allow_missing`` is ``False`` and the transport
has no process.
"""
if transport.pid is None:
if allow_missing:
print("Process already cleaned up and that's okay.")
return
else:
raise ValueError("Process is not running")
# in cases of "restart", we will have registered a finalizer
# that will kill the process -- but already explicitly killed
# it (and then ran again) due to the "restart". So, if the
# process is already killed, our job is done.
print("Process already cleaned up and that's okay.")
return
print("signaling {} with TERM".format(transport.pid))
try:
transport.signalProcess('TERM')
except ProcessExitedAlready:
# The transport object thought it still had a process but the real OS
# process has already exited. That's fine. We accomplished what we
# wanted to. We don't care about ``allow_missing`` here because
# there's no way we could have known the real OS process already
# exited.
# wanted to.
pass
def _cleanup_tahoe_process(tahoe_transport, exited, allow_missing=False):
def _cleanup_tahoe_process(tahoe_transport, exited):
"""
Terminate the given process with a kill signal (SIGTERM on POSIX,
TerminateProcess on Windows).
@@ -195,12 +214,26 @@ def _cleanup_tahoe_process(tahoe_transport, exited, allow_missing=False):
:return: After the process has exited.
"""
from twisted.internet import reactor
_cleanup_process_async(tahoe_transport, allow_missing=allow_missing)
print("signaled, blocking on exit")
_cleanup_process_async(tahoe_transport)
print(f"signaled, blocking on exit {exited}")
block_with_timeout(exited, reactor)
print("exited, goodbye")
def run_tahoe(reactor, request, *args, **kwargs):
"""
Helper to run tahoe with optional coverage.
:returns: a Deferred that fires with the collected output when the
command succeeds (or errbacks with ProcessFailed if it exits non-zero)
"""
stdin = kwargs.get("stdin", None)
protocol = _CollectOutputProtocol(stdin=stdin)
process = _tahoe_runner_optional_coverage(protocol, reactor, request, args)
process.exited = protocol.done
return protocol.done
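run_tahoe packages the collecting protocol and the optional-coverage runner into a single awaitable helper. A sketch of an integration test driving the CLI through it; the subcommand and assertion are illustrative:

@pytest_twisted.inlineCallbacks
def test_cli_version(reactor, request):
    # fires with the collected stdout/stderr, or errbacks with ProcessFailed
    output = yield run_tahoe(reactor, request, "--version")
    assert b"tahoe" in output.lower()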
def _tahoe_runner_optional_coverage(proto, reactor, request, other_args):
"""
Internal helper. Calls spawnProcess with `-m
@@ -244,16 +277,20 @@ class TahoeProcess(object):
)
def kill(self):
"""Kill the process, block until it's done."""
"""
Kill the process, block until it's done.
Does nothing if the process is already stopped (or never started).
"""
print(f"TahoeProcess.kill({self.transport.pid} / {self.node_dir})")
_cleanup_tahoe_process(self.transport, self.transport.exited)
def kill_async(self):
"""
Kill the process, return a Deferred that fires when it's done.
Does nothing if the process is already stopped (or never started).
"""
print(f"TahoeProcess.kill_async({self.transport.pid} / {self.node_dir})")
_cleanup_process_async(self.transport, allow_missing=False)
_cleanup_process_async(self.transport)
return self.transport.exited
def restart_async(self, reactor: IReactorProcess, request: Any) -> Deferred:
@@ -264,7 +301,7 @@ class TahoeProcess(object):
handle requests.
"""
d = self.kill_async()
d.addCallback(lambda ignored: _run_node(reactor, self.node_dir, request, None, finalize=False))
d.addCallback(lambda ignored: _run_node(reactor, self.node_dir, request, None))
def got_new_process(proc):
# Grab the new transport since the one we had before is no longer
# valid after the stop/start cycle.
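Because the cleanup path now treats an already-reaped process as success, kill, kill_async and restart_async compose without the old allow_missing bookkeeping. A sketch of a test that bounces a node mid-run; the fixture names are illustrative:

@pytest_twisted.inlineCallbacks
def test_survives_restart(reactor, request, alice):
    node = alice.process            # the TahoeProcess behind the fixture
    yield node.restart_async(reactor, request)
    # a later kill() is safe: an already-exited process is a no-op
    node.kill()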
@@ -276,7 +313,7 @@ class TahoeProcess(object):
return "<TahoeProcess in '{}'>".format(self._node_dir)
def _run_node(reactor, node_dir, request, magic_text, finalize=True):
def _run_node(reactor, node_dir, request, magic_text):
"""
Run a tahoe process from its node_dir.
@@ -305,8 +342,7 @@ def _run_node(reactor, node_dir, request, magic_text, finalize=True):
node_dir,
)
if finalize:
request.addfinalizer(tahoe_process.kill)
request.addfinalizer(tahoe_process.kill)
d = protocol.magic_seen
d.addCallback(lambda ignored: tahoe_process)
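With finalize gone, _run_node registers the kill finalizer unconditionally, which is only safe because kill now tolerates an already-stopped process. The returned Deferred still fires with the TahoeProcess only once the magic text appears in the node's output. A sketch of that contract, assuming the conventional "client running" marker:

# sketch: start a node and wait for it to announce readiness
d = _run_node(reactor, node_dir, request, magic_text="client running")
d.addCallback(lambda tahoe_process: print("ready:", tahoe_process.node_dir))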
@@ -348,8 +384,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
magic_text=None,
needed=2,
happy=3,
total=4,
finalize=True):
total=4):
"""
Helper to create a single node, run it and return the instance
spawnProcess returned (ITransport)
@@ -360,7 +395,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
if exists(node_dir):
created_d = succeed(None)
else:
print("creating", node_dir)
print("creating: {}".format(node_dir))
mkdir(node_dir)
done_proto = _ProcessExitedProtocol()
args = [
@@ -383,13 +418,13 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
created_d = done_proto.done
def created(_):
basic_node_configuration(request, flog_gatherer, node_dir)
basic_node_configuration(request, flog_gatherer.furl, node_dir)
created_d.addCallback(created)
d = Deferred()
d.callback(None)
d.addCallback(lambda _: created_d)
d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text, finalize=finalize))
d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text))
return d
@@ -715,7 +750,6 @@ class SSK:
def load(cls, params: dict) -> SSK:
assert params.keys() == {"format", "mutable", "key"}
return cls(params["format"], params["key"].encode("ascii"))
def customize(self) -> SSK:
"""
Return an SSK with a newly generated random RSA key.
@@ -754,7 +788,7 @@ def upload(alice: TahoeProcess, fmt: CHK | SSK, data: bytes) -> str:
f.write(data)
f.flush()
with fmt.to_argv() as fmt_argv:
argv = [alice, "put"] + fmt_argv + [f.name]
argv = [alice.process, "put"] + fmt_argv + [f.name]
return cli(*argv).decode("utf-8").strip()
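cli() still expects the TahoeProcess itself, so upload callers now reach through the fixture with alice.process. A sketch of a call site, leaving the format value abstract:

def put_some_bytes(client, fmt):
    # client is the grid-level fixture; fmt is a CHK or SSK instance
    return upload(client.process, fmt, b"some file contents\n")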

newsfragments/3508.minor Normal file
View File

View File

@@ -151,7 +151,7 @@ install_requires = [
"pycddl >= 0.4",
# Command-line parsing
"click >= 7.0",
"click >= 8.1.1",
# for pid-file support
"psutil",
@@ -435,7 +435,7 @@ setup(name="tahoe-lafs", # also set in __init__.py
"paramiko < 2.9",
"pytest-timeout",
# Does our OpenMetrics endpoint adhere to the spec:
"prometheus-client == 0.11.0",
"prometheus-client == 0.11.0"
] + tor_requires + i2p_requires,
"tor": tor_requires,
"i2p": i2p_requires,

View File

@@ -222,3 +222,7 @@ def _config_path_from_option(config: str) -> Optional[FilePath]:
if config == "-":
return None
return FilePath(config)
if __name__ == '__main__':
grid_manager() # type: ignore
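The new __main__ guard makes the grid-manager click group directly runnable. One way to exercise it without spawning a subprocess is click's test runner; a sketch, with the module path assumed from this file's location in the tree:

from click.testing import CliRunner
from allmydata.cli.grid_manager import grid_manager  # module path is an assumption

result = CliRunner().invoke(grid_manager, ["--help"])
assert result.exit_code == 0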

View File

@@ -486,7 +486,9 @@ def create_grid_manager_verifier(keys, certs, public_key, now_fn=None, bad_cert=
now = now_fn()
for cert in valid_certs:
expires = datetime.fromisoformat(cert["expires"])
if cert['public_key'].encode("ascii") == public_key:
pc = cert['public_key'].encode('ascii')
assert type(pc) == type(public_key), "{} isn't {}".format(type(pc), type(public_key))
if pc == public_key:
if expires > now:
# not-expired
return True
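The byte-for-byte key comparison (now guarded by a type assertion) together with the injectable now_fn makes expiry behaviour easy to pin down. A sketch with placeholder keys and certificates; the zero-argument call on the returned verifier is an assumption:

from datetime import datetime

frozen_now = datetime(2023, 1, 1)   # naive, matching fromisoformat() above
verify = create_grid_manager_verifier(
    keys=gm_keys,                      # placeholder: grid-manager public keys
    certs=signed_certs,                # placeholder: SignedCertificate instances
    public_key=b"pub-v0-placeholder",  # must be bytes, per the new assertion
    now_fn=lambda: frozen_now,
)
# verify() should report True only for an unexpired, matching certificate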

View File

@@ -38,8 +38,9 @@ from os import urandom
import re
import time
import hashlib
from io import StringIO
from configparser import NoSectionError
import json
import attr
from hyperlink import DecodedURL
@@ -74,7 +75,7 @@ from allmydata.interfaces import (
VersionMessage
)
from allmydata.grid_manager import (
create_grid_manager_verifier,
create_grid_manager_verifier, SignedCertificate
)
from allmydata.crypto import (
ed25519,
@@ -317,8 +318,8 @@ class StorageFarmBroker(service.MultiService):
assert isinstance(server_id, bytes)
gm_verifier = create_grid_manager_verifier(
self.storage_client_config.grid_manager_keys,
server["ann"].get("grid-manager-certificates", []),
"pub-{}".format(str(server_id, "ascii")), # server_id is v0-<key> not pub-v0-key .. for reasons?
[SignedCertificate.load(StringIO(json.dumps(data))) for data in server["ann"].get("grid-manager-certificates", [])],
"pub-{}".format(str(server_id, "ascii")).encode("ascii"), # server_id is v0-<key> not pub-v0-key .. for reasons?
)
if self._should_we_use_http(self.node_config, server["ann"]):
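Two normalizations happen in this hunk: announcement certificates arrive as already-parsed JSON objects while SignedCertificate.load wants a file-like of JSON text, and the announced server_id is b"v0-<key>" while the verifier compares against b"pub-v0-<key>". Pulled out as a standalone sketch for clarity:

import json
from io import StringIO

def certs_from_announcement(ann):
    # round-trip each certificate dict back through JSON text
    return [
        SignedCertificate.load(StringIO(json.dumps(data)))
        for data in ann.get("grid-manager-certificates", [])
    ]

def verifier_key(server_id: bytes) -> bytes:
    # equivalent to "pub-{}".format(str(server_id, "ascii")).encode("ascii")
    return b"pub-" + server_id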