diff --git a/docs/proposed/http-storage-node-protocol.rst b/docs/proposed/http-storage-node-protocol.rst index b00b327e3..2ceb3c03a 100644 --- a/docs/proposed/http-storage-node-protocol.rst +++ b/docs/proposed/http-storage-node-protocol.rst @@ -493,8 +493,8 @@ Handling repeat calls: * If the same API call is repeated with the same upload secret, the response is the same and no change is made to server state. This is necessary to ensure retries work in the face of lost responses from the server. * If the API calls is with a different upload secret, this implies a new client, perhaps because the old client died. - In order to prevent storage servers from being able to mess with each other, this API call will fail, because the secret doesn't match. - The use case of restarting upload from scratch if the client dies can be implemented by having the client persist the upload secret. + Or it may happen because the client wants to upload a different share number than a previous client. + New shares will be created, existing shares will be unchanged, regardless of whether the upload secret matches or not. Discussion `````````` @@ -540,7 +540,7 @@ Rejected designs for upload secrets: Write data for the indicated share. The share number must belong to the storage index. The request body is the raw share data (i.e., ``application/octet-stream``). -*Content-Range* requests are encouraged for large transfers to allow partially complete uploads to be resumed. +*Content-Range* requests are required; for large transfers this allows partially complete uploads to be resumed. For example, a 1MiB share can be divided in to eight separate 128KiB chunks. Each chunk can be uploaded in a separate request. @@ -614,16 +614,19 @@ From RFC 7231:: ``POST /v1/immutable/:storage_index/:share_number/corrupt`` !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -Advise the server the data read from the indicated share was corrupt. 
-The request body includes an human-meaningful string with details about the corruption. -It also includes potentially important details about the share. +Advise the server the data read from the indicated share was corrupt. The +request body includes a human-meaningful text string with details about the +corruption. It also includes potentially important details about the share. For example:: - {"reason": "expected hash abcd, got hash efgh"} + {"reason": u"expected hash abcd, got hash efgh"} .. share-type, storage-index, and share-number are inferred from the URL +The response code is OK (200) by default, or NOT FOUND (404) if the share +couldn't be found. + Reading ~~~~~~~ @@ -644,7 +647,7 @@ Read a contiguous sequence of bytes from one share in one bucket. The response body is the raw share data (i.e., ``application/octet-stream``). The ``Range`` header may be used to request exactly one ``bytes`` range, in which case the response code will be 206 (partial content). Interpretation and response behavior is as specified in RFC 7233 ยง 4.1. -Multiple ranges in a single request are *not* supported. +Multiple ranges in a single request are *not* supported; open-ended ranges are also not supported. Discussion `````````` diff --git a/newsfragments/3860.minor b/newsfragments/3860.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3876.minor b/newsfragments/3876.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3877.minor b/newsfragments/3877.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3879.incompat b/newsfragments/3879.incompat new file mode 100644 index 000000000..ca3f24f94 --- /dev/null +++ b/newsfragments/3879.incompat @@ -0,0 +1 @@ +Share corruption reports stored on disk are now always encoded in UTF-8.
\ No newline at end of file diff --git a/newsfragments/3881.minor b/newsfragments/3881.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3882.minor b/newsfragments/3882.minor new file mode 100644 index 000000000..e69de29bb diff --git a/src/allmydata/storage/http_client.py b/src/allmydata/storage/http_client.py index b532c292e..572da506c 100644 --- a/src/allmydata/storage/http_client.py +++ b/src/allmydata/storage/http_client.py @@ -2,12 +2,8 @@ HTTP client that talks to the HTTP storage server. """ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals - from typing import Union, Set, Optional +from treq.testing import StubTreq from base64 import b64encode @@ -25,7 +21,7 @@ import treq from treq.client import HTTPClient from treq.testing import StubTreq -from .http_common import swissnum_auth_header, Secrets +from .http_common import swissnum_auth_header, Secrets, get_content_type, CBOR_MIME_TYPE from .common import si_b2a @@ -35,14 +31,25 @@ def _encode_si(si): # type: (bytes) -> str class ClientException(Exception): - """An unexpected error.""" + """An unexpected response code from the server.""" + + def __init__(self, code, *additional_args): + Exception.__init__(self, code, *additional_args) + self.code = code def _decode_cbor(response): """Given HTTP response, return decoded CBOR body.""" if response.code > 199 and response.code < 300: - return treq.content(response).addCallback(loads) - return fail(ClientException(response.code, response.phrase)) + content_type = get_content_type(response.headers) + if content_type == CBOR_MIME_TYPE: + # TODO limit memory usage + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872 + return treq.content(response).addCallback(loads) + else: + raise ClientException(-1, "Server didn't send CBOR") + else: + return fail(ClientException(response.code, response.phrase)) @attr.s @@ -55,17 +62,16 @@ class 
ImmutableCreateResult(object): class StorageClient(object): """ - HTTP client that talks to the HTTP storage server. + Low-level HTTP client that talks to the HTTP storage server. """ def __init__( self, url, swissnum, treq=treq ): # type: (DecodedURL, bytes, Union[treq,StubTreq,HTTPClient]) -> None """ - The URL is a HTTPS URL ("http://..."). To construct from a furl, use + The URL is a HTTPS URL ("https://..."). To construct from a furl, use ``StorageClient.from_furl()``. """ - assert url.to_text().startswith("https://") self._base_url = url self._swissnum = swissnum self._treq = treq @@ -79,8 +85,8 @@ class StorageClient(object): assert furl.scheme == "pb" swissnum = furl.path[0].encode("ascii") certificate_hash = furl.user.encode("ascii") - - def _url(self, path): + + def relative_url(self, path): """Get a URL relative to the base URL.""" return self._base_url.click(path) @@ -94,7 +100,7 @@ class StorageClient(object): ) return headers - def _request( + def request( self, method, url, @@ -102,13 +108,19 @@ class StorageClient(object): lease_cancel_secret=None, upload_secret=None, headers=None, + message_to_serialize=None, **kwargs ): """ Like ``treq.request()``, but with optional secrets that get translated into corresponding HTTP headers. + + If ``message_to_serialize`` is set, it will be serialized (by default + with CBOR) and set as the request body. 
""" headers = self._get_headers(headers) + + # Add secrets: for secret, value in [ (Secrets.LEASE_RENEW, lease_renew_secret), (Secrets.LEASE_CANCEL, lease_cancel_secret), @@ -120,15 +132,39 @@ class StorageClient(object): "X-Tahoe-Authorization", b"%s %s" % (secret.value.encode("ascii"), b64encode(value).strip()), ) + + # Note we can accept CBOR: + headers.addRawHeader("Accept", CBOR_MIME_TYPE) + + # If there's a request message, serialize it and set the Content-Type + # header: + if message_to_serialize is not None: + if "data" in kwargs: + raise TypeError( + "Can't use both `message_to_serialize` and `data` " + "as keyword arguments at the same time" + ) + kwargs["data"] = dumps(message_to_serialize) + headers.addRawHeader("Content-Type", CBOR_MIME_TYPE) + return self._treq.request(method, url, headers=headers, **kwargs) + +class StorageClientGeneral(object): + """ + High-level HTTP APIs that aren't immutable- or mutable-specific. + """ + + def __init__(self, client): # type: (StorageClient) -> None + self._client = client + @inlineCallbacks def get_version(self): """ Return the version metadata for the server. """ - url = self._url("/v1/version") - response = yield self._request("GET", url) + url = self._client.relative_url("/v1/version") + response = yield self._client.request("GET", url) decoded_response = yield _decode_cbor(response) returnValue(decoded_response) @@ -150,7 +186,7 @@ class StorageClientImmutables(object): APIs for interacting with immutables. """ - def __init__(self, client): # type: (StorageClient) -> None + def __init__(self, client: StorageClient): self._client = client @inlineCallbacks @@ -176,18 +212,16 @@ class StorageClientImmutables(object): Result fires when creating the storage index succeeded, if creating the storage index failed the result will fire with an exception. 
""" - url = self._client._url("/v1/immutable/" + _encode_si(storage_index)) - message = dumps( - {"share-numbers": share_numbers, "allocated-size": allocated_size} - ) - response = yield self._client._request( + url = self._client.relative_url("/v1/immutable/" + _encode_si(storage_index)) + message = {"share-numbers": share_numbers, "allocated-size": allocated_size} + + response = yield self._client.request( "POST", url, lease_renew_secret=lease_renew_secret, lease_cancel_secret=lease_cancel_secret, upload_secret=upload_secret, - data=message, - headers=Headers({"content-type": ["application/cbor"]}), + message_to_serialize=message, ) decoded_response = yield _decode_cbor(response) returnValue( @@ -197,6 +231,27 @@ class StorageClientImmutables(object): ) ) + @inlineCallbacks + def abort_upload( + self, storage_index: bytes, share_number: int, upload_secret: bytes + ) -> Deferred[None]: + """Abort the upload.""" + url = self._client.relative_url( + "/v1/immutable/{}/{}/abort".format(_encode_si(storage_index), share_number) + ) + response = yield self._client.request( + "PUT", + url, + upload_secret=upload_secret, + ) + + if response.code == http.OK: + return + else: + raise ClientException( + response.code, + ) + @inlineCallbacks def write_share_chunk( self, storage_index, share_number, upload_secret, offset, data @@ -213,10 +268,10 @@ class StorageClientImmutables(object): whether the _complete_ share (i.e. all chunks, not just this one) has been uploaded. 
""" - url = self._client._url( + url = self._client.relative_url( "/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number) ) - response = yield self._client._request( + response = yield self._client.request( "PATCH", url, upload_secret=upload_secret, @@ -264,10 +319,10 @@ class StorageClientImmutables(object): the HTTP protocol will be simplified, see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777 """ - url = self._client._url( + url = self._client.relative_url( "/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number) ) - response = yield self._client._request( + response = yield self._client.request( "GET", url, headers=Headers( @@ -278,25 +333,69 @@ class StorageClientImmutables(object): body = yield response.content() returnValue(body) else: - raise ClientException( - response.code, - ) + raise ClientException(response.code) @inlineCallbacks def list_shares(self, storage_index): # type: (bytes,) -> Deferred[Set[int]] """ Return the set of shares for a given storage index. """ - url = self._client._url( + url = self._client.relative_url( "/v1/immutable/{}/shares".format(_encode_si(storage_index)) ) - response = yield self._client._request( + response = yield self._client.request( "GET", url, ) if response.code == http.OK: body = yield _decode_cbor(response) returnValue(set(body)) + else: + raise ClientException(response.code) + + @inlineCallbacks + def add_or_renew_lease( + self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes + ): + """ + Add or renew a lease. + + If the renewal secret matches an existing lease, it is renewed. + Otherwise a new lease is added. 
+ """ + url = self._client.relative_url( + "/v1/lease/{}".format(_encode_si(storage_index)) + ) + response = yield self._client.request( + "PUT", + url, + lease_renew_secret=renew_secret, + lease_cancel_secret=cancel_secret, + ) + + if response.code == http.NO_CONTENT: + return + else: + raise ClientException(response.code) + + @inlineCallbacks + def advise_corrupt_share( + self, + storage_index: bytes, + share_number: int, + reason: str, + ): + """Indicate a share has been corrupted, with a human-readable message.""" + assert isinstance(reason, str) + url = self._client.relative_url( + "/v1/immutable/{}/{}/corrupt".format( + _encode_si(storage_index), share_number + ) + ) + message = {"reason": reason} + response = yield self._client.request("POST", url, message_to_serialize=message) + if response.code == http.OK: + return else: raise ClientException( response.code, diff --git a/src/allmydata/storage/http_common.py b/src/allmydata/storage/http_common.py index f570d45d7..c5e087b5b 100644 --- a/src/allmydata/storage/http_common.py +++ b/src/allmydata/storage/http_common.py @@ -1,13 +1,31 @@ """ Common HTTP infrastructure for the storge server. """ + from enum import Enum from base64 import b64encode from hashlib import sha256 +from typing import Optional from cryptography.x509 import Certificate from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat +from werkzeug.http import parse_options_header +from twisted.web.http_headers import Headers + +CBOR_MIME_TYPE = "application/cbor" + + +def get_content_type(headers: Headers) -> Optional[str]: + """ + Get the content type from the HTTP ``Content-Type`` header. + + Returns ``None`` if no content-type was set. 
+ """ + values = headers.getRawHeaders("content-type") or [None] + content_type = parse_options_header(values[0])[0] or None + return content_type + def swissnum_auth_header(swissnum): # type: (bytes) -> bytes """Return value for ``Authentication`` header.""" diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index da9eea22f..c3228ea04 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -2,15 +2,12 @@ HTTP server for storage. """ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals - -from typing import Dict, List, Set, Tuple, Optional +from typing import Dict, List, Set, Tuple, Any, Optional from pathlib import Path + from functools import wraps from base64 import b64decode +import binascii from klein import Klein from twisted.web import http @@ -19,18 +16,33 @@ from twisted.internet.defer import Deferred from twisted.internet.endpoints import quoteStringArgument, serverFromString from twisted.web.server import Site import attr -from werkzeug.http import parse_range_header, parse_content_range_header +from werkzeug.http import ( + parse_range_header, + parse_content_range_header, + parse_accept_header, +) +from werkzeug.routing import BaseConverter, ValidationError +from werkzeug.datastructures import ContentRange from hyperlink import DecodedURL from cryptography.x509 import load_pem_x509_certificate + # TODO Make sure to use pure Python versions? 
from cbor2 import dumps, loads from .server import StorageServer -from .http_common import swissnum_auth_header, Secrets, get_spki_hash +from .http_common import ( + swissnum_auth_header, + Secrets, + get_content_type, + CBOR_MIME_TYPE, + get_spki_hash, +) + from .common import si_a2b -from .immutable import BucketWriter +from .immutable import BucketWriter, ConflictingWriteError from ..util.hashutil import timing_safe_compare +from ..util.base32 import rfc3548_alphabet class ClientSecretsException(Exception): @@ -127,10 +139,95 @@ class StorageIndexUploads(object): """ # Map share number to BucketWriter - shares = attr.ib() # type: Dict[int,BucketWriter] + shares = attr.ib(factory=dict) # type: Dict[int,BucketWriter] - # The upload key. - upload_secret = attr.ib() # type: bytes + # Map share number to the upload secret (different shares might have + # different upload secrets). + upload_secrets = attr.ib(factory=dict) # type: Dict[int,bytes] + + +@attr.s +class UploadsInProgress(object): + """ + Keep track of uploads for storage indexes. 
+ """ + + # Map storage index to corresponding uploads-in-progress + _uploads = attr.ib(type=Dict[bytes, StorageIndexUploads], factory=dict) + + # Map BucketWriter to (storage index, share number) + _bucketwriters = attr.ib(type=Dict[BucketWriter, Tuple[bytes, int]], factory=dict) + + def add_write_bucket( + self, + storage_index: bytes, + share_number: int, + upload_secret: bytes, + bucket: BucketWriter, + ): + """Add a new ``BucketWriter`` to be tracked.""" + si_uploads = self._uploads.setdefault(storage_index, StorageIndexUploads()) + si_uploads.shares[share_number] = bucket + si_uploads.upload_secrets[share_number] = upload_secret + self._bucketwriters[bucket] = (storage_index, share_number) + + def get_write_bucket( + self, storage_index: bytes, share_number: int, upload_secret: bytes + ) -> BucketWriter: + """Get the given in-progress immutable share upload.""" + self.validate_upload_secret(storage_index, share_number, upload_secret) + try: + return self._uploads[storage_index].shares[share_number] + except (KeyError, IndexError): + raise _HTTPError(http.NOT_FOUND) + + def remove_write_bucket(self, bucket: BucketWriter): + """Stop tracking the given ``BucketWriter``.""" + storage_index, share_number = self._bucketwriters.pop(bucket) + uploads_index = self._uploads[storage_index] + uploads_index.shares.pop(share_number) + uploads_index.upload_secrets.pop(share_number) + if not uploads_index.shares: + self._uploads.pop(storage_index) + + def validate_upload_secret( + self, storage_index: bytes, share_number: int, upload_secret: bytes + ): + """ + Raise an unauthorized-HTTP-response exception if the given + storage_index+share_number have a different upload secret than the + given one. + + If the given upload doesn't exist at all, nothing happens. + """ + if storage_index in self._uploads: + in_progress = self._uploads[storage_index] + # For pre-existing upload, make sure password matches. 
+ if share_number in in_progress.upload_secrets and not timing_safe_compare( + in_progress.upload_secrets[share_number], upload_secret + ): + raise _HTTPError(http.UNAUTHORIZED) + + +class StorageIndexConverter(BaseConverter): + """Parser/validator for storage index URL path segments.""" + + regex = "[" + str(rfc3548_alphabet, "ascii") + "]{26}" + + def to_python(self, value): + try: + return si_a2b(value.encode("ascii")) + except (AssertionError, binascii.Error, ValueError): + raise ValidationError("Invalid storage index") + + +class _HTTPError(Exception): + """ + Raise from ``HTTPServer`` endpoint to return the given HTTP response code. + """ + + def __init__(self, code: int): + self.code = code class HTTPServer(object): @@ -139,6 +236,13 @@ class HTTPServer(object): """ _app = Klein() + _app.url_map.converters["storage_index"] = StorageIndexConverter + + @_app.handle_errors(_HTTPError) + def _http_error(self, request, failure): + """Handle ``_HTTPError`` exceptions.""" + request.setResponseCode(failure.value.code) + return b"" def __init__( self, storage_server, swissnum @@ -146,102 +250,157 @@ class HTTPServer(object): self._storage_server = storage_server self._swissnum = swissnum # Maps storage index to StorageIndexUploads: - self._uploads = {} # type: Dict[bytes,StorageIndexUploads] + self._uploads = UploadsInProgress() + + # When an upload finishes successfully, gets aborted, or times out, + # make sure it gets removed from our tracking datastructure: + self._storage_server.register_bucket_writer_close_handler( + self._uploads.remove_write_bucket + ) def get_resource(self): """Return twisted.web ``Resource`` for this object.""" return self._app.resource() - def _cbor(self, request, data): - """Return CBOR-encoded data.""" - # TODO Might want to optionally send JSON someday, based on Accept - # headers, see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861 - request.setHeader("Content-Type", "application/cbor") - # TODO if data is big, maybe want to use a 
temporary file eventually... - return dumps(data) + def _send_encoded(self, request, data): + """ + Return encoded data suitable for writing as the HTTP body response, by + default using CBOR. + + Also sets the appropriate ``Content-Type`` header on the response. + """ + accept_headers = request.requestHeaders.getRawHeaders("accept") or [ + CBOR_MIME_TYPE + ] + accept = parse_accept_header(accept_headers[0]) + if accept.best == CBOR_MIME_TYPE: + request.setHeader("Content-Type", CBOR_MIME_TYPE) + # TODO if data is big, maybe want to use a temporary file eventually... + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872 + return dumps(data) + else: + # TODO Might want to optionally send JSON someday: + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861 + raise _HTTPError(http.NOT_ACCEPTABLE) + + def _read_encoded(self, request) -> Any: + """ + Read encoded request body data, decoding it with CBOR by default. + """ + content_type = get_content_type(request.requestHeaders) + if content_type == CBOR_MIME_TYPE: + # TODO limit memory usage, client could send arbitrarily large data... 
+ # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872 + return loads(request.content.read()) + else: + raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE) ##### Generic APIs ##### @_authorized_route(_app, set(), "/v1/version", methods=["GET"]) def version(self, request, authorization): """Return version information.""" - return self._cbor(request, self._storage_server.get_version()) + return self._send_encoded(request, self._storage_server.get_version()) ##### Immutable APIs ##### @_authorized_route( _app, {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.UPLOAD}, - "/v1/immutable/", + "/v1/immutable/", methods=["POST"], ) def allocate_buckets(self, request, authorization, storage_index): """Allocate buckets.""" - storage_index = si_a2b(storage_index.encode("ascii")) - info = loads(request.content.read()) upload_secret = authorization[Secrets.UPLOAD] + info = self._read_encoded(request) - if storage_index in self._uploads: - # Pre-existing upload. - in_progress = self._uploads[storage_index] - if timing_safe_compare(in_progress.upload_secret, upload_secret): - # Same session. - # TODO add BucketWriters only for new shares that don't already have buckets; see the HTTP spec for details. - # The backend code may already implement this logic. - pass - else: - # TODO Fail, since the secret doesnt match. - pass - else: - # New upload. - already_got, sharenum_to_bucket = self._storage_server.allocate_buckets( - storage_index, - renew_secret=authorization[Secrets.LEASE_RENEW], - cancel_secret=authorization[Secrets.LEASE_CANCEL], - sharenums=info["share-numbers"], - allocated_size=info["allocated-size"], - ) - self._uploads[storage_index] = StorageIndexUploads( - shares=sharenum_to_bucket, upload_secret=authorization[Secrets.UPLOAD] - ) - return self._cbor( - request, - { - "already-have": set(already_got), - "allocated": set(sharenum_to_bucket), - }, + # We do NOT validate the upload secret for existing bucket uploads. 
+ # Another upload may be happening in parallel, with a different upload + # key. That's fine! If a client tries to _write_ to that upload, they + # need to have an upload key. That does mean we leak the existence of + # these parallel uploads, but if you know storage index you can + # download them once upload finishes, so it's not a big deal to leak + # that information. + + already_got, sharenum_to_bucket = self._storage_server.allocate_buckets( + storage_index, + renew_secret=authorization[Secrets.LEASE_RENEW], + cancel_secret=authorization[Secrets.LEASE_CANCEL], + sharenums=info["share-numbers"], + allocated_size=info["allocated-size"], + ) + for share_number, bucket in sharenum_to_bucket.items(): + self._uploads.add_write_bucket( + storage_index, share_number, upload_secret, bucket ) + return self._send_encoded( + request, + { + "already-have": set(already_got), + "allocated": set(sharenum_to_bucket), + }, + ) + @_authorized_route( _app, {Secrets.UPLOAD}, - "/v1/immutable//", + "/v1/immutable///abort", + methods=["PUT"], + ) + def abort_share_upload(self, request, authorization, storage_index, share_number): + """Abort an in-progress immutable share upload.""" + try: + bucket = self._uploads.get_write_bucket( + storage_index, share_number, authorization[Secrets.UPLOAD] + ) + except _HTTPError as e: + if e.code == http.NOT_FOUND: + # It may be we've already uploaded this, in which case error + # should be method not allowed (405). + try: + self._storage_server.get_buckets(storage_index)[share_number] + except KeyError: + pass + else: + # Already uploaded, so we can't abort. + raise _HTTPError(http.NOT_ALLOWED) + raise + + # Abort the upload; this should close it which will eventually result + # in self._uploads.remove_write_bucket() being called. 
+ bucket.abort() + + return b"" + + @_authorized_route( + _app, + {Secrets.UPLOAD}, + "/v1/immutable//", methods=["PATCH"], ) def write_share_data(self, request, authorization, storage_index, share_number): """Write data to an in-progress immutable upload.""" - storage_index = si_a2b(storage_index.encode("ascii")) content_range = parse_content_range_header(request.getHeader("content-range")) - # TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - # 1. Malformed header should result in error 416 - # 2. Non-bytes unit should result in error 416 - # 3. Missing header means full upload in one request - # 4. Impossible range should resul tin error 416 + if content_range is None or content_range.units != "bytes": + request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE) + return b"" + offset = content_range.start - # TODO basic checks on validity of start, offset, and content-range in general. also of share_number. - # TODO basic check that body isn't infinite. require content-length? or maybe we should require content-range (it's optional now)? if so, needs to be rflected in protocol spec. + # TODO limit memory usage + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872 + data = request.content.read(content_range.stop - content_range.start + 1) + bucket = self._uploads.get_write_bucket( + storage_index, share_number, authorization[Secrets.UPLOAD] + ) - data = request.content.read() try: - bucket = self._uploads[storage_index].shares[share_number] - except (KeyError, IndexError): - # TODO return 404 - raise - - finished = bucket.write(offset, data) - - # TODO if raises ConflictingWriteError, return HTTP CONFLICT code. 
+ finished = bucket.write(offset, data) + except ConflictingWriteError: + request.setResponseCode(http.CONFLICT) + return b"" if finished: bucket.close() @@ -252,55 +411,112 @@ class HTTPServer(object): required = [] for start, end, _ in bucket.required_ranges().ranges(): required.append({"begin": start, "end": end}) - return self._cbor(request, {"required": required}) + return self._send_encoded(request, {"required": required}) @_authorized_route( _app, set(), - "/v1/immutable//shares", + "/v1/immutable//shares", methods=["GET"], ) def list_shares(self, request, authorization, storage_index): """ List shares for the given storage index. """ - storage_index = si_a2b(storage_index.encode("ascii")) share_numbers = list(self._storage_server.get_buckets(storage_index).keys()) - return self._cbor(request, share_numbers) + return self._send_encoded(request, share_numbers) @_authorized_route( _app, set(), - "/v1/immutable//", + "/v1/immutable//", methods=["GET"], ) def read_share_chunk(self, request, authorization, storage_index, share_number): """Read a chunk for an already uploaded immutable.""" - # TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - # 1. basic checks on validity on storage index, share number - # 2. missing range header should have response code 200 and return whole thing - # 3. malformed range header should result in error? or return everything? - # 4. non-bytes range results in error - # 5. ranges make sense semantically (positive, etc.) - # 6. multiple ranges fails with error - # 7. 
missing end of range means "to the end of share" - storage_index = si_a2b(storage_index.encode("ascii")) - range_header = parse_range_header(request.getHeader("range")) - offset, end = range_header.ranges[0] - assert end != None # TODO support this case + try: + bucket = self._storage_server.get_buckets(storage_index)[share_number] + except KeyError: + request.setResponseCode(http.NOT_FOUND) + return b"" - # TODO if not found, 404 - bucket = self._storage_server.get_buckets(storage_index)[share_number] + if request.getHeader("range") is None: + # Return the whole thing. + start = 0 + while True: + # TODO should probably yield to event loop occasionally... + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872 + data = bucket.read(start, start + 65536) + if not data: + request.finish() + return + request.write(data) + start += len(data) + + range_header = parse_range_header(request.getHeader("range")) + if ( + range_header is None + or range_header.units != "bytes" + or len(range_header.ranges) > 1 # more than one range + or range_header.ranges[0][1] is None # range without end + ): + request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE) + return b"" + + offset, end = range_header.ranges[0] + + # TODO limit memory usage + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872 data = bucket.read(offset, end - offset) + request.setResponseCode(http.PARTIAL_CONTENT) - # TODO set content-range on response. We we need to expand the - # BucketReader interface to return share's length. - # - # request.setHeader( - # "content-range", range_header.make_content_range(share_length).to_header() - # ) + if len(data): + # For empty bodies the content-range header makes no sense since + # the end of the range is inclusive. 
+ request.setHeader( + "content-range", + ContentRange("bytes", offset, offset + len(data)).to_header(), + ) return data + @_authorized_route( + _app, + {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL}, + "/v1/lease/", + methods=["PUT"], + ) + def add_or_renew_lease(self, request, authorization, storage_index): + """Update the lease for an immutable share.""" + if not self._storage_server.get_buckets(storage_index): + raise _HTTPError(http.NOT_FOUND) + + # Checking of the renewal secret is done by the backend. + self._storage_server.add_lease( + storage_index, + authorization[Secrets.LEASE_RENEW], + authorization[Secrets.LEASE_CANCEL], + ) + + request.setResponseCode(http.NO_CONTENT) + return b"" + + @_authorized_route( + _app, + set(), + "/v1/immutable///corrupt", + methods=["POST"], + ) + def advise_corrupt_share(self, request, authorization, storage_index, share_number): + """Indicate that given share is corrupt, with a text reason.""" + try: + bucket = self._storage_server.get_buckets(storage_index)[share_number] + except KeyError: + raise _HTTPError(http.NOT_FOUND) + + info = self._read_encoded(request) + bucket.advise_corrupt_share(info["reason"].encode("utf-8")) + return b"" + def listen_tls( reactor, diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 0add9806b..9d1a3d6a4 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -743,8 +743,9 @@ class StorageServer(service.MultiService): def advise_corrupt_share(self, share_type, storage_index, shnum, reason): - # This is a remote API, I believe, so this has to be bytes for legacy - # protocol backwards compatibility reasons. + # Previously this had to be bytes for legacy protocol backwards + # compatibility reasons. Now that Foolscap layer has been abstracted + # out, we can probably refactor this to be unicode... 
assert isinstance(share_type, bytes) assert isinstance(reason, bytes), "%r is not bytes" % (reason,) @@ -777,7 +778,7 @@ class StorageServer(service.MultiService): si_s, shnum, ) - with open(report_path, "w") as f: + with open(report_path, "w", encoding="utf-8") as f: f.write(report) return None diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index cf5fb65a2..55b6cfb05 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -58,7 +58,7 @@ from twisted.plugin import ( from eliot import ( log_call, ) -from foolscap.api import eventually +from foolscap.api import eventually, RemoteException from foolscap.reconnector import ( ReconnectionInfo, ) @@ -75,7 +75,10 @@ from allmydata.util.observer import ObserverList from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.hashutil import permute_server_hash from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict -from allmydata.storage.http_client import StorageClient, StorageClientImmutables +from allmydata.storage.http_client import ( + StorageClient, StorageClientImmutables, StorageClientGeneral, + ClientException as HTTPClientException +) # who is responsible for de-duplication? 
@@ -1035,8 +1038,13 @@ class _FakeRemoteReference(object): """ local_object = attr.ib(type=object) + @defer.inlineCallbacks def callRemote(self, action, *args, **kwargs): - return getattr(self.local_object, action)(*args, **kwargs) + try: + result = yield getattr(self.local_object, action)(*args, **kwargs) + defer.returnValue(result) + except HTTPClientException as e: + raise RemoteException(e.args) @attr.s @@ -1051,7 +1059,8 @@ class _HTTPBucketWriter(object): finished = attr.ib(type=bool, default=False) def abort(self): - pass # TODO in later ticket + return self.client.abort_upload(self.storage_index, self.share_number, + self.upload_secret) @defer.inlineCallbacks def write(self, offset, data): @@ -1085,7 +1094,10 @@ class _HTTPBucketReader(object): ) def advise_corrupt_share(self, reason): - pass # TODO in later ticket + return self.client.advise_corrupt_share( + self.storage_index, self.share_number, + str(reason, "utf-8", errors="backslashreplace") + ) # WORK IN PROGRESS, for now it doesn't actually implement whole thing. 
@@ -1105,7 +1117,7 @@ class _HTTPStorageServer(object): return _HTTPStorageServer(http_client=http_client) def get_version(self): - return self._http_client.get_version() + return StorageClientGeneral(self._http_client).get_version() @defer.inlineCallbacks def allocate_buckets( @@ -1115,7 +1127,7 @@ class _HTTPStorageServer(object): cancel_secret, sharenums, allocated_size, - canary, + canary ): upload_secret = urandom(20) immutable_client = StorageClientImmutables(self._http_client) @@ -1139,7 +1151,7 @@ class _HTTPStorageServer(object): @defer.inlineCallbacks def get_buckets( self, - storage_index, + storage_index ): immutable_client = StorageClientImmutables(self._http_client) share_numbers = yield immutable_client.list_shares( @@ -1151,3 +1163,29 @@ class _HTTPStorageServer(object): )) for share_num in share_numbers }) + + def add_lease( + self, + storage_index, + renew_secret, + cancel_secret + ): + immutable_client = StorageClientImmutables(self._http_client) + return immutable_client.add_or_renew_lease( + storage_index, renew_secret, cancel_secret + ) + + def advise_corrupt_share( + self, + share_type, + storage_index, + shnum, + reason: bytes + ): + if share_type == b"immutable": + imm_client = StorageClientImmutables(self._http_client) + return imm_client.advise_corrupt_share( + storage_index, shnum, str(reason, "utf-8", errors="backslashreplace") + ) + else: + raise NotImplementedError() # future tickets diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index 1c757551a..253ff6046 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -176,8 +176,9 @@ class IStorageServerImmutableAPIsTestsMixin(object): canary=Referenceable(), ) - # Bucket 1 is fully written in one go. 
- yield allocated[0].callRemote("write", 0, b"1" * 1024) + # Bucket 1 get some data written (but not all, or HTTP implicitly + # finishes the upload) + yield allocated[0].callRemote("write", 0, b"1" * 1023) # Disconnect or abort, depending on the test: yield abort_or_disconnect(allocated[0]) @@ -193,20 +194,6 @@ class IStorageServerImmutableAPIsTestsMixin(object): ) yield allocated[0].callRemote("write", 0, b"2" * 1024) - def test_disconnection(self): - """ - If we disconnect in the middle of writing to a bucket, all data is - wiped, and it's even possible to write different data to the bucket. - - (In the real world one shouldn't do that, but writing different data is - a good way to test that the original data really was wiped.) - - HTTP protocol should skip this test, since disconnection is meaningless - concept; this is more about testing implicit contract the Foolscap - implementation depends on doesn't change as we refactor things. - """ - return self.abort_or_disconnect_half_way(lambda _: self.disconnect()) - @inlineCallbacks def test_written_shares_are_allocated(self): """ @@ -1061,13 +1048,6 @@ class _SharedMixin(SystemTestMixin): AsyncTestCase.tearDown(self) yield SystemTestMixin.tearDown(self) - @inlineCallbacks - def disconnect(self): - """ - Disconnect and then reconnect with a new ``IStorageServer``. - """ - raise NotImplementedError("implement in subclass") - class _FoolscapMixin(_SharedMixin): """Run tests on Foolscap version of ``IStorageServer``.""" @@ -1080,16 +1060,6 @@ class _FoolscapMixin(_SharedMixin): self.assertTrue(IStorageServer.providedBy(client)) return succeed(client) - @inlineCallbacks - def disconnect(self): - """ - Disconnect and then reconnect with a new ``IStorageServer``. 
- """ - current = self.storage_client - yield self.bounce_client(0) - self.storage_client = self._get_native_server().get_storage_server() - assert self.storage_client is not current - class _HTTPMixin(_SharedMixin): """Run tests on the HTTP version of ``IStorageServer``.""" @@ -1148,27 +1118,37 @@ class FoolscapImmutableAPIsTests( ): """Foolscap-specific tests for immutable ``IStorageServer`` APIs.""" + def test_disconnection(self): + """ + If we disconnect in the middle of writing to a bucket, all data is + wiped, and it's even possible to write different data to the bucket. + + (In the real world one shouldn't do that, but writing different data is + a good way to test that the original data really was wiped.) + + HTTP protocol doesn't need this test, since disconnection is a + meaningless concept; this is more about testing the implicit contract + the Foolscap implementation depends on doesn't change as we refactor + things. + """ + return self.abort_or_disconnect_half_way(lambda _: self.disconnect()) + + @inlineCallbacks + def disconnect(self): + """ + Disconnect and then reconnect with a new ``IStorageServer``. + """ + current = self.storage_client + yield self.bounce_client(0) + self.storage_client = self._get_native_server().get_storage_server() + assert self.storage_client is not current + class HTTPImmutableAPIsTests( _HTTPMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase ): """HTTP-specific tests for immutable ``IStorageServer`` APIs.""" - # These will start passing in future PRs as HTTP protocol is implemented. 
- SKIP_TESTS = { - "test_abort", - "test_add_lease_renewal", - "test_add_new_lease", - "test_advise_corrupt_share", - "test_allocate_buckets_repeat", - "test_bucket_advise_corrupt_share", - "test_disconnection", - "test_get_buckets_skips_unfinished_buckets", - "test_matching_overlapping_writes", - "test_non_matching_overlapping_writes", - "test_written_shares_are_allocated", - } - class FoolscapMutableAPIsTests( _FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index e9c9e83f5..2d17658ce 100644 --- a/src/allmydata/test/test_storage_http.py +++ b/src/allmydata/test/test_storage_http.py @@ -15,6 +15,7 @@ if PY2: # fmt: on from base64 import b64encode +from contextlib import contextmanager from os import urandom from hypothesis import assume, given, strategies as st @@ -24,9 +25,15 @@ from klein import Klein from hyperlink import DecodedURL from collections_extended import RangeMap from twisted.internet.task import Clock +from twisted.web import http +from twisted.web.http_headers import Headers from cryptography.x509 import load_pem_x509_certificate +from werkzeug import routing +from werkzeug.exceptions import NotFound as WNotFound from .common import SyncTestCase +from ..storage.http_common import get_content_type, get_spki_hash +from ..storage.common import si_b2a from ..storage.server import StorageServer from ..storage.http_server import ( HTTPServer, @@ -34,6 +41,7 @@ from ..storage.http_server import ( Secrets, ClientSecretsException, _authorized_route, + StorageIndexConverter, ) from ..storage.http_client import ( StorageClient, @@ -41,8 +49,9 @@ from ..storage.http_client import ( StorageClientImmutables, ImmutableCreateResult, UploadProgress, + StorageClientGeneral, + _encode_si, ) -from ..storage.http_common import get_spki_hash class HTTPFurlTests(SyncTestCase): @@ -85,6 +94,25 @@ ox5zO3LrQmQw11OaIAs2/kviKAoKTFFxeyYcpS5RuKNDZfHQCXlLwt9bySxG 
self.assertEqual(get_spki_hash(certificate), expected_hash) +class HTTPUtilities(SyncTestCase): + """Tests for HTTP common utilities.""" + + def test_get_content_type(self): + """``get_content_type()`` extracts the content-type from the header.""" + + def assert_header_values_result(values, expected_content_type): + headers = Headers() + if values: + headers.setRawHeaders("Content-Type", values) + content_type = get_content_type(headers) + self.assertEqual(content_type, expected_content_type) + + assert_header_values_result(["text/html"], "text/html") + assert_header_values_result([], None) + assert_header_values_result(["text/plain", "application/json"], "text/plain") + assert_header_values_result(["text/html;encoding=utf-8"], "text/html") + + def _post_process(params): secret_types, secrets = params secrets = {t: s for (t, s) in zip(secret_types, secrets)} @@ -189,6 +217,52 @@ class ExtractSecretsTests(SyncTestCase): _extract_secrets(["lease-cancel-secret eA=="], {Secrets.LEASE_RENEW}) +class RouteConverterTests(SyncTestCase): + """Tests for custom werkzeug path segment converters.""" + + adapter = routing.Map( + [ + routing.Rule( + "//", endpoint="si", methods=["GET"] + ) + ], + converters={"storage_index": StorageIndexConverter}, + ).bind("example.com", "/") + + @given(storage_index=st.binary(min_size=16, max_size=16)) + def test_good_storage_index_is_parsed(self, storage_index): + """ + A valid storage index is accepted and parsed back out by + StorageIndexConverter. 
+ """ + self.assertEqual( + self.adapter.match( + "/{}/".format(str(si_b2a(storage_index), "ascii")), method="GET" + ), + ("si", {"storage_index": storage_index}), + ) + + def test_long_storage_index_is_not_parsed(self): + """An overly long storage_index string is not parsed.""" + with self.assertRaises(WNotFound): + self.adapter.match("/{}/".format("a" * 27), method="GET") + + def test_short_storage_index_is_not_parsed(self): + """An overly short storage_index string is not parsed.""" + with self.assertRaises(WNotFound): + self.adapter.match("/{}/".format("a" * 25), method="GET") + + def test_bad_characters_storage_index_is_not_parsed(self): + """A storage_index string with bad characters is not parsed.""" + with self.assertRaises(WNotFound): + self.adapter.match("/{}_/".format("a" * 25), method="GET") + + def test_invalid_storage_index_is_not_parsed(self): + """An invalid storage_index string is not parsed.""" + with self.assertRaises(WNotFound): + self.adapter.match("/nomd2a65ylxjbqzsw7gcfh4ivr/", method="GET") + + # TODO should be actual swissnum SWISSNUM_FOR_TEST = b"abcd" @@ -249,7 +323,7 @@ class RoutingTests(SyncTestCase): """ # Without secret, get a 400 error. response = result_of( - self.client._request( + self.client.request( "GET", "http://127.0.0.1/upload_secret", ) @@ -258,7 +332,7 @@ class RoutingTests(SyncTestCase): # With secret, we're good. 
response = result_of( - self.client._request( + self.client.request( "GET", "http://127.0.0.1/upload_secret", upload_secret=b"MAGIC" ) ) @@ -286,6 +360,38 @@ class HttpTestFixture(Fixture): ) +class StorageClientWithHeadersOverride(object): + """Wrap ``StorageClient`` and override sent headers.""" + + def __init__(self, storage_client, add_headers): + self.storage_client = storage_client + self.add_headers = add_headers + + def __getattr__(self, attr): + return getattr(self.storage_client, attr) + + def request(self, *args, headers=None, **kwargs): + if headers is None: + headers = Headers() + for key, value in self.add_headers.items(): + headers.setRawHeaders(key, [value]) + return self.storage_client.request(*args, headers=headers, **kwargs) + + +@contextmanager +def assert_fails_with_http_code(test_case: SyncTestCase, code: int): + """ + Context manager that asserts the code fails with the given HTTP response + code. + """ + with test_case.assertRaises(ClientException) as e: + try: + yield + finally: + pass + test_case.assertEqual(e.exception.code, code) + + class GenericHTTPAPITests(SyncTestCase): """ Tests of HTTP client talking to the HTTP server, for generic HTTP API @@ -303,14 +409,26 @@ class GenericHTTPAPITests(SyncTestCase): If the wrong swissnum is used, an ``Unauthorized`` response code is returned. """ - client = StorageClient( - DecodedURL.from_text("http://127.0.0.1"), - b"something wrong", - treq=StubTreq(self.http.http_server.get_resource()), + client = StorageClientGeneral( + StorageClient( + DecodedURL.from_text("http://127.0.0.1"), + b"something wrong", + treq=StubTreq(self.http.http_server.get_resource()), + ) ) - with self.assertRaises(ClientException) as e: + with assert_fails_with_http_code(self, http.UNAUTHORIZED): + result_of(client.get_version()) + + def test_unsupported_mime_type(self): + """ + The client can request mime types other than CBOR, and if they are + unsupported a NOT ACCEPTABLE (406) error will be returned. 
+ """ + client = StorageClientGeneral( + StorageClientWithHeadersOverride(self.http.client, {"accept": "image/gif"}) + ) + with assert_fails_with_http_code(self, http.NOT_ACCEPTABLE): result_of(client.get_version()) - self.assertEqual(e.exception.args[0], 401) def test_version(self): """ @@ -319,7 +437,8 @@ class GenericHTTPAPITests(SyncTestCase): We ignore available disk space and max immutable share size, since that might change across calls. """ - version = result_of(self.http.client.get_version()) + client = StorageClientGeneral(self.http.client) + version = result_of(client.get_version()) version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop( b"available-space" ) @@ -346,6 +465,28 @@ class ImmutableHTTPAPITests(SyncTestCase): self.skipTest("Not going to bother supporting Python 2") super(ImmutableHTTPAPITests, self).setUp() self.http = self.useFixture(HttpTestFixture()) + self.imm_client = StorageClientImmutables(self.http.client) + + def create_upload(self, share_numbers, length): + """ + Create a write bucket on server, return: + + (upload_secret, lease_secret, storage_index, result) + """ + upload_secret = urandom(32) + lease_secret = urandom(32) + storage_index = urandom(16) + created = result_of( + self.imm_client.create( + storage_index, + share_numbers, + length, + upload_secret, + lease_secret, + lease_secret, + ) + ) + return (upload_secret, lease_secret, storage_index, created) def test_upload_can_be_downloaded(self): """ @@ -357,19 +498,10 @@ class ImmutableHTTPAPITests(SyncTestCase): that's already done in test_storage.py. 
""" length = 100 - expected_data = b"".join(bytes([i]) for i in range(100)) - - im_client = StorageClientImmutables(self.http.client) + expected_data = bytes(range(100)) # Create a upload: - upload_secret = urandom(32) - lease_secret = urandom(32) - storage_index = b"".join(bytes([i]) for i in range(16)) - created = result_of( - im_client.create( - storage_index, {1}, 100, upload_secret, lease_secret, lease_secret - ) - ) + (upload_secret, _, storage_index, created) = self.create_upload({1}, 100) self.assertEqual( created, ImmutableCreateResult(already_have=set(), allocated={1}) ) @@ -380,7 +512,7 @@ class ImmutableHTTPAPITests(SyncTestCase): # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. def write(offset, length): remaining.empty(offset, offset + length) - return im_client.write_share_chunk( + return self.imm_client.write_share_chunk( storage_index, 1, upload_secret, @@ -424,31 +556,111 @@ class ImmutableHTTPAPITests(SyncTestCase): # We can now read: for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]: downloaded = result_of( - im_client.read_share_chunk(storage_index, 1, offset, length) + self.imm_client.read_share_chunk(storage_index, 1, offset, length) ) self.assertEqual(downloaded, expected_data[offset : offset + length]) + def test_write_with_wrong_upload_key(self): + """ + A write with an upload key that is different than the original upload + key will fail. + """ + (upload_secret, _, storage_index, _) = self.create_upload({1}, 100) + with assert_fails_with_http_code(self, http.UNAUTHORIZED): + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret + b"X", + 0, + b"123", + ) + ) + + def test_allocate_buckets_second_time_different_shares(self): + """ + If allocate buckets endpoint is called second time with different + upload key on potentially different shares, that creates the buckets on + those shares that are different. 
+ """ + # Create a upload: + (upload_secret, lease_secret, storage_index, created) = self.create_upload( + {1, 2, 3}, 100 + ) + + # Write half of share 1 + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"a" * 50, + ) + ) + + # Add same shares with a different upload key share 1 overlaps with + # existing shares, this call shouldn't overwrite the existing + # work-in-progress. + upload_secret2 = b"x" * 2 + created2 = result_of( + self.imm_client.create( + storage_index, + {1, 4, 6}, + 100, + upload_secret2, + lease_secret, + lease_secret, + ) + ) + self.assertEqual(created2.allocated, {4, 6}) + + # Write second half of share 1 + self.assertTrue( + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret, + 50, + b"b" * 50, + ) + ).finished + ) + + # The upload of share 1 succeeded, demonstrating that second create() + # call didn't overwrite work-in-progress. + downloaded = result_of( + self.imm_client.read_share_chunk(storage_index, 1, 0, 100) + ) + self.assertEqual(downloaded, b"a" * 50 + b"b" * 50) + + # We can successfully upload the shares created with the second upload secret. + self.assertTrue( + result_of( + self.imm_client.write_share_chunk( + storage_index, + 4, + upload_secret2, + 0, + b"x" * 100, + ) + ).finished + ) + def test_list_shares(self): """ Once a share is finished uploading, it's possible to list it. 
""" - im_client = StorageClientImmutables(self.http.client) - upload_secret = urandom(32) - lease_secret = urandom(32) - storage_index = b"".join(bytes([i]) for i in range(16)) - result_of( - im_client.create( - storage_index, {1, 2, 3}, 10, upload_secret, lease_secret, lease_secret - ) - ) + (upload_secret, _, storage_index, created) = self.create_upload({1, 2, 3}, 10) # Initially there are no shares: - self.assertEqual(result_of(im_client.list_shares(storage_index)), set()) + self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set()) # Upload shares 1 and 3: for share_number in [1, 3]: progress = result_of( - im_client.write_share_chunk( + self.imm_client.write_share_chunk( storage_index, share_number, upload_secret, @@ -459,87 +671,491 @@ class ImmutableHTTPAPITests(SyncTestCase): self.assertTrue(progress.finished) # Now shares 1 and 3 exist: - self.assertEqual(result_of(im_client.list_shares(storage_index)), {1, 3}) + self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), {1, 3}) + + def test_upload_bad_content_range(self): + """ + Malformed or invalid Content-Range headers to the immutable upload + endpoint result in a 416 error. + """ + (upload_secret, _, storage_index, created) = self.create_upload({1}, 10) + + def check_invalid(bad_content_range_value): + client = StorageClientImmutables( + StorageClientWithHeadersOverride( + self.http.client, {"content-range": bad_content_range_value} + ) + ) + with assert_fails_with_http_code( + self, http.REQUESTED_RANGE_NOT_SATISFIABLE + ): + result_of( + client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"0123456789", + ) + ) + + check_invalid("not a valid content-range header at all") + check_invalid("bytes -1-9/10") + check_invalid("bytes 0--9/10") + check_invalid("teapots 0-9/10") def test_list_shares_unknown_storage_index(self): """ Listing unknown storage index's shares results in empty list of shares. 
""" - im_client = StorageClientImmutables(self.http.client) - storage_index = b"".join(bytes([i]) for i in range(16)) - self.assertEqual(result_of(im_client.list_shares(storage_index)), set()) + storage_index = bytes(range(16)) + self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set()) + + def test_upload_non_existent_storage_index(self): + """ + Uploading to a non-existent storage index or share number results in + 404. + """ + (upload_secret, _, storage_index, _) = self.create_upload({1}, 10) + + def unknown_check(storage_index, share_number): + with assert_fails_with_http_code(self, http.NOT_FOUND): + result_of( + self.imm_client.write_share_chunk( + storage_index, + share_number, + upload_secret, + 0, + b"0123456789", + ) + ) + + # Wrong share number: + unknown_check(storage_index, 7) + # Wrong storage index: + unknown_check(b"X" * 16, 7) def test_multiple_shares_uploaded_to_different_place(self): """ If a storage index has multiple shares, uploads to different shares are stored separately and can be downloaded separately. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - """ - - def test_bucket_allocated_with_new_shares(self): - """ - If some shares already exist, allocating shares indicates only the new - ones were created. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - """ - - def test_bucket_allocation_new_upload_secret(self): - """ - If a bucket was allocated with one upload secret, and a different upload - key is used to allocate the bucket again, the second allocation fails. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - """ - - def test_upload_with_wrong_upload_secret_fails(self): - """ - Uploading with a key that doesn't match the one used to allocate the - bucket will fail. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 - """ - - def test_upload_offset_cannot_be_negative(self): - """ - A negative upload offset will be rejected. 
- - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 """ + (upload_secret, _, storage_index, _) = self.create_upload({1, 2}, 10) + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"1" * 10, + ) + ) + result_of( + self.imm_client.write_share_chunk( + storage_index, + 2, + upload_secret, + 0, + b"2" * 10, + ) + ) + self.assertEqual( + result_of(self.imm_client.read_share_chunk(storage_index, 1, 0, 10)), + b"1" * 10, + ) + self.assertEqual( + result_of(self.imm_client.read_share_chunk(storage_index, 2, 0, 10)), + b"2" * 10, + ) def test_mismatching_upload_fails(self): """ If an uploaded chunk conflicts with an already uploaded chunk, a CONFLICT error is returned. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 """ + (upload_secret, _, storage_index, created) = self.create_upload({1}, 100) + + # Write: + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"0" * 10, + ) + ) + + # Conflicting write: + with assert_fails_with_http_code(self, http.CONFLICT): + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"0123456789", + ) + ) + + def upload(self, share_number, data_length=26): + """ + Create a share, return (storage_index, uploaded_data). + """ + uploaded_data = (b"abcdefghijklmnopqrstuvwxyz" * ((data_length // 26) + 1))[ + :data_length + ] + (upload_secret, _, storage_index, _) = self.create_upload( + {share_number}, data_length + ) + result_of( + self.imm_client.write_share_chunk( + storage_index, + share_number, + upload_secret, + 0, + uploaded_data, + ) + ) + return storage_index, uploaded_data def test_read_of_wrong_storage_index_fails(self): """ Reading from unknown storage index results in 404. 
- - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 """ + with assert_fails_with_http_code(self, http.NOT_FOUND): + result_of( + self.imm_client.read_share_chunk( + b"1" * 16, + 1, + 0, + 10, + ) + ) def test_read_of_wrong_share_number_fails(self): """ Reading from unknown storage index results in 404. - - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 """ + storage_index, _ = self.upload(1) + with assert_fails_with_http_code(self, http.NOT_FOUND): + result_of( + self.imm_client.read_share_chunk( + storage_index, + 7, # different share number + 0, + 10, + ) + ) def test_read_with_negative_offset_fails(self): """ - The offset for reads cannot be negative. + Malformed or unsupported Range headers result in 416 (requested range + not satisfiable) error. + """ + storage_index, _ = self.upload(1) - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 + def check_bad_range(bad_range_value): + client = StorageClientImmutables( + StorageClientWithHeadersOverride( + self.http.client, {"range": bad_range_value} + ) + ) + + with assert_fails_with_http_code( + self, http.REQUESTED_RANGE_NOT_SATISFIABLE + ): + result_of( + client.read_share_chunk( + storage_index, + 1, + 0, + 10, + ) + ) + + # Bad unit + check_bad_range("molluscs=0-9") + # Negative offsets + check_bad_range("bytes=-2-9") + check_bad_range("bytes=0--10") + # Negative offset no endpoint + check_bad_range("bytes=-300-") + check_bad_range("bytes=") + # Multiple ranges are currently unsupported, even if they're + # semantically valid under HTTP: + check_bad_range("bytes=0-5, 6-7") + # Ranges without an end are currently unsupported, even if they're + # semantically valid under HTTP. + check_bad_range("bytes=0-") + + @given(data_length=st.integers(min_value=1, max_value=300000)) + def test_read_with_no_range(self, data_length): + """ + A read with no range returns the whole immutable. 
+ """ + storage_index, uploaded_data = self.upload(1, data_length) + response = result_of( + self.http.client.request( + "GET", + self.http.client.relative_url( + "/v1/immutable/{}/1".format(_encode_si(storage_index)) + ), + ) + ) + self.assertEqual(response.code, http.OK) + self.assertEqual(result_of(response.content()), uploaded_data) + + def test_validate_content_range_response_to_read(self): + """ + The server responds to ranged reads with an appropriate Content-Range + header. + """ + storage_index, _ = self.upload(1, 26) + + def check_range(requested_range, expected_response): + headers = Headers() + headers.setRawHeaders("range", [requested_range]) + response = result_of( + self.http.client.request( + "GET", + self.http.client.relative_url( + "/v1/immutable/{}/1".format(_encode_si(storage_index)) + ), + headers=headers, + ) + ) + self.assertEqual( + response.headers.getRawHeaders("content-range"), [expected_response] + ) + + check_range("bytes=0-10", "bytes 0-10/*") + # Can't go beyond the end of the immutable! + check_range("bytes=10-100", "bytes 10-25/*") + + def test_timed_out_upload_allows_reupload(self): + """ + If an in-progress upload times out, it is cancelled altogether, + allowing a new upload to occur. + """ + self._test_abort_or_timed_out_upload_to_existing_storage_index( + lambda **kwargs: self.http.clock.advance(30 * 60 + 1) + ) + + def test_abort_upload_allows_reupload(self): + """ + If an in-progress upload is aborted, it is cancelled altogether, + allowing a new upload to occur. """ - def test_read_with_negative_length_fails(self): - """ - The length for reads cannot be negative. 
+ def abort(storage_index, share_number, upload_secret): + return result_of( + self.imm_client.abort_upload(storage_index, share_number, upload_secret) + ) - TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860 + self._test_abort_or_timed_out_upload_to_existing_storage_index(abort) + + def _test_abort_or_timed_out_upload_to_existing_storage_index(self, cancel_upload): + """Start uploading to an existing storage index that then times out or aborts. + + Re-uploading should work. """ + # Start an upload: + (upload_secret, _, storage_index, _) = self.create_upload({1}, 100) + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"123", + ) + ) + + # Now, the upload is cancelled somehow: + cancel_upload( + storage_index=storage_index, upload_secret=upload_secret, share_number=1 + ) + + # Now we can create a new share with the same storage index without + # complaint: + upload_secret = urandom(32) + lease_secret = urandom(32) + created = result_of( + self.imm_client.create( + storage_index, + {1}, + 100, + upload_secret, + lease_secret, + lease_secret, + ) + ) + self.assertEqual(created.allocated, {1}) + + # And write to it, too: + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"ABC", + ) + ) + + def test_unknown_aborts(self): + """ + Aborting uploads with an unknown storage index or share number will + result 404 HTTP response code. + """ + (upload_secret, _, storage_index, _) = self.create_upload({1}, 100) + + for si, num in [(storage_index, 3), (b"x" * 16, 1)]: + with assert_fails_with_http_code(self, http.NOT_FOUND): + result_of(self.imm_client.abort_upload(si, num, upload_secret)) + + def test_unauthorized_abort(self): + """ + An abort with the wrong key will return an unauthorized error, and will + not abort the upload. 
+ """ + (upload_secret, _, storage_index, _) = self.create_upload({1}, 100) + + # Failed to abort becaues wrong upload secret: + with assert_fails_with_http_code(self, http.UNAUTHORIZED): + result_of( + self.imm_client.abort_upload(storage_index, 1, upload_secret + b"X") + ) + + # We can still write to it: + result_of( + self.imm_client.write_share_chunk( + storage_index, + 1, + upload_secret, + 0, + b"ABC", + ) + ) + + def test_too_late_abort(self): + """ + An abort of an already-fully-uploaded immutable will result in 405 + error and will not affect the immutable. + """ + uploaded_data = b"123" + (upload_secret, _, storage_index, _) = self.create_upload({0}, 3) + result_of( + self.imm_client.write_share_chunk( + storage_index, + 0, + upload_secret, + 0, + uploaded_data, + ) + ) + + # Can't abort, we finished upload: + with assert_fails_with_http_code(self, http.NOT_ALLOWED): + result_of(self.imm_client.abort_upload(storage_index, 0, upload_secret)) + + # Abort didn't prevent reading: + self.assertEqual( + uploaded_data, + result_of( + self.imm_client.read_share_chunk( + storage_index, + 0, + 0, + 3, + ) + ), + ) + + def test_lease_renew_and_add(self): + """ + It's possible the renew the lease on an uploaded immutable, by using + the same renewal secret, or add a new lease by choosing a different + renewal secret. 
+ """ + # Create immutable: + (upload_secret, lease_secret, storage_index, _) = self.create_upload({0}, 100) + result_of( + self.imm_client.write_share_chunk( + storage_index, + 0, + upload_secret, + 0, + b"A" * 100, + ) + ) + + [lease] = self.http.storage_server.get_leases(storage_index) + initial_expiration_time = lease.get_expiration_time() + + # Time passes: + self.http.clock.advance(167) + + # We renew the lease: + result_of( + self.imm_client.add_or_renew_lease( + storage_index, lease_secret, lease_secret + ) + ) + + # More time passes: + self.http.clock.advance(10) + + # We create a new lease: + lease_secret2 = urandom(32) + result_of( + self.imm_client.add_or_renew_lease( + storage_index, lease_secret2, lease_secret2 + ) + ) + + [lease1, lease2] = self.http.storage_server.get_leases(storage_index) + self.assertEqual(lease1.get_expiration_time(), initial_expiration_time + 167) + self.assertEqual(lease2.get_expiration_time(), initial_expiration_time + 177) + + def test_lease_on_unknown_storage_index(self): + """ + An attempt to renew an unknown storage index will result in a HTTP 404. + """ + storage_index = urandom(16) + secret = b"A" * 32 + with assert_fails_with_http_code(self, http.NOT_FOUND): + result_of(self.imm_client.add_or_renew_lease(storage_index, secret, secret)) + + def test_advise_corrupt_share(self): + """ + Advising share was corrupted succeeds from HTTP client's perspective, + and calls appropriate method on server. + """ + corrupted = [] + self.http.storage_server.advise_corrupt_share = lambda *args: corrupted.append( + args + ) + + storage_index, _ = self.upload(13) + reason = "OHNO \u1235" + result_of(self.imm_client.advise_corrupt_share(storage_index, 13, reason)) + + self.assertEqual( + corrupted, [(b"immutable", storage_index, 13, reason.encode("utf-8"))] + ) + + def test_advise_corrupt_share_unknown(self): + """ + Advising an unknown share was corrupted results in 404. 
+ """ + storage_index, _ = self.upload(13) + reason = "OHNO \u1235" + result_of(self.imm_client.advise_corrupt_share(storage_index, 13, reason)) + + for (si, share_number) in [(storage_index, 11), (urandom(16), 13)]: + with assert_fails_with_http_code(self, http.NOT_FOUND): + result_of( + self.imm_client.advise_corrupt_share(si, share_number, reason) + ) diff --git a/tests.nix b/tests.nix index 53a8885c0..dd477c273 100644 --- a/tests.nix +++ b/tests.nix @@ -73,6 +73,7 @@ let in # Make a derivation that runs the unit test suite. pkgs.runCommand "tahoe-lafs-tests" { } '' + export TAHOE_LAFS_HYPOTHESIS_PROFILE=ci ${python-env}/bin/python -m twisted.trial -j $NIX_BUILD_CORES allmydata # It's not cool to put the whole _trial_temp into $out because it has weird