diff --git a/benchmarks/upload_download.py b/benchmarks/upload_download.py
index aa5f506bc..3dfa63336 100644
--- a/benchmarks/upload_download.py
+++ b/benchmarks/upload_download.py
@@ -5,6 +5,23 @@ To run:
 
 $ pytest benchmarks/upload_download.py -s -v -Wignore
 
+To add a round-trip latency of e.g. 60ms on Linux (30ms in each direction):
+
+$ tc qdisc add dev lo root netem delay 30ms
+
+To reset:
+
+$ tc qdisc del dev lo root netem
+
+Frequency scaling can spoil the results.
+To see the range of frequency scaling on a Linux system:
+
+$ cat /sys/devices/system/cpu/cpu*/cpufreq/scaling_available_frequencies
+
+And to pin the CPU frequency to the lower bound found in these files:
+
+$ sudo cpupower frequency-set -f <frequency>
+
 TODO Parameterization (pytest?)
 
 - Foolscap vs not foolscap
@@ -30,6 +47,7 @@ from tempfile import mkdtemp
 import os
 
 from twisted.trial.unittest import TestCase
+from twisted.internet.defer import gatherResults
 
 from allmydata.util.deferredutil import async_to_deferred
 from allmydata.util.consumer import MemoryConsumer
@@ -56,6 +74,10 @@ class ImmutableBenchmarks(SystemTestMixin, TestCase):
     # To use Foolscap, change to True:
     FORCE_FOOLSCAP_FOR_STORAGE = False
 
+    # Don't reduce HTTP connection timeouts; that messes up the more
+    # aggressive benchmarks:
+    REDUCE_HTTP_CLIENT_TIMEOUT = False
+
     @async_to_deferred
     async def setUp(self):
         SystemTestMixin.setUp(self)
@@ -104,3 +126,13 @@ class ImmutableBenchmarks(SystemTestMixin, TestCase):
             with timeit("download"):
                 data = await result.download_best_version()
             self.assertEqual(data, DATA)
+
+    @async_to_deferred
+    async def test_upload_mutable_in_parallel(self):
+        # To test larger files, change this:
+        DATA = b"Some data to upload\n" * 1_000_000
+        with timeit(" upload"):
+            await gatherResults([
+                self.clients[0].create_mutable_file(MutableData(DATA))
+                for _ in range(20)
+            ])
diff --git a/integration/test_i2p.py b/integration/test_i2p.py
index 15f9d73cf..2deb01fab 100644
--- a/integration/test_i2p.py
+++ b/integration/test_i2p.py
@@ -55,9 +55,12 @@ def i2p_network(reactor, temp_dir, request):
         proto,
         which("docker"),
         (
-            "docker", "run", "-p", "7656:7656", "purplei2p/i2pd:release-2.43.0",
+            "docker", "run", "-p", "7656:7656", "purplei2p/i2pd:release-2.45.1",
             # Bad URL for reseeds, so it can't talk to other routers.
"--reseed.urls", "http://localhost:1/", + # Make sure we see the "ephemeral keys message" + "--log=stdout", + "--loglevel=info" ), ) diff --git a/newsfragments/3968.minor b/newsfragments/3968.minor new file mode 100644 index 000000000..e69de29bb diff --git a/src/allmydata/protocol_switch.py b/src/allmydata/protocol_switch.py index b0af84c33..208efec6c 100644 --- a/src/allmydata/protocol_switch.py +++ b/src/allmydata/protocol_switch.py @@ -89,7 +89,7 @@ class _FoolscapOrHttps(Protocol, metaclass=_PretendToBeNegotiation): certificate=cls.tub.myCertificate.original, ) - http_storage_server = HTTPServer(storage_server, swissnum) + http_storage_server = HTTPServer(reactor, storage_server, swissnum) cls.https_factory = TLSMemoryBIOFactory( certificate_options, False, diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index 094b29c04..c6c3ab615 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -24,6 +24,7 @@ from twisted.internet.interfaces import ( from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.defer import Deferred from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate +from twisted.internet.interfaces import IReactorFromThreads from twisted.web.server import Site, Request from twisted.protocols.tls import TLSMemoryBIOFactory from twisted.python.filepath import FilePath @@ -56,6 +57,8 @@ from .common import si_a2b from .immutable import BucketWriter, ConflictingWriteError from ..util.hashutil import timing_safe_compare from ..util.base32 import rfc3548_alphabet +from ..util.deferredutil import async_to_deferred +from ..util.cputhreadpool import defer_to_thread from allmydata.interfaces import BadWriteEnablerError @@ -486,8 +489,12 @@ class HTTPServer(object): return str(failure.value).encode("utf-8") def __init__( - self, storage_server, swissnum - ): # type: (StorageServer, bytes) -> None + self, + reactor: IReactorFromThreads, + storage_server: StorageServer, + swissnum: bytes, + ): + self._reactor = reactor self._storage_server = storage_server self._swissnum = swissnum # Maps storage index to StorageIndexUploads: @@ -529,7 +536,7 @@ class HTTPServer(object): # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861 raise _HTTPError(http.NOT_ACCEPTABLE) - def _read_encoded( + async def _read_encoded( self, request, schema: Schema, max_size: int = 1024 * 1024 ) -> Any: """ @@ -542,10 +549,11 @@ class HTTPServer(object): raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE) # Make sure it's not too large: - request.content.seek(SEEK_END, 0) - if request.content.tell() > max_size: + request.content.seek(0, SEEK_END) + size = request.content.tell() + if size > max_size: raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE) - request.content.seek(SEEK_SET, 0) + request.content.seek(0, SEEK_SET) # We don't want to load the whole message into memory, cause it might # be quite large. 
         # might be quite large. The CDDL validator takes a read-only bytes-like
@@ -562,12 +570,21 @@ class HTTPServer(object):
             message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
         else:
             message = request.content.read()
-        schema.validate_cbor(message)
+
+        # Pycddl will release the GIL when validating larger documents, so
+        # let's take advantage of multiple CPUs:
+        if size > 10_000:
+            await defer_to_thread(self._reactor, schema.validate_cbor, message)
+        else:
+            schema.validate_cbor(message)
 
         # The CBOR parser will allocate more memory, but at least we can feed
         # it the file-like object, so that if it's large it won't make two
         # copies.
         request.content.seek(SEEK_SET, 0)
+        # Typically deserialization to Python will not release the GIL, and
+        # indeed as of Jan 2023 cbor2 didn't have any code to release the GIL
+        # in the decode path, so running it in a different thread has no benefit.
        return cbor2.load(request.content)
 
     ##### Generic APIs #####
 
@@ -585,10 +602,11 @@ class HTTPServer(object):
         "/storage/v1/immutable/<storage_index:storage_index>",
         methods=["POST"],
     )
-    def allocate_buckets(self, request, authorization, storage_index):
+    @async_to_deferred
+    async def allocate_buckets(self, request, authorization, storage_index):
         """Allocate buckets."""
         upload_secret = authorization[Secrets.UPLOAD]
-        info = self._read_encoded(request, _SCHEMAS["allocate_buckets"])
+        info = await self._read_encoded(request, _SCHEMAS["allocate_buckets"])
 
         # We do NOT validate the upload secret for existing bucket uploads.
         # Another upload may be happening in parallel, with a different upload
@@ -610,7 +628,7 @@ class HTTPServer(object):
                 storage_index, share_number, upload_secret, bucket
             )
 
-        return self._send_encoded(
+        return await self._send_encoded(
             request,
             {"already-have": set(already_got), "allocated": set(sharenum_to_bucket)},
         )
@@ -745,7 +763,8 @@ class HTTPServer(object):
         "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
         methods=["POST"],
     )
-    def advise_corrupt_share_immutable(
+    @async_to_deferred
+    async def advise_corrupt_share_immutable(
         self, request, authorization, storage_index, share_number
     ):
         """Indicate that given share is corrupt, with a text reason."""
         try:
             bucket = self._uploads.get_write_bucket(
                 storage_index, share_number, authorization[Secrets.UPLOAD]
             )
         except KeyError:
             raise _HTTPError(http.NOT_FOUND)
 
-        info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
+        info = await self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
         bucket.advise_corrupt_share(info["reason"].encode("utf-8"))
         return b""
@@ -766,9 +785,10 @@ class HTTPServer(object):
         "/storage/v1/mutable/<storage_index:storage_index>/read-test-write",
         methods=["POST"],
     )
-    def mutable_read_test_write(self, request, authorization, storage_index):
+    @async_to_deferred
+    async def mutable_read_test_write(self, request, authorization, storage_index):
         """Read/test/write combined operation for mutables."""
-        rtw_request = self._read_encoded(
+        rtw_request = await self._read_encoded(
             request, _SCHEMAS["mutable_read_test_write"], max_size=2**48
         )
         secrets = (
@@ -795,7 +815,9 @@ class HTTPServer(object):
             )
         except BadWriteEnablerError:
             raise _HTTPError(http.UNAUTHORIZED)
-        return self._send_encoded(request, {"success": success, "data": read_data})
+        return await self._send_encoded(
+            request, {"success": success, "data": read_data}
+        )
 
     @_authorized_route(
         _app,
@@ -840,7 +862,8 @@ class HTTPServer(object):
         "/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
         methods=["POST"],
    )
-    def advise_corrupt_share_mutable(
+    @async_to_deferred
+    async def advise_corrupt_share_mutable(
         self, request, authorization, storage_index, share_number
     ):
         """Indicate that given share is corrupt, with a text reason."""
@@ -849,7 +872,7 @@ class HTTPServer(object):
         }:
             raise _HTTPError(http.NOT_FOUND)
 
-        info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
+        info = await self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
         self._storage_server.advise_corrupt_share(
             b"mutable", storage_index, share_number, info["reason"].encode("utf-8")
         )
diff --git a/src/allmydata/test/common_system.py b/src/allmydata/test/common_system.py
index 01966824a..39b2b43d1 100644
--- a/src/allmydata/test/common_system.py
+++ b/src/allmydata/test/common_system.py
@@ -681,6 +681,9 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
     # test code.
     FORCE_FOOLSCAP_FOR_STORAGE : Optional[bool] = None
 
+    # If True, reduce the timeout on HTTP client connections:
+    REDUCE_HTTP_CLIENT_TIMEOUT : bool = True
+
     def setUp(self):
         self._http_client_pools = []
         http_client.StorageClient.start_test_mode(self._got_new_http_connection_pool)
@@ -707,7 +710,8 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
             d.addTimeout(1, reactor)
             return d
 
-        pool.getConnection = getConnectionWithTimeout
+        if self.REDUCE_HTTP_CLIENT_TIMEOUT:
+            pool.getConnection = getConnectionWithTimeout
 
     def close_idle_http_connections(self):
         """Close all HTTP client connections that are just hanging around."""
diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py
index 55754b29b..eb5bcd4db 100644
--- a/src/allmydata/test/test_storage_http.py
+++ b/src/allmydata/test/test_storage_http.py
@@ -18,10 +18,12 @@ sadly, an internal implementation detail of Twisted being leaked to tests...
 For definitely synchronous calls, you can just use ``result_of()``.
 """
 
+import time
 from base64 import b64encode
 from contextlib import contextmanager
 from os import urandom
 from typing import Union, Callable, Tuple, Iterable
+from queue import Queue
 from cbor2 import dumps
 from pycddl import ValidationError as CDDLValidationError
 from hypothesis import assume, given, strategies as st
@@ -31,13 +33,14 @@ from klein import Klein
 from hyperlink import DecodedURL
 from collections_extended import RangeMap
 from twisted.internet.task import Clock, Cooperator
-from twisted.internet.interfaces import IReactorTime
+from twisted.internet.interfaces import IReactorTime, IReactorFromThreads
 from twisted.internet.defer import CancelledError, Deferred
 from twisted.web import http
 from twisted.web.http_headers import Headers
 from werkzeug import routing
 from werkzeug.exceptions import NotFound as WNotFound
 from testtools.matchers import Equals
+from zope.interface import implementer
 
 from .common import SyncTestCase
 from ..storage.http_common import get_content_type, CBOR_MIME_TYPE
@@ -449,6 +452,27 @@ class CustomHTTPServerTests(SyncTestCase):
             self.assertEqual(len(self._http_server.clock.getDelayedCalls()), 0)
 
 
+@implementer(IReactorFromThreads)
+class Reactor(Clock):
+    """
+    Fake reactor that supports time APIs and callFromThread.
+
+    Advancing the clock also runs any callbacks scheduled via callFromThread.
+ """ + def __init__(self): + Clock.__init__(self) + self._queue = Queue() + + def callFromThread(self, f, *args, **kwargs): + self._queue.put((f, args, kwargs)) + + def advance(self, *args, **kwargs): + Clock.advance(self, *args, **kwargs) + while not self._queue.empty(): + f, args, kwargs = self._queue.get() + f(*args, **kwargs) + + class HttpTestFixture(Fixture): """ Setup HTTP tests' infrastructure, the storage server and corresponding @@ -460,7 +484,7 @@ class HttpTestFixture(Fixture): lambda pool: self.addCleanup(pool.closeCachedConnections) ) self.addCleanup(StorageClient.stop_test_mode) - self.clock = Clock() + self.clock = Reactor() self.tempdir = self.useFixture(TempDir()) # The global Cooperator used by Twisted (a) used by pull producers in # twisted.web, (b) is driven by a real reactor. We want to push time @@ -475,7 +499,7 @@ class HttpTestFixture(Fixture): self.storage_server = StorageServer( self.tempdir.path, b"\x00" * 20, clock=self.clock ) - self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST) + self.http_server = HTTPServer(self.clock, self.storage_server, SWISSNUM_FOR_TEST) self.treq = StubTreq(self.http_server.get_resource()) self.client = StorageClient( DecodedURL.from_text("http://127.0.0.1"), @@ -501,13 +525,25 @@ class HttpTestFixture(Fixture): # OK, no result yet, probably async HTTP endpoint handler, so advance # time, flush treq, and try again: - for i in range(100): + for i in range(10_000): self.clock.advance(0.001) - self.treq.flush() + self.treq.flush() + if result: + break + # By putting the sleep at the end, tests that are completely + # synchronous and don't use threads will have already broken out of + # the loop, and so will finish without any sleeps. This allows them + # to run as quickly as possible. + # + # However, some tests do talk to APIs that use a thread pool on the + # backend, so we need to allow actual time to pass for those. + time.sleep(0.001) + if result: return result[0] if error: error[0].raiseException() + raise RuntimeError( "We expected given Deferred to have result already, but it wasn't. " + "This is probably a test design issue." 
diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py
index 9a0af1e06..336edf3e2 100644
--- a/src/allmydata/test/test_util.py
+++ b/src/allmydata/test/test_util.py
@@ -15,8 +15,10 @@ import six
 import os, time, sys
 import yaml
 import json
+from threading import current_thread
 
 from twisted.trial import unittest
+from twisted.internet import reactor
 from foolscap.api import Violation, RemoteException
 
 from allmydata.util import idlib, mathutil
@@ -26,6 +28,7 @@ from allmydata.util import pollmixin
 from allmydata.util import yamlutil
 from allmydata.util import rrefutil
 from allmydata.util.fileutil import EncryptedTemporaryFile
+from allmydata.util.cputhreadpool import defer_to_thread
 from allmydata.test.common_util import ReallyEqualMixin
 from .no_network import fireNow, LocalWrapper
 
@@ -588,3 +591,31 @@ class RrefUtilTests(unittest.TestCase):
         )
         self.assertEqual(result.version, "Default")
         self.assertIdentical(result, rref)
+
+
+class CPUThreadPool(unittest.TestCase):
+    """Tests for cputhreadpool."""
+
+    async def test_runs_in_thread(self):
+        """The given function runs in a thread."""
+        def f(*args, **kwargs):
+            return current_thread(), args, kwargs
+
+        this_thread = current_thread().ident
+        result = defer_to_thread(reactor, f, 1, 3, key=4, value=5)
+
+        # Callbacks run in the correct thread:
+        callback_thread_ident = []
+        def passthrough(result):
+            callback_thread_ident.append(current_thread().ident)
+            return result
+
+        result.addCallback(passthrough)
+
+        # The task ran in a different thread:
+        thread, args, kwargs = await result
+        self.assertEqual(callback_thread_ident[0], this_thread)
+        self.assertNotEqual(thread.ident, this_thread)
+        self.assertEqual(args, (1, 3))
+        self.assertEqual(kwargs, {"key": 4, "value": 5})
+
diff --git a/src/allmydata/util/cputhreadpool.py b/src/allmydata/util/cputhreadpool.py
new file mode 100644
index 000000000..225232e04
--- /dev/null
+++ b/src/allmydata/util/cputhreadpool.py
@@ -0,0 +1,59 @@
+"""
+A global thread pool for CPU-intensive tasks.
+
+Motivation:
+
+* Certain tasks are blocking on CPU, and so should be run in a thread.
+* The Twisted thread pool is used for operations that don't necessarily block
+  on CPU, like DNS lookups. CPU processing should not block DNS lookups!
+* The number of threads should be fixed, and tied to the number of available
+  CPUs.
+
+As a first pass, this uses ``os.cpu_count()`` to determine the max number of
+threads. This may create too many threads, as it doesn't cover things like
+scheduler affinity or cgroups, but that's not the end of the world.
+"""
+
+import os
+from typing import TypeVar, Callable, cast
+from functools import partial
+import threading
+from typing_extensions import ParamSpec
+
+from twisted.python.threadpool import ThreadPool
+from twisted.internet.defer import Deferred
+from twisted.internet.threads import deferToThreadPool
+from twisted.internet.interfaces import IReactorFromThreads
+
+
+_CPU_THREAD_POOL = ThreadPool(minthreads=0, maxthreads=os.cpu_count(), name="TahoeCPU")
+if hasattr(threading, "_register_atexit"):
+    # This is a private API present in Python 3.8 or later, specifically
+    # designed for thread pool shutdown. Since it's private, it might go away
+    # at any point, so if it doesn't exist we fall back to the daemon-thread
+    # approach below.
+    threading._register_atexit(_CPU_THREAD_POOL.stop)  # type: ignore
+else:
+    # Daemon threads allow shutdown to happen without any explicit stopping
+    # of threads.
+    # There are some bugs in old Python versions related to daemon threads
+    # (fixed in subsequent CPython patch releases), but Python's own thread
+    # pools use daemon threads in those versions, so we're no worse off.
+    _CPU_THREAD_POOL.threadFactory = partial(  # type: ignore
+        _CPU_THREAD_POOL.threadFactory, daemon=True
+    )
+_CPU_THREAD_POOL.start()
+
+
+P = ParamSpec("P")
+R = TypeVar("R")
+
+
+def defer_to_thread(
+    reactor: IReactorFromThreads, f: Callable[P, R], *args: P.args, **kwargs: P.kwargs
+) -> Deferred[R]:
+    """Run the function in a thread, return the result as a ``Deferred``."""
+    # deferToThreadPool has no type annotations...
+    result = deferToThreadPool(reactor, _CPU_THREAD_POOL, f, *args, **kwargs)
+    return cast(Deferred[R], result)
+
+
+__all__ = ["defer_to_thread"]
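
Usage note (editor, not part of the patch): a minimal sketch of calling the
new ``defer_to_thread`` helper from application code. The sha256 workload is
an illustrative stand-in for the CPU-bound work the patch offloads (pycddl
validation), and ``main``/``run`` are hypothetical names, not anything the
patch defines:

    import hashlib

    from twisted.internet.task import react
    from twisted.internet.defer import ensureDeferred

    from allmydata.util.cputhreadpool import defer_to_thread

    def main(reactor):
        async def run():
            # Runs sha256 in the shared CPU pool; the reactor thread stays
            # free to service other events while the hash is computed.
            digest = await defer_to_thread(
                reactor, hashlib.sha256, b"x" * 10_000_000
            )
            print(digest.hexdigest())
        return ensureDeferred(run())

    react(main)

Because the pool is module-global and capped at ``os.cpu_count()`` threads,
every caller shares the same fixed set of workers, which is the point: CPU
work queues up behind the available cores instead of spawning new threads or
starving the reactor's own pool.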