diff --git a/newsfragments/3526.minor b/newsfragments/3526.minor new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/newsfragments/3526.minor @@ -0,0 +1 @@ + diff --git a/src/allmydata/scripts/create_node.py b/src/allmydata/scripts/create_node.py index 4959ed391..5d9da518b 100644 --- a/src/allmydata/scripts/create_node.py +++ b/src/allmydata/scripts/create_node.py @@ -37,9 +37,6 @@ from allmydata.util.assertutil import precondition from allmydata.util.encodingutil import listdir_unicode, argv_to_unicode, quote_local_unicode_path, get_io_encoding from allmydata.util import fileutil, i2p_provider, iputil, tor_provider, jsonbytes as json -from wormhole import wormhole - - dummy_tac = """ import sys print("Nodes created by Tahoe-LAFS v1.11.0 or later cannot be run by") @@ -377,7 +374,7 @@ def _get_config_via_wormhole(config): relay_url = config.parent['wormhole-server'] print("Connecting to '{}'".format(relay_url), file=out) - wh = wormhole.create( + wh = config.parent.wormhole.create( appid=config.parent['wormhole-invite-appid'], relay_url=relay_url, reactor=reactor, diff --git a/src/allmydata/scripts/runner.py b/src/allmydata/scripts/runner.py index 145ee6464..a0d8a752b 100644 --- a/src/allmydata/scripts/runner.py +++ b/src/allmydata/scripts/runner.py @@ -58,11 +58,17 @@ process_control_commands = [ class Options(usage.Options): + """ + :ivar wormhole: An object exposing the magic-wormhole API (mainly a test + hook). + """ # unit tests can override these to point at StringIO instances stdin = sys.stdin stdout = sys.stdout stderr = sys.stderr + from wormhole import wormhole + subCommands = ( create_node.subCommands + admin.subCommands + process_control_commands diff --git a/src/allmydata/scripts/tahoe_invite.py b/src/allmydata/scripts/tahoe_invite.py index 09d4cbd59..b62d6a463 100644 --- a/src/allmydata/scripts/tahoe_invite.py +++ b/src/allmydata/scripts/tahoe_invite.py @@ -18,8 +18,6 @@ except ImportError: from twisted.python import usage from twisted.internet import defer, reactor -from wormhole import wormhole - from allmydata.util.encodingutil import argv_to_abspath from allmydata.util import jsonbytes as json from allmydata.scripts.common import get_default_nodedir, get_introducer_furl @@ -50,7 +48,7 @@ def _send_config_via_wormhole(options, config): err = options.stderr relay_url = options.parent['wormhole-server'] print("Connecting to '{}'...".format(relay_url), file=out) - wh = wormhole.create( + wh = options.parent.wormhole.create( appid=options.parent['wormhole-invite-appid'], relay_url=relay_url, reactor=reactor, diff --git a/src/allmydata/test/cli/test_invite.py b/src/allmydata/test/cli/test_invite.py index 20d012995..07756eeed 100644 --- a/src/allmydata/test/cli/test_invite.py +++ b/src/allmydata/test/cli/test_invite.py @@ -1,75 +1,117 @@ """ -Ported to Pythn 3. +Tests for ``tahoe invite``. """ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals -from future.utils import PY2 -if PY2: - from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 +from __future__ import annotations -import os -import mock import json +import os +from functools import partial from os.path import join +from typing import Awaitable, Callable, Optional, Sequence, TypeVar, Union -try: - from typing import Optional, Sequence -except ImportError: - pass - -from twisted.trial import unittest from twisted.internet import defer +from twisted.trial import unittest + +from ...client import read_config +from ...scripts import runner +from ...util.jsonbytes import dumps_bytes from ..common_util import run_cli from ..no_network import GridTestMixin from .common import CLITestMixin -from ...client import ( - read_config, -) +from .wormholetesting import IWormhole, MemoryWormholeServer, TestingHelper, memory_server -class _FakeWormhole(object): +# Logically: +# JSONable = dict[str, Union[JSONable, None, int, float, str, list[JSONable]]] +# +# But practically: +JSONable = Union[dict, None, int, float, str, list] - def __init__(self, outgoing_messages): - self.messages = [] - for o in outgoing_messages: - assert isinstance(o, bytes) - self._outgoing = outgoing_messages - def get_code(self): - return defer.succeed(u"6-alarmist-tuba") +async def open_wormhole() -> tuple[Callable, IWormhole, str]: + """ + Create a new in-memory wormhole server, open one end of a wormhole, and + return it and related info. - def set_code(self, code): - self._code = code + :return: A three-tuple allowing use of the wormhole. The first element is + a callable like ``run_cli`` but which will run commands so that they + use the in-memory wormhole server instead of a real one. The second + element is the open wormhole. The third element is the wormhole's + code. + """ + server = MemoryWormholeServer() + options = runner.Options() + options.wormhole = server + reactor = object() - def get_welcome(self): - return defer.succeed( - { - u"welcome": {}, - } + wormhole = server.create( + "tahoe-lafs.org/invite", + "ws://wormhole.tahoe-lafs.org:4000/v1", + reactor, + ) + code = await wormhole.get_code() + + return (partial(run_cli, options=options), wormhole, code) + + +def make_simple_peer( + reactor, + server: MemoryWormholeServer, + helper: TestingHelper, + messages: Sequence[JSONable], +) -> Callable[[], Awaitable[IWormhole]]: + """ + Make a wormhole peer that just sends the given messages. + + The returned function returns an awaitable that fires with the peer's end + of the wormhole. + """ + async def peer() -> IWormhole: + # Run the client side of the invitation by manually pumping a + # message through the wormhole. + + # First, wait for the server to create the wormhole at all. + wormhole = await helper.wait_for_wormhole( + "tahoe-lafs.org/invite", + "ws://wormhole.tahoe-lafs.org:4000/v1", ) + # Then read out its code and open the other side of the wormhole. + code = await wormhole.when_code() + other_end = server.create( + "tahoe-lafs.org/invite", + "ws://wormhole.tahoe-lafs.org:4000/v1", + reactor, + ) + other_end.set_code(code) + send_messages(other_end, messages) + return other_end - def allocate_code(self): - return None - - def send_message(self, msg): - assert isinstance(msg, bytes) - self.messages.append(msg) - - def get_message(self): - return defer.succeed(self._outgoing.pop(0)) - - def close(self): - return defer.succeed(None) + return peer -def _create_fake_wormhole(outgoing_messages): - outgoing_messages = [ - m.encode("utf-8") if isinstance(m, str) else m - for m in outgoing_messages - ] - return _FakeWormhole(outgoing_messages) +def send_messages(wormhole: IWormhole, messages: Sequence[JSONable]) -> None: + """ + Send a list of message through a wormhole. + """ + for msg in messages: + wormhole.send_message(dumps_bytes(msg)) + + +A = TypeVar("A") +B = TypeVar("B") + +def concurrently( + client: Callable[[], Awaitable[A]], + server: Callable[[], Awaitable[B]], +) -> defer.Deferred[tuple[A, B]]: + """ + Run two asynchronous functions concurrently and asynchronously return a + tuple of both their results. + """ + return defer.gatherResults([ + defer.Deferred.fromCoroutine(client()), + defer.Deferred.fromCoroutine(server()), + ]) class Join(GridTestMixin, CLITestMixin, unittest.TestCase): @@ -86,41 +128,39 @@ class Join(GridTestMixin, CLITestMixin, unittest.TestCase): successfully join after an invite """ node_dir = self.mktemp() + run_cli, wormhole, code = yield defer.Deferred.fromCoroutine(open_wormhole()) + send_messages(wormhole, [ + {u"abilities": {u"server-v1": {}}}, + { + u"shares-needed": 1, + u"shares-happy": 1, + u"shares-total": 1, + u"nickname": u"somethinghopefullyunique", + u"introducer": u"pb://foo", + }, + ]) - with mock.patch('allmydata.scripts.create_node.wormhole') as w: - fake_wh = _create_fake_wormhole([ - json.dumps({u"abilities": {u"server-v1": {}}}), - json.dumps({ - u"shares-needed": 1, - u"shares-happy": 1, - u"shares-total": 1, - u"nickname": u"somethinghopefullyunique", - u"introducer": u"pb://foo", - }), - ]) - w.create = mock.Mock(return_value=fake_wh) + rc, out, err = yield run_cli( + "create-client", + "--join", code, + node_dir, + ) - rc, out, err = yield run_cli( - "create-client", - "--join", "1-abysmal-ant", - node_dir, - ) + self.assertEqual(0, rc) - self.assertEqual(0, rc) + config = read_config(node_dir, u"") + self.assertIn( + "pb://foo", + set( + furl + for (furl, cache) + in config.get_introducer_configuration().values() + ), + ) - config = read_config(node_dir, u"") - self.assertIn( - "pb://foo", - set( - furl - for (furl, cache) - in config.get_introducer_configuration().values() - ), - ) - - with open(join(node_dir, 'tahoe.cfg'), 'r') as f: - config = f.read() - self.assertIn(u"somethinghopefullyunique", config) + with open(join(node_dir, 'tahoe.cfg'), 'r') as f: + config = f.read() + self.assertIn(u"somethinghopefullyunique", config) @defer.inlineCallbacks def test_create_node_illegal_option(self): @@ -128,30 +168,28 @@ class Join(GridTestMixin, CLITestMixin, unittest.TestCase): Server sends JSON with unknown/illegal key """ node_dir = self.mktemp() + run_cli, wormhole, code = yield defer.Deferred.fromCoroutine(open_wormhole()) + send_messages(wormhole, [ + {u"abilities": {u"server-v1": {}}}, + { + u"shares-needed": 1, + u"shares-happy": 1, + u"shares-total": 1, + u"nickname": u"somethinghopefullyunique", + u"introducer": u"pb://foo", + u"something-else": u"not allowed", + }, + ]) - with mock.patch('allmydata.scripts.create_node.wormhole') as w: - fake_wh = _create_fake_wormhole([ - json.dumps({u"abilities": {u"server-v1": {}}}), - json.dumps({ - u"shares-needed": 1, - u"shares-happy": 1, - u"shares-total": 1, - u"nickname": u"somethinghopefullyunique", - u"introducer": u"pb://foo", - u"something-else": u"not allowed", - }), - ]) - w.create = mock.Mock(return_value=fake_wh) + rc, out, err = yield run_cli( + "create-client", + "--join", code, + node_dir, + ) - rc, out, err = yield run_cli( - "create-client", - "--join", "1-abysmal-ant", - node_dir, - ) - - # should still succeed -- just ignores the not-whitelisted - # "something-else" option - self.assertEqual(0, rc) + # should still succeed -- just ignores the not-whitelisted + # "something-else" option + self.assertEqual(0, rc) class Invite(GridTestMixin, CLITestMixin, unittest.TestCase): @@ -168,8 +206,7 @@ class Invite(GridTestMixin, CLITestMixin, unittest.TestCase): intro_dir, ) - def _invite_success(self, extra_args=(), tahoe_config=None): - # type: (Sequence[bytes], Optional[bytes]) -> defer.Deferred + async def _invite_success(self, extra_args: Sequence[bytes] = (), tahoe_config: Optional[bytes] = None) -> str: """ Exercise an expected-success case of ``tahoe invite``. @@ -190,53 +227,58 @@ class Invite(GridTestMixin, CLITestMixin, unittest.TestCase): with open(join(intro_dir, "tahoe.cfg"), "wb") as fobj_cfg: fobj_cfg.write(tahoe_config) - with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w: - fake_wh = _create_fake_wormhole([ - json.dumps({u"abilities": {u"client-v1": {}}}), - ]) - w.create = mock.Mock(return_value=fake_wh) + wormhole_server, helper = memory_server() + options = runner.Options() + options.wormhole = wormhole_server + reactor = object() - extra_args = tuple(extra_args) - - d = run_cli( + async def server(): + # Run the server side of the invitation process using the CLI. + rc, out, err = await run_cli( "-d", intro_dir, "invite", - *(extra_args + ("foo",)) + *tuple(extra_args) + ("foo",), + options=options, ) - def done(result): - rc, out, err = result - self.assertEqual(2, len(fake_wh.messages)) - self.assertEqual( - json.loads(fake_wh.messages[0]), - { - "abilities": - { - "server-v1": {} - }, - }, - ) - invite = json.loads(fake_wh.messages[1]) - self.assertEqual( - invite["nickname"], "foo", - ) - self.assertEqual( - invite["introducer"], "pb://fooblam", - ) - return invite - d.addCallback(done) - return d + # Send a proper client abilities message. + client = make_simple_peer(reactor, wormhole_server, helper, [{u"abilities": {u"client-v1": {}}}]) + other_end, _ = await concurrently(client, server) + + # Check the server's messages. First, it should announce its + # abilities correctly. + server_abilities = json.loads(await other_end.when_received()) + self.assertEqual( + server_abilities, + { + "abilities": + { + "server-v1": {} + }, + }, + ) + + # Second, it should have an invitation with a nickname and introducer + # furl. + invite = json.loads(await other_end.when_received()) + self.assertEqual( + invite["nickname"], "foo", + ) + self.assertEqual( + invite["introducer"], "pb://fooblam", + ) + return invite @defer.inlineCallbacks def test_invite_success(self): """ successfully send an invite """ - invite = yield self._invite_success(( + invite = yield defer.Deferred.fromCoroutine(self._invite_success(( "--shares-needed", "1", "--shares-happy", "2", "--shares-total", "3", - )) + ))) self.assertEqual( invite["shares-needed"], "1", ) @@ -253,12 +295,12 @@ class Invite(GridTestMixin, CLITestMixin, unittest.TestCase): If ``--shares-{needed,happy,total}`` are not given on the command line then the invitation is generated using the configured values. """ - invite = yield self._invite_success(tahoe_config=b""" + invite = yield defer.Deferred.fromCoroutine(self._invite_success(tahoe_config=b""" [client] shares.needed = 2 shares.happy = 4 shares.total = 6 -""") +""")) self.assertEqual( invite["shares-needed"], "2", ) @@ -277,22 +319,20 @@ shares.total = 6 """ intro_dir = os.path.join(self.basedir, "introducer") - with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w: - fake_wh = _create_fake_wormhole([ - json.dumps({u"abilities": {u"client-v1": {}}}), - ]) - w.create = mock.Mock(return_value=fake_wh) + options = runner.Options() + options.wormhole = None - rc, out, err = yield run_cli( - "-d", intro_dir, - "invite", - "--shares-needed", "1", - "--shares-happy", "1", - "--shares-total", "1", - "foo", - ) - self.assertNotEqual(rc, 0) - self.assertIn(u"Can't find introducer FURL", out + err) + rc, out, err = yield run_cli( + "-d", intro_dir, + "invite", + "--shares-needed", "1", + "--shares-happy", "1", + "--shares-total", "1", + "foo", + options=options, + ) + self.assertNotEqual(rc, 0) + self.assertIn(u"Can't find introducer FURL", out + err) @defer.inlineCallbacks def test_invite_wrong_client_abilities(self): @@ -306,23 +346,28 @@ shares.total = 6 with open(join(priv_dir, "introducer.furl"), "w") as f: f.write("pb://fooblam\n") - with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w: - fake_wh = _create_fake_wormhole([ - json.dumps({u"abilities": {u"client-v9000": {}}}), - ]) - w.create = mock.Mock(return_value=fake_wh) + wormhole_server, helper = memory_server() + options = runner.Options() + options.wormhole = wormhole_server + reactor = object() - rc, out, err = yield run_cli( + async def server(): + rc, out, err = await run_cli( "-d", intro_dir, "invite", "--shares-needed", "1", "--shares-happy", "1", "--shares-total", "1", "foo", + options=options, ) self.assertNotEqual(rc, 0) self.assertIn(u"No 'client-v1' in abilities", out + err) + # Send some surprising client abilities. + client = make_simple_peer(reactor, wormhole_server, helper, [{u"abilities": {u"client-v9000": {}}}]) + yield concurrently(client, server) + @defer.inlineCallbacks def test_invite_no_client_abilities(self): """ @@ -335,23 +380,30 @@ shares.total = 6 with open(join(priv_dir, "introducer.furl"), "w") as f: f.write("pb://fooblam\n") - with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w: - fake_wh = _create_fake_wormhole([ - json.dumps({}), - ]) - w.create = mock.Mock(return_value=fake_wh) + wormhole_server, helper = memory_server() + options = runner.Options() + options.wormhole = wormhole_server + reactor = object() - rc, out, err = yield run_cli( + async def server(): + # Run the server side of the invitation process using the CLI. + rc, out, err = await run_cli( "-d", intro_dir, "invite", "--shares-needed", "1", "--shares-happy", "1", "--shares-total", "1", "foo", + options=options, ) self.assertNotEqual(rc, 0) self.assertIn(u"No 'abilities' from client", out + err) + # Send a no-abilities message through to the server. + client = make_simple_peer(reactor, wormhole_server, helper, [{}]) + yield concurrently(client, server) + + @defer.inlineCallbacks def test_invite_wrong_server_abilities(self): """ @@ -364,26 +416,25 @@ shares.total = 6 with open(join(priv_dir, "introducer.furl"), "w") as f: f.write("pb://fooblam\n") - with mock.patch('allmydata.scripts.create_node.wormhole') as w: - fake_wh = _create_fake_wormhole([ - json.dumps({u"abilities": {u"server-v9000": {}}}), - json.dumps({ - "shares-needed": "1", - "shares-total": "1", - "shares-happy": "1", - "nickname": "foo", - "introducer": "pb://fooblam", - }), - ]) - w.create = mock.Mock(return_value=fake_wh) + run_cli, wormhole, code = yield defer.Deferred.fromCoroutine(open_wormhole()) + send_messages(wormhole, [ + {u"abilities": {u"server-v9000": {}}}, + { + "shares-needed": "1", + "shares-total": "1", + "shares-happy": "1", + "nickname": "foo", + "introducer": "pb://fooblam", + }, + ]) - rc, out, err = yield run_cli( - "create-client", - "--join", "1-alarmist-tuba", - "foo", - ) - self.assertNotEqual(rc, 0) - self.assertIn("Expected 'server-v1' in server abilities", out + err) + rc, out, err = yield run_cli( + "create-client", + "--join", code, + "foo", + ) + self.assertNotEqual(rc, 0) + self.assertIn("Expected 'server-v1' in server abilities", out + err) @defer.inlineCallbacks def test_invite_no_server_abilities(self): @@ -397,26 +448,25 @@ shares.total = 6 with open(join(priv_dir, "introducer.furl"), "w") as f: f.write("pb://fooblam\n") - with mock.patch('allmydata.scripts.create_node.wormhole') as w: - fake_wh = _create_fake_wormhole([ - json.dumps({}), - json.dumps({ - "shares-needed": "1", - "shares-total": "1", - "shares-happy": "1", - "nickname": "bar", - "introducer": "pb://fooblam", - }), - ]) - w.create = mock.Mock(return_value=fake_wh) + run_cli, wormhole, code = yield defer.Deferred.fromCoroutine(open_wormhole()) + send_messages(wormhole, [ + {}, + { + "shares-needed": "1", + "shares-total": "1", + "shares-happy": "1", + "nickname": "bar", + "introducer": "pb://fooblam", + }, + ]) - rc, out, err = yield run_cli( - "create-client", - "--join", "1-alarmist-tuba", - "bar", - ) - self.assertNotEqual(rc, 0) - self.assertIn("Expected 'abilities' in server introduction", out + err) + rc, out, err = yield run_cli( + "create-client", + "--join", code, + "bar", + ) + self.assertNotEqual(rc, 0) + self.assertIn("Expected 'abilities' in server introduction", out + err) @defer.inlineCallbacks def test_invite_no_nick(self): @@ -425,13 +475,16 @@ shares.total = 6 """ intro_dir = os.path.join(self.basedir, "introducer") - with mock.patch('allmydata.scripts.tahoe_invite.wormhole'): - rc, out, err = yield run_cli( - "-d", intro_dir, - "invite", - "--shares-needed", "1", - "--shares-happy", "1", - "--shares-total", "1", - ) - self.assertTrue(rc) - self.assertIn(u"Provide a single argument", out + err) + options = runner.Options() + options.wormhole = None + + rc, out, err = yield run_cli( + "-d", intro_dir, + "invite", + "--shares-needed", "1", + "--shares-happy", "1", + "--shares-total", "1", + options=options, + ) + self.assertTrue(rc) + self.assertIn(u"Provide a single argument", out + err) diff --git a/src/allmydata/test/cli/wormholetesting.py b/src/allmydata/test/cli/wormholetesting.py new file mode 100644 index 000000000..744f9d75a --- /dev/null +++ b/src/allmydata/test/cli/wormholetesting.py @@ -0,0 +1,305 @@ +""" +An in-memory implementation of some of the magic-wormhole interfaces for +use by automated tests. + +For example:: + + async def peerA(mw): + wormhole = mw.create("myapp", "wss://myserver", reactor) + code = await wormhole.get_code() + print(f"I have a code: {code}") + message = await wormhole.when_received() + print(f"I have a message: {message}") + + async def local_peerB(helper, mw): + peerA_wormhole = await helper.wait_for_wormhole("myapp", "wss://myserver") + code = await peerA_wormhole.when_code() + + peerB_wormhole = mw.create("myapp", "wss://myserver") + peerB_wormhole.set_code(code) + + peerB_wormhole.send_message("Hello, peer A") + + # Run peerA against local_peerB with pure in-memory message passing. + server, helper = memory_server() + run(gather(peerA(server), local_peerB(helper, server))) + + # Run peerA against a peerB somewhere out in the world, using a real + # wormhole relay server somewhere. + import wormhole + run(peerA(wormhole)) +""" + +from __future__ import annotations + +from typing import Iterator, Optional, List, Tuple +from collections.abc import Awaitable +from inspect import getargspec +from itertools import count +from sys import stderr + +from attrs import frozen, define, field, Factory +from twisted.internet.defer import Deferred, DeferredQueue, succeed +from wormhole._interfaces import IWormhole +from wormhole.wormhole import create +from zope.interface import implementer + +WormholeCode = str +WormholeMessage = bytes +AppId = str +RelayURL = str +ApplicationKey = Tuple[RelayURL, AppId] + +@define +class MemoryWormholeServer(object): + """ + A factory for in-memory wormholes. + + :ivar _apps: Wormhole state arranged by the application id and relay URL + it belongs to. + + :ivar _waiters: Observers waiting for a wormhole to be created for a + specific application id and relay URL combination. + """ + _apps: dict[ApplicationKey, _WormholeApp] = field(default=Factory(dict)) + _waiters: dict[ApplicationKey, Deferred] = field(default=Factory(dict)) + + def create( + self, + appid, + relay_url, + reactor, + versions={}, + delegate=None, + journal=None, + tor=None, + timing=None, + stderr=stderr, + _eventual_queue=None, + _enable_dilate=False, + ): + """ + Create a wormhole. It will be able to connect to other wormholes created + by this instance (and constrained by the normal appid/relay_url + rules). + """ + if tor is not None: + raise ValueError("Cannot deal with Tor right now.") + if _enable_dilate: + raise ValueError("Cannot deal with dilation right now.") + + key = (relay_url, appid) + wormhole = _MemoryWormhole(self._view(key)) + if key in self._waiters: + self._waiters.pop(key).callback(wormhole) + return wormhole + + def _view(self, key: ApplicationKey) -> _WormholeServerView: + """ + Created a view onto this server's state that is limited by a certain + appid/relay_url pair. + """ + return _WormholeServerView(self, key) + + +@frozen +class TestingHelper(object): + """ + Provide extra functionality for interacting with an in-memory wormhole + implementation. + + This is intentionally a separate API so that it is not confused with + proper public interface of the real wormhole implementation. + """ + _server: MemoryWormholeServer + + async def wait_for_wormhole(self, appid: AppId, relay_url: RelayURL) -> IWormhole: + """ + Wait for a wormhole to appear at a specific location. + + :param appid: The appid that the resulting wormhole will have. + + :param relay_url: The URL of the relay at which the resulting wormhole + will presume to be created. + + :return: The first wormhole to be created which matches the given + parameters. + """ + key = (relay_url, appid) + if key in self._server._waiters: + raise ValueError(f"There is already a waiter for {key}") + d = Deferred() + self._server._waiters[key] = d + wormhole = await d + return wormhole + + +def _verify(): + """ + Roughly confirm that the in-memory wormhole creation function matches the + interface of the real implementation. + """ + # Poor man's interface verification. + + a = getargspec(create) + b = getargspec(MemoryWormholeServer.create) + # I know it has a `self` argument at the beginning. That's okay. + b = b._replace(args=b.args[1:]) + assert a == b, "{} != {}".format(a, b) + + +_verify() + + +@define +class _WormholeApp(object): + """ + Represent a collection of wormholes that belong to the same + appid/relay_url scope. + """ + wormholes: dict[WormholeCode, IWormhole] = field(default=Factory(dict)) + _waiting: dict[WormholeCode, List[Deferred]] = field(default=Factory(dict)) + _counter: Iterator[int] = field(default=Factory(count)) + + def allocate_code(self, wormhole: IWormhole, code: Optional[WormholeCode]) -> WormholeCode: + """ + Allocate a new code for the given wormhole. + + This also associates the given wormhole with the code for future + lookup. + + Code generation logic is trivial and certainly not good enough for any + real use. It is sufficient for automated testing, though. + """ + if code is None: + code = "{}-persnickety-tardigrade".format(next(self._counter)) + self.wormholes.setdefault(code, []).append(wormhole) + try: + waiters = self._waiting.pop(code) + except KeyError: + pass + else: + for w in waiters: + w.callback(wormhole) + + return code + + def wait_for_wormhole(self, code: WormholeCode) -> Awaitable[_MemoryWormhole]: + """ + Return a ``Deferred`` which fires with the next wormhole to be associated + with the given code. This is used to let the first end of a wormhole + rendezvous with the second end. + """ + d = Deferred() + self._waiting.setdefault(code, []).append(d) + return d + + +@frozen +class _WormholeServerView(object): + """ + Present an interface onto the server to be consumed by individual + wormholes. + """ + _server: MemoryWormholeServer + _key: ApplicationKey + + def allocate_code(self, wormhole: _MemoryWormhole, code: Optional[WormholeCode]) -> WormholeCode: + """ + Allocate a new code for the given wormhole in the scope associated with + this view. + """ + app = self._server._apps.setdefault(self._key, _WormholeApp()) + return app.allocate_code(wormhole, code) + + def wormhole_by_code(self, code: WormholeCode, exclude: object) -> Deferred[IWormhole]: + """ + Retrieve all wormholes previously associated with a code. + """ + app = self._server._apps[self._key] + wormholes = app.wormholes[code] + try: + [wormhole] = list(wormhole for wormhole in wormholes if wormhole != exclude) + except ValueError: + return app.wait_for_wormhole(code) + return succeed(wormhole) + + +@implementer(IWormhole) +@define +class _MemoryWormhole(object): + """ + Represent one side of a wormhole as conceived by ``MemoryWormholeServer``. + """ + + _view: _WormholeServerView + _code: Optional[WormholeCode] = None + _payload: DeferredQueue = field(default=Factory(DeferredQueue)) + _waiting_for_code: list[Deferred] = field(default=Factory(list)) + + def allocate_code(self) -> None: + if self._code is not None: + raise ValueError( + "allocate_code used with a wormhole which already has a code" + ) + self._code = self._view.allocate_code(self, None) + waiters = self._waiting_for_code + self._waiting_for_code = [] + for d in waiters: + d.callback(self._code) + + def set_code(self, code: WormholeCode) -> None: + if self._code is None: + self._code = code + self._view.allocate_code(self, code) + else: + raise ValueError("set_code used with a wormhole which already has a code") + + def when_code(self) -> Deferred[WormholeCode]: + if self._code is None: + d = Deferred() + self._waiting_for_code.append(d) + return d + return succeed(self._code) + + def get_welcome(self): + return succeed("welcome") + + def send_message(self, payload: WormholeMessage) -> None: + self._payload.put(payload) + + def when_received(self) -> Deferred[WormholeMessage]: + if self._code is None: + raise ValueError( + "This implementation requires set_code or allocate_code " + "before when_received." + ) + d = self._view.wormhole_by_code(self._code, exclude=self) + + def got_wormhole(wormhole): + msg = wormhole._payload.get() + return msg + + d.addCallback(got_wormhole) + return d + + get_message = when_received + + def close(self) -> None: + pass + + # 0.9.2 compatibility + def get_code(self) -> Deferred[WormholeCode]: + if self._code is None: + self.allocate_code() + return self.when_code() + + get = when_received + + +def memory_server() -> tuple[MemoryWormholeServer, TestingHelper]: + """ + Create a paired in-memory wormhole server and testing helper. + """ + server = MemoryWormholeServer() + return server, TestingHelper(server) diff --git a/src/allmydata/test/common_util.py b/src/allmydata/test/common_util.py index d2d20916d..e63c3eef8 100644 --- a/src/allmydata/test/common_util.py +++ b/src/allmydata/test/common_util.py @@ -69,6 +69,9 @@ def run_cli_native(verb, *args, **kwargs): Most code should prefer ``run_cli_unicode`` which deals with all the necessary encoding considerations. + :param runner.Options options: The options instance to use to parse the + given arguments. + :param native_str verb: The command to run. For example, ``"create-node"``. @@ -88,6 +91,7 @@ def run_cli_native(verb, *args, **kwargs): matching native behavior. If True, stdout/stderr are returned as bytes. """ + options = kwargs.pop("options", runner.Options()) nodeargs = kwargs.pop("nodeargs", []) encoding = kwargs.pop("encoding", None) or getattr(sys.stdout, "encoding") or "utf-8" return_bytes = kwargs.pop("return_bytes", False) @@ -134,7 +138,7 @@ def run_cli_native(verb, *args, **kwargs): d.addCallback( partial( runner.parse_or_exit, - runner.Options(), + options, ), stdout=stdout, stderr=stderr,