Don't block on CDDL validation.

This commit is contained in:
Itamar Turner-Trauring 2023-01-23 11:37:13 -05:00
parent 746b3ca595
commit 1f3993b689

View File

@ -21,6 +21,8 @@ from twisted.internet.interfaces import (
IStreamServerEndpoint,
IPullProducer,
)
from twisted.internet import reactor
from twisted.internet.task import deferToThread
from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import Deferred
from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate
@ -56,6 +58,7 @@ from .common import si_a2b
from .immutable import BucketWriter, ConflictingWriteError
from ..util.hashutil import timing_safe_compare
from ..util.base32 import rfc3548_alphabet
from ..util.deferredutil import async_to_deferred
from allmydata.interfaces import BadWriteEnablerError
@ -529,7 +532,7 @@ class HTTPServer(object):
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
raise _HTTPError(http.NOT_ACCEPTABLE)
def _read_encoded(
async def _read_encoded(
self, request, schema: Schema, max_size: int = 1024 * 1024
) -> Any:
"""
@ -543,7 +546,8 @@ class HTTPServer(object):
# Make sure it's not too large:
request.content.seek(SEEK_END, 0)
if request.content.tell() > max_size:
size = request.content.tell()
if size > max_size:
raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
request.content.seek(SEEK_SET, 0)
@ -562,12 +566,21 @@ class HTTPServer(object):
message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
else:
message = request.content.read()
schema.validate_cbor(message)
# Pycddl will release the GIL when validating larger documents, so
# let's take advantage of multiple CPUs:
if size > 10_000:
await deferToThread(reactor, schema.validate_cbor(message))
else:
schema.validate_cbor(message)
# The CBOR parser will allocate more memory, but at least we can feed
# it the file-like object, so that if it's large it won't be make two
# copies.
request.content.seek(SEEK_SET, 0)
# Typically deserialization to Python will not release the GIL, and
# indeed as of Jan 2023 cbor2 didn't have any code to release the GIL
# in the decode path. As such, running it in a different thread has no benefit.
return cbor2.load(request.content)
##### Generic APIs #####
@ -585,10 +598,11 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>",
methods=["POST"],
)
def allocate_buckets(self, request, authorization, storage_index):
@async_to_deferred
async def allocate_buckets(self, request, authorization, storage_index):
"""Allocate buckets."""
upload_secret = authorization[Secrets.UPLOAD]
info = self._read_encoded(request, _SCHEMAS["allocate_buckets"])
info = await self._read_encoded(request, _SCHEMAS["allocate_buckets"])
# We do NOT validate the upload secret for existing bucket uploads.
# Another upload may be happening in parallel, with a different upload
@ -745,7 +759,8 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"],
)
def advise_corrupt_share_immutable(
@async_to_deferred
async def advise_corrupt_share_immutable(
self, request, authorization, storage_index, share_number
):
"""Indicate that given share is corrupt, with a text reason."""
@ -754,7 +769,7 @@ class HTTPServer(object):
except KeyError:
raise _HTTPError(http.NOT_FOUND)
info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
info = await self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
bucket.advise_corrupt_share(info["reason"].encode("utf-8"))
return b""
@ -766,9 +781,10 @@ class HTTPServer(object):
"/storage/v1/mutable/<storage_index:storage_index>/read-test-write",
methods=["POST"],
)
def mutable_read_test_write(self, request, authorization, storage_index):
@async_to_deferred
async def mutable_read_test_write(self, request, authorization, storage_index):
"""Read/test/write combined operation for mutables."""
rtw_request = self._read_encoded(
rtw_request = await self._read_encoded(
request, _SCHEMAS["mutable_read_test_write"], max_size=2**48
)
secrets = (
@ -840,7 +856,8 @@ class HTTPServer(object):
"/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"],
)
def advise_corrupt_share_mutable(
@async_to_deferred
async def advise_corrupt_share_mutable(
self, request, authorization, storage_index, share_number
):
"""Indicate that given share is corrupt, with a text reason."""
@ -849,7 +866,7 @@ class HTTPServer(object):
}:
raise _HTTPError(http.NOT_FOUND)
info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
info = await self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
self._storage_server.advise_corrupt_share(
b"mutable", storage_index, share_number, info["reason"].encode("utf-8")
)