From 7c975036876025328b3d6f45ae5d11af6f75dc50 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Thu, 28 Mar 2019 14:54:45 -0400 Subject: [PATCH] Move and somewhat refactor `assign_foolscap_port`. These changes make it easier to re-use and remove the notion that it is foolscap-specific. --- src/allmydata/test/common.py | 113 +++++++++++++++++- .../test/plugins/tahoe_lafs_dropin.py | 2 +- src/allmydata/test/test_system.py | 107 ++--------------- 3 files changed, 124 insertions(+), 98 deletions(-) diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index 3d3d84593..e24cc581a 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -15,6 +15,12 @@ import tempfile from tempfile import mktemp from functools import partial from unittest import case as _case +from socket import ( + AF_INET, + SOCK_STREAM, + SOMAXCONN, + socket, +) import treq @@ -32,12 +38,19 @@ from testtools.twistedsupport import ( flush_logged_errors, ) +from twisted.plugin import IPlugin from twisted.internet import defer from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.interfaces import IPullProducer from twisted.python import failure +from twisted.python.filepath import FilePath from twisted.application import service from twisted.web.error import Error as WebError +from twisted.internet.interfaces import ( + IStreamServerEndpointStringParser, + IReactorSocket, +) +from twisted.internet.endpoints import AdoptedStreamServerEndpoint from allmydata import uri from allmydata.interfaces import IMutableFileNode, IImmutableFileNode,\ @@ -50,7 +63,7 @@ from allmydata.storage_client import StubServer from allmydata.mutable.layout import unpack_header from allmydata.mutable.publish import MutableData from allmydata.storage.mutable import MutableShareFile -from allmydata.util import hashutil, log +from allmydata.util import hashutil, log, iputil from allmydata.util.assertutil import precondition from allmydata.util.consumer import download_to_data import allmydata.test.common_util as testutil @@ -63,6 +76,104 @@ from .eliotutil import ( TEST_RSA_KEY_SIZE = 522 +@implementer(IPlugin, IStreamServerEndpointStringParser) +class AdoptedServerPort(object): + """ + Parse an ``adopt-socket:`` endpoint description by adopting ``fd`` as + a listening TCP port. + """ + prefix = "adopt-socket" + + def parseStreamServer(self, reactor, fd): + log.msg("Adopting {}".format(fd)) + # AdoptedStreamServerEndpoint wants to own the file descriptor. It + # will duplicate it and then close the one we pass in. This means it + # is really only possible to adopt a particular file descriptor once. + # + # This wouldn't matter except one of the tests wants to stop one of + # the nodes and start it up again. This results in exactly an attempt + # to adopt a particular file descriptor twice. + # + # So we'll dup it ourselves. AdoptedStreamServerEndpoint can do + # whatever it wants to the result - the original will still be valid + # and reusable. + return AdoptedStreamServerEndpoint(reactor, os.dup(int(fd)), AF_INET) + + +class SameProcessStreamEndpointAssigner(object): + """ + A fixture which can assign streaming server endpoints for use *in this + process only*. + + An effort is made to avoid address collisions for this port but the logic + for doing so is platform-dependent (sorry, Windows). + + This is more reliable than trying to listen on a hard-coded non-zero port + number. It is at least as reliable as trying to listen on port number + zero on Windows and more reliable than doing that on other platforms. + """ + def setUp(self): + self._cleanups = [] + + def tearDown(self): + for c in self._cleanups: + c() + + def _patch_plugins(self): + """ + Add the testing package ``plugins`` directory to the ``twisted.plugins`` + aggregate package. Arrange for it to be removed again when the + fixture is torn down. + """ + import twisted.plugins + testplugins = FilePath(__file__).sibling("plugins") + twisted.plugins.__path__.insert(0, testplugins.path) + self._cleanups.append(lambda: twisted.plugins.__path__.remove(testplugins.path)) + + + def assign(self, reactor): + """ + Make a new streaming server endpoint and return its string description. + + This is intended to help write config files that will then be read and + used in this process. + + :param reactor: The reactor which will be used to listen with the + resulting endpoint. If it provides ``IReactorSocket`` then + resulting reliability will be extremely high. If it doesn't, + resulting reliability will be pretty alright. + + :return: A two-tuple of (location hint, port endpoint description) as + strings. + """ + if IReactorSocket.providedBy(reactor): + # On this platform, we can reliable pre-allocate a listening port. + # Once it is bound we know it will not fail later with EADDRINUSE. + s = socket(AF_INET, SOCK_STREAM) + # We need to keep ``s`` alive as long as the file descriptor we put in + # this string might still be used. We could dup() the descriptor + # instead but then we've only inverted the cleanup problem: gone from + # don't-close-too-soon to close-just-late-enough. So we'll leave + # ``s`` alive and use it as the cleanup mechanism. + self._cleanups.append(s.close) + s.setblocking(False) + s.bind(("127.0.0.1", 0)) + s.listen(SOMAXCONN) + host, port = s.getsockname() + location_hint = "tcp:%s:%d" % (host, port) + port_endpoint = "adopt-socket:fd=%d" % (s.fileno(),) + # Make sure `adopt-socket` is recognized. We do this instead of + # providing a dropin because we don't want to make this endpoint + # available to random other applications. + self._patch_plugins() + else: + # On other platforms, we blindly guess and hope we get lucky. + portnum = iputil.allocate_tcp_port() + location_hint = "tcp:127.0.0.1:%d" % (portnum,) + port_endpoint = "tcp:%d:interface=127.0.0.1" % (portnum,) + + return location_hint, port_endpoint + @implementer(IPullProducer) class DummyProducer(object): def resumeProducing(self): diff --git a/src/allmydata/test/plugins/tahoe_lafs_dropin.py b/src/allmydata/test/plugins/tahoe_lafs_dropin.py index 110ef5ab8..9faf5f07f 100644 --- a/src/allmydata/test/plugins/tahoe_lafs_dropin.py +++ b/src/allmydata/test/plugins/tahoe_lafs_dropin.py @@ -1,4 +1,4 @@ -from allmydata.test.test_system import ( +from allmydata.test.common import ( AdoptedServerPort, ) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index c178ef609..0a79dfaa3 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -2,27 +2,12 @@ from __future__ import print_function import os, re, sys, time, json from functools import partial -from socket import ( - AF_INET, - SOCK_STREAM, - SOMAXCONN, - socket, -) -from zope.interface import implementer - -from twisted.python.filepath import FilePath from twisted.internet import reactor from twisted.trial import unittest from twisted.internet import defer from twisted.internet.defer import inlineCallbacks from twisted.application import service -from twisted.plugin import IPlugin -from twisted.internet.interfaces import ( - IStreamServerEndpointStringParser, - IReactorSocket, -) -from twisted.internet.endpoints import AdoptedStreamServerEndpoint import allmydata from allmydata import client, uri @@ -32,7 +17,7 @@ from allmydata.storage.server import si_a2b from allmydata.immutable import offloaded, upload from allmydata.immutable.literal import LiteralFileNode from allmydata.immutable.filenode import ImmutableFileNode -from allmydata.util import idlib, mathutil, pollmixin, fileutil, iputil +from allmydata.util import idlib, mathutil, pollmixin, fileutil from allmydata.util import log, base32 from allmydata.util.encodingutil import quote_output, unicode_to_argv from allmydata.util.fileutil import abspath_expanduser_unicode @@ -48,7 +33,10 @@ from allmydata.mutable.publish import MutableData from foolscap.api import DeadReferenceError, fireEventually, flushEventualQueue from twisted.python.failure import Failure -from .common import TEST_RSA_KEY_SIZE +from .common import ( + TEST_RSA_KEY_SIZE, + SameProcessStreamEndpointAssigner, +) from .common_web import do_http, Error # TODO: move this to common or common_util @@ -431,83 +419,6 @@ def _render_section_values(values): )) -@implementer(IPlugin, IStreamServerEndpointStringParser) -class AdoptedServerPort(object): - """ - Parse an ``adopt-socket:`` endpoint description by adopting ``fd`` as - a listening TCP port. - """ - prefix = "adopt-socket" - - def parseStreamServer(self, reactor, fd): - log.msg("Adopting {}".format(fd)) - # AdoptedStreamServerEndpoint wants to own the file descriptor. It - # will duplicate it and then close the one we pass in. This means it - # is really only possible to adopt a particular file descriptor once. - # - # This wouldn't matter except one of the tests wants to stop one of - # the nodes and start it up again. This results in exactly an attempt - # to adopt a particular file descriptor twice. - # - # So we'll dup it ourselves. AdoptedStreamServerEndpoint can do - # whatever it wants to the result - the original will still be valid - # and reusable. - return AdoptedStreamServerEndpoint(reactor, os.dup(int(fd)), AF_INET) - - -def patch_plugins(testcase): - """ - Add the testing package ``plugins`` directory to the ``twisted.plugins`` - aggregate package. Arrange for it to be removed again when the given test - is torn down. - """ - import twisted.plugins - testplugins = FilePath(__file__).sibling("plugins") - twisted.plugins.__path__.insert(0, testplugins.path) - testcase.addCleanup(lambda: twisted.plugins.__path__.remove(testplugins.path)) - - -def assign_foolscap_port(testcase, reactor): - """ - Assign a TCP port which can be used for a Foolscap server. - - An effort is made to avoid address collisions for this port but the logic - for doing so is platform-dependent (sorry, Windows). - - The resulting TCP port can only be used in this process! - - :return: A two-tuple of (location hint, port endpoint description) as - strings. - """ - if IReactorSocket.providedBy(reactor): - # On this platform, we can reliable pre-allocate a listening port. - # Once it is bound we know it will not fail later with EADDRINUSE. - s = socket(AF_INET, SOCK_STREAM) - s.setblocking(False) - s.bind(("127.0.0.1", 0)) - s.listen(SOMAXCONN) - host, port = s.getsockname() - location_hint = "tcp:%s:%d" % (host, port) - # We need to keep ``s`` alive as long as the file descriptor we put in - # this string might still be used. We could dup() the descriptor - # instead but then we've only inverted the cleanup problem: gone from - # don't-close-too-soon to close-just-late-enough. So we'll leave - # ``s`` alive and use it as the cleanup mechanism. - port_endpoint = "adopt-socket:fd=%d" % (s.fileno(),) - testcase.addCleanup(s.close) - # Make sure `adopt-socket` is recognized. We do this instead of - # providing a dropin because we don't want to make this endpoint - # available to random other applications. - patch_plugins(testcase) - else: - # On other platforms, we blindly guess and hope we get lucky. - portnum = iputil.allocate_tcp_port() - location_hint = "tcp:127.0.0.1:%d" % (portnum,) - port_endpoint = "tcp:%d:interface=127.0.0.1" % (portnum,) - - return location_hint, port_endpoint - - class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): # SystemTestMixin tests tend to be a lot of work, and we have a few @@ -517,6 +428,10 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): timeout = 300 def setUp(self): + self.port_assigner = SameProcessStreamEndpointAssigner() + self.port_assigner.setUp() + self.addCleanup(self.port_assigner.tearDown) + self.sparent = service.MultiService() self.sparent.startService() @@ -600,7 +515,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): statsdir = self.getdir("stats_gatherer") fileutil.make_dirs(statsdir) - location_hint, port_endpoint = assign_foolscap_port(self, reactor) + location_hint, port_endpoint = self.port_assigner.assign(reactor) fileutil.write(os.path.join(statsdir, "location"), location_hint) fileutil.write(os.path.join(statsdir, "port"), port_endpoint) self.stats_gatherer_svc = StatsGathererService(statsdir) @@ -697,7 +612,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): if self.stats_gatherer_furl: setclient("stats_gatherer.furl", self.stats_gatherer_furl) - tub_location_hint, tub_port_endpoint = assign_foolscap_port(self, reactor) + tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor) setnode("tub.port", tub_port_endpoint) setnode("tub.location", tub_location_hint)