Merge pull request #518 from LeastAuthority/test_upload_and_download_random_key

Avoid EADDRINUSE from allmydata.test.test_system.SystemTest

Fixes ticket:2933
This commit is contained in:
Jean-Paul Calderone 2018-08-22 09:31:48 -04:00 committed by GitHub
commit 292448a423
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 287 additions and 92 deletions

1
newsfragments/2933.other Normal file
View File

@ -0,0 +1 @@
allmydata.test.test_system.SystemTest is now more reliable with respect to bound address collisions.

View File

@ -0,0 +1,5 @@
from allmydata.test.test_system import (
AdoptedServerPort,
)
adoptedEndpointParser = AdoptedServerPort()

View File

@ -1,9 +1,27 @@
import os, re, sys, time, json
from functools import partial
from socket import (
AF_INET,
SOCK_STREAM,
SOMAXCONN,
socket,
)
from zope.interface import implementer
from twisted.python.filepath import FilePath
from twisted.internet import reactor
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks
from twisted.application import service
from twisted.plugin import IPlugin
from twisted.internet.interfaces import (
IStreamServerEndpointStringParser,
IReactorSocket,
)
from twisted.internet.endpoints import AdoptedStreamServerEndpoint
import allmydata
from allmydata import client, uri
@ -381,6 +399,114 @@ def flush_but_dont_ignore(res):
d.addCallback(_done)
return d
def _render_config(config):
"""
Convert a ``dict`` of ``dict`` of ``bytes`` to an ini-format string.
"""
return "\n\n".join(list(
_render_config_section(k, v)
for (k, v)
in config.items()
))
def _render_config_section(heading, values):
"""
Convert a ``bytes`` heading and a ``dict`` of ``bytes`` to an ini-format
section as ``bytes``.
"""
return "[{}]\n{}\n".format(
heading, _render_section_values(values)
)
def _render_section_values(values):
"""
Convert a ``dict`` of ``bytes`` to the body of an ini-format section as
``bytes``.
"""
return "\n".join(list(
"{} = {}".format(k, v)
for (k, v)
in sorted(values.items())
))
@implementer(IPlugin, IStreamServerEndpointStringParser)
class AdoptedServerPort(object):
"""
Parse an ``adopt-socket:<fd>`` endpoint description by adopting ``fd`` as
a listening TCP port.
"""
prefix = "adopt-socket"
def parseStreamServer(self, reactor, fd):
log.msg("Adopting {}".format(fd))
# AdoptedStreamServerEndpoint wants to own the file descriptor. It
# will duplicate it and then close the one we pass in. This means it
# is really only possible to adopt a particular file descriptor once.
#
# This wouldn't matter except one of the tests wants to stop one of
# the nodes and start it up again. This results in exactly an attempt
# to adopt a particular file descriptor twice.
#
# So we'll dup it ourselves. AdoptedStreamServerEndpoint can do
# whatever it wants to the result - the original will still be valid
# and reusable.
return AdoptedStreamServerEndpoint(reactor, os.dup(int(fd)), AF_INET)
def patch_plugins(testcase):
"""
Add the testing package ``plugins`` directory to the ``twisted.plugins``
aggregate package. Arrange for it to be removed again when the given test
is torn down.
"""
import twisted.plugins
testplugins = FilePath(__file__).sibling("plugins")
twisted.plugins.__path__.insert(0, testplugins.path)
testcase.addCleanup(lambda: twisted.plugins.__path__.remove(testplugins.path))
def assign_foolscap_port(testcase, reactor):
"""
Assign a TCP port which can be used for a Foolscap server.
An effort is made to avoid address collisions for this port but the logic
for doing so is platform-dependent (sorry, Windows).
The resulting TCP port can only be used in this process!
:return: A two-tuple of (location hint, port endpoint description) as
strings.
"""
if IReactorSocket.providedBy(reactor):
# On this platform, we can reliable pre-allocate a listening port.
# Once it is bound we know it will not fail later with EADDRINUSE.
s = socket(AF_INET, SOCK_STREAM)
s.setblocking(False)
s.bind(("127.0.0.1", 0))
s.listen(SOMAXCONN)
host, port = s.getsockname()
location_hint = "tcp:%s:%d" % (host, port)
# We need to keep ``s`` alive as long as the file descriptor we put in
# this string might still be used. We could dup() the descriptor
# instead but then we've only inverted the cleanup problem: gone from
# don't-close-too-soon to close-just-late-enough. So we'll leave
# ``s`` alive and use it as the cleanup mechanism.
port_endpoint = "adopt-socket:fd=%d" % (s.fileno(),)
testcase.addCleanup(s.close)
# Make sure `adopt-socket` is recognized. We do this instead of
# providing a dropin because we don't want to make this endpoint
# available to random other applications.
patch_plugins(testcase)
else:
# On other platforms, we blindly guess and hope we get lucky.
portnum = iputil.allocate_tcp_port()
location_hint = "tcp:127.0.0.1:%d" % (portnum,)
port_endpoint = "tcp:%d:interface=127.0.0.1" % (portnum,)
return location_hint, port_endpoint
class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# SystemTestMixin tests tend to be a lot of work, and we have a few
@ -409,44 +535,69 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
s.setServiceParent(self.sparent)
return s
def set_up_nodes(self, NUMCLIENTS=5, use_stats_gatherer=False):
self.numclients = NUMCLIENTS
def _create_introducer(self):
iv_dir = self.getdir("introducer")
if not os.path.isdir(iv_dir):
introducer_config = (
u"[node]\n"
u"nickname = introducer \N{BLACK SMILING FACE}\n"
u"web.port = tcp:0:interface=127.0.0.1\n"
).encode("utf-8")
fileutil.make_dirs(iv_dir)
fileutil.write(os.path.join(iv_dir, 'tahoe.cfg'),
"[node]\n" +
u"nickname = introducer \u263A\n".encode('utf-8') +
"web.port = tcp:0:interface=127.0.0.1\n")
fileutil.write(
os.path.join(iv_dir, 'tahoe.cfg'),
introducer_config,
)
if SYSTEM_TEST_CERTS:
os.mkdir(os.path.join(iv_dir, "private"))
f = open(os.path.join(iv_dir, "private", "node.pem"), "w")
f.write(SYSTEM_TEST_CERTS[0])
f.close()
iv = create_introducer(basedir=iv_dir)
self.introducer = self.add_service(iv)
self._get_introducer_web()
d = defer.succeed(None)
if use_stats_gatherer:
d.addCallback(self._set_up_stats_gatherer)
d.addCallback(self._set_up_nodes_2)
if use_stats_gatherer:
d.addCallback(self._grab_stats)
return d
return create_introducer(basedir=iv_dir)
def _get_introducer_web(self):
f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
self.introweb_url = f.read().strip()
f.close()
with open(os.path.join(self.getdir("introducer"), "node.url"), "r") as f:
return f.read().strip()
def _set_up_stats_gatherer(self, res):
@inlineCallbacks
def set_up_nodes(self, NUMCLIENTS=5, use_stats_gatherer=False):
"""
Create an introducer and ``NUMCLIENTS`` client nodes pointed at it. All
of the nodes are running in this process.
As a side-effect, set:
* ``numclients`` to ``NUMCLIENTS``
* ``introducer`` to the ``_IntroducerNode`` instance
* ``introweb_url`` to the introducer's HTTP API endpoint.
:param int NUMCLIENTS: The number of client nodes to create.
:param bool use_stats_gatherer: If ``True`` then also create a stats
gatherer and configure the other nodes to use it.
:return: A ``Deferred`` that fires when the nodes have connected to
each other.
"""
self.numclients = NUMCLIENTS
self.introducer = self.add_service(self._create_introducer())
self.introweb_url = self._get_introducer_web()
if use_stats_gatherer:
yield self._set_up_stats_gatherer()
yield self._set_up_client_nodes()
if use_stats_gatherer:
yield self._grab_stats()
def _set_up_stats_gatherer(self):
statsdir = self.getdir("stats_gatherer")
fileutil.make_dirs(statsdir)
portnum = iputil.allocate_tcp_port()
location = "tcp:127.0.0.1:%d" % portnum
fileutil.write(os.path.join(statsdir, "location"), location)
port = "tcp:%d:interface=127.0.0.1" % portnum
fileutil.write(os.path.join(statsdir, "port"), port)
location_hint, port_endpoint = assign_foolscap_port(self, reactor)
fileutil.write(os.path.join(statsdir, "location"), location_hint)
fileutil.write(os.path.join(statsdir, "port"), port_endpoint)
self.stats_gatherer_svc = StatsGathererService(statsdir)
self.stats_gatherer = self.stats_gatherer_svc.stats_gatherer
self.add_service(self.stats_gatherer_svc)
@ -461,62 +612,14 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
d.addCallback(get_furl)
return d
def _set_up_nodes_2(self, res):
@inlineCallbacks
def _set_up_client_nodes(self):
q = self.introducer
self.introducer_furl = q.introducer_url
self.clients = []
basedirs = []
for i in range(self.numclients):
basedir = self.getdir("client%d" % i)
basedirs.append(basedir)
fileutil.make_dirs(os.path.join(basedir, "private"))
if len(SYSTEM_TEST_CERTS) > (i+1):
f = open(os.path.join(basedir, "private", "node.pem"), "w")
f.write(SYSTEM_TEST_CERTS[i+1])
f.close()
config = "[client]\n"
if i != 1:
# clients[1] uses private/introducers.yaml, not tahoe.cfg
config += "introducer.furl = %s\n" % self.introducer_furl
if self.stats_gatherer_furl:
config += "stats_gatherer.furl = %s\n" % self.stats_gatherer_furl
nodeconfig = "[node]\n"
nodeconfig += (u"nickname = client %d \u263A\n" % (i,)).encode('utf-8')
tub_port = iputil.allocate_tcp_port()
# Don't let it use AUTO: there's no need for tests to use
# anything other than 127.0.0.1
nodeconfig += "tub.port = tcp:%d\n" % tub_port
nodeconfig += "tub.location = tcp:127.0.0.1:%d\n" % tub_port
if i == 0:
# clients[0] runs a webserver and a helper
config += nodeconfig
config += "web.port = tcp:0:interface=127.0.0.1\n"
config += "timeout.keepalive = 600\n"
config += "[helper]\n"
config += "enabled = True\n"
elif i == 1:
# clients[1] uses private/introducers.yaml, not tahoe.cfg
iyaml = ("introducers:\n"
" petname2:\n"
" furl: %s\n") % self.introducer_furl
iyaml_fn = os.path.join(basedir, "private", "introducers.yaml")
fileutil.write(iyaml_fn, iyaml)
elif i == 3:
# clients[3] runs a webserver and uses a helper
config += nodeconfig
config += "web.port = tcp:0:interface=127.0.0.1\n"
config += "timeout.disconnect = 1800\n"
else:
config += nodeconfig
fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
# give subclasses a chance to append lines to the node's tahoe.cfg
# files before they are launched.
self._set_up_nodes_extra_config()
basedirs.append((yield self._set_up_client_node(i)))
# start clients[0], wait for it's tub to be ready (at which point it
# will have registered the helper furl).
@ -541,22 +644,84 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
self.clients.append(c)
c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
log.msg("STARTING")
d = self.wait_for_connections()
def _connected(res):
log.msg("CONNECTED")
# now find out where the web port was
self.webish_url = self.clients[0].getServiceNamed("webish").getURL()
if self.numclients >=4:
# and the helper-using webport
self.helper_webish_url = self.clients[3].getServiceNamed("webish").getURL()
d.addCallback(_connected)
return d
yield self.wait_for_connections()
log.msg("CONNECTED")
# now find out where the web port was
self.webish_url = self.clients[0].getServiceNamed("webish").getURL()
if self.numclients >=4:
# and the helper-using webport
self.helper_webish_url = self.clients[3].getServiceNamed("webish").getURL()
def _set_up_nodes_extra_config(self):
# for overriding by subclasses
pass
def _generate_config(self, which, basedir):
config = {}
def _grab_stats(self, res):
except1 = set(range(self.numclients)) - {1}
feature_matrix = {
# client 1 uses private/introducers.yaml, not tahoe.cfg
("client", "introducer.furl"): except1,
("client", "nickname"): except1,
# client 1 has to auto-assign an address.
("node", "tub.port"): except1,
("node", "tub.location"): except1,
# client 0 runs a webserver and a helper
# client 3 runs a webserver but no helper
("node", "web.port"): {0, 3},
("node", "timeout.keepalive"): {0},
("node", "timeout.disconnect"): {3},
("helper", "enabled"): {0},
}
def setconf(config, which, section, feature, value):
if which in feature_matrix.get((section, feature), {which}):
if isinstance(value, unicode):
value = value.encode("utf-8")
config.setdefault(section, {})[feature] = value
setclient = partial(setconf, config, which, "client")
setnode = partial(setconf, config, which, "node")
sethelper = partial(setconf, config, which, "helper")
setclient("introducer.furl", self.introducer_furl)
setnode("nickname", u"client %d \N{BLACK SMILING FACE}" % (which,))
if self.stats_gatherer_furl:
setclient("stats_gatherer.furl", self.stats_gatherer_furl)
tub_location_hint, tub_port_endpoint = assign_foolscap_port(self, reactor)
setnode("tub.port", tub_port_endpoint)
setnode("tub.location", tub_location_hint)
setnode("web.port", "tcp:0:interface=127.0.0.1")
setnode("timeout.keepalive", "600")
setnode("timeout.disconnect", "1800")
sethelper("enabled", "True")
if which == 1:
# clients[1] uses private/introducers.yaml, not tahoe.cfg
iyaml = ("introducers:\n"
" petname2:\n"
" furl: %s\n") % self.introducer_furl
iyaml_fn = os.path.join(basedir, "private", "introducers.yaml")
fileutil.write(iyaml_fn, iyaml)
return _render_config(config)
def _set_up_client_node(self, which):
basedir = self.getdir("client%d" % (which,))
fileutil.make_dirs(os.path.join(basedir, "private"))
if len(SYSTEM_TEST_CERTS) > (which + 1):
f = open(os.path.join(basedir, "private", "node.pem"), "w")
f.write(SYSTEM_TEST_CERTS[which + 1])
f.close()
config = self._generate_config(which, basedir)
fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
return basedir
def _grab_stats(self):
d = self.stats_gatherer.poll()
return d
@ -609,14 +774,38 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
return d
def _check_connections(self):
for c in self.clients:
for i, c in enumerate(self.clients):
if not c.connected_to_introducer():
log.msg("%s not connected to introducer yet" % (i,))
return False
sb = c.get_storage_broker()
if len(sb.get_connected_servers()) != self.numclients:
connected_servers = sb.get_connected_servers()
connected_names = sorted(list(
connected.get_nickname()
for connected
in sb.get_known_servers()
if connected.is_connected()
))
if len(connected_servers) != self.numclients:
wanted = sorted(list(
client.nickname
for client
in self.clients
))
log.msg(
"client %s storage broker connected to %s, missing %s" % (
i,
connected_names,
set(wanted) - set(connected_names),
)
)
return False
log.msg("client %s storage broker connected to %s, happy" % (
i, connected_names,
))
up = c.getServiceNamed("uploader")
if up._helper_furl and not up._helper:
log.msg("Helper fURL but no helper")
return False
return True