From b31acb790a3d280a41601187436ab9cf37751877 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Wed, 1 May 2019 13:30:14 -0400 Subject: [PATCH 1/3] Try to clean up the fds created by listenOnUnused that might leak --- src/allmydata/util/iputil.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/allmydata/util/iputil.py b/src/allmydata/util/iputil.py index 614d784f8..f91322d5c 100644 --- a/src/allmydata/util/iputil.py +++ b/src/allmydata/util/iputil.py @@ -2,6 +2,10 @@ import os, re, socket, subprocess, errno from sys import platform +from zope.interface import implementer + +import attr + # from Twisted from twisted.python.reflect import requireModule from twisted.internet import defer, threads, reactor @@ -10,7 +14,10 @@ from twisted.internet.error import CannotListenError from twisted.python.procutils import which from twisted.python import log from twisted.internet.endpoints import AdoptedStreamServerEndpoint -from twisted.internet.interfaces import IReactorSocket +from twisted.internet.interfaces import ( + IReactorSocket, + IStreamServerEndpoint, +) fcntl = requireModule("fcntl") @@ -272,10 +279,8 @@ def _foolscapEndpointForPortNumber(portnum): flags = fcntl.fcntl(fd, fcntl.F_GETFD) flags = flags | os.O_NONBLOCK | fcntl.FD_CLOEXEC fcntl.fcntl(fd, fcntl.F_SETFD, flags) - return ( - portnum, - AdoptedStreamServerEndpoint(reactor, fd, socket.AF_INET), - ) + endpoint = AdoptedStreamServerEndpoint(reactor, fd, socket.AF_INET) + return (portnum, CleanupEndpoint(endpoint, fd)) finally: s.close() else: @@ -287,6 +292,22 @@ def _foolscapEndpointForPortNumber(portnum): return (portnum, "tcp:%d" % (portnum,)) +@implementer(IStreamServerEndpoint) +@attr.s +class CleanupEndpoint(object): + _wrapped = attr.ib() + _fd = attr.ib() + _listened = attr.ib(default=False) + + def listen(self, protocolFactory): + self._listened = True + return self._wrapped.listen(protocolFactory) + + def __del__(self): + if not self._listened: + os.close(self._fd) + + def listenOnUnused(tub, portnum=None): """ Start listening on an unused TCP port number with the given tub. From 8368a72657ebbc94e0750c1174785a9787983359 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Wed, 1 May 2019 13:30:44 -0400 Subject: [PATCH 2/3] news fragment --- newsfragments/3038.minor | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 newsfragments/3038.minor diff --git a/newsfragments/3038.minor b/newsfragments/3038.minor new file mode 100644 index 000000000..e69de29bb From b5659bd312fbdf59556e9cfed0ade32c813ce68b Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Wed, 1 May 2019 15:47:44 -0400 Subject: [PATCH 3/3] Some gc hinting and docs --- src/allmydata/util/gcutil.py | 63 ++++++++++++++++++++++++++++++++++++ src/allmydata/util/iputil.py | 32 ++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 src/allmydata/util/gcutil.py diff --git a/src/allmydata/util/gcutil.py b/src/allmydata/util/gcutil.py new file mode 100644 index 000000000..8fb3a64c9 --- /dev/null +++ b/src/allmydata/util/gcutil.py @@ -0,0 +1,63 @@ +""" +Helpers for managing garbage collection. + +:ivar fileDescriptorResource: A garbage-collection-informing resource tracker + for file descriptors. This is used to trigger a garbage collection when + it may be possible to reclaim a significant number of file descriptors as + a result. Register allocation and release of *bare* file descriptors with + this object (file objects, socket objects, etc, have their own integration + with the garbage collector and don't need to bother with this). +""" + +__all__ = [ + "fileDescriptorResource", +] + +import gc + +import attr + +@attr.s +class _ResourceTracker(object): + """ + Keep track of some kind of resource and trigger a full garbage collection + when allocations outnumber releases by some amount. + + :ivar int _counter: The number of allocations that have happened in excess + of releases since the last full collection triggered by this tracker. + + :ivar int _threshold: The number of excess allocations at which point a + full collection will be triggered. + """ + _counter = attr.ib(default=0) + _threshold = attr.ib(default=25) + + def allocate(self): + """ + Register the allocation of an instance of this resource. + """ + self._counter += 1 + if self._counter > self._threshold: + gc.collect() + # Garbage collection of this resource has done what it can do. If + # nothing was collected, it doesn't make any sense to trigger + # another full collection the very next time the resource is + # allocated. Start the counter over again. The next collection + # happens when we again exceed the threshold. + self._counter = 0 + + + def release(self): + """ + Register the release of an instance of this resource. + """ + if self._counter > 0: + # If there were any excess allocations at this point, account for + # there now being one fewer. It is not helpful to allow the + # counter to go below zero (as naturally would if a collection is + # triggered and then subsequently resources are released). In + # that case, we would be operating as if we had set a higher + # threshold and that is not desired. + self._counter -= 1 + +fileDescriptorResource = _ResourceTracker() diff --git a/src/allmydata/util/iputil.py b/src/allmydata/util/iputil.py index f91322d5c..57a7e790a 100644 --- a/src/allmydata/util/iputil.py +++ b/src/allmydata/util/iputil.py @@ -19,6 +19,10 @@ from twisted.internet.interfaces import ( IStreamServerEndpoint, ) +from .gcutil import ( + fileDescriptorResource, +) + fcntl = requireModule("fcntl") from foolscap.util import allocate_tcp_port # re-exported @@ -275,6 +279,17 @@ def _foolscapEndpointForPortNumber(portnum): s.bind(('', 0)) portnum = s.getsockname()[1] s.listen(1) + # File descriptors are a relatively scarce resource. The + # cleanup process for the file descriptor we're about to dup + # is unfortunately complicated. In particular, it involves + # the Python garbage collector. See CleanupEndpoint for + # details of that. Here, we need to make sure the garbage + # collector actually runs frequently enough to make a + # difference. Normally, the garbage collector is triggered by + # allocations. It doesn't know about *file descriptor* + # allocation though. So ... we'll "teach" it about those, + # here. + fileDescriptorResource.allocate() fd = os.dup(s.fileno()) flags = fcntl.fcntl(fd, fcntl.F_GETFD) flags = flags | os.O_NONBLOCK | fcntl.FD_CLOEXEC @@ -295,6 +310,19 @@ def _foolscapEndpointForPortNumber(portnum): @implementer(IStreamServerEndpoint) @attr.s class CleanupEndpoint(object): + """ + An ``IStreamServerEndpoint`` wrapper which closes a file descriptor if the + wrapped endpoint is never used. + + :ivar IStreamServerEndpoint _wrapped: The wrapped endpoint. The + ``listen`` implementation is delegated to this object. + + :ivar int _fd: The file descriptor to close if ``listen`` is never called + by the time this object is garbage collected. + + :ivar bool _listened: A flag recording whether or not ``listen`` has been + called. + """ _wrapped = attr.ib() _fd = attr.ib() _listened = attr.ib(default=False) @@ -304,8 +332,12 @@ class CleanupEndpoint(object): return self._wrapped.listen(protocolFactory) def __del__(self): + """ + If ``listen`` was never called then close the file descriptor. + """ if not self._listened: os.close(self._fd) + fileDescriptorResource.release() def listenOnUnused(tub, portnum=None):