Merge pull request #1244 from tahoe-lafs/3956-mutable-uploads

Fix mutable uploads over HTTP above a certain size

Fixes ticket:3956
This commit is contained in:
Itamar Turner-Trauring 2023-01-10 15:53:38 -05:00 committed by GitHub
commit 7ef1c02067
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 125 additions and 49 deletions

0
newsfragments/3956.minor Normal file
View File

View File

@ -139,7 +139,11 @@ install_requires = [
"werkzeug != 2.2.0",
"treq",
"cbor2",
"pycddl >= 0.2",
# Ideally we want 0.4+ to be able to pass in mmap(), but it's not strictly
# necessary yet until we fix the workaround to
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3963 in
# allmydata.storage.http_server.
"pycddl",
# for pid-file support
"psutil",

View File

@ -9,6 +9,9 @@ from functools import wraps
from base64 import b64decode
import binascii
from tempfile import TemporaryFile
from os import SEEK_END, SEEK_SET
import mmap
from importlib.metadata import version as get_package_version, PackageNotFoundError
from cryptography.x509 import Certificate as CryptoCertificate
from zope.interface import implementer
@ -39,7 +42,7 @@ from cryptography.x509 import load_pem_x509_certificate
# TODO Make sure to use pure Python versions?
from cbor2 import dump, loads
import cbor2
from pycddl import Schema, ValidationError as CDDLValidationError
from .server import StorageServer
from .http_common import (
@ -57,6 +60,20 @@ from ..util.base32 import rfc3548_alphabet
from allmydata.interfaces import BadWriteEnablerError
# Until we figure out Nix (https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3963),
# need to support old pycddl which can only take bytes:
from distutils.version import LooseVersion
try:
PYCDDL_BYTES_ONLY = LooseVersion(get_package_version("pycddl")) < LooseVersion(
"0.4"
)
except PackageNotFoundError:
# This can happen when building a PyInstaller distribution. We'll just assume
# a modern pycddl is installed, because why wouldn't it be?
PYCDDL_BYTES_ONLY = False
class ClientSecretsException(Exception):
"""The client did not send the appropriate secrets."""
@ -278,7 +295,7 @@ _SCHEMAS = {
"test-write-vectors": {
0*256 share_number : {
"test": [0*30 {"offset": uint, "size": uint, "specimen": bstr}]
"write": [0*30 {"offset": uint, "data": bstr}]
"write": [* {"offset": uint, "data": bstr}]
"new-length": uint / null
}
}
@ -515,7 +532,7 @@ class HTTPServer(object):
if accept.best == CBOR_MIME_TYPE:
request.setHeader("Content-Type", CBOR_MIME_TYPE)
f = TemporaryFile()
dump(data, f)
cbor2.dump(data, f)
def read_data(offset: int, length: int) -> bytes:
f.seek(offset)
@ -527,27 +544,47 @@ class HTTPServer(object):
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
raise _HTTPError(http.NOT_ACCEPTABLE)
def _read_encoded(self, request, schema: Schema) -> Any:
def _read_encoded(
self, request, schema: Schema, max_size: int = 1024 * 1024
) -> Any:
"""
Read encoded request body data, decoding it with CBOR by default.
Somewhat arbitrarily, limit body size to 1MB; this may be too low, we
may want to customize per query type, but this is the starting point
for now.
Somewhat arbitrarily, limit body size to 1MiB by default.
"""
content_type = get_content_type(request.requestHeaders)
if content_type == CBOR_MIME_TYPE:
# Read 1 byte more than 1MB. We expect length to be 1MB or
# less; if it's more assume it's not a legitimate message.
message = request.content.read(1024 * 1024 + 1)
if len(message) > 1024 * 1024:
raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
schema.validate_cbor(message)
result = loads(message)
return result
else:
if content_type != CBOR_MIME_TYPE:
raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE)
# Make sure it's not too large:
request.content.seek(SEEK_END, 0)
if request.content.tell() > max_size:
raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
request.content.seek(SEEK_SET, 0)
# We don't want to load the whole message into memory, cause it might
# be quite large. The CDDL validator takes a read-only bytes-like
# thing. Luckily, for large request bodies twisted.web will buffer the
# data in a file, so we can use mmap() to get a memory view. The CDDL
# validator will not make a copy, so it won't increase memory usage
# beyond that.
try:
fd = request.content.fileno()
except (ValueError, OSError):
fd = -1
if fd >= 0 and not PYCDDL_BYTES_ONLY:
# It's a file, so we can use mmap() to save memory.
message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
else:
message = request.content.read()
schema.validate_cbor(message)
# The CBOR parser will allocate more memory, but at least we can feed
# it the file-like object, so that if it's large it won't make two
# copies.
request.content.seek(SEEK_SET, 0)
return cbor2.load(request.content)
##### Generic APIs #####
@_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"])
@ -746,7 +783,9 @@ class HTTPServer(object):
)
def mutable_read_test_write(self, request, authorization, storage_index):
"""Read/test/write combined operation for mutables."""
rtw_request = self._read_encoded(request, _SCHEMAS["mutable_read_test_write"])
rtw_request = self._read_encoded(
request, _SCHEMAS["mutable_read_test_write"], max_size=2**48
)
secrets = (
authorization[Secrets.WRITE_ENABLER],
authorization[Secrets.LEASE_RENEW],

View File

@ -1201,18 +1201,42 @@ class MutableHTTPAPIsTests(SyncTestCase):
)
return storage_index, write_secret, lease_secret
def test_write_can_be_read(self):
def test_write_can_be_read_small_data(self):
"""
Small written data can be read using ``read_share_chunk``.
"""
self.write_can_be_read(b"abcdef")
def test_write_can_be_read_large_data(self):
"""
Large written data (50MB) can be read using ``read_share_chunk``.
"""
self.write_can_be_read(b"abcdefghij" * 5 * 1024 * 1024)
def write_can_be_read(self, data):
"""
Written data can be read using ``read_share_chunk``.
"""
storage_index, _, _ = self.create_upload()
data0 = self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 0, 1, 7)
lease_secret = urandom(32)
storage_index = urandom(16)
self.http.result_of_with_flush(
self.mut_client.read_test_write_chunks(
storage_index,
urandom(32),
lease_secret,
lease_secret,
{
0: TestWriteVectors(
write_vectors=[WriteVector(offset=0, data=data)]
),
},
[],
)
)
data1 = self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
read_data = self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 0, 0, len(data))
)
self.assertEqual((data0, data1), (b"bcdef-0", b"abcdef-1"))
self.assertEqual(read_data, data)
def test_read_before_write(self):
"""In combo read/test/write operation, reads happen before writes."""
@ -1291,15 +1315,6 @@ class MutableHTTPAPIsTests(SyncTestCase):
b"aXYZef-0",
)
def test_too_large_write(self):
"""
Writing too large of a chunk results in a REQUEST ENTITY TOO LARGE http
error.
"""
with self.assertRaises(ClientException) as e:
self.create_upload(b"0123456789" * 1024 * 1024)
self.assertEqual(e.exception.code, http.REQUEST_ENTITY_TOO_LARGE)
def test_list_shares(self):
"""``list_shares()`` returns the shares for a given storage index."""
storage_index, _, _ = self.create_upload()

View File

@ -34,7 +34,7 @@ from allmydata.util.encodingutil import quote_output, unicode_to_argv
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.interfaces import IDirectoryNode, IFileNode, \
NoSuchChildError, NoSharesError
NoSuchChildError, NoSharesError, SDMF_VERSION, MDMF_VERSION
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotWriteableError
from allmydata.mutable import layout as mutable_layout
@ -477,9 +477,10 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
def _corrupt_mutable_share(self, filename, which):
msf = MutableShareFile(filename)
datav = msf.readv([ (0, 1000000) ])
# Read more than share length:
datav = msf.readv([ (0, 10_000_000) ])
final_share = datav[0]
assert len(final_share) < 1000000 # ought to be truncated
assert len(final_share) < 10_000_000 # ought to be truncated
pieces = mutable_layout.unpack_share(final_share)
(seqnum, root_hash, IV, k, N, segsize, datalen,
verification_key, signature, share_hash_chain, block_hash_tree,
@ -519,12 +520,20 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
msf.writev( [(0, final_share)], None)
def test_mutable(self):
def test_mutable_sdmf(self):
"""SDMF mutables can be uploaded, downloaded, and many other things."""
return self._test_mutable(SDMF_VERSION)
def test_mutable_mdmf(self):
"""MDMF mutables can be uploaded, downloaded, and many other things."""
return self._test_mutable(MDMF_VERSION)
def _test_mutable(self, mutable_version):
DATA = b"initial contents go here." # 25 bytes % 3 != 0
DATA_uploadable = MutableData(DATA)
NEWDATA = b"new contents yay"
NEWDATA_uploadable = MutableData(NEWDATA)
NEWERDATA = b"this is getting old"
NEWERDATA = b"this is getting old" * 1_000_000
NEWERDATA_uploadable = MutableData(NEWERDATA)
d = self.set_up_nodes()
@ -532,7 +541,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
def _create_mutable(res):
c = self.clients[0]
log.msg("starting create_mutable_file")
d1 = c.create_mutable_file(DATA_uploadable)
d1 = c.create_mutable_file(DATA_uploadable, mutable_version)
def _done(res):
log.msg("DONE: %s" % (res,))
self._mutable_node_1 = res
@ -554,27 +563,33 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
filename)
self.failUnlessEqual(rc, 0)
try:
share_type = 'SDMF' if mutable_version == SDMF_VERSION else 'MDMF'
self.failUnless("Mutable slot found:\n" in output)
self.failUnless("share_type: SDMF\n" in output)
self.assertIn(f"share_type: {share_type}\n", output)
peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
self.failUnless(" WE for nodeid: %s\n" % peerid in output)
self.failUnless(" num_extra_leases: 0\n" in output)
self.failUnless(" secrets are for nodeid: %s\n" % peerid
in output)
self.failUnless(" SDMF contents:\n" in output)
self.failUnless(f" {share_type} contents:\n" in output)
self.failUnless(" seqnum: 1\n" in output)
self.failUnless(" required_shares: 3\n" in output)
self.failUnless(" total_shares: 10\n" in output)
self.failUnless(" segsize: 27\n" in output, (output, filename))
if mutable_version == SDMF_VERSION:
self.failUnless(" segsize: 27\n" in output, (output, filename))
self.failUnless(" datalen: 25\n" in output)
# the exact share_hash_chain nodes depends upon the sharenum,
# and is more of a hassle to compute than I want to deal with
# now
self.failUnless(" share_hash_chain: " in output)
self.failUnless(" block_hash_tree: 1 nodes\n" in output)
expected = (" verify-cap: URI:SSK-Verifier:%s:" %
str(base32.b2a(storage_index), "ascii"))
self.failUnless(expected in output)
if mutable_version == SDMF_VERSION:
expected = (" verify-cap: URI:SSK-Verifier:%s:" %
str(base32.b2a(storage_index), "ascii"))
else:
expected = (" verify-cap: URI:MDMF-Verifier:%s" %
str(base32.b2a(storage_index), "ascii"))
self.assertIn(expected, output)
except unittest.FailTest:
print()
print("dump-share output was:")
@ -694,7 +709,10 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
# when we retrieve this, we should get three signature
# failures (where we've mangled seqnum, R, and segsize). The
# pubkey mangling
d.addCallback(_corrupt_shares)
if mutable_version == SDMF_VERSION:
# TODO Corrupting shares in test_system doesn't work for MDMF right now
d.addCallback(_corrupt_shares)
d.addCallback(lambda res: self._newnode3.download_best_version())
d.addCallback(_check_download_5)
@ -702,7 +720,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
def _check_empty_file(res):
# make sure we can create empty files, this usually screws up the
# segsize math
d1 = self.clients[2].create_mutable_file(MutableData(b""))
d1 = self.clients[2].create_mutable_file(MutableData(b""), mutable_version)
d1.addCallback(lambda newnode: newnode.download_best_version())
d1.addCallback(lambda res: self.failUnlessEqual(b"", res))
return d1