Merge remote-tracking branch 'origin/master' into 3902-listen-storage-http

This commit is contained in:
Itamar Turner-Trauring 2022-07-20 14:48:26 -04:00
commit 5e0c32708b
4 changed files with 470 additions and 149 deletions

0
newsfragments/3872.minor Normal file
View File

View File

@ -4,8 +4,9 @@ HTTP client that talks to the HTTP storage server.
from __future__ import annotations
from typing import Union, Optional, Sequence, Mapping
from typing import Union, Optional, Sequence, Mapping, BinaryIO
from base64 import b64encode
from io import BytesIO
from attrs import define, asdict, frozen, field
@ -17,7 +18,7 @@ from werkzeug.datastructures import Range, ContentRange
from twisted.web.http_headers import Headers
from twisted.web import http
from twisted.web.iweb import IPolicyForHTTPS
from twisted.internet.defer import inlineCallbacks, returnValue, fail, Deferred
from twisted.internet.defer import inlineCallbacks, returnValue, fail, Deferred, succeed
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
from twisted.internet.ssl import CertificateOptions
from twisted.web.client import Agent, HTTPConnectionPool
@ -114,19 +115,58 @@ _SCHEMAS = {
}
@define
class _LengthLimitedCollector:
"""
Collect data using ``treq.collect()``, with limited length.
"""
remaining_length: int
f: BytesIO = field(factory=BytesIO)
def __call__(self, data: bytes):
self.remaining_length -= len(data)
if self.remaining_length < 0:
raise ValueError("Response length was too long")
self.f.write(data)
def limited_content(response, max_length: int = 30 * 1024 * 1024) -> Deferred[BinaryIO]:
"""
Like ``treq.content()``, but limit data read from the response to a set
length. If the response is longer than the max allowed length, the result
fails with a ``ValueError``.
A potentially useful future improvement would be using a temporary file to
store the content; since filesystem buffering means that would use memory
for small responses and disk for large responses.
"""
collector = _LengthLimitedCollector(max_length)
# Make really sure everything gets called in Deferred context, treq might
# call collector directly...
d = succeed(None)
d.addCallback(lambda _: treq.collect(response, collector))
def done(_):
collector.f.seek(0)
return collector.f
d.addCallback(done)
return d
def _decode_cbor(response, schema: Schema):
"""Given HTTP response, return decoded CBOR body."""
def got_content(data):
def got_content(f: BinaryIO):
data = f.read()
schema.validate_cbor(data)
return loads(data)
if response.code > 199 and response.code < 300:
content_type = get_content_type(response.headers)
if content_type == CBOR_MIME_TYPE:
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
return treq.content(response).addCallback(got_content)
return limited_content(response).addCallback(got_content)
else:
raise ClientException(-1, "Server didn't send CBOR")
else:
@ -295,7 +335,7 @@ class StorageClient(object):
write_enabler_secret=None,
headers=None,
message_to_serialize=None,
**kwargs
**kwargs,
):
"""
Like ``treq.request()``, but with optional secrets that get translated

View File

@ -3,23 +3,28 @@ HTTP server for storage.
"""
from __future__ import annotations
from typing import Dict, List, Set, Tuple, Any, Callable
from typing import Dict, List, Set, Tuple, Any, Callable, Union
from functools import wraps
from base64 import b64decode
import binascii
from tempfile import TemporaryFile
from zope.interface import implementer
from klein import Klein
from twisted.web import http
from twisted.internet.interfaces import IListeningPort, IStreamServerEndpoint
from twisted.internet.interfaces import (
IListeningPort,
IStreamServerEndpoint,
IPullProducer,
)
from twisted.internet.defer import Deferred
from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate
from twisted.web.server import Site
from twisted.web.server import Site, Request
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.filepath import FilePath
import attr
from attrs import define, field, Factory
from werkzeug.http import (
parse_range_header,
parse_content_range_header,
@ -32,7 +37,7 @@ from cryptography.x509 import load_pem_x509_certificate
# TODO Make sure to use pure Python versions?
from cbor2 import dumps, loads
from cbor2 import dump, loads
from pycddl import Schema, ValidationError as CDDLValidationError
from .server import StorageServer
from .http_common import (
@ -137,31 +142,31 @@ def _authorized_route(app, required_secrets, *route_args, **route_kwargs):
return decorator
@attr.s
@define
class StorageIndexUploads(object):
"""
In-progress upload to storage index.
"""
# Map share number to BucketWriter
shares = attr.ib(factory=dict) # type: Dict[int,BucketWriter]
shares: dict[int, BucketWriter] = Factory(dict)
# Map share number to the upload secret (different shares might have
# different upload secrets).
upload_secrets = attr.ib(factory=dict) # type: Dict[int,bytes]
upload_secrets: dict[int, bytes] = Factory(dict)
@attr.s
@define
class UploadsInProgress(object):
"""
Keep track of uploads for storage indexes.
"""
# Map storage index to corresponding uploads-in-progress
_uploads = attr.ib(type=Dict[bytes, StorageIndexUploads], factory=dict)
_uploads: dict[bytes, StorageIndexUploads] = Factory(dict)
# Map BucketWriter to (storage index, share number)
_bucketwriters = attr.ib(type=Dict[BucketWriter, Tuple[bytes, int]], factory=dict)
_bucketwriters: dict[BucketWriter, Tuple[bytes, int]] = Factory(dict)
def add_write_bucket(
self,
@ -245,11 +250,15 @@ class _HTTPError(Exception):
# Tags are of the form #6.nnn, where the number is documented at
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258
# indicates a set.
#
# Somewhat arbitrary limits are set to reduce e.g. number of shares, number of
# vectors, etc.. These may need to be iterated on in future revisions of the
# code.
_SCHEMAS = {
"allocate_buckets": Schema(
"""
request = {
share-numbers: #6.258([* uint])
share-numbers: #6.258([*256 uint])
allocated-size: uint
}
"""
@ -265,13 +274,15 @@ _SCHEMAS = {
"""
request = {
"test-write-vectors": {
* share_number: {
"test": [* {"offset": uint, "size": uint, "specimen": bstr}]
"write": [* {"offset": uint, "data": bstr}]
; TODO Add length limit here, after
; https://github.com/anweiss/cddl/issues/128 is fixed
* share_number => {
"test": [*30 {"offset": uint, "size": uint, "specimen": bstr}]
"write": [*30 {"offset": uint, "data": bstr}]
"new-length": uint / null
}
}
"read-vector": [* {"offset": uint, "size": uint}]
"read-vector": [*30 {"offset": uint, "size": uint}]
}
share_number = uint
"""
@ -279,7 +290,116 @@ _SCHEMAS = {
}
def read_range(request, read_data: Callable[[int, int], bytes]) -> None:
# Callable that takes offset and length, returns the data at that range.
ReadData = Callable[[int, int], bytes]
@implementer(IPullProducer)
@define
class _ReadAllProducer:
"""
Producer that calls a read function repeatedly to read all the data, and
writes to a request.
"""
request: Request
read_data: ReadData
result: Deferred = Factory(Deferred)
start: int = field(default=0)
@classmethod
def produce_to(cls, request: Request, read_data: ReadData) -> Deferred:
"""
Create and register the producer, returning ``Deferred`` that should be
returned from a HTTP server endpoint.
"""
producer = cls(request, read_data)
request.registerProducer(producer, False)
return producer.result
def resumeProducing(self):
data = self.read_data(self.start, 65536)
if not data:
self.request.unregisterProducer()
d = self.result
del self.result
d.callback(b"")
return
self.request.write(data)
self.start += len(data)
def pauseProducing(self):
pass
def stopProducing(self):
pass
@implementer(IPullProducer)
@define
class _ReadRangeProducer:
"""
Producer that calls a read function to read a range of data, and writes to
a request.
"""
request: Request
read_data: ReadData
result: Deferred
start: int
remaining: int
first_read: bool = field(default=True)
def resumeProducing(self):
to_read = min(self.remaining, 65536)
data = self.read_data(self.start, to_read)
assert len(data) <= to_read
if self.first_read and self.remaining > 0:
# For empty bodies the content-range header makes no sense since
# the end of the range is inclusive.
#
# TODO this is wrong for requests that go beyond the end of the
# share. This will be fixed in
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3907 by making that
# edge case not happen.
self.request.setHeader(
"content-range",
ContentRange(
"bytes", self.start, self.start + self.remaining
).to_header(),
)
self.first_read = False
if not data and self.remaining > 0:
# TODO Either data is missing locally (storage issue?) or a bug,
# abort response with error? Until
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3907 is implemented
# we continue anyway.
pass
self.start += len(data)
self.remaining -= len(data)
assert self.remaining >= 0
self.request.write(data)
# TODO remove the second clause in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3907
if self.remaining == 0 or not data:
self.request.unregisterProducer()
d = self.result
del self.result
d.callback(b"")
return
def pauseProducing(self):
pass
def stopProducing(self):
pass
def read_range(request: Request, read_data: ReadData) -> Union[Deferred, bytes]:
"""
Read an optional ``Range`` header, reads data appropriately via the given
callable, writes the data to the request.
@ -294,18 +414,17 @@ def read_range(request, read_data: Callable[[int, int], bytes]) -> None:
The resulting data is written to the request.
"""
def read_data_with_error_handling(offset: int, length: int) -> bytes:
try:
return read_data(offset, length)
except _HTTPError as e:
request.setResponseCode(e.code)
# Empty read means we're done.
return b""
if request.getHeader("range") is None:
# Return the whole thing.
start = 0
while True:
# TODO should probably yield to event loop occasionally...
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = read_data(start, start + 65536)
if not data:
request.finish()
return
request.write(data)
start += len(data)
return _ReadAllProducer.produce_to(request, read_data_with_error_handling)
range_header = parse_range_header(request.getHeader("range"))
if (
@ -316,22 +435,18 @@ def read_range(request, read_data: Callable[[int, int], bytes]) -> None:
):
raise _HTTPError(http.REQUESTED_RANGE_NOT_SATISFIABLE)
# TODO if end is beyond the end of the share, either return error, or maybe
# just return what we can...
offset, end = range_header.ranges[0]
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = read_data(offset, end - offset)
request.setResponseCode(http.PARTIAL_CONTENT)
if len(data):
# For empty bodies the content-range header makes no sense since
# the end of the range is inclusive.
request.setHeader(
"content-range",
ContentRange("bytes", offset, offset + len(data)).to_header(),
)
request.write(data)
request.finish()
d = Deferred()
request.registerProducer(
_ReadRangeProducer(
request, read_data_with_error_handling, d, offset, end - offset
),
False,
)
return d
class HTTPServer(object):
@ -385,9 +500,14 @@ class HTTPServer(object):
accept = parse_accept_header(accept_headers[0])
if accept.best == CBOR_MIME_TYPE:
request.setHeader("Content-Type", CBOR_MIME_TYPE)
# TODO if data is big, maybe want to use a temporary file eventually...
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
return dumps(data)
f = TemporaryFile()
dump(data, f)
def read_data(offset: int, length: int) -> bytes:
f.seek(offset)
return f.read(length)
return _ReadAllProducer.produce_to(request, read_data)
else:
# TODO Might want to optionally send JSON someday:
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
@ -396,12 +516,18 @@ class HTTPServer(object):
def _read_encoded(self, request, schema: Schema) -> Any:
"""
Read encoded request body data, decoding it with CBOR by default.
Somewhat arbitrarily, limit body size to 1MB; this may be too low, we
may want to customize per query type, but this is the starting point
for now.
"""
content_type = get_content_type(request.requestHeaders)
if content_type == CBOR_MIME_TYPE:
# TODO limit memory usage, client could send arbitrarily large data...
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
message = request.content.read()
# Read 1 byte more than 1MB. We expect length to be 1MB or
# less; if it's more assume it's not a legitimate message.
message = request.content.read(1024 * 1024 + 1)
if len(message) > 1024 * 1024:
raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
schema.validate_cbor(message)
result = loads(message)
return result
@ -450,10 +576,7 @@ class HTTPServer(object):
return self._send_encoded(
request,
{
"already-have": set(already_got),
"allocated": set(sharenum_to_bucket),
},
{"already-have": set(already_got), "allocated": set(sharenum_to_bucket)},
)
@_authorized_route(
@ -500,20 +623,24 @@ class HTTPServer(object):
request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
return b""
offset = content_range.start
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = request.content.read(content_range.stop - content_range.start + 1)
bucket = self._uploads.get_write_bucket(
storage_index, share_number, authorization[Secrets.UPLOAD]
)
offset = content_range.start
remaining = content_range.stop - content_range.start
finished = False
try:
finished = bucket.write(offset, data)
except ConflictingWriteError:
request.setResponseCode(http.CONFLICT)
return b""
while remaining > 0:
data = request.content.read(min(remaining, 65536))
assert data, "uploaded data length doesn't match range"
try:
finished = bucket.write(offset, data)
except ConflictingWriteError:
request.setResponseCode(http.CONFLICT)
return b""
remaining -= len(data)
offset += len(data)
if finished:
bucket.close()
@ -640,6 +767,7 @@ class HTTPServer(object):
)
def read_mutable_chunk(self, request, authorization, storage_index, share_number):
"""Read a chunk from a mutable."""
def read_data(offset, length):
try:
return self._storage_server.slot_readv(
@ -651,10 +779,7 @@ class HTTPServer(object):
return read_range(request, read_data)
@_authorized_route(
_app,
set(),
"/v1/mutable/<storage_index:storage_index>/shares",
methods=["GET"],
_app, set(), "/v1/mutable/<storage_index:storage_index>/shares", methods=["GET"]
)
def enumerate_mutable_shares(self, request, authorization, storage_index):
"""List mutable shares for a storage index."""
@ -684,7 +809,7 @@ class HTTPServer(object):
@implementer(IStreamServerEndpoint)
@attr.s
@define
class _TLSEndpointWrapper(object):
"""
Wrap an existing endpoint with the server-side storage TLS policy. This is
@ -692,8 +817,8 @@ class _TLSEndpointWrapper(object):
example there's Tor and i2p.
"""
endpoint = attr.ib(type=IStreamServerEndpoint)
context_factory = attr.ib(type=CertificateOptions)
endpoint: IStreamServerEndpoint
context_factory: CertificateOptions
@classmethod
def from_paths(

View File

@ -1,5 +1,21 @@
"""
Tests for HTTP storage client + server.
The tests here are synchronous and don't involve running a real reactor. This
works, but has some caveats when it comes to testing HTTP endpoints:
* Some HTTP endpoints are synchronous, some are not.
* For synchronous endpoints, the result is immediately available on the
``Deferred`` coming out of ``StubTreq``.
* For asynchronous endpoints, you need to use ``StubTreq.flush()`` and
iterate the fake in-memory clock/reactor to advance time .
So for HTTP endpoints, you should use ``HttpTestFixture.result_of_with_flush()``
which handles both, and patches and moves forward the global Twisted
``Cooperator`` since that is used to drive pull producers. This is,
sadly, an internal implementation detail of Twisted being leaked to tests...
For definitely synchronous calls, you can just use ``result_of()``.
"""
from base64 import b64encode
@ -9,12 +25,12 @@ from typing import Union, Callable, Tuple, Iterable
from cbor2 import dumps
from pycddl import ValidationError as CDDLValidationError
from hypothesis import assume, given, strategies as st
from fixtures import Fixture, TempDir
from fixtures import Fixture, TempDir, MonkeyPatch
from treq.testing import StubTreq
from klein import Klein
from hyperlink import DecodedURL
from collections_extended import RangeMap
from twisted.internet.task import Clock
from twisted.internet.task import Clock, Cooperator
from twisted.web import http
from twisted.web.http_headers import Headers
from werkzeug import routing
@ -47,6 +63,7 @@ from ..storage.http_client import (
ReadVector,
ReadTestWriteResult,
TestVector,
limited_content,
)
@ -218,6 +235,13 @@ class RouteConverterTests(SyncTestCase):
SWISSNUM_FOR_TEST = b"abcd"
def gen_bytes(length: int) -> bytes:
"""Generate bytes to the given length."""
result = (b"0123456789abcdef" * ((length // 16) + 1))[:length]
assert len(result) == length
return result
class TestApp(object):
"""HTTP API for testing purposes."""
@ -237,6 +261,11 @@ class TestApp(object):
request.setHeader("content-type", CBOR_MIME_TYPE)
return dumps({"garbage": 123})
@_authorized_route(_app, set(), "/bytes/<int:length>", methods=["GET"])
def generate_bytes(self, request, authorization, length):
"""Return bytes to the given length using ``gen_bytes()``."""
return gen_bytes(length)
def result_of(d):
"""
@ -302,6 +331,42 @@ class CustomHTTPServerTests(SyncTestCase):
with self.assertRaises(CDDLValidationError):
result_of(client.get_version())
@given(length=st.integers(min_value=1, max_value=1_000_000))
def test_limited_content_fits(self, length):
"""
``http_client.limited_content()`` returns the body if it is less than
the max length.
"""
for at_least_length in (length, length + 1, length + 1000, length + 100_000):
response = result_of(
self.client.request(
"GET",
f"http://127.0.0.1/bytes/{length}",
)
)
self.assertEqual(
result_of(limited_content(response, at_least_length)).read(),
gen_bytes(length),
)
@given(length=st.integers(min_value=10, max_value=1_000_000))
def test_limited_content_does_not_fit(self, length):
"""
If the body is longer than than max length,
``http_client.limited_content()`` fails with a ``ValueError``.
"""
for too_short in (length - 1, 5):
response = result_of(
self.client.request(
"GET",
f"http://127.0.0.1/bytes/{length}",
)
)
with self.assertRaises(ValueError):
result_of(limited_content(response, too_short))
class HttpTestFixture(Fixture):
"""
@ -312,14 +377,54 @@ class HttpTestFixture(Fixture):
def _setUp(self):
self.clock = Clock()
self.tempdir = self.useFixture(TempDir())
# The global Cooperator used by Twisted (a) used by pull producers in
# twisted.web, (b) is driven by a real reactor. We want to push time
# forward ourselves since we rely on pull producers in the HTTP storage
# server.
self.mock = self.useFixture(
MonkeyPatch(
"twisted.internet.task._theCooperator",
Cooperator(scheduler=lambda c: self.clock.callLater(0.000001, c)),
)
)
self.storage_server = StorageServer(
self.tempdir.path, b"\x00" * 20, clock=self.clock
)
self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST)
self.treq = StubTreq(self.http_server.get_resource())
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
SWISSNUM_FOR_TEST,
treq=StubTreq(self.http_server.get_resource()),
treq=self.treq,
)
def result_of_with_flush(self, d):
"""
Like ``result_of``, but supports fake reactor and ``treq`` testing
infrastructure necessary to support asynchronous HTTP server endpoints.
"""
result = []
error = []
d.addCallbacks(result.append, error.append)
# Check for synchronous HTTP endpoint handler:
if result:
return result[0]
if error:
error[0].raiseException()
# OK, no result yet, probably async HTTP endpoint handler, so advance
# time, flush treq, and try again:
for i in range(100):
self.clock.advance(0.001)
self.treq.flush()
if result:
return result[0]
if error:
error[0].raiseException()
raise RuntimeError(
"We expected given Deferred to have result already, but it wasn't. "
+ "This is probably a test design issue."
)
@ -378,7 +483,7 @@ class GenericHTTPAPITests(SyncTestCase):
)
)
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
result_of(client.get_version())
self.http.result_of_with_flush(client.get_version())
def test_unsupported_mime_type(self):
"""
@ -389,7 +494,7 @@ class GenericHTTPAPITests(SyncTestCase):
StorageClientWithHeadersOverride(self.http.client, {"accept": "image/gif"})
)
with assert_fails_with_http_code(self, http.NOT_ACCEPTABLE):
result_of(client.get_version())
self.http.result_of_with_flush(client.get_version())
def test_version(self):
"""
@ -399,7 +504,7 @@ class GenericHTTPAPITests(SyncTestCase):
might change across calls.
"""
client = StorageClientGeneral(self.http.client)
version = result_of(client.get_version())
version = self.http.result_of_with_flush(client.get_version())
version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
b"available-space"
)
@ -433,7 +538,7 @@ class GenericHTTPAPITests(SyncTestCase):
)
message = {"bad-message": "missing expected keys"}
response = result_of(
response = self.http.result_of_with_flush(
self.http.client.request(
"POST",
url,
@ -466,7 +571,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
upload_secret = urandom(32)
lease_secret = urandom(32)
storage_index = urandom(16)
created = result_of(
created = self.http.result_of_with_flush(
self.imm_client.create(
storage_index,
share_numbers,
@ -510,42 +615,42 @@ class ImmutableHTTPAPITests(SyncTestCase):
expected_data[offset : offset + length],
)
upload_progress = result_of(write(10, 10))
upload_progress = self.http.result_of_with_flush(write(10, 10))
self.assertEqual(
upload_progress, UploadProgress(finished=False, required=remaining)
)
upload_progress = result_of(write(30, 10))
upload_progress = self.http.result_of_with_flush(write(30, 10))
self.assertEqual(
upload_progress, UploadProgress(finished=False, required=remaining)
)
upload_progress = result_of(write(50, 10))
upload_progress = self.http.result_of_with_flush(write(50, 10))
self.assertEqual(
upload_progress, UploadProgress(finished=False, required=remaining)
)
# Then, an overlapping write with matching data (15-35):
upload_progress = result_of(write(15, 20))
upload_progress = self.http.result_of_with_flush(write(15, 20))
self.assertEqual(
upload_progress, UploadProgress(finished=False, required=remaining)
)
# Now fill in the holes:
upload_progress = result_of(write(0, 10))
upload_progress = self.http.result_of_with_flush(write(0, 10))
self.assertEqual(
upload_progress, UploadProgress(finished=False, required=remaining)
)
upload_progress = result_of(write(40, 10))
upload_progress = self.http.result_of_with_flush(write(40, 10))
self.assertEqual(
upload_progress, UploadProgress(finished=False, required=remaining)
)
upload_progress = result_of(write(60, 40))
upload_progress = self.http.result_of_with_flush(write(60, 40))
self.assertEqual(
upload_progress, UploadProgress(finished=True, required=RangeMap())
)
# We can now read:
for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]:
downloaded = result_of(
downloaded = self.http.result_of_with_flush(
self.imm_client.read_share_chunk(storage_index, 1, offset, length)
)
self.assertEqual(downloaded, expected_data[offset : offset + length])
@ -557,7 +662,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
"""
(upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
1,
@ -579,7 +684,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
)
# Write half of share 1
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
1,
@ -593,7 +698,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
# existing shares, this call shouldn't overwrite the existing
# work-in-progress.
upload_secret2 = b"x" * 2
created2 = result_of(
created2 = self.http.result_of_with_flush(
self.imm_client.create(
storage_index,
{1, 4, 6},
@ -607,7 +712,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
# Write second half of share 1
self.assertTrue(
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
1,
@ -620,14 +725,14 @@ class ImmutableHTTPAPITests(SyncTestCase):
# The upload of share 1 succeeded, demonstrating that second create()
# call didn't overwrite work-in-progress.
downloaded = result_of(
downloaded = self.http.result_of_with_flush(
self.imm_client.read_share_chunk(storage_index, 1, 0, 100)
)
self.assertEqual(downloaded, b"a" * 50 + b"b" * 50)
# We can successfully upload the shares created with the second upload secret.
self.assertTrue(
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
4,
@ -645,11 +750,14 @@ class ImmutableHTTPAPITests(SyncTestCase):
(upload_secret, _, storage_index, created) = self.create_upload({1, 2, 3}, 10)
# Initially there are no shares:
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set())
self.assertEqual(
self.http.result_of_with_flush(self.imm_client.list_shares(storage_index)),
set(),
)
# Upload shares 1 and 3:
for share_number in [1, 3]:
progress = result_of(
progress = self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
share_number,
@ -661,7 +769,10 @@ class ImmutableHTTPAPITests(SyncTestCase):
self.assertTrue(progress.finished)
# Now shares 1 and 3 exist:
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), {1, 3})
self.assertEqual(
self.http.result_of_with_flush(self.imm_client.list_shares(storage_index)),
{1, 3},
)
def test_upload_bad_content_range(self):
"""
@ -679,7 +790,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
with assert_fails_with_http_code(
self, http.REQUESTED_RANGE_NOT_SATISFIABLE
):
result_of(
self.http.result_of_with_flush(
client.write_share_chunk(
storage_index,
1,
@ -699,7 +810,10 @@ class ImmutableHTTPAPITests(SyncTestCase):
Listing unknown storage index's shares results in empty list of shares.
"""
storage_index = bytes(range(16))
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set())
self.assertEqual(
self.http.result_of_with_flush(self.imm_client.list_shares(storage_index)),
set(),
)
def test_upload_non_existent_storage_index(self):
"""
@ -710,7 +824,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
def unknown_check(storage_index, share_number):
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
share_number,
@ -731,7 +845,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
stored separately and can be downloaded separately.
"""
(upload_secret, _, storage_index, _) = self.create_upload({1, 2}, 10)
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
1,
@ -740,7 +854,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
b"1" * 10,
)
)
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
2,
@ -750,11 +864,15 @@ class ImmutableHTTPAPITests(SyncTestCase):
)
)
self.assertEqual(
result_of(self.imm_client.read_share_chunk(storage_index, 1, 0, 10)),
self.http.result_of_with_flush(
self.imm_client.read_share_chunk(storage_index, 1, 0, 10)
),
b"1" * 10,
)
self.assertEqual(
result_of(self.imm_client.read_share_chunk(storage_index, 2, 0, 10)),
self.http.result_of_with_flush(
self.imm_client.read_share_chunk(storage_index, 2, 0, 10)
),
b"2" * 10,
)
@ -766,7 +884,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
(upload_secret, _, storage_index, created) = self.create_upload({1}, 100)
# Write:
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
1,
@ -778,7 +896,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
# Conflicting write:
with assert_fails_with_http_code(self, http.CONFLICT):
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
1,
@ -804,7 +922,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
"""
def abort(storage_index, share_number, upload_secret):
return result_of(
return self.http.result_of_with_flush(
self.imm_client.abort_upload(storage_index, share_number, upload_secret)
)
@ -817,7 +935,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
"""
# Start an upload:
(upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
1,
@ -836,7 +954,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
# complaint:
upload_secret = urandom(32)
lease_secret = urandom(32)
created = result_of(
created = self.http.result_of_with_flush(
self.imm_client.create(
storage_index,
{1},
@ -849,7 +967,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
self.assertEqual(created.allocated, {1})
# And write to it, too:
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
1,
@ -868,7 +986,9 @@ class ImmutableHTTPAPITests(SyncTestCase):
for si, num in [(storage_index, 3), (b"x" * 16, 1)]:
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(self.imm_client.abort_upload(si, num, upload_secret))
self.http.result_of_with_flush(
self.imm_client.abort_upload(si, num, upload_secret)
)
def test_unauthorized_abort(self):
"""
@ -879,12 +999,12 @@ class ImmutableHTTPAPITests(SyncTestCase):
# Failed to abort becaues wrong upload secret:
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
result_of(
self.http.result_of_with_flush(
self.imm_client.abort_upload(storage_index, 1, upload_secret + b"X")
)
# We can still write to it:
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
1,
@ -901,7 +1021,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
"""
uploaded_data = b"123"
(upload_secret, _, storage_index, _) = self.create_upload({0}, 3)
result_of(
self.http.result_of_with_flush(
self.imm_client.write_share_chunk(
storage_index,
0,
@ -913,12 +1033,14 @@ class ImmutableHTTPAPITests(SyncTestCase):
# Can't abort, we finished upload:
with assert_fails_with_http_code(self, http.NOT_ALLOWED):
result_of(self.imm_client.abort_upload(storage_index, 0, upload_secret))
self.http.result_of_with_flush(
self.imm_client.abort_upload(storage_index, 0, upload_secret)
)
# Abort didn't prevent reading:
self.assertEqual(
uploaded_data,
result_of(
self.http.result_of_with_flush(
self.imm_client.read_share_chunk(
storage_index,
0,
@ -935,7 +1057,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
storage_index = urandom(16)
secret = b"A" * 32
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.http.result_of_with_flush(
self.general_client.add_or_renew_lease(storage_index, secret, secret)
)
@ -956,7 +1078,7 @@ class MutableHTTPAPIsTests(SyncTestCase):
write_secret = urandom(32)
lease_secret = urandom(32)
storage_index = urandom(16)
result_of(
self.http.result_of_with_flush(
self.mut_client.read_test_write_chunks(
storage_index,
write_secret,
@ -983,14 +1105,18 @@ class MutableHTTPAPIsTests(SyncTestCase):
Written data can be read using ``read_share_chunk``.
"""
storage_index, _, _ = self.create_upload()
data0 = result_of(self.mut_client.read_share_chunk(storage_index, 0, 1, 7))
data1 = result_of(self.mut_client.read_share_chunk(storage_index, 1, 0, 8))
data0 = self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 0, 1, 7)
)
data1 = self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
)
self.assertEqual((data0, data1), (b"bcdef-0", b"abcdef-1"))
def test_read_before_write(self):
"""In combo read/test/write operation, reads happen before writes."""
storage_index, write_secret, lease_secret = self.create_upload()
result = result_of(
result = self.http.result_of_with_flush(
self.mut_client.read_test_write_chunks(
storage_index,
write_secret,
@ -1012,14 +1138,18 @@ class MutableHTTPAPIsTests(SyncTestCase):
),
)
# But the write did happen:
data0 = result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8))
data1 = result_of(self.mut_client.read_share_chunk(storage_index, 1, 0, 8))
data0 = self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
)
data1 = self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
)
self.assertEqual((data0, data1), (b"aXYZef-0", b"abcdef-1"))
def test_conditional_write(self):
"""Uploads only happen if the test passes."""
storage_index, write_secret, lease_secret = self.create_upload()
result_failed = result_of(
result_failed = self.http.result_of_with_flush(
self.mut_client.read_test_write_chunks(
storage_index,
write_secret,
@ -1037,7 +1167,7 @@ class MutableHTTPAPIsTests(SyncTestCase):
self.assertFalse(result_failed.success)
# This time the test matches:
result = result_of(
result = self.http.result_of_with_flush(
self.mut_client.read_test_write_chunks(
storage_index,
write_secret,
@ -1054,26 +1184,40 @@ class MutableHTTPAPIsTests(SyncTestCase):
)
self.assertTrue(result.success)
self.assertEqual(
result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8)),
self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
),
b"aXYZef-0",
)
def test_too_large_write(self):
"""
Writing too large of a chunk results in a REQUEST ENTITY TOO LARGE http
error.
"""
with self.assertRaises(ClientException) as e:
self.create_upload(b"0123456789" * 1024 * 1024)
self.assertEqual(e.exception.code, http.REQUEST_ENTITY_TOO_LARGE)
def test_list_shares(self):
"""``list_shares()`` returns the shares for a given storage index."""
storage_index, _, _ = self.create_upload()
self.assertEqual(result_of(self.mut_client.list_shares(storage_index)), {0, 1})
self.assertEqual(
self.http.result_of_with_flush(self.mut_client.list_shares(storage_index)),
{0, 1},
)
def test_non_existent_list_shares(self):
"""A non-existent storage index errors when shares are listed."""
with self.assertRaises(ClientException) as exc:
result_of(self.mut_client.list_shares(urandom(32)))
self.http.result_of_with_flush(self.mut_client.list_shares(urandom(32)))
self.assertEqual(exc.exception.code, http.NOT_FOUND)
def test_wrong_write_enabler(self):
"""Writes with the wrong write enabler fail, and are not processed."""
storage_index, write_secret, lease_secret = self.create_upload()
with self.assertRaises(ClientException) as exc:
result_of(
self.http.result_of_with_flush(
self.mut_client.read_test_write_chunks(
storage_index,
urandom(32),
@ -1091,7 +1235,9 @@ class MutableHTTPAPIsTests(SyncTestCase):
# The write did not happen:
self.assertEqual(
result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8)),
self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
),
b"abcdef-0",
)
@ -1130,7 +1276,9 @@ class SharedImmutableMutableTestsMixin:
storage_index, _, _ = self.upload(13)
reason = "OHNO \u1235"
result_of(self.client.advise_corrupt_share(storage_index, 13, reason))
self.http.result_of_with_flush(
self.client.advise_corrupt_share(storage_index, 13, reason)
)
self.assertEqual(
corrupted,
@ -1143,11 +1291,15 @@ class SharedImmutableMutableTestsMixin:
"""
storage_index, _, _ = self.upload(13)
reason = "OHNO \u1235"
result_of(self.client.advise_corrupt_share(storage_index, 13, reason))
self.http.result_of_with_flush(
self.client.advise_corrupt_share(storage_index, 13, reason)
)
for (si, share_number) in [(storage_index, 11), (urandom(16), 13)]:
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(self.client.advise_corrupt_share(si, share_number, reason))
self.http.result_of_with_flush(
self.client.advise_corrupt_share(si, share_number, reason)
)
def test_lease_renew_and_add(self):
"""
@ -1165,7 +1317,7 @@ class SharedImmutableMutableTestsMixin:
self.http.clock.advance(167)
# We renew the lease:
result_of(
self.http.result_of_with_flush(
self.general_client.add_or_renew_lease(
storage_index, lease_secret, lease_secret
)
@ -1176,7 +1328,7 @@ class SharedImmutableMutableTestsMixin:
# We create a new lease:
lease_secret2 = urandom(32)
result_of(
self.http.result_of_with_flush(
self.general_client.add_or_renew_lease(
storage_index, lease_secret2, lease_secret2
)
@ -1191,7 +1343,7 @@ class SharedImmutableMutableTestsMixin:
Reading from unknown storage index results in 404.
"""
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.http.result_of_with_flush(
self.client.read_share_chunk(
b"1" * 16,
1,
@ -1206,7 +1358,7 @@ class SharedImmutableMutableTestsMixin:
"""
storage_index, _, _ = self.upload(1)
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.http.result_of_with_flush(
self.client.read_share_chunk(
storage_index,
7, # different share number
@ -1232,7 +1384,7 @@ class SharedImmutableMutableTestsMixin:
with assert_fails_with_http_code(
self, http.REQUESTED_RANGE_NOT_SATISFIABLE
):
result_of(
self.http.result_of_with_flush(
client.read_share_chunk(
storage_index,
1,
@ -1262,7 +1414,7 @@ class SharedImmutableMutableTestsMixin:
A read with no range returns the whole mutable/immutable.
"""
storage_index, uploaded_data, _ = self.upload(1, data_length)
response = result_of(
response = self.http.result_of_with_flush(
self.http.client.request(
"GET",
self.http.client.relative_url(
@ -1271,7 +1423,9 @@ class SharedImmutableMutableTestsMixin:
)
)
self.assertEqual(response.code, http.OK)
self.assertEqual(result_of(response.content()), uploaded_data)
self.assertEqual(
self.http.result_of_with_flush(response.content()), uploaded_data
)
def test_validate_content_range_response_to_read(self):
"""
@ -1283,7 +1437,7 @@ class SharedImmutableMutableTestsMixin:
def check_range(requested_range, expected_response):
headers = Headers()
headers.setRawHeaders("range", [requested_range])
response = result_of(
response = self.http.result_of_with_flush(
self.http.client.request(
"GET",
self.http.client.relative_url(
@ -1297,8 +1451,10 @@ class SharedImmutableMutableTestsMixin:
)
check_range("bytes=0-10", "bytes 0-10/*")
check_range("bytes=3-17", "bytes 3-17/*")
# TODO re-enable in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3907
# Can't go beyond the end of the mutable/immutable!
check_range("bytes=10-100", "bytes 10-25/*")
# check_range("bytes=10-100", "bytes 10-25/*")
class ImmutableSharedTests(SharedImmutableMutableTestsMixin, SyncTestCase):
@ -1323,7 +1479,7 @@ class ImmutableSharedTests(SharedImmutableMutableTestsMixin, SyncTestCase):
upload_secret = urandom(32)
lease_secret = urandom(32)
storage_index = urandom(16)
result_of(
self.http.result_of_with_flush(
self.client.create(
storage_index,
{share_number},
@ -1333,7 +1489,7 @@ class ImmutableSharedTests(SharedImmutableMutableTestsMixin, SyncTestCase):
lease_secret,
)
)
result_of(
self.http.result_of_with_flush(
self.client.write_share_chunk(
storage_index,
share_number,
@ -1368,7 +1524,7 @@ class MutableSharedTests(SharedImmutableMutableTestsMixin, SyncTestCase):
write_secret = urandom(32)
lease_secret = urandom(32)
storage_index = urandom(16)
result_of(
self.http.result_of_with_flush(
self.client.read_test_write_chunks(
storage_index,
write_secret,