mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-20 17:52:50 +00:00
Merge branch '3956-mutable-uploads' into 3957-mutable-over-http-speed
This commit is contained in:
commit
3de5ebde48
0
newsfragments/3956.minor
Normal file
0
newsfragments/3956.minor
Normal file
3
setup.py
3
setup.py
@ -139,7 +139,8 @@ install_requires = [
|
||||
"werkzeug != 2.2.0",
|
||||
"treq",
|
||||
"cbor2",
|
||||
"pycddl >= 0.2",
|
||||
# Need 0.4 to be able to pass in mmap()
|
||||
"pycddl >= 0.4",
|
||||
|
||||
# for pid-file support
|
||||
"psutil",
|
||||
|
@ -9,6 +9,8 @@ from functools import wraps
|
||||
from base64 import b64decode
|
||||
import binascii
|
||||
from tempfile import TemporaryFile
|
||||
from os import SEEK_END, SEEK_SET
|
||||
import mmap
|
||||
|
||||
from cryptography.x509 import Certificate as CryptoCertificate
|
||||
from zope.interface import implementer
|
||||
@ -39,7 +41,7 @@ from cryptography.x509 import load_pem_x509_certificate
|
||||
|
||||
|
||||
# TODO Make sure to use pure Python versions?
|
||||
from cbor2 import dump, loads
|
||||
import cbor2
|
||||
from pycddl import Schema, ValidationError as CDDLValidationError
|
||||
from .server import StorageServer
|
||||
from .http_common import (
|
||||
@ -278,7 +280,7 @@ _SCHEMAS = {
|
||||
"test-write-vectors": {
|
||||
0*256 share_number : {
|
||||
"test": [0*30 {"offset": uint, "size": uint, "specimen": bstr}]
|
||||
"write": [0*30 {"offset": uint, "data": bstr}]
|
||||
"write": [* {"offset": uint, "data": bstr}]
|
||||
"new-length": uint / null
|
||||
}
|
||||
}
|
||||
@ -515,7 +517,7 @@ class HTTPServer(object):
|
||||
if accept.best == CBOR_MIME_TYPE:
|
||||
request.setHeader("Content-Type", CBOR_MIME_TYPE)
|
||||
f = TemporaryFile()
|
||||
dump(data, f)
|
||||
cbor2.dump(data, f)
|
||||
|
||||
def read_data(offset: int, length: int) -> bytes:
|
||||
f.seek(offset)
|
||||
@ -527,27 +529,47 @@ class HTTPServer(object):
|
||||
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
|
||||
raise _HTTPError(http.NOT_ACCEPTABLE)
|
||||
|
||||
def _read_encoded(self, request, schema: Schema) -> Any:
|
||||
def _read_encoded(
|
||||
self, request, schema: Schema, max_size: int = 1024 * 1024
|
||||
) -> Any:
|
||||
"""
|
||||
Read encoded request body data, decoding it with CBOR by default.
|
||||
|
||||
Somewhat arbitrarily, limit body size to 1MB; this may be too low, we
|
||||
may want to customize per query type, but this is the starting point
|
||||
for now.
|
||||
Somewhat arbitrarily, limit body size to 1MiB by default.
|
||||
"""
|
||||
content_type = get_content_type(request.requestHeaders)
|
||||
if content_type == CBOR_MIME_TYPE:
|
||||
# Read 1 byte more than 1MB. We expect length to be 1MB or
|
||||
# less; if it's more assume it's not a legitimate message.
|
||||
message = request.content.read(1024 * 1024 + 1)
|
||||
if len(message) > 1024 * 1024:
|
||||
raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
|
||||
schema.validate_cbor(message)
|
||||
result = loads(message)
|
||||
return result
|
||||
else:
|
||||
if content_type != CBOR_MIME_TYPE:
|
||||
raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE)
|
||||
|
||||
# Make sure it's not too large:
|
||||
request.content.seek(SEEK_END, 0)
|
||||
if request.content.tell() > max_size:
|
||||
raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
|
||||
request.content.seek(SEEK_SET, 0)
|
||||
|
||||
# We don't want to load the whole message into memory, cause it might
|
||||
# be quite large. The CDDL validator takes a read-only bytes-like
|
||||
# thing. Luckily, for large request bodies twisted.web will buffer the
|
||||
# data in a file, so we can use mmap() to get a memory view. The CDDL
|
||||
# validator will not make a copy, so it won't increase memory usage
|
||||
# beyond that.
|
||||
try:
|
||||
fd = request.content.fileno()
|
||||
except (ValueError, OSError):
|
||||
fd = -1
|
||||
if fd > 0:
|
||||
# It's a file, so we can use mmap() to save memory.
|
||||
message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
|
||||
else:
|
||||
message = request.content.read()
|
||||
schema.validate_cbor(message)
|
||||
|
||||
# The CBOR parser will allocate more memory, but at least we can feed
|
||||
# it the file-like object, so that if it's large it won't be make two
|
||||
# copies.
|
||||
request.content.seek(SEEK_SET, 0)
|
||||
return cbor2.load(request.content)
|
||||
|
||||
##### Generic APIs #####
|
||||
|
||||
@_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"])
|
||||
@ -746,7 +768,9 @@ class HTTPServer(object):
|
||||
)
|
||||
def mutable_read_test_write(self, request, authorization, storage_index):
|
||||
"""Read/test/write combined operation for mutables."""
|
||||
rtw_request = self._read_encoded(request, _SCHEMAS["mutable_read_test_write"])
|
||||
rtw_request = self._read_encoded(
|
||||
request, _SCHEMAS["mutable_read_test_write"], max_size=2**48
|
||||
)
|
||||
secrets = (
|
||||
authorization[Secrets.WRITE_ENABLER],
|
||||
authorization[Secrets.LEASE_RENEW],
|
||||
|
@ -1201,18 +1201,42 @@ class MutableHTTPAPIsTests(SyncTestCase):
|
||||
)
|
||||
return storage_index, write_secret, lease_secret
|
||||
|
||||
def test_write_can_be_read(self):
|
||||
def test_write_can_be_read_small_data(self):
|
||||
"""
|
||||
Small written data can be read using ``read_share_chunk``.
|
||||
"""
|
||||
self.write_can_be_read(b"abcdef")
|
||||
|
||||
def test_write_can_be_read_large_data(self):
|
||||
"""
|
||||
Large written data (50MB) can be read using ``read_share_chunk``.
|
||||
"""
|
||||
self.write_can_be_read(b"abcdefghij" * 5 * 1024 * 1024)
|
||||
|
||||
def write_can_be_read(self, data):
|
||||
"""
|
||||
Written data can be read using ``read_share_chunk``.
|
||||
"""
|
||||
storage_index, _, _ = self.create_upload()
|
||||
data0 = self.http.result_of_with_flush(
|
||||
self.mut_client.read_share_chunk(storage_index, 0, 1, 7)
|
||||
lease_secret = urandom(32)
|
||||
storage_index = urandom(16)
|
||||
self.http.result_of_with_flush(
|
||||
self.mut_client.read_test_write_chunks(
|
||||
storage_index,
|
||||
urandom(32),
|
||||
lease_secret,
|
||||
lease_secret,
|
||||
{
|
||||
0: TestWriteVectors(
|
||||
write_vectors=[WriteVector(offset=0, data=data)]
|
||||
),
|
||||
},
|
||||
[],
|
||||
)
|
||||
)
|
||||
data1 = self.http.result_of_with_flush(
|
||||
self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
|
||||
read_data = self.http.result_of_with_flush(
|
||||
self.mut_client.read_share_chunk(storage_index, 0, 0, len(data))
|
||||
)
|
||||
self.assertEqual((data0, data1), (b"bcdef-0", b"abcdef-1"))
|
||||
self.assertEqual(read_data, data)
|
||||
|
||||
def test_read_before_write(self):
|
||||
"""In combo read/test/write operation, reads happen before writes."""
|
||||
@ -1291,15 +1315,6 @@ class MutableHTTPAPIsTests(SyncTestCase):
|
||||
b"aXYZef-0",
|
||||
)
|
||||
|
||||
def test_too_large_write(self):
|
||||
"""
|
||||
Writing too large of a chunk results in a REQUEST ENTITY TOO LARGE http
|
||||
error.
|
||||
"""
|
||||
with self.assertRaises(ClientException) as e:
|
||||
self.create_upload(b"0123456789" * 1024 * 1024)
|
||||
self.assertEqual(e.exception.code, http.REQUEST_ENTITY_TOO_LARGE)
|
||||
|
||||
def test_list_shares(self):
|
||||
"""``list_shares()`` returns the shares for a given storage index."""
|
||||
storage_index, _, _ = self.create_upload()
|
||||
|
@ -34,7 +34,7 @@ from allmydata.util.encodingutil import quote_output, unicode_to_argv
|
||||
from allmydata.util.fileutil import abspath_expanduser_unicode
|
||||
from allmydata.util.consumer import MemoryConsumer, download_to_data
|
||||
from allmydata.interfaces import IDirectoryNode, IFileNode, \
|
||||
NoSuchChildError, NoSharesError
|
||||
NoSuchChildError, NoSharesError, SDMF_VERSION, MDMF_VERSION
|
||||
from allmydata.monitor import Monitor
|
||||
from allmydata.mutable.common import NotWriteableError
|
||||
from allmydata.mutable import layout as mutable_layout
|
||||
@ -477,9 +477,10 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
|
||||
|
||||
def _corrupt_mutable_share(self, filename, which):
|
||||
msf = MutableShareFile(filename)
|
||||
datav = msf.readv([ (0, 1000000) ])
|
||||
# Read more than share length:
|
||||
datav = msf.readv([ (0, 10_000_000) ])
|
||||
final_share = datav[0]
|
||||
assert len(final_share) < 1000000 # ought to be truncated
|
||||
assert len(final_share) < 10_000_000 # ought to be truncated
|
||||
pieces = mutable_layout.unpack_share(final_share)
|
||||
(seqnum, root_hash, IV, k, N, segsize, datalen,
|
||||
verification_key, signature, share_hash_chain, block_hash_tree,
|
||||
@ -519,12 +520,18 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
|
||||
msf.writev( [(0, final_share)], None)
|
||||
|
||||
|
||||
def test_mutable(self):
|
||||
def test_mutable_sdmf(self):
|
||||
return self._test_mutable(SDMF_VERSION)
|
||||
|
||||
def test_mutable_mdmf(self):
|
||||
return self._test_mutable(MDMF_VERSION)
|
||||
|
||||
def _test_mutable(self, mutable_version):
|
||||
DATA = b"initial contents go here." # 25 bytes % 3 != 0
|
||||
DATA_uploadable = MutableData(DATA)
|
||||
NEWDATA = b"new contents yay"
|
||||
NEWDATA_uploadable = MutableData(NEWDATA)
|
||||
NEWERDATA = b"this is getting old"
|
||||
NEWERDATA = b"this is getting old" * 1_000_000
|
||||
NEWERDATA_uploadable = MutableData(NEWERDATA)
|
||||
|
||||
d = self.set_up_nodes()
|
||||
@ -532,7 +539,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
|
||||
def _create_mutable(res):
|
||||
c = self.clients[0]
|
||||
log.msg("starting create_mutable_file")
|
||||
d1 = c.create_mutable_file(DATA_uploadable)
|
||||
d1 = c.create_mutable_file(DATA_uploadable, mutable_version)
|
||||
def _done(res):
|
||||
log.msg("DONE: %s" % (res,))
|
||||
self._mutable_node_1 = res
|
||||
@ -554,27 +561,33 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
|
||||
filename)
|
||||
self.failUnlessEqual(rc, 0)
|
||||
try:
|
||||
share_type = 'SDMF' if mutable_version == SDMF_VERSION else 'MDMF'
|
||||
self.failUnless("Mutable slot found:\n" in output)
|
||||
self.failUnless("share_type: SDMF\n" in output)
|
||||
self.assertIn(f"share_type: {share_type}\n", output)
|
||||
peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
|
||||
self.failUnless(" WE for nodeid: %s\n" % peerid in output)
|
||||
self.failUnless(" num_extra_leases: 0\n" in output)
|
||||
self.failUnless(" secrets are for nodeid: %s\n" % peerid
|
||||
in output)
|
||||
self.failUnless(" SDMF contents:\n" in output)
|
||||
self.failUnless(f" {share_type} contents:\n" in output)
|
||||
self.failUnless(" seqnum: 1\n" in output)
|
||||
self.failUnless(" required_shares: 3\n" in output)
|
||||
self.failUnless(" total_shares: 10\n" in output)
|
||||
self.failUnless(" segsize: 27\n" in output, (output, filename))
|
||||
if mutable_version == SDMF_VERSION:
|
||||
self.failUnless(" segsize: 27\n" in output, (output, filename))
|
||||
self.failUnless(" datalen: 25\n" in output)
|
||||
# the exact share_hash_chain nodes depends upon the sharenum,
|
||||
# and is more of a hassle to compute than I want to deal with
|
||||
# now
|
||||
self.failUnless(" share_hash_chain: " in output)
|
||||
self.failUnless(" block_hash_tree: 1 nodes\n" in output)
|
||||
expected = (" verify-cap: URI:SSK-Verifier:%s:" %
|
||||
str(base32.b2a(storage_index), "ascii"))
|
||||
self.failUnless(expected in output)
|
||||
if mutable_version == SDMF_VERSION:
|
||||
expected = (" verify-cap: URI:SSK-Verifier:%s:" %
|
||||
str(base32.b2a(storage_index), "ascii"))
|
||||
else:
|
||||
expected = (" verify-cap: URI:MDMF-Verifier:%s" %
|
||||
str(base32.b2a(storage_index), "ascii"))
|
||||
self.assertIn(expected, output)
|
||||
except unittest.FailTest:
|
||||
print()
|
||||
print("dump-share output was:")
|
||||
@ -694,7 +707,10 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
|
||||
# when we retrieve this, we should get three signature
|
||||
# failures (where we've mangled seqnum, R, and segsize). The
|
||||
# pubkey mangling
|
||||
d.addCallback(_corrupt_shares)
|
||||
|
||||
if mutable_version == SDMF_VERSION:
|
||||
# TODO Corrupting shares in test_systm doesn't work for MDMF right now
|
||||
d.addCallback(_corrupt_shares)
|
||||
|
||||
d.addCallback(lambda res: self._newnode3.download_best_version())
|
||||
d.addCallback(_check_download_5)
|
||||
@ -702,7 +718,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
|
||||
def _check_empty_file(res):
|
||||
# make sure we can create empty files, this usually screws up the
|
||||
# segsize math
|
||||
d1 = self.clients[2].create_mutable_file(MutableData(b""))
|
||||
d1 = self.clients[2].create_mutable_file(MutableData(b""), mutable_version)
|
||||
d1.addCallback(lambda newnode: newnode.download_best_version())
|
||||
d1.addCallback(lambda res: self.failUnlessEqual(b"", res))
|
||||
return d1
|
||||
|
Loading…
x
Reference in New Issue
Block a user