tahoe-lafs/src/allmydata/test/cli_node_api.py

233 lines
6.0 KiB
Python
Raw Normal View History

# Public names of this module; everything else (underscore-prefixed helpers)
# is an implementation detail.
__all__ = [
"CLINodeAPI",
"Expect",
"on_stdout",
"on_stdout_and_stderr",
"on_different",
"wait_for_exit",
]
import os
import sys
from errno import ENOENT
import attr
from twisted.internet.error import (
ProcessDone,
ProcessTerminated,
2019-05-13 13:31:46 +00:00
ProcessExitedAlready,
)
from twisted.internet.interfaces import (
IProcessProtocol,
)
from twisted.python.filepath import (
FilePath,
)
from twisted.internet.protocol import (
Protocol,
ProcessProtocol,
)
from twisted.internet.defer import (
Deferred,
succeed,
)
2019-05-13 13:31:46 +00:00
from twisted.internet.task import (
deferLater,
)
2019-05-03 11:27:58 +00:00
from ..client import (
_Client,
)
from ..scripts.tahoe_stop import (
COULD_NOT_STOP,
)
2019-05-13 13:31:46 +00:00
from ..util.eliotutil import (
inline_callbacks,
)
class Expect(Protocol):
    """
    Accumulate every byte received and let callers wait for a particular
    byte string to show up anywhere in that accumulated output.
    """

    def __init__(self):
        # The buffer is created here (and reset again in connectionMade) so
        # that expect() and get_buffered_output() do not raise AttributeError
        # when they are used before the protocol has been connected.
        self._buffer = b""
        # Pending (expected bytes, Deferred) pairs that have not matched yet.
        self._expectations = []

    def get_buffered_output(self):
        """
        Return all of the bytes received since the connection was made.
        """
        return self._buffer

    def expect(self, expectation):
        """
        Wait until ``expectation`` appears in the received output.

        :param bytes expectation: The byte string to look for.

        :return Deferred: Fires with ``None`` once the bytes have been seen
            (immediately, if they are already in the buffer).  Fails with the
            connection-lost reason if the connection ends first.
        """
        if expectation in self._buffer:
            return succeed(None)
        d = Deferred()
        self._expectations.append((expectation, d))
        return d

    def connectionMade(self):
        # Start each connection with an empty buffer.
        self._buffer = b""

    def dataReceived(self, data):
        self._buffer += data
        # Walk the list backwards by index so that satisfied entries can be
        # deleted in place without disturbing the entries still to be
        # visited.
        for i in range(len(self._expectations) - 1, -1, -1):
            expectation, d = self._expectations[i]
            if expectation in self._buffer:
                del self._expectations[i]
                d.callback(None)

    def connectionLost(self, reason):
        # Detach the pending list before failing its Deferreds so that a
        # repeated connectionLost call cannot errback the same Deferred
        # twice.
        pending, self._expectations = self._expectations, []
        for _ignored, d in pending:
            d.errback(reason)
class _ProcessProtocolAdapter(ProcessProtocol):
    """
    Adapt a process protocol's per-file-descriptor events to plain protocols.

    :ivar dict fds: A mapping from child file descriptor numbers to the
        protocol object that should receive data written to that descriptor.
        The same protocol may appear under more than one descriptor.
    """

    def __init__(self, fds):
        self._fds = fds

    def _unique_protocols(self):
        """
        Yield each distinct protocol in the mapping exactly once, even if it
        is registered for several file descriptors.
        """
        seen = set()
        for proto in self._fds.values():
            if proto not in seen:
                seen.add(proto)
                yield proto

    def connectionMade(self):
        # Connect each protocol only once, mirroring the de-duplication done
        # in processEnded; otherwise a protocol shared between stdout and
        # stderr would see makeConnection twice.
        for proto in self._unique_protocols():
            proto.makeConnection(self.transport)

    def childDataReceived(self, childFD, data):
        try:
            proto = self._fds[childFD]
        except KeyError:
            # Nobody registered interest in this descriptor; drop the data.
            pass
        else:
            proto.dataReceived(data)

    def processEnded(self, reason):
        # Each protocol hears about the end of the process exactly once.
        for proto in self._unique_protocols():
            proto.connectionLost(reason)
def on_stdout(protocol):
    """
    Wrap ``protocol`` in a process protocol that delivers only the child's
    file descriptor 1 (stdout) output to it.
    """
    stdout_only = {1: protocol}
    return _ProcessProtocolAdapter(stdout_only)
def on_stdout_and_stderr(protocol):
    """
    Wrap ``protocol`` in a process protocol that delivers the child's output
    on both file descriptor 1 (stdout) and file descriptor 2 (stderr) to it.
    """
    both_streams = dict.fromkeys((1, 2), protocol)
    return _ProcessProtocolAdapter(both_streams)
def on_different(fd_mapping):
    """
    Wrap a caller-supplied mapping of child file descriptor numbers to
    protocols in a process protocol that routes output accordingly.
    """
    adapter = _ProcessProtocolAdapter(fd_mapping)
    return adapter
@attr.s
class CLINodeAPI(object):
    """
    Drive a node through the ``allmydata.scripts.runner`` command line, each
    command running in a child process spawned with the given reactor.
    """
    # The reactor used to spawn child processes and schedule delays.
    reactor = attr.ib()
    # The node's base directory.
    basedir = attr.ib(type=FilePath)
    # The transport returned by spawnProcess for the ``run`` command, or
    # None if ``run`` has not been called.
    process = attr.ib(default=None)

    @property
    def twistd_pid_file(self):
        """The ``twistd.pid`` file inside the base directory."""
        return self.basedir.child(u"twistd.pid")

    @property
    def node_url_file(self):
        """The ``node.url`` file inside the base directory."""
        return self.basedir.child(u"node.url")

    @property
    def storage_furl_file(self):
        """The ``private/storage.furl`` file inside the base directory."""
        return self.basedir.child(u"private").child(u"storage.furl")

    @property
    def introducer_furl_file(self):
        """The ``private/introducer.furl`` file inside the base directory."""
        return self.basedir.child(u"private").child(u"introducer.furl")

    @property
    def config_file(self):
        """The ``tahoe.cfg`` file inside the base directory."""
        return self.basedir.child(u"tahoe.cfg")

    @property
    def exit_trigger_file(self):
        """The file, named by ``_Client.EXIT_TRIGGER_FILE``, in the base directory."""
        return self.basedir.child(_Client.EXIT_TRIGGER_FILE)

    def _execute(self, process_protocol, argv):
        """
        Spawn ``python -m allmydata.scripts.runner`` with extra arguments.

        :param process_protocol: The process protocol to attach to the child.
        :param list argv: Arguments appended after the runner module name.

        :return: Whatever the reactor's ``spawnProcess`` returns.
        """
        exe = sys.executable
        argv = [
            exe,
            u"-m",
            u"allmydata.scripts.runner",
        ] + argv
        return self.reactor.spawnProcess(
            processProtocol=process_protocol,
            executable=exe,
            args=argv,
            # The child sees the same environment as this process.
            env=os.environ,
        )

    def run(self, protocol, extra_tahoe_args=()):
        """
        Start the node running.

        :param IProcessProtocol protocol: This protocol will be hooked up to
            the node process and can handle output or generate input.

        :param extra_tahoe_args: Additional command-line arguments placed
            before the ``run`` subcommand.

        :raise TypeError: If ``protocol`` does not provide
            ``IProcessProtocol``.
        """
        if not IProcessProtocol.providedBy(protocol):
            raise TypeError("run requires process protocol, got {}".format(protocol))
        self.process = self._execute(
            protocol,
            list(extra_tahoe_args) + [u"run", self.basedir.asTextMode().path],
        )
        # Don't let the process run away forever.
        try:
            self.active()
        except OSError as e:
            # A missing path (ENOENT) is tolerated; anything else is a real
            # error.
            if e.errno != ENOENT:
                raise

    def stop(self, protocol):
        """
        Spawn the ``stop`` command against the base directory.

        :param protocol: The process protocol to attach to the ``stop``
            command's process.
        """
        self._execute(
            protocol,
            [u"stop", self.basedir.asTextMode().path],
        )

    @inline_callbacks
    def stop_and_wait(self):
        """
        Stop the node and wait until that has been accomplished.

        On Windows the process started by ``run`` is signalled with ``TERM``
        every 0.1 seconds until it is reported as already exited.  Elsewhere
        the ``stop`` command is spawned and its exit is awaited.
        """
        # Imported here because no module-level import of ``platform`` is
        # visible in this file; without it the check below is a NameError.
        from twisted.python.runtime import platform

        if platform.isWindows():
            # On Windows there is no PID file and no "tahoe stop".
            if self.process is not None:
                while True:
                    try:
                        self.process.signalProcess("TERM")
                    except ProcessExitedAlready:
                        break
                    else:
                        yield deferLater(self.reactor, 0.1, lambda: None)
        else:
            protocol, ended = wait_for_exit()
            self.stop(protocol)
            yield ended

    def active(self):
        """
        Touch the exit trigger file.
        """
        # By writing this file, we get two minutes before the client will
        # exit. This ensures that even if the 'stop' command doesn't work (and
        # the test fails), the client should still terminate.
        self.exit_trigger_file.touch()

    def _check_cleanup_reason(self, reason):
        """
        Errback for ``cleanup``: swallow a ``ProcessTerminated`` failure whose
        exit code is ``COULD_NOT_STOP`` and pass any other failure through.
        """
        # Let it fail because the process has already exited.
        reason.trap(ProcessTerminated)
        if reason.value.exitCode != COULD_NOT_STOP:
            return reason
        return None

    def cleanup(self):
        """
        Stop the node, tolerating the ``COULD_NOT_STOP`` outcome.

        :return Deferred: The result of ``stop_and_wait`` with
            ``_check_cleanup_reason`` attached as an errback.
        """
        stopping = self.stop_and_wait()
        stopping.addErrback(self._check_cleanup_reason)
        return stopping
class _WaitForEnd(ProcessProtocol):
    """
    Process protocol that reports the end of its process through a Deferred.
    """

    def __init__(self, ended):
        # Deferred to fire when the process ends.
        self._ended = ended

    def processEnded(self, reason):
        # Anything other than ProcessDone is passed on as a failure.
        if not reason.check(ProcessDone):
            self._ended.errback(reason)
            return
        self._ended.callback(None)
def wait_for_exit():
    """
    Create a process protocol together with a Deferred tied to it.

    :return: A two-tuple ``(protocol, ended)``.  ``ended`` fires with
        ``None`` when the protocol's process ends with ``ProcessDone`` and
        fails with the reason otherwise.
    """
    done = Deferred()
    return _WaitForEnd(done), done