Merge pull request #1250 from tahoe-lafs/3957-mutable-over-http-speed

Run CDDL validation in a thread
This commit is contained in:
Itamar Turner-Trauring 2023-02-27 09:21:27 -05:00 committed by GitHub
commit 4981864f8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 213 additions and 25 deletions

View File

@ -5,6 +5,23 @@ To run:
$ pytest benchmarks/upload_download.py -s -v -Wignore
To add latency of e.g. 60ms round-trip (30ms in each direction) on Linux:
$ tc qdisc add dev lo root netem delay 30ms
To reset:
$ tc qdisc del dev lo root netem
Frequency scaling can spoil the results.
To see the range of frequency scaling on a Linux system:
$ cat /sys/devices/system/cpu/cpu*/cpufreq/scaling_available_frequencies
And to pin the CPU frequency to the lower bound found in these files:
$ sudo cpupower frequency-set -f <lowest available frequency>
TODO Parameterization (pytest?)
- Foolscap vs not foolscap
@ -30,6 +47,7 @@ from tempfile import mkdtemp
import os
from twisted.trial.unittest import TestCase
from twisted.internet.defer import gatherResults
from allmydata.util.deferredutil import async_to_deferred
from allmydata.util.consumer import MemoryConsumer
@ -56,6 +74,10 @@ class ImmutableBenchmarks(SystemTestMixin, TestCase):
# To use Foolscap, change to True:
FORCE_FOOLSCAP_FOR_STORAGE = False
# Don't reduce HTTP connection timeouts, that messes up the more aggressive
# benchmarks:
REDUCE_HTTP_CLIENT_TIMEOUT = False
@async_to_deferred
async def setUp(self):
SystemTestMixin.setUp(self)
@ -104,3 +126,13 @@ class ImmutableBenchmarks(SystemTestMixin, TestCase):
with timeit("download"):
data = await result.download_best_version()
self.assertEqual(data, DATA)
@async_to_deferred
async def test_upload_mutable_in_parallel(self):
    """Time the creation of 20 mutable files that are started together."""
    # To test larger files, change this:
    DATA = b"Some data to upload\n" * 1_000_000
    with timeit(" upload"):
        # Kick off every upload before waiting on any of them; the timed
        # region covers both starting the uploads and waiting for all of
        # them to finish.
        client = self.clients[0]
        pending = [
            client.create_mutable_file(MutableData(DATA)) for _ in range(20)
        ]
        await gatherResults(pending)

View File

@ -55,9 +55,12 @@ def i2p_network(reactor, temp_dir, request):
proto,
which("docker"),
(
"docker", "run", "-p", "7656:7656", "purplei2p/i2pd:release-2.43.0",
"docker", "run", "-p", "7656:7656", "purplei2p/i2pd:release-2.45.1",
# Bad URL for reseeds, so it can't talk to other routers.
"--reseed.urls", "http://localhost:1/",
# Make sure we see the "ephemeral keys message"
"--log=stdout",
"--loglevel=info"
),
)

0
newsfragments/3968.minor Normal file
View File

View File

@ -89,7 +89,7 @@ class _FoolscapOrHttps(Protocol, metaclass=_PretendToBeNegotiation):
certificate=cls.tub.myCertificate.original,
)
http_storage_server = HTTPServer(storage_server, swissnum)
http_storage_server = HTTPServer(reactor, storage_server, swissnum)
cls.https_factory = TLSMemoryBIOFactory(
certificate_options,
False,

View File

@ -24,6 +24,7 @@ from twisted.internet.interfaces import (
from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import Deferred
from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate
from twisted.internet.interfaces import IReactorFromThreads
from twisted.web.server import Site, Request
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.filepath import FilePath
@ -56,6 +57,8 @@ from .common import si_a2b
from .immutable import BucketWriter, ConflictingWriteError
from ..util.hashutil import timing_safe_compare
from ..util.base32 import rfc3548_alphabet
from ..util.deferredutil import async_to_deferred
from ..util.cputhreadpool import defer_to_thread
from allmydata.interfaces import BadWriteEnablerError
@ -486,8 +489,12 @@ class HTTPServer(object):
return str(failure.value).encode("utf-8")
def __init__(
self, storage_server, swissnum
): # type: (StorageServer, bytes) -> None
self,
reactor: IReactorFromThreads,
storage_server: StorageServer,
swissnum: bytes,
):
self._reactor = reactor
self._storage_server = storage_server
self._swissnum = swissnum
# Maps storage index to StorageIndexUploads:
@ -529,7 +536,7 @@ class HTTPServer(object):
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
raise _HTTPError(http.NOT_ACCEPTABLE)
def _read_encoded(
async def _read_encoded(
self, request, schema: Schema, max_size: int = 1024 * 1024
) -> Any:
"""
@ -542,10 +549,11 @@ class HTTPServer(object):
raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE)
# Make sure it's not too large:
request.content.seek(SEEK_END, 0)
if request.content.tell() > max_size:
request.content.seek(0, SEEK_END)
size = request.content.tell()
if size > max_size:
raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
request.content.seek(SEEK_SET, 0)
request.content.seek(0, SEEK_SET)
# We don't want to load the whole message into memory, cause it might
# be quite large. The CDDL validator takes a read-only bytes-like
@ -562,12 +570,21 @@ class HTTPServer(object):
message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
else:
message = request.content.read()
schema.validate_cbor(message)
# Pycddl will release the GIL when validating larger documents, so
# let's take advantage of multiple CPUs:
if size > 10_000:
await defer_to_thread(self._reactor, schema.validate_cbor, message)
else:
schema.validate_cbor(message)
# The CBOR parser will allocate more memory, but at least we can feed
# it the file-like object, so that if it's large it won't make two
# copies.
request.content.seek(SEEK_SET, 0)
# Typically deserialization to Python will not release the GIL, and
# indeed as of Jan 2023 cbor2 didn't have any code to release the GIL
# in the decode path. As such, running it in a different thread has no benefit.
return cbor2.load(request.content)
##### Generic APIs #####
@ -585,10 +602,11 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>",
methods=["POST"],
)
def allocate_buckets(self, request, authorization, storage_index):
@async_to_deferred
async def allocate_buckets(self, request, authorization, storage_index):
"""Allocate buckets."""
upload_secret = authorization[Secrets.UPLOAD]
info = self._read_encoded(request, _SCHEMAS["allocate_buckets"])
info = await self._read_encoded(request, _SCHEMAS["allocate_buckets"])
# We do NOT validate the upload secret for existing bucket uploads.
# Another upload may be happening in parallel, with a different upload
@ -610,7 +628,7 @@ class HTTPServer(object):
storage_index, share_number, upload_secret, bucket
)
return self._send_encoded(
return await self._send_encoded(
request,
{"already-have": set(already_got), "allocated": set(sharenum_to_bucket)},
)
@ -745,7 +763,8 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"],
)
def advise_corrupt_share_immutable(
@async_to_deferred
async def advise_corrupt_share_immutable(
self, request, authorization, storage_index, share_number
):
"""Indicate that given share is corrupt, with a text reason."""
@ -754,7 +773,7 @@ class HTTPServer(object):
except KeyError:
raise _HTTPError(http.NOT_FOUND)
info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
info = await self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
bucket.advise_corrupt_share(info["reason"].encode("utf-8"))
return b""
@ -766,9 +785,10 @@ class HTTPServer(object):
"/storage/v1/mutable/<storage_index:storage_index>/read-test-write",
methods=["POST"],
)
def mutable_read_test_write(self, request, authorization, storage_index):
@async_to_deferred
async def mutable_read_test_write(self, request, authorization, storage_index):
"""Read/test/write combined operation for mutables."""
rtw_request = self._read_encoded(
rtw_request = await self._read_encoded(
request, _SCHEMAS["mutable_read_test_write"], max_size=2**48
)
secrets = (
@ -795,7 +815,9 @@ class HTTPServer(object):
)
except BadWriteEnablerError:
raise _HTTPError(http.UNAUTHORIZED)
return self._send_encoded(request, {"success": success, "data": read_data})
return await self._send_encoded(
request, {"success": success, "data": read_data}
)
@_authorized_route(
_app,
@ -840,7 +862,8 @@ class HTTPServer(object):
"/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"],
)
def advise_corrupt_share_mutable(
@async_to_deferred
async def advise_corrupt_share_mutable(
self, request, authorization, storage_index, share_number
):
"""Indicate that given share is corrupt, with a text reason."""
@ -849,7 +872,7 @@ class HTTPServer(object):
}:
raise _HTTPError(http.NOT_FOUND)
info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
info = await self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
self._storage_server.advise_corrupt_share(
b"mutable", storage_index, share_number, info["reason"].encode("utf-8")
)

View File

@ -681,6 +681,9 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# test code.
FORCE_FOOLSCAP_FOR_STORAGE : Optional[bool] = None
# If True, reduce the timeout on connections:
REDUCE_HTTP_CLIENT_TIMEOUT : bool = True
def setUp(self):
self._http_client_pools = []
http_client.StorageClient.start_test_mode(self._got_new_http_connection_pool)
@ -707,7 +710,8 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
d.addTimeout(1, reactor)
return d
pool.getConnection = getConnectionWithTimeout
if self.REDUCE_HTTP_CLIENT_TIMEOUT:
pool.getConnection = getConnectionWithTimeout
def close_idle_http_connections(self):
"""Close all HTTP client connections that are just hanging around."""

View File

@ -18,10 +18,12 @@ sadly, an internal implementation detail of Twisted being leaked to tests...
For definitely synchronous calls, you can just use ``result_of()``.
"""
import time
from base64 import b64encode
from contextlib import contextmanager
from os import urandom
from typing import Union, Callable, Tuple, Iterable
from queue import Queue
from cbor2 import dumps
from pycddl import ValidationError as CDDLValidationError
from hypothesis import assume, given, strategies as st
@ -31,13 +33,14 @@ from klein import Klein
from hyperlink import DecodedURL
from collections_extended import RangeMap
from twisted.internet.task import Clock, Cooperator
from twisted.internet.interfaces import IReactorTime
from twisted.internet.interfaces import IReactorTime, IReactorFromThreads
from twisted.internet.defer import CancelledError, Deferred
from twisted.web import http
from twisted.web.http_headers import Headers
from werkzeug import routing
from werkzeug.exceptions import NotFound as WNotFound
from testtools.matchers import Equals
from zope.interface import implementer
from .common import SyncTestCase
from ..storage.http_common import get_content_type, CBOR_MIME_TYPE
@ -449,6 +452,27 @@ class CustomHTTPServerTests(SyncTestCase):
self.assertEqual(len(self._http_server.clock.getDelayedCalls()), 0)
@implementer(IReactorFromThreads)
class Reactor(Clock):
    """
    Fake reactor that supports time APIs and callFromThread.

    Advancing the clock also runs any callbacks scheduled via callFromThread.
    """

    def __init__(self):
        super().__init__()
        # Calls handed over via callFromThread(), held until the next
        # advance().
        self._queue = Queue()

    def callFromThread(self, f, *args, **kwargs):
        """Queue ``f(*args, **kwargs)``; it is run by a later ``advance()``."""
        pending_call = (f, args, kwargs)
        self._queue.put(pending_call)

    def advance(self, *args, **kwargs):
        """Advance the clock, then run every call that has been queued."""
        super().advance(*args, **kwargs)
        while not self._queue.empty():
            func, call_args, call_kwargs = self._queue.get()
            func(*call_args, **call_kwargs)
class HttpTestFixture(Fixture):
"""
Setup HTTP tests' infrastructure, the storage server and corresponding
@ -460,7 +484,7 @@ class HttpTestFixture(Fixture):
lambda pool: self.addCleanup(pool.closeCachedConnections)
)
self.addCleanup(StorageClient.stop_test_mode)
self.clock = Clock()
self.clock = Reactor()
self.tempdir = self.useFixture(TempDir())
# The global Cooperator used by Twisted (a) used by pull producers in
# twisted.web, (b) is driven by a real reactor. We want to push time
@ -475,7 +499,7 @@ class HttpTestFixture(Fixture):
self.storage_server = StorageServer(
self.tempdir.path, b"\x00" * 20, clock=self.clock
)
self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST)
self.http_server = HTTPServer(self.clock, self.storage_server, SWISSNUM_FOR_TEST)
self.treq = StubTreq(self.http_server.get_resource())
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
@ -501,13 +525,25 @@ class HttpTestFixture(Fixture):
# OK, no result yet, probably async HTTP endpoint handler, so advance
# time, flush treq, and try again:
for i in range(100):
for i in range(10_000):
self.clock.advance(0.001)
self.treq.flush()
self.treq.flush()
if result:
break
# By putting the sleep at the end, tests that are completely
# synchronous and don't use threads will have already broken out of
# the loop, and so will finish without any sleeps. This allows them
# to run as quickly as possible.
#
# However, some tests do talk to APIs that use a thread pool on the
# backend, so we need to allow actual time to pass for those.
time.sleep(0.001)
if result:
return result[0]
if error:
error[0].raiseException()
raise RuntimeError(
"We expected given Deferred to have result already, but it wasn't. "
+ "This is probably a test design issue."

View File

@ -15,8 +15,10 @@ import six
import os, time, sys
import yaml
import json
from threading import current_thread
from twisted.trial import unittest
from twisted.internet import reactor
from foolscap.api import Violation, RemoteException
from allmydata.util import idlib, mathutil
@ -26,6 +28,7 @@ from allmydata.util import pollmixin
from allmydata.util import yamlutil
from allmydata.util import rrefutil
from allmydata.util.fileutil import EncryptedTemporaryFile
from allmydata.util.cputhreadpool import defer_to_thread
from allmydata.test.common_util import ReallyEqualMixin
from .no_network import fireNow, LocalWrapper
@ -588,3 +591,31 @@ class RrefUtilTests(unittest.TestCase):
)
self.assertEqual(result.version, "Default")
self.assertIdentical(result, rref)
class CPUThreadPool(unittest.TestCase):
    """Tests for cputhreadpool."""

    async def test_runs_in_thread(self):
        """The given function runs in a thread."""
        caller_ident = current_thread().ident

        def report(*args, **kwargs):
            # Hand back the thread that executed us, plus exactly the
            # arguments we were called with.
            return current_thread(), args, kwargs

        deferred = defer_to_thread(reactor, report, 1, 3, key=4, value=5)

        # Callbacks run in the correct thread:
        callback_idents = []

        def passthrough(value):
            callback_idents.append(current_thread().ident)
            return value

        deferred.addCallback(passthrough)

        # The task ran in a different thread:
        worker, got_args, got_kwargs = await deferred
        self.assertEqual(callback_idents[0], caller_ident)
        self.assertNotEqual(worker.ident, caller_ident)
        self.assertEqual(got_args, (1, 3))
        self.assertEqual(got_kwargs, {"key": 4, "value": 5})

View File

@ -0,0 +1,59 @@
"""
A global thread pool for CPU-intensive tasks.
Motivation:
* Certain tasks are blocking on CPU, and so should be run in a thread.
* The Twisted thread pool is used for operations that don't necessarily block
on CPU, like DNS lookups. CPU processing should not block DNS lookups!
* The number of threads should be fixed, and tied to the number of available
CPUs.
As a first pass, this uses ``os.cpu_count()`` to determine the max number of
threads. This may create too many threads, as it doesn't cover things like
scheduler affinity or cgroups, but that's not the end of the world.
"""
import os
from typing import TypeVar, Callable, cast
from functools import partial
import threading
from typing_extensions import ParamSpec
from twisted.python.threadpool import ThreadPool
from twisted.internet.defer import Deferred
from twisted.internet.threads import deferToThreadPool
from twisted.internet.interfaces import IReactorFromThreads
# ``os.cpu_count()`` returns ``None`` when the number of CPUs cannot be
# determined; fall back to a single worker thread in that case rather than
# handing ``None`` to ``ThreadPool`` as its maximum.
_CPU_THREAD_POOL = ThreadPool(
    minthreads=0, maxthreads=os.cpu_count() or 1, name="TahoeCPU"
)
if hasattr(threading, "_register_atexit"):
    # This is a private API present in Python 3.8 or later, specifically
    # designed for thread pool shutdown. Since it's private, it might go away
    # at any point, so if it doesn't exist we still have a solution.
    threading._register_atexit(_CPU_THREAD_POOL.stop)  # type: ignore
else:
    # Daemon threads allow shutdown to happen without any explicit stopping of
    # threads. There are some bugs in old Python versions related to daemon
    # threads (fixed in subsequent CPython patch releases), but Python's own
    # thread pools use daemon threads in those versions so we're no worse off.
    _CPU_THREAD_POOL.threadFactory = partial(  # type: ignore
        _CPU_THREAD_POOL.threadFactory, daemon=True
    )
_CPU_THREAD_POOL.start()

# Type variables used to annotate ``defer_to_thread``.
P = ParamSpec("P")
R = TypeVar("R")
def defer_to_thread(
    reactor: IReactorFromThreads,
    f: Callable[P, R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Deferred[R]:
    """
    Run ``f(*args, **kwargs)`` in a thread from the module's CPU thread pool.

    The outcome of the call is delivered through the returned ``Deferred``.
    """
    # deferToThreadPool has no type annotations, hence the explicit cast.
    return cast(
        Deferred[R],
        deferToThreadPool(reactor, _CPU_THREAD_POOL, f, *args, **kwargs),
    )
__all__ = ["defer_to_thread"]