From 7c975036876025328b3d6f45ae5d11af6f75dc50 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Thu, 28 Mar 2019 14:54:45 -0400 Subject: [PATCH 1/6] Move and somewhat refactor `assign_foolscap_port`. These changes make it easier to re-use and remove the notion that it is foolscap-specific. --- src/allmydata/test/common.py | 113 +++++++++++++++++- .../test/plugins/tahoe_lafs_dropin.py | 2 +- src/allmydata/test/test_system.py | 107 ++--------------- 3 files changed, 124 insertions(+), 98 deletions(-) diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index 3d3d84593..e24cc581a 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -15,6 +15,12 @@ import tempfile from tempfile import mktemp from functools import partial from unittest import case as _case +from socket import ( + AF_INET, + SOCK_STREAM, + SOMAXCONN, + socket, +) import treq @@ -32,12 +38,19 @@ from testtools.twistedsupport import ( flush_logged_errors, ) +from twisted.plugin import IPlugin from twisted.internet import defer from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.interfaces import IPullProducer from twisted.python import failure +from twisted.python.filepath import FilePath from twisted.application import service from twisted.web.error import Error as WebError +from twisted.internet.interfaces import ( + IStreamServerEndpointStringParser, + IReactorSocket, +) +from twisted.internet.endpoints import AdoptedStreamServerEndpoint from allmydata import uri from allmydata.interfaces import IMutableFileNode, IImmutableFileNode,\ @@ -50,7 +63,7 @@ from allmydata.storage_client import StubServer from allmydata.mutable.layout import unpack_header from allmydata.mutable.publish import MutableData from allmydata.storage.mutable import MutableShareFile -from allmydata.util import hashutil, log +from allmydata.util import hashutil, log, iputil from allmydata.util.assertutil import precondition from allmydata.util.consumer import download_to_data import allmydata.test.common_util as testutil @@ -63,6 +76,104 @@ from .eliotutil import ( TEST_RSA_KEY_SIZE = 522 +@implementer(IPlugin, IStreamServerEndpointStringParser) +class AdoptedServerPort(object): + """ + Parse an ``adopt-socket:`` endpoint description by adopting ``fd`` as + a listening TCP port. + """ + prefix = "adopt-socket" + + def parseStreamServer(self, reactor, fd): + log.msg("Adopting {}".format(fd)) + # AdoptedStreamServerEndpoint wants to own the file descriptor. It + # will duplicate it and then close the one we pass in. This means it + # is really only possible to adopt a particular file descriptor once. + # + # This wouldn't matter except one of the tests wants to stop one of + # the nodes and start it up again. This results in exactly an attempt + # to adopt a particular file descriptor twice. + # + # So we'll dup it ourselves. AdoptedStreamServerEndpoint can do + # whatever it wants to the result - the original will still be valid + # and reusable. + return AdoptedStreamServerEndpoint(reactor, os.dup(int(fd)), AF_INET) + + +class SameProcessStreamEndpointAssigner(object): + """ + A fixture which can assign streaming server endpoints for use *in this + process only*. + + An effort is made to avoid address collisions for this port but the logic + for doing so is platform-dependent (sorry, Windows). + + This is more reliable than trying to listen on a hard-coded non-zero port + number. It is at least as reliable as trying to listen on port number + zero on Windows and more reliable than doing that on other platforms. + """ + def setUp(self): + self._cleanups = [] + + def tearDown(self): + for c in self._cleanups: + c() + + def _patch_plugins(self): + """ + Add the testing package ``plugins`` directory to the ``twisted.plugins`` + aggregate package. Arrange for it to be removed again when the + fixture is torn down. + """ + import twisted.plugins + testplugins = FilePath(__file__).sibling("plugins") + twisted.plugins.__path__.insert(0, testplugins.path) + self._cleanups.append(lambda: twisted.plugins.__path__.remove(testplugins.path)) + + + def assign(self, reactor): + """ + Make a new streaming server endpoint and return its string description. + + This is intended to help write config files that will then be read and + used in this process. + + :param reactor: The reactor which will be used to listen with the + resulting endpoint. If it provides ``IReactorSocket`` then + resulting reliability will be extremely high. If it doesn't, + resulting reliability will be pretty alright. + + :return: A two-tuple of (location hint, port endpoint description) as + strings. + """ + if IReactorSocket.providedBy(reactor): + # On this platform, we can reliable pre-allocate a listening port. + # Once it is bound we know it will not fail later with EADDRINUSE. + s = socket(AF_INET, SOCK_STREAM) + # We need to keep ``s`` alive as long as the file descriptor we put in + # this string might still be used. We could dup() the descriptor + # instead but then we've only inverted the cleanup problem: gone from + # don't-close-too-soon to close-just-late-enough. So we'll leave + # ``s`` alive and use it as the cleanup mechanism. + self._cleanups.append(s.close) + s.setblocking(False) + s.bind(("127.0.0.1", 0)) + s.listen(SOMAXCONN) + host, port = s.getsockname() + location_hint = "tcp:%s:%d" % (host, port) + port_endpoint = "adopt-socket:fd=%d" % (s.fileno(),) + # Make sure `adopt-socket` is recognized. We do this instead of + # providing a dropin because we don't want to make this endpoint + # available to random other applications. + self._patch_plugins() + else: + # On other platforms, we blindly guess and hope we get lucky. + portnum = iputil.allocate_tcp_port() + location_hint = "tcp:127.0.0.1:%d" % (portnum,) + port_endpoint = "tcp:%d:interface=127.0.0.1" % (portnum,) + + return location_hint, port_endpoint + @implementer(IPullProducer) class DummyProducer(object): def resumeProducing(self): diff --git a/src/allmydata/test/plugins/tahoe_lafs_dropin.py b/src/allmydata/test/plugins/tahoe_lafs_dropin.py index 110ef5ab8..9faf5f07f 100644 --- a/src/allmydata/test/plugins/tahoe_lafs_dropin.py +++ b/src/allmydata/test/plugins/tahoe_lafs_dropin.py @@ -1,4 +1,4 @@ -from allmydata.test.test_system import ( +from allmydata.test.common import ( AdoptedServerPort, ) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index c178ef609..0a79dfaa3 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -2,27 +2,12 @@ from __future__ import print_function import os, re, sys, time, json from functools import partial -from socket import ( - AF_INET, - SOCK_STREAM, - SOMAXCONN, - socket, -) -from zope.interface import implementer - -from twisted.python.filepath import FilePath from twisted.internet import reactor from twisted.trial import unittest from twisted.internet import defer from twisted.internet.defer import inlineCallbacks from twisted.application import service -from twisted.plugin import IPlugin -from twisted.internet.interfaces import ( - IStreamServerEndpointStringParser, - IReactorSocket, -) -from twisted.internet.endpoints import AdoptedStreamServerEndpoint import allmydata from allmydata import client, uri @@ -32,7 +17,7 @@ from allmydata.storage.server import si_a2b from allmydata.immutable import offloaded, upload from allmydata.immutable.literal import LiteralFileNode from allmydata.immutable.filenode import ImmutableFileNode -from allmydata.util import idlib, mathutil, pollmixin, fileutil, iputil +from allmydata.util import idlib, mathutil, pollmixin, fileutil from allmydata.util import log, base32 from allmydata.util.encodingutil import quote_output, unicode_to_argv from allmydata.util.fileutil import abspath_expanduser_unicode @@ -48,7 +33,10 @@ from allmydata.mutable.publish import MutableData from foolscap.api import DeadReferenceError, fireEventually, flushEventualQueue from twisted.python.failure import Failure -from .common import TEST_RSA_KEY_SIZE +from .common import ( + TEST_RSA_KEY_SIZE, + SameProcessStreamEndpointAssigner, +) from .common_web import do_http, Error # TODO: move this to common or common_util @@ -431,83 +419,6 @@ def _render_section_values(values): )) -@implementer(IPlugin, IStreamServerEndpointStringParser) -class AdoptedServerPort(object): - """ - Parse an ``adopt-socket:`` endpoint description by adopting ``fd`` as - a listening TCP port. - """ - prefix = "adopt-socket" - - def parseStreamServer(self, reactor, fd): - log.msg("Adopting {}".format(fd)) - # AdoptedStreamServerEndpoint wants to own the file descriptor. It - # will duplicate it and then close the one we pass in. This means it - # is really only possible to adopt a particular file descriptor once. - # - # This wouldn't matter except one of the tests wants to stop one of - # the nodes and start it up again. This results in exactly an attempt - # to adopt a particular file descriptor twice. - # - # So we'll dup it ourselves. AdoptedStreamServerEndpoint can do - # whatever it wants to the result - the original will still be valid - # and reusable. - return AdoptedStreamServerEndpoint(reactor, os.dup(int(fd)), AF_INET) - - -def patch_plugins(testcase): - """ - Add the testing package ``plugins`` directory to the ``twisted.plugins`` - aggregate package. Arrange for it to be removed again when the given test - is torn down. - """ - import twisted.plugins - testplugins = FilePath(__file__).sibling("plugins") - twisted.plugins.__path__.insert(0, testplugins.path) - testcase.addCleanup(lambda: twisted.plugins.__path__.remove(testplugins.path)) - - -def assign_foolscap_port(testcase, reactor): - """ - Assign a TCP port which can be used for a Foolscap server. - - An effort is made to avoid address collisions for this port but the logic - for doing so is platform-dependent (sorry, Windows). - - The resulting TCP port can only be used in this process! - - :return: A two-tuple of (location hint, port endpoint description) as - strings. - """ - if IReactorSocket.providedBy(reactor): - # On this platform, we can reliable pre-allocate a listening port. - # Once it is bound we know it will not fail later with EADDRINUSE. - s = socket(AF_INET, SOCK_STREAM) - s.setblocking(False) - s.bind(("127.0.0.1", 0)) - s.listen(SOMAXCONN) - host, port = s.getsockname() - location_hint = "tcp:%s:%d" % (host, port) - # We need to keep ``s`` alive as long as the file descriptor we put in - # this string might still be used. We could dup() the descriptor - # instead but then we've only inverted the cleanup problem: gone from - # don't-close-too-soon to close-just-late-enough. So we'll leave - # ``s`` alive and use it as the cleanup mechanism. - port_endpoint = "adopt-socket:fd=%d" % (s.fileno(),) - testcase.addCleanup(s.close) - # Make sure `adopt-socket` is recognized. We do this instead of - # providing a dropin because we don't want to make this endpoint - # available to random other applications. - patch_plugins(testcase) - else: - # On other platforms, we blindly guess and hope we get lucky. - portnum = iputil.allocate_tcp_port() - location_hint = "tcp:127.0.0.1:%d" % (portnum,) - port_endpoint = "tcp:%d:interface=127.0.0.1" % (portnum,) - - return location_hint, port_endpoint - - class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): # SystemTestMixin tests tend to be a lot of work, and we have a few @@ -517,6 +428,10 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): timeout = 300 def setUp(self): + self.port_assigner = SameProcessStreamEndpointAssigner() + self.port_assigner.setUp() + self.addCleanup(self.port_assigner.tearDown) + self.sparent = service.MultiService() self.sparent.startService() @@ -600,7 +515,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): statsdir = self.getdir("stats_gatherer") fileutil.make_dirs(statsdir) - location_hint, port_endpoint = assign_foolscap_port(self, reactor) + location_hint, port_endpoint = self.port_assigner.assign(reactor) fileutil.write(os.path.join(statsdir, "location"), location_hint) fileutil.write(os.path.join(statsdir, "port"), port_endpoint) self.stats_gatherer_svc = StatsGathererService(statsdir) @@ -697,7 +612,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): if self.stats_gatherer_furl: setclient("stats_gatherer.furl", self.stats_gatherer_furl) - tub_location_hint, tub_port_endpoint = assign_foolscap_port(self, reactor) + tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor) setnode("tub.port", tub_port_endpoint) setnode("tub.location", tub_location_hint) From 2898b2477b0f8ab2d9b0dae234ce06b10b813a67 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Thu, 28 Mar 2019 15:02:14 -0400 Subject: [PATCH 2/6] Use the port assigner for the web port config as well --- src/allmydata/test/test_system.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 0a79dfaa3..85117ea61 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -457,10 +457,11 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): """ iv_dir = self.getdir("introducer") if not os.path.isdir(iv_dir): + _, port_endpoint = self.port_assigner.assign(reactor) introducer_config = ( u"[node]\n" - u"nickname = introducer \N{BLACK SMILING FACE}\n" - u"web.port = tcp:0:interface=127.0.0.1\n" + u"nickname = introducer \N{BLACK SMILING FACE}\n" + + u"web.port = {}\n".format(port_endpoint) ).encode("utf-8") fileutil.make_dirs(iv_dir) @@ -616,7 +617,8 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): setnode("tub.port", tub_port_endpoint) setnode("tub.location", tub_location_hint) - setnode("web.port", "tcp:0:interface=127.0.0.1") + _, web_port_endpoint = self.port_assigner.assign(reactor) + setnode("web.port", web_port_endpoint) setnode("timeout.keepalive", "600") setnode("timeout.disconnect", "1800") From 4265cc8afd084c0ef52d385b5f5031b0ce6bee3c Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Thu, 28 Mar 2019 15:02:53 -0400 Subject: [PATCH 3/6] And for the NoNetworkGrid clients' web port config --- src/allmydata/test/no_network.py | 21 ++++++++++++++++----- src/allmydata/test/test_no_network.py | 23 ++++++++++++++++++----- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index daea27712..120b598f2 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -32,7 +32,10 @@ from allmydata.util import fileutil, idlib, hashutil from allmydata.util.hashutil import permute_server_hash from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.interfaces import IStorageBroker, IServer -from .common import TEST_RSA_KEY_SIZE +from .common import ( + TEST_RSA_KEY_SIZE, + SameProcessStreamEndpointAssigner, +) class IntentionalError(Exception): @@ -267,9 +270,10 @@ class SimpleStats: return ret class NoNetworkGrid(service.MultiService): - def __init__(self, basedir, num_clients=1, num_servers=10, - client_config_hooks={}): + def __init__(self, basedir, num_clients, num_servers, + client_config_hooks, port_assigner): service.MultiService.__init__(self) + self.port_assigner = port_assigner self.basedir = basedir fileutil.make_dirs(basedir) @@ -298,10 +302,12 @@ class NoNetworkGrid(service.MultiService): tahoe_cfg_path = os.path.join(clientdir, "tahoe.cfg") if write_config: + from twisted.internet import reactor + _, port_endpoint = self.port_assigner.assign(reactor) f = open(tahoe_cfg_path, "w") f.write("[node]\n") f.write("nickname = client-%d\n" % i) - f.write("web.port = tcp:0:interface=127.0.0.1\n") + f.write("web.port = {}\n".format(port_endpoint)) f.write("[storage]\n") f.write("enabled = false\n") f.close() @@ -409,10 +415,15 @@ class GridTestMixin(object): def set_up_grid(self, num_clients=1, num_servers=10, client_config_hooks={}, oneshare=False): # self.basedir must be set + port_assigner = SameProcessStreamEndpointAssigner() + port_assigner.setUp() + self.addCleanup(port_assigner.tearDown) self.g = NoNetworkGrid(self.basedir, num_clients=num_clients, num_servers=num_servers, - client_config_hooks=client_config_hooks) + client_config_hooks=client_config_hooks, + port_assigner=port_assigner, + ) self.g.setServiceParent(self.s) if oneshare: c = self.get_client(0) diff --git a/src/allmydata/test/test_no_network.py b/src/allmydata/test/test_no_network.py index 345662f7d..38c44de95 100644 --- a/src/allmydata/test/test_no_network.py +++ b/src/allmydata/test/test_no_network.py @@ -7,23 +7,37 @@ from allmydata.test.no_network import NoNetworkGrid from allmydata.immutable.upload import Data from allmydata.util.consumer import download_to_data +from .common import ( + SameProcessStreamEndpointAssigner, +) + class Harness(unittest.TestCase): def setUp(self): self.s = service.MultiService() self.s.startService() + self.addCleanup(self.s.stopService) + self.port_assigner = SameProcessStreamEndpointAssigner() + self.port_assigner.setUp() + self.addCleanup(self.port_assigner.tearDown) - def tearDown(self): - return self.s.stopService() + def grid(self, basedir): + return NoNetworkGrid( + basedir, + num_clients=1, + num_servers=10, + client_config_hooks={}, + port_assigner=self.port_assigner, + ) def test_create(self): basedir = "no_network/Harness/create" - g = NoNetworkGrid(basedir) + g = self.grid(basedir) g.startService() return g.stopService() def test_upload(self): basedir = "no_network/Harness/upload" - g = NoNetworkGrid(basedir) + g = self.grid(basedir) g.setServiceParent(self.s) c0 = g.clients[0] @@ -39,4 +53,3 @@ class Harness(unittest.TestCase): d.addCallback(_check) return d - From f5e287d3e99f35830c6f51365e6e7e90191fa443 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Thu, 28 Mar 2019 15:10:43 -0400 Subject: [PATCH 4/6] Use the port assigner in test_introducer --- src/allmydata/test/web/test_introducer.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/allmydata/test/web/test_introducer.py b/src/allmydata/test/web/test_introducer.py index 178275bb7..7b14e46e7 100644 --- a/src/allmydata/test/web/test_introducer.py +++ b/src/allmydata/test/web/test_introducer.py @@ -1,15 +1,24 @@ from os.path import join from twisted.trial import unittest +from twisted.internet import reactor from foolscap.api import fireEventually, flushEventualQueue from twisted.internet import defer from allmydata.introducer import create_introducer from allmydata import node -from .common import FAVICON_MARKUP +from .common import ( + FAVICON_MARKUP, +) +from ..common import ( + SameProcessStreamEndpointAssigner, +) from ..common_web import do_http class IntroducerWeb(unittest.TestCase): def setUp(self): self.node = None + self.port_assigner = SameProcessStreamEndpointAssigner() + self.port_assigner.setUp() + self.addCleanup(self.port_assigner.tearDown) def tearDown(self): d = defer.succeed(None) @@ -22,11 +31,12 @@ class IntroducerWeb(unittest.TestCase): def test_welcome(self): basedir = self.mktemp() node.create_node_dir(basedir, "testing") + _, port_endpoint = self.port_assigner.assign(reactor) with open(join(basedir, "tahoe.cfg"), "w") as f: f.write( "[node]\n" - "tub.location = 127.0.0.1:1\n" - "web.port = tcp:0\n" + "tub.location = 127.0.0.1:1\n" + + "web.port = {}\n".format(port_endpoint) ) self.node = yield create_introducer(basedir) From 6dc487b02a282defaddb9d74e4aa1459af9036b7 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Thu, 28 Mar 2019 15:13:07 -0400 Subject: [PATCH 5/6] News fragment --- newsfragments/3016.other | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/3016.other diff --git a/newsfragments/3016.other b/newsfragments/3016.other new file mode 100644 index 000000000..0af735c73 --- /dev/null +++ b/newsfragments/3016.other @@ -0,0 +1 @@ +allmydata.test.no_network, allmydata.test.test_system, and allmydata.test.web.test_introducer are now more reliable with respect to bound address collisions. From a7d18780f2a4af66dc42ae4fd298c98d4d8748bc Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Thu, 28 Mar 2019 16:06:25 -0400 Subject: [PATCH 6/6] try really hard to bind that random ephemeral port number --- src/allmydata/test/common.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index e24cc581a..cd48fcda4 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -20,6 +20,10 @@ from socket import ( SOCK_STREAM, SOMAXCONN, socket, + error as socket_error, +) +from errno import ( + EADDRINUSE, ) import treq @@ -100,6 +104,23 @@ class AdoptedServerPort(object): return AdoptedStreamServerEndpoint(reactor, os.dup(int(fd)), AF_INET) +def really_bind(s, addr): + # Arbitrarily decide we'll try 100 times. We don't want to try forever in + # case this is a persistent problem. Trying is cheap, though, so we may + # as well try a lot. Hopefully the OS isn't so bad at allocating a port + # for us that it takes more than 2 iterations. + for i in range(100): + try: + s.bind(addr) + except socket_error as e: + if e.errno == EADDRINUSE: + continue + raise + else: + return + raise Exception("Many bind attempts failed with EADDRINUSE") + + class SameProcessStreamEndpointAssigner(object): """ A fixture which can assign streaming server endpoints for use *in this @@ -157,7 +178,7 @@ class SameProcessStreamEndpointAssigner(object): # ``s`` alive and use it as the cleanup mechanism. self._cleanups.append(s.close) s.setblocking(False) - s.bind(("127.0.0.1", 0)) + really_bind(s, ("127.0.0.1", 0)) s.listen(SOMAXCONN) host, port = s.getsockname() location_hint = "tcp:%s:%d" % (host, port)