mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-19 11:16:24 +00:00
Merge pull request #757 from tahoe-lafs/3361.limiter-and-gcutil-py3
Fixes ticket:3361 Delete unused limiter, and port gcutil to Python 3
This commit is contained in:
commit
e664aaf548
@ -46,6 +46,8 @@ allmydata.test.test_hashutil.HashUtilTests.test_sha256d
|
||||
allmydata.test.test_hashutil.HashUtilTests.test_sha256d_truncated
|
||||
allmydata.test.test_hashutil.HashUtilTests.test_timing_safe_compare
|
||||
allmydata.test.test_humanreadable.HumanReadable.test_repr
|
||||
allmydata.test.test_iputil.GcUtil.test_gc_after_allocations
|
||||
allmydata.test.test_iputil.GcUtil.test_release_delays_gc
|
||||
allmydata.test.test_iputil.ListAddresses.test_get_local_ip_for
|
||||
allmydata.test.test_iputil.ListAddresses.test_list_async
|
||||
allmydata.test.test_iputil.ListAddresses.test_list_async_mock_cygwin
|
||||
|
0
newsfragments/3361.minor
Normal file
0
newsfragments/3361.minor
Normal file
@ -14,6 +14,7 @@ if PY2:
|
||||
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, int, list, object, range, str, max, min # noqa: F401
|
||||
|
||||
import re, errno, subprocess, os, socket
|
||||
import gc
|
||||
|
||||
from twisted.trial import unittest
|
||||
|
||||
@ -21,7 +22,7 @@ from tenacity import retry, stop_after_attempt
|
||||
|
||||
from foolscap.api import Tub
|
||||
|
||||
from allmydata.util import iputil
|
||||
from allmydata.util import iputil, gcutil
|
||||
import allmydata.test.common_py3 as testutil
|
||||
from allmydata.util.namespace import Namespace
|
||||
|
||||
@ -228,3 +229,35 @@ class ListenOnUsed(unittest.TestCase):
|
||||
s.close()
|
||||
port2 = iputil.listenOnUnused(tub, port)
|
||||
self.assertEqual(port, port2)
|
||||
|
||||
|
||||
class GcUtil(unittest.TestCase):
|
||||
"""Tests for allmydata.util.gcutil, which is used only by listenOnUnused."""
|
||||
|
||||
def test_gc_after_allocations(self):
|
||||
"""The resource tracker triggers allocations every 26 allocations."""
|
||||
tracker = gcutil._ResourceTracker()
|
||||
collections = []
|
||||
self.patch(gc, "collect", lambda: collections.append(1))
|
||||
for _ in range(2):
|
||||
for _ in range(25):
|
||||
tracker.allocate()
|
||||
self.assertEqual(len(collections), 0)
|
||||
tracker.allocate()
|
||||
self.assertEqual(len(collections), 1)
|
||||
del collections[:]
|
||||
|
||||
def test_release_delays_gc(self):
|
||||
"""Releasing a file descriptor resource delays GC collection."""
|
||||
tracker = gcutil._ResourceTracker()
|
||||
collections = []
|
||||
self.patch(gc, "collect", lambda: collections.append(1))
|
||||
for _ in range(2):
|
||||
tracker.allocate()
|
||||
for _ in range(3):
|
||||
tracker.release()
|
||||
for _ in range(25):
|
||||
tracker.allocate()
|
||||
self.assertEqual(len(collections), 0)
|
||||
tracker.allocate()
|
||||
self.assertEqual(len(collections), 1)
|
||||
|
@ -10,7 +10,7 @@ from twisted.python.failure import Failure
|
||||
|
||||
from allmydata.util import idlib, mathutil
|
||||
from allmydata.util import fileutil
|
||||
from allmydata.util import limiter, pollmixin
|
||||
from allmydata.util import pollmixin
|
||||
from allmydata.util import yamlutil
|
||||
from allmydata.util import log as tahoe_log
|
||||
from allmydata.util.fileutil import EncryptedTemporaryFile
|
||||
@ -429,81 +429,6 @@ class PollMixinTests(unittest.TestCase):
|
||||
return d
|
||||
|
||||
|
||||
class Limiter(unittest.TestCase):
|
||||
|
||||
def job(self, i, foo):
|
||||
self.calls.append( (i, foo) )
|
||||
self.simultaneous += 1
|
||||
self.peak_simultaneous = max(self.simultaneous, self.peak_simultaneous)
|
||||
d = defer.Deferred()
|
||||
def _done():
|
||||
self.simultaneous -= 1
|
||||
d.callback("done %d" % i)
|
||||
reactor.callLater(1.0, _done)
|
||||
return d
|
||||
|
||||
def bad_job(self, i, foo):
|
||||
raise ValueError("bad_job %d" % i)
|
||||
|
||||
def test_limiter(self):
|
||||
self.calls = []
|
||||
self.simultaneous = 0
|
||||
self.peak_simultaneous = 0
|
||||
l = limiter.ConcurrencyLimiter()
|
||||
dl = []
|
||||
for i in range(20):
|
||||
dl.append(l.add(self.job, i, foo=str(i)))
|
||||
d = defer.DeferredList(dl, fireOnOneErrback=True)
|
||||
def _done(res):
|
||||
self.failUnlessEqual(self.simultaneous, 0)
|
||||
self.failUnless(self.peak_simultaneous <= 10)
|
||||
self.failUnlessEqual(len(self.calls), 20)
|
||||
for i in range(20):
|
||||
self.failUnless( (i, str(i)) in self.calls)
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
def test_errors(self):
|
||||
self.calls = []
|
||||
self.simultaneous = 0
|
||||
self.peak_simultaneous = 0
|
||||
l = limiter.ConcurrencyLimiter()
|
||||
dl = []
|
||||
for i in range(20):
|
||||
dl.append(l.add(self.job, i, foo=str(i)))
|
||||
d2 = l.add(self.bad_job, 21, "21")
|
||||
d = defer.DeferredList(dl, fireOnOneErrback=True)
|
||||
def _most_done(res):
|
||||
results = []
|
||||
for (success, result) in res:
|
||||
self.failUnlessEqual(success, True)
|
||||
results.append(result)
|
||||
results.sort()
|
||||
expected_results = ["done %d" % i for i in range(20)]
|
||||
expected_results.sort()
|
||||
self.failUnlessEqual(results, expected_results)
|
||||
self.failUnless(self.peak_simultaneous <= 10)
|
||||
self.failUnlessEqual(len(self.calls), 20)
|
||||
for i in range(20):
|
||||
self.failUnless( (i, str(i)) in self.calls)
|
||||
def _good(res):
|
||||
self.fail("should have failed, not got %s" % (res,))
|
||||
def _err(f):
|
||||
f.trap(ValueError)
|
||||
self.failUnless("bad_job 21" in str(f))
|
||||
d2.addCallbacks(_good, _err)
|
||||
return d2
|
||||
d.addCallback(_most_done)
|
||||
def _all_done(res):
|
||||
self.failUnlessEqual(self.simultaneous, 0)
|
||||
self.failUnless(self.peak_simultaneous <= 10)
|
||||
self.failUnlessEqual(len(self.calls), 20)
|
||||
for i in range(20):
|
||||
self.failUnless( (i, str(i)) in self.calls)
|
||||
d.addCallback(_all_done)
|
||||
return d
|
||||
|
||||
|
||||
ctr = [0]
|
||||
class EqButNotIs(object):
|
||||
def __init__(self, x):
|
||||
|
@ -22,6 +22,7 @@ PORTED_MODULES = [
|
||||
"allmydata.util.base62",
|
||||
"allmydata.util.deferredutil",
|
||||
"allmydata.util.dictutil",
|
||||
"allmydata.util.gcutil",
|
||||
"allmydata.util.hashutil",
|
||||
"allmydata.util.humanreadable",
|
||||
"allmydata.util.iputil",
|
||||
|
@ -7,7 +7,17 @@ Helpers for managing garbage collection.
|
||||
a result. Register allocation and release of *bare* file descriptors with
|
||||
this object (file objects, socket objects, etc, have their own integration
|
||||
with the garbage collector and don't need to bother with this).
|
||||
|
||||
Ported to Python 3.
|
||||
"""
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from future.utils import PY2
|
||||
if PY2:
|
||||
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, int, list, object, range, str, max, min # noqa: F401
|
||||
|
||||
__all__ = [
|
||||
"fileDescriptorResource",
|
||||
|
@ -1,40 +0,0 @@
|
||||
|
||||
from twisted.internet import defer
|
||||
from foolscap.api import eventually
|
||||
|
||||
class ConcurrencyLimiter(object):
|
||||
"""I implement a basic concurrency limiter. Add work to it in the form of
|
||||
(callable, args, kwargs) tuples. No more than LIMIT callables will be
|
||||
outstanding at any one time.
|
||||
"""
|
||||
|
||||
def __init__(self, limit=10):
|
||||
self.limit = limit
|
||||
self.pending = []
|
||||
self.active = 0
|
||||
|
||||
def __repr__(self):
|
||||
return "<Limiter with %d/%d/%d>" % (self.active, len(self.pending),
|
||||
self.limit)
|
||||
|
||||
def add(self, cb, *args, **kwargs):
|
||||
d = defer.Deferred()
|
||||
task = (cb, args, kwargs, d)
|
||||
self.pending.append(task)
|
||||
self.maybe_start_task()
|
||||
return d
|
||||
|
||||
def maybe_start_task(self):
|
||||
if self.active >= self.limit:
|
||||
return
|
||||
if not self.pending:
|
||||
return
|
||||
(cb, args, kwargs, done_d) = self.pending.pop(0)
|
||||
self.active += 1
|
||||
d = defer.maybeDeferred(cb, *args, **kwargs)
|
||||
d.addBoth(self._done, done_d)
|
||||
|
||||
def _done(self, res, done_d):
|
||||
self.active -= 1
|
||||
eventually(done_d.callback, res)
|
||||
eventually(self.maybe_start_task)
|
Loading…
Reference in New Issue
Block a user