From 96347e22e27b0614ba5e9797a401129c5bdb8101 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 15 Dec 2022 13:14:49 -0500 Subject: [PATCH 1/7] Make a test demonstrating the problem. --- src/allmydata/test/test_system.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 670ac5868..33b0284da 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -477,9 +477,10 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): def _corrupt_mutable_share(self, filename, which): msf = MutableShareFile(filename) - datav = msf.readv([ (0, 1000000) ]) + # Read more than share length: + datav = msf.readv([ (0, 10_000_000) ]) final_share = datav[0] - assert len(final_share) < 1000000 # ought to be truncated + assert len(final_share) < 10_000_000 # ought to be truncated pieces = mutable_layout.unpack_share(final_share) (seqnum, root_hash, IV, k, N, segsize, datalen, verification_key, signature, share_hash_chain, block_hash_tree, @@ -524,7 +525,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): DATA_uploadable = MutableData(DATA) NEWDATA = b"new contents yay" NEWDATA_uploadable = MutableData(NEWDATA) - NEWERDATA = b"this is getting old" + NEWERDATA = b"this is getting old" * 1_000_000 NEWERDATA_uploadable = MutableData(NEWERDATA) d = self.set_up_nodes() From 1d3464a430a465b4b5ef568b33a6138c0a7a495c Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 19 Dec 2022 13:37:20 -0500 Subject: [PATCH 2/7] Add end-to-end MDMF test. --- src/allmydata/test/test_system.py | 37 ++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 33b0284da..55bf0ed8d 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -34,7 +34,7 @@ from allmydata.util.encodingutil import quote_output, unicode_to_argv from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.consumer import MemoryConsumer, download_to_data from allmydata.interfaces import IDirectoryNode, IFileNode, \ - NoSuchChildError, NoSharesError + NoSuchChildError, NoSharesError, SDMF_VERSION, MDMF_VERSION from allmydata.monitor import Monitor from allmydata.mutable.common import NotWriteableError from allmydata.mutable import layout as mutable_layout @@ -520,7 +520,13 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): msf.writev( [(0, final_share)], None) - def test_mutable(self): + def test_mutable_sdmf(self): + return self._test_mutable(SDMF_VERSION) + + def test_mutable_mdmf(self): + return self._test_mutable(MDMF_VERSION) + + def _test_mutable(self, mutable_version): DATA = b"initial contents go here." # 25 bytes % 3 != 0 DATA_uploadable = MutableData(DATA) NEWDATA = b"new contents yay" @@ -533,7 +539,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): def _create_mutable(res): c = self.clients[0] log.msg("starting create_mutable_file") - d1 = c.create_mutable_file(DATA_uploadable) + d1 = c.create_mutable_file(DATA_uploadable, mutable_version) def _done(res): log.msg("DONE: %s" % (res,)) self._mutable_node_1 = res @@ -555,27 +561,33 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): filename) self.failUnlessEqual(rc, 0) try: + share_type = 'SDMF' if mutable_version == SDMF_VERSION else 'MDMF' self.failUnless("Mutable slot found:\n" in output) - self.failUnless("share_type: SDMF\n" in output) + self.assertIn(f"share_type: {share_type}\n", output) peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid) self.failUnless(" WE for nodeid: %s\n" % peerid in output) self.failUnless(" num_extra_leases: 0\n" in output) self.failUnless(" secrets are for nodeid: %s\n" % peerid in output) - self.failUnless(" SDMF contents:\n" in output) + self.failUnless(f" {share_type} contents:\n" in output) self.failUnless(" seqnum: 1\n" in output) self.failUnless(" required_shares: 3\n" in output) self.failUnless(" total_shares: 10\n" in output) - self.failUnless(" segsize: 27\n" in output, (output, filename)) + if mutable_version == SDMF_VERSION: + self.failUnless(" segsize: 27\n" in output, (output, filename)) self.failUnless(" datalen: 25\n" in output) # the exact share_hash_chain nodes depends upon the sharenum, # and is more of a hassle to compute than I want to deal with # now self.failUnless(" share_hash_chain: " in output) self.failUnless(" block_hash_tree: 1 nodes\n" in output) - expected = (" verify-cap: URI:SSK-Verifier:%s:" % - str(base32.b2a(storage_index), "ascii")) - self.failUnless(expected in output) + if mutable_version == SDMF_VERSION: + expected = (" verify-cap: URI:SSK-Verifier:%s:" % + str(base32.b2a(storage_index), "ascii")) + else: + expected = (" verify-cap: URI:MDMF-Verifier:%s" % + str(base32.b2a(storage_index), "ascii")) + self.assertIn(expected, output) except unittest.FailTest: print() print("dump-share output was:") @@ -695,7 +707,10 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): # when we retrieve this, we should get three signature # failures (where we've mangled seqnum, R, and segsize). The # pubkey mangling - d.addCallback(_corrupt_shares) + + if mutable_version == SDMF_VERSION: + # TODO Corrupting shares in test_systm doesn't work for MDMF right now + d.addCallback(_corrupt_shares) d.addCallback(lambda res: self._newnode3.download_best_version()) d.addCallback(_check_download_5) @@ -703,7 +718,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): def _check_empty_file(res): # make sure we can create empty files, this usually screws up the # segsize math - d1 = self.clients[2].create_mutable_file(MutableData(b"")) + d1 = self.clients[2].create_mutable_file(MutableData(b""), mutable_version) d1.addCallback(lambda newnode: newnode.download_best_version()) d1.addCallback(lambda res: self.failUnlessEqual(b"", res)) return d1 From a71e873c21836898318512c61c45696a847f4134 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 20 Dec 2022 14:07:12 -0500 Subject: [PATCH 3/7] pycddl 0.2 is broken, 0.3 is missing mmap() support. --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 8558abd02..dd50e0fcf 100644 --- a/setup.py +++ b/setup.py @@ -137,7 +137,8 @@ install_requires = [ "werkzeug != 2.2.0", "treq", "cbor2", - "pycddl >= 0.2", + # Need 0.4 to be able to pass in mmap() + "pycddl >= 0.4", # for pid-file support "psutil", From 6d2e797581ed214488da9562e09b78c7dd7299a3 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 21 Dec 2022 09:16:18 -0500 Subject: [PATCH 4/7] News file. --- newsfragments/3956.minor | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 newsfragments/3956.minor diff --git a/newsfragments/3956.minor b/newsfragments/3956.minor new file mode 100644 index 000000000..e69de29bb From 1a4dcc70e26ce3e720180fb2572e02def7fca351 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 21 Dec 2022 09:24:31 -0500 Subject: [PATCH 5/7] Support large mutable uploads in a memory-efficient manner. --- src/allmydata/storage/http_server.py | 60 ++++++++++++++++++------- src/allmydata/test/test_storage_http.py | 47 ++++++++++++------- 2 files changed, 74 insertions(+), 33 deletions(-) diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index 3902976ba..d76948d93 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -9,6 +9,10 @@ from functools import wraps from base64 import b64decode import binascii from tempfile import TemporaryFile +from os import SEEK_END, SEEK_SET +from io import BytesIO +import mmap +import sys from cryptography.x509 import Certificate as CryptoCertificate from zope.interface import implementer @@ -39,7 +43,7 @@ from cryptography.x509 import load_pem_x509_certificate # TODO Make sure to use pure Python versions? -from cbor2 import dump, loads +import cbor2 from pycddl import Schema, ValidationError as CDDLValidationError from .server import StorageServer from .http_common import ( @@ -515,7 +519,7 @@ class HTTPServer(object): if accept.best == CBOR_MIME_TYPE: request.setHeader("Content-Type", CBOR_MIME_TYPE) f = TemporaryFile() - dump(data, f) + cbor2.dump(data, f) def read_data(offset: int, length: int) -> bytes: f.seek(offset) @@ -527,27 +531,47 @@ class HTTPServer(object): # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861 raise _HTTPError(http.NOT_ACCEPTABLE) - def _read_encoded(self, request, schema: Schema) -> Any: + def _read_encoded( + self, request, schema: Schema, max_size: int = 1024 * 1024 + ) -> Any: """ Read encoded request body data, decoding it with CBOR by default. - Somewhat arbitrarily, limit body size to 1MB; this may be too low, we - may want to customize per query type, but this is the starting point - for now. + Somewhat arbitrarily, limit body size to 1MiB by default. """ content_type = get_content_type(request.requestHeaders) - if content_type == CBOR_MIME_TYPE: - # Read 1 byte more than 1MB. We expect length to be 1MB or - # less; if it's more assume it's not a legitimate message. - message = request.content.read(1024 * 1024 + 1) - if len(message) > 1024 * 1024: - raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE) - schema.validate_cbor(message) - result = loads(message) - return result - else: + if content_type != CBOR_MIME_TYPE: raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE) + # Make sure it's not too large: + request.content.seek(SEEK_END, 0) + if request.content.tell() > max_size: + raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE) + request.content.seek(SEEK_SET, 0) + + # We don't want to load the whole message into memory, cause it might + # be quite large. The CDDL validator takes a read-only bytes-like + # thing. Luckily, for large request bodies twisted.web will buffer the + # data in a file, so we can use mmap() to get a memory view. The CDDL + # validator will not make a copy, so it won't increase memory usage + # beyond that. + try: + fd = request.content.fileno() + except (ValueError, OSError): + fd = -1 + if fd > 0: + # It's a file, so we can use mmap() to save memory. + message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ) + else: + message = request.content.read() + schema.validate_cbor(message) + + # The CBOR parser will allocate more memory, but at least we can feed + # it the file-like object, so that if it's large it won't be make two + # copies. + request.content.seek(SEEK_SET, 0) + return cbor2.load(request.content) + ##### Generic APIs ##### @_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"]) @@ -746,7 +770,9 @@ class HTTPServer(object): ) def mutable_read_test_write(self, request, authorization, storage_index): """Read/test/write combined operation for mutables.""" - rtw_request = self._read_encoded(request, _SCHEMAS["mutable_read_test_write"]) + rtw_request = self._read_encoded( + request, _SCHEMAS["mutable_read_test_write"], max_size=2**48 + ) secrets = ( authorization[Secrets.WRITE_ENABLER], authorization[Secrets.LEASE_RENEW], diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index 8dbe18545..bc2e10eb6 100644 --- a/src/allmydata/test/test_storage_http.py +++ b/src/allmydata/test/test_storage_http.py @@ -1186,18 +1186,42 @@ class MutableHTTPAPIsTests(SyncTestCase): ) return storage_index, write_secret, lease_secret - def test_write_can_be_read(self): + def test_write_can_be_read_small_data(self): + """ + Small written data can be read using ``read_share_chunk``. + """ + self.write_can_be_read(b"abcdef") + + def test_write_can_be_read_large_data(self): + """ + Large written data (50MB) can be read using ``read_share_chunk``. + """ + self.write_can_be_read(b"abcdefghij" * 5 * 1024 * 1024) + + def write_can_be_read(self, data): """ Written data can be read using ``read_share_chunk``. """ - storage_index, _, _ = self.create_upload() - data0 = self.http.result_of_with_flush( - self.mut_client.read_share_chunk(storage_index, 0, 1, 7) + lease_secret = urandom(32) + storage_index = urandom(16) + self.http.result_of_with_flush( + self.mut_client.read_test_write_chunks( + storage_index, + urandom(32), + lease_secret, + lease_secret, + { + 0: TestWriteVectors( + write_vectors=[WriteVector(offset=0, data=data)] + ), + }, + [], + ) ) - data1 = self.http.result_of_with_flush( - self.mut_client.read_share_chunk(storage_index, 1, 0, 8) + read_data = self.http.result_of_with_flush( + self.mut_client.read_share_chunk(storage_index, 0, 0, len(data)) ) - self.assertEqual((data0, data1), (b"bcdef-0", b"abcdef-1")) + self.assertEqual(read_data, data) def test_read_before_write(self): """In combo read/test/write operation, reads happen before writes.""" @@ -1276,15 +1300,6 @@ class MutableHTTPAPIsTests(SyncTestCase): b"aXYZef-0", ) - def test_too_large_write(self): - """ - Writing too large of a chunk results in a REQUEST ENTITY TOO LARGE http - error. - """ - with self.assertRaises(ClientException) as e: - self.create_upload(b"0123456789" * 1024 * 1024) - self.assertEqual(e.exception.code, http.REQUEST_ENTITY_TOO_LARGE) - def test_list_shares(self): """``list_shares()`` returns the shares for a given storage index.""" storage_index, _, _ = self.create_upload() From 54da6eb60a35a53dc981eca3f5172c5fae6faf38 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 21 Dec 2022 09:34:25 -0500 Subject: [PATCH 6/7] Remove unneeded imports. --- src/allmydata/storage/http_server.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index d76948d93..47fac879f 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -10,9 +10,7 @@ from base64 import b64decode import binascii from tempfile import TemporaryFile from os import SEEK_END, SEEK_SET -from io import BytesIO import mmap -import sys from cryptography.x509 import Certificate as CryptoCertificate from zope.interface import implementer From d1b464d0d871b9c09b0f0312136a33bbc08239df Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 21 Dec 2022 09:35:10 -0500 Subject: [PATCH 7/7] Writing large files can involve many writes. --- src/allmydata/storage/http_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index 47fac879f..6d22c92df 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -280,7 +280,7 @@ _SCHEMAS = { "test-write-vectors": { 0*256 share_number : { "test": [0*30 {"offset": uint, "size": uint, "specimen": bstr}] - "write": [0*30 {"offset": uint, "data": bstr}] + "write": [* {"offset": uint, "data": bstr}] "new-length": uint / null } }