Replace monkey-patching of wormhole with a parameter to run_cli

This commit is contained in:
Jean-Paul Calderone 2022-04-12 11:01:04 -04:00
parent dffcdf2854
commit bc6dafa999
5 changed files with 614 additions and 215 deletions

View File

@ -37,9 +37,6 @@ from allmydata.util.assertutil import precondition
from allmydata.util.encodingutil import listdir_unicode, argv_to_unicode, quote_local_unicode_path, get_io_encoding
from allmydata.util import fileutil, i2p_provider, iputil, tor_provider, jsonbytes as json
from wormhole import wormhole
dummy_tac = """
import sys
print("Nodes created by Tahoe-LAFS v1.11.0 or later cannot be run by")
@ -377,7 +374,7 @@ def _get_config_via_wormhole(config):
relay_url = config.parent['wormhole-server']
print("Connecting to '{}'".format(relay_url), file=out)
wh = wormhole.create(
wh = config.parent.wormhole.create(
appid=config.parent['wormhole-invite-appid'],
relay_url=relay_url,
reactor=reactor,

View File

@ -58,11 +58,17 @@ process_control_commands = [
class Options(usage.Options):
"""
:ivar wormhole: An object exposing the magic-wormhole API (mainly a test
hook).
"""
# unit tests can override these to point at StringIO instances
stdin = sys.stdin
stdout = sys.stdout
stderr = sys.stderr
from wormhole import wormhole
subCommands = ( create_node.subCommands
+ admin.subCommands
+ process_control_commands

View File

@ -18,8 +18,6 @@ except ImportError:
from twisted.python import usage
from twisted.internet import defer, reactor
from wormhole import wormhole
from allmydata.util.encodingutil import argv_to_abspath
from allmydata.util import jsonbytes as json
from allmydata.scripts.common import get_default_nodedir, get_introducer_furl
@ -44,13 +42,15 @@ class InviteOptions(usage.Options):
self['nick'] = args[0].strip()
wormhole = None
@defer.inlineCallbacks
def _send_config_via_wormhole(options, config):
out = options.stdout
err = options.stderr
relay_url = options.parent['wormhole-server']
print("Connecting to '{}'...".format(relay_url), file=out)
wh = wormhole.create(
wh = options.parent.wormhole.create(
appid=options.parent['wormhole-invite-appid'],
relay_url=relay_url,
reactor=reactor,

View File

@ -2,62 +2,21 @@
Tests for ``tahoe invite``.
"""
import os
import mock
import json
import os
from os.path import join
from typing import Optional, Sequence
from twisted.trial import unittest
from twisted.internet import defer
from twisted.trial import unittest
from ...client import read_config
from ...scripts import runner
from ...util.jsonbytes import dumps_bytes
from ..common_util import run_cli
from ..no_network import GridTestMixin
from .common import CLITestMixin
from ...client import (
read_config,
)
class _FakeWormhole(object):
def __init__(self, outgoing_messages):
self.messages = []
for o in outgoing_messages:
assert isinstance(o, bytes)
self._outgoing = outgoing_messages
def get_code(self):
return defer.succeed(u"6-alarmist-tuba")
def set_code(self, code):
self._code = code
def get_welcome(self):
return defer.succeed(
{
u"welcome": {},
}
)
def allocate_code(self):
return None
def send_message(self, msg):
assert isinstance(msg, bytes)
self.messages.append(msg)
def get_message(self):
return defer.succeed(self._outgoing.pop(0))
def close(self):
return defer.succeed(None)
def _create_fake_wormhole(outgoing_messages):
outgoing_messages = [
m.encode("utf-8") if isinstance(m, str) else m
for m in outgoing_messages
]
return _FakeWormhole(outgoing_messages)
from .wormholetesting import MemoryWormholeServer, memory_server
class Join(GridTestMixin, CLITestMixin, unittest.TestCase):
@ -74,41 +33,52 @@ class Join(GridTestMixin, CLITestMixin, unittest.TestCase):
successfully join after an invite
"""
node_dir = self.mktemp()
server = MemoryWormholeServer()
options = runner.Options()
options.wormhole = server
reactor = object()
with mock.patch('allmydata.scripts.create_node.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"server-v1": {}}}),
json.dumps({
u"shares-needed": 1,
u"shares-happy": 1,
u"shares-total": 1,
u"nickname": u"somethinghopefullyunique",
u"introducer": u"pb://foo",
}),
])
w.create = mock.Mock(return_value=fake_wh)
wormhole = server.create(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
reactor,
)
code = yield wormhole.get_code()
messages = [
{u"abilities": {u"server-v1": {}}},
{
u"shares-needed": 1,
u"shares-happy": 1,
u"shares-total": 1,
u"nickname": u"somethinghopefullyunique",
u"introducer": u"pb://foo",
},
]
for msg in messages:
wormhole.send_message(dumps_bytes(msg))
rc, out, err = yield run_cli(
"create-client",
"--join", "1-abysmal-ant",
node_dir,
)
rc, out, err = yield run_cli(
"create-client",
"--join", code,
node_dir,
options=options,
)
self.assertEqual(0, rc)
self.assertEqual(0, rc)
config = read_config(node_dir, u"")
self.assertIn(
"pb://foo",
set(
furl
for (furl, cache)
in config.get_introducer_configuration().values()
),
)
config = read_config(node_dir, u"")
self.assertIn(
"pb://foo",
set(
furl
for (furl, cache)
in config.get_introducer_configuration().values()
),
)
with open(join(node_dir, 'tahoe.cfg'), 'r') as f:
config = f.read()
self.assertIn(u"somethinghopefullyunique", config)
with open(join(node_dir, 'tahoe.cfg'), 'r') as f:
config = f.read()
self.assertIn(u"somethinghopefullyunique", config)
@defer.inlineCallbacks
def test_create_node_illegal_option(self):
@ -116,30 +86,41 @@ class Join(GridTestMixin, CLITestMixin, unittest.TestCase):
Server sends JSON with unknown/illegal key
"""
node_dir = self.mktemp()
server = MemoryWormholeServer()
options = runner.Options()
options.wormhole = server
reactor = object()
with mock.patch('allmydata.scripts.create_node.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"server-v1": {}}}),
json.dumps({
u"shares-needed": 1,
u"shares-happy": 1,
u"shares-total": 1,
u"nickname": u"somethinghopefullyunique",
u"introducer": u"pb://foo",
u"something-else": u"not allowed",
}),
])
w.create = mock.Mock(return_value=fake_wh)
wormhole = server.create(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
reactor,
)
code = yield wormhole.get_code()
messages = [
{u"abilities": {u"server-v1": {}}},
{
u"shares-needed": 1,
u"shares-happy": 1,
u"shares-total": 1,
u"nickname": u"somethinghopefullyunique",
u"introducer": u"pb://foo",
u"something-else": u"not allowed",
},
]
for msg in messages:
wormhole.send_message(dumps_bytes(msg))
rc, out, err = yield run_cli(
"create-client",
"--join", "1-abysmal-ant",
node_dir,
)
rc, out, err = yield run_cli(
"create-client",
"--join", code,
node_dir,
options=options,
)
# should still succeed -- just ignores the not-whitelisted
# "something-else" option
self.assertEqual(0, rc)
# should still succeed -- just ignores the not-whitelisted
# "something-else" option
self.assertEqual(0, rc)
class Invite(GridTestMixin, CLITestMixin, unittest.TestCase):
@ -156,7 +137,7 @@ class Invite(GridTestMixin, CLITestMixin, unittest.TestCase):
intro_dir,
)
def _invite_success(self, extra_args=(), tahoe_config=None):
async def _invite_success(self, extra_args=(), tahoe_config=None):
# type: (Sequence[bytes], Optional[bytes]) -> defer.Deferred
"""
Exercise an expected-success case of ``tahoe invite``.
@ -178,53 +159,82 @@ class Invite(GridTestMixin, CLITestMixin, unittest.TestCase):
with open(join(intro_dir, "tahoe.cfg"), "wb") as fobj_cfg:
fobj_cfg.write(tahoe_config)
with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"client-v1": {}}}),
])
w.create = mock.Mock(return_value=fake_wh)
wormhole_server, helper = memory_server()
options = runner.Options()
options.wormhole = wormhole_server
reactor = object()
extra_args = tuple(extra_args)
d = run_cli(
async def server():
# Run the server side of the invitation process using the CLI.
rc, out, err = await run_cli(
"-d", intro_dir,
"invite",
*(extra_args + ("foo",))
*tuple(extra_args) + ("foo",),
options=options,
)
def done(result):
rc, out, err = result
self.assertEqual(2, len(fake_wh.messages))
self.assertEqual(
json.loads(fake_wh.messages[0]),
async def client():
# Run the client side of the invitation by manually pumping a
# message through the wormhole.
# First, wait for the server to create the wormhole at all.
wormhole = await helper.wait_for_wormhole(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
)
# Then read out its code and open the other side of the wormhole.
code = await wormhole.when_code()
other_end = wormhole_server.create(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
reactor,
)
other_end.set_code(code)
# Send a proper client abilities message.
other_end.send_message(dumps_bytes({u"abilities": {u"client-v1": {}}}))
# Check the server's messages. First, it should announce its
# abilities correctly.
server_abilities = json.loads(await other_end.when_received())
self.assertEqual(
server_abilities,
{
"abilities":
{
"abilities":
{
"server-v1": {}
},
"server-v1": {}
},
)
invite = json.loads(fake_wh.messages[1])
self.assertEqual(
invite["nickname"], "foo",
)
self.assertEqual(
invite["introducer"], "pb://fooblam",
)
return invite
d.addCallback(done)
return d
},
)
# Second, it should have an invitation with a nickname and
# introducer furl.
invite = json.loads(await other_end.when_received())
self.assertEqual(
invite["nickname"], "foo",
)
self.assertEqual(
invite["introducer"], "pb://fooblam",
)
return invite
invite, _ = await defer.gatherResults(map(
defer.Deferred.fromCoroutine,
[client(), server()],
))
return invite
@defer.inlineCallbacks
def test_invite_success(self):
"""
successfully send an invite
"""
invite = yield self._invite_success((
invite = yield defer.Deferred.fromCoroutine(self._invite_success((
"--shares-needed", "1",
"--shares-happy", "2",
"--shares-total", "3",
))
)))
self.assertEqual(
invite["shares-needed"], "1",
)
@ -241,12 +251,12 @@ class Invite(GridTestMixin, CLITestMixin, unittest.TestCase):
If ``--shares-{needed,happy,total}`` are not given on the command line
then the invitation is generated using the configured values.
"""
invite = yield self._invite_success(tahoe_config=b"""
invite = yield defer.Deferred.fromCoroutine(self._invite_success(tahoe_config=b"""
[client]
shares.needed = 2
shares.happy = 4
shares.total = 6
""")
"""))
self.assertEqual(
invite["shares-needed"], "2",
)
@ -265,22 +275,20 @@ shares.total = 6
"""
intro_dir = os.path.join(self.basedir, "introducer")
with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"client-v1": {}}}),
])
w.create = mock.Mock(return_value=fake_wh)
options = runner.Options()
options.wormhole = None
rc, out, err = yield run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
"foo",
)
self.assertNotEqual(rc, 0)
self.assertIn(u"Can't find introducer FURL", out + err)
rc, out, err = yield run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
"foo",
options=options,
)
self.assertNotEqual(rc, 0)
self.assertIn(u"Can't find introducer FURL", out + err)
@defer.inlineCallbacks
def test_invite_wrong_client_abilities(self):
@ -294,23 +302,51 @@ shares.total = 6
with open(join(priv_dir, "introducer.furl"), "w") as f:
f.write("pb://fooblam\n")
with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"client-v9000": {}}}),
])
w.create = mock.Mock(return_value=fake_wh)
wormhole_server, helper = memory_server()
options = runner.Options()
options.wormhole = wormhole_server
reactor = object()
rc, out, err = yield run_cli(
async def server():
rc, out, err = await run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
"foo",
options=options,
)
self.assertNotEqual(rc, 0)
self.assertIn(u"No 'client-v1' in abilities", out + err)
async def client():
# Run the client side of the invitation by manually pumping a
# message through the wormhole.
# First, wait for the server to create the wormhole at all.
wormhole = await helper.wait_for_wormhole(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
)
# Then read out its code and open the other side of the wormhole.
code = await wormhole.when_code()
other_end = wormhole_server.create(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
reactor,
)
other_end.set_code(code)
# Send some surprising client abilities.
other_end.send_message(dumps_bytes({u"abilities": {u"client-v9000": {}}}))
yield defer.gatherResults(map(
defer.Deferred.fromCoroutine,
[client(), server()],
))
@defer.inlineCallbacks
def test_invite_no_client_abilities(self):
"""
@ -323,23 +359,52 @@ shares.total = 6
with open(join(priv_dir, "introducer.furl"), "w") as f:
f.write("pb://fooblam\n")
with mock.patch('allmydata.scripts.tahoe_invite.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({}),
])
w.create = mock.Mock(return_value=fake_wh)
wormhole_server, helper = memory_server()
options = runner.Options()
options.wormhole = wormhole_server
reactor = object()
rc, out, err = yield run_cli(
async def server():
# Run the server side of the invitation process using the CLI.
rc, out, err = await run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
"foo",
options=options,
)
self.assertNotEqual(rc, 0)
self.assertIn(u"No 'abilities' from client", out + err)
async def client():
# Run the client side of the invitation by manually pumping a
# message through the wormhole.
# First, wait for the server to create the wormhole at all.
wormhole = await helper.wait_for_wormhole(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
)
# Then read out its code and open the other side of the wormhole.
code = await wormhole.when_code()
other_end = wormhole_server.create(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
reactor,
)
other_end.set_code(code)
# Send a no-abilities message through to the server.
other_end.send_message(dumps_bytes({}))
yield defer.gatherResults(map(
defer.Deferred.fromCoroutine,
[client(), server()],
))
@defer.inlineCallbacks
def test_invite_wrong_server_abilities(self):
"""
@ -352,26 +417,38 @@ shares.total = 6
with open(join(priv_dir, "introducer.furl"), "w") as f:
f.write("pb://fooblam\n")
with mock.patch('allmydata.scripts.create_node.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({u"abilities": {u"server-v9000": {}}}),
json.dumps({
"shares-needed": "1",
"shares-total": "1",
"shares-happy": "1",
"nickname": "foo",
"introducer": "pb://fooblam",
}),
])
w.create = mock.Mock(return_value=fake_wh)
wormhole_server = MemoryWormholeServer()
options = runner.Options()
options.wormhole = wormhole_server
reactor = object()
rc, out, err = yield run_cli(
"create-client",
"--join", "1-alarmist-tuba",
"foo",
)
self.assertNotEqual(rc, 0)
self.assertIn("Expected 'server-v1' in server abilities", out + err)
wormhole = wormhole_server.create(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
reactor,
)
code = yield wormhole.get_code()
messages = [
{u"abilities": {u"server-v9000": {}}},
{
"shares-needed": "1",
"shares-total": "1",
"shares-happy": "1",
"nickname": "foo",
"introducer": "pb://fooblam",
},
]
for msg in messages:
wormhole.send_message(dumps_bytes(msg))
rc, out, err = yield run_cli(
"create-client",
"--join", code,
"foo",
options=options,
)
self.assertNotEqual(rc, 0)
self.assertIn("Expected 'server-v1' in server abilities", out + err)
@defer.inlineCallbacks
def test_invite_no_server_abilities(self):
@ -385,26 +462,38 @@ shares.total = 6
with open(join(priv_dir, "introducer.furl"), "w") as f:
f.write("pb://fooblam\n")
with mock.patch('allmydata.scripts.create_node.wormhole') as w:
fake_wh = _create_fake_wormhole([
json.dumps({}),
json.dumps({
"shares-needed": "1",
"shares-total": "1",
"shares-happy": "1",
"nickname": "bar",
"introducer": "pb://fooblam",
}),
])
w.create = mock.Mock(return_value=fake_wh)
server = MemoryWormholeServer()
options = runner.Options()
options.wormhole = server
reactor = object()
rc, out, err = yield run_cli(
"create-client",
"--join", "1-alarmist-tuba",
"bar",
)
self.assertNotEqual(rc, 0)
self.assertIn("Expected 'abilities' in server introduction", out + err)
wormhole = server.create(
"tahoe-lafs.org/invite",
"ws://wormhole.tahoe-lafs.org:4000/v1",
reactor,
)
code = yield wormhole.get_code()
messages = [
{},
{
"shares-needed": "1",
"shares-total": "1",
"shares-happy": "1",
"nickname": "bar",
"introducer": "pb://fooblam",
},
]
for msg in messages:
wormhole.send_message(dumps_bytes(msg))
rc, out, err = yield run_cli(
"create-client",
"--join", code,
"bar",
options=options,
)
self.assertNotEqual(rc, 0)
self.assertIn("Expected 'abilities' in server introduction", out + err)
@defer.inlineCallbacks
def test_invite_no_nick(self):
@ -413,13 +502,16 @@ shares.total = 6
"""
intro_dir = os.path.join(self.basedir, "introducer")
with mock.patch('allmydata.scripts.tahoe_invite.wormhole'):
rc, out, err = yield run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
)
self.assertTrue(rc)
self.assertIn(u"Provide a single argument", out + err)
options = runner.Options()
options.wormhole = None
rc, out, err = yield run_cli(
"-d", intro_dir,
"invite",
"--shares-needed", "1",
"--shares-happy", "1",
"--shares-total", "1",
options=options,
)
self.assertTrue(rc)
self.assertIn(u"Provide a single argument", out + err)

View File

@ -0,0 +1,304 @@
"""
An in-memory implementation of some of the magic-wormhole interfaces for
use by automated tests.
For example::
async def peerA(mw):
wormhole = mw.create("myapp", "wss://myserver", reactor)
code = await wormhole.get_code()
print(f"I have a code: {code}")
message = await wormhole.when_received()
print(f"I have a message: {message}")
async def local_peerB(helper, mw):
peerA_wormhole = await helper.wait_for_wormhole("myapp", "wss://myserver")
code = await peerA_wormhole.when_code()
peerB_wormhole = mw.create("myapp", "wss://myserver")
peerB_wormhole.set_code(code)
peerB_wormhole.send_message("Hello, peer A")
# Run peerA against local_peerB with pure in-memory message passing.
server, helper = memory_server()
run(gather(peerA(server), local_peerB(helper, server)))
# Run peerA against a peerB somewhere out in the world, using a real
# wormhole relay server somewhere.
import wormhole
run(peerA(wormhole))
"""
from __future__ import annotations
from typing import Iterator
from collections.abc import Awaitable
from inspect import getargspec
from itertools import count
from sys import stderr
from attrs import frozen, define, field, Factory
from twisted.internet.defer import Deferred, DeferredQueue, succeed
from wormhole._interfaces import IWormhole
from wormhole.wormhole import create
from zope.interface import implementer
@define
class MemoryWormholeServer(object):
"""
A factory for in-memory wormholes.
:ivar _apps: Wormhole state arranged by the application id and relay URL
it belongs to.
:ivar _waiters: Observers waiting for a wormhole to be created for a
specific application id and relay URL combination.
"""
_apps: dict[tuple[str, str], _WormholeApp] = field(default=Factory(dict))
_waiters: dict[tuple[str, str], Deferred] = field(default=Factory(dict))
def create(
self,
appid,
relay_url,
reactor,
versions={},
delegate=None,
journal=None,
tor=None,
timing=None,
stderr=stderr,
_eventual_queue=None,
_enable_dilate=False,
):
"""
Create a wormhole. It will be able to connect to other wormholes created
by this instance (and constrained by the normal appid/relay_url
rules).
"""
if tor is not None:
raise ValueError("Cannot deal with Tor right now.")
if _enable_dilate:
raise ValueError("Cannot deal with dilation right now.")
key = (relay_url, appid)
wormhole = _MemoryWormhole(self._view(key))
if key in self._waiters:
self._waiters.pop(key).callback(wormhole)
return wormhole
def _view(self, key: tuple[str, str]) -> _WormholeServerView:
"""
Created a view onto this server's state that is limited by a certain
appid/relay_url pair.
"""
return _WormholeServerView(self, key)
@frozen
class TestingHelper(object):
"""
Provide extra functionality for interacting with an in-memory wormhole
implementation.
This is intentionally a separate API so that it is not confused with
proper public interface of the real wormhole implementation.
"""
_server: MemoryWormholeServer
async def wait_for_wormhole(self, appid: str, relay_url: str) -> IWormhole:
"""
Wait for a wormhole to appear at a specific location.
:param appid: The appid that the resulting wormhole will have.
:param relay_url: The URL of the relay at which the resulting wormhole
will presume to be created.
:return: The first wormhole to be created which matches the given
parameters.
"""
key = relay_url, appid
if key in self._server._waiters:
raise ValueError(f"There is already a waiter for {key}")
d = Deferred()
self._server._waiters[key] = d
wormhole = await d
return wormhole
def _verify():
"""
Roughly confirm that the in-memory wormhole creation function matches the
interface of the real implementation.
"""
# Poor man's interface verification.
a = getargspec(create)
b = getargspec(MemoryWormholeServer.create)
# I know it has a `self` argument at the beginning. That's okay.
b = b._replace(args=b.args[1:])
assert a == b, "{} != {}".format(a, b)
_verify()
@define
class _WormholeApp(object):
"""
Represent a collection of wormholes that belong to the same
appid/relay_url scope.
"""
wormholes: dict = field(default=Factory(dict))
_waiting: dict = field(default=Factory(dict))
_counter: Iterator[int] = field(default=Factory(count))
def allocate_code(self, wormhole, code):
"""
Allocate a new code for the given wormhole.
This also associates the given wormhole with the code for future
lookup.
Code generation logic is trivial and certainly not good enough for any
real use. It is sufficient for automated testing, though.
"""
if code is None:
code = "{}-persnickety-tardigrade".format(next(self._counter))
self.wormholes.setdefault(code, []).append(wormhole)
try:
waiters = self._waiting.pop(code)
except KeyError:
pass
else:
for w in waiters:
w.callback(wormhole)
return code
def wait_for_wormhole(self, code: str) -> Awaitable[_MemoryWormhole]:
"""
Return a ``Deferred`` which fires with the next wormhole to be associated
with the given code. This is used to let the first end of a wormhole
rendezvous with the second end.
"""
d = Deferred()
self._waiting.setdefault(code, []).append(d)
return d
@frozen
class _WormholeServerView(object):
"""
Present an interface onto the server to be consumed by individual
wormholes.
"""
_server: MemoryWormholeServer
_key: tuple[str, str]
def allocate_code(self, wormhole: _MemoryWormhole, code: str) -> str:
"""
Allocate a new code for the given wormhole in the scope associated with
this view.
"""
app = self._server._apps.setdefault(self._key, _WormholeApp())
return app.allocate_code(wormhole, code)
def wormhole_by_code(self, code, exclude):
"""
Retrieve all wormholes previously associated with a code.
"""
app = self._server._apps[self._key]
wormholes = app.wormholes[code]
try:
[wormhole] = list(wormhole for wormhole in wormholes if wormhole != exclude)
except ValueError:
return app.wait_for_wormhole(code)
return succeed(wormhole)
@implementer(IWormhole)
@define
class _MemoryWormhole(object):
"""
Represent one side of a wormhole as conceived by ``MemoryWormholeServer``.
"""
_view: _WormholeServerView
_code: str = None
_payload: DeferredQueue = field(default=Factory(DeferredQueue))
_waiting_for_code: list[Deferred] = field(default=Factory(list))
_allocated: bool = False
def allocate_code(self):
if self._code is not None:
raise ValueError(
"allocate_code used with a wormhole which already has a code"
)
self._allocated = True
self._code = self._view.allocate_code(self, None)
waiters = self._waiting_for_code
self._waiting_for_code = None
for d in waiters:
d.callback(self._code)
def set_code(self, code):
if self._code is None:
self._code = code
self._view.allocate_code(self, code)
else:
raise ValueError("set_code used with a wormhole which already has a code")
def when_code(self):
if self._code is None:
d = Deferred()
self._waiting_for_code.append(d)
return d
return succeed(self._code)
get_code = when_code
def get_welcome(self):
return succeed("welcome")
def send_message(self, payload):
self._payload.put(payload)
def when_received(self):
if self._code is None:
raise ValueError(
"This implementation requires set_code or allocate_code "
"before when_received."
)
d = self._view.wormhole_by_code(self._code, exclude=self)
def got_wormhole(wormhole):
msg = wormhole._payload.get()
return msg
d.addCallback(got_wormhole)
return d
get_message = when_received
def close(self):
pass
# 0.9.2 compatibility
def get_code(self):
if self._code is None:
self.allocate_code()
return self.when_code()
get = when_received
def memory_server() -> tuple[MemoryWormholeServer, TestingHelper]:
"""
Create a paired in-memory wormhole server and testing helper.
"""
server = MemoryWormholeServer()
return server, TestingHelper(server)