Merge pull request #590 from tahoe-lafs/3016.address-already-in-use

Fix more "Address already in use" errors

Fixes: ticket:3016
This commit is contained in:
Jean-Paul Calderone 2019-03-28 19:01:44 -04:00 committed by GitHub
commit 7511b5956b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 198 additions and 114 deletions

1
newsfragments/3016.other Normal file
View File

@ -0,0 +1 @@
allmydata.test.no_network, allmydata.test.test_system, and allmydata.test.web.test_introducer are now more reliable with respect to bound address collisions.

View File

@ -15,6 +15,16 @@ import tempfile
from tempfile import mktemp from tempfile import mktemp
from functools import partial from functools import partial
from unittest import case as _case from unittest import case as _case
from socket import (
AF_INET,
SOCK_STREAM,
SOMAXCONN,
socket,
error as socket_error,
)
from errno import (
EADDRINUSE,
)
import treq import treq
@ -32,12 +42,19 @@ from testtools.twistedsupport import (
flush_logged_errors, flush_logged_errors,
) )
from twisted.plugin import IPlugin
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.interfaces import IPullProducer from twisted.internet.interfaces import IPullProducer
from twisted.python import failure from twisted.python import failure
from twisted.python.filepath import FilePath
from twisted.application import service from twisted.application import service
from twisted.web.error import Error as WebError from twisted.web.error import Error as WebError
from twisted.internet.interfaces import (
IStreamServerEndpointStringParser,
IReactorSocket,
)
from twisted.internet.endpoints import AdoptedStreamServerEndpoint
from allmydata import uri from allmydata import uri
from allmydata.interfaces import IMutableFileNode, IImmutableFileNode,\ from allmydata.interfaces import IMutableFileNode, IImmutableFileNode,\
@ -50,7 +67,7 @@ from allmydata.storage_client import StubServer
from allmydata.mutable.layout import unpack_header from allmydata.mutable.layout import unpack_header
from allmydata.mutable.publish import MutableData from allmydata.mutable.publish import MutableData
from allmydata.storage.mutable import MutableShareFile from allmydata.storage.mutable import MutableShareFile
from allmydata.util import hashutil, log from allmydata.util import hashutil, log, iputil
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.util.consumer import download_to_data from allmydata.util.consumer import download_to_data
import allmydata.test.common_util as testutil import allmydata.test.common_util as testutil
@ -63,6 +80,121 @@ from .eliotutil import (
TEST_RSA_KEY_SIZE = 522 TEST_RSA_KEY_SIZE = 522
@implementer(IPlugin, IStreamServerEndpointStringParser)
class AdoptedServerPort(object):
"""
Parse an ``adopt-socket:<fd>`` endpoint description by adopting ``fd`` as
a listening TCP port.
"""
prefix = "adopt-socket"
def parseStreamServer(self, reactor, fd):
log.msg("Adopting {}".format(fd))
# AdoptedStreamServerEndpoint wants to own the file descriptor. It
# will duplicate it and then close the one we pass in. This means it
# is really only possible to adopt a particular file descriptor once.
#
# This wouldn't matter except one of the tests wants to stop one of
# the nodes and start it up again. This results in exactly an attempt
# to adopt a particular file descriptor twice.
#
# So we'll dup it ourselves. AdoptedStreamServerEndpoint can do
# whatever it wants to the result - the original will still be valid
# and reusable.
return AdoptedStreamServerEndpoint(reactor, os.dup(int(fd)), AF_INET)
def really_bind(s, addr):
# Arbitrarily decide we'll try 100 times. We don't want to try forever in
# case this is a persistent problem. Trying is cheap, though, so we may
# as well try a lot. Hopefully the OS isn't so bad at allocating a port
# for us that it takes more than 2 iterations.
for i in range(100):
try:
s.bind(addr)
except socket_error as e:
if e.errno == EADDRINUSE:
continue
raise
else:
return
raise Exception("Many bind attempts failed with EADDRINUSE")
class SameProcessStreamEndpointAssigner(object):
"""
A fixture which can assign streaming server endpoints for use *in this
process only*.
An effort is made to avoid address collisions for this port but the logic
for doing so is platform-dependent (sorry, Windows).
This is more reliable than trying to listen on a hard-coded non-zero port
number. It is at least as reliable as trying to listen on port number
zero on Windows and more reliable than doing that on other platforms.
"""
def setUp(self):
self._cleanups = []
def tearDown(self):
for c in self._cleanups:
c()
def _patch_plugins(self):
"""
Add the testing package ``plugins`` directory to the ``twisted.plugins``
aggregate package. Arrange for it to be removed again when the
fixture is torn down.
"""
import twisted.plugins
testplugins = FilePath(__file__).sibling("plugins")
twisted.plugins.__path__.insert(0, testplugins.path)
self._cleanups.append(lambda: twisted.plugins.__path__.remove(testplugins.path))
def assign(self, reactor):
"""
Make a new streaming server endpoint and return its string description.
This is intended to help write config files that will then be read and
used in this process.
:param reactor: The reactor which will be used to listen with the
resulting endpoint. If it provides ``IReactorSocket`` then
resulting reliability will be extremely high. If it doesn't,
resulting reliability will be pretty alright.
:return: A two-tuple of (location hint, port endpoint description) as
strings.
"""
if IReactorSocket.providedBy(reactor):
# On this platform, we can reliable pre-allocate a listening port.
# Once it is bound we know it will not fail later with EADDRINUSE.
s = socket(AF_INET, SOCK_STREAM)
# We need to keep ``s`` alive as long as the file descriptor we put in
# this string might still be used. We could dup() the descriptor
# instead but then we've only inverted the cleanup problem: gone from
# don't-close-too-soon to close-just-late-enough. So we'll leave
# ``s`` alive and use it as the cleanup mechanism.
self._cleanups.append(s.close)
s.setblocking(False)
really_bind(s, ("127.0.0.1", 0))
s.listen(SOMAXCONN)
host, port = s.getsockname()
location_hint = "tcp:%s:%d" % (host, port)
port_endpoint = "adopt-socket:fd=%d" % (s.fileno(),)
# Make sure `adopt-socket` is recognized. We do this instead of
# providing a dropin because we don't want to make this endpoint
# available to random other applications.
self._patch_plugins()
else:
# On other platforms, we blindly guess and hope we get lucky.
portnum = iputil.allocate_tcp_port()
location_hint = "tcp:127.0.0.1:%d" % (portnum,)
port_endpoint = "tcp:%d:interface=127.0.0.1" % (portnum,)
return location_hint, port_endpoint
@implementer(IPullProducer) @implementer(IPullProducer)
class DummyProducer(object): class DummyProducer(object):
def resumeProducing(self): def resumeProducing(self):

View File

@ -32,7 +32,10 @@ from allmydata.util import fileutil, idlib, hashutil
from allmydata.util.hashutil import permute_server_hash from allmydata.util.hashutil import permute_server_hash
from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.interfaces import IStorageBroker, IServer from allmydata.interfaces import IStorageBroker, IServer
from .common import TEST_RSA_KEY_SIZE from .common import (
TEST_RSA_KEY_SIZE,
SameProcessStreamEndpointAssigner,
)
class IntentionalError(Exception): class IntentionalError(Exception):
@ -267,9 +270,10 @@ class SimpleStats:
return ret return ret
class NoNetworkGrid(service.MultiService): class NoNetworkGrid(service.MultiService):
def __init__(self, basedir, num_clients=1, num_servers=10, def __init__(self, basedir, num_clients, num_servers,
client_config_hooks={}): client_config_hooks, port_assigner):
service.MultiService.__init__(self) service.MultiService.__init__(self)
self.port_assigner = port_assigner
self.basedir = basedir self.basedir = basedir
fileutil.make_dirs(basedir) fileutil.make_dirs(basedir)
@ -298,10 +302,12 @@ class NoNetworkGrid(service.MultiService):
tahoe_cfg_path = os.path.join(clientdir, "tahoe.cfg") tahoe_cfg_path = os.path.join(clientdir, "tahoe.cfg")
if write_config: if write_config:
from twisted.internet import reactor
_, port_endpoint = self.port_assigner.assign(reactor)
f = open(tahoe_cfg_path, "w") f = open(tahoe_cfg_path, "w")
f.write("[node]\n") f.write("[node]\n")
f.write("nickname = client-%d\n" % i) f.write("nickname = client-%d\n" % i)
f.write("web.port = tcp:0:interface=127.0.0.1\n") f.write("web.port = {}\n".format(port_endpoint))
f.write("[storage]\n") f.write("[storage]\n")
f.write("enabled = false\n") f.write("enabled = false\n")
f.close() f.close()
@ -409,10 +415,15 @@ class GridTestMixin(object):
def set_up_grid(self, num_clients=1, num_servers=10, def set_up_grid(self, num_clients=1, num_servers=10,
client_config_hooks={}, oneshare=False): client_config_hooks={}, oneshare=False):
# self.basedir must be set # self.basedir must be set
port_assigner = SameProcessStreamEndpointAssigner()
port_assigner.setUp()
self.addCleanup(port_assigner.tearDown)
self.g = NoNetworkGrid(self.basedir, self.g = NoNetworkGrid(self.basedir,
num_clients=num_clients, num_clients=num_clients,
num_servers=num_servers, num_servers=num_servers,
client_config_hooks=client_config_hooks) client_config_hooks=client_config_hooks,
port_assigner=port_assigner,
)
self.g.setServiceParent(self.s) self.g.setServiceParent(self.s)
if oneshare: if oneshare:
c = self.get_client(0) c = self.get_client(0)

View File

@ -1,4 +1,4 @@
from allmydata.test.test_system import ( from allmydata.test.common import (
AdoptedServerPort, AdoptedServerPort,
) )

View File

@ -7,23 +7,37 @@ from allmydata.test.no_network import NoNetworkGrid
from allmydata.immutable.upload import Data from allmydata.immutable.upload import Data
from allmydata.util.consumer import download_to_data from allmydata.util.consumer import download_to_data
from .common import (
SameProcessStreamEndpointAssigner,
)
class Harness(unittest.TestCase): class Harness(unittest.TestCase):
def setUp(self): def setUp(self):
self.s = service.MultiService() self.s = service.MultiService()
self.s.startService() self.s.startService()
self.addCleanup(self.s.stopService)
self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown)
def tearDown(self): def grid(self, basedir):
return self.s.stopService() return NoNetworkGrid(
basedir,
num_clients=1,
num_servers=10,
client_config_hooks={},
port_assigner=self.port_assigner,
)
def test_create(self): def test_create(self):
basedir = "no_network/Harness/create" basedir = "no_network/Harness/create"
g = NoNetworkGrid(basedir) g = self.grid(basedir)
g.startService() g.startService()
return g.stopService() return g.stopService()
def test_upload(self): def test_upload(self):
basedir = "no_network/Harness/upload" basedir = "no_network/Harness/upload"
g = NoNetworkGrid(basedir) g = self.grid(basedir)
g.setServiceParent(self.s) g.setServiceParent(self.s)
c0 = g.clients[0] c0 = g.clients[0]
@ -39,4 +53,3 @@ class Harness(unittest.TestCase):
d.addCallback(_check) d.addCallback(_check)
return d return d

View File

@ -2,27 +2,12 @@ from __future__ import print_function
import os, re, sys, time, json import os, re, sys, time, json
from functools import partial from functools import partial
from socket import (
AF_INET,
SOCK_STREAM,
SOMAXCONN,
socket,
)
from zope.interface import implementer
from twisted.python.filepath import FilePath
from twisted.internet import reactor from twisted.internet import reactor
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks from twisted.internet.defer import inlineCallbacks
from twisted.application import service from twisted.application import service
from twisted.plugin import IPlugin
from twisted.internet.interfaces import (
IStreamServerEndpointStringParser,
IReactorSocket,
)
from twisted.internet.endpoints import AdoptedStreamServerEndpoint
import allmydata import allmydata
from allmydata import client, uri from allmydata import client, uri
@ -32,7 +17,7 @@ from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode from allmydata.immutable.literal import LiteralFileNode
from allmydata.immutable.filenode import ImmutableFileNode from allmydata.immutable.filenode import ImmutableFileNode
from allmydata.util import idlib, mathutil, pollmixin, fileutil, iputil from allmydata.util import idlib, mathutil, pollmixin, fileutil
from allmydata.util import log, base32 from allmydata.util import log, base32
from allmydata.util.encodingutil import quote_output, unicode_to_argv from allmydata.util.encodingutil import quote_output, unicode_to_argv
from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.fileutil import abspath_expanduser_unicode
@ -48,7 +33,10 @@ from allmydata.mutable.publish import MutableData
from foolscap.api import DeadReferenceError, fireEventually, flushEventualQueue from foolscap.api import DeadReferenceError, fireEventually, flushEventualQueue
from twisted.python.failure import Failure from twisted.python.failure import Failure
from .common import TEST_RSA_KEY_SIZE from .common import (
TEST_RSA_KEY_SIZE,
SameProcessStreamEndpointAssigner,
)
from .common_web import do_http, Error from .common_web import do_http, Error
# TODO: move this to common or common_util # TODO: move this to common or common_util
@ -431,83 +419,6 @@ def _render_section_values(values):
)) ))
@implementer(IPlugin, IStreamServerEndpointStringParser)
class AdoptedServerPort(object):
"""
Parse an ``adopt-socket:<fd>`` endpoint description by adopting ``fd`` as
a listening TCP port.
"""
prefix = "adopt-socket"
def parseStreamServer(self, reactor, fd):
log.msg("Adopting {}".format(fd))
# AdoptedStreamServerEndpoint wants to own the file descriptor. It
# will duplicate it and then close the one we pass in. This means it
# is really only possible to adopt a particular file descriptor once.
#
# This wouldn't matter except one of the tests wants to stop one of
# the nodes and start it up again. This results in exactly an attempt
# to adopt a particular file descriptor twice.
#
# So we'll dup it ourselves. AdoptedStreamServerEndpoint can do
# whatever it wants to the result - the original will still be valid
# and reusable.
return AdoptedStreamServerEndpoint(reactor, os.dup(int(fd)), AF_INET)
def patch_plugins(testcase):
"""
Add the testing package ``plugins`` directory to the ``twisted.plugins``
aggregate package. Arrange for it to be removed again when the given test
is torn down.
"""
import twisted.plugins
testplugins = FilePath(__file__).sibling("plugins")
twisted.plugins.__path__.insert(0, testplugins.path)
testcase.addCleanup(lambda: twisted.plugins.__path__.remove(testplugins.path))
def assign_foolscap_port(testcase, reactor):
"""
Assign a TCP port which can be used for a Foolscap server.
An effort is made to avoid address collisions for this port but the logic
for doing so is platform-dependent (sorry, Windows).
The resulting TCP port can only be used in this process!
:return: A two-tuple of (location hint, port endpoint description) as
strings.
"""
if IReactorSocket.providedBy(reactor):
# On this platform, we can reliable pre-allocate a listening port.
# Once it is bound we know it will not fail later with EADDRINUSE.
s = socket(AF_INET, SOCK_STREAM)
s.setblocking(False)
s.bind(("127.0.0.1", 0))
s.listen(SOMAXCONN)
host, port = s.getsockname()
location_hint = "tcp:%s:%d" % (host, port)
# We need to keep ``s`` alive as long as the file descriptor we put in
# this string might still be used. We could dup() the descriptor
# instead but then we've only inverted the cleanup problem: gone from
# don't-close-too-soon to close-just-late-enough. So we'll leave
# ``s`` alive and use it as the cleanup mechanism.
port_endpoint = "adopt-socket:fd=%d" % (s.fileno(),)
testcase.addCleanup(s.close)
# Make sure `adopt-socket` is recognized. We do this instead of
# providing a dropin because we don't want to make this endpoint
# available to random other applications.
patch_plugins(testcase)
else:
# On other platforms, we blindly guess and hope we get lucky.
portnum = iputil.allocate_tcp_port()
location_hint = "tcp:127.0.0.1:%d" % (portnum,)
port_endpoint = "tcp:%d:interface=127.0.0.1" % (portnum,)
return location_hint, port_endpoint
class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# SystemTestMixin tests tend to be a lot of work, and we have a few # SystemTestMixin tests tend to be a lot of work, and we have a few
@ -517,6 +428,10 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
timeout = 300 timeout = 300
def setUp(self): def setUp(self):
self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown)
self.sparent = service.MultiService() self.sparent = service.MultiService()
self.sparent.startService() self.sparent.startService()
@ -542,10 +457,11 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
""" """
iv_dir = self.getdir("introducer") iv_dir = self.getdir("introducer")
if not os.path.isdir(iv_dir): if not os.path.isdir(iv_dir):
_, port_endpoint = self.port_assigner.assign(reactor)
introducer_config = ( introducer_config = (
u"[node]\n" u"[node]\n"
u"nickname = introducer \N{BLACK SMILING FACE}\n" u"nickname = introducer \N{BLACK SMILING FACE}\n" +
u"web.port = tcp:0:interface=127.0.0.1\n" u"web.port = {}\n".format(port_endpoint)
).encode("utf-8") ).encode("utf-8")
fileutil.make_dirs(iv_dir) fileutil.make_dirs(iv_dir)
@ -600,7 +516,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
statsdir = self.getdir("stats_gatherer") statsdir = self.getdir("stats_gatherer")
fileutil.make_dirs(statsdir) fileutil.make_dirs(statsdir)
location_hint, port_endpoint = assign_foolscap_port(self, reactor) location_hint, port_endpoint = self.port_assigner.assign(reactor)
fileutil.write(os.path.join(statsdir, "location"), location_hint) fileutil.write(os.path.join(statsdir, "location"), location_hint)
fileutil.write(os.path.join(statsdir, "port"), port_endpoint) fileutil.write(os.path.join(statsdir, "port"), port_endpoint)
self.stats_gatherer_svc = StatsGathererService(statsdir) self.stats_gatherer_svc = StatsGathererService(statsdir)
@ -697,11 +613,12 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
if self.stats_gatherer_furl: if self.stats_gatherer_furl:
setclient("stats_gatherer.furl", self.stats_gatherer_furl) setclient("stats_gatherer.furl", self.stats_gatherer_furl)
tub_location_hint, tub_port_endpoint = assign_foolscap_port(self, reactor) tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor)
setnode("tub.port", tub_port_endpoint) setnode("tub.port", tub_port_endpoint)
setnode("tub.location", tub_location_hint) setnode("tub.location", tub_location_hint)
setnode("web.port", "tcp:0:interface=127.0.0.1") _, web_port_endpoint = self.port_assigner.assign(reactor)
setnode("web.port", web_port_endpoint)
setnode("timeout.keepalive", "600") setnode("timeout.keepalive", "600")
setnode("timeout.disconnect", "1800") setnode("timeout.disconnect", "1800")

View File

@ -1,15 +1,24 @@
from os.path import join from os.path import join
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import reactor
from foolscap.api import fireEventually, flushEventualQueue from foolscap.api import fireEventually, flushEventualQueue
from twisted.internet import defer from twisted.internet import defer
from allmydata.introducer import create_introducer from allmydata.introducer import create_introducer
from allmydata import node from allmydata import node
from .common import FAVICON_MARKUP from .common import (
FAVICON_MARKUP,
)
from ..common import (
SameProcessStreamEndpointAssigner,
)
from ..common_web import do_http from ..common_web import do_http
class IntroducerWeb(unittest.TestCase): class IntroducerWeb(unittest.TestCase):
def setUp(self): def setUp(self):
self.node = None self.node = None
self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown)
def tearDown(self): def tearDown(self):
d = defer.succeed(None) d = defer.succeed(None)
@ -22,11 +31,12 @@ class IntroducerWeb(unittest.TestCase):
def test_welcome(self): def test_welcome(self):
basedir = self.mktemp() basedir = self.mktemp()
node.create_node_dir(basedir, "testing") node.create_node_dir(basedir, "testing")
_, port_endpoint = self.port_assigner.assign(reactor)
with open(join(basedir, "tahoe.cfg"), "w") as f: with open(join(basedir, "tahoe.cfg"), "w") as f:
f.write( f.write(
"[node]\n" "[node]\n"
"tub.location = 127.0.0.1:1\n" "tub.location = 127.0.0.1:1\n" +
"web.port = tcp:0\n" "web.port = {}\n".format(port_endpoint)
) )
self.node = yield create_introducer(basedir) self.node = yield create_introducer(basedir)