tahoe-lafs/src/allmydata/test/test_iputil.py

139 lines
4.1 KiB
Python
Raw Normal View History

2020-07-27 11:46:03 -04:00
"""
Tests for allmydata.util.iputil.
Ported to Python 3.
"""
2020-07-27 11:14:01 -04:00
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2, native_str
if PY2:
2020-08-05 11:53:23 -04:00
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
2020-10-23 09:50:31 -04:00
import os, socket
2020-07-29 14:46:49 -04:00
import gc
2020-10-23 09:32:13 -04:00
from testtools.matchers import (
MatchesAll,
IsInstance,
AllMatch,
MatchesPredicate,
)
from twisted.trial import unittest
from tenacity import retry, stop_after_attempt
2020-07-27 13:06:41 -04:00
from foolscap.api import Tub
2020-07-29 14:46:49 -04:00
from allmydata.util import iputil, gcutil
2020-07-27 13:06:41 -04:00
2020-10-23 09:32:13 -04:00
from ..util.iputil import (
get_local_addresses_sync,
)
from .common import (
SyncTestCase,
)
2020-07-27 13:06:41 -04:00
class ListenOnUsed(unittest.TestCase):
"""Tests for listenOnUnused."""
def create_tub(self, basedir):
os.makedirs(basedir)
tubfile = os.path.join(basedir, "tub.pem")
tub = Tub(certFile=tubfile)
tub.setOption("expose-remote-exception-types", False)
tub.startService()
self.addCleanup(tub.stopService)
return tub
@retry(stop=stop_after_attempt(7))
2020-07-27 13:06:41 -04:00
def test_random_port(self):
"""A random port is selected if none is given."""
tub = self.create_tub("utils/ListenOnUsed/test_randomport")
self.assertEqual(len(tub.getListeners()), 0)
portnum = iputil.listenOnUnused(tub)
# We can connect to this port:
s = socket.socket()
s.connect(("127.0.0.1", portnum))
s.close()
self.assertEqual(len(tub.getListeners()), 1)
# Listen on another port:
tub2 = self.create_tub("utils/ListenOnUsed/test_randomport_2")
portnum2 = iputil.listenOnUnused(tub2)
self.assertNotEqual(portnum, portnum2)
2020-07-27 15:16:25 -04:00
@retry(stop=stop_after_attempt(7))
2020-07-27 15:16:25 -04:00
def test_specific_port(self):
"""The given port is used."""
tub = self.create_tub("utils/ListenOnUsed/test_givenport")
s = socket.socket()
s.bind(("127.0.0.1", 0))
port = s.getsockname()[1]
s.close()
port2 = iputil.listenOnUnused(tub, port)
self.assertEqual(port, port2)
2020-07-29 14:46:49 -04:00
class GcUtil(unittest.TestCase):
"""Tests for allmydata.util.gcutil, which is used only by listenOnUnused."""
def test_gc_after_allocations(self):
"""The resource tracker triggers allocations every 26 allocations."""
2020-08-03 10:43:21 -04:00
tracker = gcutil._ResourceTracker()
2020-07-29 14:46:49 -04:00
collections = []
self.patch(gc, "collect", lambda: collections.append(1))
for _ in range(2):
for _ in range(25):
2020-08-03 10:43:21 -04:00
tracker.allocate()
2020-07-29 14:46:49 -04:00
self.assertEqual(len(collections), 0)
2020-08-03 10:43:21 -04:00
tracker.allocate()
2020-07-29 14:46:49 -04:00
self.assertEqual(len(collections), 1)
del collections[:]
def test_release_delays_gc(self):
"""Releasing a file descriptor resource delays GC collection."""
2020-08-03 10:43:21 -04:00
tracker = gcutil._ResourceTracker()
2020-07-29 14:46:49 -04:00
collections = []
self.patch(gc, "collect", lambda: collections.append(1))
for _ in range(2):
2020-08-03 10:43:21 -04:00
tracker.allocate()
2020-07-29 14:46:49 -04:00
for _ in range(3):
2020-08-03 10:43:21 -04:00
tracker.release()
2020-07-29 14:46:49 -04:00
for _ in range(25):
2020-08-03 10:43:21 -04:00
tracker.allocate()
2020-07-29 14:46:49 -04:00
self.assertEqual(len(collections), 0)
2020-08-03 10:43:21 -04:00
tracker.allocate()
2020-07-29 14:46:49 -04:00
self.assertEqual(len(collections), 1)
2020-10-23 09:32:13 -04:00
class GetLocalAddressesSyncTests(SyncTestCase):
"""
Tests for ``get_local_addresses_sync``.
"""
def test_some_ipv4_addresses(self):
"""
``get_local_addresses_sync`` returns a list of IPv4 addresses as native
strings.
"""
self.assertThat(
get_local_addresses_sync(),
MatchesAll(
IsInstance(list),
AllMatch(
MatchesAll(
IsInstance(native_str),
MatchesPredicate(
lambda addr: socket.inet_pton(socket.AF_INET, addr),
"%r is not an IPv4 address.",
),
),
),
),
)