diff --git a/newsfragments/3956.minor b/newsfragments/3956.minor new file mode 100644 index 000000000..e69de29bb diff --git a/setup.py b/setup.py index edef7a4c3..3681dc441 100644 --- a/setup.py +++ b/setup.py @@ -139,7 +139,11 @@ install_requires = [ "werkzeug != 2.2.0", "treq", "cbor2", - "pycddl >= 0.2", + # Ideally we want 0.4+ to be able to pass in mmap(), but it's not strictly + # necessary yet until we fix the workaround to + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3963 in + # allmydata.storage.http_server. + "pycddl", # for pid-file support "psutil", diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index 96a491882..387353d24 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -9,6 +9,9 @@ from functools import wraps from base64 import b64decode import binascii from tempfile import TemporaryFile +from os import SEEK_END, SEEK_SET +import mmap +from importlib.metadata import version as get_package_version, PackageNotFoundError from cryptography.x509 import Certificate as CryptoCertificate from zope.interface import implementer @@ -39,7 +42,7 @@ from cryptography.x509 import load_pem_x509_certificate # TODO Make sure to use pure Python versions? -from cbor2 import dump, loads +import cbor2 from pycddl import Schema, ValidationError as CDDLValidationError from .server import StorageServer from .http_common import ( @@ -57,6 +60,20 @@ from ..util.base32 import rfc3548_alphabet from allmydata.interfaces import BadWriteEnablerError +# Until we figure out Nix (https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3963), +# need to support old pycddl which can only take bytes: +from distutils.version import LooseVersion + +try: + PYCDDL_BYTES_ONLY = LooseVersion(get_package_version("pycddl")) < LooseVersion( + "0.4" + ) +except PackageNotFoundError: + # This can happen when building PyInstaller distribution. We'll just assume + # you installed a modern pycddl, cause why wouldn't you? + PYCDDL_BYTES_ONLY = False + + class ClientSecretsException(Exception): """The client did not send the appropriate secrets.""" @@ -278,7 +295,7 @@ _SCHEMAS = { "test-write-vectors": { 0*256 share_number : { "test": [0*30 {"offset": uint, "size": uint, "specimen": bstr}] - "write": [0*30 {"offset": uint, "data": bstr}] + "write": [* {"offset": uint, "data": bstr}] "new-length": uint / null } } @@ -515,7 +532,7 @@ class HTTPServer(object): if accept.best == CBOR_MIME_TYPE: request.setHeader("Content-Type", CBOR_MIME_TYPE) f = TemporaryFile() - dump(data, f) + cbor2.dump(data, f) def read_data(offset: int, length: int) -> bytes: f.seek(offset) @@ -527,27 +544,47 @@ class HTTPServer(object): # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861 raise _HTTPError(http.NOT_ACCEPTABLE) - def _read_encoded(self, request, schema: Schema) -> Any: + def _read_encoded( + self, request, schema: Schema, max_size: int = 1024 * 1024 + ) -> Any: """ Read encoded request body data, decoding it with CBOR by default. - Somewhat arbitrarily, limit body size to 1MB; this may be too low, we - may want to customize per query type, but this is the starting point - for now. + Somewhat arbitrarily, limit body size to 1MiB by default. """ content_type = get_content_type(request.requestHeaders) - if content_type == CBOR_MIME_TYPE: - # Read 1 byte more than 1MB. We expect length to be 1MB or - # less; if it's more assume it's not a legitimate message. - message = request.content.read(1024 * 1024 + 1) - if len(message) > 1024 * 1024: - raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE) - schema.validate_cbor(message) - result = loads(message) - return result - else: + if content_type != CBOR_MIME_TYPE: raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE) + # Make sure it's not too large: + request.content.seek(SEEK_END, 0) + if request.content.tell() > max_size: + raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE) + request.content.seek(SEEK_SET, 0) + + # We don't want to load the whole message into memory, cause it might + # be quite large. The CDDL validator takes a read-only bytes-like + # thing. Luckily, for large request bodies twisted.web will buffer the + # data in a file, so we can use mmap() to get a memory view. The CDDL + # validator will not make a copy, so it won't increase memory usage + # beyond that. + try: + fd = request.content.fileno() + except (ValueError, OSError): + fd = -1 + if fd >= 0 and not PYCDDL_BYTES_ONLY: + # It's a file, so we can use mmap() to save memory. + message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ) + else: + message = request.content.read() + schema.validate_cbor(message) + + # The CBOR parser will allocate more memory, but at least we can feed + # it the file-like object, so that if it's large it won't be make two + # copies. + request.content.seek(SEEK_SET, 0) + return cbor2.load(request.content) + ##### Generic APIs ##### @_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"]) @@ -746,7 +783,9 @@ class HTTPServer(object): ) def mutable_read_test_write(self, request, authorization, storage_index): """Read/test/write combined operation for mutables.""" - rtw_request = self._read_encoded(request, _SCHEMAS["mutable_read_test_write"]) + rtw_request = self._read_encoded( + request, _SCHEMAS["mutable_read_test_write"], max_size=2**48 + ) secrets = ( authorization[Secrets.WRITE_ENABLER], authorization[Secrets.LEASE_RENEW], diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index de60812e3..55754b29b 100644 --- a/src/allmydata/test/test_storage_http.py +++ b/src/allmydata/test/test_storage_http.py @@ -1201,18 +1201,42 @@ class MutableHTTPAPIsTests(SyncTestCase): ) return storage_index, write_secret, lease_secret - def test_write_can_be_read(self): + def test_write_can_be_read_small_data(self): + """ + Small written data can be read using ``read_share_chunk``. + """ + self.write_can_be_read(b"abcdef") + + def test_write_can_be_read_large_data(self): + """ + Large written data (50MB) can be read using ``read_share_chunk``. + """ + self.write_can_be_read(b"abcdefghij" * 5 * 1024 * 1024) + + def write_can_be_read(self, data): """ Written data can be read using ``read_share_chunk``. """ - storage_index, _, _ = self.create_upload() - data0 = self.http.result_of_with_flush( - self.mut_client.read_share_chunk(storage_index, 0, 1, 7) + lease_secret = urandom(32) + storage_index = urandom(16) + self.http.result_of_with_flush( + self.mut_client.read_test_write_chunks( + storage_index, + urandom(32), + lease_secret, + lease_secret, + { + 0: TestWriteVectors( + write_vectors=[WriteVector(offset=0, data=data)] + ), + }, + [], + ) ) - data1 = self.http.result_of_with_flush( - self.mut_client.read_share_chunk(storage_index, 1, 0, 8) + read_data = self.http.result_of_with_flush( + self.mut_client.read_share_chunk(storage_index, 0, 0, len(data)) ) - self.assertEqual((data0, data1), (b"bcdef-0", b"abcdef-1")) + self.assertEqual(read_data, data) def test_read_before_write(self): """In combo read/test/write operation, reads happen before writes.""" @@ -1291,15 +1315,6 @@ class MutableHTTPAPIsTests(SyncTestCase): b"aXYZef-0", ) - def test_too_large_write(self): - """ - Writing too large of a chunk results in a REQUEST ENTITY TOO LARGE http - error. - """ - with self.assertRaises(ClientException) as e: - self.create_upload(b"0123456789" * 1024 * 1024) - self.assertEqual(e.exception.code, http.REQUEST_ENTITY_TOO_LARGE) - def test_list_shares(self): """``list_shares()`` returns the shares for a given storage index.""" storage_index, _, _ = self.create_upload() diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 670ac5868..10a64c1fe 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -34,7 +34,7 @@ from allmydata.util.encodingutil import quote_output, unicode_to_argv from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.consumer import MemoryConsumer, download_to_data from allmydata.interfaces import IDirectoryNode, IFileNode, \ - NoSuchChildError, NoSharesError + NoSuchChildError, NoSharesError, SDMF_VERSION, MDMF_VERSION from allmydata.monitor import Monitor from allmydata.mutable.common import NotWriteableError from allmydata.mutable import layout as mutable_layout @@ -477,9 +477,10 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): def _corrupt_mutable_share(self, filename, which): msf = MutableShareFile(filename) - datav = msf.readv([ (0, 1000000) ]) + # Read more than share length: + datav = msf.readv([ (0, 10_000_000) ]) final_share = datav[0] - assert len(final_share) < 1000000 # ought to be truncated + assert len(final_share) < 10_000_000 # ought to be truncated pieces = mutable_layout.unpack_share(final_share) (seqnum, root_hash, IV, k, N, segsize, datalen, verification_key, signature, share_hash_chain, block_hash_tree, @@ -519,12 +520,20 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): msf.writev( [(0, final_share)], None) - def test_mutable(self): + def test_mutable_sdmf(self): + """SDMF mutables can be uploaded, downloaded, and many other things.""" + return self._test_mutable(SDMF_VERSION) + + def test_mutable_mdmf(self): + """MDMF mutables can be uploaded, downloaded, and many other things.""" + return self._test_mutable(MDMF_VERSION) + + def _test_mutable(self, mutable_version): DATA = b"initial contents go here." # 25 bytes % 3 != 0 DATA_uploadable = MutableData(DATA) NEWDATA = b"new contents yay" NEWDATA_uploadable = MutableData(NEWDATA) - NEWERDATA = b"this is getting old" + NEWERDATA = b"this is getting old" * 1_000_000 NEWERDATA_uploadable = MutableData(NEWERDATA) d = self.set_up_nodes() @@ -532,7 +541,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): def _create_mutable(res): c = self.clients[0] log.msg("starting create_mutable_file") - d1 = c.create_mutable_file(DATA_uploadable) + d1 = c.create_mutable_file(DATA_uploadable, mutable_version) def _done(res): log.msg("DONE: %s" % (res,)) self._mutable_node_1 = res @@ -554,27 +563,33 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): filename) self.failUnlessEqual(rc, 0) try: + share_type = 'SDMF' if mutable_version == SDMF_VERSION else 'MDMF' self.failUnless("Mutable slot found:\n" in output) - self.failUnless("share_type: SDMF\n" in output) + self.assertIn(f"share_type: {share_type}\n", output) peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid) self.failUnless(" WE for nodeid: %s\n" % peerid in output) self.failUnless(" num_extra_leases: 0\n" in output) self.failUnless(" secrets are for nodeid: %s\n" % peerid in output) - self.failUnless(" SDMF contents:\n" in output) + self.failUnless(f" {share_type} contents:\n" in output) self.failUnless(" seqnum: 1\n" in output) self.failUnless(" required_shares: 3\n" in output) self.failUnless(" total_shares: 10\n" in output) - self.failUnless(" segsize: 27\n" in output, (output, filename)) + if mutable_version == SDMF_VERSION: + self.failUnless(" segsize: 27\n" in output, (output, filename)) self.failUnless(" datalen: 25\n" in output) # the exact share_hash_chain nodes depends upon the sharenum, # and is more of a hassle to compute than I want to deal with # now self.failUnless(" share_hash_chain: " in output) self.failUnless(" block_hash_tree: 1 nodes\n" in output) - expected = (" verify-cap: URI:SSK-Verifier:%s:" % - str(base32.b2a(storage_index), "ascii")) - self.failUnless(expected in output) + if mutable_version == SDMF_VERSION: + expected = (" verify-cap: URI:SSK-Verifier:%s:" % + str(base32.b2a(storage_index), "ascii")) + else: + expected = (" verify-cap: URI:MDMF-Verifier:%s" % + str(base32.b2a(storage_index), "ascii")) + self.assertIn(expected, output) except unittest.FailTest: print() print("dump-share output was:") @@ -694,7 +709,10 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): # when we retrieve this, we should get three signature # failures (where we've mangled seqnum, R, and segsize). The # pubkey mangling - d.addCallback(_corrupt_shares) + + if mutable_version == SDMF_VERSION: + # TODO Corrupting shares in test_systm doesn't work for MDMF right now + d.addCallback(_corrupt_shares) d.addCallback(lambda res: self._newnode3.download_best_version()) d.addCallback(_check_download_5) @@ -702,7 +720,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): def _check_empty_file(res): # make sure we can create empty files, this usually screws up the # segsize math - d1 = self.clients[2].create_mutable_file(MutableData(b"")) + d1 = self.clients[2].create_mutable_file(MutableData(b""), mutable_version) d1.addCallback(lambda newnode: newnode.download_best_version()) d1.addCallback(lambda res: self.failUnlessEqual(b"", res)) return d1