mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-02 03:06:41 +00:00
Merge remote-tracking branch 'meejah/2916.grid-manager-integration-tests.1' into 2916.grid-manager-integration-tests.2
This commit is contained in:
commit
6d833607db
@ -13,16 +13,11 @@ if PY2:
|
|||||||
import sys
|
import sys
|
||||||
import shutil
|
import shutil
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from os import mkdir, listdir, environ
|
from os import mkdir, environ
|
||||||
from os.path import join, exists
|
from os.path import join, exists
|
||||||
from tempfile import mkdtemp, mktemp
|
from tempfile import mkdtemp
|
||||||
from functools import partial
|
|
||||||
from json import loads
|
from json import loads
|
||||||
|
|
||||||
from foolscap.furl import (
|
|
||||||
decode_furl,
|
|
||||||
)
|
|
||||||
|
|
||||||
from eliot import (
|
from eliot import (
|
||||||
to_file,
|
to_file,
|
||||||
log_call,
|
log_call,
|
||||||
@ -44,15 +39,18 @@ from .util import (
|
|||||||
_DumpOutputProtocol,
|
_DumpOutputProtocol,
|
||||||
_ProcessExitedProtocol,
|
_ProcessExitedProtocol,
|
||||||
_create_node,
|
_create_node,
|
||||||
_cleanup_tahoe_process,
|
|
||||||
_tahoe_runner_optional_coverage,
|
_tahoe_runner_optional_coverage,
|
||||||
await_client_ready,
|
await_client_ready,
|
||||||
TahoeProcess,
|
|
||||||
cli,
|
cli,
|
||||||
_run_node,
|
_run_node,
|
||||||
generate_ssh_key,
|
generate_ssh_key,
|
||||||
block_with_timeout,
|
block_with_timeout,
|
||||||
)
|
)
|
||||||
|
from .grid import (
|
||||||
|
create_port_allocator,
|
||||||
|
create_flog_gatherer,
|
||||||
|
create_grid,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# pytest customization hooks
|
# pytest customization hooks
|
||||||
@ -89,6 +87,12 @@ def reactor():
|
|||||||
return _reactor
|
return _reactor
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='session')
|
||||||
|
@log_call(action_type=u"integration:port_allocator", include_result=False)
|
||||||
|
def port_allocator(reactor):
|
||||||
|
return create_port_allocator(start_port=45000)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session')
|
@pytest.fixture(scope='session')
|
||||||
@log_call(action_type=u"integration:temp_dir", include_args=[])
|
@log_call(action_type=u"integration:temp_dir", include_args=[])
|
||||||
def temp_dir(request):
|
def temp_dir(request):
|
||||||
@ -123,136 +127,30 @@ def flog_binary():
|
|||||||
@pytest.fixture(scope='session')
|
@pytest.fixture(scope='session')
|
||||||
@log_call(action_type=u"integration:flog_gatherer", include_args=[])
|
@log_call(action_type=u"integration:flog_gatherer", include_args=[])
|
||||||
def flog_gatherer(reactor, temp_dir, flog_binary, request):
|
def flog_gatherer(reactor, temp_dir, flog_binary, request):
|
||||||
out_protocol = _CollectOutputProtocol()
|
fg = pytest_twisted.blockon(
|
||||||
gather_dir = join(temp_dir, 'flog_gather')
|
create_flog_gatherer(reactor, request, temp_dir, flog_binary)
|
||||||
reactor.spawnProcess(
|
|
||||||
out_protocol,
|
|
||||||
flog_binary,
|
|
||||||
(
|
|
||||||
'flogtool', 'create-gatherer',
|
|
||||||
'--location', 'tcp:localhost:3117',
|
|
||||||
'--port', '3117',
|
|
||||||
gather_dir,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
pytest_twisted.blockon(out_protocol.done)
|
return fg
|
||||||
|
|
||||||
twistd_protocol = _MagicTextProtocol("Gatherer waiting at")
|
|
||||||
twistd_process = reactor.spawnProcess(
|
|
||||||
twistd_protocol,
|
|
||||||
which('twistd')[0],
|
|
||||||
(
|
|
||||||
'twistd', '--nodaemon', '--python',
|
|
||||||
join(gather_dir, 'gatherer.tac'),
|
|
||||||
),
|
|
||||||
path=gather_dir,
|
|
||||||
)
|
|
||||||
pytest_twisted.blockon(twistd_protocol.magic_seen)
|
|
||||||
|
|
||||||
def cleanup():
|
|
||||||
_cleanup_tahoe_process(twistd_process, twistd_protocol.exited)
|
|
||||||
|
|
||||||
flog_file = mktemp('.flog_dump')
|
|
||||||
flog_protocol = _DumpOutputProtocol(open(flog_file, 'w'))
|
|
||||||
flog_dir = join(temp_dir, 'flog_gather')
|
|
||||||
flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')]
|
|
||||||
|
|
||||||
print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file))
|
|
||||||
reactor.spawnProcess(
|
|
||||||
flog_protocol,
|
|
||||||
flog_binary,
|
|
||||||
(
|
|
||||||
'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0])
|
|
||||||
),
|
|
||||||
)
|
|
||||||
print("Waiting for flogtool to complete")
|
|
||||||
try:
|
|
||||||
block_with_timeout(flog_protocol.done, reactor)
|
|
||||||
except ProcessTerminated as e:
|
|
||||||
print("flogtool exited unexpectedly: {}".format(str(e)))
|
|
||||||
print("Flogtool completed")
|
|
||||||
|
|
||||||
request.addfinalizer(cleanup)
|
|
||||||
|
|
||||||
with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f:
|
|
||||||
furl = f.read().strip()
|
|
||||||
return furl
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session')
|
@pytest.fixture(scope='session')
|
||||||
@log_call(
|
@log_call(action_type=u"integration:grid", include_args=[])
|
||||||
action_type=u"integration:introducer",
|
def grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
|
||||||
include_args=["temp_dir", "flog_gatherer"],
|
g = pytest_twisted.blockon(
|
||||||
include_result=False,
|
create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
|
||||||
)
|
|
||||||
def introducer(reactor, temp_dir, flog_gatherer, request):
|
|
||||||
config = '''
|
|
||||||
[node]
|
|
||||||
nickname = introducer0
|
|
||||||
web.port = 4560
|
|
||||||
log_gatherer.furl = {log_furl}
|
|
||||||
'''.format(log_furl=flog_gatherer)
|
|
||||||
|
|
||||||
intro_dir = join(temp_dir, 'introducer')
|
|
||||||
print("making introducer", intro_dir)
|
|
||||||
|
|
||||||
if not exists(intro_dir):
|
|
||||||
mkdir(intro_dir)
|
|
||||||
done_proto = _ProcessExitedProtocol()
|
|
||||||
_tahoe_runner_optional_coverage(
|
|
||||||
done_proto,
|
|
||||||
reactor,
|
|
||||||
request,
|
|
||||||
(
|
|
||||||
'create-introducer',
|
|
||||||
'--listen=tcp',
|
|
||||||
'--hostname=localhost',
|
|
||||||
intro_dir,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
pytest_twisted.blockon(done_proto.done)
|
|
||||||
|
|
||||||
# over-write the config file with our stuff
|
|
||||||
with open(join(intro_dir, 'tahoe.cfg'), 'w') as f:
|
|
||||||
f.write(config)
|
|
||||||
|
|
||||||
# "tahoe run" is consistent across Linux/macOS/Windows, unlike the old
|
|
||||||
# "start" command.
|
|
||||||
protocol = _MagicTextProtocol('introducer running')
|
|
||||||
transport = _tahoe_runner_optional_coverage(
|
|
||||||
protocol,
|
|
||||||
reactor,
|
|
||||||
request,
|
|
||||||
(
|
|
||||||
'run',
|
|
||||||
intro_dir,
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited))
|
return g
|
||||||
|
|
||||||
pytest_twisted.blockon(protocol.magic_seen)
|
|
||||||
return TahoeProcess(transport, intro_dir)
|
@pytest.fixture(scope='session')
|
||||||
|
def introducer(grid):
|
||||||
|
return grid.introducer
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session')
|
@pytest.fixture(scope='session')
|
||||||
@log_call(action_type=u"integration:introducer:furl", include_args=["temp_dir"])
|
@log_call(action_type=u"integration:introducer:furl", include_args=["temp_dir"])
|
||||||
def introducer_furl(introducer, temp_dir):
|
def introducer_furl(introducer, temp_dir):
|
||||||
furl_fname = join(temp_dir, 'introducer', 'private', 'introducer.furl')
|
return introducer.furl
|
||||||
while not exists(furl_fname):
|
|
||||||
print("Don't see {} yet".format(furl_fname))
|
|
||||||
sleep(.1)
|
|
||||||
furl = open(furl_fname, 'r').read()
|
|
||||||
tubID, location_hints, name = decode_furl(furl)
|
|
||||||
if not location_hints:
|
|
||||||
# If there are no location hints then nothing can ever possibly
|
|
||||||
# connect to it and the only thing that can happen next is something
|
|
||||||
# will hang or time out. So just give up right now.
|
|
||||||
raise ValueError(
|
|
||||||
"Introducer ({!r}) fURL has no location hints!".format(
|
|
||||||
introducer_furl,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
return furl
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session')
|
@pytest.fixture(scope='session')
|
||||||
@ -330,28 +228,20 @@ def tor_introducer_furl(tor_introducer, temp_dir):
|
|||||||
@pytest.fixture(scope='session')
|
@pytest.fixture(scope='session')
|
||||||
@log_call(
|
@log_call(
|
||||||
action_type=u"integration:storage_nodes",
|
action_type=u"integration:storage_nodes",
|
||||||
include_args=["temp_dir", "introducer_furl", "flog_gatherer"],
|
include_args=["grid"],
|
||||||
include_result=False,
|
include_result=False,
|
||||||
)
|
)
|
||||||
def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, request):
|
def storage_nodes(grid):
|
||||||
nodes_d = []
|
nodes_d = []
|
||||||
# start all 5 nodes in parallel
|
# start all 5 nodes in parallel
|
||||||
for x in range(5):
|
for x in range(5):
|
||||||
name = 'node{}'.format(x)
|
#nodes_d.append(grid.add_storage_node())
|
||||||
web_port= 9990 + x
|
pytest_twisted.blockon(grid.add_storage_node())
|
||||||
nodes_d.append(
|
|
||||||
_create_node(
|
|
||||||
reactor, request, temp_dir, introducer_furl, flog_gatherer, name,
|
|
||||||
web_port="tcp:{}:interface=localhost".format(web_port),
|
|
||||||
storage=True,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
nodes_status = pytest_twisted.blockon(DeferredList(nodes_d))
|
nodes_status = pytest_twisted.blockon(DeferredList(nodes_d))
|
||||||
nodes = []
|
for ok, value in nodes_status:
|
||||||
for ok, process in nodes_status:
|
assert ok, "Storage node creation failed: {}".format(value)
|
||||||
assert ok, "Storage node creation failed: {}".format(process)
|
return grid.storage_servers
|
||||||
nodes.append(process)
|
|
||||||
return nodes
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
def alice_sftp_client_key_path(temp_dir):
|
def alice_sftp_client_key_path(temp_dir):
|
||||||
|
507
integration/grid.py
Normal file
507
integration/grid.py
Normal file
@ -0,0 +1,507 @@
|
|||||||
|
"""
|
||||||
|
Classes which directly represent various kinds of Tahoe processes
|
||||||
|
that co-operate to for "a Grid".
|
||||||
|
|
||||||
|
These methods and objects are used by conftest.py fixtures but may
|
||||||
|
also be used as direct helpers for tests that don't want to (or can't)
|
||||||
|
rely on 'the' global grid as provided by fixtures like 'alice' or
|
||||||
|
'storage_servers'.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from os import mkdir, listdir
|
||||||
|
from os.path import join, exists
|
||||||
|
from tempfile import mktemp
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
from eliot import (
|
||||||
|
log_call,
|
||||||
|
)
|
||||||
|
|
||||||
|
from foolscap.furl import (
|
||||||
|
decode_furl,
|
||||||
|
)
|
||||||
|
|
||||||
|
from twisted.python.procutils import which
|
||||||
|
from twisted.internet.defer import (
|
||||||
|
inlineCallbacks,
|
||||||
|
returnValue,
|
||||||
|
maybeDeferred,
|
||||||
|
)
|
||||||
|
from twisted.internet.task import (
|
||||||
|
deferLater,
|
||||||
|
)
|
||||||
|
from twisted.internet.interfaces import (
|
||||||
|
IProcessTransport,
|
||||||
|
IProcessProtocol,
|
||||||
|
)
|
||||||
|
from twisted.internet.endpoints import (
|
||||||
|
TCP4ServerEndpoint,
|
||||||
|
)
|
||||||
|
from twisted.internet.protocol import (
|
||||||
|
Factory,
|
||||||
|
Protocol,
|
||||||
|
)
|
||||||
|
from twisted.internet.error import ProcessTerminated
|
||||||
|
|
||||||
|
from .util import (
|
||||||
|
_CollectOutputProtocol,
|
||||||
|
_MagicTextProtocol,
|
||||||
|
_DumpOutputProtocol,
|
||||||
|
_ProcessExitedProtocol,
|
||||||
|
_run_node,
|
||||||
|
_cleanup_tahoe_process,
|
||||||
|
_tahoe_runner_optional_coverage,
|
||||||
|
TahoeProcess,
|
||||||
|
await_client_ready,
|
||||||
|
)
|
||||||
|
|
||||||
|
import attr
|
||||||
|
import pytest_twisted
|
||||||
|
|
||||||
|
|
||||||
|
# further directions:
|
||||||
|
# - "Grid" is unused, basically -- tie into the rest?
|
||||||
|
# - could make a Grid instance mandatory for create_* calls
|
||||||
|
# - could instead make create_* calls methods of Grid
|
||||||
|
# - Bring more 'util' or 'conftest' code into here
|
||||||
|
# - stop()/start()/restart() methods on StorageServer etc
|
||||||
|
# - more-complex stuff like config changes (which imply a restart too)?
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
|
||||||
|
class FlogGatherer(object):
|
||||||
|
"""
|
||||||
|
Flog Gatherer process.
|
||||||
|
"""
|
||||||
|
|
||||||
|
process = attr.ib(
|
||||||
|
validator=attr.validators.provides(IProcessTransport)
|
||||||
|
)
|
||||||
|
protocol = attr.ib(
|
||||||
|
validator=attr.validators.provides(IProcessProtocol)
|
||||||
|
)
|
||||||
|
furl = attr.ib()
|
||||||
|
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def create_flog_gatherer(reactor, request, temp_dir, flog_binary):
|
||||||
|
out_protocol = _CollectOutputProtocol()
|
||||||
|
gather_dir = join(temp_dir, 'flog_gather')
|
||||||
|
reactor.spawnProcess(
|
||||||
|
out_protocol,
|
||||||
|
flog_binary,
|
||||||
|
(
|
||||||
|
'flogtool', 'create-gatherer',
|
||||||
|
'--location', 'tcp:localhost:3117',
|
||||||
|
'--port', '3117',
|
||||||
|
gather_dir,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
yield out_protocol.done
|
||||||
|
|
||||||
|
twistd_protocol = _MagicTextProtocol("Gatherer waiting at")
|
||||||
|
twistd_process = reactor.spawnProcess(
|
||||||
|
twistd_protocol,
|
||||||
|
which('twistd')[0],
|
||||||
|
(
|
||||||
|
'twistd', '--nodaemon', '--python',
|
||||||
|
join(gather_dir, 'gatherer.tac'),
|
||||||
|
),
|
||||||
|
path=gather_dir,
|
||||||
|
)
|
||||||
|
yield twistd_protocol.magic_seen
|
||||||
|
|
||||||
|
def cleanup():
|
||||||
|
_cleanup_tahoe_process(twistd_process, twistd_protocol.exited)
|
||||||
|
|
||||||
|
flog_file = mktemp('.flog_dump')
|
||||||
|
flog_protocol = _DumpOutputProtocol(open(flog_file, 'w'))
|
||||||
|
flog_dir = join(temp_dir, 'flog_gather')
|
||||||
|
flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')]
|
||||||
|
|
||||||
|
print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file))
|
||||||
|
reactor.spawnProcess(
|
||||||
|
flog_protocol,
|
||||||
|
flog_binary,
|
||||||
|
(
|
||||||
|
'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0])
|
||||||
|
),
|
||||||
|
)
|
||||||
|
print("Waiting for flogtool to complete")
|
||||||
|
try:
|
||||||
|
pytest_twisted.blockon(flog_protocol.done)
|
||||||
|
except ProcessTerminated as e:
|
||||||
|
print("flogtool exited unexpectedly: {}".format(str(e)))
|
||||||
|
print("Flogtool completed")
|
||||||
|
|
||||||
|
request.addfinalizer(cleanup)
|
||||||
|
|
||||||
|
with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f:
|
||||||
|
furl = f.read().strip()
|
||||||
|
returnValue(
|
||||||
|
FlogGatherer(
|
||||||
|
protocol=twistd_protocol,
|
||||||
|
process=twistd_process,
|
||||||
|
furl=furl,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
|
||||||
|
class StorageServer(object):
|
||||||
|
"""
|
||||||
|
Represents a Tahoe Storage Server
|
||||||
|
"""
|
||||||
|
|
||||||
|
process = attr.ib(
|
||||||
|
validator=attr.validators.instance_of(TahoeProcess)
|
||||||
|
)
|
||||||
|
protocol = attr.ib(
|
||||||
|
validator=attr.validators.provides(IProcessProtocol)
|
||||||
|
)
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def restart(self, reactor, request):
|
||||||
|
"""
|
||||||
|
re-start our underlying process by issuing a TERM, waiting and
|
||||||
|
then running again. await_client_ready() will be done as well
|
||||||
|
|
||||||
|
Note that self.process and self.protocol will be new instances
|
||||||
|
after this.
|
||||||
|
"""
|
||||||
|
self.process.transport.signalProcess('TERM')
|
||||||
|
yield self.protocol.exited
|
||||||
|
self.process = yield _run_node(
|
||||||
|
reactor, self.process.node_dir, request, None,
|
||||||
|
)
|
||||||
|
self.protocol = self.process.transport._protocol
|
||||||
|
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
|
||||||
|
needed=2, happy=3, total=4):
|
||||||
|
"""
|
||||||
|
Create a new storage server
|
||||||
|
"""
|
||||||
|
from .util import _create_node
|
||||||
|
node_process = yield _create_node(
|
||||||
|
reactor, request, temp_dir, introducer.furl, flog_gatherer,
|
||||||
|
name, web_port, storage=True, needed=needed, happy=happy, total=total,
|
||||||
|
)
|
||||||
|
storage = StorageServer(
|
||||||
|
process=node_process,
|
||||||
|
protocol=node_process.transport._protocol,
|
||||||
|
)
|
||||||
|
returnValue(storage)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
|
||||||
|
class Client(object):
|
||||||
|
"""
|
||||||
|
Represents a Tahoe client
|
||||||
|
"""
|
||||||
|
|
||||||
|
process = attr.ib(
|
||||||
|
validator=attr.validators.instance_of(TahoeProcess)
|
||||||
|
)
|
||||||
|
protocol = attr.ib(
|
||||||
|
validator=attr.validators.provides(IProcessProtocol)
|
||||||
|
)
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def restart(self, reactor, request, servers=1):
|
||||||
|
"""
|
||||||
|
re-start our underlying process by issuing a TERM, waiting and
|
||||||
|
then running again.
|
||||||
|
|
||||||
|
:param int servers: number of server connections we will wait
|
||||||
|
for before being 'ready'
|
||||||
|
|
||||||
|
Note that self.process and self.protocol will be new instances
|
||||||
|
after this.
|
||||||
|
"""
|
||||||
|
self.process.transport.signalProcess('TERM')
|
||||||
|
yield self.protocol.exited
|
||||||
|
process = yield _run_node(
|
||||||
|
reactor, self.process.node_dir, request, None,
|
||||||
|
)
|
||||||
|
self.process = process
|
||||||
|
self.protocol = self.process.transport._protocol
|
||||||
|
|
||||||
|
|
||||||
|
# XXX add stop / start / restart
|
||||||
|
# ...maybe "reconfig" of some kind?
|
||||||
|
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
|
||||||
|
needed=2, happy=3, total=4):
|
||||||
|
"""
|
||||||
|
Create a new storage server
|
||||||
|
"""
|
||||||
|
from .util import _create_node
|
||||||
|
node_process = yield _create_node(
|
||||||
|
reactor, request, temp_dir, introducer.furl, flog_gatherer,
|
||||||
|
name, web_port, storage=False, needed=needed, happy=happy, total=total,
|
||||||
|
)
|
||||||
|
returnValue(
|
||||||
|
Client(
|
||||||
|
process=node_process,
|
||||||
|
protocol=node_process.transport._protocol,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
|
||||||
|
class Introducer(object):
|
||||||
|
"""
|
||||||
|
Reprsents a running introducer
|
||||||
|
"""
|
||||||
|
|
||||||
|
process = attr.ib(
|
||||||
|
validator=attr.validators.instance_of(TahoeProcess)
|
||||||
|
)
|
||||||
|
protocol = attr.ib(
|
||||||
|
validator=attr.validators.provides(IProcessProtocol)
|
||||||
|
)
|
||||||
|
furl = attr.ib()
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_furl(furl_fname):
|
||||||
|
"""
|
||||||
|
Opens and validates a fURL, ensuring location hints.
|
||||||
|
:returns: the furl
|
||||||
|
:raises: ValueError if no location hints
|
||||||
|
"""
|
||||||
|
while not exists(furl_fname):
|
||||||
|
print("Don't see {} yet".format(furl_fname))
|
||||||
|
sleep(.1)
|
||||||
|
furl = open(furl_fname, 'r').read()
|
||||||
|
tubID, location_hints, name = decode_furl(furl)
|
||||||
|
if not location_hints:
|
||||||
|
# If there are no location hints then nothing can ever possibly
|
||||||
|
# connect to it and the only thing that can happen next is something
|
||||||
|
# will hang or time out. So just give up right now.
|
||||||
|
raise ValueError(
|
||||||
|
"Introducer ({!r}) fURL has no location hints!".format(
|
||||||
|
furl,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return furl
|
||||||
|
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
@log_call(
|
||||||
|
action_type=u"integration:introducer",
|
||||||
|
include_args=["temp_dir", "flog_gatherer"],
|
||||||
|
include_result=False,
|
||||||
|
)
|
||||||
|
def create_introducer(reactor, request, temp_dir, flog_gatherer, port):
|
||||||
|
"""
|
||||||
|
Run a new Introducer and return an Introducer instance.
|
||||||
|
"""
|
||||||
|
config = (
|
||||||
|
'[node]\n'
|
||||||
|
'nickname = introducer{port}\n'
|
||||||
|
'web.port = {port}\n'
|
||||||
|
'log_gatherer.furl = {log_furl}\n'
|
||||||
|
).format(
|
||||||
|
port=port,
|
||||||
|
log_furl=flog_gatherer.furl,
|
||||||
|
)
|
||||||
|
|
||||||
|
intro_dir = join(temp_dir, 'introducer{}'.format(port))
|
||||||
|
|
||||||
|
if not exists(intro_dir):
|
||||||
|
mkdir(intro_dir)
|
||||||
|
done_proto = _ProcessExitedProtocol()
|
||||||
|
_tahoe_runner_optional_coverage(
|
||||||
|
done_proto,
|
||||||
|
reactor,
|
||||||
|
request,
|
||||||
|
(
|
||||||
|
'create-introducer',
|
||||||
|
'--listen=tcp',
|
||||||
|
'--hostname=localhost',
|
||||||
|
intro_dir,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
yield done_proto.done
|
||||||
|
|
||||||
|
# over-write the config file with our stuff
|
||||||
|
with open(join(intro_dir, 'tahoe.cfg'), 'w') as f:
|
||||||
|
f.write(config)
|
||||||
|
|
||||||
|
# on windows, "tahoe start" means: run forever in the foreground,
|
||||||
|
# but on linux it means daemonize. "tahoe run" is consistent
|
||||||
|
# between platforms.
|
||||||
|
protocol = _MagicTextProtocol('introducer running')
|
||||||
|
transport = _tahoe_runner_optional_coverage(
|
||||||
|
protocol,
|
||||||
|
reactor,
|
||||||
|
request,
|
||||||
|
(
|
||||||
|
'run',
|
||||||
|
intro_dir,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def clean():
|
||||||
|
return _cleanup_tahoe_process(transport, protocol.exited)
|
||||||
|
request.addfinalizer(clean)
|
||||||
|
|
||||||
|
yield protocol.magic_seen
|
||||||
|
|
||||||
|
furl_fname = join(intro_dir, 'private', 'introducer.furl')
|
||||||
|
while not exists(furl_fname):
|
||||||
|
print("Don't see {} yet".format(furl_fname))
|
||||||
|
yield deferLater(reactor, .1, lambda: None)
|
||||||
|
furl = _validate_furl(furl_fname)
|
||||||
|
|
||||||
|
returnValue(
|
||||||
|
Introducer(
|
||||||
|
process=TahoeProcess(transport, intro_dir),
|
||||||
|
protocol=protocol,
|
||||||
|
furl=furl,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
|
||||||
|
class Grid(object):
|
||||||
|
"""
|
||||||
|
Represents an entire Tahoe Grid setup
|
||||||
|
|
||||||
|
A Grid includes an Introducer, Flog Gatherer and some number of
|
||||||
|
Storage Servers.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_reactor = attr.ib()
|
||||||
|
_request = attr.ib()
|
||||||
|
_temp_dir = attr.ib()
|
||||||
|
_port_allocator = attr.ib()
|
||||||
|
introducer = attr.ib()
|
||||||
|
flog_gatherer = attr.ib()
|
||||||
|
storage_servers = attr.ib(factory=list)
|
||||||
|
clients = attr.ib(factory=dict)
|
||||||
|
|
||||||
|
@storage_servers.validator
|
||||||
|
def check(self, attribute, value):
|
||||||
|
for server in value:
|
||||||
|
if not isinstance(server, StorageServer):
|
||||||
|
raise ValueError(
|
||||||
|
"storage_servers must be StorageServer"
|
||||||
|
)
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def add_storage_node(self):
|
||||||
|
"""
|
||||||
|
Creates a new storage node, returns a StorageServer instance
|
||||||
|
(which will already be added to our .storage_servers list)
|
||||||
|
"""
|
||||||
|
port = yield self._port_allocator()
|
||||||
|
print("make {}".format(port))
|
||||||
|
name = 'node{}'.format(port)
|
||||||
|
web_port = 'tcp:{}:interface=localhost'.format(port)
|
||||||
|
server = yield create_storage_server(
|
||||||
|
self._reactor,
|
||||||
|
self._request,
|
||||||
|
self._temp_dir,
|
||||||
|
self.introducer,
|
||||||
|
self.flog_gatherer,
|
||||||
|
name,
|
||||||
|
web_port,
|
||||||
|
)
|
||||||
|
self.storage_servers.append(server)
|
||||||
|
returnValue(server)
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def add_client(self, name, needed=2, happy=3, total=4):
|
||||||
|
"""
|
||||||
|
Create a new client node
|
||||||
|
"""
|
||||||
|
port = yield self._port_allocator()
|
||||||
|
web_port = 'tcp:{}:interface=localhost'.format(port)
|
||||||
|
client = yield create_client(
|
||||||
|
self._reactor,
|
||||||
|
self._request,
|
||||||
|
self._temp_dir,
|
||||||
|
self.introducer,
|
||||||
|
self.flog_gatherer,
|
||||||
|
name,
|
||||||
|
web_port,
|
||||||
|
needed=needed,
|
||||||
|
happy=happy,
|
||||||
|
total=total,
|
||||||
|
)
|
||||||
|
self.clients[name] = client
|
||||||
|
yield await_client_ready(client.process)
|
||||||
|
returnValue(client)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# XXX THINK can we tie a whole *grid* to a single request? (I think
|
||||||
|
# that's all that makes sense)
|
||||||
|
@inlineCallbacks
|
||||||
|
def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
intro_port = yield port_allocator()
|
||||||
|
introducer = yield create_introducer(reactor, request, temp_dir, flog_gatherer, intro_port)
|
||||||
|
grid = Grid(
|
||||||
|
reactor,
|
||||||
|
request,
|
||||||
|
temp_dir,
|
||||||
|
port_allocator,
|
||||||
|
introducer,
|
||||||
|
flog_gatherer,
|
||||||
|
)
|
||||||
|
returnValue(grid)
|
||||||
|
|
||||||
|
|
||||||
|
def create_port_allocator(start_port):
|
||||||
|
"""
|
||||||
|
Returns a new port-allocator .. which is a zero-argument function
|
||||||
|
that returns Deferreds that fire with new, sequential ports
|
||||||
|
starting at `start_port` skipping any that already appear to have
|
||||||
|
a listener.
|
||||||
|
|
||||||
|
There can still be a race against other processes allocating ports
|
||||||
|
-- between the time when we check the status of the port and when
|
||||||
|
our subprocess starts up. This *could* be mitigated by instructing
|
||||||
|
the OS to not randomly-allocate ports in some range, and then
|
||||||
|
using that range here (explicitly, ourselves).
|
||||||
|
|
||||||
|
NB once we're Python3-only this could be an async-generator
|
||||||
|
"""
|
||||||
|
port = [start_port - 1]
|
||||||
|
|
||||||
|
# import stays here to not interfere with reactor selection -- but
|
||||||
|
# maybe this function should be arranged to be called once from a
|
||||||
|
# fixture (with the reactor)?
|
||||||
|
from twisted.internet import reactor
|
||||||
|
|
||||||
|
class NothingProtocol(Protocol):
|
||||||
|
"""
|
||||||
|
I do nothing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def port_generator():
|
||||||
|
print("Checking port {}".format(port))
|
||||||
|
port[0] += 1
|
||||||
|
ep = TCP4ServerEndpoint(reactor, port[0], interface="localhost")
|
||||||
|
d = ep.listen(Factory.forProtocol(NothingProtocol))
|
||||||
|
|
||||||
|
def good(listening_port):
|
||||||
|
unlisten_d = maybeDeferred(listening_port.stopListening)
|
||||||
|
def return_port(_):
|
||||||
|
return port[0]
|
||||||
|
unlisten_d.addBoth(return_port)
|
||||||
|
return unlisten_d
|
||||||
|
|
||||||
|
def try_again(fail):
|
||||||
|
return port_generator()
|
||||||
|
|
||||||
|
d.addCallbacks(good, try_again)
|
||||||
|
return d
|
||||||
|
return port_generator
|
271
integration/test_grid_manager.py
Normal file
271
integration/test_grid_manager.py
Normal file
@ -0,0 +1,271 @@
|
|||||||
|
import sys
|
||||||
|
import json
|
||||||
|
from os.path import join
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives.serialization import (
|
||||||
|
Encoding,
|
||||||
|
PublicFormat,
|
||||||
|
)
|
||||||
|
|
||||||
|
from twisted.internet.utils import (
|
||||||
|
getProcessOutputAndValue,
|
||||||
|
)
|
||||||
|
from twisted.internet.defer import (
|
||||||
|
inlineCallbacks,
|
||||||
|
returnValue,
|
||||||
|
)
|
||||||
|
|
||||||
|
from allmydata.crypto import ed25519
|
||||||
|
from allmydata.util import base32
|
||||||
|
from allmydata.util import configutil
|
||||||
|
|
||||||
|
from . import util
|
||||||
|
from .grid import (
|
||||||
|
create_grid,
|
||||||
|
)
|
||||||
|
|
||||||
|
import pytest_twisted
|
||||||
|
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def _run_gm(reactor, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Run the grid-manager process, passing all arguments as extra CLI
|
||||||
|
args.
|
||||||
|
|
||||||
|
:returns: all process output
|
||||||
|
"""
|
||||||
|
output, errput, exit_code = yield getProcessOutputAndValue(
|
||||||
|
sys.executable,
|
||||||
|
("-m", "allmydata.cli.grid_manager") + args,
|
||||||
|
reactor=reactor,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
if exit_code != 0:
|
||||||
|
raise util.ProcessFailed(
|
||||||
|
RuntimeError("Exit code {}".format(exit_code)),
|
||||||
|
output + errput,
|
||||||
|
)
|
||||||
|
returnValue(output)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_twisted.inlineCallbacks
|
||||||
|
def test_create_certificate(reactor, request):
|
||||||
|
"""
|
||||||
|
The Grid Manager produces a valid, correctly-signed certificate.
|
||||||
|
"""
|
||||||
|
gm_config = yield _run_gm(reactor, "--config", "-", "create")
|
||||||
|
privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
|
||||||
|
privkey, pubkey = ed25519.signing_keypair_from_string(privkey_bytes)
|
||||||
|
|
||||||
|
# Note that zara + her key here are arbitrary and don't match any
|
||||||
|
# "actual" clients in the test-grid; we're just checking that the
|
||||||
|
# Grid Manager signs this properly.
|
||||||
|
gm_config = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "add",
|
||||||
|
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
|
||||||
|
stdinBytes=gm_config,
|
||||||
|
)
|
||||||
|
zara_cert_bytes = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "sign", "zara", "1",
|
||||||
|
stdinBytes=gm_config,
|
||||||
|
)
|
||||||
|
zara_cert = json.loads(zara_cert_bytes)
|
||||||
|
|
||||||
|
# confirm that zara's certificate is made by the Grid Manager
|
||||||
|
# (.verify returns None on success, raises exception on error)
|
||||||
|
pubkey.verify(
|
||||||
|
base32.a2b(zara_cert['signature'].encode('ascii')),
|
||||||
|
zara_cert['certificate'].encode('ascii'),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_twisted.inlineCallbacks
|
||||||
|
def test_remove_client(reactor, request):
|
||||||
|
"""
|
||||||
|
A Grid Manager can add and successfully remove a client
|
||||||
|
"""
|
||||||
|
gm_config = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "create",
|
||||||
|
)
|
||||||
|
|
||||||
|
gm_config = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "add",
|
||||||
|
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
|
||||||
|
stdinBytes=gm_config,
|
||||||
|
)
|
||||||
|
gm_config = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "add",
|
||||||
|
"yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq",
|
||||||
|
stdinBytes=gm_config,
|
||||||
|
)
|
||||||
|
assert "zara" in json.loads(gm_config)['storage_servers']
|
||||||
|
assert "yakov" in json.loads(gm_config)['storage_servers']
|
||||||
|
|
||||||
|
gm_config = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "remove",
|
||||||
|
"zara",
|
||||||
|
stdinBytes=gm_config,
|
||||||
|
)
|
||||||
|
assert "zara" not in json.loads(gm_config)['storage_servers']
|
||||||
|
assert "yakov" in json.loads(gm_config)['storage_servers']
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_twisted.inlineCallbacks
|
||||||
|
def test_remove_last_client(reactor, request):
|
||||||
|
"""
|
||||||
|
A Grid Manager can remove all clients
|
||||||
|
"""
|
||||||
|
gm_config = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "create",
|
||||||
|
)
|
||||||
|
|
||||||
|
gm_config = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "add",
|
||||||
|
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
|
||||||
|
stdinBytes=gm_config,
|
||||||
|
)
|
||||||
|
assert "zara" in json.loads(gm_config)['storage_servers']
|
||||||
|
|
||||||
|
gm_config = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "remove",
|
||||||
|
"zara",
|
||||||
|
stdinBytes=gm_config,
|
||||||
|
)
|
||||||
|
# there are no storage servers left at all now
|
||||||
|
assert "storage_servers" not in json.loads(gm_config)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_twisted.inlineCallbacks
|
||||||
|
def test_add_remove_client_file(reactor, request, temp_dir):
|
||||||
|
"""
|
||||||
|
A Grid Manager can add and successfully remove a client (when
|
||||||
|
keeping data on disk)
|
||||||
|
"""
|
||||||
|
gmconfig = join(temp_dir, "gmtest")
|
||||||
|
gmconfig_file = join(temp_dir, "gmtest", "config.json")
|
||||||
|
yield _run_gm(
|
||||||
|
reactor, "--config", gmconfig, "create",
|
||||||
|
)
|
||||||
|
|
||||||
|
yield _run_gm(
|
||||||
|
reactor, "--config", gmconfig, "add",
|
||||||
|
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
|
||||||
|
)
|
||||||
|
yield _run_gm(
|
||||||
|
reactor, "--config", gmconfig, "add",
|
||||||
|
"yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq",
|
||||||
|
)
|
||||||
|
assert "zara" in json.load(open(gmconfig_file, "r"))['storage_servers']
|
||||||
|
assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers']
|
||||||
|
|
||||||
|
yield _run_gm(
|
||||||
|
reactor, "--config", gmconfig, "remove",
|
||||||
|
"zara",
|
||||||
|
)
|
||||||
|
assert "zara" not in json.load(open(gmconfig_file, "r"))['storage_servers']
|
||||||
|
assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers']
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_twisted.inlineCallbacks
|
||||||
|
def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
|
||||||
|
"""
|
||||||
|
A client with happines=2 fails to upload to a Grid when it is
|
||||||
|
using Grid Manager and there is only 1 storage server with a valid
|
||||||
|
certificate.
|
||||||
|
"""
|
||||||
|
grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
|
||||||
|
storage0 = yield grid.add_storage_node()
|
||||||
|
_ = yield grid.add_storage_node()
|
||||||
|
|
||||||
|
gm_config = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "create",
|
||||||
|
)
|
||||||
|
gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
|
||||||
|
gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes)
|
||||||
|
|
||||||
|
# create certificate for the first storage-server
|
||||||
|
pubkey_fname = join(storage0.process.node_dir, "node.pubkey")
|
||||||
|
with open(pubkey_fname, 'r') as f:
|
||||||
|
pubkey_str = f.read().strip()
|
||||||
|
|
||||||
|
gm_config = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "add",
|
||||||
|
"storage0", pubkey_str,
|
||||||
|
stdinBytes=gm_config,
|
||||||
|
)
|
||||||
|
assert json.loads(gm_config)['storage_servers'].keys() == ['storage0']
|
||||||
|
|
||||||
|
print("inserting certificate")
|
||||||
|
cert = yield _run_gm(
|
||||||
|
reactor, "--config", "-", "sign", "storage0", "1",
|
||||||
|
stdinBytes=gm_config,
|
||||||
|
)
|
||||||
|
print(cert)
|
||||||
|
|
||||||
|
yield util.run_tahoe(
|
||||||
|
reactor, request, "--node-directory", storage0.process.node_dir,
|
||||||
|
"admin", "add-grid-manager-cert",
|
||||||
|
"--name", "default",
|
||||||
|
"--filename", "-",
|
||||||
|
stdin=cert,
|
||||||
|
)
|
||||||
|
|
||||||
|
# re-start this storage server
|
||||||
|
yield storage0.restart(reactor, request)
|
||||||
|
|
||||||
|
# now only one storage-server has the certificate .. configure
|
||||||
|
# diana to have the grid-manager certificate
|
||||||
|
|
||||||
|
diana = yield grid.add_client("diana", needed=2, happy=2, total=2)
|
||||||
|
|
||||||
|
config = configutil.get_config(join(diana.process.node_dir, "tahoe.cfg"))
|
||||||
|
config.add_section("grid_managers")
|
||||||
|
config.set("grid_managers", "test", ed25519.string_from_verifying_key(gm_pubkey))
|
||||||
|
with open(join(diana.process.node_dir, "tahoe.cfg"), "w") as f:
|
||||||
|
config.write(f)
|
||||||
|
|
||||||
|
yield diana.restart(reactor, request, servers=2)
|
||||||
|
|
||||||
|
# try to put something into the grid, which should fail (because
|
||||||
|
# diana has happy=2 but should only find storage0 to be acceptable
|
||||||
|
# to upload to)
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield util.run_tahoe(
|
||||||
|
reactor, request, "--node-directory", diana.process.node_dir,
|
||||||
|
"put", "-",
|
||||||
|
stdin="some content\n" * 200,
|
||||||
|
)
|
||||||
|
assert False, "Should get a failure"
|
||||||
|
except util.ProcessFailed as e:
|
||||||
|
assert 'UploadUnhappinessError' in e.output
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_twisted.inlineCallbacks
|
||||||
|
def test_identity(reactor, request, temp_dir):
|
||||||
|
"""
|
||||||
|
Dump public key to CLI
|
||||||
|
"""
|
||||||
|
gm_config = join(temp_dir, "test_identity")
|
||||||
|
yield _run_gm(
|
||||||
|
reactor, "--config", gm_config, "create",
|
||||||
|
)
|
||||||
|
|
||||||
|
# ask the CLI for the grid-manager pubkey
|
||||||
|
pubkey = yield _run_gm(
|
||||||
|
reactor, "--config", gm_config, "public-identity",
|
||||||
|
)
|
||||||
|
alleged_pubkey = ed25519.verifying_key_from_string(pubkey.strip())
|
||||||
|
|
||||||
|
# load the grid-manager pubkey "ourselves"
|
||||||
|
with open(join(gm_config, "config.json"), "r") as f:
|
||||||
|
real_config = json.load(f)
|
||||||
|
real_privkey, real_pubkey = ed25519.signing_keypair_from_string(
|
||||||
|
real_config["private_key"].encode("ascii"),
|
||||||
|
)
|
||||||
|
|
||||||
|
# confirm the CLI told us the correct thing
|
||||||
|
alleged_bytes = alleged_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw)
|
||||||
|
real_bytes = real_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw)
|
||||||
|
assert alleged_bytes == real_bytes, "Keys don't match"
|
@ -50,8 +50,8 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto
|
|||||||
try:
|
try:
|
||||||
yield proto.done
|
yield proto.done
|
||||||
assert False, "should raise exception"
|
assert False, "should raise exception"
|
||||||
except Exception as e:
|
except util.ProcessFailed as e:
|
||||||
assert isinstance(e, ProcessTerminated)
|
assert "UploadUnhappinessError" in e.output
|
||||||
|
|
||||||
output = proto.output.getvalue()
|
output = proto.output.getvalue()
|
||||||
assert b"shares could be placed on only" in output
|
assert b"shares could be placed on only" in output
|
||||||
|
@ -91,26 +91,28 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne
|
|||||||
def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl):
|
def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl):
|
||||||
node_dir = FilePath(temp_dir).child(name)
|
node_dir = FilePath(temp_dir).child(name)
|
||||||
web_port = "tcp:{}:interface=localhost".format(control_port + 2000)
|
web_port = "tcp:{}:interface=localhost".format(control_port + 2000)
|
||||||
|
if node_dir.exists():
|
||||||
if True:
|
raise RuntimeError(
|
||||||
print("creating", node_dir.path)
|
"A node already exists in '{}'".format(node_dir)
|
||||||
node_dir.makedirs()
|
|
||||||
proto = util._DumpOutputProtocol(None)
|
|
||||||
reactor.spawnProcess(
|
|
||||||
proto,
|
|
||||||
sys.executable,
|
|
||||||
(
|
|
||||||
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
|
|
||||||
'create-node',
|
|
||||||
'--nickname', name,
|
|
||||||
'--introducer', introducer_furl,
|
|
||||||
'--hide-ip',
|
|
||||||
'--tor-control-port', 'tcp:localhost:{}'.format(control_port),
|
|
||||||
'--listen', 'tor',
|
|
||||||
node_dir.path,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
yield proto.done
|
print("creating", node_dir)
|
||||||
|
node_dir.makedirs()
|
||||||
|
proto = util._DumpOutputProtocol(None)
|
||||||
|
reactor.spawnProcess(
|
||||||
|
proto,
|
||||||
|
sys.executable,
|
||||||
|
(
|
||||||
|
sys.executable, '-m', 'allmydata.scripts.runner',
|
||||||
|
'create-node',
|
||||||
|
'--nickname', name,
|
||||||
|
'--introducer', introducer_furl,
|
||||||
|
'--hide-ip',
|
||||||
|
'--tor-control-port', 'tcp:localhost:{}'.format(control_port),
|
||||||
|
'--listen', 'tor',
|
||||||
|
node_dir.path,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
yield proto.done
|
||||||
|
|
||||||
|
|
||||||
# Which services should this client connect to?
|
# Which services should this client connect to?
|
||||||
|
@ -102,7 +102,7 @@ def test_helper_status(storage_nodes):
|
|||||||
successfully GET the /helper_status page
|
successfully GET the /helper_status page
|
||||||
"""
|
"""
|
||||||
|
|
||||||
url = util.node_url(storage_nodes[0].node_dir, "helper_status")
|
url = util.node_url(storage_nodes[0].process.node_dir, "helper_status")
|
||||||
resp = requests.get(url)
|
resp = requests.get(url)
|
||||||
assert resp.status_code >= 200 and resp.status_code < 300
|
assert resp.status_code >= 200 and resp.status_code < 300
|
||||||
dom = BeautifulSoup(resp.content, "html5lib")
|
dom = BeautifulSoup(resp.content, "html5lib")
|
||||||
@ -424,7 +424,7 @@ def test_storage_info(storage_nodes):
|
|||||||
storage0 = storage_nodes[0]
|
storage0 = storage_nodes[0]
|
||||||
|
|
||||||
requests.get(
|
requests.get(
|
||||||
util.node_url(storage0.node_dir, u"storage"),
|
util.node_url(storage0.process.node_dir, u"storage"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -435,7 +435,7 @@ def test_storage_info_json(storage_nodes):
|
|||||||
storage0 = storage_nodes[0]
|
storage0 = storage_nodes[0]
|
||||||
|
|
||||||
resp = requests.get(
|
resp = requests.get(
|
||||||
util.node_url(storage0.node_dir, u"storage"),
|
util.node_url(storage0.process.node_dir, u"storage"),
|
||||||
params={u"t": u"json"},
|
params={u"t": u"json"},
|
||||||
)
|
)
|
||||||
data = json.loads(resp.content)
|
data = json.loads(resp.content)
|
||||||
@ -447,12 +447,12 @@ def test_introducer_info(introducer):
|
|||||||
retrieve and confirm /introducer URI for the introducer
|
retrieve and confirm /introducer URI for the introducer
|
||||||
"""
|
"""
|
||||||
resp = requests.get(
|
resp = requests.get(
|
||||||
util.node_url(introducer.node_dir, u""),
|
util.node_url(introducer.process.node_dir, u""),
|
||||||
)
|
)
|
||||||
assert b"Introducer" in resp.content
|
assert b"Introducer" in resp.content
|
||||||
|
|
||||||
resp = requests.get(
|
resp = requests.get(
|
||||||
util.node_url(introducer.node_dir, u""),
|
util.node_url(introducer.process.node_dir, u""),
|
||||||
params={u"t": u"json"},
|
params={u"t": u"json"},
|
||||||
)
|
)
|
||||||
data = json.loads(resp.content)
|
data = json.loads(resp.content)
|
||||||
|
@ -61,16 +61,40 @@ class _ProcessExitedProtocol(ProcessProtocol):
|
|||||||
self.done.callback(None)
|
self.done.callback(None)
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessFailed(Exception):
|
||||||
|
"""
|
||||||
|
A subprocess has failed.
|
||||||
|
|
||||||
|
:ivar ProcessTerminated reason: the original reason from .processExited
|
||||||
|
|
||||||
|
:ivar StringIO output: all stdout and stderr collected to this point.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, reason, output):
|
||||||
|
self.reason = reason
|
||||||
|
self.output = output
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "<ProcessFailed: {}>:\n{}".format(self.reason, self.output)
|
||||||
|
|
||||||
|
|
||||||
class _CollectOutputProtocol(ProcessProtocol):
|
class _CollectOutputProtocol(ProcessProtocol):
|
||||||
"""
|
"""
|
||||||
Internal helper. Collects all output (stdout + stderr) into
|
Internal helper. Collects all output (stdout + stderr) into
|
||||||
self.output, and callback's on done with all of it after the
|
self.output, and callback's on done with all of it after the
|
||||||
process exits (for any reason).
|
process exits (for any reason).
|
||||||
"""
|
"""
|
||||||
def __init__(self, capture_stderr=True):
|
|
||||||
|
def __init__(self, capture_stderr=True, stdin=None):
|
||||||
self.done = Deferred()
|
self.done = Deferred()
|
||||||
self.output = BytesIO()
|
self.output = BytesIO()
|
||||||
self.capture_stderr = capture_stderr
|
self.capture_stderr = capture_stderr
|
||||||
|
self._stdin = stdin
|
||||||
|
|
||||||
|
def connectionMade(self):
|
||||||
|
if self._stdin is not None:
|
||||||
|
self.transport.write(self._stdin)
|
||||||
|
self.transport.closeStdin()
|
||||||
|
|
||||||
def processEnded(self, reason):
|
def processEnded(self, reason):
|
||||||
if not self.done.called:
|
if not self.done.called:
|
||||||
@ -78,7 +102,7 @@ class _CollectOutputProtocol(ProcessProtocol):
|
|||||||
|
|
||||||
def processExited(self, reason):
|
def processExited(self, reason):
|
||||||
if not isinstance(reason.value, ProcessDone):
|
if not isinstance(reason.value, ProcessDone):
|
||||||
self.done.errback(reason)
|
self.done.errback(ProcessFailed(reason, self.output.getvalue()))
|
||||||
|
|
||||||
def outReceived(self, data):
|
def outReceived(self, data):
|
||||||
self.output.write(data)
|
self.output.write(data)
|
||||||
@ -156,13 +180,27 @@ def _cleanup_tahoe_process(tahoe_transport, exited):
|
|||||||
try:
|
try:
|
||||||
print("signaling {} with TERM".format(tahoe_transport.pid))
|
print("signaling {} with TERM".format(tahoe_transport.pid))
|
||||||
tahoe_transport.signalProcess('TERM')
|
tahoe_transport.signalProcess('TERM')
|
||||||
print("signaled, blocking on exit")
|
print("signaled, blocking on exit {}".format(exited))
|
||||||
block_with_timeout(exited, reactor)
|
block_with_timeout(exited, reactor)
|
||||||
print("exited, goodbye")
|
print("exited, goodbye")
|
||||||
except ProcessExitedAlready:
|
except ProcessExitedAlready:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def run_tahoe(reactor, request, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Helper to run tahoe with optional coverage.
|
||||||
|
|
||||||
|
:returns: a Deferred that fires when the command is done (or a
|
||||||
|
ProcessFailed exception if it exits non-zero)
|
||||||
|
"""
|
||||||
|
stdin = kwargs.get("stdin", None)
|
||||||
|
protocol = _CollectOutputProtocol(stdin=stdin)
|
||||||
|
process = _tahoe_runner_optional_coverage(protocol, reactor, request, args)
|
||||||
|
process.exited = protocol.done
|
||||||
|
return protocol.done
|
||||||
|
|
||||||
|
|
||||||
def _tahoe_runner_optional_coverage(proto, reactor, request, other_args):
|
def _tahoe_runner_optional_coverage(proto, reactor, request, other_args):
|
||||||
"""
|
"""
|
||||||
Internal helper. Calls spawnProcess with `-m
|
Internal helper. Calls spawnProcess with `-m
|
||||||
@ -269,7 +307,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
|
|||||||
if exists(node_dir):
|
if exists(node_dir):
|
||||||
created_d = succeed(None)
|
created_d = succeed(None)
|
||||||
else:
|
else:
|
||||||
print("creating", node_dir)
|
print("creating: {}".format(node_dir))
|
||||||
mkdir(node_dir)
|
mkdir(node_dir)
|
||||||
done_proto = _ProcessExitedProtocol()
|
done_proto = _ProcessExitedProtocol()
|
||||||
args = [
|
args = [
|
||||||
@ -508,7 +546,7 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_serve
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
if len(js['servers']) < minimum_number_of_servers:
|
if len(js['servers']) < minimum_number_of_servers:
|
||||||
print("waiting because insufficient servers")
|
print(f"waiting because insufficient servers (expected at least {minimum_number_of_servers})")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
continue
|
continue
|
||||||
server_times = [
|
server_times = [
|
||||||
|
0
newsfragments/3508.minor
Normal file
0
newsfragments/3508.minor
Normal file
Loading…
Reference in New Issue
Block a user