From 1a4dcc70e26ce3e720180fb2572e02def7fca351 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 21 Dec 2022 09:24:31 -0500 Subject: [PATCH] Support large mutable uploads in a memory-efficient manner. --- src/allmydata/storage/http_server.py | 60 ++++++++++++++++++------- src/allmydata/test/test_storage_http.py | 47 ++++++++++++------- 2 files changed, 74 insertions(+), 33 deletions(-) diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index 3902976ba..d76948d93 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -9,6 +9,10 @@ from functools import wraps from base64 import b64decode import binascii from tempfile import TemporaryFile +from os import SEEK_END, SEEK_SET +from io import BytesIO +import mmap +import sys from cryptography.x509 import Certificate as CryptoCertificate from zope.interface import implementer @@ -39,7 +43,7 @@ from cryptography.x509 import load_pem_x509_certificate # TODO Make sure to use pure Python versions? -from cbor2 import dump, loads +import cbor2 from pycddl import Schema, ValidationError as CDDLValidationError from .server import StorageServer from .http_common import ( @@ -515,7 +519,7 @@ class HTTPServer(object): if accept.best == CBOR_MIME_TYPE: request.setHeader("Content-Type", CBOR_MIME_TYPE) f = TemporaryFile() - dump(data, f) + cbor2.dump(data, f) def read_data(offset: int, length: int) -> bytes: f.seek(offset) @@ -527,27 +531,47 @@ class HTTPServer(object): # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861 raise _HTTPError(http.NOT_ACCEPTABLE) - def _read_encoded(self, request, schema: Schema) -> Any: + def _read_encoded( + self, request, schema: Schema, max_size: int = 1024 * 1024 + ) -> Any: """ Read encoded request body data, decoding it with CBOR by default. - Somewhat arbitrarily, limit body size to 1MB; this may be too low, we - may want to customize per query type, but this is the starting point - for now. 
+ Somewhat arbitrarily, limit body size to 1MiB by default. """ content_type = get_content_type(request.requestHeaders) - if content_type == CBOR_MIME_TYPE: - # Read 1 byte more than 1MB. We expect length to be 1MB or - # less; if it's more assume it's not a legitimate message. - message = request.content.read(1024 * 1024 + 1) - if len(message) > 1024 * 1024: - raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE) - schema.validate_cbor(message) - result = loads(message) - return result - else: + if content_type != CBOR_MIME_TYPE: raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE) + # Make sure it's not too large: + request.content.seek(0, SEEK_END) + if request.content.tell() > max_size: + raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE) + request.content.seek(0, SEEK_SET) + + # We don't want to load the whole message into memory, because it might + # be quite large. The CDDL validator takes a read-only bytes-like + # thing. Luckily, for large request bodies twisted.web will buffer the + # data in a file, so we can use mmap() to get a memory view. The CDDL + # validator will not make a copy, so it won't increase memory usage + # beyond that. + try: + fd = request.content.fileno() + except (ValueError, OSError): + fd = -1 + if fd > 0: + # It's a file, so we can use mmap() to save memory. + message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ) + else: + message = request.content.read() + schema.validate_cbor(message) + + # The CBOR parser will allocate more memory, but at least we can feed + # it the file-like object, so that if it's large it won't make two + # copies. 
+ request.content.seek(SEEK_SET, 0) + return cbor2.load(request.content) + ##### Generic APIs ##### @_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"]) @@ -746,7 +770,9 @@ class HTTPServer(object): ) def mutable_read_test_write(self, request, authorization, storage_index): """Read/test/write combined operation for mutables.""" - rtw_request = self._read_encoded(request, _SCHEMAS["mutable_read_test_write"]) + rtw_request = self._read_encoded( + request, _SCHEMAS["mutable_read_test_write"], max_size=2**48 + ) secrets = ( authorization[Secrets.WRITE_ENABLER], authorization[Secrets.LEASE_RENEW], diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index 8dbe18545..bc2e10eb6 100644 --- a/src/allmydata/test/test_storage_http.py +++ b/src/allmydata/test/test_storage_http.py @@ -1186,18 +1186,42 @@ class MutableHTTPAPIsTests(SyncTestCase): ) return storage_index, write_secret, lease_secret - def test_write_can_be_read(self): + def test_write_can_be_read_small_data(self): + """ + Small written data can be read using ``read_share_chunk``. + """ + self.write_can_be_read(b"abcdef") + + def test_write_can_be_read_large_data(self): + """ + Large written data (50MB) can be read using ``read_share_chunk``. + """ + self.write_can_be_read(b"abcdefghij" * 5 * 1024 * 1024) + + def write_can_be_read(self, data): """ Written data can be read using ``read_share_chunk``. 
""" - storage_index, _, _ = self.create_upload() - data0 = self.http.result_of_with_flush( - self.mut_client.read_share_chunk(storage_index, 0, 1, 7) + lease_secret = urandom(32) + storage_index = urandom(16) + self.http.result_of_with_flush( + self.mut_client.read_test_write_chunks( + storage_index, + urandom(32), + lease_secret, + lease_secret, + { + 0: TestWriteVectors( + write_vectors=[WriteVector(offset=0, data=data)] + ), + }, + [], + ) ) - data1 = self.http.result_of_with_flush( - self.mut_client.read_share_chunk(storage_index, 1, 0, 8) + read_data = self.http.result_of_with_flush( + self.mut_client.read_share_chunk(storage_index, 0, 0, len(data)) ) - self.assertEqual((data0, data1), (b"bcdef-0", b"abcdef-1")) + self.assertEqual(read_data, data) def test_read_before_write(self): """In combo read/test/write operation, reads happen before writes.""" @@ -1276,15 +1300,6 @@ class MutableHTTPAPIsTests(SyncTestCase): b"aXYZef-0", ) - def test_too_large_write(self): - """ - Writing too large of a chunk results in a REQUEST ENTITY TOO LARGE http - error. - """ - with self.assertRaises(ClientException) as e: - self.create_upload(b"0123456789" * 1024 * 1024) - self.assertEqual(e.exception.code, http.REQUEST_ENTITY_TOO_LARGE) - def test_list_shares(self): """``list_shares()`` returns the shares for a given storage index.""" storage_index, _, _ = self.create_upload()