Move and somewhat refactor assign_foolscap_port.

These changes make it easier to re-use and remove the notion that it is
foolscap-specific.
This commit is contained in:
Jean-Paul Calderone 2019-03-28 14:54:45 -04:00
parent 8f828c510a
commit 7c97503687
3 changed files with 124 additions and 98 deletions

View File

@ -15,6 +15,12 @@ import tempfile
from tempfile import mktemp from tempfile import mktemp
from functools import partial from functools import partial
from unittest import case as _case from unittest import case as _case
from socket import (
AF_INET,
SOCK_STREAM,
SOMAXCONN,
socket,
)
import treq import treq
@ -32,12 +38,19 @@ from testtools.twistedsupport import (
flush_logged_errors, flush_logged_errors,
) )
from twisted.plugin import IPlugin
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.interfaces import IPullProducer from twisted.internet.interfaces import IPullProducer
from twisted.python import failure from twisted.python import failure
from twisted.python.filepath import FilePath
from twisted.application import service from twisted.application import service
from twisted.web.error import Error as WebError from twisted.web.error import Error as WebError
from twisted.internet.interfaces import (
IStreamServerEndpointStringParser,
IReactorSocket,
)
from twisted.internet.endpoints import AdoptedStreamServerEndpoint
from allmydata import uri from allmydata import uri
from allmydata.interfaces import IMutableFileNode, IImmutableFileNode,\ from allmydata.interfaces import IMutableFileNode, IImmutableFileNode,\
@ -50,7 +63,7 @@ from allmydata.storage_client import StubServer
from allmydata.mutable.layout import unpack_header from allmydata.mutable.layout import unpack_header
from allmydata.mutable.publish import MutableData from allmydata.mutable.publish import MutableData
from allmydata.storage.mutable import MutableShareFile from allmydata.storage.mutable import MutableShareFile
from allmydata.util import hashutil, log from allmydata.util import hashutil, log, iputil
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.util.consumer import download_to_data from allmydata.util.consumer import download_to_data
import allmydata.test.common_util as testutil import allmydata.test.common_util as testutil
@ -63,6 +76,104 @@ from .eliotutil import (
TEST_RSA_KEY_SIZE = 522 TEST_RSA_KEY_SIZE = 522
@implementer(IPlugin, IStreamServerEndpointStringParser)
class AdoptedServerPort(object):
"""
Parse an ``adopt-socket:<fd>`` endpoint description by adopting ``fd`` as
a listening TCP port.
"""
prefix = "adopt-socket"
def parseStreamServer(self, reactor, fd):
log.msg("Adopting {}".format(fd))
# AdoptedStreamServerEndpoint wants to own the file descriptor. It
# will duplicate it and then close the one we pass in. This means it
# is really only possible to adopt a particular file descriptor once.
#
# This wouldn't matter except one of the tests wants to stop one of
# the nodes and start it up again. This results in exactly an attempt
# to adopt a particular file descriptor twice.
#
# So we'll dup it ourselves. AdoptedStreamServerEndpoint can do
# whatever it wants to the result - the original will still be valid
# and reusable.
return AdoptedStreamServerEndpoint(reactor, os.dup(int(fd)), AF_INET)
class SameProcessStreamEndpointAssigner(object):
"""
A fixture which can assign streaming server endpoints for use *in this
process only*.
An effort is made to avoid address collisions for this port but the logic
for doing so is platform-dependent (sorry, Windows).
This is more reliable than trying to listen on a hard-coded non-zero port
number. It is at least as reliable as trying to listen on port number
zero on Windows and more reliable than doing that on other platforms.
"""
def setUp(self):
self._cleanups = []
def tearDown(self):
for c in self._cleanups:
c()
def _patch_plugins(self):
"""
Add the testing package ``plugins`` directory to the ``twisted.plugins``
aggregate package. Arrange for it to be removed again when the
fixture is torn down.
"""
import twisted.plugins
testplugins = FilePath(__file__).sibling("plugins")
twisted.plugins.__path__.insert(0, testplugins.path)
self._cleanups.append(lambda: twisted.plugins.__path__.remove(testplugins.path))
def assign(self, reactor):
"""
Make a new streaming server endpoint and return its string description.
This is intended to help write config files that will then be read and
used in this process.
:param reactor: The reactor which will be used to listen with the
resulting endpoint. If it provides ``IReactorSocket`` then
resulting reliability will be extremely high. If it doesn't,
resulting reliability will be pretty alright.
:return: A two-tuple of (location hint, port endpoint description) as
strings.
"""
if IReactorSocket.providedBy(reactor):
# On this platform, we can reliable pre-allocate a listening port.
# Once it is bound we know it will not fail later with EADDRINUSE.
s = socket(AF_INET, SOCK_STREAM)
# We need to keep ``s`` alive as long as the file descriptor we put in
# this string might still be used. We could dup() the descriptor
# instead but then we've only inverted the cleanup problem: gone from
# don't-close-too-soon to close-just-late-enough. So we'll leave
# ``s`` alive and use it as the cleanup mechanism.
self._cleanups.append(s.close)
s.setblocking(False)
s.bind(("127.0.0.1", 0))
s.listen(SOMAXCONN)
host, port = s.getsockname()
location_hint = "tcp:%s:%d" % (host, port)
port_endpoint = "adopt-socket:fd=%d" % (s.fileno(),)
# Make sure `adopt-socket` is recognized. We do this instead of
# providing a dropin because we don't want to make this endpoint
# available to random other applications.
self._patch_plugins()
else:
# On other platforms, we blindly guess and hope we get lucky.
portnum = iputil.allocate_tcp_port()
location_hint = "tcp:127.0.0.1:%d" % (portnum,)
port_endpoint = "tcp:%d:interface=127.0.0.1" % (portnum,)
return location_hint, port_endpoint
@implementer(IPullProducer) @implementer(IPullProducer)
class DummyProducer(object): class DummyProducer(object):
def resumeProducing(self): def resumeProducing(self):

View File

@ -1,4 +1,4 @@
from allmydata.test.test_system import ( from allmydata.test.common import (
AdoptedServerPort, AdoptedServerPort,
) )

View File

@ -2,27 +2,12 @@ from __future__ import print_function
import os, re, sys, time, json import os, re, sys, time, json
from functools import partial from functools import partial
from socket import (
AF_INET,
SOCK_STREAM,
SOMAXCONN,
socket,
)
from zope.interface import implementer
from twisted.python.filepath import FilePath
from twisted.internet import reactor from twisted.internet import reactor
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks from twisted.internet.defer import inlineCallbacks
from twisted.application import service from twisted.application import service
from twisted.plugin import IPlugin
from twisted.internet.interfaces import (
IStreamServerEndpointStringParser,
IReactorSocket,
)
from twisted.internet.endpoints import AdoptedStreamServerEndpoint
import allmydata import allmydata
from allmydata import client, uri from allmydata import client, uri
@ -32,7 +17,7 @@ from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode from allmydata.immutable.literal import LiteralFileNode
from allmydata.immutable.filenode import ImmutableFileNode from allmydata.immutable.filenode import ImmutableFileNode
from allmydata.util import idlib, mathutil, pollmixin, fileutil, iputil from allmydata.util import idlib, mathutil, pollmixin, fileutil
from allmydata.util import log, base32 from allmydata.util import log, base32
from allmydata.util.encodingutil import quote_output, unicode_to_argv from allmydata.util.encodingutil import quote_output, unicode_to_argv
from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.fileutil import abspath_expanduser_unicode
@ -48,7 +33,10 @@ from allmydata.mutable.publish import MutableData
from foolscap.api import DeadReferenceError, fireEventually, flushEventualQueue from foolscap.api import DeadReferenceError, fireEventually, flushEventualQueue
from twisted.python.failure import Failure from twisted.python.failure import Failure
from .common import TEST_RSA_KEY_SIZE from .common import (
TEST_RSA_KEY_SIZE,
SameProcessStreamEndpointAssigner,
)
from .common_web import do_http, Error from .common_web import do_http, Error
# TODO: move this to common or common_util # TODO: move this to common or common_util
@ -431,83 +419,6 @@ def _render_section_values(values):
)) ))
@implementer(IPlugin, IStreamServerEndpointStringParser)
class AdoptedServerPort(object):
"""
Parse an ``adopt-socket:<fd>`` endpoint description by adopting ``fd`` as
a listening TCP port.
"""
prefix = "adopt-socket"
def parseStreamServer(self, reactor, fd):
log.msg("Adopting {}".format(fd))
# AdoptedStreamServerEndpoint wants to own the file descriptor. It
# will duplicate it and then close the one we pass in. This means it
# is really only possible to adopt a particular file descriptor once.
#
# This wouldn't matter except one of the tests wants to stop one of
# the nodes and start it up again. This results in exactly an attempt
# to adopt a particular file descriptor twice.
#
# So we'll dup it ourselves. AdoptedStreamServerEndpoint can do
# whatever it wants to the result - the original will still be valid
# and reusable.
return AdoptedStreamServerEndpoint(reactor, os.dup(int(fd)), AF_INET)
def patch_plugins(testcase):
"""
Add the testing package ``plugins`` directory to the ``twisted.plugins``
aggregate package. Arrange for it to be removed again when the given test
is torn down.
"""
import twisted.plugins
testplugins = FilePath(__file__).sibling("plugins")
twisted.plugins.__path__.insert(0, testplugins.path)
testcase.addCleanup(lambda: twisted.plugins.__path__.remove(testplugins.path))
def assign_foolscap_port(testcase, reactor):
"""
Assign a TCP port which can be used for a Foolscap server.
An effort is made to avoid address collisions for this port but the logic
for doing so is platform-dependent (sorry, Windows).
The resulting TCP port can only be used in this process!
:return: A two-tuple of (location hint, port endpoint description) as
strings.
"""
if IReactorSocket.providedBy(reactor):
# On this platform, we can reliable pre-allocate a listening port.
# Once it is bound we know it will not fail later with EADDRINUSE.
s = socket(AF_INET, SOCK_STREAM)
s.setblocking(False)
s.bind(("127.0.0.1", 0))
s.listen(SOMAXCONN)
host, port = s.getsockname()
location_hint = "tcp:%s:%d" % (host, port)
# We need to keep ``s`` alive as long as the file descriptor we put in
# this string might still be used. We could dup() the descriptor
# instead but then we've only inverted the cleanup problem: gone from
# don't-close-too-soon to close-just-late-enough. So we'll leave
# ``s`` alive and use it as the cleanup mechanism.
port_endpoint = "adopt-socket:fd=%d" % (s.fileno(),)
testcase.addCleanup(s.close)
# Make sure `adopt-socket` is recognized. We do this instead of
# providing a dropin because we don't want to make this endpoint
# available to random other applications.
patch_plugins(testcase)
else:
# On other platforms, we blindly guess and hope we get lucky.
portnum = iputil.allocate_tcp_port()
location_hint = "tcp:127.0.0.1:%d" % (portnum,)
port_endpoint = "tcp:%d:interface=127.0.0.1" % (portnum,)
return location_hint, port_endpoint
class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# SystemTestMixin tests tend to be a lot of work, and we have a few # SystemTestMixin tests tend to be a lot of work, and we have a few
@ -517,6 +428,10 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
timeout = 300 timeout = 300
def setUp(self): def setUp(self):
self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown)
self.sparent = service.MultiService() self.sparent = service.MultiService()
self.sparent.startService() self.sparent.startService()
@ -600,7 +515,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
statsdir = self.getdir("stats_gatherer") statsdir = self.getdir("stats_gatherer")
fileutil.make_dirs(statsdir) fileutil.make_dirs(statsdir)
location_hint, port_endpoint = assign_foolscap_port(self, reactor) location_hint, port_endpoint = self.port_assigner.assign(reactor)
fileutil.write(os.path.join(statsdir, "location"), location_hint) fileutil.write(os.path.join(statsdir, "location"), location_hint)
fileutil.write(os.path.join(statsdir, "port"), port_endpoint) fileutil.write(os.path.join(statsdir, "port"), port_endpoint)
self.stats_gatherer_svc = StatsGathererService(statsdir) self.stats_gatherer_svc = StatsGathererService(statsdir)
@ -697,7 +612,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
if self.stats_gatherer_furl: if self.stats_gatherer_furl:
setclient("stats_gatherer.furl", self.stats_gatherer_furl) setclient("stats_gatherer.furl", self.stats_gatherer_furl)
tub_location_hint, tub_port_endpoint = assign_foolscap_port(self, reactor) tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor)
setnode("tub.port", tub_port_endpoint) setnode("tub.port", tub_port_endpoint)
setnode("tub.location", tub_location_hint) setnode("tub.location", tub_location_hint)