Use an already-installed Chutney if there is one

This commit is contained in:
Jean-Paul Calderone 2023-03-27 11:24:32 -04:00
parent c478160988
commit fb8c10c55f

View File

@ -1,6 +1,9 @@
"""
Ported to Python 3.
"""
from __future__ import annotations
import sys
import shutil
from time import sleep
@ -19,6 +22,7 @@ from eliot import (
log_call,
)
from twisted.python.filepath import FilePath
from twisted.python.procutils import which
from twisted.internet.defer import DeferredList
from twisted.internet.error import (
@ -104,7 +108,7 @@ def reactor():
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:temp_dir", include_args=[])
def temp_dir(request):
def temp_dir(request) -> str:
"""
Invoke like 'py.test --keep-tempdir ...' to avoid deleting the temp-dir
"""
@ -446,7 +450,23 @@ def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, reques
@pytest.fixture(scope='session')
@pytest.mark.skipif(sys.platform.startswith('win'),
'Tor tests are unstable on Windows')
def chutney(reactor, temp_dir):
def chutney(reactor, temp_dir: str) -> tuple[str, dict[str, str]]:
# Try to find Chutney already installed in the environment.
try:
import chutney
except ImportError:
# Nope, we'll get our own in a moment.
pass
else:
# We already have one, just use it.
return (
# from `checkout/lib/chutney/__init__.py` we want to get back to
# `checkout` because that's the parent of the directory with all
# of the network definitions. So, great-grand-parent.
FilePath(chutney.__file__).parent().parent().parent().path,
# There's nothing to add to the environment.
{},
)
chutney_dir = join(temp_dir, 'chutney')
mkdir(chutney_dir)
@ -489,83 +509,55 @@ def chutney(reactor, temp_dir):
)
pytest_twisted.blockon(proto.done)
return chutney_dir
return (chutney_dir, {"PYTHONPATH": join(chutney_dir, "lib")})
@pytest.fixture(scope='session')
@pytest.mark.skipif(sys.platform.startswith('win'),
reason='Tor tests are unstable on Windows')
def tor_network(reactor, temp_dir, chutney, request):
"""
Build a basic Tor network.
# this is the actual "chutney" script at the root of a chutney checkout
chutney_dir = chutney
chut = join(chutney_dir, 'chutney')
:param chutney: The root directory of a Chutney checkout and a dict of
additional environment variables to set so a Python process can use
it.
:return: None
"""
chutney_root, chutney_env = chutney
basic_network = join(chutney_root, 'networks', 'basic')
env = environ.copy()
env.update(chutney_env)
chutney_argv = (sys.executable, '-m', 'chutney.TorNet')
def chutney(argv):
proto = _DumpOutputProtocol(None)
reactor.spawnProcess(
proto,
sys.executable,
chutney_argv + argv,
path=join(chutney_root),
env=env,
)
return proto.done
# now, as per Chutney's README, we have to create the network
# ./chutney configure networks/basic
# ./chutney start networks/basic
env = environ.copy()
env.update({"PYTHONPATH": join(chutney_dir, "lib")})
proto = _DumpOutputProtocol(None)
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-m', 'chutney.TorNet', 'configure',
join(chutney_dir, 'networks', 'basic'),
),
path=join(chutney_dir),
env=env,
)
pytest_twisted.blockon(proto.done)
proto = _DumpOutputProtocol(None)
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-m', 'chutney.TorNet', 'start',
join(chutney_dir, 'networks', 'basic'),
),
path=join(chutney_dir),
env=env,
)
pytest_twisted.blockon(proto.done)
pytest_twisted.blockon(chutney(("configure", basic_network)))
pytest_twisted.blockon(chutney(("start", basic_network)))
# print some useful stuff
proto = _CollectOutputProtocol()
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-m', 'chutney.TorNet', 'status',
join(chutney_dir, 'networks', 'basic'),
),
path=join(chutney_dir),
env=env,
)
try:
pytest_twisted.blockon(proto.done)
except ProcessTerminated:
print("Chutney.TorNet status failed (continuing):")
print(proto.output.getvalue())
pytest_twisted.blockon(chutney(("status", basic_network)))
except ProcessTerminated as e:
print("Chutney.TorNet status failed (continuing)")
def cleanup():
print("Tearing down Chutney Tor network")
proto = _CollectOutputProtocol()
reactor.spawnProcess(
proto,
sys.executable,
(
sys.executable, '-m', 'chutney.TorNet', 'stop',
join(chutney_dir, 'networks', 'basic'),
),
path=join(chutney_dir),
env=env,
)
try:
block_with_timeout(proto.done, reactor)
block_with_timeout(chutney(("stop", basic_network)), reactor)
except ProcessTerminated:
# If this doesn't exit cleanly, that's fine, that shouldn't fail
# the test suite.