diff --git a/docs/proposed/http-storage-node-protocol.rst b/docs/proposed/http-storage-node-protocol.rst index b00b327e3..315546b8a 100644 --- a/docs/proposed/http-storage-node-protocol.rst +++ b/docs/proposed/http-storage-node-protocol.rst @@ -540,7 +540,7 @@ Rejected designs for upload secrets: Write data for the indicated share. The share number must belong to the storage index. The request body is the raw share data (i.e., ``application/octet-stream``). -*Content-Range* requests are encouraged for large transfers to allow partially complete uploads to be resumed. +*Content-Range* requests are required; for large transfers this allows partially complete uploads to be resumed. For example, a 1MiB share can be divided in to eight separate 128KiB chunks. Each chunk can be uploaded in a separate request. @@ -644,7 +644,7 @@ Read a contiguous sequence of bytes from one share in one bucket. The response body is the raw share data (i.e., ``application/octet-stream``). The ``Range`` header may be used to request exactly one ``bytes`` range, in which case the response code will be 206 (partial content). Interpretation and response behavior is as specified in RFC 7233 ยง 4.1. -Multiple ranges in a single request are *not* supported. +Multiple ranges in a single request are *not* supported; open-ended ranges are also not supported. 
Discussion `````````` diff --git a/newsfragments/3860.minor b/newsfragments/3860.minor new file mode 100644 index 000000000..e69de29bb diff --git a/src/allmydata/storage/http_client.py b/src/allmydata/storage/http_client.py index 2dc133b52..475aa2330 100644 --- a/src/allmydata/storage/http_client.py +++ b/src/allmydata/storage/http_client.py @@ -48,7 +48,11 @@ def _encode_si(si): # type: (bytes) -> str class ClientException(Exception): - """An unexpected error.""" + """An unexpected response code from the server.""" + + def __init__(self, code, *additional_args): + Exception.__init__(self, code, *additional_args) + self.code = code def _decode_cbor(response): @@ -68,7 +72,7 @@ class ImmutableCreateResult(object): class StorageClient(object): """ - HTTP client that talks to the HTTP storage server. + Low-level HTTP client that talks to the HTTP storage server. """ def __init__( @@ -78,7 +82,7 @@ class StorageClient(object): self._swissnum = swissnum self._treq = treq - def _url(self, path): + def relative_url(self, path): """Get a URL relative to the base URL.""" return self._base_url.click(path) @@ -92,7 +96,7 @@ class StorageClient(object): ) return headers - def _request( + def request( self, method, url, @@ -120,13 +124,22 @@ class StorageClient(object): ) return self._treq.request(method, url, headers=headers, **kwargs) + +class StorageClientGeneral(object): + """ + High-level HTTP APIs that aren't immutable- or mutable-specific. + """ + + def __init__(self, client): # type: (StorageClient) -> None + self._client = client + @inlineCallbacks def get_version(self): """ Return the version metadata for the server. 
""" - url = self._url("/v1/version") - response = yield self._request("GET", url) + url = self._client.relative_url("/v1/version") + response = yield self._client.request("GET", url) decoded_response = yield _decode_cbor(response) returnValue(decoded_response) @@ -174,11 +187,11 @@ class StorageClientImmutables(object): Result fires when creating the storage index succeeded, if creating the storage index failed the result will fire with an exception. """ - url = self._client._url("/v1/immutable/" + _encode_si(storage_index)) + url = self._client.relative_url("/v1/immutable/" + _encode_si(storage_index)) message = dumps( {"share-numbers": share_numbers, "allocated-size": allocated_size} ) - response = yield self._client._request( + response = yield self._client.request( "POST", url, lease_renew_secret=lease_renew_secret, @@ -211,10 +224,10 @@ class StorageClientImmutables(object): whether the _complete_ share (i.e. all chunks, not just this one) has been uploaded. """ - url = self._client._url( + url = self._client.relative_url( "/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number) ) - response = yield self._client._request( + response = yield self._client.request( "PATCH", url, upload_secret=upload_secret, @@ -262,10 +275,10 @@ class StorageClientImmutables(object): the HTTP protocol will be simplified, see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777 """ - url = self._client._url( + url = self._client.relative_url( "/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number) ) - response = yield self._client._request( + response = yield self._client.request( "GET", url, headers=Headers( @@ -285,10 +298,10 @@ class StorageClientImmutables(object): """ Return the set of shares for a given storage index. 
""" - url = self._client._url( + url = self._client.relative_url( "/v1/immutable/{}/shares".format(_encode_si(storage_index)) ) - response = yield self._client._request( + response = yield self._client.request( "GET", url, ) diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index f885baa22..0e1969593 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -18,11 +18,14 @@ else: from functools import wraps from base64 import b64decode +import binascii from klein import Klein from twisted.web import http import attr from werkzeug.http import parse_range_header, parse_content_range_header +from werkzeug.routing import BaseConverter, ValidationError +from werkzeug.datastructures import ContentRange # TODO Make sure to use pure Python versions? from cbor2 import dumps, loads @@ -30,8 +33,9 @@ from cbor2 import dumps, loads from .server import StorageServer from .http_common import swissnum_auth_header, Secrets from .common import si_a2b -from .immutable import BucketWriter +from .immutable import BucketWriter, ConflictingWriteError from ..util.hashutil import timing_safe_compare +from ..util.base32 import rfc3548_alphabet class ClientSecretsException(Exception): @@ -128,10 +132,27 @@ class StorageIndexUploads(object): """ # Map share number to BucketWriter - shares = attr.ib() # type: Dict[int,BucketWriter] + shares = attr.ib(factory=dict) # type: Dict[int,BucketWriter] - # The upload key. - upload_secret = attr.ib() # type: bytes + # Map share number to the upload secret (different shares might have + # different upload secrets). 
+ upload_secrets = attr.ib(factory=dict) # type: Dict[int,bytes] + + def add_upload(self, share_number, upload_secret, bucket): + self.shares[share_number] = bucket + self.upload_secrets[share_number] = upload_secret + + +class StorageIndexConverter(BaseConverter): + """Parser/validator for storage index URL path segments.""" + + regex = "[" + str(rfc3548_alphabet, "ascii") + "]{26}" + + def to_python(self, value): + try: + return si_a2b(value.encode("ascii")) + except (AssertionError, binascii.Error, ValueError): + raise ValidationError("Invalid storage index") class HTTPServer(object): @@ -140,6 +161,7 @@ class HTTPServer(object): """ _app = Klein() + _app.url_map.converters["storage_index"] = StorageIndexConverter def __init__( self, storage_server, swissnum @@ -173,76 +195,75 @@ class HTTPServer(object): @_authorized_route( _app, {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.UPLOAD}, - "/v1/immutable/", + "/v1/immutable/", methods=["POST"], ) def allocate_buckets(self, request, authorization, storage_index): """Allocate buckets.""" - storage_index = si_a2b(storage_index.encode("ascii")) - info = loads(request.content.read()) upload_secret = authorization[Secrets.UPLOAD] + info = loads(request.content.read()) if storage_index in self._uploads: - # Pre-existing upload. - in_progress = self._uploads[storage_index] - if timing_safe_compare(in_progress.upload_secret, upload_secret): - # Same session. - # TODO add BucketWriters only for new shares that don't already have buckets; see the HTTP spec for details. - # The backend code may already implement this logic. - pass - else: - # TODO Fail, since the secret doesnt match. - pass - else: - # New upload. 
- already_got, sharenum_to_bucket = self._storage_server.allocate_buckets( - storage_index, - renew_secret=authorization[Secrets.LEASE_RENEW], - cancel_secret=authorization[Secrets.LEASE_CANCEL], - sharenums=info["share-numbers"], - allocated_size=info["allocated-size"], - ) - self._uploads[storage_index] = StorageIndexUploads( - shares=sharenum_to_bucket, upload_secret=authorization[Secrets.UPLOAD] - ) - return self._cbor( - request, - { - "already-have": set(already_got), - "allocated": set(sharenum_to_bucket), - }, - ) + for share_number in info["share-numbers"]: + in_progress = self._uploads[storage_index] + # For pre-existing upload, make sure password matches. + if ( + share_number in in_progress.upload_secrets + and not timing_safe_compare( + in_progress.upload_secrets[share_number], upload_secret + ) + ): + request.setResponseCode(http.UNAUTHORIZED) + return b"" + + already_got, sharenum_to_bucket = self._storage_server.allocate_buckets( + storage_index, + renew_secret=authorization[Secrets.LEASE_RENEW], + cancel_secret=authorization[Secrets.LEASE_CANCEL], + sharenums=info["share-numbers"], + allocated_size=info["allocated-size"], + ) + uploads = self._uploads.setdefault(storage_index, StorageIndexUploads()) + for share_number, bucket in sharenum_to_bucket.items(): + uploads.add_upload(share_number, upload_secret, bucket) + + return self._cbor( + request, + { + "already-have": set(already_got), + "allocated": set(sharenum_to_bucket), + }, + ) @_authorized_route( _app, {Secrets.UPLOAD}, - "/v1/immutable//", + "/v1/immutable//", methods=["PATCH"], ) def write_share_data(self, request, authorization, storage_index, share_number): """Write data to an in-progress immutable upload.""" - storage_index = si_a2b(storage_index.encode("ascii")) content_range = parse_content_range_header(request.getHeader("content-range")) - # TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - # 1. Malformed header should result in error 416 - # 2. 
Non-bytes unit should result in error 416 - # 3. Missing header means full upload in one request - # 4. Impossible range should resul tin error 416 + if content_range is None or content_range.units != "bytes": + request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE) + return b"" + offset = content_range.start - # TODO basic checks on validity of start, offset, and content-range in general. also of share_number. - # TODO basic check that body isn't infinite. require content-length? or maybe we should require content-range (it's optional now)? if so, needs to be rflected in protocol spec. - - data = request.content.read() + # TODO limit memory usage + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872 + data = request.content.read(content_range.stop - content_range.start + 1) try: bucket = self._uploads[storage_index].shares[share_number] except (KeyError, IndexError): - # TODO return 404 - raise + request.setResponseCode(http.NOT_FOUND) + return b"" - finished = bucket.write(offset, data) - - # TODO if raises ConflictingWriteError, return HTTP CONFLICT code. + try: + finished = bucket.write(offset, data) + except ConflictingWriteError: + request.setResponseCode(http.CONFLICT) + return b"" if finished: bucket.close() @@ -258,46 +279,65 @@ class HTTPServer(object): @_authorized_route( _app, set(), - "/v1/immutable//shares", + "/v1/immutable//shares", methods=["GET"], ) def list_shares(self, request, authorization, storage_index): """ List shares for the given storage index. """ - storage_index = si_a2b(storage_index.encode("ascii")) share_numbers = list(self._storage_server.get_buckets(storage_index).keys()) return self._cbor(request, share_numbers) @_authorized_route( _app, set(), - "/v1/immutable//", + "/v1/immutable//", methods=["GET"], ) def read_share_chunk(self, request, authorization, storage_index, share_number): """Read a chunk for an already uploaded immutable.""" - # TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - # 1. 
basic checks on validity on storage index, share number - # 2. missing range header should have response code 200 and return whole thing - # 3. malformed range header should result in error? or return everything? - # 4. non-bytes range results in error - # 5. ranges make sense semantically (positive, etc.) - # 6. multiple ranges fails with error - # 7. missing end of range means "to the end of share" - storage_index = si_a2b(storage_index.encode("ascii")) - range_header = parse_range_header(request.getHeader("range")) - offset, end = range_header.ranges[0] - assert end != None # TODO support this case + try: + bucket = self._storage_server.get_buckets(storage_index)[share_number] + except KeyError: + request.setResponseCode(http.NOT_FOUND) + return b"" - # TODO if not found, 404 - bucket = self._storage_server.get_buckets(storage_index)[share_number] + if request.getHeader("range") is None: + # Return the whole thing. + start = 0 + while True: + # TODO should probably yield to event loop occasionally... + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872 + data = bucket.read(start, start + 65536) + if not data: + request.finish() + return + request.write(data) + start += len(data) + + range_header = parse_range_header(request.getHeader("range")) + if ( + range_header is None + or range_header.units != "bytes" + or len(range_header.ranges) > 1 # more than one range + or range_header.ranges[0][1] is None # range without end + ): + request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE) + return b"" + + offset, end = range_header.ranges[0] + + # TODO limit memory usage + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872 data = bucket.read(offset, end - offset) + request.setResponseCode(http.PARTIAL_CONTENT) - # TODO set content-range on response. We we need to expand the - # BucketReader interface to return share's length. 
- # - # request.setHeader( - # "content-range", range_header.make_content_range(share_length).to_header() - # ) + if len(data): + # For empty bodies the content-range header makes no sense since + # the end of the range is inclusive. + request.setHeader( + "content-range", + ContentRange("bytes", offset, offset + len(data)).to_header(), + ) return data diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index cf5fb65a2..c2e26b4b6 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -58,7 +58,7 @@ from twisted.plugin import ( from eliot import ( log_call, ) -from foolscap.api import eventually +from foolscap.api import eventually, RemoteException from foolscap.reconnector import ( ReconnectionInfo, ) @@ -75,7 +75,10 @@ from allmydata.util.observer import ObserverList from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.hashutil import permute_server_hash from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict -from allmydata.storage.http_client import StorageClient, StorageClientImmutables +from allmydata.storage.http_client import ( + StorageClient, StorageClientImmutables, StorageClientGeneral, + ClientException as HTTPClientException, +) # who is responsible for de-duplication? @@ -1035,8 +1038,13 @@ class _FakeRemoteReference(object): """ local_object = attr.ib(type=object) + @defer.inlineCallbacks def callRemote(self, action, *args, **kwargs): - return getattr(self.local_object, action)(*args, **kwargs) + try: + result = yield getattr(self.local_object, action)(*args, **kwargs) + defer.returnValue(result) + except HTTPClientException as e: + raise RemoteException(e.args) @attr.s @@ -1094,18 +1102,21 @@ class _HTTPBucketReader(object): class _HTTPStorageServer(object): """ Talk to remote storage server over HTTP. + + The same upload key is used for all communication. 
""" _http_client = attr.ib(type=StorageClient) + _upload_secret = attr.ib(type=bytes) @staticmethod def from_http_client(http_client): # type: (StorageClient) -> _HTTPStorageServer """ Create an ``IStorageServer`` from a HTTP ``StorageClient``. """ - return _HTTPStorageServer(http_client=http_client) + return _HTTPStorageServer(http_client=http_client, upload_secret=urandom(20)) def get_version(self): - return self._http_client.get_version() + return StorageClientGeneral(self._http_client).get_version() @defer.inlineCallbacks def allocate_buckets( @@ -1117,10 +1128,9 @@ class _HTTPStorageServer(object): allocated_size, canary, ): - upload_secret = urandom(20) immutable_client = StorageClientImmutables(self._http_client) result = immutable_client.create( - storage_index, sharenums, allocated_size, upload_secret, renew_secret, + storage_index, sharenums, allocated_size, self._upload_secret, renew_secret, cancel_secret ) result = yield result @@ -1130,7 +1140,7 @@ class _HTTPStorageServer(object): client=immutable_client, storage_index=storage_index, share_number=share_num, - upload_secret=upload_secret + upload_secret=self._upload_secret )) for share_num in result.allocated }) diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index 1c757551a..95261ddb2 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -1160,13 +1160,8 @@ class HTTPImmutableAPIsTests( "test_add_lease_renewal", "test_add_new_lease", "test_advise_corrupt_share", - "test_allocate_buckets_repeat", "test_bucket_advise_corrupt_share", "test_disconnection", - "test_get_buckets_skips_unfinished_buckets", - "test_matching_overlapping_writes", - "test_non_matching_overlapping_writes", - "test_written_shares_are_allocated", } diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index 982e22859..14f4437b6 100644 --- a/src/allmydata/test/test_storage_http.py +++ 
b/src/allmydata/test/test_storage_http.py @@ -24,6 +24,10 @@ from klein import Klein from hyperlink import DecodedURL from collections_extended import RangeMap from twisted.internet.task import Clock +from twisted.web import http +from twisted.web.http_headers import Headers +from werkzeug import routing +from werkzeug.exceptions import NotFound as WNotFound from .common import SyncTestCase from ..storage.server import StorageServer @@ -33,6 +37,7 @@ from ..storage.http_server import ( Secrets, ClientSecretsException, _authorized_route, + StorageIndexConverter, ) from ..storage.http_client import ( StorageClient, @@ -40,7 +45,10 @@ from ..storage.http_client import ( StorageClientImmutables, ImmutableCreateResult, UploadProgress, + StorageClientGeneral, + _encode_si, ) +from ..storage.common import si_b2a def _post_process(params): @@ -147,6 +155,52 @@ class ExtractSecretsTests(SyncTestCase): _extract_secrets(["lease-cancel-secret eA=="], {Secrets.LEASE_RENEW}) +class RouteConverterTests(SyncTestCase): + """Tests for custom werkzeug path segment converters.""" + + adapter = routing.Map( + [ + routing.Rule( + "//", endpoint="si", methods=["GET"] + ) + ], + converters={"storage_index": StorageIndexConverter}, + ).bind("example.com", "/") + + @given(storage_index=st.binary(min_size=16, max_size=16)) + def test_good_storage_index_is_parsed(self, storage_index): + """ + A valid storage index is accepted and parsed back out by + StorageIndexConverter. 
+ """ + self.assertEqual( + self.adapter.match( + "/{}/".format(str(si_b2a(storage_index), "ascii")), method="GET" + ), + ("si", {"storage_index": storage_index}), + ) + + def test_long_storage_index_is_not_parsed(self): + """An overly long storage_index string is not parsed.""" + with self.assertRaises(WNotFound): + self.adapter.match("/{}/".format("a" * 27), method="GET") + + def test_short_storage_index_is_not_parsed(self): + """An overly short storage_index string is not parsed.""" + with self.assertRaises(WNotFound): + self.adapter.match("/{}/".format("a" * 25), method="GET") + + def test_bad_characters_storage_index_is_not_parsed(self): + """A storage_index string with bad characters is not parsed.""" + with self.assertRaises(WNotFound): + self.adapter.match("/{}_/".format("a" * 25), method="GET") + + def test_invalid_storage_index_is_not_parsed(self): + """An invalid storage_index string is not parsed.""" + with self.assertRaises(WNotFound): + self.adapter.match("/nomd2a65ylxjbqzsw7gcfh4ivr/", method="GET") + + # TODO should be actual swissnum SWISSNUM_FOR_TEST = b"abcd" @@ -207,7 +261,7 @@ class RoutingTests(SyncTestCase): """ # Without secret, get a 400 error. response = result_of( - self.client._request( + self.client.request( "GET", "http://127.0.0.1/upload_secret", ) @@ -216,7 +270,7 @@ class RoutingTests(SyncTestCase): # With secret, we're good. 
response = result_of( - self.client._request( + self.client.request( "GET", "http://127.0.0.1/upload_secret", upload_secret=b"MAGIC" ) ) @@ -244,6 +298,24 @@ class HttpTestFixture(Fixture): ) +class StorageClientWithHeadersOverride(object): + """Wrap ``StorageClient`` and override sent headers.""" + + def __init__(self, storage_client, add_headers): + self.storage_client = storage_client + self.add_headers = add_headers + + def __getattr__(self, attr): + return getattr(self.storage_client, attr) + + def request(self, *args, headers=None, **kwargs): + if headers is None: + headers = Headers() + for key, value in self.add_headers.items(): + headers.setRawHeaders(key, [value]) + return self.storage_client.request(*args, headers=headers, **kwargs) + + class GenericHTTPAPITests(SyncTestCase): """ Tests of HTTP client talking to the HTTP server, for generic HTTP API @@ -261,10 +333,12 @@ class GenericHTTPAPITests(SyncTestCase): If the wrong swissnum is used, an ``Unauthorized`` response code is returned. """ - client = StorageClient( - DecodedURL.from_text("http://127.0.0.1"), - b"something wrong", - treq=StubTreq(self.http.http_server.get_resource()), + client = StorageClientGeneral( + StorageClient( + DecodedURL.from_text("http://127.0.0.1"), + b"something wrong", + treq=StubTreq(self.http.http_server.get_resource()), + ) ) with self.assertRaises(ClientException) as e: result_of(client.get_version()) @@ -277,7 +351,8 @@ class GenericHTTPAPITests(SyncTestCase): We ignore available disk space and max immutable share size, since that might change across calls. 
""" - version = result_of(self.http.client.get_version()) + client = StorageClientGeneral(self.http.client) + version = result_of(client.get_version()) version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop( b"available-space" ) @@ -304,6 +379,28 @@ class ImmutableHTTPAPITests(SyncTestCase): self.skipTest("Not going to bother supporting Python 2") super(ImmutableHTTPAPITests, self).setUp() self.http = self.useFixture(HttpTestFixture()) + self.imm_client = StorageClientImmutables(self.http.client) + + def create_upload(self, share_numbers, length): + """ + Create a write bucket on server, return: + + (upload_secret, lease_secret, storage_index, result) + """ + upload_secret = urandom(32) + lease_secret = urandom(32) + storage_index = urandom(16) + created = result_of( + self.imm_client.create( + storage_index, + share_numbers, + length, + upload_secret, + lease_secret, + lease_secret, + ) + ) + return (upload_secret, lease_secret, storage_index, created) def test_upload_can_be_downloaded(self): """ @@ -315,19 +412,10 @@ class ImmutableHTTPAPITests(SyncTestCase): that's already done in test_storage.py. """ length = 100 - expected_data = b"".join(bytes([i]) for i in range(100)) - - im_client = StorageClientImmutables(self.http.client) + expected_data = bytes(range(100)) # Create a upload: - upload_secret = urandom(32) - lease_secret = urandom(32) - storage_index = b"".join(bytes([i]) for i in range(16)) - created = result_of( - im_client.create( - storage_index, {1}, 100, upload_secret, lease_secret, lease_secret - ) - ) + (upload_secret, _, storage_index, created) = self.create_upload({1}, 100) self.assertEqual( created, ImmutableCreateResult(already_have=set(), allocated={1}) ) @@ -338,7 +426,7 @@ class ImmutableHTTPAPITests(SyncTestCase): # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. 
def write(offset, length): remaining.empty(offset, offset + length) - return im_client.write_share_chunk( + return self.imm_client.write_share_chunk( storage_index, 1, upload_secret, @@ -382,31 +470,58 @@ class ImmutableHTTPAPITests(SyncTestCase): # We can now read: for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]: downloaded = result_of( - im_client.read_share_chunk(storage_index, 1, offset, length) + self.imm_client.read_share_chunk(storage_index, 1, offset, length) ) self.assertEqual(downloaded, expected_data[offset : offset + length]) + def test_allocate_buckets_second_time_wrong_upload_key(self): + """ + If allocate buckets endpoint is called second time with wrong upload + key on the same shares, the result is an error. + """ + # Create a upload: + (upload_secret, lease_secret, storage_index, _) = self.create_upload( + {1, 2, 3}, 100 + ) + with self.assertRaises(ClientException) as e: + result_of( + self.imm_client.create( + storage_index, {2, 3}, 100, b"x" * 32, lease_secret, lease_secret + ) + ) + self.assertEqual(e.exception.args[0], http.UNAUTHORIZED) + + def test_allocate_buckets_second_time_different_shares(self): + """ + If allocate buckets endpoint is called second time with different + upload key on different shares, that creates the buckets. + """ + # Create a upload: + (upload_secret, lease_secret, storage_index, created) = self.create_upload( + {1, 2, 3}, 100 + ) + + # Add different shares: + created2 = result_of( + self.imm_client.create( + storage_index, {4, 6}, 100, b"x" * 2, lease_secret, lease_secret + ) + ) + self.assertEqual(created2.allocated, {4, 6}) + def test_list_shares(self): """ Once a share is finished uploading, it's possible to list it. 
""" - im_client = StorageClientImmutables(self.http.client) - upload_secret = urandom(32) - lease_secret = urandom(32) - storage_index = b"".join(bytes([i]) for i in range(16)) - result_of( - im_client.create( - storage_index, {1, 2, 3}, 10, upload_secret, lease_secret, lease_secret - ) - ) + (upload_secret, _, storage_index, created) = self.create_upload({1, 2, 3}, 10) # Initially there are no shares: - self.assertEqual(result_of(im_client.list_shares(storage_index)), set()) + self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set()) # Upload shares 1 and 3: for share_number in [1, 3]: progress = result_of( - im_client.write_share_chunk( + self.imm_client.write_share_chunk( storage_index, share_number, upload_secret, @@ -417,87 +532,266 @@ class ImmutableHTTPAPITests(SyncTestCase): self.assertTrue(progress.finished) # Now shares 1 and 3 exist: - self.assertEqual(result_of(im_client.list_shares(storage_index)), {1, 3}) + self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), {1, 3}) + + def test_upload_bad_content_range(self): + """ + Malformed or invalid Content-Range headers to the immutable upload + endpoint result in a 416 error. + """ + (upload_secret, _, storage_index, created) = self.create_upload({1}, 10) + + def check_invalid(bad_content_range_value): + client = StorageClientImmutables( + StorageClientWithHeadersOverride( + self.http.client, {"content-range": bad_content_range_value} + ) + ) + with self.assertRaises(ClientException) as e: + result_of( + client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"0123456789", + ) + ) + self.assertEqual(e.exception.code, http.REQUESTED_RANGE_NOT_SATISFIABLE) + + check_invalid("not a valid content-range header at all") + check_invalid("bytes -1-9/10") + check_invalid("bytes 0--9/10") + check_invalid("teapots 0-9/10") def test_list_shares_unknown_storage_index(self): """ Listing unknown storage index's shares results in empty list of shares. 
""" - im_client = StorageClientImmutables(self.http.client) - storage_index = b"".join(bytes([i]) for i in range(16)) - self.assertEqual(result_of(im_client.list_shares(storage_index)), set()) + storage_index = bytes(range(16)) + self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set()) + + def test_upload_non_existent_storage_index(self): + """ + Uploading to a non-existent storage index or share number results in + 404. + """ + (upload_secret, _, storage_index, _) = self.create_upload({1}, 10) + + def unknown_check(storage_index, share_number): + with self.assertRaises(ClientException) as e: + result_of( + self.imm_client.write_share_chunk( + storage_index, + share_number, + upload_secret, + 0, + b"0123456789", + ) + ) + self.assertEqual(e.exception.code, http.NOT_FOUND) + + # Wrong share number: + unknown_check(storage_index, 7) + # Wrong storage index: + unknown_check(b"X" * 16, 7) def test_multiple_shares_uploaded_to_different_place(self): """ If a storage index has multiple shares, uploads to different shares are stored separately and can be downloaded separately. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - """ - - def test_bucket_allocated_with_new_shares(self): - """ - If some shares already exist, allocating shares indicates only the new - ones were created. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - """ - - def test_bucket_allocation_new_upload_secret(self): - """ - If a bucket was allocated with one upload secret, and a different upload - key is used to allocate the bucket again, the second allocation fails. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - """ - - def test_upload_with_wrong_upload_secret_fails(self): - """ - Uploading with a key that doesn't match the one used to allocate the - bucket will fail. 
- - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - """ - - def test_upload_offset_cannot_be_negative(self): - """ - A negative upload offset will be rejected. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 """ + (upload_secret, _, storage_index, _) = self.create_upload({1, 2}, 10) + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"1" * 10, + ) + ) + result_of( + self.imm_client.write_share_chunk( + storage_index, + 2, + upload_secret, + 0, + b"2" * 10, + ) + ) + self.assertEqual( + result_of(self.imm_client.read_share_chunk(storage_index, 1, 0, 10)), + b"1" * 10, + ) + self.assertEqual( + result_of(self.imm_client.read_share_chunk(storage_index, 2, 0, 10)), + b"2" * 10, + ) def test_mismatching_upload_fails(self): """ If an uploaded chunk conflicts with an already uploaded chunk, a CONFLICT error is returned. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 """ + (upload_secret, _, storage_index, created) = self.create_upload({1}, 100) + + # Write: + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"0" * 10, + ) + ) + + # Conflicting write: + with self.assertRaises(ClientException) as e: + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"0123456789", + ) + ) + self.assertEqual(e.exception.code, http.CONFLICT) + + def upload(self, share_number, data_length=26): + """ + Create a share, return (storage_index, uploaded_data). + """ + uploaded_data = (b"abcdefghijklmnopqrstuvwxyz" * ((data_length // 26) + 1))[ + :data_length + ] + (upload_secret, _, storage_index, _) = self.create_upload( + {share_number}, data_length + ) + result_of( + self.imm_client.write_share_chunk( + storage_index, + share_number, + upload_secret, + 0, + uploaded_data, + ) + ) + return storage_index, uploaded_data def test_read_of_wrong_storage_index_fails(self): """ Reading from unknown storage index results in 404. 
- - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 """ + with self.assertRaises(ClientException) as e: + result_of( + self.imm_client.read_share_chunk( + b"1" * 16, + 1, + 0, + 10, + ) + ) + self.assertEqual(e.exception.code, http.NOT_FOUND) def test_read_of_wrong_share_number_fails(self): """ Reading from unknown storage index results in 404. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 """ + storage_index, _ = self.upload(1) + with self.assertRaises(ClientException) as e: + result_of( + self.imm_client.read_share_chunk( + storage_index, + 7, # different share number + 0, + 10, + ) + ) + self.assertEqual(e.exception.code, http.NOT_FOUND) def test_read_with_negative_offset_fails(self): """ - The offset for reads cannot be negative. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 + Malformed or unsupported Range headers result in 416 (requested range + not satisfiable) error. """ + storage_index, _ = self.upload(1) - def test_read_with_negative_length_fails(self): - """ - The length for reads cannot be negative. + def check_bad_range(bad_range_value): + client = StorageClientImmutables( + StorageClientWithHeadersOverride( + self.http.client, {"range": bad_range_value} + ) + ) - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 + with self.assertRaises(ClientException) as e: + result_of( + client.read_share_chunk( + storage_index, + 1, + 0, + 10, + ) + ) + self.assertEqual(e.exception.code, http.REQUESTED_RANGE_NOT_SATISFIABLE) + + # Bad unit + check_bad_range("molluscs=0-9") + # Negative offsets + check_bad_range("bytes=-2-9") + check_bad_range("bytes=0--10") + # Negative offset no endpoint + check_bad_range("bytes=-300-") + check_bad_range("bytes=") + # Multiple ranges are currently unsupported, even if they're + # semantically valid under HTTP: + check_bad_range("bytes=0-5, 6-7") + # Ranges without an end are currently unsupported, even if they're + # semantically valid under HTTP. 
+ check_bad_range("bytes=0-") + + @given(data_length=st.integers(min_value=1, max_value=300000)) + def test_read_with_no_range(self, data_length): """ + A read with no range returns the whole immutable. + """ + storage_index, uploaded_data = self.upload(1, data_length) + response = result_of( + self.http.client.request( + "GET", + self.http.client.relative_url( + "/v1/immutable/{}/1".format(_encode_si(storage_index)) + ), + ) + ) + self.assertEqual(response.code, http.OK) + self.assertEqual(result_of(response.content()), uploaded_data) + + def test_validate_content_range_response_to_read(self): + """ + The server responds to ranged reads with an appropriate Content-Range + header. + """ + storage_index, _ = self.upload(1, 26) + + def check_range(requested_range, expected_response): + headers = Headers() + headers.setRawHeaders("range", [requested_range]) + response = result_of( + self.http.client.request( + "GET", + self.http.client.relative_url( + "/v1/immutable/{}/1".format(_encode_si(storage_index)) + ), + headers=headers, + ) + ) + self.assertEqual( + response.headers.getRawHeaders("content-range"), [expected_response] + ) + + check_range("bytes=0-10", "bytes 0-10/*") + # Can't go beyond the end of the immutable! + check_range("bytes=10-100", "bytes 10-25/*")