Mirror of https://github.com/tahoe-lafs/tahoe-lafs.git (synced 2025-01-31 08:25:35 +00:00)
Merge pull request #641 from meejah/integration-test-wait-for-ready
Better method of "waiting for readiness" in integration tests
commit 7244f3516d
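In outline: rather than sleeping a fixed number of seconds after starting a node, the integration tests now poll the node's JSON status page until it reports connected, recently-active storage servers. The helper is `await_client_ready` (added to integration/util.py, below); fixtures call it right after creating or restarting a node. A rough sketch of the intended call pattern, using names from the diff (the web_port value is illustrative, not taken from a hunk shown here):

    # Sketch only (not a hunk from this diff): a client fixture creates
    # the node, then blocks until the status API says it is ready.
    process = pytest_twisted.blockon(
        _create_node(
            reactor, request, temp_dir, introducer_furl, flog_gatherer,
            "alice", web_port="tcp:9980:interface=localhost", storage=False,
        )
    )
    await_client_ready(process)  # polls /?t=json until servers look live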
integration/conftest.py

@@ -15,6 +15,7 @@ from eliot import (
 )
 
 from twisted.python.procutils import which
+from twisted.internet.defer import DeferredList
 from twisted.internet.error import (
     ProcessExitedAlready,
     ProcessTerminated,
@@ -30,8 +31,9 @@ from util import (
     _ProcessExitedProtocol,
     _create_node,
     _run_node,
-    _cleanup_twistd_process,
+    _cleanup_tahoe_process,
     _tahoe_runner_optional_coverage,
+    await_client_ready,
 )
 
 
@@ -130,7 +132,7 @@ def flog_gatherer(reactor, temp_dir, flog_binary, request):
     pytest_twisted.blockon(twistd_protocol.magic_seen)
 
     def cleanup():
-        _cleanup_twistd_process(twistd_process, twistd_protocol.exited)
+        _cleanup_tahoe_process(twistd_process, twistd_protocol.exited)
 
     flog_file = mktemp('.flog_dump')
     flog_protocol = _DumpOutputProtocol(open(flog_file, 'w'))
@@ -209,7 +211,7 @@ log_gatherer.furl = {log_furl}
             intro_dir,
         ),
     )
-    request.addfinalizer(partial(_cleanup_twistd_process, process, protocol.exited))
+    request.addfinalizer(partial(_cleanup_tahoe_process, process, protocol.exited))
 
     pytest_twisted.blockon(protocol.magic_seen)
     return process
@@ -267,7 +269,7 @@ log_gatherer.furl = {log_furl}
     # but on linux it means daemonize. "tahoe run" is consistent
     # between platforms.
     protocol = _MagicTextProtocol('introducer running')
-    process = _tahoe_runner_optional_coverage(
+    transport = _tahoe_runner_optional_coverage(
         protocol,
         reactor,
         request,
@@ -279,14 +281,14 @@ log_gatherer.furl = {log_furl}
 
     def cleanup():
         try:
-            process.signalProcess('TERM')
+            transport.signalProcess('TERM')
             pytest_twisted.blockon(protocol.exited)
         except ProcessExitedAlready:
             pass
     request.addfinalizer(cleanup)
 
     pytest_twisted.blockon(protocol.magic_seen)
-    return process
+    return transport
 
 
 @pytest.fixture(scope='session')
@@ -306,20 +308,22 @@ def tor_introducer_furl(tor_introducer, temp_dir):
     include_result=False,
 )
 def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, request):
-    nodes = []
+    nodes_d = []
     # start all 5 nodes in parallel
     for x in range(5):
         name = 'node{}'.format(x)
         # tub_port = 9900 + x
-        nodes.append(
-            pytest_twisted.blockon(
-                _create_node(
-                    reactor, request, temp_dir, introducer_furl, flog_gatherer, name,
-                    web_port=None, storage=True,
-                )
+        nodes_d.append(
+            _create_node(
+                reactor, request, temp_dir, introducer_furl, flog_gatherer, name,
+                web_port=None, storage=True,
             )
         )
-    #nodes = pytest_twisted.blockon(DeferredList(nodes))
+    nodes_status = pytest_twisted.blockon(DeferredList(nodes_d))
+    nodes = []
+    for ok, process in nodes_status:
+        assert ok, "Storage node creation failed: {}".format(process)
+        nodes.append(process)
     return nodes
 
 
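For reference (not part of the diff): `twisted.internet.defer.DeferredList` fires with a list of `(success, result)` two-tuples in the order the input Deferreds were given, which is why the new loop unpacks `ok, process`. A self-contained sketch of that result shape:

    # Self-contained sketch of the DeferredList result shape.
    from twisted.internet.defer import DeferredList, succeed, fail

    dl = DeferredList(
        [succeed("node0"), fail(RuntimeError("boom"))],
        consumeErrors=True,  # keep the failure from being logged as unhandled
    )

    def report(results):
        for ok, value in results:
            # ok is True (value is the result) or False (value is a Failure)
            print("{}: {}".format(ok, value))

    dl.addCallback(report)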
@@ -338,6 +342,7 @@ def alice(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, requ
             storage=False,
         )
     )
+    await_client_ready(process)
     return process
 
 
@@ -356,6 +361,7 @@ def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, reques
             storage=False,
         )
     )
+    await_client_ready(process)
     return process
 
 
@@ -368,7 +374,6 @@ def alice_invite(reactor, alice, temp_dir, request):
     # FIXME XXX by the time we see "client running" in the logs, the
     # storage servers aren't "really" ready to roll yet (uploads fairly
     # consistently fail if we don't hack in this pause...)
-    import time ; time.sleep(5)
     proto = _CollectOutputProtocol()
     _tahoe_runner_optional_coverage(
         proto,
@@ -402,13 +407,14 @@ def alice_invite(reactor, alice, temp_dir, request):
     # before magic-folder works, we have to stop and restart (this is
     # crappy for the tests -- can we fix it in magic-folder?)
     try:
-        alice.signalProcess('TERM')
-        pytest_twisted.blockon(alice.exited)
+        alice.transport.signalProcess('TERM')
+        pytest_twisted.blockon(alice.transport.exited)
     except ProcessExitedAlready:
         pass
     with start_action(action_type=u"integration:alice:magic_folder:magic-text"):
         magic_text = 'Completed initial Magic Folder scan successfully'
         pytest_twisted.blockon(_run_node(reactor, node_dir, request, magic_text))
+    await_client_ready(alice)
     return invite
 
 
@@ -439,13 +445,14 @@ def magic_folder(reactor, alice_invite, alice, bob, temp_dir, request):
     # crappy for the tests -- can we fix it in magic-folder?)
     try:
         print("Sending TERM to Bob")
-        bob.signalProcess('TERM')
-        pytest_twisted.blockon(bob.exited)
+        bob.transport.signalProcess('TERM')
+        pytest_twisted.blockon(bob.transport.exited)
     except ProcessExitedAlready:
         pass
 
     magic_text = 'Completed initial Magic Folder scan successfully'
     pytest_twisted.blockon(_run_node(reactor, bob_dir, request, magic_text))
+    await_client_ready(bob)
     return (join(temp_dir, 'magic-alice'), join(temp_dir, 'magic-bob'))
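The alice_invite and magic_folder hunks above follow the same restart recipe: TERM the transport, wait for exit, relaunch, then wait for readiness. Distilled into one hypothetical helper (not part of the diff) for clarity:

    # Hypothetical helper (not in the diff) showing the restart sequence
    # the fixtures above perform inline.
    def _restart_and_await(reactor, request, tahoe_process, node_dir, magic_text):
        try:
            tahoe_process.transport.signalProcess('TERM')
            pytest_twisted.blockon(tahoe_process.transport.exited)
        except ProcessExitedAlready:
            pass
        # re-launch and wait for `magic_text` to show up in the logs...
        restarted = pytest_twisted.blockon(
            _run_node(reactor, node_dir, request, magic_text)
        )
        # ...then wait until the status API reports live storage servers
        await_client_ready(restarted)
        return restarted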
integration/test_magic_folder.py
@@ -336,10 +336,10 @@ def test_edmond_uploads_then_restarts(reactor, request, temp_dir, introducer_fur
     assert created, "Didn't create a magic-folder"
 
     # to actually-start the magic-folder we have to re-start
-    edmond.signalProcess('TERM')
-    yield edmond._protocol.exited
-    time.sleep(1)
-    edmond = yield util._run_node(reactor, edmond._node_dir, request, 'Completed initial Magic Folder scan successfully')
+    edmond.transport.signalProcess('TERM')
+    yield edmond.transport.exited
+    edmond = yield util._run_node(reactor, edmond.node_dir, request, 'Completed initial Magic Folder scan successfully')
+    util.await_client_ready(edmond)
 
     # add a thing to the magic-folder
     with open(join(magic_folder, "its_a_file"), "w") as f:
@@ -383,10 +383,11 @@ def test_edmond_uploads_then_restarts(reactor, request, temp_dir, introducer_fur
     # re-starting edmond right now would "normally" trigger the 2880 bug
 
     # kill edmond
-    edmond.signalProcess('TERM')
-    yield edmond._protocol.exited
+    edmond.transport.signalProcess('TERM')
+    yield edmond.transport.exited
     time.sleep(1)
-    edmond = yield util._run_node(reactor, edmond._node_dir, request, 'Completed initial Magic Folder scan successfully')
+    edmond = yield util._run_node(reactor, edmond.node_dir, request, 'Completed initial Magic Folder scan successfully')
+    util.await_client_ready(edmond)
 
     # XXX how can we say for sure if we've waited long enough? look at
     # tail of logs for magic-folder ... somethingsomething?
integration/test_servers_of_happiness.py
@@ -12,7 +12,7 @@ import pytest_twisted
 @pytest_twisted.inlineCallbacks
 def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, request):
 
-    yield util._create_node(
+    edna = yield util._create_node(
         reactor, request, temp_dir, introducer_furl, flog_gatherer, "edna",
         web_port="tcp:9983:interface=localhost",
         storage=False,
@@ -20,13 +20,10 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto
         happy=7,
         total=10,
     )
-
+    util.await_client_ready(edna)
 
     node_dir = join(temp_dir, 'edna')
 
-    print("waiting 10 seconds unil we're maybe ready")
-    yield task.deferLater(reactor, 10, lambda: None)
-
     # upload a file, which should fail because we have don't have 7
     # storage servers (but happiness is set to 7)
     proto = util._CollectOutputProtocol()
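The deleted lines used `task.deferLater(reactor, 10, lambda: None)` as a non-blocking ten-second sleep and hoped that was long enough. The general pattern the PR moves to is polling a concrete condition with a timeout; a minimal standalone sketch of that shape (assumed helper, not in the diff):

    # Assumed helper, not in the diff: poll a condition instead of
    # sleeping a fixed amount and hoping.
    import time

    def poll_until(check, timeout=10, interval=1):
        """Call `check` until it returns truthy or `timeout` seconds pass."""
        start = time.time()
        while time.time() - start < timeout:
            if check():
                return True
            time.sleep(interval)
        raise RuntimeError("condition not met within {} seconds".format(timeout))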
integration/util.py
@@ -1,5 +1,6 @@
 import sys
+import time
 import json
 from os import mkdir
 from os.path import exists, join
 from six.moves import StringIO
@@ -9,6 +10,8 @@ from twisted.internet.defer import Deferred, succeed
 from twisted.internet.protocol import ProcessProtocol
 from twisted.internet.error import ProcessExitedAlready, ProcessDone
 
+import requests
+
 from allmydata.util.configutil import (
     get_config,
     set_config,
@@ -106,19 +109,19 @@ class _MagicTextProtocol(ProcessProtocol):
         sys.stdout.write(data)
 
 
-def _cleanup_twistd_process(twistd_process, exited):
+def _cleanup_tahoe_process(tahoe_transport, exited):
     """
     Terminate the given process with a kill signal (SIGKILL on POSIX,
     TerminateProcess on Windows).
 
-    :param twistd_process: The `IProcessTransport` representing the process.
+    :param tahoe_transport: The `IProcessTransport` representing the process.
     :param exited: A `Deferred` which fires when the process has exited.
 
     :return: After the process has exited.
     """
     try:
-        print("signaling {} with TERM".format(twistd_process.pid))
-        twistd_process.signalProcess('TERM')
+        print("signaling {} with TERM".format(tahoe_transport.pid))
+        tahoe_transport.signalProcess('TERM')
         print("signaled, blocking on exit")
         pytest_twisted.blockon(exited)
         print("exited, goodbye")
@@ -144,7 +147,30 @@ def _tahoe_runner_optional_coverage(proto, reactor, request, other_args):
     )
 
 
+class TahoeProcess(object):
+    """
+    A running Tahoe process, with associated information.
+    """
+
+    def __init__(self, process_transport, node_dir):
+        self._process_transport = process_transport  # IProcessTransport instance
+        self._node_dir = node_dir  # path
+
+    @property
+    def transport(self):
+        return self._process_transport
+
+    @property
+    def node_dir(self):
+        return self._node_dir
+
+
 def _run_node(reactor, node_dir, request, magic_text):
+    """
+    Run a tahoe process from its node_dir.
+
+    :returns: a TahoeProcess for this node
+    """
     if magic_text is None:
         magic_text = "client running"
     protocol = _MagicTextProtocol(magic_text)
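With `TahoeProcess`, callers that previously treated the return value as a bare `IProcessTransport` now go through its two properties. A usage sketch (assumed, not from the diff: `node` is the TahoeProcess that `_run_node`'s Deferred fires with):

    # Sketch of consuming the wrapper (names assumed).
    node = pytest_twisted.blockon(_run_node(reactor, node_dir, request, None))
    print(node.node_dir)                  # the node's base directory
    node.transport.signalProcess('TERM')  # the raw IProcessTransport underneath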
@@ -153,7 +179,7 @@ def _run_node(reactor, node_dir, request, magic_text):
     # but on linux it means daemonize. "tahoe run" is consistent
     # between platforms.
 
-    process = _tahoe_runner_optional_coverage(
+    transport = _tahoe_runner_optional_coverage(
         protocol,
         reactor,
         request,
@@ -163,17 +189,18 @@ def _run_node(reactor, node_dir, request, magic_text):
             node_dir,
         ],
     )
-    process.exited = protocol.exited
+    transport.exited = protocol.exited
 
-    request.addfinalizer(partial(_cleanup_twistd_process, process, protocol.exited))
+    request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited))
 
-    # we return the 'process' ITransport instance
-    # XXX abusing the Deferred; should use .when_magic_seen() or something?
+    # XXX abusing the Deferred; should use .when_magic_seen() pattern
 
     def got_proto(proto):
-        process._protocol = proto
-        process._node_dir = node_dir
-        return process
+        transport._protocol = proto
+        return TahoeProcess(
+            transport,
+            node_dir,
+        )
     protocol.magic_seen.addCallback(got_proto)
     return protocol.magic_seen
@@ -358,5 +385,106 @@ def cli(request, reactor, node_dir, *argv):
     )
     return proto.done
 
 
+def node_url(node_dir, uri_fragment):
+    """
+    Create a fully qualified URL by reading config from `node_dir` and
+    adding the `uri_fragment`
+    """
+    with open(join(node_dir, "node.url"), "r") as f:
+        base = f.read().strip()
+    url = base + uri_fragment
+    return url
+
+
+def _check_status(response):
+    """
+    Check the response code is a 2xx (raise an exception otherwise)
+    """
+    if response.status_code < 200 or response.status_code >= 300:
+        raise ValueError(
+            "Expected a 2xx code, got {}".format(response.status_code)
+        )
+
+
+def web_get(node_dir, uri_fragment, **kwargs):
+    """
+    Make a GET request to the webport of `node_dir`. This will look
+    like: `http://localhost:<webport>/<uri_fragment>`. All `kwargs`
+    are passed on to `requests.get`
+    """
+    url = node_url(node_dir, uri_fragment)
+    resp = requests.get(url, **kwargs)
+    _check_status(resp)
+    return resp.content
+
+
+def web_post(node_dir, uri_fragment, **kwargs):
+    """
+    Make a POST request to the webport of `node_dir`. This will look
+    like: `http://localhost:<webport>/<uri_fragment>`. All `kwargs`
+    are passed on to `requests.post`
+    """
+    url = node_url(node_dir, uri_fragment)
+    resp = requests.post(url, **kwargs)
+    _check_status(resp)
+    return resp.content
+
+
+def await_client_ready(process, timeout=10, liveness=60*2):
+    """
+    Uses the status API to wait for a client-type node to be
+    'ready'. A client is deemed ready if:
+
+      - it answers http://<node_url>/statistics/?t=json/
+      - there is at least one storage-server connected
+      - every storage-server has a "last_received_data" and it is
+        within the last `liveness` seconds
+
+    We will try for up to `timeout` seconds for the above conditions
+    to be true. Otherwise, an exception is raised
+    """
+    start = time.time()
+    while (time.time() - start) < float(timeout):
+        try:
+            data = web_get(process.node_dir, u"", params={u"t": u"json"})
+            js = json.loads(data)
+        except Exception as e:
+            print("waiting because '{}'".format(e))
+            time.sleep(1)
+            continue
+
+        if len(js['servers']) == 0:
+            print("waiting because no servers at all")
+            time.sleep(1)
+            continue
+        server_times = [
+            server['last_received_data']
+            for server in js['servers']
+        ]
+        # if any times are null/None that server has never been
+        # contacted (so it's down still, probably)
+        if any(t is None for t in server_times):
+            print("waiting because at least one server not contacted")
+            time.sleep(1)
+            continue
+
+        # check that all times are 'recent enough'
+        if any([time.time() - t > liveness for t in server_times]):
+            print("waiting because at least one server too old")
+            time.sleep(1)
+            continue
+
+        # we have a status with at least one server, and all servers
+        # have been contacted recently
+        return True
+    # we only fall out of the loop when we've timed out
+    raise RuntimeError(
+        "Waited {} seconds for {} to be 'ready' but it never was".format(
+            timeout,
+            process.node_dir,
+        )
+    )
+
+
 def magic_folder_cli(request, reactor, node_dir, *argv):
     return cli(request, reactor, node_dir, "magic-folder", *argv)
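A sketch of how these helpers compose in a test body (the `edna` name and timeout are illustrative, not taken from the diff):

    # Sketch (names illustrative): wait for readiness, then inspect the
    # same JSON status document await_client_ready polls.
    import json

    util.await_client_ready(edna, timeout=30)
    status = json.loads(util.web_get(edna.node_dir, u"", params={u"t": u"json"}))
    print(len(status["servers"]), "storage servers connected")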
newsfragments/3237.minor (new file, 1 line)
@@ -0,0 +1 @@
+Wait for integration-test clients to be ready using status-API