mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-25 21:59:19 +00:00
236 lines
6.0 KiB
Python
236 lines
6.0 KiB
Python
|
|
# Names exported by ``from <this module> import *``: the node driver, the
# output-matching protocol, the process-protocol adapter factories and the
# exit-waiting helper.
__all__ = [
    "CLINodeAPI", "Expect",
    "on_stdout", "on_stdout_and_stderr", "on_different",
    "wait_for_exit",
]
|
|
|
|
import os
|
|
import sys
|
|
from errno import ENOENT
|
|
|
|
import attr
|
|
|
|
from twisted.internet.error import (
|
|
ProcessDone,
|
|
ProcessTerminated,
|
|
ProcessExitedAlready,
|
|
)
|
|
from twisted.internet.interfaces import (
|
|
IProcessProtocol,
|
|
)
|
|
from twisted.python.filepath import (
|
|
FilePath,
|
|
)
|
|
from twisted.python.runtime import (
|
|
platform,
|
|
)
|
|
from twisted.internet.protocol import (
|
|
Protocol,
|
|
ProcessProtocol,
|
|
)
|
|
from twisted.internet.defer import (
|
|
Deferred,
|
|
succeed,
|
|
)
|
|
from twisted.internet.task import (
|
|
deferLater,
|
|
)
|
|
from ..client import (
|
|
_Client,
|
|
)
|
|
from ..scripts.tahoe_stop import (
|
|
COULD_NOT_STOP,
|
|
)
|
|
from ..util.eliotutil import (
|
|
inline_callbacks,
|
|
)
|
|
|
|
class Expect(Protocol, object):
    """
    A protocol which accumulates every byte it receives and lets callers
    wait until a particular byte string shows up in the accumulated output.
    """
    def __init__(self):
        # Pending ``(expectation, Deferred)`` pairs which have not yet been
        # found in the buffered output.
        self._expectations = []
        # Created here (and reset again in ``connectionMade``) so that
        # ``expect`` and ``get_buffered_output`` can be used on an instance
        # that has not been connected yet instead of raising AttributeError.
        self._buffer = b""

    def get_buffered_output(self):
        """
        :return bytes: Everything received since the connection was made.
        """
        return self._buffer

    def expect(self, expectation):
        """
        Wait for ``expectation`` to appear in the received output.

        :param bytes expectation: The byte string to look for.

        :return Deferred: Already fired with ``None`` if the buffered output
            contains ``expectation``.  Otherwise a Deferred that fires with
            ``None`` once it does, or fails with the connection-lost reason
            if the connection is lost first.
        """
        if expectation in self._buffer:
            return succeed(None)
        d = Deferred()
        self._expectations.append((expectation, d))
        return d

    def connectionMade(self):
        # Start every connection with an empty buffer.
        self._buffer = b""

    def dataReceived(self, data):
        self._buffer += data
        # Walk the pending list backwards so that deleting the entry at
        # index ``i`` never shifts an entry which is still to be examined.
        for i in range(len(self._expectations) - 1, -1, -1):
            expectation, d = self._expectations[i]
            if expectation in self._buffer:
                del self._expectations[i]
                d.callback(None)

    def connectionLost(self, reason):
        # Anything still being waited for can no longer arrive.
        for ignored, d in self._expectations:
            d.errback(reason)
|
|
|
|
|
|
class _ProcessProtocolAdapter(ProcessProtocol, object):
    """
    Adapt one or more stream-style protocols to the process protocol
    interface by routing each child file descriptor's output to the protocol
    registered for that descriptor.
    """
    def __init__(self, fds):
        """
        :param dict fds: A mapping from child file descriptor number to the
            protocol which should receive data written to that descriptor.
            The same protocol may be registered for several descriptors.
        """
        self._fds = fds

    def _distinct_protocols(self):
        """
        :return list: Each registered protocol exactly once, in mapping
            order, even if it is registered for more than one descriptor.
        """
        seen = set()
        distinct = []
        for proto in self._fds.values():
            if proto not in seen:
                seen.add(proto)
                distinct.append(proto)
        return distinct

    def connectionMade(self):
        # Connect each protocol once only, mirroring ``processEnded``.  A
        # protocol shared between descriptors (see ``on_stdout_and_stderr``)
        # must not have ``makeConnection`` called on it twice.
        for proto in self._distinct_protocols():
            proto.makeConnection(self.transport)

    def childDataReceived(self, childFD, data):
        # Output on descriptors nobody registered for is dropped on purpose.
        try:
            proto = self._fds[childFD]
        except KeyError:
            pass
        else:
            proto.dataReceived(data)

    def processEnded(self, reason):
        # Tell each protocol once only that its connection is gone.
        for proto in self._distinct_protocols():
            proto.connectionLost(reason)
|
|
|
|
|
|
def on_stdout(protocol):
    """
    Build a process protocol which feeds ``protocol`` with whatever the child
    writes to file descriptor 1.
    """
    stdout_only = {1: protocol}
    return _ProcessProtocolAdapter(stdout_only)
|
|
|
|
def on_stdout_and_stderr(protocol):
    """
    Build a process protocol which feeds ``protocol`` with whatever the child
    writes to file descriptors 1 and 2.
    """
    both_streams = dict.fromkeys((1, 2), protocol)
    return _ProcessProtocolAdapter(both_streams)
|
|
|
|
def on_different(fd_mapping):
    """
    Build a process protocol from an explicit mapping of child file
    descriptor number to the protocol that should receive its output.
    """
    adapter = _ProcessProtocolAdapter(fd_mapping)
    return adapter
|
|
|
|
@attr.s
class CLINodeAPI(object):
    """
    Operate a node living in ``basedir`` by spawning
    ``allmydata.scripts.runner`` child processes through ``reactor``.
    """
    reactor = attr.ib()
    basedir = attr.ib(type=FilePath)
    process = attr.ib(default=None)

    @property
    def twistd_pid_file(self):
        """``twistd.pid`` directly inside the base directory."""
        return self.basedir.child(u"twistd.pid")

    @property
    def node_url_file(self):
        """``node.url`` directly inside the base directory."""
        return self.basedir.child(u"node.url")

    @property
    def storage_furl_file(self):
        """``storage.furl`` inside the base directory's ``private`` child."""
        private_dir = self.basedir.child(u"private")
        return private_dir.child(u"storage.furl")

    @property
    def introducer_furl_file(self):
        """``introducer.furl`` inside the base directory's ``private`` child."""
        private_dir = self.basedir.child(u"private")
        return private_dir.child(u"introducer.furl")

    @property
    def config_file(self):
        """``tahoe.cfg`` directly inside the base directory."""
        return self.basedir.child(u"tahoe.cfg")

    @property
    def exit_trigger_file(self):
        """The base directory child named by ``_Client.EXIT_TRIGGER_FILE``."""
        return self.basedir.child(_Client.EXIT_TRIGGER_FILE)

    def _execute(self, process_protocol, argv):
        """
        Spawn the command line runner with the current Python interpreter.

        :param process_protocol: The process protocol to attach to the child.

        :param list argv: Arguments placed after the runner module name.

        :return: Whatever ``reactor.spawnProcess`` returns.
        """
        interpreter = sys.executable
        runner_prefix = [interpreter, u"-m", u"allmydata.scripts.runner"]
        return self.reactor.spawnProcess(
            processProtocol=process_protocol,
            executable=interpreter,
            args=runner_prefix + argv,
            env=os.environ,
        )

    def run(self, protocol, extra_tahoe_args=()):
        """
        Start the node running.

        :param IProcessProtocol protocol: This protocol will be hooked up to
            the node process and can handle output or generate input.

        :param extra_tahoe_args: Arguments inserted ahead of the ``run``
            sub-command.

        :raise TypeError: If ``protocol`` does not provide
            ``IProcessProtocol``.
        """
        if not IProcessProtocol.providedBy(protocol):
            raise TypeError("run requires process protocol, got {}".format(protocol))

        node_args = list(extra_tahoe_args)
        node_args += [u"run", self.basedir.asTextMode().path]
        self.process = self._execute(protocol, node_args)

        # Don't let the process run away forever.
        try:
            self.active()
        except OSError as e:
            # ENOENT is tolerated; every other OS error is a real problem.
            if e.errno != ENOENT:
                raise

    def stop(self, protocol):
        """
        Spawn the ``stop`` sub-command against the base directory, with
        ``protocol`` attached to the spawned process.
        """
        node_path = self.basedir.asTextMode().path
        self._execute(protocol, [u"stop", node_path])

    @inline_callbacks
    def stop_and_wait(self):
        """
        Stop the node and wait until the stopping is finished.

        Away from Windows this waits for a spawned ``stop`` command to exit.
        On Windows the process started by ``run`` (if any) is sent ``TERM``
        every 0.1 seconds until it reports that it has already exited.
        """
        if not platform.isWindows():
            protocol, ended = wait_for_exit()
            self.stop(protocol)
            yield ended
            return

        # On Windows there is no PID file and no "tahoe stop".
        if self.process is None:
            return

        terminated = False
        while not terminated:
            try:
                self.process.signalProcess("TERM")
            except ProcessExitedAlready:
                terminated = True
            else:
                yield deferLater(self.reactor, 0.1, lambda: None)

    def active(self):
        """
        Touch the exit trigger file.
        """
        # By writing this file, we get two minutes before the client will
        # exit. This ensures that even if the 'stop' command doesn't work (and
        # the test fails), the client should still terminate.
        trigger = self.exit_trigger_file
        trigger.touch()

    def _check_cleanup_reason(self, reason):
        """
        Errback used by ``cleanup``: swallow a ``ProcessTerminated`` failure
        whose exit code is ``COULD_NOT_STOP`` and pass any other
        ``ProcessTerminated`` failure along.
        """
        # Let it fail because the process has already exited.
        reason.trap(ProcessTerminated)
        if reason.value.exitCode == COULD_NOT_STOP:
            return None
        return reason

    def cleanup(self):
        """
        Stop the node, tolerating the stop command's ``COULD_NOT_STOP`` exit.

        :return Deferred: The result of ``stop_and_wait`` with
            ``_check_cleanup_reason`` attached as an errback.
        """
        return self.stop_and_wait().addErrback(self._check_cleanup_reason)
|
|
|
|
|
|
class _WaitForEnd(ProcessProtocol, object):
    """
    A process protocol which reports the end of its process through a
    Deferred supplied by the creator.
    """
    def __init__(self, ended):
        """
        :param Deferred ended: Fired when the process ends.
        """
        self._ended = ended

    def processEnded(self, reason):
        # ``ProcessDone`` fires the Deferred with ``None``; every other
        # reason is delivered as a failure.
        finished_cleanly = reason.check(ProcessDone)
        if not finished_cleanly:
            self._ended.errback(reason)
            return
        self._ended.callback(None)
|
|
|
|
|
|
def wait_for_exit():
    """
    Create a process protocol together with a Deferred that reports when the
    process it is attached to has ended.

    :return: A two-tuple ``(protocol, ended)``.
    """
    done = Deferred()
    return _WaitForEnd(done), done
|