From 5909f451e336bba6a15606e073e9eb9b0d74581d Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 6 Feb 2023 13:54:47 -0500 Subject: [PATCH] Use the CPU thread pool for CBOR validation. --- src/allmydata/protocol_switch.py | 2 +- src/allmydata/storage/http_server.py | 8 +++--- src/allmydata/test/test_storage_http.py | 35 +++++++++++++++++++++---- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/allmydata/protocol_switch.py b/src/allmydata/protocol_switch.py index b0af84c33..208efec6c 100644 --- a/src/allmydata/protocol_switch.py +++ b/src/allmydata/protocol_switch.py @@ -89,7 +89,7 @@ class _FoolscapOrHttps(Protocol, metaclass=_PretendToBeNegotiation): certificate=cls.tub.myCertificate.original, ) - http_storage_server = HTTPServer(storage_server, swissnum) + http_storage_server = HTTPServer(reactor, storage_server, swissnum) cls.https_factory = TLSMemoryBIOFactory( certificate_options, False, diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index 6b94c227a..0d2280e2c 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -21,8 +21,6 @@ from twisted.internet.interfaces import ( IStreamServerEndpoint, IPullProducer, ) -from twisted.internet import reactor -from twisted.internet.threads import deferToThread from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.defer import Deferred from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate @@ -59,6 +57,7 @@ from .immutable import BucketWriter, ConflictingWriteError from ..util.hashutil import timing_safe_compare from ..util.base32 import rfc3548_alphabet from ..util.deferredutil import async_to_deferred +from ..util.cputhreadpool import defer_to_thread from allmydata.interfaces import BadWriteEnablerError @@ -489,8 +488,9 @@ class HTTPServer(object): return str(failure.value).encode("utf-8") def __init__( - self, storage_server, swissnum + self, reactor, storage_server, swissnum ): # type: (StorageServer, bytes) -> None + self._reactor = reactor self._storage_server = storage_server self._swissnum = swissnum # Maps storage index to StorageIndexUploads: @@ -570,7 +570,7 @@ class HTTPServer(object): # Pycddl will release the GIL when validating larger documents, so # let's take advantage of multiple CPUs: if size > 10_000: - await deferToThread(schema.validate_cbor, message) + await defer_to_thread(self._reactor, schema.validate_cbor, message) else: schema.validate_cbor(message) diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index 55754b29b..beb36e87a 100644 --- a/src/allmydata/test/test_storage_http.py +++ b/src/allmydata/test/test_storage_http.py @@ -18,10 +18,12 @@ sadly, an internal implementation detail of Twisted being leaked to tests... For definitely synchronous calls, you can just use ``result_of()``. """ +import time from base64 import b64encode from contextlib import contextmanager from os import urandom from typing import Union, Callable, Tuple, Iterable +from queue import Queue from cbor2 import dumps from pycddl import ValidationError as CDDLValidationError from hypothesis import assume, given, strategies as st @@ -31,13 +33,14 @@ from klein import Klein from hyperlink import DecodedURL from collections_extended import RangeMap from twisted.internet.task import Clock, Cooperator -from twisted.internet.interfaces import IReactorTime +from twisted.internet.interfaces import IReactorTime, IReactorFromThreads from twisted.internet.defer import CancelledError, Deferred from twisted.web import http from twisted.web.http_headers import Headers from werkzeug import routing from werkzeug.exceptions import NotFound as WNotFound from testtools.matchers import Equals +from zope.interface import implementer from .common import SyncTestCase from ..storage.http_common import get_content_type, CBOR_MIME_TYPE @@ -449,6 +452,23 @@ class CustomHTTPServerTests(SyncTestCase): self.assertEqual(len(self._http_server.clock.getDelayedCalls()), 0) +@implementer(IReactorFromThreads) +class Reactor(Clock): + """Fake reactor.""" + def __init__(self): + Clock.__init__(self) + self._queue = Queue() + + def callFromThread(self, f, *args, **kwargs): + self._queue.put((f, args, kwargs)) + + def advance(self, *args, **kwargs): + Clock.advance(self, *args, **kwargs) + while not self._queue.empty(): + f, args, kwargs = self._queue.get() + f(*args, **kwargs) + + class HttpTestFixture(Fixture): """ Setup HTTP tests' infrastructure, the storage server and corresponding @@ -460,7 +480,7 @@ class HttpTestFixture(Fixture): lambda pool: self.addCleanup(pool.closeCachedConnections) ) self.addCleanup(StorageClient.stop_test_mode) - self.clock = Clock() + self.clock = Reactor() self.tempdir = self.useFixture(TempDir()) # The global Cooperator used by Twisted (a) used by pull producers in # twisted.web, (b) is driven by a real reactor. We want to push time @@ -475,7 +495,7 @@ class HttpTestFixture(Fixture): self.storage_server = StorageServer( self.tempdir.path, b"\x00" * 20, clock=self.clock ) - self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST) + self.http_server = HTTPServer(self.clock, self.storage_server, SWISSNUM_FOR_TEST) self.treq = StubTreq(self.http_server.get_resource()) self.client = StorageClient( DecodedURL.from_text("http://127.0.0.1"), @@ -501,13 +521,18 @@ class HttpTestFixture(Fixture): # OK, no result yet, probably async HTTP endpoint handler, so advance # time, flush treq, and try again: - for i in range(100): + for i in range(10_000): self.clock.advance(0.001) - self.treq.flush() + self.treq.flush() + if result: + break + time.sleep(0.001) + if result: return result[0] if error: error[0].raiseException() + raise RuntimeError( "We expected given Deferred to have result already, but it wasn't. " + "This is probably a test design issue."