diff --git a/integration/conftest.py b/integration/conftest.py index e284b5cba..b5a60aff3 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -13,16 +13,11 @@ if PY2: import sys import shutil from time import sleep -from os import mkdir, listdir, environ +from os import mkdir, environ from os.path import join, exists -from tempfile import mkdtemp, mktemp -from functools import partial +from tempfile import mkdtemp from json import loads -from foolscap.furl import ( - decode_furl, -) - from eliot import ( to_file, log_call, @@ -44,15 +39,18 @@ from .util import ( _DumpOutputProtocol, _ProcessExitedProtocol, _create_node, - _cleanup_tahoe_process, _tahoe_runner_optional_coverage, await_client_ready, - TahoeProcess, cli, _run_node, generate_ssh_key, block_with_timeout, ) +from .grid import ( + create_port_allocator, + create_flog_gatherer, + create_grid, +) # pytest customization hooks @@ -89,6 +87,12 @@ def reactor(): return _reactor +@pytest.fixture(scope='session') +@log_call(action_type=u"integration:port_allocator", include_result=False) +def port_allocator(reactor): + return create_port_allocator(start_port=45000) + + @pytest.fixture(scope='session') @log_call(action_type=u"integration:temp_dir", include_args=[]) def temp_dir(request): @@ -123,136 +127,30 @@ def flog_binary(): @pytest.fixture(scope='session') @log_call(action_type=u"integration:flog_gatherer", include_args=[]) def flog_gatherer(reactor, temp_dir, flog_binary, request): - out_protocol = _CollectOutputProtocol() - gather_dir = join(temp_dir, 'flog_gather') - reactor.spawnProcess( - out_protocol, - flog_binary, - ( - 'flogtool', 'create-gatherer', - '--location', 'tcp:localhost:3117', - '--port', '3117', - gather_dir, - ) + fg = pytest_twisted.blockon( + create_flog_gatherer(reactor, request, temp_dir, flog_binary) ) - pytest_twisted.blockon(out_protocol.done) - - twistd_protocol = _MagicTextProtocol("Gatherer waiting at") - twistd_process = reactor.spawnProcess( - twistd_protocol, - which('twistd')[0], - ( - 'twistd', '--nodaemon', '--python', - join(gather_dir, 'gatherer.tac'), - ), - path=gather_dir, - ) - pytest_twisted.blockon(twistd_protocol.magic_seen) - - def cleanup(): - _cleanup_tahoe_process(twistd_process, twistd_protocol.exited) - - flog_file = mktemp('.flog_dump') - flog_protocol = _DumpOutputProtocol(open(flog_file, 'w')) - flog_dir = join(temp_dir, 'flog_gather') - flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')] - - print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file)) - reactor.spawnProcess( - flog_protocol, - flog_binary, - ( - 'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0]) - ), - ) - print("Waiting for flogtool to complete") - try: - block_with_timeout(flog_protocol.done, reactor) - except ProcessTerminated as e: - print("flogtool exited unexpectedly: {}".format(str(e))) - print("Flogtool completed") - - request.addfinalizer(cleanup) - - with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f: - furl = f.read().strip() - return furl + return fg @pytest.fixture(scope='session') -@log_call( - action_type=u"integration:introducer", - include_args=["temp_dir", "flog_gatherer"], - include_result=False, -) -def introducer(reactor, temp_dir, flog_gatherer, request): - config = ''' -[node] -nickname = introducer0 -web.port = 4560 -log_gatherer.furl = {log_furl} -'''.format(log_furl=flog_gatherer) - - intro_dir = join(temp_dir, 'introducer') - print("making introducer", intro_dir) - - if not exists(intro_dir): - mkdir(intro_dir) - done_proto = _ProcessExitedProtocol() - _tahoe_runner_optional_coverage( - done_proto, - reactor, - request, - ( - 'create-introducer', - '--listen=tcp', - '--hostname=localhost', - intro_dir, - ), - ) - pytest_twisted.blockon(done_proto.done) - - # over-write the config file with our stuff - with open(join(intro_dir, 'tahoe.cfg'), 'w') as f: - f.write(config) - - # "tahoe run" is consistent across Linux/macOS/Windows, unlike the old - # "start" command. - protocol = _MagicTextProtocol('introducer running') - transport = _tahoe_runner_optional_coverage( - protocol, - reactor, - request, - ( - 'run', - intro_dir, - ), +@log_call(action_type=u"integration:grid", include_args=[]) +def grid(reactor, request, temp_dir, flog_gatherer, port_allocator): + g = pytest_twisted.blockon( + create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator) ) - request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited)) + return g - pytest_twisted.blockon(protocol.magic_seen) - return TahoeProcess(transport, intro_dir) + +@pytest.fixture(scope='session') +def introducer(grid): + return grid.introducer @pytest.fixture(scope='session') @log_call(action_type=u"integration:introducer:furl", include_args=["temp_dir"]) def introducer_furl(introducer, temp_dir): - furl_fname = join(temp_dir, 'introducer', 'private', 'introducer.furl') - while not exists(furl_fname): - print("Don't see {} yet".format(furl_fname)) - sleep(.1) - furl = open(furl_fname, 'r').read() - tubID, location_hints, name = decode_furl(furl) - if not location_hints: - # If there are no location hints then nothing can ever possibly - # connect to it and the only thing that can happen next is something - # will hang or time out. So just give up right now. - raise ValueError( - "Introducer ({!r}) fURL has no location hints!".format( - introducer_furl, - ), - ) - return furl + return introducer.furl @pytest.fixture(scope='session') @@ -330,28 +228,20 @@ def tor_introducer_furl(tor_introducer, temp_dir): @pytest.fixture(scope='session') @log_call( action_type=u"integration:storage_nodes", - include_args=["temp_dir", "introducer_furl", "flog_gatherer"], + include_args=["grid"], include_result=False, ) -def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, request): +def storage_nodes(grid): nodes_d = [] # start all 5 nodes in parallel for x in range(5): - name = 'node{}'.format(x) - web_port= 9990 + x - nodes_d.append( - _create_node( - reactor, request, temp_dir, introducer_furl, flog_gatherer, name, - web_port="tcp:{}:interface=localhost".format(web_port), - storage=True, - ) - ) + #nodes_d.append(grid.add_storage_node()) + pytest_twisted.blockon(grid.add_storage_node()) + nodes_status = pytest_twisted.blockon(DeferredList(nodes_d)) - nodes = [] - for ok, process in nodes_status: - assert ok, "Storage node creation failed: {}".format(process) - nodes.append(process) - return nodes + for ok, value in nodes_status: + assert ok, "Storage node creation failed: {}".format(value) + return grid.storage_servers @pytest.fixture(scope="session") def alice_sftp_client_key_path(temp_dir): diff --git a/integration/grid.py b/integration/grid.py new file mode 100644 index 000000000..3cb16c929 --- /dev/null +++ b/integration/grid.py @@ -0,0 +1,507 @@ +""" +Classes which directly represent various kinds of Tahoe processes +that co-operate to for "a Grid". + +These methods and objects are used by conftest.py fixtures but may +also be used as direct helpers for tests that don't want to (or can't) +rely on 'the' global grid as provided by fixtures like 'alice' or +'storage_servers'. +""" + +from os import mkdir, listdir +from os.path import join, exists +from tempfile import mktemp +from time import sleep + +from eliot import ( + log_call, +) + +from foolscap.furl import ( + decode_furl, +) + +from twisted.python.procutils import which +from twisted.internet.defer import ( + inlineCallbacks, + returnValue, + maybeDeferred, +) +from twisted.internet.task import ( + deferLater, +) +from twisted.internet.interfaces import ( + IProcessTransport, + IProcessProtocol, +) +from twisted.internet.endpoints import ( + TCP4ServerEndpoint, +) +from twisted.internet.protocol import ( + Factory, + Protocol, +) +from twisted.internet.error import ProcessTerminated + +from .util import ( + _CollectOutputProtocol, + _MagicTextProtocol, + _DumpOutputProtocol, + _ProcessExitedProtocol, + _run_node, + _cleanup_tahoe_process, + _tahoe_runner_optional_coverage, + TahoeProcess, + await_client_ready, +) + +import attr +import pytest_twisted + + +# further directions: +# - "Grid" is unused, basically -- tie into the rest? +# - could make a Grid instance mandatory for create_* calls +# - could instead make create_* calls methods of Grid +# - Bring more 'util' or 'conftest' code into here +# - stop()/start()/restart() methods on StorageServer etc +# - more-complex stuff like config changes (which imply a restart too)? + + +@attr.s +class FlogGatherer(object): + """ + Flog Gatherer process. + """ + + process = attr.ib( + validator=attr.validators.provides(IProcessTransport) + ) + protocol = attr.ib( + validator=attr.validators.provides(IProcessProtocol) + ) + furl = attr.ib() + + +@inlineCallbacks +def create_flog_gatherer(reactor, request, temp_dir, flog_binary): + out_protocol = _CollectOutputProtocol() + gather_dir = join(temp_dir, 'flog_gather') + reactor.spawnProcess( + out_protocol, + flog_binary, + ( + 'flogtool', 'create-gatherer', + '--location', 'tcp:localhost:3117', + '--port', '3117', + gather_dir, + ) + ) + yield out_protocol.done + + twistd_protocol = _MagicTextProtocol("Gatherer waiting at") + twistd_process = reactor.spawnProcess( + twistd_protocol, + which('twistd')[0], + ( + 'twistd', '--nodaemon', '--python', + join(gather_dir, 'gatherer.tac'), + ), + path=gather_dir, + ) + yield twistd_protocol.magic_seen + + def cleanup(): + _cleanup_tahoe_process(twistd_process, twistd_protocol.exited) + + flog_file = mktemp('.flog_dump') + flog_protocol = _DumpOutputProtocol(open(flog_file, 'w')) + flog_dir = join(temp_dir, 'flog_gather') + flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')] + + print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file)) + reactor.spawnProcess( + flog_protocol, + flog_binary, + ( + 'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0]) + ), + ) + print("Waiting for flogtool to complete") + try: + pytest_twisted.blockon(flog_protocol.done) + except ProcessTerminated as e: + print("flogtool exited unexpectedly: {}".format(str(e))) + print("Flogtool completed") + + request.addfinalizer(cleanup) + + with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f: + furl = f.read().strip() + returnValue( + FlogGatherer( + protocol=twistd_protocol, + process=twistd_process, + furl=furl, + ) + ) + + +@attr.s +class StorageServer(object): + """ + Represents a Tahoe Storage Server + """ + + process = attr.ib( + validator=attr.validators.instance_of(TahoeProcess) + ) + protocol = attr.ib( + validator=attr.validators.provides(IProcessProtocol) + ) + + @inlineCallbacks + def restart(self, reactor, request): + """ + re-start our underlying process by issuing a TERM, waiting and + then running again. await_client_ready() will be done as well + + Note that self.process and self.protocol will be new instances + after this. + """ + self.process.transport.signalProcess('TERM') + yield self.protocol.exited + self.process = yield _run_node( + reactor, self.process.node_dir, request, None, + ) + self.protocol = self.process.transport._protocol + + +@inlineCallbacks +def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port, + needed=2, happy=3, total=4): + """ + Create a new storage server + """ + from .util import _create_node + node_process = yield _create_node( + reactor, request, temp_dir, introducer.furl, flog_gatherer, + name, web_port, storage=True, needed=needed, happy=happy, total=total, + ) + storage = StorageServer( + process=node_process, + protocol=node_process.transport._protocol, + ) + returnValue(storage) + + +@attr.s +class Client(object): + """ + Represents a Tahoe client + """ + + process = attr.ib( + validator=attr.validators.instance_of(TahoeProcess) + ) + protocol = attr.ib( + validator=attr.validators.provides(IProcessProtocol) + ) + + @inlineCallbacks + def restart(self, reactor, request, servers=1): + """ + re-start our underlying process by issuing a TERM, waiting and + then running again. + + :param int servers: number of server connections we will wait + for before being 'ready' + + Note that self.process and self.protocol will be new instances + after this. + """ + self.process.transport.signalProcess('TERM') + yield self.protocol.exited + process = yield _run_node( + reactor, self.process.node_dir, request, None, + ) + self.process = process + self.protocol = self.process.transport._protocol + + + # XXX add stop / start / restart + # ...maybe "reconfig" of some kind? + + +@inlineCallbacks +def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port, + needed=2, happy=3, total=4): + """ + Create a new storage server + """ + from .util import _create_node + node_process = yield _create_node( + reactor, request, temp_dir, introducer.furl, flog_gatherer, + name, web_port, storage=False, needed=needed, happy=happy, total=total, + ) + returnValue( + Client( + process=node_process, + protocol=node_process.transport._protocol, + ) + ) + + +@attr.s +class Introducer(object): + """ + Reprsents a running introducer + """ + + process = attr.ib( + validator=attr.validators.instance_of(TahoeProcess) + ) + protocol = attr.ib( + validator=attr.validators.provides(IProcessProtocol) + ) + furl = attr.ib() + + +def _validate_furl(furl_fname): + """ + Opens and validates a fURL, ensuring location hints. + :returns: the furl + :raises: ValueError if no location hints + """ + while not exists(furl_fname): + print("Don't see {} yet".format(furl_fname)) + sleep(.1) + furl = open(furl_fname, 'r').read() + tubID, location_hints, name = decode_furl(furl) + if not location_hints: + # If there are no location hints then nothing can ever possibly + # connect to it and the only thing that can happen next is something + # will hang or time out. So just give up right now. + raise ValueError( + "Introducer ({!r}) fURL has no location hints!".format( + furl, + ), + ) + return furl + + +@inlineCallbacks +@log_call( + action_type=u"integration:introducer", + include_args=["temp_dir", "flog_gatherer"], + include_result=False, +) +def create_introducer(reactor, request, temp_dir, flog_gatherer, port): + """ + Run a new Introducer and return an Introducer instance. + """ + config = ( + '[node]\n' + 'nickname = introducer{port}\n' + 'web.port = {port}\n' + 'log_gatherer.furl = {log_furl}\n' + ).format( + port=port, + log_furl=flog_gatherer.furl, + ) + + intro_dir = join(temp_dir, 'introducer{}'.format(port)) + + if not exists(intro_dir): + mkdir(intro_dir) + done_proto = _ProcessExitedProtocol() + _tahoe_runner_optional_coverage( + done_proto, + reactor, + request, + ( + 'create-introducer', + '--listen=tcp', + '--hostname=localhost', + intro_dir, + ), + ) + yield done_proto.done + + # over-write the config file with our stuff + with open(join(intro_dir, 'tahoe.cfg'), 'w') as f: + f.write(config) + + # on windows, "tahoe start" means: run forever in the foreground, + # but on linux it means daemonize. "tahoe run" is consistent + # between platforms. + protocol = _MagicTextProtocol('introducer running') + transport = _tahoe_runner_optional_coverage( + protocol, + reactor, + request, + ( + 'run', + intro_dir, + ), + ) + + def clean(): + return _cleanup_tahoe_process(transport, protocol.exited) + request.addfinalizer(clean) + + yield protocol.magic_seen + + furl_fname = join(intro_dir, 'private', 'introducer.furl') + while not exists(furl_fname): + print("Don't see {} yet".format(furl_fname)) + yield deferLater(reactor, .1, lambda: None) + furl = _validate_furl(furl_fname) + + returnValue( + Introducer( + process=TahoeProcess(transport, intro_dir), + protocol=protocol, + furl=furl, + ) + ) + + +@attr.s +class Grid(object): + """ + Represents an entire Tahoe Grid setup + + A Grid includes an Introducer, Flog Gatherer and some number of + Storage Servers. + """ + + _reactor = attr.ib() + _request = attr.ib() + _temp_dir = attr.ib() + _port_allocator = attr.ib() + introducer = attr.ib() + flog_gatherer = attr.ib() + storage_servers = attr.ib(factory=list) + clients = attr.ib(factory=dict) + + @storage_servers.validator + def check(self, attribute, value): + for server in value: + if not isinstance(server, StorageServer): + raise ValueError( + "storage_servers must be StorageServer" + ) + + @inlineCallbacks + def add_storage_node(self): + """ + Creates a new storage node, returns a StorageServer instance + (which will already be added to our .storage_servers list) + """ + port = yield self._port_allocator() + print("make {}".format(port)) + name = 'node{}'.format(port) + web_port = 'tcp:{}:interface=localhost'.format(port) + server = yield create_storage_server( + self._reactor, + self._request, + self._temp_dir, + self.introducer, + self.flog_gatherer, + name, + web_port, + ) + self.storage_servers.append(server) + returnValue(server) + + @inlineCallbacks + def add_client(self, name, needed=2, happy=3, total=4): + """ + Create a new client node + """ + port = yield self._port_allocator() + web_port = 'tcp:{}:interface=localhost'.format(port) + client = yield create_client( + self._reactor, + self._request, + self._temp_dir, + self.introducer, + self.flog_gatherer, + name, + web_port, + needed=needed, + happy=happy, + total=total, + ) + self.clients[name] = client + yield await_client_ready(client.process) + returnValue(client) + + + +# XXX THINK can we tie a whole *grid* to a single request? (I think +# that's all that makes sense) +@inlineCallbacks +def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator): + """ + """ + intro_port = yield port_allocator() + introducer = yield create_introducer(reactor, request, temp_dir, flog_gatherer, intro_port) + grid = Grid( + reactor, + request, + temp_dir, + port_allocator, + introducer, + flog_gatherer, + ) + returnValue(grid) + + +def create_port_allocator(start_port): + """ + Returns a new port-allocator .. which is a zero-argument function + that returns Deferreds that fire with new, sequential ports + starting at `start_port` skipping any that already appear to have + a listener. + + There can still be a race against other processes allocating ports + -- between the time when we check the status of the port and when + our subprocess starts up. This *could* be mitigated by instructing + the OS to not randomly-allocate ports in some range, and then + using that range here (explicitly, ourselves). + + NB once we're Python3-only this could be an async-generator + """ + port = [start_port - 1] + + # import stays here to not interfere with reactor selection -- but + # maybe this function should be arranged to be called once from a + # fixture (with the reactor)? + from twisted.internet import reactor + + class NothingProtocol(Protocol): + """ + I do nothing. + """ + + def port_generator(): + print("Checking port {}".format(port)) + port[0] += 1 + ep = TCP4ServerEndpoint(reactor, port[0], interface="localhost") + d = ep.listen(Factory.forProtocol(NothingProtocol)) + + def good(listening_port): + unlisten_d = maybeDeferred(listening_port.stopListening) + def return_port(_): + return port[0] + unlisten_d.addBoth(return_port) + return unlisten_d + + def try_again(fail): + return port_generator() + + d.addCallbacks(good, try_again) + return d + return port_generator diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py new file mode 100644 index 000000000..c01773b96 --- /dev/null +++ b/integration/test_grid_manager.py @@ -0,0 +1,271 @@ +import sys +import json +from os.path import join + +from cryptography.hazmat.primitives.serialization import ( + Encoding, + PublicFormat, +) + +from twisted.internet.utils import ( + getProcessOutputAndValue, +) +from twisted.internet.defer import ( + inlineCallbacks, + returnValue, +) + +from allmydata.crypto import ed25519 +from allmydata.util import base32 +from allmydata.util import configutil + +from . import util +from .grid import ( + create_grid, +) + +import pytest_twisted + + +@inlineCallbacks +def _run_gm(reactor, *args, **kwargs): + """ + Run the grid-manager process, passing all arguments as extra CLI + args. + + :returns: all process output + """ + output, errput, exit_code = yield getProcessOutputAndValue( + sys.executable, + ("-m", "allmydata.cli.grid_manager") + args, + reactor=reactor, + **kwargs + ) + if exit_code != 0: + raise util.ProcessFailed( + RuntimeError("Exit code {}".format(exit_code)), + output + errput, + ) + returnValue(output) + + +@pytest_twisted.inlineCallbacks +def test_create_certificate(reactor, request): + """ + The Grid Manager produces a valid, correctly-signed certificate. + """ + gm_config = yield _run_gm(reactor, "--config", "-", "create") + privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii') + privkey, pubkey = ed25519.signing_keypair_from_string(privkey_bytes) + + # Note that zara + her key here are arbitrary and don't match any + # "actual" clients in the test-grid; we're just checking that the + # Grid Manager signs this properly. + gm_config = yield _run_gm( + reactor, "--config", "-", "add", + "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", + stdinBytes=gm_config, + ) + zara_cert_bytes = yield _run_gm( + reactor, "--config", "-", "sign", "zara", "1", + stdinBytes=gm_config, + ) + zara_cert = json.loads(zara_cert_bytes) + + # confirm that zara's certificate is made by the Grid Manager + # (.verify returns None on success, raises exception on error) + pubkey.verify( + base32.a2b(zara_cert['signature'].encode('ascii')), + zara_cert['certificate'].encode('ascii'), + ) + + +@pytest_twisted.inlineCallbacks +def test_remove_client(reactor, request): + """ + A Grid Manager can add and successfully remove a client + """ + gm_config = yield _run_gm( + reactor, "--config", "-", "create", + ) + + gm_config = yield _run_gm( + reactor, "--config", "-", "add", + "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", + stdinBytes=gm_config, + ) + gm_config = yield _run_gm( + reactor, "--config", "-", "add", + "yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq", + stdinBytes=gm_config, + ) + assert "zara" in json.loads(gm_config)['storage_servers'] + assert "yakov" in json.loads(gm_config)['storage_servers'] + + gm_config = yield _run_gm( + reactor, "--config", "-", "remove", + "zara", + stdinBytes=gm_config, + ) + assert "zara" not in json.loads(gm_config)['storage_servers'] + assert "yakov" in json.loads(gm_config)['storage_servers'] + + +@pytest_twisted.inlineCallbacks +def test_remove_last_client(reactor, request): + """ + A Grid Manager can remove all clients + """ + gm_config = yield _run_gm( + reactor, "--config", "-", "create", + ) + + gm_config = yield _run_gm( + reactor, "--config", "-", "add", + "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", + stdinBytes=gm_config, + ) + assert "zara" in json.loads(gm_config)['storage_servers'] + + gm_config = yield _run_gm( + reactor, "--config", "-", "remove", + "zara", + stdinBytes=gm_config, + ) + # there are no storage servers left at all now + assert "storage_servers" not in json.loads(gm_config) + + +@pytest_twisted.inlineCallbacks +def test_add_remove_client_file(reactor, request, temp_dir): + """ + A Grid Manager can add and successfully remove a client (when + keeping data on disk) + """ + gmconfig = join(temp_dir, "gmtest") + gmconfig_file = join(temp_dir, "gmtest", "config.json") + yield _run_gm( + reactor, "--config", gmconfig, "create", + ) + + yield _run_gm( + reactor, "--config", gmconfig, "add", + "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", + ) + yield _run_gm( + reactor, "--config", gmconfig, "add", + "yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq", + ) + assert "zara" in json.load(open(gmconfig_file, "r"))['storage_servers'] + assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers'] + + yield _run_gm( + reactor, "--config", gmconfig, "remove", + "zara", + ) + assert "zara" not in json.load(open(gmconfig_file, "r"))['storage_servers'] + assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers'] + + +@pytest_twisted.inlineCallbacks +def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator): + """ + A client with happines=2 fails to upload to a Grid when it is + using Grid Manager and there is only 1 storage server with a valid + certificate. + """ + grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator) + storage0 = yield grid.add_storage_node() + _ = yield grid.add_storage_node() + + gm_config = yield _run_gm( + reactor, "--config", "-", "create", + ) + gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii') + gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes) + + # create certificate for the first storage-server + pubkey_fname = join(storage0.process.node_dir, "node.pubkey") + with open(pubkey_fname, 'r') as f: + pubkey_str = f.read().strip() + + gm_config = yield _run_gm( + reactor, "--config", "-", "add", + "storage0", pubkey_str, + stdinBytes=gm_config, + ) + assert json.loads(gm_config)['storage_servers'].keys() == ['storage0'] + + print("inserting certificate") + cert = yield _run_gm( + reactor, "--config", "-", "sign", "storage0", "1", + stdinBytes=gm_config, + ) + print(cert) + + yield util.run_tahoe( + reactor, request, "--node-directory", storage0.process.node_dir, + "admin", "add-grid-manager-cert", + "--name", "default", + "--filename", "-", + stdin=cert, + ) + + # re-start this storage server + yield storage0.restart(reactor, request) + + # now only one storage-server has the certificate .. configure + # diana to have the grid-manager certificate + + diana = yield grid.add_client("diana", needed=2, happy=2, total=2) + + config = configutil.get_config(join(diana.process.node_dir, "tahoe.cfg")) + config.add_section("grid_managers") + config.set("grid_managers", "test", ed25519.string_from_verifying_key(gm_pubkey)) + with open(join(diana.process.node_dir, "tahoe.cfg"), "w") as f: + config.write(f) + + yield diana.restart(reactor, request, servers=2) + + # try to put something into the grid, which should fail (because + # diana has happy=2 but should only find storage0 to be acceptable + # to upload to) + + try: + yield util.run_tahoe( + reactor, request, "--node-directory", diana.process.node_dir, + "put", "-", + stdin="some content\n" * 200, + ) + assert False, "Should get a failure" + except util.ProcessFailed as e: + assert 'UploadUnhappinessError' in e.output + + +@pytest_twisted.inlineCallbacks +def test_identity(reactor, request, temp_dir): + """ + Dump public key to CLI + """ + gm_config = join(temp_dir, "test_identity") + yield _run_gm( + reactor, "--config", gm_config, "create", + ) + + # ask the CLI for the grid-manager pubkey + pubkey = yield _run_gm( + reactor, "--config", gm_config, "public-identity", + ) + alleged_pubkey = ed25519.verifying_key_from_string(pubkey.strip()) + + # load the grid-manager pubkey "ourselves" + with open(join(gm_config, "config.json"), "r") as f: + real_config = json.load(f) + real_privkey, real_pubkey = ed25519.signing_keypair_from_string( + real_config["private_key"].encode("ascii"), + ) + + # confirm the CLI told us the correct thing + alleged_bytes = alleged_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw) + real_bytes = real_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw) + assert alleged_bytes == real_bytes, "Keys don't match" diff --git a/integration/test_servers_of_happiness.py b/integration/test_servers_of_happiness.py index b9de0c075..3376b91d0 100644 --- a/integration/test_servers_of_happiness.py +++ b/integration/test_servers_of_happiness.py @@ -50,8 +50,8 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto try: yield proto.done assert False, "should raise exception" - except Exception as e: - assert isinstance(e, ProcessTerminated) + except util.ProcessFailed as e: + assert "UploadUnhappinessError" in e.output output = proto.output.getvalue() assert b"shares could be placed on only" in output diff --git a/integration/test_tor.py b/integration/test_tor.py index c78fa8098..495679a51 100644 --- a/integration/test_tor.py +++ b/integration/test_tor.py @@ -91,26 +91,28 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl): node_dir = FilePath(temp_dir).child(name) web_port = "tcp:{}:interface=localhost".format(control_port + 2000) - - if True: - print("creating", node_dir.path) - node_dir.makedirs() - proto = util._DumpOutputProtocol(None) - reactor.spawnProcess( - proto, - sys.executable, - ( - sys.executable, '-b', '-m', 'allmydata.scripts.runner', - 'create-node', - '--nickname', name, - '--introducer', introducer_furl, - '--hide-ip', - '--tor-control-port', 'tcp:localhost:{}'.format(control_port), - '--listen', 'tor', - node_dir.path, - ) + if node_dir.exists(): + raise RuntimeError( + "A node already exists in '{}'".format(node_dir) ) - yield proto.done + print("creating", node_dir) + node_dir.makedirs() + proto = util._DumpOutputProtocol(None) + reactor.spawnProcess( + proto, + sys.executable, + ( + sys.executable, '-m', 'allmydata.scripts.runner', + 'create-node', + '--nickname', name, + '--introducer', introducer_furl, + '--hide-ip', + '--tor-control-port', 'tcp:localhost:{}'.format(control_port), + '--listen', 'tor', + node_dir.path, + ) + ) + yield proto.done # Which services should this client connect to? diff --git a/integration/test_web.py b/integration/test_web.py index 22f08da82..74ca0bf4d 100644 --- a/integration/test_web.py +++ b/integration/test_web.py @@ -102,7 +102,7 @@ def test_helper_status(storage_nodes): successfully GET the /helper_status page """ - url = util.node_url(storage_nodes[0].node_dir, "helper_status") + url = util.node_url(storage_nodes[0].process.node_dir, "helper_status") resp = requests.get(url) assert resp.status_code >= 200 and resp.status_code < 300 dom = BeautifulSoup(resp.content, "html5lib") @@ -424,7 +424,7 @@ def test_storage_info(storage_nodes): storage0 = storage_nodes[0] requests.get( - util.node_url(storage0.node_dir, u"storage"), + util.node_url(storage0.process.node_dir, u"storage"), ) @@ -435,7 +435,7 @@ def test_storage_info_json(storage_nodes): storage0 = storage_nodes[0] resp = requests.get( - util.node_url(storage0.node_dir, u"storage"), + util.node_url(storage0.process.node_dir, u"storage"), params={u"t": u"json"}, ) data = json.loads(resp.content) @@ -447,12 +447,12 @@ def test_introducer_info(introducer): retrieve and confirm /introducer URI for the introducer """ resp = requests.get( - util.node_url(introducer.node_dir, u""), + util.node_url(introducer.process.node_dir, u""), ) assert b"Introducer" in resp.content resp = requests.get( - util.node_url(introducer.node_dir, u""), + util.node_url(introducer.process.node_dir, u""), params={u"t": u"json"}, ) data = json.loads(resp.content) diff --git a/integration/util.py b/integration/util.py index ad9249e45..f84a6aed4 100644 --- a/integration/util.py +++ b/integration/util.py @@ -61,16 +61,40 @@ class _ProcessExitedProtocol(ProcessProtocol): self.done.callback(None) +class ProcessFailed(Exception): + """ + A subprocess has failed. + + :ivar ProcessTerminated reason: the original reason from .processExited + + :ivar StringIO output: all stdout and stderr collected to this point. + """ + + def __init__(self, reason, output): + self.reason = reason + self.output = output + + def __str__(self): + return ":\n{}".format(self.reason, self.output) + + class _CollectOutputProtocol(ProcessProtocol): """ Internal helper. Collects all output (stdout + stderr) into self.output, and callback's on done with all of it after the process exits (for any reason). """ - def __init__(self, capture_stderr=True): + + def __init__(self, capture_stderr=True, stdin=None): self.done = Deferred() self.output = BytesIO() self.capture_stderr = capture_stderr + self._stdin = stdin + + def connectionMade(self): + if self._stdin is not None: + self.transport.write(self._stdin) + self.transport.closeStdin() def processEnded(self, reason): if not self.done.called: @@ -78,7 +102,7 @@ class _CollectOutputProtocol(ProcessProtocol): def processExited(self, reason): if not isinstance(reason.value, ProcessDone): - self.done.errback(reason) + self.done.errback(ProcessFailed(reason, self.output.getvalue())) def outReceived(self, data): self.output.write(data) @@ -156,13 +180,27 @@ def _cleanup_tahoe_process(tahoe_transport, exited): try: print("signaling {} with TERM".format(tahoe_transport.pid)) tahoe_transport.signalProcess('TERM') - print("signaled, blocking on exit") + print("signaled, blocking on exit {}".format(exited)) block_with_timeout(exited, reactor) print("exited, goodbye") except ProcessExitedAlready: pass +def run_tahoe(reactor, request, *args, **kwargs): + """ + Helper to run tahoe with optional coverage. + + :returns: a Deferred that fires when the command is done (or a + ProcessFailed exception if it exits non-zero) + """ + stdin = kwargs.get("stdin", None) + protocol = _CollectOutputProtocol(stdin=stdin) + process = _tahoe_runner_optional_coverage(protocol, reactor, request, args) + process.exited = protocol.done + return protocol.done + + def _tahoe_runner_optional_coverage(proto, reactor, request, other_args): """ Internal helper. Calls spawnProcess with `-m @@ -269,7 +307,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam if exists(node_dir): created_d = succeed(None) else: - print("creating", node_dir) + print("creating: {}".format(node_dir)) mkdir(node_dir) done_proto = _ProcessExitedProtocol() args = [ @@ -508,7 +546,7 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_serve continue if len(js['servers']) < minimum_number_of_servers: - print("waiting because insufficient servers") + print(f"waiting because insufficient servers (expected at least {minimum_number_of_servers})") time.sleep(1) continue server_times = [ diff --git a/newsfragments/3508.minor b/newsfragments/3508.minor new file mode 100644 index 000000000..e69de29bb