Support large mutable uploads in a memory-efficient manner.

This commit is contained in:
Itamar Turner-Trauring 2022-12-21 09:24:31 -05:00
parent 6d2e797581
commit 1a4dcc70e2
2 changed files with 74 additions and 33 deletions

View File

@ -9,6 +9,10 @@ from functools import wraps
from base64 import b64decode from base64 import b64decode
import binascii import binascii
from tempfile import TemporaryFile from tempfile import TemporaryFile
from os import SEEK_END, SEEK_SET
from io import BytesIO
import mmap
import sys
from cryptography.x509 import Certificate as CryptoCertificate from cryptography.x509 import Certificate as CryptoCertificate
from zope.interface import implementer from zope.interface import implementer
@ -39,7 +43,7 @@ from cryptography.x509 import load_pem_x509_certificate
# TODO Make sure to use pure Python versions? # TODO Make sure to use pure Python versions?
from cbor2 import dump, loads import cbor2
from pycddl import Schema, ValidationError as CDDLValidationError from pycddl import Schema, ValidationError as CDDLValidationError
from .server import StorageServer from .server import StorageServer
from .http_common import ( from .http_common import (
@ -515,7 +519,7 @@ class HTTPServer(object):
if accept.best == CBOR_MIME_TYPE: if accept.best == CBOR_MIME_TYPE:
request.setHeader("Content-Type", CBOR_MIME_TYPE) request.setHeader("Content-Type", CBOR_MIME_TYPE)
f = TemporaryFile() f = TemporaryFile()
dump(data, f) cbor2.dump(data, f)
def read_data(offset: int, length: int) -> bytes: def read_data(offset: int, length: int) -> bytes:
f.seek(offset) f.seek(offset)
@ -527,27 +531,47 @@ class HTTPServer(object):
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861 # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
raise _HTTPError(http.NOT_ACCEPTABLE) raise _HTTPError(http.NOT_ACCEPTABLE)
def _read_encoded(
    self, request, schema: Schema, max_size: int = 1024 * 1024
) -> Any:
    """
    Read the encoded request body, validate it against the given CDDL
    schema, and decode it (CBOR by default).

    :param request: The Twisted web request whose body is read.
    :param schema: CDDL schema the encoded body must validate against.
    :param max_size: Maximum allowed body size in bytes.  Somewhat
        arbitrarily, 1MiB by default; callers with legitimately large
        payloads pass a bigger limit.

    :raises _HTTPError: ``UNSUPPORTED_MEDIA_TYPE`` if the request's
        content type is not CBOR; ``REQUEST_ENTITY_TOO_LARGE`` if the
        body exceeds ``max_size``.

    :return: The decoded CBOR object.
    """
    content_type = get_content_type(request.requestHeaders)
    if content_type != CBOR_MIME_TYPE:
        raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE)

    # Make sure it's not too large.  Note that seek() takes
    # (offset, whence), so we seek 0 bytes from the end to measure the
    # body's length.  (The original seek(SEEK_END, 0) seeked to absolute
    # offset 2, so tell() never exceeded 2 and the size check was
    # effectively disabled.)
    request.content.seek(0, SEEK_END)
    if request.content.tell() > max_size:
        raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
    request.content.seek(0, SEEK_SET)

    # We don't want to load the whole message into memory, because it
    # might be quite large.  The CDDL validator takes a read-only
    # bytes-like thing.  Luckily, for large request bodies twisted.web
    # will buffer the data in a file, so we can use mmap() to get a
    # memory view.  The CDDL validator will not make a copy, so it won't
    # increase memory usage beyond that.
    try:
        fd = request.content.fileno()
    except (ValueError, OSError):
        fd = -1
    if fd >= 0:
        # It's backed by a real file, so mmap() it to save memory.
        # (fd 0 is a valid descriptor too, hence >= rather than >.)
        message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
    else:
        message = request.content.read()
    schema.validate_cbor(message)

    # The CBOR parser will allocate more memory, but at least we can
    # feed it the file-like object, so that if it's large it won't make
    # two copies.
    request.content.seek(0, SEEK_SET)
    return cbor2.load(request.content)
##### Generic APIs ##### ##### Generic APIs #####
@_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"]) @_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"])
@ -746,7 +770,9 @@ class HTTPServer(object):
) )
def mutable_read_test_write(self, request, authorization, storage_index): def mutable_read_test_write(self, request, authorization, storage_index):
"""Read/test/write combined operation for mutables.""" """Read/test/write combined operation for mutables."""
rtw_request = self._read_encoded(request, _SCHEMAS["mutable_read_test_write"]) rtw_request = self._read_encoded(
request, _SCHEMAS["mutable_read_test_write"], max_size=2**48
)
secrets = ( secrets = (
authorization[Secrets.WRITE_ENABLER], authorization[Secrets.WRITE_ENABLER],
authorization[Secrets.LEASE_RENEW], authorization[Secrets.LEASE_RENEW],

View File

@ -1186,18 +1186,42 @@ class MutableHTTPAPIsTests(SyncTestCase):
) )
return storage_index, write_secret, lease_secret return storage_index, write_secret, lease_secret
def test_write_can_be_read(self): def test_write_can_be_read_small_data(self):
"""
Small written data can be read using ``read_share_chunk``.
"""
self.write_can_be_read(b"abcdef")
def test_write_can_be_read_large_data(self):
    """
    A large (50MB) write can be read back with ``read_share_chunk``.
    """
    self.write_can_be_read(b"abcdefghij" * (5 * 1024 * 1024))
def write_can_be_read(self, data):
    """
    Written data can be read using ``read_share_chunk``.
    """
    write_secret = urandom(32)
    lease_secret = urandom(32)
    storage_index = urandom(16)
    # A single share (number 0) written in one vector at offset 0:
    test_write_vectors = {
        0: TestWriteVectors(
            write_vectors=[WriteVector(offset=0, data=data)]
        ),
    }
    self.http.result_of_with_flush(
        self.mut_client.read_test_write_chunks(
            storage_index,
            write_secret,
            lease_secret,
            lease_secret,
            test_write_vectors,
            [],
        )
    )
    read_data = self.http.result_of_with_flush(
        self.mut_client.read_share_chunk(storage_index, 0, 0, len(data))
    )
    self.assertEqual(read_data, data)
def test_read_before_write(self): def test_read_before_write(self):
"""In combo read/test/write operation, reads happen before writes.""" """In combo read/test/write operation, reads happen before writes."""
@ -1276,15 +1300,6 @@ class MutableHTTPAPIsTests(SyncTestCase):
b"aXYZef-0", b"aXYZef-0",
) )
def test_too_large_write(self):
"""
Writing too large of a chunk results in a REQUEST ENTITY TOO LARGE http
error.
"""
with self.assertRaises(ClientException) as e:
self.create_upload(b"0123456789" * 1024 * 1024)
self.assertEqual(e.exception.code, http.REQUEST_ENTITY_TOO_LARGE)
def test_list_shares(self): def test_list_shares(self):
"""``list_shares()`` returns the shares for a given storage index.""" """``list_shares()`` returns the shares for a given storage index."""
storage_index, _, _ = self.create_upload() storage_index, _, _ = self.create_upload()