From 5a88dfd5753ba4243613566f02a878bc435d4e34 Mon Sep 17 00:00:00 2001
From: meejah
Date: Sat, 14 Nov 2020 01:56:03 -0700
Subject: [PATCH 01/50] refactor integration tests and add some for grid-manager

---
 integration/conftest.py                  | 172 ++------
 integration/grid.py                      | 507 +++++++++++++++++++++++
 integration/test_servers_of_happiness.py |   7 +-
 integration/test_tor.py                  |  39 +-
 integration/test_web.py                  |  10 +-
 integration/util.py                      |  54 ++-
 6 files changed, 614 insertions(+), 175 deletions(-)
 create mode 100644 integration/grid.py

diff --git a/integration/conftest.py b/integration/conftest.py
index ca18230cd..15450767b 100644
--- a/integration/conftest.py
+++ b/integration/conftest.py
@@ -8,10 +8,6 @@ from os.path import join, exists
 from tempfile import mkdtemp, mktemp
 from functools import partial
 
-from foolscap.furl import (
-    decode_furl,
-)
-
 from eliot import (
     to_file,
     log_call,
@@ -38,6 +34,11 @@ from util import (
     await_client_ready,
     TahoeProcess,
 )
+from grid import (
+    create_port_allocator,
+    create_flog_gatherer,
+    create_grid,
+)
 
 # pytest customization hooks
 
@@ -74,6 +75,12 @@ def reactor():
     return _reactor
 
 
+@pytest.fixture(scope='session')
+@log_call(action_type=u"integration:port_allocator", include_result=False)
+def port_allocator(reactor):
+    return create_port_allocator(start_port=45000)
+
+
 @pytest.fixture(scope='session')
 @log_call(action_type=u"integration:temp_dir", include_args=[])
 def temp_dir(request):
@@ -108,137 +115,30 @@ def flog_binary():
 @pytest.fixture(scope='session')
 @log_call(action_type=u"integration:flog_gatherer", include_args=[])
 def flog_gatherer(reactor, temp_dir, flog_binary, request):
-    out_protocol = _CollectOutputProtocol()
-    gather_dir = join(temp_dir, 'flog_gather')
-    reactor.spawnProcess(
-        out_protocol,
-        flog_binary,
-        (
-            'flogtool', 'create-gatherer',
-            '--location', 'tcp:localhost:3117',
-            '--port', '3117',
-            gather_dir,
-        )
+    fg = pytest_twisted.blockon(
+        create_flog_gatherer(reactor, request, temp_dir, flog_binary)
     )
-    pytest_twisted.blockon(out_protocol.done)
-
-    twistd_protocol = _MagicTextProtocol("Gatherer waiting at")
-    twistd_process = reactor.spawnProcess(
-        twistd_protocol,
-        which('twistd')[0],
-        (
-            'twistd', '--nodaemon', '--python',
-            join(gather_dir, 'gatherer.tac'),
-        ),
-        path=gather_dir,
-    )
-    pytest_twisted.blockon(twistd_protocol.magic_seen)
-
-    def cleanup():
-        _cleanup_tahoe_process(twistd_process, twistd_protocol.exited)
-
-        flog_file = mktemp('.flog_dump')
-        flog_protocol = _DumpOutputProtocol(open(flog_file, 'w'))
-        flog_dir = join(temp_dir, 'flog_gather')
-        flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')]
-
-        print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file))
-        reactor.spawnProcess(
-            flog_protocol,
-            flog_binary,
-            (
-                'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0])
-            ),
-        )
-        print("Waiting for flogtool to complete")
-        try:
-            pytest_twisted.blockon(flog_protocol.done)
-        except ProcessTerminated as e:
-            print("flogtool exited unexpectedly: {}".format(str(e)))
-        print("Flogtool completed")
-
-    request.addfinalizer(cleanup)
-
-    with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f:
-        furl = f.read().strip()
-    return furl
+    return fg
 
 
 @pytest.fixture(scope='session')
-@log_call(
-    action_type=u"integration:introducer",
-    include_args=["temp_dir", "flog_gatherer"],
-    include_result=False,
-)
-def introducer(reactor, temp_dir, flog_gatherer, request):
-    config = '''
-[node]
-nickname = introducer0
-web.port = 4560
-log_gatherer.furl = {log_furl}
-'''.format(log_furl=flog_gatherer)
-
-    intro_dir = join(temp_dir, 'introducer')
-    print("making introducer", intro_dir)
-
-    if not exists(intro_dir):
-        mkdir(intro_dir)
-        done_proto = _ProcessExitedProtocol()
-        _tahoe_runner_optional_coverage(
-            done_proto,
-            reactor,
-            request,
-            (
-                'create-introducer',
-                '--listen=tcp',
-                '--hostname=localhost',
-                intro_dir,
-            ),
-        )
-        pytest_twisted.blockon(done_proto.done)
-
-        # over-write the config file with our stuff
-        with open(join(intro_dir, 'tahoe.cfg'), 'w') as f:
-            f.write(config)
-
-    # on windows, "tahoe start" means: run forever in the foreground,
-    # but on linux it means daemonize. "tahoe run" is consistent
-    # between platforms.
-    protocol = _MagicTextProtocol('introducer running')
-    transport = _tahoe_runner_optional_coverage(
-        protocol,
-        reactor,
-        request,
-        (
-            'run',
-            intro_dir,
-        ),
+@log_call(action_type=u"integration:grid", include_args=[])
+def grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
+    g = pytest_twisted.blockon(
+        create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
     )
-    request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited))
+    return g
 
-    pytest_twisted.blockon(protocol.magic_seen)
-    return TahoeProcess(transport, intro_dir)
+
+@pytest.fixture(scope='session')
+def introducer(grid):
+    return grid.introducer
 
 
 @pytest.fixture(scope='session')
 @log_call(action_type=u"integration:introducer:furl", include_args=["temp_dir"])
 def introducer_furl(introducer, temp_dir):
-    furl_fname = join(temp_dir, 'introducer', 'private', 'introducer.furl')
-    while not exists(furl_fname):
-        print("Don't see {} yet".format(furl_fname))
-        sleep(.1)
-    furl = open(furl_fname, 'r').read()
-    tubID, location_hints, name = decode_furl(furl)
-    if not location_hints:
-        # If there are no location hints then nothing can ever possibly
-        # connect to it and the only thing that can happen next is something
-        # will hang or time out. So just give up right now.
-        raise ValueError(
-            "Introducer ({!r}) fURL has no location hints!".format(
-                introducer_furl,
-            ),
-        )
-    return furl
+    return introducer.furl
 
 
 @pytest.fixture(scope='session')
@@ -317,28 +217,20 @@ def tor_introducer_furl(tor_introducer, temp_dir):
 @pytest.fixture(scope='session')
 @log_call(
     action_type=u"integration:storage_nodes",
-    include_args=["temp_dir", "introducer_furl", "flog_gatherer"],
+    include_args=["grid"],
     include_result=False,
 )
-def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, request):
+def storage_nodes(grid):
     nodes_d = []
     # start all 5 nodes in parallel
     for x in range(5):
-        name = 'node{}'.format(x)
-        web_port = 9990 + x
-        nodes_d.append(
-            _create_node(
-                reactor, request, temp_dir, introducer_furl, flog_gatherer, name,
-                web_port="tcp:{}:interface=localhost".format(web_port),
-                storage=True,
-            )
-        )
+        #nodes_d.append(grid.add_storage_node())
+        pytest_twisted.blockon(grid.add_storage_node())
+
     nodes_status = pytest_twisted.blockon(DeferredList(nodes_d))
-    nodes = []
-    for ok, process in nodes_status:
-        assert ok, "Storage node creation failed: {}".format(process)
-        nodes.append(process)
-    return nodes
+    for ok, value in nodes_status:
+        assert ok, "Storage node creation failed: {}".format(value)
+    return grid.storage_servers
 
 
 @pytest.fixture(scope='session')
diff --git a/integration/grid.py b/integration/grid.py
new file mode 100644
index 000000000..5c3086eea
--- /dev/null
+++ b/integration/grid.py
@@ -0,0 +1,507 @@
+"""
+Classes which directly represent various kinds of Tahoe processes
+that co-operate to form "a Grid".
+
+These methods and objects are used by conftest.py fixtures but may
+also be used as direct helpers for tests that don't want to (or can't)
+rely on 'the' global grid as provided by fixtures like 'alice' or
+'storage_servers'.
+"""
+
+from os import mkdir, listdir, environ
+from os.path import join, exists
+from tempfile import mkdtemp, mktemp
+
+from eliot import (
+    log_call,
+)
+
+from foolscap.furl import (
+    decode_furl,
+)
+
+from twisted.python.procutils import which
+from twisted.internet.defer import (
+    inlineCallbacks,
+    returnValue,
+    maybeDeferred,
+)
+from twisted.internet.task import (
+    deferLater,
+)
+from twisted.internet.error import (
+    ProcessTerminated,
+)
+from twisted.internet.interfaces import (
+    IProcessTransport,
+    IProcessProtocol,
+    IProtocol,
+)
+from twisted.internet.endpoints import (
+    TCP4ServerEndpoint,
+)
+from twisted.internet.protocol import (
+    Factory,
+    Protocol,
+)
+
+from util import (
+    _CollectOutputProtocol,
+    _MagicTextProtocol,
+    _DumpOutputProtocol,
+    _ProcessExitedProtocol,
+    _create_node,
+    _run_node,
+    _cleanup_tahoe_process,
+    _tahoe_runner_optional_coverage,
+    TahoeProcess,
+    await_client_ready,
+)
+
+import attr
+import pytest_twisted
+
+
+# further directions:
+# - "Grid" is unused, basically -- tie into the rest?
+#   - could make a Grid instance mandatory for create_* calls
+#   - could instead make create_* calls methods of Grid
+# - Bring more 'util' or 'conftest' code into here
+#   - stop()/start()/restart() methods on StorageServer etc
+#   - more-complex stuff like config changes (which imply a restart too)?
+
+
+@attr.s
+class FlogGatherer(object):
+    """
+    Flog Gatherer process.
+ """ + + process = attr.ib( + validator=attr.validators.provides(IProcessTransport) + ) + protocol = attr.ib( + validator=attr.validators.provides(IProcessProtocol) + ) + furl = attr.ib() + + +@inlineCallbacks +def create_flog_gatherer(reactor, request, temp_dir, flog_binary): + out_protocol = _CollectOutputProtocol() + gather_dir = join(temp_dir, 'flog_gather') + reactor.spawnProcess( + out_protocol, + flog_binary, + ( + 'flogtool', 'create-gatherer', + '--location', 'tcp:localhost:3117', + '--port', '3117', + gather_dir, + ) + ) + yield out_protocol.done + + twistd_protocol = _MagicTextProtocol("Gatherer waiting at") + twistd_process = reactor.spawnProcess( + twistd_protocol, + which('twistd')[0], + ( + 'twistd', '--nodaemon', '--python', + join(gather_dir, 'gatherer.tac'), + ), + path=gather_dir, + ) + yield twistd_protocol.magic_seen + + def cleanup(): + _cleanup_tahoe_process(twistd_process, twistd_protocol.exited) + + flog_file = mktemp('.flog_dump') + flog_protocol = _DumpOutputProtocol(open(flog_file, 'w')) + flog_dir = join(temp_dir, 'flog_gather') + flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')] + + print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file)) + reactor.spawnProcess( + flog_protocol, + flog_binary, + ( + 'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0]) + ), + ) + print("Waiting for flogtool to complete") + try: + pytest_twisted.blockon(flog_protocol.done) + except ProcessTerminated as e: + print("flogtool exited unexpectedly: {}".format(str(e))) + print("Flogtool completed") + + request.addfinalizer(cleanup) + + with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f: + furl = f.read().strip() + returnValue( + FlogGatherer( + protocol=twistd_protocol, + process=twistd_process, + furl=furl, + ) + ) + + +@attr.s +class StorageServer(object): + """ + Represents a Tahoe Storage Server + """ + + process = attr.ib( + validator=attr.validators.instance_of(TahoeProcess) + ) + protocol = attr.ib( + validator=attr.validators.provides(IProcessProtocol) + ) + + @inlineCallbacks + def restart(self, reactor, request): + """ + re-start our underlying process by issuing a TERM, waiting and + then running again. await_client_ready() will be done as well + + Note that self.process and self.protocol will be new instances + after this. + """ + self.process.transport.signalProcess('TERM') + yield self.protocol.exited + self.process = yield _run_node( + reactor, self.process.node_dir, request, None, + ) + self.protocol = self.process.transport._protocol + + +@inlineCallbacks +def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port, + needed=2, happy=3, total=4): + """ + Create a new storage server + """ + from util import _create_node + node_process = yield _create_node( + reactor, request, temp_dir, introducer.furl, flog_gatherer, + name, web_port, storage=True, needed=needed, happy=happy, total=total, + ) + storage = StorageServer( + process=node_process, + protocol=node_process.transport._protocol, + ) + returnValue(storage) + + +@attr.s +class Client(object): + """ + Represents a Tahoe client + """ + + process = attr.ib( + validator=attr.validators.instance_of(TahoeProcess) + ) + protocol = attr.ib( + validator=attr.validators.provides(IProcessProtocol) + ) + + @inlineCallbacks + def restart(self, reactor, request, servers=1): + """ + re-start our underlying process by issuing a TERM, waiting and + then running again. 
+ + :param int servers: number of server connections we will wait + for before being 'ready' + + Note that self.process and self.protocol will be new instances + after this. + """ + self.process.transport.signalProcess('TERM') + yield self.protocol.exited + process = yield _run_node( + reactor, self.process.node_dir, request, None, + ) + self.process = process + self.protocol = self.process.transport._protocol + + + # XXX add stop / start / restart + # ...maybe "reconfig" of some kind? + + +@inlineCallbacks +def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port, + needed=2, happy=3, total=4): + """ + Create a new storage server + """ + from util import _create_node + node_process = yield _create_node( + reactor, request, temp_dir, introducer.furl, flog_gatherer, + name, web_port, storage=False, needed=needed, happy=happy, total=total, + ) + returnValue( + Client( + process=node_process, + protocol=node_process.transport._protocol, + ) + ) + + +@attr.s +class Introducer(object): + """ + Reprsents a running introducer + """ + + process = attr.ib( + validator=attr.validators.instance_of(TahoeProcess) + ) + protocol = attr.ib( + validator=attr.validators.provides(IProcessProtocol) + ) + furl = attr.ib() + + +def _validate_furl(furl_fname): + """ + Opens and validates a fURL, ensuring location hints. + :returns: the furl + :raises: ValueError if no location hints + """ + while not exists(furl_fname): + print("Don't see {} yet".format(furl_fname)) + sleep(.1) + furl = open(furl_fname, 'r').read() + tubID, location_hints, name = decode_furl(furl) + if not location_hints: + # If there are no location hints then nothing can ever possibly + # connect to it and the only thing that can happen next is something + # will hang or time out. So just give up right now. + raise ValueError( + "Introducer ({!r}) fURL has no location hints!".format( + introducer_furl, + ), + ) + return furl + + +@inlineCallbacks +@log_call( + action_type=u"integration:introducer", + include_args=["temp_dir", "flog_gatherer"], + include_result=False, +) +def create_introducer(reactor, request, temp_dir, flog_gatherer, port): + """ + Run a new Introducer and return an Introducer instance. + """ + config = ( + '[node]\n' + 'nickname = introducer{port}\n' + 'web.port = {port}\n' + 'log_gatherer.furl = {log_furl}\n' + ).format( + port=port, + log_furl=flog_gatherer.furl, + ) + + intro_dir = join(temp_dir, 'introducer{}'.format(port)) + + if not exists(intro_dir): + mkdir(intro_dir) + done_proto = _ProcessExitedProtocol() + _tahoe_runner_optional_coverage( + done_proto, + reactor, + request, + ( + 'create-introducer', + '--listen=tcp', + '--hostname=localhost', + intro_dir, + ), + ) + yield done_proto.done + + # over-write the config file with our stuff + with open(join(intro_dir, 'tahoe.cfg'), 'w') as f: + f.write(config) + + # on windows, "tahoe start" means: run forever in the foreground, + # but on linux it means daemonize. "tahoe run" is consistent + # between platforms. 
+    protocol = _MagicTextProtocol('introducer running')
+    transport = _tahoe_runner_optional_coverage(
+        protocol,
+        reactor,
+        request,
+        (
+            'run',
+            intro_dir,
+        ),
+    )
+
+    def clean():
+        return _cleanup_tahoe_process(transport, protocol.exited)
+    request.addfinalizer(clean)
+
+    yield protocol.magic_seen
+
+    furl_fname = join(intro_dir, 'private', 'introducer.furl')
+    while not exists(furl_fname):
+        print("Don't see {} yet".format(furl_fname))
+        yield deferLater(reactor, .1, lambda: None)
+    furl = _validate_furl(furl_fname)
+
+    returnValue(
+        Introducer(
+            process=TahoeProcess(transport, intro_dir),
+            protocol=protocol,
+            furl=furl,
+        )
+    )
+
+
+@attr.s
+class Grid(object):
+    """
+    Represents an entire Tahoe Grid setup
+
+    A Grid includes an Introducer, Flog Gatherer and some number of
+    Storage Servers.
+    """
+
+    _reactor = attr.ib()
+    _request = attr.ib()
+    _temp_dir = attr.ib()
+    _port_allocator = attr.ib()
+    introducer = attr.ib()
+    flog_gatherer = attr.ib()
+    storage_servers = attr.ib(factory=list)
+    clients = attr.ib(factory=dict)
+
+    @storage_servers.validator
+    def check(self, attribute, value):
+        for server in value:
+            if not isinstance(server, StorageServer):
+                raise ValueError(
+                    "storage_servers must be StorageServer"
+                )
+
+    @inlineCallbacks
+    def add_storage_node(self):
+        """
+        Creates a new storage node, returns a StorageServer instance
+        (which will already be added to our .storage_servers list)
+        """
+        port = yield self._port_allocator()
+        print("make {}".format(port))
+        name = 'node{}'.format(port)
+        web_port = 'tcp:{}:interface=localhost'.format(port)
+        server = yield create_storage_server(
+            self._reactor,
+            self._request,
+            self._temp_dir,
+            self.introducer,
+            self.flog_gatherer,
+            name,
+            web_port,
+        )
+        self.storage_servers.append(server)
+        returnValue(server)
+
+    @inlineCallbacks
+    def add_client(self, name, needed=2, happy=3, total=4):
+        """
+        Create a new client node
+        """
+        port = yield self._port_allocator()
+        web_port = 'tcp:{}:interface=localhost'.format(port)
+        client = yield create_client(
+            self._reactor,
+            self._request,
+            self._temp_dir,
+            self.introducer,
+            self.flog_gatherer,
+            name,
+            web_port,
+            needed=needed,
+            happy=happy,
+            total=total,
+        )
+        self.clients[name] = client
+        yield await_client_ready(client.process)
+        returnValue(client)
+
+
+
+# XXX THINK can we tie a whole *grid* to a single request? (I think
+# that's all that makes sense)
+@inlineCallbacks
+def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
+    """
+    Create a new Grid, allocating a port for (and running) its Introducer.
+    """
+    intro_port = yield port_allocator()
+    introducer = yield create_introducer(reactor, request, temp_dir, flog_gatherer, intro_port)
+    grid = Grid(
+        reactor,
+        request,
+        temp_dir,
+        port_allocator,
+        introducer,
+        flog_gatherer,
+    )
+    returnValue(grid)
+
+
+def create_port_allocator(start_port):
+    """
+    Returns a new port-allocator .. which is a zero-argument function
+    that returns Deferreds that fire with new, sequential ports
+    starting at `start_port` skipping any that already appear to have
+    a listener.
+
+    There can still be a race against other processes allocating ports
+    -- between the time when we check the status of the port and when
+    our subprocess starts up. This *could* be mitigated by instructing
+    the OS to not randomly-allocate ports in some range, and then
+    using that range here (explicitly, ourselves).
+
+    NB once we're Python3-only this could be an async-generator
+    """
+    port = [start_port - 1]
+
+    # import stays here to not interfere with reactor selection -- but
+    # maybe this function should be arranged to be called once from a
+    # fixture (with the reactor)?
+    from twisted.internet import reactor
+
+    class NothingProtocol(Protocol):
+        """
+        I do nothing.
+        """
+
+    def port_generator():
+        print("Checking port {}".format(port))
+        port[0] += 1
+        ep = TCP4ServerEndpoint(reactor, port[0], interface="localhost")
+        d = ep.listen(Factory.forProtocol(NothingProtocol))
+
+        def good(listening_port):
+            unlisten_d = maybeDeferred(listening_port.stopListening)
+            def return_port(_):
+                return port[0]
+            unlisten_d.addBoth(return_port)
+            return unlisten_d
+
+        def try_again(fail):
+            return port_generator()
+
+        d.addCallbacks(good, try_again)
+        return d
+    return port_generator
diff --git a/integration/test_servers_of_happiness.py b/integration/test_servers_of_happiness.py
index 97392bf00..fe3a466eb 100644
--- a/integration/test_servers_of_happiness.py
+++ b/integration/test_servers_of_happiness.py
@@ -38,8 +38,7 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto
     try:
         yield proto.done
         assert False, "should raise exception"
-    except Exception as e:
-        assert isinstance(e, ProcessTerminated)
+    except util.ProcessFailed as e:
+        assert "UploadUnhappinessError" in e.output
 
-    output = proto.output.getvalue()
-    assert "shares could be placed on only" in output
+    assert "shares could be placed on only" in proto.output.getvalue()
diff --git a/integration/test_tor.py b/integration/test_tor.py
index 3d169a88f..db38b13ea 100644
--- a/integration/test_tor.py
+++ b/integration/test_tor.py
@@ -69,25 +69,28 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
     node_dir = join(temp_dir, name)
     web_port = "tcp:{}:interface=localhost".format(control_port + 2000)
 
-    if True:
-        print("creating", node_dir)
-        mkdir(node_dir)
-        proto = util._DumpOutputProtocol(None)
-        reactor.spawnProcess(
-            proto,
-            sys.executable,
-            (
-                sys.executable, '-m', 'allmydata.scripts.runner',
-                'create-node',
-                '--nickname', name,
-                '--introducer', introducer_furl,
-                '--hide-ip',
-                '--tor-control-port', 'tcp:localhost:{}'.format(control_port),
-                '--listen', 'tor',
-                node_dir,
-            )
+    if exists(node_dir):
+        raise RuntimeError(
+            "A node already exists in '{}'".format(node_dir)
         )
-        yield proto.done
+    print("creating", node_dir)
+    mkdir(node_dir)
+    proto = util._DumpOutputProtocol(None)
+    reactor.spawnProcess(
+        proto,
+        sys.executable,
+        (
+            sys.executable, '-m', 'allmydata.scripts.runner',
+            'create-node',
+            '--nickname', name,
+            '--introducer', introducer_furl,
+            '--hide-ip',
+            '--tor-control-port', 'tcp:localhost:{}'.format(control_port),
+            '--listen', 'tor',
+            node_dir,
+        )
+    )
+    yield proto.done
 
     with open(join(node_dir, 'tahoe.cfg'), 'w') as f:
         f.write('''
diff --git a/integration/test_web.py b/integration/test_web.py
index fe2137ff3..6986e74c5 100644
--- a/integration/test_web.py
+++ b/integration/test_web.py
@@ -91,7 +91,7 @@ def test_helper_status(storage_nodes):
     successfully GET the /helper_status page
     """
-    url = util.node_url(storage_nodes[0].node_dir, "helper_status")
+    url = util.node_url(storage_nodes[0].process.node_dir, "helper_status")
     resp = requests.get(url)
     assert resp.status_code >= 200 and resp.status_code < 300
     dom = BeautifulSoup(resp.content, "html5lib")
@@ -412,7 +412,7 @@ def test_storage_info(storage_nodes):
     storage0 = storage_nodes[0]
 
     requests.get(
-        util.node_url(storage0.node_dir, u"storage"),
+        util.node_url(storage0.process.node_dir, u"storage"),
     )
 
 
@@ -423,7 +423,7 @@ def test_storage_info_json(storage_nodes):
     storage0 = storage_nodes[0]
 
     resp = requests.get(
-        util.node_url(storage0.node_dir, u"storage"),
+        util.node_url(storage0.process.node_dir, u"storage"),
         params={u"t": u"json"},
    )
     data = json.loads(resp.content)
@@ -435,12 +435,12 @@ def test_introducer_info(introducer):
     retrieve and confirm /introducer URI for the introducer
     """
     resp = requests.get(
-        util.node_url(introducer.node_dir, u""),
+        util.node_url(introducer.process.node_dir, u""),
     )
     assert "Introducer" in resp.content
 
     resp = requests.get(
-        util.node_url(introducer.node_dir, u""),
+        util.node_url(introducer.process.node_dir, u""),
         params={u"t": u"json"},
     )
     data = json.loads(resp.content)
diff --git a/integration/util.py b/integration/util.py
index a64bcbf8e..54898ec4a 100644
--- a/integration/util.py
+++ b/integration/util.py
@@ -5,6 +5,7 @@ from os import mkdir, environ
 from os.path import exists, join
 from six.moves import StringIO
 from functools import partial
+from shutil import rmtree
 
 from twisted.internet.defer import Deferred, succeed
 from twisted.internet.protocol import ProcessProtocol
@@ -35,15 +36,38 @@ class _ProcessExitedProtocol(ProcessProtocol):
         self.done.callback(None)
 
 
+class ProcessFailed(Exception):
+    """
+    A subprocess has failed.
+
+    :ivar ProcessTerminated reason: the original reason from .processExited
+
+    :ivar StringIO output: all stdout and stderr collected to this point.
+    """
+
+    def __init__(self, reason, output):
+        self.reason = reason
+        self.output = output
+
+    def __str__(self):
+        return "{}:\n{}".format(self.reason, self.output)
+
+
 class _CollectOutputProtocol(ProcessProtocol):
     """
     Internal helper. Collects all output (stdout + stderr) into
     self.output, and callback's on done with all of it after the
     process exits (for any reason).
     """
-    def __init__(self):
+    def __init__(self, stdin=None):
         self.done = Deferred()
         self.output = StringIO()
+        self._stdin = stdin
+
+    def connectionMade(self):
+        if self._stdin is not None:
+            self.transport.write(self._stdin)
+            self.transport.closeStdin()
 
     def processEnded(self, reason):
         if not self.done.called:
@@ -51,7 +75,7 @@ class _CollectOutputProtocol(ProcessProtocol):
 
     def processExited(self, reason):
         if not isinstance(reason.value, ProcessDone):
-            self.done.errback(reason)
+            self.done.errback(ProcessFailed(reason, self.output.getvalue()))
 
     def outReceived(self, data):
         self.output.write(data)
@@ -123,13 +147,27 @@ def _cleanup_tahoe_process(tahoe_transport, exited):
     try:
         print("signaling {} with TERM".format(tahoe_transport.pid))
         tahoe_transport.signalProcess('TERM')
-        print("signaled, blocking on exit")
+        print("signaled, blocking on exit {}".format(exited))
        pytest_twisted.blockon(exited)
         print("exited, goodbye")
     except ProcessExitedAlready:
         pass
 
 
+def run_tahoe(reactor, request, *args, **kwargs):
+    """
+    Helper to run tahoe with optional coverage.
+
+    :returns: a Deferred that fires when the command is done (or a
+        ProcessFailed exception if it exits non-zero)
+    """
+    stdin = kwargs.get("stdin", None)
+    protocol = _CollectOutputProtocol(stdin=stdin)
+    process = _tahoe_runner_optional_coverage(protocol, reactor, request, args)
+    process.exited = protocol.done
+    return protocol.done
+
+
 def _tahoe_runner_optional_coverage(proto, reactor, request, other_args):
     """
     Internal helper. Calls spawnProcess with `-m
    allmydata.scripts.runner` and `other_args`, optionally inserting a
    `--coverage` option if the `request` indicates we should.
    """
@@ -232,7 +270,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
     if exists(node_dir):
         created_d = succeed(None)
     else:
-        print("creating", node_dir)
+        print("creating: {}".format(node_dir))
         mkdir(node_dir)
         done_proto = _ProcessExitedProtocol()
         args = [
@@ -257,7 +295,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
     def created(_):
         config_path = join(node_dir, 'tahoe.cfg')
         config = get_config(config_path)
-        set_config(config, 'node', 'log_gatherer.furl', flog_gatherer)
+        set_config(config, 'node', 'log_gatherer.furl', flog_gatherer.furl)
         write_config(config_path, config)
 
     created_d.addCallback(created)
@@ -444,7 +482,7 @@ def web_post(tahoe, uri_fragment, **kwargs):
     return resp.content
 
 
-def await_client_ready(tahoe, timeout=10, liveness=60*2):
+def await_client_ready(tahoe, timeout=10, liveness=60*2, servers=1):
     """
     Uses the status API to wait for a client-type node (in `tahoe`, a
     `TahoeProcess` instance usually from a fixture e.g. `alice`) to be
@@ -468,8 +506,8 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2):
             time.sleep(1)
             continue
 
-        if len(js['servers']) == 0:
-            print("waiting because no servers at all")
+        if len(js['servers']) < servers:
+            print("waiting because fewer than {} server(s)".format(servers))
             time.sleep(1)
             continue
         server_times = [

From 2e2128619335d7b6a87f7fac060544568582232d Mon Sep 17 00:00:00 2001
From: meejah
Date: Mon, 16 Nov 2020 01:19:01 -0700
Subject: [PATCH 02/50] grid-manager tests

---
 integration/test_grid_manager.py | 274 +++++++++++++++++++++++++++++++
 1 file changed, 274 insertions(+)
 create mode 100644 integration/test_grid_manager.py

diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py
new file mode 100644
index 000000000..ce426c6d7
--- /dev/null
+++ b/integration/test_grid_manager.py
@@ -0,0 +1,274 @@
+import sys
+import time
+import json
+import shutil
+from os import mkdir, unlink, listdir, utime
+from os.path import join, exists, getmtime
+
+from cryptography.hazmat.primitives.serialization import (
+    Encoding,
+    PublicFormat,
+)
+
+from twisted.internet.utils import (
+    getProcessOutputAndValue,
+)
+from twisted.internet.defer import (
+    inlineCallbacks,
+    returnValue,
+)
+
+from allmydata.crypto import ed25519
+from allmydata.util import base32
+from allmydata.util import configutil
+
+import util
+from grid import (
+    create_grid,
+)
+
+import pytest_twisted
+
+
+@inlineCallbacks
+def _run_gm(reactor, *args, **kwargs):
+    """
+    Run the grid-manager process, passing all arguments as extra CLI
+    args.
+
+    :returns: all process output
+    """
+    output, errput, exit_code = yield getProcessOutputAndValue(
+        sys.executable,
+        ("-m", "allmydata.cli.grid_manager") + args,
+        reactor=reactor,
+        **kwargs
+    )
+    if exit_code != 0:
+        raise util.ProcessFailed(
+            RuntimeError("Exit code {}".format(exit_code)),
+            output + errput,
+        )
+    returnValue(output)
+
+
+@pytest_twisted.inlineCallbacks
+def test_create_certificate(reactor, request):
+    """
+    The Grid Manager produces a valid, correctly-signed certificate.
+    """
+    gm_config = yield _run_gm(reactor, "--config", "-", "create")
+    privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
+    privkey, pubkey = ed25519.signing_keypair_from_string(privkey_bytes)
+
+    # Note that zara + her key here are arbitrary and don't match any
+    # "actual" clients in the test-grid; we're just checking that the
+    # Grid Manager signs this properly.
+    gm_config = yield _run_gm(
+        reactor, "--config", "-", "add",
+        "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
+        stdinBytes=gm_config,
+    )
+    zara_cert_bytes = yield _run_gm(
+        reactor, "--config", "-", "sign", "zara", "1",
+        stdinBytes=gm_config,
+    )
+    zara_cert = json.loads(zara_cert_bytes)
+
+    # confirm that zara's certificate is made by the Grid Manager
+    # (.verify returns None on success, raises exception on error)
+    pubkey.verify(
+        base32.a2b(zara_cert['signature'].encode('ascii')),
+        zara_cert['certificate'].encode('ascii'),
+    )
+
+
+@pytest_twisted.inlineCallbacks
+def test_remove_client(reactor, request):
+    """
+    A Grid Manager can add and successfully remove a client
+    """
+    gm_config = yield _run_gm(
+        reactor, "--config", "-", "create",
+    )
+
+    gm_config = yield _run_gm(
+        reactor, "--config", "-", "add",
+        "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
+        stdinBytes=gm_config,
+    )
+    gm_config = yield _run_gm(
+        reactor, "--config", "-", "add",
+        "yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq",
+        stdinBytes=gm_config,
+    )
+    assert "zara" in json.loads(gm_config)['storage_servers']
+    assert "yakov" in json.loads(gm_config)['storage_servers']
+
+    gm_config = yield _run_gm(
+        reactor, "--config", "-", "remove",
+        "zara",
+        stdinBytes=gm_config,
+    )
+    assert "zara" not in json.loads(gm_config)['storage_servers']
+    assert "yakov" in json.loads(gm_config)['storage_servers']
+
+
+@pytest_twisted.inlineCallbacks
+def test_remove_last_client(reactor, request):
+    """
+    A Grid Manager can remove all clients
+    """
+    gm_config = yield _run_gm(
+        reactor, "--config", "-", "create",
+    )
+
+    gm_config = yield _run_gm(
+        reactor, "--config", "-", "add",
+        "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
+        stdinBytes=gm_config,
+    )
+    assert "zara" in json.loads(gm_config)['storage_servers']
+
+    gm_config = yield _run_gm(
+        reactor, "--config", "-", "remove",
+        "zara",
+        stdinBytes=gm_config,
+    )
+    # there are no storage servers left at all now
+    assert "storage_servers" not in json.loads(gm_config)
+
+
+@pytest_twisted.inlineCallbacks
+def test_add_remove_client_file(reactor, request, temp_dir):
+    """
+    A Grid Manager can add and successfully remove a client (when
+    keeping data on disk)
+    """
+    gmconfig = join(temp_dir, "gmtest")
+    gmconfig_file = join(temp_dir, "gmtest", "config.json")
+    yield _run_gm(
+        reactor, "--config", gmconfig, "create",
+    )
+
+    yield _run_gm(
+        reactor, "--config", gmconfig, "add",
+        "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
+    )
+    yield _run_gm(
+        reactor, "--config", gmconfig, "add",
+        "yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq",
+    )
+    assert "zara" in json.load(open(gmconfig_file, "r"))['storage_servers']
+    assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers']
+
+    yield _run_gm(
+        reactor, "--config", gmconfig, "remove",
+        "zara",
+    )
+    assert "zara" not in json.load(open(gmconfig_file, "r"))['storage_servers']
+    assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers']
+
+
+@pytest_twisted.inlineCallbacks
+def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
+    """
+    A client with happy=2 fails to upload to a Grid when it is
+    using Grid Manager and there is only 1 storage server with a valid
+    certificate.
+    """
+    grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
+    storage0 = yield grid.add_storage_node()
+    storage1 = yield grid.add_storage_node()
+
+    gm_config = yield _run_gm(
+        reactor, "--config", "-", "create",
+    )
+    gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
+    gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes)
+
+    # create certificate for the first storage-server
+    pubkey_fname = join(storage0.process.node_dir, "node.pubkey")
+    with open(pubkey_fname, 'r') as f:
+        pubkey_str = f.read().strip()
+
+    gm_config = yield _run_gm(
+        reactor, "--config", "-", "add",
+        "storage0", pubkey_str,
+        stdinBytes=gm_config,
+    )
+    assert json.loads(gm_config)['storage_servers'].keys() == ['storage0']
+
+    print("inserting certificate")
+    cert = yield _run_gm(
+        reactor, "--config", "-", "sign", "storage0", "1",
+        stdinBytes=gm_config,
+    )
+    print(cert)
+
+    yield util.run_tahoe(
+        reactor, request, "--node-directory", storage0.process.node_dir,
+        "admin", "add-grid-manager-cert",
+        "--name", "default",
+        "--filename", "-",
+        stdin=cert,
+    )
+
+    # re-start this storage server
+    yield storage0.restart(reactor, request)
+
+    # now only one storage-server has the certificate .. configure
+    # diana to have the grid-manager certificate
+
+    diana = yield grid.add_client("diana", needed=2, happy=2, total=2)
+
+    config = configutil.get_config(join(diana.process.node_dir, "tahoe.cfg"))
+    config.add_section("grid_managers")
+    config.set("grid_managers", "test", ed25519.string_from_verifying_key(gm_pubkey))
+    with open(join(diana.process.node_dir, "tahoe.cfg"), "w") as f:
+        config.write(f)
+
+    yield diana.restart(reactor, request, servers=2)
+
+    # try to put something into the grid, which should fail (because
+    # diana has happy=2 but should only find storage0 to be acceptable
+    # to upload to)
+
+    try:
+        yield util.run_tahoe(
+            reactor, request, "--node-directory", diana.process.node_dir,
+            "put", "-",
+            stdin="some content\n" * 200,
+        )
+        assert False, "Should get a failure"
+    except util.ProcessFailed as e:
+        assert 'UploadUnhappinessError' in e.output
+
+
+@pytest_twisted.inlineCallbacks
+def test_identity(reactor, request, temp_dir):
+    """
+    Dump public key to CLI
+    """
+    gm_config = join(temp_dir, "test_identity")
+    yield _run_gm(
+        reactor, "--config", gm_config, "create",
+    )
+
+    # ask the CLI for the grid-manager pubkey
+    pubkey = yield _run_gm(
+        reactor, "--config", gm_config, "public-identity",
+    )
+    alleged_pubkey = ed25519.verifying_key_from_string(pubkey.strip())
+
+    # load the grid-manager pubkey "ourselves"
+    with open(join(gm_config, "config.json"), "r") as f:
+        real_config = json.load(f)
+    real_privkey, real_pubkey = ed25519.signing_keypair_from_string(
+        real_config["private_key"].encode("ascii"),
+    )
+
+    # confirm the CLI told us the correct thing
+    alleged_bytes = alleged_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw)
+    real_bytes = real_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw)
+    assert alleged_bytes == real_bytes, "Keys don't match"

From 8400893976966cb698608811473a438511662428 Mon Sep 17 00:00:00 2001
From: meejah
Date: Mon, 16 Nov 2020 01:30:38 -0700
Subject: [PATCH 03/50] news

---
 newsfragments/3508.minor | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 newsfragments/3508.minor

diff --git a/newsfragments/3508.minor b/newsfragments/3508.minor
new file mode 100644
index 000000000..e69de29bb

From 671e829f4e5d7a58be75ff1c4ef673b7f7e2fb3d Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Wed, 27 Jul 2022 12:23:20 -0400
Subject: [PATCH 04/50] We need to pass in the furl here.

---
 integration/util.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration/util.py b/integration/util.py
index f84a6aed4..5e644f19d 100644
--- a/integration/util.py
+++ b/integration/util.py
@@ -336,7 +336,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
             config,
             u'node',
             u'log_gatherer.furl',
-            flog_gatherer,
+            flog_gatherer.furl,
         )
         write_config(FilePath(config_path), config)
     created_d.addCallback(created)

From 2999ca45798d466ff8ee8f94eef1ac841f4d93db Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Wed, 27 Jul 2022 12:23:34 -0400
Subject: [PATCH 05/50] It's bytes now.

---
 integration/test_servers_of_happiness.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration/test_servers_of_happiness.py b/integration/test_servers_of_happiness.py
index 3376b91d0..4cbb94654 100644
--- a/integration/test_servers_of_happiness.py
+++ b/integration/test_servers_of_happiness.py
@@ -51,7 +51,7 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto
         yield proto.done
         assert False, "should raise exception"
     except util.ProcessFailed as e:
-        assert "UploadUnhappinessError" in e.output
+        assert b"UploadUnhappinessError" in e.output
 
     output = proto.output.getvalue()
     assert b"shares could be placed on only" in output

From 106b67db5588364727d3f6add1b9942943b34f58 Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Wed, 27 Jul 2022 12:23:40 -0400
Subject: [PATCH 06/50] It's bytes now.

---
 integration/test_grid_manager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py
index c01773b96..704dee04b 100644
--- a/integration/test_grid_manager.py
+++ b/integration/test_grid_manager.py
@@ -239,7 +239,7 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a
         )
         assert False, "Should get a failure"
     except util.ProcessFailed as e:
-        assert 'UploadUnhappinessError' in e.output
+        assert b'UploadUnhappinessError' in e.output

From 02cb4105b3a182c82e8b2a2b66755dcac6385ac8 Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Fri, 29 Jul 2022 09:43:37 -0400
Subject: [PATCH 07/50] A lot closer to passing grid manager integration tests.
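
Announcements carry grid-manager certificates as already-decoded
JSON objects, while SignedCertificate.load() wants a file-like
object, so the storage_client.py change below round-trips each
announced dict through JSON before loading it. In rough outline (a
sketch re-stating what this diff does, using only names that appear
in it -- not a verbatim excerpt):

    certs = [
        SignedCertificate.load(StringIO(json.dumps(data)))
        for data in server["ann"].get("grid-manager-certificates", [])
    ]
    gm_verifier = create_grid_manager_verifier(
        self.storage_client_config.grid_manager_keys,
        certs,
        "pub-{}".format(str(server_id, "ascii")),
    )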
---
 integration/test_grid_manager.py  | 6 +++---
 src/allmydata/cli/grid_manager.py | 4 ++++
 src/allmydata/storage_client.py   | 7 ++++---
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py
index 704dee04b..63ee827b0 100644
--- a/integration/test_grid_manager.py
+++ b/integration/test_grid_manager.py
@@ -194,7 +194,7 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a
         "storage0", pubkey_str,
         stdinBytes=gm_config,
     )
-    assert json.loads(gm_config)['storage_servers'].keys() == ['storage0']
+    assert json.loads(gm_config)['storage_servers'].keys() == {'storage0'}
 
     print("inserting certificate")
     cert = yield _run_gm(
@@ -221,7 +221,7 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a
 
     config = configutil.get_config(join(diana.process.node_dir, "tahoe.cfg"))
     config.add_section("grid_managers")
-    config.set("grid_managers", "test", ed25519.string_from_verifying_key(gm_pubkey))
+    config.set("grid_managers", "test", str(ed25519.string_from_verifying_key(gm_pubkey), "ascii"))
     with open(join(diana.process.node_dir, "tahoe.cfg"), "w") as f:
         config.write(f)
 
@@ -235,7 +235,7 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a
         yield util.run_tahoe(
             reactor, request, "--node-directory", diana.process.node_dir,
             "put", "-",
-            stdin="some content\n" * 200,
+            stdin=b"some content\n" * 200,
         )
         assert False, "Should get a failure"
     except util.ProcessFailed as e:
diff --git a/src/allmydata/cli/grid_manager.py b/src/allmydata/cli/grid_manager.py
index 4ef53887c..d3a11b62d 100644
--- a/src/allmydata/cli/grid_manager.py
+++ b/src/allmydata/cli/grid_manager.py
@@ -225,3 +225,7 @@ def _config_path_from_option(config: str) -> Optional[FilePath]:
     if config == "-":
         return None
     return FilePath(config)
+
+
+if __name__ == '__main__':
+    grid_manager()
diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py
index 91073579d..7fe4d6bd2 100644
--- a/src/allmydata/storage_client.py
+++ b/src/allmydata/storage_client.py
@@ -37,8 +37,9 @@ from os import urandom
 import re
 import time
 import hashlib
-
+from io import StringIO
 from configparser import NoSectionError
+import json
 
 import attr
 from zope.interface import (
@@ -67,7 +68,7 @@ from allmydata.interfaces import (
     IFoolscapStoragePlugin,
 )
 from allmydata.grid_manager import (
-    create_grid_manager_verifier,
+    create_grid_manager_verifier, SignedCertificate
 )
 from allmydata.crypto import (
     ed25519,
@@ -289,7 +290,7 @@ class StorageFarmBroker(service.MultiService):
             handler_overrides = server.get("connections", {})
             gm_verifier = create_grid_manager_verifier(
                 self.storage_client_config.grid_manager_keys,
-                server["ann"].get("grid-manager-certificates", []),
+                [SignedCertificate.load(StringIO(json.dumps(data))) for data in server["ann"].get("grid-manager-certificates", [])],
                 "pub-{}".format(str(server_id, "ascii")),  # server_id is v0- not pub-v0-key .. for reasons?
             )
 
             s = NativeStorageServer(

From ad027aff7656abc098539f4159fa30638581ad35 Mon Sep 17 00:00:00 2001
From: meejah
Date: Fri, 12 Aug 2022 00:34:47 -0600
Subject: [PATCH 08/50] compare bytes to bytes

---
 src/allmydata/grid_manager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/allmydata/grid_manager.py b/src/allmydata/grid_manager.py
index f502c413e..d776fb7d0 100644
--- a/src/allmydata/grid_manager.py
+++ b/src/allmydata/grid_manager.py
@@ -466,7 +466,7 @@ def create_grid_manager_verifier(keys, certs, public_key, now_fn=None, bad_cert=
         now = now_fn()
         for cert in valid_certs:
             expires = datetime.utcfromtimestamp(cert['expires'])
-            if cert['public_key'].encode("ascii") == public_key:
+            if cert['public_key'] == public_key:
                 if expires > now:
                     # not-expired
                     return True

From cb065aefbd417cc1d546744ff746e25dbe35f999 Mon Sep 17 00:00:00 2001
From: meejah
Date: Fri, 12 Aug 2022 01:17:22 -0600
Subject: [PATCH 09/50] key is bytes

---
 src/allmydata/storage_client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py
index 7fe4d6bd2..49663f141 100644
--- a/src/allmydata/storage_client.py
+++ b/src/allmydata/storage_client.py
@@ -291,7 +291,7 @@ class StorageFarmBroker(service.MultiService):
             gm_verifier = create_grid_manager_verifier(
                 self.storage_client_config.grid_manager_keys,
                 [SignedCertificate.load(StringIO(json.dumps(data))) for data in server["ann"].get("grid-manager-certificates", [])],
-                "pub-{}".format(str(server_id, "ascii")),  # server_id is v0- not pub-v0-key .. for reasons?
+                "pub-{}".format(str(server_id, "ascii")).encode("ascii"),  # server_id is v0- not pub-v0-key .. for reasons?
             )
 
             s = NativeStorageServer(

From 4d779cfe0742fb9e3d75ca33fb3984d33e5e7586 Mon Sep 17 00:00:00 2001
From: meejah
Date: Fri, 12 Aug 2022 01:17:33 -0600
Subject: [PATCH 10/50] more assert

---
 src/allmydata/grid_manager.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/allmydata/grid_manager.py b/src/allmydata/grid_manager.py
index d776fb7d0..0201eace5 100644
--- a/src/allmydata/grid_manager.py
+++ b/src/allmydata/grid_manager.py
@@ -466,7 +466,9 @@ def create_grid_manager_verifier(keys, certs, public_key, now_fn=None, bad_cert=
         now = now_fn()
         for cert in valid_certs:
             expires = datetime.utcfromtimestamp(cert['expires'])
-            if cert['public_key'] == public_key:
+            pc = cert['public_key'].encode('ascii')
+            assert type(pc) == type(public_key), "{} isn't {}".format(type(pc), type(public_key))
+            if pc == public_key:
                 if expires > now:
                     # not-expired
                     return True

From 1676e9e7c5e6b9c4208c6b71e6379581ee17e047 Mon Sep 17 00:00:00 2001
From: meejah
Date: Fri, 12 Aug 2022 01:27:01 -0600
Subject: [PATCH 11/50] unused

---
 integration/test_servers_of_happiness.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/integration/test_servers_of_happiness.py b/integration/test_servers_of_happiness.py
index 4cbb94654..3adc11340 100644
--- a/integration/test_servers_of_happiness.py
+++ b/integration/test_servers_of_happiness.py
@@ -13,8 +13,6 @@ if PY2:
 import sys
 from os.path import join
 
-from twisted.internet.error import ProcessTerminated
-
 from . import util
 
 import pytest_twisted

From 9ff863e6cd47ad6c255491b2a6127b944b835f9a Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Fri, 12 Aug 2022 09:54:12 -0400
Subject: [PATCH 12/50] Fix lint.
---
 integration/test_servers_of_happiness.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/integration/test_servers_of_happiness.py b/integration/test_servers_of_happiness.py
index 4cbb94654..b85eb8e5b 100644
--- a/integration/test_servers_of_happiness.py
+++ b/integration/test_servers_of_happiness.py
@@ -1,4 +1,4 @@
-"""
+P"""
 Ported to Python 3.
 """
 from __future__ import absolute_import
@@ -13,8 +13,6 @@ if PY2:
 import sys
 from os.path import join
 
-from twisted.internet.error import ProcessTerminated
-
 from . import util
 
 import pytest_twisted

From 0c6881e6150ffa590aec6775586de0d83d657017 Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Fri, 12 Aug 2022 09:59:43 -0400
Subject: [PATCH 13/50] Fix race condition.

---
 integration/test_grid_manager.py | 28 +++++++++++++++++++---------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py
index 63ee827b0..866856be7 100644
--- a/integration/test_grid_manager.py
+++ b/integration/test_grid_manager.py
@@ -214,6 +214,9 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a
     # re-start this storage server
     yield storage0.restart(reactor, request)
 
+    import time
+    time.sleep(1)
+
     # now only one storage-server has the certificate .. configure
     # diana to have the grid-manager certificate
 
@@ -231,15 +234,22 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a
     # diana has happy=2 but should only find storage0 to be acceptable
     # to upload to)
 
-    try:
-        yield util.run_tahoe(
-            reactor, request, "--node-directory", diana.process.node_dir,
-            "put", "-",
-            stdin=b"some content\n" * 200,
-        )
-        assert False, "Should get a failure"
-    except util.ProcessFailed as e:
-        assert b'UploadUnhappinessError' in e.output
+    # Takes a little bit of time for node to connect:
+    for i in range(10):
+        try:
+            yield util.run_tahoe(
+                reactor, request, "--node-directory", diana.process.node_dir,
+                "put", "-",
+                stdin=b"some content\n" * 200,
+            )
+            assert False, "Should get a failure"
+        except util.ProcessFailed as e:
+            if b'UploadUnhappinessError' in e.output:
+                # We're done! We've succeeded.
+                return
+        time.sleep(0.2)
+
+    assert False, "Failed to see one out of two servers"

From 298600969af240a3caff30fb2f2735fa669606ee Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Fri, 12 Aug 2022 10:06:35 -0400
Subject: [PATCH 14/50] Fix typo.

---
 integration/test_servers_of_happiness.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration/test_servers_of_happiness.py b/integration/test_servers_of_happiness.py
index b85eb8e5b..3adc11340 100644
--- a/integration/test_servers_of_happiness.py
+++ b/integration/test_servers_of_happiness.py
@@ -1,4 +1,4 @@
-P"""
+"""
 Ported to Python 3.
 """
 from __future__ import absolute_import

From c4a32b65ff7ada59ed4de0324e2634f3dc50bc29 Mon Sep 17 00:00:00 2001
From: meejah
Date: Sat, 13 Aug 2022 11:45:51 -0600
Subject: [PATCH 15/50] actually wait

---
 integration/grid.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/integration/grid.py b/integration/grid.py
index 3cb16c929..eb25d9514 100644
--- a/integration/grid.py
+++ b/integration/grid.py
@@ -175,6 +175,7 @@ class StorageServer(object):
             reactor, self.process.node_dir, request, None,
         )
         self.protocol = self.process.transport._protocol
+        yield await_client_ready(self.process)
 

From 06a5176626dbd14d41d8ab6c3307462be9cc279c Mon Sep 17 00:00:00 2001
From: meejah
Date: Sat, 13 Aug 2022 11:46:02 -0600
Subject: [PATCH 16/50] happy-path grid-manager test

---
 integration/test_grid_manager.py | 70 ++++++++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)

diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py
index 63ee827b0..672700e15 100644
--- a/integration/test_grid_manager.py
+++ b/integration/test_grid_manager.py
@@ -242,6 +242,76 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a
         assert b'UploadUnhappinessError' in e.output
 
 
+@pytest_twisted.inlineCallbacks
+def test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
+    """
+    Successfully upload to a Grid Manager enabled Grid.
+    """
+    grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
+    happy0 = yield grid.add_storage_node()
+    happy1 = yield grid.add_storage_node()
+
+    gm_config = yield _run_gm(
+        reactor, "--config", "-", "create",
+    )
+    gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
+    gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes)
+
+    # create certificate for the storage-servers
+    servers = (
+        ("happy0", happy0),
+        ("happy1", happy1),
+    )
+    for st_name, st in servers:
+        pubkey_fname = join(st.process.node_dir, "node.pubkey")
+        with open(pubkey_fname, 'r') as f:
+            pubkey_str = f.read().strip()
+
+        gm_config = yield _run_gm(
+            reactor, "--config", "-", "add",
+            st_name, pubkey_str,
+            stdinBytes=gm_config,
+        )
+    assert json.loads(gm_config)['storage_servers'].keys() == {'happy0', 'happy1'}
+
+    print("inserting certificates")
+    for st_name, st in servers:
+        cert = yield _run_gm(
+            reactor, "--config", "-", "sign", st_name, "1",
+            stdinBytes=gm_config,
+        )
+
+        yield util.run_tahoe(
+            reactor, request, "--node-directory", st.process.node_dir,
+            "admin", "add-grid-manager-cert",
+            "--name", "default",
+            "--filename", "-",
+            stdin=cert,
+        )
+
+    # re-start the storage servers
+    yield happy0.restart(reactor, request)
+    yield happy1.restart(reactor, request)
+
+    # configure edna to have the grid-manager certificate
+
+    edna = yield grid.add_client("edna", needed=2, happy=2, total=2)
+
+    config = configutil.get_config(join(edna.process.node_dir, "tahoe.cfg"))
+    config.add_section("grid_managers")
+    config.set("grid_managers", "test", str(ed25519.string_from_verifying_key(gm_pubkey), "ascii"))
+    with open(join(edna.process.node_dir, "tahoe.cfg"), "w") as f:
+        config.write(f)
+
+    yield edna.restart(reactor, request, servers=2)
+
+    yield util.run_tahoe(
+        reactor, request, "--node-directory", edna.process.node_dir,
+        "put", "-",
+        stdin=b"some content\n" * 200,
+    )
+
+
 @pytest_twisted.inlineCallbacks
 def test_identity(reactor, request, temp_dir):
     """

From 34dd39bfbf78e627a2ca05e457c444d0e0ee5e8e Mon Sep 17 00:00:00 2001
From: meejah
Date: Sat, 13 Aug 2022 11:51:01 -0600
Subject: [PATCH 17/50] fix race with 'await_client_ready' instead

---
 integration/grid.py              |  1 +
 integration/test_grid_manager.py | 28 +++++++++++-----------------
 2 files changed, 12 insertions(+), 17 deletions(-)

diff --git a/integration/grid.py b/integration/grid.py
index eb25d9514..4e5d8a900 100644
--- a/integration/grid.py
+++ b/integration/grid.py
@@ -228,6 +228,7 @@ class Client(object):
         )
         self.process = process
         self.protocol = self.process.transport._protocol
+        yield await_client_ready(self.process, minimum_number_of_servers=servers)
 
 
     # XXX add stop / start / restart
diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py
index 35ea10c9f..b24149a3b 100644
--- a/integration/test_grid_manager.py
+++ b/integration/test_grid_manager.py
@@ -214,9 +214,6 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a
     # re-start this storage server
     yield storage0.restart(reactor, request)
 
-    import time
-    time.sleep(1)
-
     # now only one storage-server has the certificate .. configure
     # diana to have the grid-manager certificate
 
@@ -234,20 +231,17 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a
     # diana has happy=2 but should only find storage0 to be acceptable
     # to upload to)
 
-    # Takes a little bit of time for node to connect:
-    for i in range(10):
-        try:
-            yield util.run_tahoe(
-                reactor, request, "--node-directory", diana.process.node_dir,
-                "put", "-",
-                stdin=b"some content\n" * 200,
-            )
-            assert False, "Should get a failure"
-        except util.ProcessFailed as e:
-            if b'UploadUnhappinessError' in e.output:
-                # We're done! We've succeeded.
-                return
-        time.sleep(0.2)
+    try:
+        yield util.run_tahoe(
+            reactor, request, "--node-directory", diana.process.node_dir,
+            "put", "-",
+            stdin=b"some content\n" * 200,
+        )
+        assert False, "Should get a failure"
+    except util.ProcessFailed as e:
+        if b'UploadUnhappinessError' in e.output:
+            # We're done! We've succeeded.
+ return assert False, "Failed to see one of out of two servers" From 04b0c30c11343838b246ed276a1fdac230383594 Mon Sep 17 00:00:00 2001 From: meejah Date: Sun, 25 Sep 2022 14:08:05 -0600 Subject: [PATCH 18/50] clean up comments --- integration/test_grid_manager.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py index b24149a3b..d89f1e8f6 100644 --- a/integration/test_grid_manager.py +++ b/integration/test_grid_manager.py @@ -261,7 +261,7 @@ def test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_a gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii') gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes) - # create certificate for the storage-servers + # create certificates for all storage-servers servers = ( ("happy0", happy0), ("happy1", happy1), @@ -278,7 +278,8 @@ def test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_a ) assert json.loads(gm_config)['storage_servers'].keys() == {'happy0', 'happy1'} - print("inserting certificates") + # add the certificates from the grid-manager to the storage servers + print("inserting storage-server certificates") for st_name, st in servers: cert = yield _run_gm( reactor, "--config", "-", "sign", st_name, "1", @@ -297,8 +298,7 @@ def test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_a yield happy0.restart(reactor, request) yield happy1.restart(reactor, request) - # configure edna to have the grid-manager certificate - + # configure edna (a client) to have the grid-manager certificate edna = yield grid.add_client("edna", needed=2, happy=2, total=2) config = configutil.get_config(join(edna.process.node_dir, "tahoe.cfg")) @@ -309,6 +309,7 @@ def test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_a yield edna.restart(reactor, request, servers=2) + # confirm that Edna will upload to the GridManager-enabled Grid yield util.run_tahoe( reactor, request, "--node-directory", edna.process.node_dir, "put", "-", From af227fb31517b11c1117eb0749e7231afc9d9e62 Mon Sep 17 00:00:00 2001 From: meejah Date: Mon, 26 Sep 2022 00:02:40 -0600 Subject: [PATCH 19/50] coverage for grid-manager tests --- integration/test_grid_manager.py | 53 +++++++++++++++++--------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py index d89f1e8f6..0136a11ac 100644 --- a/integration/test_grid_manager.py +++ b/integration/test_grid_manager.py @@ -28,16 +28,21 @@ import pytest_twisted @inlineCallbacks -def _run_gm(reactor, *args, **kwargs): +def _run_gm(reactor, request, *args, **kwargs): """ Run the grid-manager process, passing all arguments as extra CLI args. :returns: all process output """ + if request.config.getoption('coverage'): + base_args = ("-b", "-m", "coverage", "run", "-m", "allmydata.cli.grid_manager") + else: + base_args = ("-m", "allmydata.cli.grid_manager") + output, errput, exit_code = yield getProcessOutputAndValue( sys.executable, - ("-m", "allmydata.cli.grid_manager") + args, + base_args + args, reactor=reactor, **kwargs ) @@ -54,7 +59,7 @@ def test_create_certificate(reactor, request): """ The Grid Manager produces a valid, correctly-signed certificate. 
""" - gm_config = yield _run_gm(reactor, "--config", "-", "create") + gm_config = yield _run_gm(reactor, request, "--config", "-", "create") privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii') privkey, pubkey = ed25519.signing_keypair_from_string(privkey_bytes) @@ -62,12 +67,12 @@ def test_create_certificate(reactor, request): # "actual" clients in the test-grid; we're just checking that the # Grid Manager signs this properly. gm_config = yield _run_gm( - reactor, "--config", "-", "add", + reactor, request, "--config", "-", "add", "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", stdinBytes=gm_config, ) zara_cert_bytes = yield _run_gm( - reactor, "--config", "-", "sign", "zara", "1", + reactor, request, "--config", "-", "sign", "zara", "1", stdinBytes=gm_config, ) zara_cert = json.loads(zara_cert_bytes) @@ -86,16 +91,16 @@ def test_remove_client(reactor, request): A Grid Manager can add and successfully remove a client """ gm_config = yield _run_gm( - reactor, "--config", "-", "create", + reactor, request, "--config", "-", "create", ) gm_config = yield _run_gm( - reactor, "--config", "-", "add", + reactor, request, "--config", "-", "add", "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", stdinBytes=gm_config, ) gm_config = yield _run_gm( - reactor, "--config", "-", "add", + reactor, request, "--config", "-", "add", "yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq", stdinBytes=gm_config, ) @@ -103,7 +108,7 @@ def test_remove_client(reactor, request): assert "yakov" in json.loads(gm_config)['storage_servers'] gm_config = yield _run_gm( - reactor, "--config", "-", "remove", + reactor, request, "--config", "-", "remove", "zara", stdinBytes=gm_config, ) @@ -117,18 +122,18 @@ def test_remove_last_client(reactor, request): A Grid Manager can remove all clients """ gm_config = yield _run_gm( - reactor, "--config", "-", "create", + reactor, request, "--config", "-", "create", ) gm_config = yield _run_gm( - reactor, "--config", "-", "add", + reactor, request, "--config", "-", "add", "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", stdinBytes=gm_config, ) assert "zara" in json.loads(gm_config)['storage_servers'] gm_config = yield _run_gm( - reactor, "--config", "-", "remove", + reactor, request, "--config", "-", "remove", "zara", stdinBytes=gm_config, ) @@ -145,22 +150,22 @@ def test_add_remove_client_file(reactor, request, temp_dir): gmconfig = join(temp_dir, "gmtest") gmconfig_file = join(temp_dir, "gmtest", "config.json") yield _run_gm( - reactor, "--config", gmconfig, "create", + reactor, request, "--config", gmconfig, "create", ) yield _run_gm( - reactor, "--config", gmconfig, "add", + reactor, request, "--config", gmconfig, "add", "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", ) yield _run_gm( - reactor, "--config", gmconfig, "add", + reactor, request, "--config", gmconfig, "add", "yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq", ) assert "zara" in json.load(open(gmconfig_file, "r"))['storage_servers'] assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers'] yield _run_gm( - reactor, "--config", gmconfig, "remove", + reactor, request, "--config", gmconfig, "remove", "zara", ) assert "zara" not in json.load(open(gmconfig_file, "r"))['storage_servers'] @@ -179,7 +184,7 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a _ = yield grid.add_storage_node() gm_config = yield _run_gm( - reactor, "--config", 
"-", "create", + reactor, request, "--config", "-", "create", ) gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii') gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes) @@ -190,7 +195,7 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a pubkey_str = f.read().strip() gm_config = yield _run_gm( - reactor, "--config", "-", "add", + reactor, request, "--config", "-", "add", "storage0", pubkey_str, stdinBytes=gm_config, ) @@ -198,7 +203,7 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a print("inserting certificate") cert = yield _run_gm( - reactor, "--config", "-", "sign", "storage0", "1", + reactor, request, "--config", "-", "sign", "storage0", "1", stdinBytes=gm_config, ) print(cert) @@ -256,7 +261,7 @@ def test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_a happy1 = yield grid.add_storage_node() gm_config = yield _run_gm( - reactor, "--config", "-", "create", + reactor, request, "--config", "-", "create", ) gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii') gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes) @@ -272,7 +277,7 @@ def test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_a pubkey_str = f.read().strip() gm_config = yield _run_gm( - reactor, "--config", "-", "add", + reactor, request, "--config", "-", "add", st_name, pubkey_str, stdinBytes=gm_config, ) @@ -282,7 +287,7 @@ def test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_a print("inserting storage-server certificates") for st_name, st in servers: cert = yield _run_gm( - reactor, "--config", "-", "sign", st_name, "1", + reactor, request, "--config", "-", "sign", st_name, "1", stdinBytes=gm_config, ) @@ -324,12 +329,12 @@ def test_identity(reactor, request, temp_dir): """ gm_config = join(temp_dir, "test_identity") yield _run_gm( - reactor, "--config", gm_config, "create", + reactor, request, "--config", gm_config, "create", ) # ask the CLI for the grid-manager pubkey pubkey = yield _run_gm( - reactor, "--config", gm_config, "public-identity", + reactor, request, "--config", gm_config, "public-identity", ) alleged_pubkey = ed25519.verifying_key_from_string(pubkey.strip()) From 8250c5fdd54a909a17f24d99ae2ec89e78fb4600 Mon Sep 17 00:00:00 2001 From: meejah Date: Mon, 26 Sep 2022 15:40:55 -0600 Subject: [PATCH 20/50] edna -> freya --- integration/test_grid_manager.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py index 0136a11ac..1856ef435 100644 --- a/integration/test_grid_manager.py +++ b/integration/test_grid_manager.py @@ -303,20 +303,20 @@ def test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_a yield happy0.restart(reactor, request) yield happy1.restart(reactor, request) - # configure edna (a client) to have the grid-manager certificate - edna = yield grid.add_client("edna", needed=2, happy=2, total=2) + # configure freya (a client) to have the grid-manager certificate + freya = yield grid.add_client("freya", needed=2, happy=2, total=2) - config = configutil.get_config(join(edna.process.node_dir, "tahoe.cfg")) + config = configutil.get_config(join(freya.process.node_dir, "tahoe.cfg")) config.add_section("grid_managers") config.set("grid_managers", "test", str(ed25519.string_from_verifying_key(gm_pubkey), "ascii")) - with open(join(edna.process.node_dir, "tahoe.cfg"), "w") 
as f: + with open(join(freya.process.node_dir, "tahoe.cfg"), "w") as f: config.write(f) - yield edna.restart(reactor, request, servers=2) + yield freya.restart(reactor, request, servers=2) - # confirm that Edna will upload to the GridManager-enabled Grid + # confirm that Freya will upload to the GridManager-enabled Grid yield util.run_tahoe( - reactor, request, "--node-directory", edna.process.node_dir, + reactor, request, "--node-directory", freya.process.node_dir, "put", "-", stdin=b"some content\n" * 200, ) From 58e0e3def7fc25a8ed15f7d2203adf8eed0625e4 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Thu, 16 Mar 2023 08:52:15 -0400 Subject: [PATCH 21/50] see if this fixes the AttributeError --- integration/grid.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/integration/grid.py b/integration/grid.py index 4e5d8a900..cec30b79b 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -191,7 +191,11 @@ def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer, ) storage = StorageServer( process=node_process, - protocol=node_process.transport._protocol, + # node_process is a TahoeProcess. its transport is an + # IProcessTransport. in practice, this means it is a + # twisted.internet._baseprocess.BaseProcess. BaseProcess records the + # process protocol as its proto attribute. + protocol=node_process.transport.proto, ) returnValue(storage) From c9dba4d0a4d3074e3de1ce77c05065e411b6f0b8 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Thu, 16 Mar 2023 09:09:25 -0400 Subject: [PATCH 22/50] Fix a couple other `_protocol` attributes --- integration/grid.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/grid.py b/integration/grid.py index cec30b79b..9b347cf6f 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -231,7 +231,7 @@ class Client(object): reactor, self.process.node_dir, request, None, ) self.process = process - self.protocol = self.process.transport._protocol + self.protocol = self.process.transport.proto yield await_client_ready(self.process, minimum_number_of_servers=servers) @@ -253,7 +253,7 @@ def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, w returnValue( Client( process=node_process, - protocol=node_process.transport._protocol, + protocol=node_process.transport.proto, ) ) From e6832dd71ca1ad70ccf856f8d6f74751c82b5a0c Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Thu, 16 Mar 2023 09:37:54 -0400 Subject: [PATCH 23/50] another one --- integration/grid.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/grid.py b/integration/grid.py index 9b347cf6f..8c7e7624b 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -174,7 +174,7 @@ class StorageServer(object): self.process = yield _run_node( reactor, self.process.node_dir, request, None, ) - self.protocol = self.process.transport._protocol + self.protocol = self.process.transport.proto yield await_client_ready(self.process) From d8ca0176ab2341d42c3cd808bd9c1a166eec36f6 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 3 Jul 2023 11:05:29 -0400 Subject: [PATCH 24/50] Pass the correct arguments in. 
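The flog_gatherer fixture now yields the FlogGatherer wrapper object
from integration/grid.py, not a bare fURL string, so configuration
code has to write its .furl attribute. A minimal sketch of the
assumed wrapper shape (illustrative only, not the exact definition):

    import attr

    @attr.s
    class FlogGatherer(object):
        process = attr.ib()
        protocol = attr.ib()
        furl = attr.ib()  # the fURL string that tahoe.cfg expects

    # wrong: writes repr(FlogGatherer(...)) into tahoe.cfg
    #   config.set_config("node", "log_gatherer.furl", flog_gatherer)
    # right: writes the actual fURL string
    #   config.set_config("node", "log_gatherer.furl", flog_gatherer.furl)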
--- integration/conftest.py | 2 +- integration/test_tor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/conftest.py b/integration/conftest.py index 643295291..43f16d45b 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -214,7 +214,7 @@ def tor_introducer(reactor, temp_dir, flog_gatherer, request): config = read_config(intro_dir, "tub.port") config.set_config("node", "nickname", "introducer-tor") config.set_config("node", "web.port", "4561") - config.set_config("node", "log_gatherer.furl", flog_gatherer) + config.set_config("node", "log_gatherer.furl", flog_gatherer.furl) # "tahoe run" is consistent across Linux/macOS/Windows, unlike the old # "start" command. diff --git a/integration/test_tor.py b/integration/test_tor.py index 32572276a..4d0ce4f16 100644 --- a/integration/test_tor.py +++ b/integration/test_tor.py @@ -128,7 +128,7 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_ # Which services should this client connect to? write_introducer(node_dir, "default", introducer_furl) - util.basic_node_configuration(request, flog_gatherer, node_dir.path) + util.basic_node_configuration(request, flog_gatherer.furl, node_dir.path) config = read_config(node_dir.path, "tub.port") config.set_config("tor", "onion", "true") From f4ed5cb0f347abb680f7ba143119877b7610a604 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 3 Jul 2023 11:30:35 -0400 Subject: [PATCH 25/50] Fix lint --- integration/conftest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/integration/conftest.py b/integration/conftest.py index 43f16d45b..6de2e84af 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -30,7 +30,6 @@ import pytest import pytest_twisted from .util import ( - _CollectOutputProtocol, _MagicTextProtocol, _DumpOutputProtocol, _ProcessExitedProtocol, From 76f8ab617276e07f0cab382216d40c9dcf9b81c5 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 3 Jul 2023 13:07:56 -0400 Subject: [PATCH 26/50] Set the config the way we were in latest code. --- integration/grid.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/integration/grid.py b/integration/grid.py index ec8b1e0e0..794639b2f 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -43,6 +43,7 @@ from twisted.internet.protocol import ( ) from twisted.internet.error import ProcessTerminated +from allmydata.node import read_config from .util import ( _CollectOutputProtocol, _MagicTextProtocol, @@ -306,16 +307,6 @@ def create_introducer(reactor, request, temp_dir, flog_gatherer, port): """ Run a new Introducer and return an Introducer instance. """ - config = ( - '[node]\n' - 'nickname = introducer{port}\n' - 'web.port = {port}\n' - 'log_gatherer.furl = {log_furl}\n' - ).format( - port=port, - log_furl=flog_gatherer.furl, - ) - intro_dir = join(temp_dir, 'introducer{}'.format(port)) if not exists(intro_dir): @@ -334,9 +325,10 @@ def create_introducer(reactor, request, temp_dir, flog_gatherer, port): ) yield done_proto.done - # over-write the config file with our stuff - with open(join(intro_dir, 'tahoe.cfg'), 'w') as f: - f.write(config) + config = read_config(intro_dir, "tub.port") + config.set_config("node", "nickname", f"introducer-{port}") + config.set_config("node", "web.port", f"{port}") + config.set_config("node", "log_gatherer.furl", flog_gatherer.furl) # on windows, "tahoe start" means: run forever in the foreground, # but on linux it means daemonize. 
"tahoe run" is consistent From 4c8a20c8767bf27881e8151f049dff856780bdb9 Mon Sep 17 00:00:00 2001 From: meejah Date: Wed, 12 Jul 2023 18:33:43 -0600 Subject: [PATCH 27/50] When finalizing a process, we can ignore the case where it isn't running --- integration/grid.py | 3 +-- integration/util.py | 45 +++++++++++++++++++++------------------------ 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/integration/grid.py b/integration/grid.py index 794639b2f..46fde576e 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -235,8 +235,7 @@ class Client(object): self.protocol = self.process.transport.proto yield await_client_ready(self.process, minimum_number_of_servers=servers) - - # XXX add stop / start / restart + # XXX add stop / start ? # ...maybe "reconfig" of some kind? diff --git a/integration/util.py b/integration/util.py index 6a3ec57f3..ff54b1831 100644 --- a/integration/util.py +++ b/integration/util.py @@ -177,38 +177,33 @@ class _MagicTextProtocol(ProcessProtocol): sys.stdout.write(self.name + line + "\n") -def _cleanup_process_async(transport: IProcessTransport, allow_missing: bool) -> None: +def _cleanup_process_async(transport: IProcessTransport) -> None: """ If the given process transport seems to still be associated with a running process, send a SIGTERM to that process. :param transport: The transport to use. - :param allow_missing: If ``True`` then it is not an error for the - transport to have no associated process. Otherwise, an exception will - be raised in that case. - :raise: ``ValueError`` if ``allow_missing`` is ``False`` and the transport has no process. """ if transport.pid is None: - if allow_missing: - print("Process already cleaned up and that's okay.") - return - else: - raise ValueError("Process is not running") + # in cases of "restart", we will have registered a finalizer + # that will kill the process -- but already explicitly killed + # it (and then ran again) due to the "restart". So, if the + # process is already killed, our job is done. + print("Process already cleaned up and that's okay.") + return print("signaling {} with TERM".format(transport.pid)) try: transport.signalProcess('TERM') except ProcessExitedAlready: # The transport object thought it still had a process but the real OS # process has already exited. That's fine. We accomplished what we - # wanted to. We don't care about ``allow_missing`` here because - # there's no way we could have known the real OS process already - # exited. + # wanted to. pass -def _cleanup_tahoe_process(tahoe_transport, exited, allow_missing=False): +def _cleanup_tahoe_process(tahoe_transport, exited): """ Terminate the given process with a kill signal (SIGTERM on POSIX, TerminateProcess on Windows). @@ -219,7 +214,7 @@ def _cleanup_tahoe_process(tahoe_transport, exited, allow_missing=False): :return: After the process has exited. """ from twisted.internet import reactor - _cleanup_process_async(tahoe_transport, allow_missing=allow_missing) + _cleanup_process_async(tahoe_transport) print(f"signaled, blocking on exit {exited}") block_with_timeout(exited, reactor) print("exited, goodbye") @@ -282,16 +277,20 @@ class TahoeProcess(object): ) def kill(self): - """Kill the process, block until it's done.""" + """ + Kill the process, block until it's done. + Does nothing if the process is already stopped (or never started). 
+        """
         print(f"TahoeProcess.kill({self.transport.pid} / {self.node_dir})")
         _cleanup_tahoe_process(self.transport, self.transport.exited)
 
     def kill_async(self):
         """
         Kill the process, return a Deferred that fires when it's done.
+        Does nothing if the process is already stopped (or never started).
         """
         print(f"TahoeProcess.kill_async({self.transport.pid} / {self.node_dir})")
-        _cleanup_process_async(self.transport, allow_missing=False)
+        _cleanup_process_async(self.transport)
         return self.transport.exited
 
     def restart_async(self, reactor: IReactorProcess, request: Any) -> Deferred:
@@ -302,7 +301,7 @@ class TahoeProcess(object):
             handle requests.
         """
         d = self.kill_async()
-        d.addCallback(lambda ignored: _run_node(reactor, self.node_dir, request, None, finalize=False))
+        d.addCallback(lambda ignored: _run_node(reactor, self.node_dir, request, None))
         def got_new_process(proc):
             # Grab the new transport since the one we had before is no longer
             # valid after the stop/start cycle.
@@ -314,7 +313,7 @@ class TahoeProcess(object):
         return "<TahoeProcess in '{}'>".format(self._node_dir)
 
 
-def _run_node(reactor, node_dir, request, magic_text, finalize=True):
+def _run_node(reactor, node_dir, request, magic_text):
     """
     Run a tahoe process from its node_dir.
 
@@ -343,8 +342,7 @@ def _run_node(reactor, node_dir, request, magic_text):
         node_dir,
     )
 
-    if finalize:
-        request.addfinalizer(tahoe_process.kill)
+    request.addfinalizer(tahoe_process.kill)
 
     d = protocol.magic_seen
     d.addCallback(lambda ignored: tahoe_process)
@@ -386,8 +384,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
                  magic_text=None,
                  needed=2,
                  happy=3,
-                 total=4,
-                 finalize=True):
+                 total=4):
     """
     Helper to create a single node, run it and return the instance
     spawnProcess returned (ITransport)
@@ -427,7 +424,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
     d = Deferred()
     d.callback(None)
     d.addCallback(lambda _: created_d)
-    d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text, finalize=finalize))
+    d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text))
     return d
 

From 0b9506dfada0c26d2fd305e9ed339bc5e2d6562c Mon Sep 17 00:00:00 2001
From: meejah
Date: Thu, 13 Jul 2023 17:53:27 -0600
Subject: [PATCH 28/50] try new-enough click to avoid a type error

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 7ca2650d5..9ebdae550 100644
--- a/setup.py
+++ b/setup.py
@@ -151,7 +151,7 @@ install_requires = [
     "pycddl >= 0.4",
 
     # Command-line parsing
-    "click >= 7.0",
+    "click >= 8.1.1",
 
     # for pid-file support
     "psutil",

From a4801cc2ebe396dd29b284b2acc8fecd93ec405b Mon Sep 17 00:00:00 2001
From: meejah
Date: Mon, 17 Jul 2023 17:10:45 -0600
Subject: [PATCH 29/50] CI uses tox less than 4

---
 setup.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/setup.py b/setup.py
index 9ebdae550..a2e870e8b 100644
--- a/setup.py
+++ b/setup.py
@@ -436,6 +436,8 @@ setup(name="tahoe-lafs", # also set in __init__.py
           "pytest-timeout",
           # Does our OpenMetrics endpoint adhere to the spec:
           "prometheus-client == 0.11.0",
+          # CI uses "tox<4", change here too if that becomes different
+          "tox < 4",
       ] + tor_requires + i2p_requires,
       "tor": tor_requires,
       "i2p": i2p_requires,

From 6c5cb02ee5e667590d8383944054debada1f1f33 Mon Sep 17 00:00:00 2001
From: meejah
Date: Mon, 17 Jul 2023 17:11:00 -0600
Subject: [PATCH 30/50] shush mypy

---
 src/allmydata/cli/grid_manager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git
a/src/allmydata/cli/grid_manager.py b/src/allmydata/cli/grid_manager.py
index dfefeb576..d5a5d7e35 100644
--- a/src/allmydata/cli/grid_manager.py
+++ b/src/allmydata/cli/grid_manager.py
@@ -226,4 +226,4 @@ def _config_path_from_option(config: str) -> Optional[FilePath]:
 
 
 if __name__ == '__main__':
-    grid_manager()
+    grid_manager()  # type: ignore

From 45898ff8b8ae6218e52397d1d3c55ad9d71fed2e Mon Sep 17 00:00:00 2001
From: meejah
Date: Mon, 24 Jul 2023 20:08:41 -0600
Subject: [PATCH 31/50] refactor: make sftp tests (etc) work with 'grid'
 refactoring

---
 integration/conftest.py          | 67 ++++------------------
 integration/grid.py              | 97 ++++++++++++++++++++++++++++----
 integration/test_get_put.py      | 30 +++++-----
 integration/test_grid_manager.py |  4 +-
 integration/test_sftp.py         | 17 +++---
 integration/test_vectors.py      | 16 +++---
 integration/test_web.py          | 40 ++++++-------
 integration/util.py              |  3 +-
 8 files changed, 151 insertions(+), 123 deletions(-)

diff --git a/integration/conftest.py b/integration/conftest.py
index 6de2e84af..837b54aa1 100644
--- a/integration/conftest.py
+++ b/integration/conftest.py
@@ -162,6 +162,10 @@ def flog_gatherer(reactor, temp_dir, flog_binary, request):
 @pytest.fixture(scope='session')
 @log_call(action_type=u"integration:grid", include_args=[])
 def grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
+    # XXX think: this creates an "empty" grid (introducer, no nodes);
+    # do we want to ensure it has some minimum storage-nodes at least?
+    # (that is, semantically does it make sense that 'a grid' is
+    # essentially empty, or not?)
     g = pytest_twisted.blockon(
         create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
     )
@@ -271,64 +275,17 @@ def storage_nodes(grid):
         assert ok, "Storage node creation failed: {}".format(value)
     return grid.storage_servers
 
-@pytest.fixture(scope="session")
-def alice_sftp_client_key_path(temp_dir):
-    # The client SSH key path is typically going to be somewhere else (~/.ssh,
-    # typically), but for convenience sake for testing we'll put it inside node.
-    return join(temp_dir, "alice", "private", "ssh_client_rsa_key")
 
 @pytest.fixture(scope='session')
 @log_call(action_type=u"integration:alice", include_args=[], include_result=False)
-def alice(
-    reactor,
-    temp_dir,
-    introducer_furl,
-    flog_gatherer,
-    storage_nodes,
-    alice_sftp_client_key_path,
-    request,
-):
-    process = pytest_twisted.blockon(
-        _create_node(
-            reactor, request, temp_dir, introducer_furl, flog_gatherer, "alice",
-            web_port="tcp:9980:interface=localhost",
-            storage=False,
-        )
-    )
-    pytest_twisted.blockon(await_client_ready(process))
-
-    # 1. Create a new RW directory cap:
-    cli(process, "create-alias", "test")
-    rwcap = loads(cli(process, "list-aliases", "--json"))["test"]["readwrite"]
-
-    # 2. Enable SFTP on the node:
-    host_ssh_key_path = join(process.node_dir, "private", "ssh_host_rsa_key")
-    accounts_path = join(process.node_dir, "private", "accounts")
-    with open(join(process.node_dir, "tahoe.cfg"), "a") as f:
-        f.write("""\
-[sftpd]
-enabled = true
-port = tcp:8022:interface=127.0.0.1
-host_pubkey_file = {ssh_key_path}.pub
-host_privkey_file = {ssh_key_path}
-accounts.file = {accounts_path}
-""".format(ssh_key_path=host_ssh_key_path, accounts_path=accounts_path))
-    generate_ssh_key(host_ssh_key_path)
-
-    # 3. Add a SFTP access file with an SSH key for auth.
-    generate_ssh_key(alice_sftp_client_key_path)
-    # Pub key format is "ssh-rsa <thekey> <username>". We want the key.
- ssh_public_key = open(alice_sftp_client_key_path + ".pub").read().strip().split()[1] - with open(accounts_path, "w") as f: - f.write("""\ -alice-key ssh-rsa {ssh_public_key} {rwcap} -""".format(rwcap=rwcap, ssh_public_key=ssh_public_key)) - - # 4. Restart the node with new SFTP config. - pytest_twisted.blockon(process.restart_async(reactor, request)) - pytest_twisted.blockon(await_client_ready(process)) - print(f"Alice pid: {process.transport.pid}") - return process +def alice(reactor, request, grid, storage_nodes): + """ + :returns grid.Client: the associated instance for Alice + """ + alice = pytest_twisted.blockon(grid.add_client("alice")) + pytest_twisted.blockon(alice.add_sftp(reactor, request)) + print(f"Alice pid: {alice.process.transport.pid}") + return alice @pytest.fixture(scope='session') diff --git a/integration/grid.py b/integration/grid.py index 46fde576e..fe3befd3a 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -10,6 +10,7 @@ rely on 'the' global grid as provided by fixtures like 'alice' or from os import mkdir, listdir from os.path import join, exists +from json import loads from tempfile import mktemp from time import sleep @@ -26,6 +27,7 @@ from twisted.internet.defer import ( inlineCallbacks, returnValue, maybeDeferred, + Deferred, ) from twisted.internet.task import ( deferLater, @@ -54,19 +56,20 @@ from .util import ( _tahoe_runner_optional_coverage, TahoeProcess, await_client_ready, + generate_ssh_key, + cli, + reconfigure, ) import attr import pytest_twisted -# further directions: -# - "Grid" is unused, basically -- tie into the rest? -# - could make a Grid instance mandatory for create_* calls -# - could instead make create_* calls methods of Grid -# - Bring more 'util' or 'conftest' code into here -# - stop()/start()/restart() methods on StorageServer etc -# - more-complex stuff like config changes (which imply a restart too)? +# currently, we pass a "request" around a bunch but it seems to only +# be for addfinalizer() calls. +# - is "keeping" a request like that okay? What if it's a session-scoped one? +# (i.e. in Grid etc) +# - maybe limit to "a callback to hang your cleanup off of" (instead of request)? @attr.s @@ -170,6 +173,8 @@ class StorageServer(object): Note that self.process and self.protocol will be new instances after this. """ + # XXX per review comments, _can_ we make this "return a new + # instance" instead of mutating? self.process.transport.signalProcess('TERM') yield self.protocol.exited self.process = yield _run_node( @@ -213,6 +218,27 @@ class Client(object): protocol = attr.ib( validator=attr.validators.provides(IProcessProtocol) ) + request = attr.ib() # original request, for addfinalizer() + +## XXX convenience? or confusion? +# @property +# def node_dir(self): +# return self.process.node_dir + + @inlineCallbacks + def reconfigure_zfec(self, reactor, request, zfec_params, convergence=None, max_segment_size=None): + """ + Reconfigure the ZFEC parameters for this node + """ + # XXX this is a stop-gap to keep tests running "as is" + # -> we should fix the tests so that they create a new client + # in the grid with the required parameters, instead of + # re-configuring Alice (or whomever) + + rtn = yield Deferred.fromCoroutine( + reconfigure(reactor, self.request, self.process, zfec_params, convergence, max_segment_size) + ) + return rtn @inlineCallbacks def restart(self, reactor, request, servers=1): @@ -226,6 +252,8 @@ class Client(object): Note that self.process and self.protocol will be new instances after this. 
""" + # XXX similar to above, can we make this return a new instance + # instead of mutating? self.process.transport.signalProcess('TERM') yield self.protocol.exited process = yield _run_node( @@ -235,8 +263,55 @@ class Client(object): self.protocol = self.process.transport.proto yield await_client_ready(self.process, minimum_number_of_servers=servers) - # XXX add stop / start ? - # ...maybe "reconfig" of some kind? + @inlineCallbacks + def add_sftp(self, reactor, request): + """ + """ + # if other things need to add or change configuration, further + # refactoring could be useful here (i.e. move reconfigure + # parts to their own functions) + + # XXX why do we need an alias? + # 1. Create a new RW directory cap: + cli(self.process, "create-alias", "test") + rwcap = loads(cli(self.process, "list-aliases", "--json"))["test"]["readwrite"] + + # 2. Enable SFTP on the node: + host_ssh_key_path = join(self.process.node_dir, "private", "ssh_host_rsa_key") + sftp_client_key_path = join(self.process.node_dir, "private", "ssh_client_rsa_key") + accounts_path = join(self.process.node_dir, "private", "accounts") + with open(join(self.process.node_dir, "tahoe.cfg"), "a") as f: + f.write( + ("\n\n[sftpd]\n" + "enabled = true\n" + "port = tcp:8022:interface=127.0.0.1\n" + "host_pubkey_file = {ssh_key_path}.pub\n" + "host_privkey_file = {ssh_key_path}\n" + "accounts.file = {accounts_path}\n").format( + ssh_key_path=host_ssh_key_path, + accounts_path=accounts_path, + ) + ) + generate_ssh_key(host_ssh_key_path) + + # 3. Add a SFTP access file with an SSH key for auth. + generate_ssh_key(sftp_client_key_path) + # Pub key format is "ssh-rsa ". We want the key. + with open(sftp_client_key_path + ".pub") as pubkey_file: + ssh_public_key = pubkey_file.read().strip().split()[1] + with open(accounts_path, "w") as f: + f.write( + "alice-key ssh-rsa {ssh_public_key} {rwcap}\n".format( + rwcap=rwcap, + ssh_public_key=ssh_public_key, + ) + ) + + # 4. Restart the node with new SFTP config. + print("restarting for SFTP") + yield self.restart(reactor, request) + print("restart done") + # XXX i think this is broken because we're "waiting for ready" during first bootstrap? or something? @inlineCallbacks @@ -254,6 +329,7 @@ def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, w Client( process=node_process, protocol=node_process.transport.proto, + request=request, ) ) @@ -370,7 +446,7 @@ class Grid(object): Represents an entire Tahoe Grid setup A Grid includes an Introducer, Flog Gatherer and some number of - Storage Servers. + Storage Servers. Optionally includes Clients. """ _reactor = attr.ib() @@ -436,7 +512,6 @@ class Grid(object): returnValue(client) - # XXX THINK can we tie a whole *grid* to a single request? 
(I think # that's all that makes sense) @inlineCallbacks diff --git a/integration/test_get_put.py b/integration/test_get_put.py index e30a34f97..536185ef8 100644 --- a/integration/test_get_put.py +++ b/integration/test_get_put.py @@ -8,9 +8,8 @@ from subprocess import Popen, PIPE, check_output, check_call import pytest from twisted.internet import reactor from twisted.internet.threads import blockingCallFromThread -from twisted.internet.defer import Deferred -from .util import run_in_thread, cli, reconfigure +from .util import run_in_thread, cli DATA = b"abc123 this is not utf-8 decodable \xff\x00\x33 \x11" try: @@ -23,7 +22,7 @@ else: @pytest.fixture(scope="session") def get_put_alias(alice): - cli(alice, "create-alias", "getput") + cli(alice.process, "create-alias", "getput") def read_bytes(path): @@ -39,14 +38,14 @@ def test_put_from_stdin(alice, get_put_alias, tmpdir): """ tempfile = str(tmpdir.join("file")) p = Popen( - ["tahoe", "--node-directory", alice.node_dir, "put", "-", "getput:fromstdin"], + ["tahoe", "--node-directory", alice.process.node_dir, "put", "-", "getput:fromstdin"], stdin=PIPE ) p.stdin.write(DATA) p.stdin.close() assert p.wait() == 0 - cli(alice, "get", "getput:fromstdin", tempfile) + cli(alice.process, "get", "getput:fromstdin", tempfile) assert read_bytes(tempfile) == DATA @@ -58,10 +57,10 @@ def test_get_to_stdout(alice, get_put_alias, tmpdir): tempfile = tmpdir.join("file") with tempfile.open("wb") as f: f.write(DATA) - cli(alice, "put", str(tempfile), "getput:tostdout") + cli(alice.process, "put", str(tempfile), "getput:tostdout") p = Popen( - ["tahoe", "--node-directory", alice.node_dir, "get", "getput:tostdout", "-"], + ["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:tostdout", "-"], stdout=PIPE ) assert p.stdout.read() == DATA @@ -78,11 +77,11 @@ def test_large_file(alice, get_put_alias, tmp_path): tempfile = tmp_path / "file" with tempfile.open("wb") as f: f.write(DATA * 1_000_000) - cli(alice, "put", str(tempfile), "getput:largefile") + cli(alice.process, "put", str(tempfile), "getput:largefile") outfile = tmp_path / "out" check_call( - ["tahoe", "--node-directory", alice.node_dir, "get", "getput:largefile", str(outfile)], + ["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:largefile", str(outfile)], ) assert outfile.read_bytes() == tempfile.read_bytes() @@ -104,31 +103,30 @@ def test_upload_download_immutable_different_default_max_segment_size(alice, get def set_segment_size(segment_size): return blockingCallFromThread( reactor, - lambda: Deferred.fromCoroutine(reconfigure( + lambda: alice.reconfigure_zfec( reactor, request, - alice, (1, 1, 1), None, max_segment_size=segment_size - )) + ) ) # 1. Upload file 1 with default segment size set to 1MB set_segment_size(1024 * 1024) - cli(alice, "put", str(tempfile), "getput:seg1024kb") + cli(alice.process, "put", str(tempfile), "getput:seg1024kb") # 2. Download file 1 with default segment size set to 128KB set_segment_size(128 * 1024) assert large_data == check_output( - ["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg1024kb", "-"] + ["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:seg1024kb", "-"] ) # 3. Upload file 2 with default segment size set to 128KB - cli(alice, "put", str(tempfile), "getput:seg128kb") + cli(alice.process, "put", str(tempfile), "getput:seg128kb") # 4. 
Download file 2 with default segment size set to 1MB set_segment_size(1024 * 1024) assert large_data == check_output( - ["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg128kb", "-"] + ["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:seg128kb", "-"] ) diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py index 1856ef435..437fe7455 100644 --- a/integration/test_grid_manager.py +++ b/integration/test_grid_manager.py @@ -173,7 +173,7 @@ def test_add_remove_client_file(reactor, request, temp_dir): @pytest_twisted.inlineCallbacks -def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator): +def _test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator): """ A client with happines=2 fails to upload to a Grid when it is using Grid Manager and there is only 1 storage server with a valid @@ -252,7 +252,7 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a @pytest_twisted.inlineCallbacks -def test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator): +def _test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator): """ Successfully upload to a Grid Manager enabled Grid. """ diff --git a/integration/test_sftp.py b/integration/test_sftp.py index 3fdbb56d7..01ddfdf8a 100644 --- a/integration/test_sftp.py +++ b/integration/test_sftp.py @@ -72,7 +72,7 @@ def test_bad_account_password_ssh_key(alice, tmpdir): another_key = os.path.join(str(tmpdir), "ssh_key") generate_ssh_key(another_key) - good_key = RSAKey(filename=os.path.join(alice.node_dir, "private", "ssh_client_rsa_key")) + good_key = RSAKey(filename=os.path.join(alice.process.node_dir, "private", "ssh_client_rsa_key")) bad_key = RSAKey(filename=another_key) # Wrong key: @@ -87,17 +87,16 @@ def test_bad_account_password_ssh_key(alice, tmpdir): "username": "someoneelse", "pkey": good_key, }) -def sftp_client_key(node): + +def sftp_client_key(client): + """ + :return RSAKey: the RSA client key associated with this grid.Client + """ + # XXX move to Client / grid.py? return RSAKey( - filename=os.path.join(node.node_dir, "private", "ssh_client_rsa_key"), + filename=os.path.join(client.process.node_dir, "private", "ssh_client_rsa_key"), ) -def test_sftp_client_key_exists(alice, alice_sftp_client_key_path): - """ - Weakly validate the sftp client key fixture by asserting that *something* - exists at the supposed key path. - """ - assert os.path.exists(alice_sftp_client_key_path) @run_in_thread def test_ssh_key_auth(alice): diff --git a/integration/test_vectors.py b/integration/test_vectors.py index 6e7b5746a..13a451d1c 100644 --- a/integration/test_vectors.py +++ b/integration/test_vectors.py @@ -15,7 +15,8 @@ from pytest_twisted import ensureDeferred from . import vectors from .vectors import parameters -from .util import reconfigure, upload, TahoeProcess +from .util import reconfigure, upload +from .grid import Client @mark.parametrize('convergence', parameters.CONVERGENCE_SECRETS) def test_convergence(convergence): @@ -36,11 +37,11 @@ async def test_capability(reactor, request, alice, case, expected): computed value. 
""" # rewrite alice's config to match params and convergence - await reconfigure( - reactor, request, alice, (1, case.params.required, case.params.total), case.convergence, case.segment_size) + await alice.reconfigure_zfec( + reactor, request, (1, case.params.required, case.params.total), case.convergence, case.segment_size) # upload data in the correct format - actual = upload(alice, case.fmt, case.data) + actual = upload(alice.process, case.fmt, case.data) # compare the resulting cap to the expected result assert actual == expected @@ -82,7 +83,7 @@ async def skiptest_generate(reactor, request, alice): async def generate( reactor, request, - alice: TahoeProcess, + alice: Client, cases: Iterator[vectors.Case], ) -> AsyncGenerator[[vectors.Case, str], None]: """ @@ -106,10 +107,9 @@ async def generate( # reliability of this generator, be happy if we can put shares anywhere happy = 1 for case in cases: - await reconfigure( + await alice.reconfigure_zfec( reactor, request, - alice, (happy, case.params.required, case.params.total), case.convergence, case.segment_size @@ -117,5 +117,5 @@ async def generate( # Give the format a chance to make an RSA key if it needs it. case = evolve(case, fmt=case.fmt.customize()) - cap = upload(alice, case.fmt, case.data) + cap = upload(alice.process, case.fmt, case.data) yield case, cap diff --git a/integration/test_web.py b/integration/test_web.py index b863a27fe..01f69bca0 100644 --- a/integration/test_web.py +++ b/integration/test_web.py @@ -33,7 +33,7 @@ def test_index(alice): """ we can download the index file """ - util.web_get(alice, u"") + util.web_get(alice.process, u"") @run_in_thread @@ -41,7 +41,7 @@ def test_index_json(alice): """ we can download the index file as json """ - data = util.web_get(alice, u"", params={u"t": u"json"}) + data = util.web_get(alice.process, u"", params={u"t": u"json"}) # it should be valid json json.loads(data) @@ -55,7 +55,7 @@ def test_upload_download(alice): FILE_CONTENTS = u"some contents" readcap = util.web_post( - alice, u"uri", + alice.process, u"uri", data={ u"t": u"upload", u"format": u"mdmf", @@ -67,7 +67,7 @@ def test_upload_download(alice): readcap = readcap.strip() data = util.web_get( - alice, u"uri", + alice.process, u"uri", params={ u"uri": readcap, u"filename": u"boom", @@ -85,11 +85,11 @@ def test_put(alice): FILE_CONTENTS = b"added via PUT" * 20 resp = requests.put( - util.node_url(alice.node_dir, u"uri"), + util.node_url(alice.process.node_dir, u"uri"), data=FILE_CONTENTS, ) cap = allmydata.uri.from_string(resp.text.strip().encode('ascii')) - cfg = alice.get_config() + cfg = alice.process.get_config() assert isinstance(cap, allmydata.uri.CHKFileURI) assert cap.size == len(FILE_CONTENTS) assert cap.total_shares == int(cfg.get_config("client", "shares.total")) @@ -116,7 +116,7 @@ def test_deep_stats(alice): URIs work """ resp = requests.post( - util.node_url(alice.node_dir, "uri"), + util.node_url(alice.process.node_dir, "uri"), params={ "format": "sdmf", "t": "mkdir", @@ -130,7 +130,7 @@ def test_deep_stats(alice): uri = url_unquote(resp.url) assert 'URI:DIR2:' in uri dircap = uri[uri.find("URI:DIR2:"):].rstrip('/') - dircap_uri = util.node_url(alice.node_dir, "uri/{}".format(url_quote(dircap))) + dircap_uri = util.node_url(alice.process.node_dir, "uri/{}".format(url_quote(dircap))) # POST a file into this directory FILE_CONTENTS = u"a file in a directory" @@ -176,7 +176,7 @@ def test_deep_stats(alice): while tries > 0: tries -= 1 resp = requests.get( - util.node_url(alice.node_dir, 
u"operations/something_random"), + util.node_url(alice.process.node_dir, u"operations/something_random"), ) d = json.loads(resp.content) if d['size-literal-files'] == len(FILE_CONTENTS): @@ -201,21 +201,21 @@ def test_status(alice): FILE_CONTENTS = u"all the Important Data of alice\n" * 1200 resp = requests.put( - util.node_url(alice.node_dir, u"uri"), + util.node_url(alice.process.node_dir, u"uri"), data=FILE_CONTENTS, ) cap = resp.text.strip() print("Uploaded data, cap={}".format(cap)) resp = requests.get( - util.node_url(alice.node_dir, u"uri/{}".format(url_quote(cap))), + util.node_url(alice.process.node_dir, u"uri/{}".format(url_quote(cap))), ) print("Downloaded {} bytes of data".format(len(resp.content))) assert str(resp.content, "ascii") == FILE_CONTENTS resp = requests.get( - util.node_url(alice.node_dir, "status"), + util.node_url(alice.process.node_dir, "status"), ) dom = html5lib.parse(resp.content) @@ -229,7 +229,7 @@ def test_status(alice): for href in hrefs: if href == u"/" or not href: continue - resp = requests.get(util.node_url(alice.node_dir, href)) + resp = requests.get(util.node_url(alice.process.node_dir, href)) if href.startswith(u"/status/up"): assert b"File Upload Status" in resp.content if b"Total Size: %d" % (len(FILE_CONTENTS),) in resp.content: @@ -241,7 +241,7 @@ def test_status(alice): # download the specialized event information resp = requests.get( - util.node_url(alice.node_dir, u"{}/event_json".format(href)), + util.node_url(alice.process.node_dir, u"{}/event_json".format(href)), ) js = json.loads(resp.content) # there's usually just one "read" operation, but this can handle many .. @@ -264,14 +264,14 @@ async def test_directory_deep_check(reactor, request, alice): required = 2 total = 4 - await util.reconfigure(reactor, request, alice, (happy, required, total), convergence=None) + await alice.reconfigure_zfec(reactor, request, (happy, required, total), convergence=None) await deferToThread(_test_directory_deep_check_blocking, alice) def _test_directory_deep_check_blocking(alice): # create a directory resp = requests.post( - util.node_url(alice.node_dir, u"uri"), + util.node_url(alice.process.node_dir, u"uri"), params={ u"t": u"mkdir", u"redirect_to_result": u"true", @@ -320,7 +320,7 @@ def _test_directory_deep_check_blocking(alice): print("Uploaded data1, cap={}".format(cap1)) resp = requests.get( - util.node_url(alice.node_dir, u"uri/{}".format(url_quote(cap0))), + util.node_url(alice.process.node_dir, u"uri/{}".format(url_quote(cap0))), params={u"t": u"info"}, ) @@ -484,14 +484,14 @@ def test_mkdir_with_children(alice): # create a file to put in our directory FILE_CONTENTS = u"some file contents\n" * 500 resp = requests.put( - util.node_url(alice.node_dir, u"uri"), + util.node_url(alice.process.node_dir, u"uri"), data=FILE_CONTENTS, ) filecap = resp.content.strip() # create a (sub) directory to put in our directory resp = requests.post( - util.node_url(alice.node_dir, u"uri"), + util.node_url(alice.process.node_dir, u"uri"), params={ u"t": u"mkdir", } @@ -534,7 +534,7 @@ def test_mkdir_with_children(alice): # create a new directory with one file and one sub-dir (all-at-once) resp = util.web_post( - alice, u"uri", + alice.process, u"uri", params={u"t": "mkdir-with-children"}, data=json.dumps(meta), ) diff --git a/integration/util.py b/integration/util.py index ff54b1831..b614a84bd 100644 --- a/integration/util.py +++ b/integration/util.py @@ -741,7 +741,6 @@ class SSK: def load(cls, params: dict) -> SSK: assert params.keys() == {"format", "mutable", 
"key"} return cls(params["format"], params["key"].encode("ascii")) - def customize(self) -> SSK: """ Return an SSK with a newly generated random RSA key. @@ -780,7 +779,7 @@ def upload(alice: TahoeProcess, fmt: CHK | SSK, data: bytes) -> str: f.write(data) f.flush() with fmt.to_argv() as fmt_argv: - argv = [alice, "put"] + fmt_argv + [f.name] + argv = [alice.process, "put"] + fmt_argv + [f.name] return cli(*argv).decode("utf-8").strip() From 6f9b9a3ac1123ca3eb9ecce85e98cc75dc6ccd89 Mon Sep 17 00:00:00 2001 From: meejah Date: Mon, 24 Jul 2023 20:12:01 -0600 Subject: [PATCH 32/50] only use original request --- integration/grid.py | 2 +- integration/test_get_put.py | 1 - integration/test_vectors.py | 3 +-- integration/test_web.py | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/integration/grid.py b/integration/grid.py index fe3befd3a..5ce4179ec 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -226,7 +226,7 @@ class Client(object): # return self.process.node_dir @inlineCallbacks - def reconfigure_zfec(self, reactor, request, zfec_params, convergence=None, max_segment_size=None): + def reconfigure_zfec(self, reactor, zfec_params, convergence=None, max_segment_size=None): """ Reconfigure the ZFEC parameters for this node """ diff --git a/integration/test_get_put.py b/integration/test_get_put.py index 536185ef8..2f6642493 100644 --- a/integration/test_get_put.py +++ b/integration/test_get_put.py @@ -105,7 +105,6 @@ def test_upload_download_immutable_different_default_max_segment_size(alice, get reactor, lambda: alice.reconfigure_zfec( reactor, - request, (1, 1, 1), None, max_segment_size=segment_size diff --git a/integration/test_vectors.py b/integration/test_vectors.py index 13a451d1c..bd5def8c5 100644 --- a/integration/test_vectors.py +++ b/integration/test_vectors.py @@ -38,7 +38,7 @@ async def test_capability(reactor, request, alice, case, expected): """ # rewrite alice's config to match params and convergence await alice.reconfigure_zfec( - reactor, request, (1, case.params.required, case.params.total), case.convergence, case.segment_size) + reactor, (1, case.params.required, case.params.total), case.convergence, case.segment_size) # upload data in the correct format actual = upload(alice.process, case.fmt, case.data) @@ -109,7 +109,6 @@ async def generate( for case in cases: await alice.reconfigure_zfec( reactor, - request, (happy, case.params.required, case.params.total), case.convergence, case.segment_size diff --git a/integration/test_web.py b/integration/test_web.py index 01f69bca0..08c6e6217 100644 --- a/integration/test_web.py +++ b/integration/test_web.py @@ -264,7 +264,7 @@ async def test_directory_deep_check(reactor, request, alice): required = 2 total = 4 - await alice.reconfigure_zfec(reactor, request, (happy, required, total), convergence=None) + await alice.reconfigure_zfec(reactor, (happy, required, total), convergence=None) await deferToThread(_test_directory_deep_check_blocking, alice) From 050ef6cca3d19b20f76b7d4bf80b2d82f30f2af6 Mon Sep 17 00:00:00 2001 From: meejah Date: Sat, 29 Jul 2023 04:04:05 -0600 Subject: [PATCH 33/50] tor-tests work; refactor ports --- integration/conftest.py | 49 ++++++++++++++++++++++++++++++++++++----- integration/test_i2p.py | 7 +++--- integration/test_tor.py | 15 ++++++------- integration/util.py | 2 +- 4 files changed, 55 insertions(+), 18 deletions(-) diff --git a/integration/conftest.py b/integration/conftest.py index 837b54aa1..92483da65 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ 
-7,6 +7,7 @@ from __future__ import annotations import os import sys import shutil +from attr import define from time import sleep from os import mkdir, environ from os.path import join, exists @@ -189,7 +190,7 @@ def introducer_furl(introducer, temp_dir): include_args=["temp_dir", "flog_gatherer"], include_result=False, ) -def tor_introducer(reactor, temp_dir, flog_gatherer, request): +def tor_introducer(reactor, temp_dir, flog_gatherer, request, tor_network): intro_dir = join(temp_dir, 'introducer_tor') print("making Tor introducer in {}".format(intro_dir)) print("(this can take tens of seconds to allocate Onion address)") @@ -203,9 +204,7 @@ def tor_introducer(reactor, temp_dir, flog_gatherer, request): request, ( 'create-introducer', - # The control port should agree with the configuration of the - # Tor network we bootstrap with chutney. - '--tor-control-port', 'tcp:localhost:8007', + '--tor-control-port', tor_network.client_control_endpoint, '--hide-ip', '--listen=tor', intro_dir, @@ -306,6 +305,21 @@ def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, reques @pytest.mark.skipif(sys.platform.startswith('win'), 'Tor tests are unstable on Windows') def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]: + """ + Instantiate the "networks/hs-v3" Chutney configuration for a local + Tor network. + + This provides a small, local Tor network that can run v3 Onion + Services. This has 10 tor processes: 3 authorities, 5 + exits+relays, a client (and one service-hosting node we don't use). + + We pin a Chutney revision, so things shouldn't change. Currently, + the ONLY node that exposes a valid SocksPort is "008c" (the + client) on 9008. + + The control ports start at 8000 (so the ControlPort for the one + client node is 8008). + """ # Try to find Chutney already installed in the environment. try: import chutney @@ -363,7 +377,24 @@ def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]: ) pytest_twisted.blockon(proto.done) - return (chutney_dir, {"PYTHONPATH": join(chutney_dir, "lib")}) + return chutney_dir, {"PYTHONPATH": join(chutney_dir, "lib")} + + +@define +class ChutneyTorNetwork: + """ + Represents a running Chutney (tor) network. Returned by the + "tor_network" fixture. 
+ """ + dir: FilePath + environ: dict + client_control_port: int + + @property + def client_control_endpoint(self) -> str: + print("CONTROL", "tcp:localhost:{}".format(self.client_control_port)) + return "tcp:localhost:{}".format(self.client_control_port) + @pytest.fixture(scope='session') @@ -422,3 +453,11 @@ def tor_network(reactor, temp_dir, chutney, request): pytest_twisted.blockon(chutney(("status", basic_network))) except ProcessTerminated: print("Chutney.TorNet status failed (continuing)") + + # the "8008" comes from configuring "networks/basic" in chutney + # and then examining "net/nodes/008c/torrc" for ControlPort value + return ChutneyTorNetwork( + chutney_root, + chutney_env, + 8008, + ) diff --git a/integration/test_i2p.py b/integration/test_i2p.py index 2ee603573..ea3ddb62b 100644 --- a/integration/test_i2p.py +++ b/integration/test_i2p.py @@ -132,8 +132,8 @@ def i2p_introducer_furl(i2p_introducer, temp_dir): @pytest_twisted.inlineCallbacks @pytest.mark.skip("I2P tests are not functioning at all, for unknown reasons") def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl): - yield _create_anonymous_node(reactor, 'carol_i2p', 8008, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl) - yield _create_anonymous_node(reactor, 'dave_i2p', 8009, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl) + yield _create_anonymous_node(reactor, 'carol_i2p', request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl) + yield _create_anonymous_node(reactor, 'dave_i2p', request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl) # ensure both nodes are connected to "a grid" by uploading # something via carol, and retrieve it using dave. gold_path = join(temp_dir, "gold") @@ -179,9 +179,8 @@ def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_netw @pytest_twisted.inlineCallbacks -def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, i2p_network, introducer_furl): +def _create_anonymous_node(reactor, name, request, temp_dir, flog_gatherer, i2p_network, introducer_furl): node_dir = FilePath(temp_dir).child(name) - web_port = "tcp:{}:interface=localhost".format(control_port + 2000) print("creating", node_dir.path) node_dir.makedirs() diff --git a/integration/test_tor.py b/integration/test_tor.py index 4d0ce4f16..d7fed5790 100644 --- a/integration/test_tor.py +++ b/integration/test_tor.py @@ -38,8 +38,8 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne The two nodes can talk to the introducer and each other: we upload to one node, read from the other. 
""" - carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2) - dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2) + carol = yield _create_anonymous_node(reactor, 'carol', 8100, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2) + dave = yield _create_anonymous_node(reactor, 'dave', 8101, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2) yield util.await_client_ready(carol, minimum_number_of_servers=2, timeout=600) yield util.await_client_ready(dave, minimum_number_of_servers=2, timeout=600) yield upload_to_one_download_from_the_other(reactor, temp_dir, carol, dave) @@ -94,9 +94,8 @@ async def upload_to_one_download_from_the_other(reactor, temp_dir, upload_to: ut @pytest_twisted.inlineCallbacks -def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl, shares_total: int) -> util.TahoeProcess: +def _create_anonymous_node(reactor, name, web_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl, shares_total: int) -> util.TahoeProcess: node_dir = FilePath(temp_dir).child(name) - web_port = "tcp:{}:interface=localhost".format(control_port + 2000) if node_dir.exists(): raise RuntimeError( "A node already exists in '{}'".format(node_dir) @@ -111,10 +110,10 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_ sys.executable, '-b', '-m', 'allmydata.scripts.runner', 'create-node', '--nickname', name, - '--webport', web_port, + '--webport', str(web_port), '--introducer', introducer_furl, '--hide-ip', - '--tor-control-port', 'tcp:localhost:{}'.format(control_port), + '--tor-control-port', tor_network.client_control_endpoint, '--listen', 'tor', '--shares-needed', '1', '--shares-happy', '1', @@ -133,7 +132,7 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_ config = read_config(node_dir.path, "tub.port") config.set_config("tor", "onion", "true") config.set_config("tor", "onion.external_port", "3457") - config.set_config("tor", "control.port", f"tcp:port={control_port}:host=127.0.0.1") + config.set_config("tor", "control.port", tor_network.client_control_endpoint) config.set_config("tor", "onion.private_key_file", "private/tor_onion.privkey") print("running") @@ -159,7 +158,7 @@ def test_anonymous_client(reactor, request, temp_dir, flog_gatherer, tor_network ) yield util.await_client_ready(normie) - anonymoose = yield _create_anonymous_node(reactor, 'anonymoose', 8008, request, temp_dir, flog_gatherer, tor_network, introducer_furl, 1) + anonymoose = yield _create_anonymous_node(reactor, 'anonymoose', 8102, request, temp_dir, flog_gatherer, tor_network, introducer_furl, 1) yield util.await_client_ready(anonymoose, minimum_number_of_servers=1, timeout=600) yield upload_to_one_download_from_the_other(reactor, temp_dir, normie, anonymoose) diff --git a/integration/util.py b/integration/util.py index b614a84bd..909def8ef 100644 --- a/integration/util.py +++ b/integration/util.py @@ -659,7 +659,7 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_serve print( f"Now: {time.ctime()}\n" - f"Server last-received-data: {[time.ctime(s['last_received_data']) for s in servers]}" + f"Server last-received-data: {[s['last_received_data'] for s in servers]}" ) server_times = [ From 01a87d85be5a11f40015d651ea1244ffb3a5a487 Mon Sep 17 00:00:00 2001 From: meejah 
Date: Sat, 29 Jul 2023 04:08:52 -0600 Subject: [PATCH 34/50] refactor: actually parallel --- integration/conftest.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integration/conftest.py b/integration/conftest.py index 92483da65..aa85a38cd 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -266,8 +266,7 @@ def storage_nodes(grid): nodes_d = [] # start all 5 nodes in parallel for x in range(5): - #nodes_d.append(grid.add_storage_node()) - pytest_twisted.blockon(grid.add_storage_node()) + nodes_d.append(grid.add_storage_node()) nodes_status = pytest_twisted.blockon(DeferredList(nodes_d)) for ok, value in nodes_status: From e565b9e28c00138eed1cf3cfdb064c23ddad9ffc Mon Sep 17 00:00:00 2001 From: meejah Date: Sat, 29 Jul 2023 04:14:39 -0600 Subject: [PATCH 35/50] no, we can't --- integration/grid.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/integration/grid.py b/integration/grid.py index 5ce4179ec..064319f74 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -173,8 +173,6 @@ class StorageServer(object): Note that self.process and self.protocol will be new instances after this. """ - # XXX per review comments, _can_ we make this "return a new - # instance" instead of mutating? self.process.transport.signalProcess('TERM') yield self.protocol.exited self.process = yield _run_node( From c4ac548cba2c397774a5d2af3f09d1bf0a642dbc Mon Sep 17 00:00:00 2001 From: meejah Date: Sat, 29 Jul 2023 13:08:01 -0600 Subject: [PATCH 36/50] reactor from fixture --- integration/conftest.py | 2 +- integration/grid.py | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/integration/conftest.py b/integration/conftest.py index d2024ce98..04dc400a2 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -117,7 +117,7 @@ def reactor(): @pytest.fixture(scope='session') @log_call(action_type=u"integration:port_allocator", include_result=False) def port_allocator(reactor): - return create_port_allocator(start_port=45000) + return create_port_allocator(reactor, start_port=45000) @pytest.fixture(scope='session') diff --git a/integration/grid.py b/integration/grid.py index 064319f74..343bd779f 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -529,7 +529,7 @@ def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator): returnValue(grid) -def create_port_allocator(start_port): +def create_port_allocator(reactor, start_port): """ Returns a new port-allocator .. which is a zero-argument function that returns Deferreds that fire with new, sequential ports @@ -546,11 +546,6 @@ def create_port_allocator(start_port): """ port = [start_port - 1] - # import stays here to not interfere with reactor selection -- but - # maybe this function should be arranged to be called once from a - # fixture (with the reactor)? - from twisted.internet import reactor - class NothingProtocol(Protocol): """ I do nothing. 
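Both this allocator (now handed the reactor explicitly) and the
allocate_tcp_port()-based replacement in the following patch satisfy
the same contract: a zero-argument callable returning a Deferred that
fires with a fresh TCP port. A hypothetical sketch of that contract,
using only APIs these patches already rely on:

    from twisted.internet.defer import succeed
    from allmydata.util.iputil import allocate_tcp_port

    def port_allocator():
        # synchronous under the hood; callers only depend on getting
        # a Deferred that fires with an unused port number
        return succeed(allocate_tcp_port())

    # typical use when building a node's web-port endpoint:
    # d = port_allocator()
    # d.addCallback(lambda port: "tcp:{}:interface=localhost".format(port))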
From fe96defa2b2e6f7934f97bf76f0b651b1c20b191 Mon Sep 17 00:00:00 2001 From: meejah Date: Sat, 29 Jul 2023 13:15:21 -0600 Subject: [PATCH 37/50] use existing port-allocator instead --- integration/conftest.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/integration/conftest.py b/integration/conftest.py index 04dc400a2..46f5a0a44 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -21,7 +21,7 @@ from eliot import ( from twisted.python.filepath import FilePath from twisted.python.procutils import which -from twisted.internet.defer import DeferredList +from twisted.internet.defer import DeferredList, succeed from twisted.internet.error import ( ProcessExitedAlready, ProcessTerminated, @@ -117,7 +117,16 @@ def reactor(): @pytest.fixture(scope='session') @log_call(action_type=u"integration:port_allocator", include_result=False) def port_allocator(reactor): - return create_port_allocator(reactor, start_port=45000) + from allmydata.util.iputil import allocate_tcp_port + + # these will appear basically random, which can make especially + # manual debugging harder but we're re-using code instead of + # writing our own...so, win? + def allocate(): + port = allocate_tcp_port() + return succeed(port) + return allocate + #return create_port_allocator(reactor, start_port=45000) @pytest.fixture(scope='session') From 7a8752c969d8dc64e3e68ba944f0bf98b4e33f48 Mon Sep 17 00:00:00 2001 From: meejah Date: Sat, 29 Jul 2023 13:18:23 -0600 Subject: [PATCH 38/50] docstring, remove duplicate port-allocator --- integration/conftest.py | 4 +--- integration/grid.py | 52 ++++++----------------------------------- 2 files changed, 8 insertions(+), 48 deletions(-) diff --git a/integration/conftest.py b/integration/conftest.py index 46f5a0a44..55a0bbbb5 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -47,6 +47,7 @@ from .grid import ( create_grid, ) from allmydata.node import read_config +from allmydata.util.iputil import allocate_tcp_port # No reason for HTTP requests to take longer than four minutes in the # integration tests. See allmydata/scripts/common_http.py for usage. @@ -117,8 +118,6 @@ def reactor(): @pytest.fixture(scope='session') @log_call(action_type=u"integration:port_allocator", include_result=False) def port_allocator(reactor): - from allmydata.util.iputil import allocate_tcp_port - # these will appear basically random, which can make especially # manual debugging harder but we're re-using code instead of # writing our own...so, win? @@ -126,7 +125,6 @@ def port_allocator(reactor): port = allocate_tcp_port() return succeed(port) return allocate - #return create_port_allocator(reactor, start_port=45000) @pytest.fixture(scope='session') diff --git a/integration/grid.py b/integration/grid.py index 343bd779f..79b5b45ad 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -510,11 +510,16 @@ class Grid(object): returnValue(client) -# XXX THINK can we tie a whole *grid* to a single request? (I think -# that's all that makes sense) +# A grid is now forever tied to its original 'request' which is where +# it must hang finalizers off of. The "main" one is a session-level +# fixture so it'll live the life of the tests but it could be +# per-function Grid too. @inlineCallbacks def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator): """ + Create a new grid. This will have one Introducer but zero + storage-servers or clients; those must be added by a test or + subsequent fixtures. 
""" intro_port = yield port_allocator() introducer = yield create_introducer(reactor, request, temp_dir, flog_gatherer, intro_port) @@ -527,46 +532,3 @@ def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator): flog_gatherer, ) returnValue(grid) - - -def create_port_allocator(reactor, start_port): - """ - Returns a new port-allocator .. which is a zero-argument function - that returns Deferreds that fire with new, sequential ports - starting at `start_port` skipping any that already appear to have - a listener. - - There can still be a race against other processes allocating ports - -- between the time when we check the status of the port and when - our subprocess starts up. This *could* be mitigated by instructing - the OS to not randomly-allocate ports in some range, and then - using that range here (explicitly, ourselves). - - NB once we're Python3-only this could be an async-generator - """ - port = [start_port - 1] - - class NothingProtocol(Protocol): - """ - I do nothing. - """ - - def port_generator(): - print("Checking port {}".format(port)) - port[0] += 1 - ep = TCP4ServerEndpoint(reactor, port[0], interface="localhost") - d = ep.listen(Factory.forProtocol(NothingProtocol)) - - def good(listening_port): - unlisten_d = maybeDeferred(listening_port.stopListening) - def return_port(_): - return port[0] - unlisten_d.addBoth(return_port) - return unlisten_d - - def try_again(fail): - return port_generator() - - d.addCallbacks(good, try_again) - return d - return port_generator From 67d5c82e103f49fb1d624e3ad6908de885c01842 Mon Sep 17 00:00:00 2001 From: meejah Date: Sat, 29 Jul 2023 13:34:12 -0600 Subject: [PATCH 39/50] codechecks / linter --- integration/conftest.py | 4 ---- integration/grid.py | 8 -------- integration/test_i2p.py | 9 ++++++--- integration/test_vectors.py | 2 +- 4 files changed, 7 insertions(+), 16 deletions(-) diff --git a/integration/conftest.py b/integration/conftest.py index 55a0bbbb5..a26d2043d 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -12,7 +12,6 @@ from time import sleep from os import mkdir, environ from os.path import join, exists from tempfile import mkdtemp -from json import loads from eliot import ( to_file, @@ -37,12 +36,9 @@ from .util import ( _create_node, _tahoe_runner_optional_coverage, await_client_ready, - cli, - generate_ssh_key, block_with_timeout, ) from .grid import ( - create_port_allocator, create_flog_gatherer, create_grid, ) diff --git a/integration/grid.py b/integration/grid.py index 79b5b45ad..94f8c3d7f 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -26,7 +26,6 @@ from twisted.python.procutils import which from twisted.internet.defer import ( inlineCallbacks, returnValue, - maybeDeferred, Deferred, ) from twisted.internet.task import ( @@ -36,13 +35,6 @@ from twisted.internet.interfaces import ( IProcessTransport, IProcessProtocol, ) -from twisted.internet.endpoints import ( - TCP4ServerEndpoint, -) -from twisted.internet.protocol import ( - Factory, - Protocol, -) from twisted.internet.error import ProcessTerminated from allmydata.node import read_config diff --git a/integration/test_i2p.py b/integration/test_i2p.py index ea3ddb62b..c99c469fa 100644 --- a/integration/test_i2p.py +++ b/integration/test_i2p.py @@ -24,6 +24,7 @@ from allmydata.test.common import ( write_introducer, ) from allmydata.node import read_config +from allmydata.util.iputil import allocate_tcp_port if which("docker") is None: @@ -132,8 +133,10 @@ def i2p_introducer_furl(i2p_introducer, temp_dir): 
From 67d5c82e103f49fb1d624e3ad6908de885c01842 Mon Sep 17 00:00:00 2001
From: meejah
Date: Sat, 29 Jul 2023 13:34:12 -0600
Subject: [PATCH 39/50] codechecks / linter

---
 integration/conftest.py     | 4 ----
 integration/grid.py         | 8 --------
 integration/test_i2p.py     | 9 ++++++---
 integration/test_vectors.py | 2 +-
 4 files changed, 7 insertions(+), 16 deletions(-)

diff --git a/integration/conftest.py b/integration/conftest.py
index 55a0bbbb5..a26d2043d 100644
--- a/integration/conftest.py
+++ b/integration/conftest.py
@@ -12,7 +12,6 @@ from time import sleep
 from os import mkdir, environ
 from os.path import join, exists
 from tempfile import mkdtemp
-from json import loads
 from eliot import (
     to_file,
     log_call,
@@ -37,12 +36,9 @@ from .util import (
     _create_node,
     _tahoe_runner_optional_coverage,
     await_client_ready,
-    cli,
-    generate_ssh_key,
     block_with_timeout,
 )
 from .grid import (
-    create_port_allocator,
     create_flog_gatherer,
     create_grid,
 )
diff --git a/integration/grid.py b/integration/grid.py
index 79b5b45ad..94f8c3d7f 100644
--- a/integration/grid.py
+++ b/integration/grid.py
@@ -26,7 +26,6 @@ from twisted.python.procutils import which
 from twisted.internet.defer import (
     inlineCallbacks,
     returnValue,
-    maybeDeferred,
     Deferred,
 )
 from twisted.internet.task import (
@@ -36,13 +35,6 @@ from twisted.internet.interfaces import (
     IProcessTransport,
     IProcessProtocol,
 )
-from twisted.internet.endpoints import (
-    TCP4ServerEndpoint,
-)
-from twisted.internet.protocol import (
-    Factory,
-    Protocol,
-)
 from twisted.internet.error import ProcessTerminated

 from allmydata.node import read_config
diff --git a/integration/test_i2p.py b/integration/test_i2p.py
index ea3ddb62b..c99c469fa 100644
--- a/integration/test_i2p.py
+++ b/integration/test_i2p.py
@@ -24,6 +24,7 @@ from allmydata.test.common import (
     write_introducer,
 )
 from allmydata.node import read_config
+from allmydata.util.iputil import allocate_tcp_port


 if which("docker") is None:
@@ -132,8 +133,10 @@ def i2p_introducer_furl(i2p_introducer, temp_dir):
 @pytest_twisted.inlineCallbacks
 @pytest.mark.skip("I2P tests are not functioning at all, for unknown reasons")
 def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl):
-    yield _create_anonymous_node(reactor, 'carol_i2p', request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
-    yield _create_anonymous_node(reactor, 'dave_i2p', request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
+    web_port0 = allocate_tcp_port()
+    web_port1 = allocate_tcp_port()
+    yield _create_anonymous_node(reactor, 'carol_i2p', web_port0, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
+    yield _create_anonymous_node(reactor, 'dave_i2p', web_port1, request, temp_dir, flog_gatherer, i2p_network, i2p_introducer_furl)
     # ensure both nodes are connected to "a grid" by uploading
     # something via carol, and retrieve it using dave.
     gold_path = join(temp_dir, "gold")
@@ -179,7 +182,7 @@ def test_i2p_service_storage(reactor, request, temp_dir, flog_gatherer, i2p_netw


 @pytest_twisted.inlineCallbacks
-def _create_anonymous_node(reactor, name, request, temp_dir, flog_gatherer, i2p_network, introducer_furl):
+def _create_anonymous_node(reactor, name, web_port, request, temp_dir, flog_gatherer, i2p_network, introducer_furl):
     node_dir = FilePath(temp_dir).child(name)
     print("creating", node_dir.path)

diff --git a/integration/test_vectors.py b/integration/test_vectors.py
index bd5def8c5..1bcbcffa4 100644
--- a/integration/test_vectors.py
+++ b/integration/test_vectors.py
@@ -15,7 +15,7 @@ from pytest_twisted import ensureDeferred
 from . import vectors
 from .vectors import parameters
-from .util import reconfigure, upload
+from .util import upload
 from .grid import Client

 @mark.parametrize('convergence', parameters.CONVERGENCE_SECRETS)

From 112770aeb31a7f95e59718c54ea59c69d842c1d7 Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Mon, 31 Jul 2023 11:07:37 -0400
Subject: [PATCH 40/50] Don't hardcode tox

---
 setup.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/setup.py b/setup.py
index a2e870e8b..86873ad53 100644
--- a/setup.py
+++ b/setup.py
@@ -435,9 +435,7 @@ setup(name="tahoe-lafs", # also set in __init__.py
         "paramiko < 2.9",
         "pytest-timeout",
         # Does our OpenMetrics endpoint adhere to the spec:
-        "prometheus-client == 0.11.0",
-        # CI uses "tox<4", change here too if that becomes different
-        "tox < 4",
+        "prometheus-client == 0.11.0"
     ] + tor_requires + i2p_requires,
     "tor": tor_requires,
     "i2p": i2p_requires,
From e3f30d8e58fa73dbd3a2af870cb1b2c1252eb184 Mon Sep 17 00:00:00 2001
From: meejah
Date: Wed, 2 Aug 2023 14:48:36 -0600
Subject: [PATCH 41/50] fix comments about tor/chutney in integration config

---
 integration/conftest.py | 29 +++++++++++++++++------------
 1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/integration/conftest.py b/integration/conftest.py
index 171310570..89de83cdb 100644
--- a/integration/conftest.py
+++ b/integration/conftest.py
@@ -308,19 +308,10 @@ def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, reques
                     'Tor tests are unstable on Windows')
 def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]:
     """
-    Instantiate the "networks/hs-v3" Chutney configuration for a local
-    Tor network.
+    Install the Chutney software that is required to run a small local Tor grid.

-    This provides a small, local Tor network that can run v3 Onion
-    Services. This has 10 tor processes: 3 authorities, 5
-    exits+relays, a client (and one service-hosting node we don't use).
-
-    We pin a Chutney revision, so things shouldn't change. Currently,
-    the ONLY node that exposes a valid SocksPort is "008c" (the
-    client) on 9008.
-
-    The control ports start at 8000 (so the ControlPort for the one
-    client node is 8008).
+    (Chutney lacks the normal "python stuff" so we can't just declare
+    it in Tox or similar dependencies)
     """
     # Try to find Chutney already installed in the environment.
     try:
@@ -404,6 +395,20 @@ def tor_network(reactor, temp_dir, chutney, request):
     """
     Build a basic Tor network.

+    Instantiate the "networks/basic" Chutney configuration for a local
+    Tor network.
+
+    This provides a small, local Tor network that can run v3 Onion
+    Services. It has 3 authorities, 5 relays and 2 clients.
+
+    The 'chutney' fixture pins a Chutney git revision, so things
+    shouldn't change. This network has two clients, which are the only
+    nodes with valid SocksPort configuration ("008c" and "009c", on
+    ports 9008 and 9009).
+
+    The control ports start at 8000 (so the ControlPorts for the
+    client nodes are 8008 and 8009).
+
     :param chutney: The root directory of a Chutney checkout and a dict of
         additional environment variables to set so a Python process can
         use it.

From 8ec7f5485a1836c8a79f689a0b609d94aa0caf88 Mon Sep 17 00:00:00 2001
From: meejah
Date: Wed, 2 Aug 2023 14:48:59 -0600
Subject: [PATCH 42/50] upload() needs the actual alice fixture

---
 integration/test_vectors.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration/test_vectors.py b/integration/test_vectors.py
index 1bcbcffa4..f53ec1741 100644
--- a/integration/test_vectors.py
+++ b/integration/test_vectors.py
@@ -41,7 +41,7 @@ async def test_capability(reactor, request, alice, case, expected):
         reactor, (1, case.params.required, case.params.total), case.convergence, case.segment_size)

     # upload data in the correct format
-    actual = upload(alice.process, case.fmt, case.data)
+    actual = upload(alice, case.fmt, case.data)

     # compare the resulting cap to the expected result
     assert actual == expected
""" dir: FilePath - environ: dict + environ: Mapping[str, str] client_control_port: int @property From 3e2c784e7794de806280bf1c627ae4884c3ef508 Mon Sep 17 00:00:00 2001 From: meejah Date: Wed, 2 Aug 2023 14:58:04 -0600 Subject: [PATCH 45/50] likely to be more-right --- integration/grid.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/integration/grid.py b/integration/grid.py index 94f8c3d7f..00f0dd826 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -116,13 +116,14 @@ def create_flog_gatherer(reactor, request, temp_dir, flog_binary): flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')] print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file)) - reactor.spawnProcess( - flog_protocol, - flog_binary, - ( - 'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0]) - ), - ) + for flog_path in flogs: + reactor.spawnProcess( + flog_protocol, + flog_binary, + ( + 'flogtool', 'dump', join(temp_dir, 'flog_gather', flog_path) + ), + ) print("Waiting for flogtool to complete") try: pytest_twisted.blockon(flog_protocol.done) From 63f4c6fcc6c421a96827558c569a08065a3dc2a6 Mon Sep 17 00:00:00 2001 From: meejah Date: Wed, 2 Aug 2023 14:58:55 -0600 Subject: [PATCH 46/50] import to top-level --- integration/grid.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/grid.py b/integration/grid.py index 00f0dd826..b9af7ed5d 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -51,6 +51,7 @@ from .util import ( generate_ssh_key, cli, reconfigure, + _create_node, ) import attr @@ -181,7 +182,6 @@ def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer, """ Create a new storage server """ - from .util import _create_node node_process = yield _create_node( reactor, request, temp_dir, introducer.furl, flog_gatherer, name, web_port, storage=True, needed=needed, happy=happy, total=total, From f77b6c433778ed91b7c41abf6b2c1ddb3e5dc94a Mon Sep 17 00:00:00 2001 From: meejah Date: Wed, 2 Aug 2023 15:12:38 -0600 Subject: [PATCH 47/50] fix XXX comment + add docstring --- integration/conftest.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/integration/conftest.py b/integration/conftest.py index 89de83cdb..f7fb5f093 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -166,10 +166,12 @@ def flog_gatherer(reactor, temp_dir, flog_binary, request): @pytest.fixture(scope='session') @log_call(action_type=u"integration:grid", include_args=[]) def grid(reactor, request, temp_dir, flog_gatherer, port_allocator): - # XXX think: this creates an "empty" grid (introducer, no nodes); - # do we want to ensure it has some minimum storage-nodes at least? - # (that is, semantically does it make sense that 'a grid' is - # essentially empty, or not?) + """ + Provides a new Grid with a single Introducer and flog-gathering process. + + Notably does _not_ provide storage servers; use the storage_nodes + fixture if your tests need a Grid that can be used for puts / gets. 
+ """ g = pytest_twisted.blockon( create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator) ) From 8b175383af0ce7d4c835bb2a029797730fb4d646 Mon Sep 17 00:00:00 2001 From: meejah Date: Wed, 2 Aug 2023 15:15:33 -0600 Subject: [PATCH 48/50] flake8 --- integration/conftest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integration/conftest.py b/integration/conftest.py index be467bb34..313ff36c2 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -7,7 +7,7 @@ from __future__ import annotations import os import sys import shutil -from attr import define +from attr import frozen from time import sleep from os import mkdir, environ from os.path import join, exists @@ -28,6 +28,7 @@ from twisted.internet.error import ( import pytest import pytest_twisted +from typing import Mapping from .util import ( _MagicTextProtocol, From f663581ed32e2d0f1206074cca21151d227bf3bc Mon Sep 17 00:00:00 2001 From: meejah Date: Wed, 2 Aug 2023 16:27:18 -0600 Subject: [PATCH 49/50] temporarily remove new provides() usage --- integration/grid.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/integration/grid.py b/integration/grid.py index b9af7ed5d..c39b9cff9 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -31,10 +31,8 @@ from twisted.internet.defer import ( from twisted.internet.task import ( deferLater, ) -from twisted.internet.interfaces import ( - IProcessTransport, - IProcessProtocol, -) +from twisted.internet.protocol import ProcessProtocol # see ticket 4056 +from twisted.internet.process import Process # see ticket 4056 from twisted.internet.error import ProcessTerminated from allmydata.node import read_config @@ -71,11 +69,17 @@ class FlogGatherer(object): Flog Gatherer process. 
""" + # it would be best to use attr.validators.provides() here with the + # corresponding Twisted interface (IProcessTransport, + # IProcessProtocol) but that is deprecated; please replace with + # our own "provides" as part of + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4056#ticket + # insisting on a subclass is narrower than necessary process = attr.ib( - validator=attr.validators.provides(IProcessTransport) + validator=attr.validators.instance_of(Process) ) protocol = attr.ib( - validator=attr.validators.provides(IProcessProtocol) + validator=attr.validators.instance_of(ProcessProtocol) ) furl = attr.ib() @@ -155,7 +159,7 @@ class StorageServer(object): validator=attr.validators.instance_of(TahoeProcess) ) protocol = attr.ib( - validator=attr.validators.provides(IProcessProtocol) + validator=attr.validators.instance_of(ProcessProtocol) ) @inlineCallbacks @@ -207,7 +211,7 @@ class Client(object): validator=attr.validators.instance_of(TahoeProcess) ) protocol = attr.ib( - validator=attr.validators.provides(IProcessProtocol) + validator=attr.validators.instance_of(ProcessProtocol) ) request = attr.ib() # original request, for addfinalizer() @@ -335,7 +339,7 @@ class Introducer(object): validator=attr.validators.instance_of(TahoeProcess) ) protocol = attr.ib( - validator=attr.validators.provides(IProcessProtocol) + validator=attr.validators.instance_of(ProcessProtocol) ) furl = attr.ib() From d0208bc099a3c500a50fceae1fbe1785c0144725 Mon Sep 17 00:00:00 2001 From: meejah Date: Wed, 2 Aug 2023 16:52:29 -0600 Subject: [PATCH 50/50] different Process instance on different platforms --- integration/grid.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/integration/grid.py b/integration/grid.py index c39b9cff9..524da730f 100644 --- a/integration/grid.py +++ b/integration/grid.py @@ -32,7 +32,6 @@ from twisted.internet.task import ( deferLater, ) from twisted.internet.protocol import ProcessProtocol # see ticket 4056 -from twisted.internet.process import Process # see ticket 4056 from twisted.internet.error import ProcessTerminated from allmydata.node import read_config @@ -75,9 +74,7 @@ class FlogGatherer(object): # our own "provides" as part of # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4056#ticket # insisting on a subclass is narrower than necessary - process = attr.ib( - validator=attr.validators.instance_of(Process) - ) + process = attr.ib() protocol = attr.ib( validator=attr.validators.instance_of(ProcessProtocol) )