Merge pull request #610 from tahoe-lafs/3038.stop-leaking-file-descriptors

Stop leaking file descriptors

Fixes: ticket:3038
This commit is contained in:
Jean-Paul Calderone 2019-05-14 06:13:48 -04:00 committed by GitHub
commit bee18e143e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 121 additions and 5 deletions

0
newsfragments/3038.minor Normal file
View File

View File

@ -0,0 +1,63 @@
"""
Helpers for managing garbage collection.
:ivar fileDescriptorResource: A garbage-collection-informing resource tracker
for file descriptors. This is used to trigger a garbage collection when
it may be possible to reclaim a significant number of file descriptors as
a result. Register allocation and release of *bare* file descriptors with
this object (file objects, socket objects, etc, have their own integration
with the garbage collector and don't need to bother with this).
"""
__all__ = [
"fileDescriptorResource",
]
import gc
import attr
@attr.s
class _ResourceTracker(object):
"""
Keep track of some kind of resource and trigger a full garbage collection
when allocations outnumber releases by some amount.
:ivar int _counter: The number of allocations that have happened in excess
of releases since the last full collection triggered by this tracker.
:ivar int _threshold: The number of excess allocations at which point a
full collection will be triggered.
"""
_counter = attr.ib(default=0)
_threshold = attr.ib(default=25)
def allocate(self):
"""
Register the allocation of an instance of this resource.
"""
self._counter += 1
if self._counter > self._threshold:
gc.collect()
# Garbage collection of this resource has done what it can do. If
# nothing was collected, it doesn't make any sense to trigger
# another full collection the very next time the resource is
# allocated. Start the counter over again. The next collection
# happens when we again exceed the threshold.
self._counter = 0
def release(self):
"""
Register the release of an instance of this resource.
"""
if self._counter > 0:
# If there were any excess allocations at this point, account for
# there now being one fewer. It is not helpful to allow the
# counter to go below zero (as naturally would if a collection is
# triggered and then subsequently resources are released). In
# that case, we would be operating as if we had set a higher
# threshold and that is not desired.
self._counter -= 1
fileDescriptorResource = _ResourceTracker()

View File

@ -2,6 +2,10 @@
import os, re, socket, subprocess, errno
from sys import platform
from zope.interface import implementer
import attr
# from Twisted
from twisted.python.reflect import requireModule
from twisted.internet import defer, threads, reactor
@ -10,7 +14,14 @@ from twisted.internet.error import CannotListenError
from twisted.python.procutils import which
from twisted.python import log
from twisted.internet.endpoints import AdoptedStreamServerEndpoint
from twisted.internet.interfaces import IReactorSocket
from twisted.internet.interfaces import (
IReactorSocket,
IStreamServerEndpoint,
)
from .gcutil import (
fileDescriptorResource,
)
fcntl = requireModule("fcntl")
@ -268,14 +279,23 @@ def _foolscapEndpointForPortNumber(portnum):
s.bind(('', 0))
portnum = s.getsockname()[1]
s.listen(1)
# File descriptors are a relatively scarce resource. The
# cleanup process for the file descriptor we're about to dup
# is unfortunately complicated. In particular, it involves
# the Python garbage collector. See CleanupEndpoint for
# details of that. Here, we need to make sure the garbage
# collector actually runs frequently enough to make a
# difference. Normally, the garbage collector is triggered by
# allocations. It doesn't know about *file descriptor*
# allocation though. So ... we'll "teach" it about those,
# here.
fileDescriptorResource.allocate()
fd = os.dup(s.fileno())
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
flags = flags | os.O_NONBLOCK | fcntl.FD_CLOEXEC
fcntl.fcntl(fd, fcntl.F_SETFD, flags)
return (
portnum,
AdoptedStreamServerEndpoint(reactor, fd, socket.AF_INET),
)
endpoint = AdoptedStreamServerEndpoint(reactor, fd, socket.AF_INET)
return (portnum, CleanupEndpoint(endpoint, fd))
finally:
s.close()
else:
@ -287,6 +307,39 @@ def _foolscapEndpointForPortNumber(portnum):
return (portnum, "tcp:%d" % (portnum,))
@implementer(IStreamServerEndpoint)
@attr.s
class CleanupEndpoint(object):
"""
An ``IStreamServerEndpoint`` wrapper which closes a file descriptor if the
wrapped endpoint is never used.
:ivar IStreamServerEndpoint _wrapped: The wrapped endpoint. The
``listen`` implementation is delegated to this object.
:ivar int _fd: The file descriptor to close if ``listen`` is never called
by the time this object is garbage collected.
:ivar bool _listened: A flag recording whether or not ``listen`` has been
called.
"""
_wrapped = attr.ib()
_fd = attr.ib()
_listened = attr.ib(default=False)
def listen(self, protocolFactory):
self._listened = True
return self._wrapped.listen(protocolFactory)
def __del__(self):
"""
If ``listen`` was never called then close the file descriptor.
"""
if not self._listened:
os.close(self._fd)
fileDescriptorResource.release()
def listenOnUnused(tub, portnum=None):
"""
Start listening on an unused TCP port number with the given tub.