Use the CPU thread pool for CBOR validation.

This commit is contained in:
Itamar Turner-Trauring 2023-02-06 13:54:47 -05:00
parent b221954946
commit 5909f451e3
3 changed files with 35 additions and 10 deletions

View File

@ -89,7 +89,7 @@ class _FoolscapOrHttps(Protocol, metaclass=_PretendToBeNegotiation):
certificate=cls.tub.myCertificate.original,
)
http_storage_server = HTTPServer(storage_server, swissnum)
http_storage_server = HTTPServer(reactor, storage_server, swissnum)
cls.https_factory = TLSMemoryBIOFactory(
certificate_options,
False,

View File

@ -21,8 +21,6 @@ from twisted.internet.interfaces import (
IStreamServerEndpoint,
IPullProducer,
)
from twisted.internet import reactor
from twisted.internet.threads import deferToThread
from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import Deferred
from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate
@ -59,6 +57,7 @@ from .immutable import BucketWriter, ConflictingWriteError
from ..util.hashutil import timing_safe_compare
from ..util.base32 import rfc3548_alphabet
from ..util.deferredutil import async_to_deferred
from ..util.cputhreadpool import defer_to_thread
from allmydata.interfaces import BadWriteEnablerError
@ -489,8 +488,9 @@ class HTTPServer(object):
return str(failure.value).encode("utf-8")
def __init__(
self, storage_server, swissnum
self, reactor, storage_server, swissnum
): # type: (StorageServer, bytes) -> None
self._reactor = reactor
self._storage_server = storage_server
self._swissnum = swissnum
# Maps storage index to StorageIndexUploads:
@ -570,7 +570,7 @@ class HTTPServer(object):
# Pycddl will release the GIL when validating larger documents, so
# let's take advantage of multiple CPUs:
if size > 10_000:
await deferToThread(schema.validate_cbor, message)
await defer_to_thread(self._reactor, schema.validate_cbor, message)
else:
schema.validate_cbor(message)

View File

@ -18,10 +18,12 @@ sadly, an internal implementation detail of Twisted being leaked to tests...
For definitely synchronous calls, you can just use ``result_of()``.
"""
import time
from base64 import b64encode
from contextlib import contextmanager
from os import urandom
from typing import Union, Callable, Tuple, Iterable
from queue import Queue
from cbor2 import dumps
from pycddl import ValidationError as CDDLValidationError
from hypothesis import assume, given, strategies as st
@ -31,13 +33,14 @@ from klein import Klein
from hyperlink import DecodedURL
from collections_extended import RangeMap
from twisted.internet.task import Clock, Cooperator
from twisted.internet.interfaces import IReactorTime
from twisted.internet.interfaces import IReactorTime, IReactorFromThreads
from twisted.internet.defer import CancelledError, Deferred
from twisted.web import http
from twisted.web.http_headers import Headers
from werkzeug import routing
from werkzeug.exceptions import NotFound as WNotFound
from testtools.matchers import Equals
from zope.interface import implementer
from .common import SyncTestCase
from ..storage.http_common import get_content_type, CBOR_MIME_TYPE
@ -449,6 +452,23 @@ class CustomHTTPServerTests(SyncTestCase):
self.assertEqual(len(self._http_server.clock.getDelayedCalls()), 0)
@implementer(IReactorFromThreads)
class Reactor(Clock):
"""Fake reactor."""
def __init__(self):
Clock.__init__(self)
self._queue = Queue()
def callFromThread(self, f, *args, **kwargs):
self._queue.put((f, args, kwargs))
def advance(self, *args, **kwargs):
Clock.advance(self, *args, **kwargs)
while not self._queue.empty():
f, args, kwargs = self._queue.get()
f(*args, **kwargs)
class HttpTestFixture(Fixture):
"""
Setup HTTP tests' infrastructure, the storage server and corresponding
@ -460,7 +480,7 @@ class HttpTestFixture(Fixture):
lambda pool: self.addCleanup(pool.closeCachedConnections)
)
self.addCleanup(StorageClient.stop_test_mode)
self.clock = Clock()
self.clock = Reactor()
self.tempdir = self.useFixture(TempDir())
# The global Cooperator used by Twisted (a) used by pull producers in
# twisted.web, (b) is driven by a real reactor. We want to push time
@ -475,7 +495,7 @@ class HttpTestFixture(Fixture):
self.storage_server = StorageServer(
self.tempdir.path, b"\x00" * 20, clock=self.clock
)
self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST)
self.http_server = HTTPServer(self.clock, self.storage_server, SWISSNUM_FOR_TEST)
self.treq = StubTreq(self.http_server.get_resource())
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
@ -501,13 +521,18 @@ class HttpTestFixture(Fixture):
# OK, no result yet, probably async HTTP endpoint handler, so advance
# time, flush treq, and try again:
for i in range(100):
for i in range(10_000):
self.clock.advance(0.001)
self.treq.flush()
self.treq.flush()
if result:
break
time.sleep(0.001)
if result:
return result[0]
if error:
error[0].raiseException()
raise RuntimeError(
"We expected given Deferred to have result already, but it wasn't. "
+ "This is probably a test design issue."