An adopted-port-endpoint-based fix for the collision

This commit is contained in:
Jean-Paul Calderone 2018-08-07 15:26:35 -04:00
parent 9ec8ec814a
commit 0ca6b8ed52
2 changed files with 89 additions and 10 deletions

View File

@ -1,11 +1,26 @@
import os, re, sys, time, json
from functools import partial
from socket import (
AF_INET,
SOCK_STREAM,
SOMAXCONN,
socket,
)
from zope.interface import implementer
from twisted.internet import reactor
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks
from twisted.application import service
from twisted.plugin import IPlugin
from twisted.internet.interfaces import (
IStreamServerEndpointStringParser,
IReactorSocket,
)
from twisted.internet.endpoints import AdoptedStreamServerEndpoint
import allmydata
from allmydata import client, uri
@ -413,6 +428,68 @@ def _render_section_values(values):
in sorted(values.items())
))
@implementer(IPlugin, IStreamServerEndpointStringParser)
class AdoptedServerPort(object):
"""
Parse an ``adopt-socket:<fd>`` endpoint description by adopting ``fd`` as
a listening TCP port.
"""
prefix = "adopt-socket"
def parseStreamServer(self, reactor, fd):
log.msg("Adopting {}".format(fd))
# AdoptedStreamServerEndpoint wants to own the file descriptor. It
# will duplicate it and then close the one we pass in. This means it
# is really only possible to adopt a particular file descriptor once.
#
# This wouldn't matter except one of the tests wants to stop one of
# the nodes and start it up again. This results in exactly an attempt
# to adopt a particular file descriptor twice.
#
# So we'll dup it ourselves. AdoptedStreamServerEndpoint can do
# whatever it wants to the result - the original will still be valid
# and reusable.
return AdoptedStreamServerEndpoint(reactor, os.dup(int(fd)), AF_INET)
def assign_foolscap_port(testcase, reactor):
"""
Assign a TCP port which can be used for a Foolscap server.
An effort is made to avoid address collisions for this port but the logic
for doing so is platform-dependent (sorry, Windows).
The resulting TCP port can only be used in this process!
:return: A two-tuple of (location hint, port endpoint description) as
strings.
"""
if IReactorSocket.providedBy(reactor):
# On this platform, we can reliable pre-allocate a listening port.
# Once it is bound we know it will not fail later with EADDRINUSE.
s = socket(AF_INET, SOCK_STREAM)
s.setblocking(False)
s.bind(("127.0.0.1", 0))
s.listen(SOMAXCONN)
host, port = s.getsockname()
location_hint = "tcp:%s:%d" % (host, port)
# We need to keep ``s`` alive as long as the file descriptor we put in
# this string might still be used. We could dup() the descriptor
# instead but then we've only inverted the cleanup problem: gone from
# don't-close-too-soon to close-just-late-enough. So we'll leave
# ``s`` alive and use it as the cleanup mechanism.
port_endpoint = "adopt-socket:fd=%d" % (s.fileno(),)
testcase.addCleanup(s.close)
else:
# On other platforms, we blindly guess and hope we get lucky.
portnum = iputil.allocate_tcp_port()
location_hint = "tcp:127.0.0.1:%d" % (portnum,)
port_endpoint = "tcp:%d:interface=127.0.0.1" % (portnum,)
return location_hint, port_endpoint
class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# SystemTestMixin tests tend to be a lot of work, and we have a few
@ -500,11 +577,10 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
def _set_up_stats_gatherer(self):
statsdir = self.getdir("stats_gatherer")
fileutil.make_dirs(statsdir)
portnum = iputil.allocate_tcp_port()
location = "tcp:127.0.0.1:%d" % portnum
fileutil.write(os.path.join(statsdir, "location"), location)
port = "tcp:%d:interface=127.0.0.1" % portnum
fileutil.write(os.path.join(statsdir, "port"), port)
location_hint, port_endpoint = assign_foolscap_port(self, reactor)
fileutil.write(os.path.join(statsdir, "location"), location_hint)
fileutil.write(os.path.join(statsdir, "port"), port_endpoint)
self.stats_gatherer_svc = StatsGathererService(statsdir)
self.stats_gatherer = self.stats_gatherer_svc.stats_gatherer
self.add_service(self.stats_gatherer_svc)
@ -597,11 +673,9 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
if self.stats_gatherer_furl:
setclient("stats_gatherer.furl", self.stats_gatherer_furl)
tub_port = iputil.allocate_tcp_port()
# Don't let it use AUTO: there's no need for tests to use
# anything other than 127.0.0.1
setnode("tub.port", "tcp:%d" % (tub_port,))
setnode("tub.location", "tcp:127.0.0.1:%d" % (tub_port,))
tub_location_hint, tub_port_endpoint = assign_foolscap_port(self, reactor)
setnode("tub.port", tub_port_endpoint)
setnode("tub.location", tub_location_hint)
setnode("web.port", "tcp:0:interface=127.0.0.1")
setnode("timeout.keepalive", "600")

View File

@ -0,0 +1,5 @@
from allmydata.test.test_system import (
AdoptedServerPort,
)
adoptedEndpointParser = AdoptedServerPort()