Merge pull request #1191 from tahoe-lafs/3526.test_invite-no-mock

Remove mock usage from test_invite

Fixes: ticket:3526
This commit is contained in:
Jean-Paul Calderone 2022-04-14 12:50:17 -04:00 committed by GitHub
commit c0a84ececa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 589 additions and 225 deletions

1
newsfragments/3526.minor Normal file
View File

@ -0,0 +1 @@

View File

@ -37,9 +37,6 @@ from allmydata.util.assertutil import precondition
from allmydata.util.encodingutil import listdir_unicode, argv_to_unicode, quote_local_unicode_path, get_io_encoding
from allmydata.util import fileutil, i2p_provider, iputil, tor_provider, jsonbytes as json
from wormhole import wormhole
dummy_tac = """
import sys
print("Nodes created by Tahoe-LAFS v1.11.0 or later cannot be run by")
@ -377,7 +374,7 @@ def _get_config_via_wormhole(config):
relay_url = config.parent['wormhole-server']
print("Connecting to '{}'".format(relay_url), file=out)
wh = wormhole.create(
wh = config.parent.wormhole.create(
appid=config.parent['wormhole-invite-appid'],
relay_url=relay_url,
reactor=reactor,

View File

@ -58,11 +58,17 @@ process_control_commands = [
class Options(usage.Options):
"""
:ivar wormhole: An object exposing the magic-wormhole API (mainly a test
hook).
"""
# unit tests can override these to point at StringIO instances
stdin = sys.stdin
stdout = sys.stdout
stderr = sys.stderr
from wormhole import wormhole
subCommands = ( create_node.subCommands
+ admin.subCommands
+ process_control_commands

View File

@ -18,8 +18,6 @@ except ImportError:
from twisted.python import usage
from twisted.internet import defer, reactor
from wormhole import wormhole
from allmydata.util.encodingutil import argv_to_abspath
from allmydata.util import jsonbytes as json
from allmydata.scripts.common import get_default_nodedir, get_introducer_furl
@ -50,7 +48,7 @@ def _send_config_via_wormhole(options, config):
err = options.stderr
relay_url = options.parent['wormhole-server']
print("Connecting to '{}'...".format(relay_url), file=out)
wh = wormhole.create(
wh = options.parent.wormhole.create(
appid=options.parent['wormhole-invite-appid'],
relay_url=relay_url,
reactor=reactor,

View File

@ -1,75 +1,117 @@
"""
Ported to Pythn 3.
Tests for ``tahoe invite``.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from __future__ import annotations
import os
import mock
import json
import os
from functools import partial
from os.path import join
from typing import Awaitable, Callable, Optional, Sequence, TypeVar, Union
try:
from typing import Optional, Sequence
except ImportError:
pass
from twisted.trial import unittest
from twisted.internet import defer
from twisted.trial import unittest
from ...client import read_config
from ...scripts import runner
from ...util.jsonbytes import dumps_bytes
from ..common_util import run_cli
from ..no_network import GridTestMixin
from .common import CLITestMixin
from ...client import (
read_config,
)
from .wormholetesting import IWormhole, MemoryWormholeServer, TestingHelper, memory_server
class _FakeWormhole(object):
# Logically:
# JSONable = dict[str, Union[JSONable, None, int, float, str, list[JSONable]]]
#
# But practically:
JSONable = Union[dict, None, int, float, str, list]
def __init__(self, outgoing_messages):
self.messages = []
for o in outgoing_messages:
assert isinstance(o, bytes)
self._outgoing = outgoing_messages
def get_code(self):
return defer.succeed(u"6-alarmist-tuba")
async def open_wormhole() -> tuple[Callable, IWormhole, str]:
"""
Create a new in-memory wormhole server, open one end of a wormhole, and
return it and related info.
def set_code(self, code):
self._code = code
:return: A three-tuple allowing use of the wormhole. The first element is
a callable like ``run_cli`` but which will run commands so that they
use the in-memory wormhole server instead of a real one. The second
element is the open wormhole. The third element is the wormhole's
code.
"""
server = MemoryWormholeServer()
options = runner.Options()
options.wormhole = server
reactor = object()
def get_welcome(self):
return defer.succeed(
{
u"welcome": {},
}
wormhole = server.create(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
reactor,
)
code = await wormhole.get_code()
return (partial(run_cli, options=options), wormhole, code)
def make_simple_peer(
reactor,
server: MemoryWormholeServer,
helper: TestingHelper,
messages: Sequence[JSONable],
) -> Callable[[], Awaitable[IWormhole]]:
"""
Make a wormhole peer that just sends the given messages.
The returned function returns an awaitable that fires with the peer's end
of the wormhole.
"""
async def peer() -> IWormhole:
# Run the client side of the invitation by manually pumping a
# message through the wormhole.
# First, wait for the server to create the wormhole at all.
wormhole = await helper.wait_for_wormhole(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
)
# Then read out its code and open the other side of the wormhole.
code = await wormhole.when_code()
other_end = server.create(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
reactor,
)
other_end.set_code(code)
send_messages(other_end, messages)
return other_end
def allocate_code(self):
return None
def send_message(self, msg):
assert isinstance(msg, bytes)
self.messages.append(msg)
def get_message(self):
return defer.succeed(self._outgoing.pop(0))
def close(self):
return defer.succeed(None)
return peer
def _create_fake_wormhole(outgoing_messages):
outgoing_messages = [
m.encode("utf-8") if isinstance(m, str) else m
for m in outgoing_messages
]
return _FakeWormhole(outgoing_messages)
def send_messages(wormhole: IWormhole, messages: Sequence[JSONable]) -> None:
"""
Send a list of message through a wormhole.
"""
for msg in messages:
wormhole.send_message(dumps_bytes(msg))
A = TypeVar("A")
B = TypeVar("B")
def concurrently(
client: Callable[[], Awaitable[A]],
server: Callable[[], Awaitable[B]],
) -> defer.Deferred[tuple[A, B]]:
"""
Run two asynchronous functions concurrently and asynchronously return a
tuple of both their results.
"""
return defer.gatherResults([
defer.Deferred.fromCoroutine(client()),
defer.Deferred.fromCoroutine(server()),
])
class Join(GridTestMixin, CLITestMixin, unittest.TestCase):
@ -86,41 +128,39 @@ class Join(GridTestMixin, CLITestMixin, unittest.TestCase):
successfully join after an invite
"""
node_dir = self.mktemp()
run_cli, wormhole, code = yield defer.Deferred.fromCoroutine(open_wormhole())
send_messages(wormhole, [
{u"abilities": {u"server-v1": {}}},
{
u"shares-needed": 1,
u"shares-happy": 1,
u"shares-total": 1,
u"nickname": u"somethinghopefullyunique",
u"introducer": u"pb://foo",
},
])
with mock.patch('allmydata.scripts.create_node.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"server-v1": {}}}),
json.dumps({
u"shares-needed": 1,
u"shares-happy": 1,
u"shares-total": 1,
u"nickname": u"somethinghopefullyunique",
u"introducer": u"pb://foo",
}),
])
w.create = mock.Mock(return_value=fake_wh)
rc, out, err = yield run_cli(
"create-client",
"--join", code,
node_dir,
)
rc, out, err = yield run_cli(
"create-client",
"--join", "1-abysmal-ant",
node_dir,
)
self.assertEqual(0, rc)
self.assertEqual(0, rc)
config = read_config(node_dir, u"")
self.assertIn(
"pb://foo",
set(
furl
for (furl, cache)
in config.get_introducer_configuration().values()
),
)
config = read_config(node_dir, u"")
self.assertIn(
"pb://foo",
set(
furl
for (furl, cache)
in config.get_introducer_configuration().values()
),
)
with open(join(node_dir, 'tahoe.cfg'), 'r') as f:
config = f.read()
self.assertIn(u"somethinghopefullyunique", config)
with open(join(node_dir, 'tahoe.cfg'), 'r') as f:
config = f.read()
self.assertIn(u"somethinghopefullyunique", config)
@defer.inlineCallbacks
def test_create_node_illegal_option(self):
@ -128,30 +168,28 @@ class Join(GridTestMixin, CLITestMixin, unittest.TestCase):
Server sends JSON with unknown/illegal key
"""
node_dir = self.mktemp()
run_cli, wormhole, code = yield defer.Deferred.fromCoroutine(open_wormhole())
send_messages(wormhole, [
{u"abilities": {u"server-v1": {}}},
{
u"shares-needed": 1,
u"shares-happy": 1,
u"shares-total": 1,
u"nickname": u"somethinghopefullyunique",
u"introducer": u"pb://foo",
u"something-else": u"not allowed",
},
])
with mock.patch('allmydata.scripts.create_node.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"server-v1": {}}}),
json.dumps({
u"shares-needed": 1,
u"shares-happy": 1,
u"shares-total": 1,
u"nickname": u"somethinghopefullyunique",
u"introducer": u"pb://foo",
u"something-else": u"not allowed",
}),
])
w.create = mock.Mock(return_value=fake_wh)
rc, out, err = yield run_cli(
"create-client",
"--join", code,
node_dir,
)
rc, out, err = yield run_cli(
"create-client",
"--join", "1-abysmal-ant",
node_dir,
)
# should still succeed -- just ignores the not-whitelisted
# "something-else" option
self.assertEqual(0, rc)
# should still succeed -- just ignores the not-whitelisted
# "something-else" option
self.assertEqual(0, rc)
class Invite(GridTestMixin, CLITestMixin, unittest.TestCase):
@ -168,8 +206,7 @@ class Invite(GridTestMixin, CLITestMixin, unittest.TestCase):
intro_dir,
)
def _invite_success(self, extra_args=(), tahoe_config=None):
# type: (Sequence[bytes], Optional[bytes]) -> defer.Deferred
async def _invite_success(self, extra_args: Sequence[bytes] = (), tahoe_config: Optional[bytes] = None) -> str:
"""
Exercise an expected-success case of ``tahoe invite``.
@ -190,53 +227,58 @@ class Invite(GridTestMixin, CLITestMixin, unittest.TestCase):
with open(join(intro_dir, "tahoe.cfg"), "wb") as fobj_cfg:
fobj_cfg.write(tahoe_config)
with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"client-v1": {}}}),
])
w.create = mock.Mock(return_value=fake_wh)
wormhole_server, helper = memory_server()
options = runner.Options()
options.wormhole = wormhole_server
reactor = object()
extra_args = tuple(extra_args)
d = run_cli(
async def server():
# Run the server side of the invitation process using the CLI.
rc, out, err = await run_cli(
"-d", intro_dir,
"invite",
*(extra_args + ("foo",))
*tuple(extra_args) + ("foo",),
options=options,
)
def done(result):
rc, out, err = result
self.assertEqual(2, len(fake_wh.messages))
self.assertEqual(
json.loads(fake_wh.messages[0]),
{
"abilities":
{
"server-v1": {}
},
},
)
invite = json.loads(fake_wh.messages[1])
self.assertEqual(
invite["nickname"], "foo",
)
self.assertEqual(
invite["introducer"], "pb://fooblam",
)
return invite
d.addCallback(done)
return d
# Send a proper client abilities message.
client = make_simple_peer(reactor, wormhole_server, helper, [{u"abilities": {u"client-v1": {}}}])
other_end, _ = await concurrently(client, server)
# Check the server's messages. First, it should announce its
# abilities correctly.
server_abilities = json.loads(await other_end.when_received())
self.assertEqual(
server_abilities,
{
"abilities":
{
"server-v1": {}
},
},
)
# Second, it should have an invitation with a nickname and introducer
# furl.
invite = json.loads(await other_end.when_received())
self.assertEqual(
invite["nickname"], "foo",
)
self.assertEqual(
invite["introducer"], "pb://fooblam",
)
return invite
@defer.inlineCallbacks
def test_invite_success(self):
"""
successfully send an invite
"""
invite = yield self._invite_success((
invite = yield defer.Deferred.fromCoroutine(self._invite_success((
"--shares-needed", "1",
"--shares-happy", "2",
"--shares-total", "3",
))
)))
self.assertEqual(
invite["shares-needed"], "1",
)
@ -253,12 +295,12 @@ class Invite(GridTestMixin, CLITestMixin, unittest.TestCase):
If ``--shares-{needed,happy,total}`` are not given on the command line
then the invitation is generated using the configured values.
"""
invite = yield self._invite_success(tahoe_config=b"""
invite = yield defer.Deferred.fromCoroutine(self._invite_success(tahoe_config=b"""
[client]
shares.needed = 2
shares.happy = 4
shares.total = 6
""")
"""))
self.assertEqual(
invite["shares-needed"], "2",
)
@ -277,22 +319,20 @@ shares.total = 6
"""
intro_dir = os.path.join(self.basedir, "introducer")
with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"client-v1": {}}}),
])
w.create = mock.Mock(return_value=fake_wh)
options = runner.Options()
options.wormhole = None
rc, out, err = yield run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
"foo",
)
self.assertNotEqual(rc, 0)
self.assertIn(u"Can't find introducer FURL", out + err)
rc, out, err = yield run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
"foo",
options=options,
)
self.assertNotEqual(rc, 0)
self.assertIn(u"Can't find introducer FURL", out + err)
@defer.inlineCallbacks
def test_invite_wrong_client_abilities(self):
@ -306,23 +346,28 @@ shares.total = 6
with open(join(priv_dir, "introducer.furl"), "w") as f:
f.write("pb://fooblam\n")
with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"client-v9000": {}}}),
])
w.create = mock.Mock(return_value=fake_wh)
wormhole_server, helper = memory_server()
options = runner.Options()
options.wormhole = wormhole_server
reactor = object()
rc, out, err = yield run_cli(
async def server():
rc, out, err = await run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
"foo",
options=options,
)
self.assertNotEqual(rc, 0)
self.assertIn(u"No 'client-v1' in abilities", out + err)
# Send some surprising client abilities.
client = make_simple_peer(reactor, wormhole_server, helper, [{u"abilities": {u"client-v9000": {}}}])
yield concurrently(client, server)
@defer.inlineCallbacks
def test_invite_no_client_abilities(self):
"""
@ -335,23 +380,30 @@ shares.total = 6
with open(join(priv_dir, "introducer.furl"), "w") as f:
f.write("pb://fooblam\n")
with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({}),
])
w.create = mock.Mock(return_value=fake_wh)
wormhole_server, helper = memory_server()
options = runner.Options()
options.wormhole = wormhole_server
reactor = object()
rc, out, err = yield run_cli(
async def server():
# Run the server side of the invitation process using the CLI.
rc, out, err = await run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
"foo",
options=options,
)
self.assertNotEqual(rc, 0)
self.assertIn(u"No 'abilities' from client", out + err)
# Send a no-abilities message through to the server.
client = make_simple_peer(reactor, wormhole_server, helper, [{}])
yield concurrently(client, server)
@defer.inlineCallbacks
def test_invite_wrong_server_abilities(self):
"""
@ -364,26 +416,25 @@ shares.total = 6
with open(join(priv_dir, "introducer.furl"), "w") as f:
f.write("pb://fooblam\n")
with mock.patch('allmydata.scripts.create_node.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"server-v9000": {}}}),
json.dumps({
"shares-needed": "1",
"shares-total": "1",
"shares-happy": "1",
"nickname": "foo",
"introducer": "pb://fooblam",
}),
])
w.create = mock.Mock(return_value=fake_wh)
run_cli, wormhole, code = yield defer.Deferred.fromCoroutine(open_wormhole())
send_messages(wormhole, [
{u"abilities": {u"server-v9000": {}}},
{
"shares-needed": "1",
"shares-total": "1",
"shares-happy": "1",
"nickname": "foo",
"introducer": "pb://fooblam",
},
])
rc, out, err = yield run_cli(
"create-client",
"--join", "1-alarmist-tuba",
"foo",
)
self.assertNotEqual(rc, 0)
self.assertIn("Expected 'server-v1' in server abilities", out + err)
rc, out, err = yield run_cli(
"create-client",
"--join", code,
"foo",
)
self.assertNotEqual(rc, 0)
self.assertIn("Expected 'server-v1' in server abilities", out + err)
@defer.inlineCallbacks
def test_invite_no_server_abilities(self):
@ -397,26 +448,25 @@ shares.total = 6
with open(join(priv_dir, "introducer.furl"), "w") as f:
f.write("pb://fooblam\n")
with mock.patch('allmydata.scripts.create_node.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({}),
json.dumps({
"shares-needed": "1",
"shares-total": "1",
"shares-happy": "1",
"nickname": "bar",
"introducer": "pb://fooblam",
}),
])
w.create = mock.Mock(return_value=fake_wh)
run_cli, wormhole, code = yield defer.Deferred.fromCoroutine(open_wormhole())
send_messages(wormhole, [
{},
{
"shares-needed": "1",
"shares-total": "1",
"shares-happy": "1",
"nickname": "bar",
"introducer": "pb://fooblam",
},
])
rc, out, err = yield run_cli(
"create-client",
"--join", "1-alarmist-tuba",
"bar",
)
self.assertNotEqual(rc, 0)
self.assertIn("Expected 'abilities' in server introduction", out + err)
rc, out, err = yield run_cli(
"create-client",
"--join", code,
"bar",
)
self.assertNotEqual(rc, 0)
self.assertIn("Expected 'abilities' in server introduction", out + err)
@defer.inlineCallbacks
def test_invite_no_nick(self):
@ -425,13 +475,16 @@ shares.total = 6
"""
intro_dir = os.path.join(self.basedir, "introducer")
with mock.patch('allmydata.scripts.tahoe_invite.wormhole'):
rc, out, err = yield run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
)
self.assertTrue(rc)
self.assertIn(u"Provide a single argument", out + err)
options = runner.Options()
options.wormhole = None
rc, out, err = yield run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
options=options,
)
self.assertTrue(rc)
self.assertIn(u"Provide a single argument", out + err)

View File

@ -0,0 +1,305 @@
"""
An in-memory implementation of some of the magic-wormhole interfaces for
use by automated tests.
For example::
async def peerA(mw):
wormhole = mw.create("myapp", "wss://myserver", reactor)
code = await wormhole.get_code()
print(f"I have a code: {code}")
message = await wormhole.when_received()
print(f"I have a message: {message}")
async def local_peerB(helper, mw):
peerA_wormhole = await helper.wait_for_wormhole("myapp", "wss://myserver")
code = await peerA_wormhole.when_code()
peerB_wormhole = mw.create("myapp", "wss://myserver")
peerB_wormhole.set_code(code)
peerB_wormhole.send_message("Hello, peer A")
# Run peerA against local_peerB with pure in-memory message passing.
server, helper = memory_server()
run(gather(peerA(server), local_peerB(helper, server)))
# Run peerA against a peerB somewhere out in the world, using a real
# wormhole relay server somewhere.
import wormhole
run(peerA(wormhole))
"""
from __future__ import annotations
from typing import Iterator, Optional, List, Tuple
from collections.abc import Awaitable
from inspect import getargspec
from itertools import count
from sys import stderr
from attrs import frozen, define, field, Factory
from twisted.internet.defer import Deferred, DeferredQueue, succeed
from wormhole._interfaces import IWormhole
from wormhole.wormhole import create
from zope.interface import implementer
WormholeCode = str
WormholeMessage = bytes
AppId = str
RelayURL = str
ApplicationKey = Tuple[RelayURL, AppId]
@define
class MemoryWormholeServer(object):
"""
A factory for in-memory wormholes.
:ivar _apps: Wormhole state arranged by the application id and relay URL
it belongs to.
:ivar _waiters: Observers waiting for a wormhole to be created for a
specific application id and relay URL combination.
"""
_apps: dict[ApplicationKey, _WormholeApp] = field(default=Factory(dict))
_waiters: dict[ApplicationKey, Deferred] = field(default=Factory(dict))
def create(
self,
appid,
relay_url,
reactor,
versions={},
delegate=None,
journal=None,
tor=None,
timing=None,
stderr=stderr,
_eventual_queue=None,
_enable_dilate=False,
):
"""
Create a wormhole. It will be able to connect to other wormholes created
by this instance (and constrained by the normal appid/relay_url
rules).
"""
if tor is not None:
raise ValueError("Cannot deal with Tor right now.")
if _enable_dilate:
raise ValueError("Cannot deal with dilation right now.")
key = (relay_url, appid)
wormhole = _MemoryWormhole(self._view(key))
if key in self._waiters:
self._waiters.pop(key).callback(wormhole)
return wormhole
def _view(self, key: ApplicationKey) -> _WormholeServerView:
"""
Created a view onto this server's state that is limited by a certain
appid/relay_url pair.
"""
return _WormholeServerView(self, key)
@frozen
class TestingHelper(object):
"""
Provide extra functionality for interacting with an in-memory wormhole
implementation.
This is intentionally a separate API so that it is not confused with
proper public interface of the real wormhole implementation.
"""
_server: MemoryWormholeServer
async def wait_for_wormhole(self, appid: AppId, relay_url: RelayURL) -> IWormhole:
"""
Wait for a wormhole to appear at a specific location.
:param appid: The appid that the resulting wormhole will have.
:param relay_url: The URL of the relay at which the resulting wormhole
will presume to be created.
:return: The first wormhole to be created which matches the given
parameters.
"""
key = (relay_url, appid)
if key in self._server._waiters:
raise ValueError(f"There is already a waiter for {key}")
d = Deferred()
self._server._waiters[key] = d
wormhole = await d
return wormhole
def _verify():
"""
Roughly confirm that the in-memory wormhole creation function matches the
interface of the real implementation.
"""
# Poor man's interface verification.
a = getargspec(create)
b = getargspec(MemoryWormholeServer.create)
# I know it has a `self` argument at the beginning. That's okay.
b = b._replace(args=b.args[1:])
assert a == b, "{} != {}".format(a, b)
_verify()
@define
class _WormholeApp(object):
"""
Represent a collection of wormholes that belong to the same
appid/relay_url scope.
"""
wormholes: dict[WormholeCode, IWormhole] = field(default=Factory(dict))
_waiting: dict[WormholeCode, List[Deferred]] = field(default=Factory(dict))
_counter: Iterator[int] = field(default=Factory(count))
def allocate_code(self, wormhole: IWormhole, code: Optional[WormholeCode]) -> WormholeCode:
"""
Allocate a new code for the given wormhole.
This also associates the given wormhole with the code for future
lookup.
Code generation logic is trivial and certainly not good enough for any
real use. It is sufficient for automated testing, though.
"""
if code is None:
code = "{}-persnickety-tardigrade".format(next(self._counter))
self.wormholes.setdefault(code, []).append(wormhole)
try:
waiters = self._waiting.pop(code)
except KeyError:
pass
else:
for w in waiters:
w.callback(wormhole)
return code
def wait_for_wormhole(self, code: WormholeCode) -> Awaitable[_MemoryWormhole]:
"""
Return a ``Deferred`` which fires with the next wormhole to be associated
with the given code. This is used to let the first end of a wormhole
rendezvous with the second end.
"""
d = Deferred()
self._waiting.setdefault(code, []).append(d)
return d
@frozen
class _WormholeServerView(object):
"""
Present an interface onto the server to be consumed by individual
wormholes.
"""
_server: MemoryWormholeServer
_key: ApplicationKey
def allocate_code(self, wormhole: _MemoryWormhole, code: Optional[WormholeCode]) -> WormholeCode:
"""
Allocate a new code for the given wormhole in the scope associated with
this view.
"""
app = self._server._apps.setdefault(self._key, _WormholeApp())
return app.allocate_code(wormhole, code)
def wormhole_by_code(self, code: WormholeCode, exclude: object) -> Deferred[IWormhole]:
"""
Retrieve all wormholes previously associated with a code.
"""
app = self._server._apps[self._key]
wormholes = app.wormholes[code]
try:
[wormhole] = list(wormhole for wormhole in wormholes if wormhole != exclude)
except ValueError:
return app.wait_for_wormhole(code)
return succeed(wormhole)
@implementer(IWormhole)
@define
class _MemoryWormhole(object):
"""
Represent one side of a wormhole as conceived by ``MemoryWormholeServer``.
"""
_view: _WormholeServerView
_code: Optional[WormholeCode] = None
_payload: DeferredQueue = field(default=Factory(DeferredQueue))
_waiting_for_code: list[Deferred] = field(default=Factory(list))
def allocate_code(self) -> None:
if self._code is not None:
raise ValueError(
"allocate_code used with a wormhole which already has a code"
)
self._code = self._view.allocate_code(self, None)
waiters = self._waiting_for_code
self._waiting_for_code = []
for d in waiters:
d.callback(self._code)
def set_code(self, code: WormholeCode) -> None:
if self._code is None:
self._code = code
self._view.allocate_code(self, code)
else:
raise ValueError("set_code used with a wormhole which already has a code")
def when_code(self) -> Deferred[WormholeCode]:
if self._code is None:
d = Deferred()
self._waiting_for_code.append(d)
return d
return succeed(self._code)
def get_welcome(self):
return succeed("welcome")
def send_message(self, payload: WormholeMessage) -> None:
self._payload.put(payload)
def when_received(self) -> Deferred[WormholeMessage]:
if self._code is None:
raise ValueError(
"This implementation requires set_code or allocate_code "
"before when_received."
)
d = self._view.wormhole_by_code(self._code, exclude=self)
def got_wormhole(wormhole):
msg = wormhole._payload.get()
return msg
d.addCallback(got_wormhole)
return d
get_message = when_received
def close(self) -> None:
pass
# 0.9.2 compatibility
def get_code(self) -> Deferred[WormholeCode]:
if self._code is None:
self.allocate_code()
return self.when_code()
get = when_received
def memory_server() -> tuple[MemoryWormholeServer, TestingHelper]:
"""
Create a paired in-memory wormhole server and testing helper.
"""
server = MemoryWormholeServer()
return server, TestingHelper(server)

View File

@ -69,6 +69,9 @@ def run_cli_native(verb, *args, **kwargs):
Most code should prefer ``run_cli_unicode`` which deals with all the
necessary encoding considerations.
:param runner.Options options: The options instance to use to parse the
given arguments.
:param native_str verb: The command to run. For example,
``"create-node"``.
@ -88,6 +91,7 @@ def run_cli_native(verb, *args, **kwargs):
matching native behavior. If True, stdout/stderr are returned as
bytes.
"""
options = kwargs.pop("options", runner.Options())
nodeargs = kwargs.pop("nodeargs", [])
encoding = kwargs.pop("encoding", None) or getattr(sys.stdout, "encoding") or "utf-8"
return_bytes = kwargs.pop("return_bytes", False)
@ -134,7 +138,7 @@ def run_cli_native(verb, *args, **kwargs):
d.addCallback(
partial(
runner.parse_or_exit,
runner.Options(),
options,
),
stdout=stdout,
stderr=stderr,