Merge remote-tracking branch 'origin/master' into 3875-http-storage-furls

This commit is contained in:
Itamar Turner-Trauring 2022-03-23 15:58:57 -04:00
commit eef99c1f22
15 changed files with 1253 additions and 280 deletions

View File

@ -493,8 +493,8 @@ Handling repeat calls:
* If the same API call is repeated with the same upload secret, the response is the same and no change is made to server state.
This is necessary to ensure retries work in the face of lost responses from the server.
* If the API call is made with a different upload secret, this implies a new client, perhaps because the old client died.
In order to prevent storage servers from being able to mess with each other, this API call will fail, because the secret doesn't match.
The use case of restarting upload from scratch if the client dies can be implemented by having the client persist the upload secret.
Or it may happen because the client wants to upload a different share number than a previous client.
New shares will be created, existing shares will be unchanged, regardless of whether the upload secret matches or not.
Discussion
``````````
@ -540,7 +540,7 @@ Rejected designs for upload secrets:
Write data for the indicated share.
The share number must belong to the storage index.
The request body is the raw share data (i.e., ``application/octet-stream``).
*Content-Range* requests are encouraged for large transfers to allow partially complete uploads to be resumed.
*Content-Range* requests are required; for large transfers this allows partially complete uploads to be resumed.
For example,
a 1MiB share can be divided in to eight separate 128KiB chunks.
Each chunk can be uploaded in a separate request.
@ -614,16 +614,19 @@ From RFC 7231::
``POST /v1/immutable/:storage_index/:share_number/corrupt``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Advise the server the data read from the indicated share was corrupt.
The request body includes a human-meaningful string with details about the corruption.
It also includes potentially important details about the share.
Advise the server the data read from the indicated share was corrupt. The
request body includes a human-meaningful text string with details about the
corruption. It also includes potentially important details about the share.
For example::
{"reason": "expected hash abcd, got hash efgh"}
{"reason": u"expected hash abcd, got hash efgh"}
.. share-type, storage-index, and share-number are inferred from the URL
The response code is OK (200) by default, or NOT FOUND (404) if the share
couldn't be found.
Reading
~~~~~~~
@ -644,7 +647,7 @@ Read a contiguous sequence of bytes from one share in one bucket.
The response body is the raw share data (i.e., ``application/octet-stream``).
The ``Range`` header may be used to request exactly one ``bytes`` range, in which case the response code will be 206 (partial content).
Interpretation and response behavior is as specified in RFC 7233 § 4.1.
Multiple ranges in a single request are *not* supported.
Multiple ranges in a single request are *not* supported; open-ended ranges are also not supported.
Discussion
``````````

0
newsfragments/3860.minor Normal file
View File

0
newsfragments/3876.minor Normal file
View File

0
newsfragments/3877.minor Normal file
View File

View File

@ -0,0 +1 @@
Share corruption reports stored on disk are now always encoded in UTF-8.

0
newsfragments/3881.minor Normal file
View File

0
newsfragments/3882.minor Normal file
View File

View File

@ -2,12 +2,8 @@
HTTP client that talks to the HTTP storage server.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from typing import Union, Set, Optional
from treq.testing import StubTreq
from base64 import b64encode
@ -25,7 +21,7 @@ import treq
from treq.client import HTTPClient
from treq.testing import StubTreq
from .http_common import swissnum_auth_header, Secrets
from .http_common import swissnum_auth_header, Secrets, get_content_type, CBOR_MIME_TYPE
from .common import si_b2a
@ -35,14 +31,25 @@ def _encode_si(si): # type: (bytes) -> str
class ClientException(Exception):
"""An unexpected error."""
"""An unexpected response code from the server."""
def __init__(self, code, *additional_args):
Exception.__init__(self, code, *additional_args)
self.code = code
def _decode_cbor(response):
"""Given HTTP response, return decoded CBOR body."""
if response.code > 199 and response.code < 300:
return treq.content(response).addCallback(loads)
return fail(ClientException(response.code, response.phrase))
content_type = get_content_type(response.headers)
if content_type == CBOR_MIME_TYPE:
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
return treq.content(response).addCallback(loads)
else:
raise ClientException(-1, "Server didn't send CBOR")
else:
return fail(ClientException(response.code, response.phrase))
@attr.s
@ -55,17 +62,16 @@ class ImmutableCreateResult(object):
class StorageClient(object):
"""
HTTP client that talks to the HTTP storage server.
Low-level HTTP client that talks to the HTTP storage server.
"""
def __init__(
self, url, swissnum, treq=treq
): # type: (DecodedURL, bytes, Union[treq,StubTreq,HTTPClient]) -> None
"""
The URL is a HTTPS URL ("http://..."). To construct from a furl, use
The URL is a HTTPS URL ("https://..."). To construct from a furl, use
``StorageClient.from_furl()``.
"""
assert url.to_text().startswith("https://")
self._base_url = url
self._swissnum = swissnum
self._treq = treq
@ -79,8 +85,8 @@ class StorageClient(object):
assert furl.scheme == "pb"
swissnum = furl.path[0].encode("ascii")
certificate_hash = furl.user.encode("ascii")
def _url(self, path):
def relative_url(self, path):
"""Get a URL relative to the base URL."""
return self._base_url.click(path)
@ -94,7 +100,7 @@ class StorageClient(object):
)
return headers
def _request(
def request(
self,
method,
url,
@ -102,13 +108,19 @@ class StorageClient(object):
lease_cancel_secret=None,
upload_secret=None,
headers=None,
message_to_serialize=None,
**kwargs
):
"""
Like ``treq.request()``, but with optional secrets that get translated
into corresponding HTTP headers.
If ``message_to_serialize`` is set, it will be serialized (by default
with CBOR) and set as the request body.
"""
headers = self._get_headers(headers)
# Add secrets:
for secret, value in [
(Secrets.LEASE_RENEW, lease_renew_secret),
(Secrets.LEASE_CANCEL, lease_cancel_secret),
@ -120,15 +132,39 @@ class StorageClient(object):
"X-Tahoe-Authorization",
b"%s %s" % (secret.value.encode("ascii"), b64encode(value).strip()),
)
# Note we can accept CBOR:
headers.addRawHeader("Accept", CBOR_MIME_TYPE)
# If there's a request message, serialize it and set the Content-Type
# header:
if message_to_serialize is not None:
if "data" in kwargs:
raise TypeError(
"Can't use both `message_to_serialize` and `data` "
"as keyword arguments at the same time"
)
kwargs["data"] = dumps(message_to_serialize)
headers.addRawHeader("Content-Type", CBOR_MIME_TYPE)
return self._treq.request(method, url, headers=headers, **kwargs)
class StorageClientGeneral(object):
"""
High-level HTTP APIs that aren't immutable- or mutable-specific.
"""
def __init__(self, client): # type: (StorageClient) -> None
self._client = client
@inlineCallbacks
def get_version(self):
"""
Return the version metadata for the server.
"""
url = self._url("/v1/version")
response = yield self._request("GET", url)
url = self._client.relative_url("/v1/version")
response = yield self._client.request("GET", url)
decoded_response = yield _decode_cbor(response)
returnValue(decoded_response)
@ -150,7 +186,7 @@ class StorageClientImmutables(object):
APIs for interacting with immutables.
"""
def __init__(self, client): # type: (StorageClient) -> None
def __init__(self, client: StorageClient):
self._client = client
@inlineCallbacks
@ -176,18 +212,16 @@ class StorageClientImmutables(object):
Result fires when creating the storage index succeeded, if creating the
storage index failed the result will fire with an exception.
"""
url = self._client._url("/v1/immutable/" + _encode_si(storage_index))
message = dumps(
{"share-numbers": share_numbers, "allocated-size": allocated_size}
)
response = yield self._client._request(
url = self._client.relative_url("/v1/immutable/" + _encode_si(storage_index))
message = {"share-numbers": share_numbers, "allocated-size": allocated_size}
response = yield self._client.request(
"POST",
url,
lease_renew_secret=lease_renew_secret,
lease_cancel_secret=lease_cancel_secret,
upload_secret=upload_secret,
data=message,
headers=Headers({"content-type": ["application/cbor"]}),
message_to_serialize=message,
)
decoded_response = yield _decode_cbor(response)
returnValue(
@ -197,6 +231,27 @@ class StorageClientImmutables(object):
)
)
@inlineCallbacks
def abort_upload(
    self, storage_index: bytes, share_number: int, upload_secret: bytes
) -> Deferred[None]:
    """
    Abort the upload of the given share.

    Sends ``PUT /v1/immutable/:storage_index/:share_number/abort`` with the
    upload secret; the server only honors the abort if the secret matches.

    Fires with ``None`` on success (HTTP 200); raises ``ClientException``
    with the response code for any other status.
    """
    url = self._client.relative_url(
        "/v1/immutable/{}/{}/abort".format(_encode_si(storage_index), share_number)
    )
    response = yield self._client.request(
        "PUT",
        url,
        upload_secret=upload_secret,
    )
    if response.code == http.OK:
        return
    else:
        raise ClientException(
            response.code,
        )
@inlineCallbacks
def write_share_chunk(
self, storage_index, share_number, upload_secret, offset, data
@ -213,10 +268,10 @@ class StorageClientImmutables(object):
whether the _complete_ share (i.e. all chunks, not just this one) has
been uploaded.
"""
url = self._client._url(
url = self._client.relative_url(
"/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
)
response = yield self._client._request(
response = yield self._client.request(
"PATCH",
url,
upload_secret=upload_secret,
@ -264,10 +319,10 @@ class StorageClientImmutables(object):
the HTTP protocol will be simplified, see
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
"""
url = self._client._url(
url = self._client.relative_url(
"/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
)
response = yield self._client._request(
response = yield self._client.request(
"GET",
url,
headers=Headers(
@ -278,25 +333,69 @@ class StorageClientImmutables(object):
body = yield response.content()
returnValue(body)
else:
raise ClientException(
response.code,
)
raise ClientException(response.code)
@inlineCallbacks
def list_shares(self, storage_index): # type: (bytes,) -> Deferred[Set[int]]
"""
Return the set of shares for a given storage index.
"""
url = self._client._url(
url = self._client.relative_url(
"/v1/immutable/{}/shares".format(_encode_si(storage_index))
)
response = yield self._client._request(
response = yield self._client.request(
"GET",
url,
)
if response.code == http.OK:
body = yield _decode_cbor(response)
returnValue(set(body))
else:
raise ClientException(response.code)
@inlineCallbacks
def add_or_renew_lease(
    self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
):
    """
    Add or renew a lease.

    If the renewal secret matches an existing lease, it is renewed.
    Otherwise a new lease is added.

    Fires with ``None`` on success (HTTP 204 NO CONTENT); raises
    ``ClientException`` with the response code for any other status.
    """
    url = self._client.relative_url(
        "/v1/lease/{}".format(_encode_si(storage_index))
    )
    response = yield self._client.request(
        "PUT",
        url,
        lease_renew_secret=renew_secret,
        lease_cancel_secret=cancel_secret,
    )
    if response.code == http.NO_CONTENT:
        return
    else:
        raise ClientException(response.code)
@inlineCallbacks
def advise_corrupt_share(
self,
storage_index: bytes,
share_number: int,
reason: str,
):
"""Indicate a share has been corrupted, with a human-readable message."""
assert isinstance(reason, str)
url = self._client.relative_url(
"/v1/immutable/{}/{}/corrupt".format(
_encode_si(storage_index), share_number
)
)
message = {"reason": reason}
response = yield self._client.request("POST", url, message_to_serialize=message)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,

View File

@ -1,13 +1,31 @@
"""
Common HTTP infrastructure for the storage server.
"""
from enum import Enum
from base64 import b64encode
from hashlib import sha256
from typing import Optional
from cryptography.x509 import Certificate
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from werkzeug.http import parse_options_header
from twisted.web.http_headers import Headers
CBOR_MIME_TYPE = "application/cbor"
def get_content_type(headers: Headers) -> Optional[str]:
    """
    Get the content type from the HTTP ``Content-Type`` header.

    Returns ``None`` if no content-type was set.
    """
    raw_values = headers.getRawHeaders("content-type")
    if not raw_values:
        # Header absent entirely.
        return None
    # parse_options_header splits off parameters like "; charset=utf-8";
    # an unparseable value comes back empty, which we normalize to None.
    parsed = parse_options_header(raw_values[0])[0]
    return parsed or None
def swissnum_auth_header(swissnum): # type: (bytes) -> bytes
"""Return value for ``Authentication`` header."""

View File

@ -2,15 +2,12 @@
HTTP server for storage.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from typing import Dict, List, Set, Tuple, Optional
from typing import Dict, List, Set, Tuple, Any, Optional
from pathlib import Path
from functools import wraps
from base64 import b64decode
import binascii
from klein import Klein
from twisted.web import http
@ -19,18 +16,33 @@ from twisted.internet.defer import Deferred
from twisted.internet.endpoints import quoteStringArgument, serverFromString
from twisted.web.server import Site
import attr
from werkzeug.http import parse_range_header, parse_content_range_header
from werkzeug.http import (
parse_range_header,
parse_content_range_header,
parse_accept_header,
)
from werkzeug.routing import BaseConverter, ValidationError
from werkzeug.datastructures import ContentRange
from hyperlink import DecodedURL
from cryptography.x509 import load_pem_x509_certificate
# TODO Make sure to use pure Python versions?
from cbor2 import dumps, loads
from .server import StorageServer
from .http_common import swissnum_auth_header, Secrets, get_spki_hash
from .http_common import (
swissnum_auth_header,
Secrets,
get_content_type,
CBOR_MIME_TYPE,
get_spki_hash,
)
from .common import si_a2b
from .immutable import BucketWriter
from .immutable import BucketWriter, ConflictingWriteError
from ..util.hashutil import timing_safe_compare
from ..util.base32 import rfc3548_alphabet
class ClientSecretsException(Exception):
@ -127,10 +139,95 @@ class StorageIndexUploads(object):
"""
# Map share number to BucketWriter
shares = attr.ib() # type: Dict[int,BucketWriter]
shares = attr.ib(factory=dict) # type: Dict[int,BucketWriter]
# The upload key.
upload_secret = attr.ib() # type: bytes
# Map share number to the upload secret (different shares might have
# different upload secrets).
upload_secrets = attr.ib(factory=dict) # type: Dict[int,bytes]
@attr.s
class UploadsInProgress(object):
    """
    Keep track of uploads for storage indexes.

    Tracks two views of the same state: by storage index (for lookups from
    incoming HTTP requests) and by ``BucketWriter`` (so the close handler
    can clean up when an upload finishes, aborts, or times out).
    """

    # Map storage index to corresponding uploads-in-progress
    _uploads = attr.ib(type=Dict[bytes, StorageIndexUploads], factory=dict)

    # Map BucketWriter to (storage index, share number)
    _bucketwriters = attr.ib(type=Dict[BucketWriter, Tuple[bytes, int]], factory=dict)

    def add_write_bucket(
        self,
        storage_index: bytes,
        share_number: int,
        upload_secret: bytes,
        bucket: BucketWriter,
    ):
        """Add a new ``BucketWriter`` to be tracked."""
        si_uploads = self._uploads.setdefault(storage_index, StorageIndexUploads())
        si_uploads.shares[share_number] = bucket
        si_uploads.upload_secrets[share_number] = upload_secret
        self._bucketwriters[bucket] = (storage_index, share_number)

    def get_write_bucket(
        self, storage_index: bytes, share_number: int, upload_secret: bytes
    ) -> BucketWriter:
        """
        Get the given in-progress immutable share upload.

        Raises ``_HTTPError(401)`` on a bad upload secret, or
        ``_HTTPError(404)`` if the upload is not being tracked.
        """
        self.validate_upload_secret(storage_index, share_number, upload_secret)
        try:
            return self._uploads[storage_index].shares[share_number]
        except (KeyError, IndexError):
            raise _HTTPError(http.NOT_FOUND)

    def remove_write_bucket(self, bucket: BucketWriter):
        """Stop tracking the given ``BucketWriter``."""
        storage_index, share_number = self._bucketwriters.pop(bucket)
        uploads_index = self._uploads[storage_index]
        uploads_index.shares.pop(share_number)
        uploads_index.upload_secrets.pop(share_number)
        # Drop the storage index entry entirely once its last share is gone,
        # so _uploads doesn't accumulate empty entries.
        if not uploads_index.shares:
            self._uploads.pop(storage_index)

    def validate_upload_secret(
        self, storage_index: bytes, share_number: int, upload_secret: bytes
    ):
        """
        Raise an unauthorized-HTTP-response exception if the given
        storage_index+share_number have a different upload secret than the
        given one.

        If the given upload doesn't exist at all, nothing happens.
        """
        if storage_index in self._uploads:
            in_progress = self._uploads[storage_index]
            # For pre-existing upload, make sure password matches.
            # timing_safe_compare avoids leaking the secret via timing.
            if share_number in in_progress.upload_secrets and not timing_safe_compare(
                in_progress.upload_secrets[share_number], upload_secret
            ):
                raise _HTTPError(http.UNAUTHORIZED)
class StorageIndexConverter(BaseConverter):
    """Parser/validator for storage index URL path segments."""

    # Exactly 26 characters drawn from the RFC 3548 base32 alphabet.
    regex = "[" + str(rfc3548_alphabet, "ascii") + "]{26}"

    def to_python(self, value):
        # Normalize any decoding failure into a routing ValidationError so
        # the URL simply fails to match, rather than raising a server error.
        try:
            return si_a2b(value.encode("ascii"))
        except (AssertionError, binascii.Error, ValueError):
            raise ValidationError("Invalid storage index")
class _HTTPError(Exception):
"""
Raise from ``HTTPServer`` endpoint to return the given HTTP response code.
"""
def __init__(self, code: int):
self.code = code
class HTTPServer(object):
@ -139,6 +236,13 @@ class HTTPServer(object):
"""
_app = Klein()
_app.url_map.converters["storage_index"] = StorageIndexConverter
@_app.handle_errors(_HTTPError)
def _http_error(self, request, failure):
    """
    Handle ``_HTTPError`` exceptions raised by endpoints: set the requested
    status code on the response and return an empty body.
    """
    request.setResponseCode(failure.value.code)
    return b""
def __init__(
self, storage_server, swissnum
@ -146,102 +250,157 @@ class HTTPServer(object):
self._storage_server = storage_server
self._swissnum = swissnum
# Maps storage index to StorageIndexUploads:
self._uploads = {} # type: Dict[bytes,StorageIndexUploads]
self._uploads = UploadsInProgress()
# When an upload finishes successfully, gets aborted, or times out,
# make sure it gets removed from our tracking datastructure:
self._storage_server.register_bucket_writer_close_handler(
self._uploads.remove_write_bucket
)
def get_resource(self):
"""Return twisted.web ``Resource`` for this object."""
return self._app.resource()
def _cbor(self, request, data):
"""Return CBOR-encoded data."""
# TODO Might want to optionally send JSON someday, based on Accept
# headers, see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
request.setHeader("Content-Type", "application/cbor")
# TODO if data is big, maybe want to use a temporary file eventually...
return dumps(data)
def _send_encoded(self, request, data):
    """
    Return encoded data suitable for writing as the HTTP body response, by
    default using CBOR.

    Also sets the appropriate ``Content-Type`` header on the response.

    Raises ``_HTTPError(406)`` if the client's ``Accept`` header asks only
    for content types we cannot produce.
    """
    # A missing Accept header means the client takes anything, so default
    # to CBOR.
    accept_headers = request.requestHeaders.getRawHeaders("accept") or [
        CBOR_MIME_TYPE
    ]
    accept = parse_accept_header(accept_headers[0])
    if accept.best == CBOR_MIME_TYPE:
        request.setHeader("Content-Type", CBOR_MIME_TYPE)
        # TODO if data is big, maybe want to use a temporary file eventually...
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
        return dumps(data)
    else:
        # TODO Might want to optionally send JSON someday:
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
        raise _HTTPError(http.NOT_ACCEPTABLE)
def _read_encoded(self, request) -> Any:
    """
    Read encoded request body data, decoding it with CBOR by default.

    Raises ``_HTTPError(415)`` if the request's Content-Type is not one we
    can decode.
    """
    # Only CBOR request bodies are understood for now.
    if get_content_type(request.requestHeaders) != CBOR_MIME_TYPE:
        raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE)
    # TODO limit memory usage, client could send arbitrarily large data...
    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
    body = request.content.read()
    return loads(body)
##### Generic APIs #####
@_authorized_route(_app, set(), "/v1/version", methods=["GET"])
def version(self, request, authorization):
"""Return version information."""
return self._cbor(request, self._storage_server.get_version())
return self._send_encoded(request, self._storage_server.get_version())
##### Immutable APIs #####
@_authorized_route(
_app,
{Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.UPLOAD},
"/v1/immutable/<string:storage_index>",
"/v1/immutable/<storage_index:storage_index>",
methods=["POST"],
)
def allocate_buckets(self, request, authorization, storage_index):
"""Allocate buckets."""
storage_index = si_a2b(storage_index.encode("ascii"))
info = loads(request.content.read())
upload_secret = authorization[Secrets.UPLOAD]
info = self._read_encoded(request)
if storage_index in self._uploads:
# Pre-existing upload.
in_progress = self._uploads[storage_index]
if timing_safe_compare(in_progress.upload_secret, upload_secret):
# Same session.
# TODO add BucketWriters only for new shares that don't already have buckets; see the HTTP spec for details.
# The backend code may already implement this logic.
pass
else:
# TODO Fail, since the secret doesnt match.
pass
else:
# New upload.
already_got, sharenum_to_bucket = self._storage_server.allocate_buckets(
storage_index,
renew_secret=authorization[Secrets.LEASE_RENEW],
cancel_secret=authorization[Secrets.LEASE_CANCEL],
sharenums=info["share-numbers"],
allocated_size=info["allocated-size"],
)
self._uploads[storage_index] = StorageIndexUploads(
shares=sharenum_to_bucket, upload_secret=authorization[Secrets.UPLOAD]
)
return self._cbor(
request,
{
"already-have": set(already_got),
"allocated": set(sharenum_to_bucket),
},
# We do NOT validate the upload secret for existing bucket uploads.
# Another upload may be happening in parallel, with a different upload
# key. That's fine! If a client tries to _write_ to that upload, they
# need to have an upload key. That does mean we leak the existence of
# these parallel uploads, but if you know storage index you can
# download them once upload finishes, so it's not a big deal to leak
# that information.
already_got, sharenum_to_bucket = self._storage_server.allocate_buckets(
storage_index,
renew_secret=authorization[Secrets.LEASE_RENEW],
cancel_secret=authorization[Secrets.LEASE_CANCEL],
sharenums=info["share-numbers"],
allocated_size=info["allocated-size"],
)
for share_number, bucket in sharenum_to_bucket.items():
self._uploads.add_write_bucket(
storage_index, share_number, upload_secret, bucket
)
return self._send_encoded(
request,
{
"already-have": set(already_got),
"allocated": set(sharenum_to_bucket),
},
)
@_authorized_route(
_app,
{Secrets.UPLOAD},
"/v1/immutable/<string:storage_index>/<int:share_number>",
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/abort",
methods=["PUT"],
)
def abort_share_upload(self, request, authorization, storage_index, share_number):
"""Abort an in-progress immutable share upload."""
try:
bucket = self._uploads.get_write_bucket(
storage_index, share_number, authorization[Secrets.UPLOAD]
)
except _HTTPError as e:
if e.code == http.NOT_FOUND:
# It may be we've already uploaded this, in which case error
# should be method not allowed (405).
try:
self._storage_server.get_buckets(storage_index)[share_number]
except KeyError:
pass
else:
# Already uploaded, so we can't abort.
raise _HTTPError(http.NOT_ALLOWED)
raise
# Abort the upload; this should close it which will eventually result
# in self._uploads.remove_write_bucket() being called.
bucket.abort()
return b""
@_authorized_route(
_app,
{Secrets.UPLOAD},
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
methods=["PATCH"],
)
def write_share_data(self, request, authorization, storage_index, share_number):
"""Write data to an in-progress immutable upload."""
storage_index = si_a2b(storage_index.encode("ascii"))
content_range = parse_content_range_header(request.getHeader("content-range"))
# TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
# 1. Malformed header should result in error 416
# 2. Non-bytes unit should result in error 416
# 3. Missing header means full upload in one request
# 4. Impossible range should result in error 416
if content_range is None or content_range.units != "bytes":
request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
return b""
offset = content_range.start
# TODO basic checks on validity of start, offset, and content-range in general. also of share_number.
# TODO basic check that body isn't infinite. require content-length? or maybe we should require content-range (it's optional now)? if so, needs to be rflected in protocol spec.
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = request.content.read(content_range.stop - content_range.start + 1)
bucket = self._uploads.get_write_bucket(
storage_index, share_number, authorization[Secrets.UPLOAD]
)
data = request.content.read()
try:
bucket = self._uploads[storage_index].shares[share_number]
except (KeyError, IndexError):
# TODO return 404
raise
finished = bucket.write(offset, data)
# TODO if raises ConflictingWriteError, return HTTP CONFLICT code.
finished = bucket.write(offset, data)
except ConflictingWriteError:
request.setResponseCode(http.CONFLICT)
return b""
if finished:
bucket.close()
@ -252,55 +411,112 @@ class HTTPServer(object):
required = []
for start, end, _ in bucket.required_ranges().ranges():
required.append({"begin": start, "end": end})
return self._cbor(request, {"required": required})
return self._send_encoded(request, {"required": required})
@_authorized_route(
_app,
set(),
"/v1/immutable/<string:storage_index>/shares",
"/v1/immutable/<storage_index:storage_index>/shares",
methods=["GET"],
)
def list_shares(self, request, authorization, storage_index):
"""
List shares for the given storage index.
"""
storage_index = si_a2b(storage_index.encode("ascii"))
share_numbers = list(self._storage_server.get_buckets(storage_index).keys())
return self._cbor(request, share_numbers)
return self._send_encoded(request, share_numbers)
@_authorized_route(
_app,
set(),
"/v1/immutable/<string:storage_index>/<int:share_number>",
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
methods=["GET"],
)
def read_share_chunk(self, request, authorization, storage_index, share_number):
"""Read a chunk for an already uploaded immutable."""
# TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
# 1. basic checks on validity on storage index, share number
# 2. missing range header should have response code 200 and return whole thing
# 3. malformed range header should result in error? or return everything?
# 4. non-bytes range results in error
# 5. ranges make sense semantically (positive, etc.)
# 6. multiple ranges fails with error
# 7. missing end of range means "to the end of share"
storage_index = si_a2b(storage_index.encode("ascii"))
range_header = parse_range_header(request.getHeader("range"))
offset, end = range_header.ranges[0]
assert end != None # TODO support this case
try:
bucket = self._storage_server.get_buckets(storage_index)[share_number]
except KeyError:
request.setResponseCode(http.NOT_FOUND)
return b""
# TODO if not found, 404
bucket = self._storage_server.get_buckets(storage_index)[share_number]
if request.getHeader("range") is None:
# Return the whole thing.
start = 0
while True:
# TODO should probably yield to event loop occasionally...
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = bucket.read(start, start + 65536)
if not data:
request.finish()
return
request.write(data)
start += len(data)
range_header = parse_range_header(request.getHeader("range"))
if (
range_header is None
or range_header.units != "bytes"
or len(range_header.ranges) > 1 # more than one range
or range_header.ranges[0][1] is None # range without end
):
request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
return b""
offset, end = range_header.ranges[0]
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = bucket.read(offset, end - offset)
request.setResponseCode(http.PARTIAL_CONTENT)
# TODO set content-range on response. We we need to expand the
# BucketReader interface to return share's length.
#
# request.setHeader(
# "content-range", range_header.make_content_range(share_length).to_header()
# )
if len(data):
# For empty bodies the content-range header makes no sense since
# the end of the range is inclusive.
request.setHeader(
"content-range",
ContentRange("bytes", offset, offset + len(data)).to_header(),
)
return data
@_authorized_route(
    _app,
    {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL},
    "/v1/lease/<storage_index:storage_index>",
    methods=["PUT"],
)
def add_or_renew_lease(self, request, authorization, storage_index):
    """
    Update the lease for an immutable share.

    Responds with NO CONTENT (204) on success, or NOT FOUND (404) if the
    storage index has no shares on this server.
    """
    if not self._storage_server.get_buckets(storage_index):
        raise _HTTPError(http.NOT_FOUND)

    # Checking of the renewal secret is done by the backend.
    self._storage_server.add_lease(
        storage_index,
        authorization[Secrets.LEASE_RENEW],
        authorization[Secrets.LEASE_CANCEL],
    )

    request.setResponseCode(http.NO_CONTENT)
    return b""
@_authorized_route(
    _app,
    set(),
    "/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
    methods=["POST"],
)
def advise_corrupt_share(self, request, authorization, storage_index, share_number):
    """
    Indicate that given share is corrupt, with a text reason.

    The CBOR request body carries a ``"reason"`` string; responds with
    NOT FOUND (404) if the share doesn't exist on this server.
    """
    try:
        bucket = self._storage_server.get_buckets(storage_index)[share_number]
    except KeyError:
        raise _HTTPError(http.NOT_FOUND)

    info = self._read_encoded(request)
    # The backend expects the reason as UTF-8-encoded bytes.
    bucket.advise_corrupt_share(info["reason"].encode("utf-8"))
    return b""
def listen_tls(
reactor,

View File

@ -743,8 +743,9 @@ class StorageServer(service.MultiService):
def advise_corrupt_share(self, share_type, storage_index, shnum,
reason):
# This is a remote API, I believe, so this has to be bytes for legacy
# protocol backwards compatibility reasons.
# Previously this had to be bytes for legacy protocol backwards
# compatibility reasons. Now that Foolscap layer has been abstracted
# out, we can probably refactor this to be unicode...
assert isinstance(share_type, bytes)
assert isinstance(reason, bytes), "%r is not bytes" % (reason,)
@ -777,7 +778,7 @@ class StorageServer(service.MultiService):
si_s,
shnum,
)
with open(report_path, "w") as f:
with open(report_path, "w", encoding="utf-8") as f:
f.write(report)
return None

View File

@ -58,7 +58,7 @@ from twisted.plugin import (
from eliot import (
log_call,
)
from foolscap.api import eventually
from foolscap.api import eventually, RemoteException
from foolscap.reconnector import (
ReconnectionInfo,
)
@ -75,7 +75,10 @@ from allmydata.util.observer import ObserverList
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import permute_server_hash
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
from allmydata.storage.http_client import StorageClient, StorageClientImmutables
from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException
)
# who is responsible for de-duplication?
@ -1035,8 +1038,13 @@ class _FakeRemoteReference(object):
"""
local_object = attr.ib(type=object)
@defer.inlineCallbacks
def callRemote(self, action, *args, **kwargs):
return getattr(self.local_object, action)(*args, **kwargs)
try:
result = yield getattr(self.local_object, action)(*args, **kwargs)
defer.returnValue(result)
except HTTPClientException as e:
raise RemoteException(e.args)
@attr.s
@ -1051,7 +1059,8 @@ class _HTTPBucketWriter(object):
finished = attr.ib(type=bool, default=False)
def abort(self):
pass # TODO in later ticket
return self.client.abort_upload(self.storage_index, self.share_number,
self.upload_secret)
@defer.inlineCallbacks
def write(self, offset, data):
@ -1085,7 +1094,10 @@ class _HTTPBucketReader(object):
)
def advise_corrupt_share(self, reason):
pass # TODO in later ticket
return self.client.advise_corrupt_share(
self.storage_index, self.share_number,
str(reason, "utf-8", errors="backslashreplace")
)
# WORK IN PROGRESS, for now it doesn't actually implement whole thing.
@ -1105,7 +1117,7 @@ class _HTTPStorageServer(object):
return _HTTPStorageServer(http_client=http_client)
def get_version(self):
return self._http_client.get_version()
return StorageClientGeneral(self._http_client).get_version()
@defer.inlineCallbacks
def allocate_buckets(
@ -1115,7 +1127,7 @@ class _HTTPStorageServer(object):
cancel_secret,
sharenums,
allocated_size,
canary,
canary
):
upload_secret = urandom(20)
immutable_client = StorageClientImmutables(self._http_client)
@ -1139,7 +1151,7 @@ class _HTTPStorageServer(object):
@defer.inlineCallbacks
def get_buckets(
self,
storage_index,
storage_index
):
immutable_client = StorageClientImmutables(self._http_client)
share_numbers = yield immutable_client.list_shares(
@ -1151,3 +1163,29 @@ class _HTTPStorageServer(object):
))
for share_num in share_numbers
})
def add_lease(
self,
storage_index,
renew_secret,
cancel_secret
):
immutable_client = StorageClientImmutables(self._http_client)
return immutable_client.add_or_renew_lease(
storage_index, renew_secret, cancel_secret
)
def advise_corrupt_share(
self,
share_type,
storage_index,
shnum,
reason: bytes
):
if share_type == b"immutable":
imm_client = StorageClientImmutables(self._http_client)
return imm_client.advise_corrupt_share(
storage_index, shnum, str(reason, "utf-8", errors="backslashreplace")
)
else:
raise NotImplementedError() # future tickets

View File

@ -176,8 +176,9 @@ class IStorageServerImmutableAPIsTestsMixin(object):
canary=Referenceable(),
)
# Bucket 1 is fully written in one go.
yield allocated[0].callRemote("write", 0, b"1" * 1024)
# Bucket 1 get some data written (but not all, or HTTP implicitly
# finishes the upload)
yield allocated[0].callRemote("write", 0, b"1" * 1023)
# Disconnect or abort, depending on the test:
yield abort_or_disconnect(allocated[0])
@ -193,20 +194,6 @@ class IStorageServerImmutableAPIsTestsMixin(object):
)
yield allocated[0].callRemote("write", 0, b"2" * 1024)
def test_disconnection(self):
"""
If we disconnect in the middle of writing to a bucket, all data is
wiped, and it's even possible to write different data to the bucket.
(In the real world one shouldn't do that, but writing different data is
a good way to test that the original data really was wiped.)
HTTP protocol should skip this test, since disconnection is meaningless
concept; this is more about testing implicit contract the Foolscap
implementation depends on doesn't change as we refactor things.
"""
return self.abort_or_disconnect_half_way(lambda _: self.disconnect())
@inlineCallbacks
def test_written_shares_are_allocated(self):
"""
@ -1061,13 +1048,6 @@ class _SharedMixin(SystemTestMixin):
AsyncTestCase.tearDown(self)
yield SystemTestMixin.tearDown(self)
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
raise NotImplementedError("implement in subclass")
class _FoolscapMixin(_SharedMixin):
"""Run tests on Foolscap version of ``IStorageServer``."""
@ -1080,16 +1060,6 @@ class _FoolscapMixin(_SharedMixin):
self.assertTrue(IStorageServer.providedBy(client))
return succeed(client)
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
current = self.storage_client
yield self.bounce_client(0)
self.storage_client = self._get_native_server().get_storage_server()
assert self.storage_client is not current
class _HTTPMixin(_SharedMixin):
"""Run tests on the HTTP version of ``IStorageServer``."""
@ -1148,27 +1118,37 @@ class FoolscapImmutableAPIsTests(
):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
def test_disconnection(self):
"""
If we disconnect in the middle of writing to a bucket, all data is
wiped, and it's even possible to write different data to the bucket.
(In the real world one shouldn't do that, but writing different data is
a good way to test that the original data really was wiped.)
HTTP protocol doesn't need this test, since disconnection is a
meaningless concept; this is more about testing the implicit contract
the Foolscap implementation depends on doesn't change as we refactor
things.
"""
return self.abort_or_disconnect_half_way(lambda _: self.disconnect())
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
current = self.storage_client
yield self.bounce_client(0)
self.storage_client = self._get_native_server().get_storage_server()
assert self.storage_client is not current
class HTTPImmutableAPIsTests(
_HTTPMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for immutable ``IStorageServer`` APIs."""
# These will start passing in future PRs as HTTP protocol is implemented.
SKIP_TESTS = {
"test_abort",
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
"test_allocate_buckets_repeat",
"test_bucket_advise_corrupt_share",
"test_disconnection",
"test_get_buckets_skips_unfinished_buckets",
"test_matching_overlapping_writes",
"test_non_matching_overlapping_writes",
"test_written_shares_are_allocated",
}
class FoolscapMutableAPIsTests(
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase

View File

@ -15,6 +15,7 @@ if PY2:
# fmt: on
from base64 import b64encode
from contextlib import contextmanager
from os import urandom
from hypothesis import assume, given, strategies as st
@ -24,9 +25,15 @@ from klein import Klein
from hyperlink import DecodedURL
from collections_extended import RangeMap
from twisted.internet.task import Clock
from twisted.web import http
from twisted.web.http_headers import Headers
from cryptography.x509 import load_pem_x509_certificate
from werkzeug import routing
from werkzeug.exceptions import NotFound as WNotFound
from .common import SyncTestCase
from ..storage.http_common import get_content_type, get_spki_hash
from ..storage.common import si_b2a
from ..storage.server import StorageServer
from ..storage.http_server import (
HTTPServer,
@ -34,6 +41,7 @@ from ..storage.http_server import (
Secrets,
ClientSecretsException,
_authorized_route,
StorageIndexConverter,
)
from ..storage.http_client import (
StorageClient,
@ -41,8 +49,9 @@ from ..storage.http_client import (
StorageClientImmutables,
ImmutableCreateResult,
UploadProgress,
StorageClientGeneral,
_encode_si,
)
from ..storage.http_common import get_spki_hash
class HTTPFurlTests(SyncTestCase):
@ -85,6 +94,25 @@ ox5zO3LrQmQw11OaIAs2/kviKAoKTFFxeyYcpS5RuKNDZfHQCXlLwt9bySxG
self.assertEqual(get_spki_hash(certificate), expected_hash)
class HTTPUtilities(SyncTestCase):
"""Tests for HTTP common utilities."""
def test_get_content_type(self):
"""``get_content_type()`` extracts the content-type from the header."""
def assert_header_values_result(values, expected_content_type):
headers = Headers()
if values:
headers.setRawHeaders("Content-Type", values)
content_type = get_content_type(headers)
self.assertEqual(content_type, expected_content_type)
assert_header_values_result(["text/html"], "text/html")
assert_header_values_result([], None)
assert_header_values_result(["text/plain", "application/json"], "text/plain")
assert_header_values_result(["text/html;encoding=utf-8"], "text/html")
def _post_process(params):
secret_types, secrets = params
secrets = {t: s for (t, s) in zip(secret_types, secrets)}
@ -189,6 +217,52 @@ class ExtractSecretsTests(SyncTestCase):
_extract_secrets(["lease-cancel-secret eA=="], {Secrets.LEASE_RENEW})
class RouteConverterTests(SyncTestCase):
"""Tests for custom werkzeug path segment converters."""
adapter = routing.Map(
[
routing.Rule(
"/<storage_index:storage_index>/", endpoint="si", methods=["GET"]
)
],
converters={"storage_index": StorageIndexConverter},
).bind("example.com", "/")
@given(storage_index=st.binary(min_size=16, max_size=16))
def test_good_storage_index_is_parsed(self, storage_index):
"""
A valid storage index is accepted and parsed back out by
StorageIndexConverter.
"""
self.assertEqual(
self.adapter.match(
"/{}/".format(str(si_b2a(storage_index), "ascii")), method="GET"
),
("si", {"storage_index": storage_index}),
)
def test_long_storage_index_is_not_parsed(self):
"""An overly long storage_index string is not parsed."""
with self.assertRaises(WNotFound):
self.adapter.match("/{}/".format("a" * 27), method="GET")
def test_short_storage_index_is_not_parsed(self):
"""An overly short storage_index string is not parsed."""
with self.assertRaises(WNotFound):
self.adapter.match("/{}/".format("a" * 25), method="GET")
def test_bad_characters_storage_index_is_not_parsed(self):
"""A storage_index string with bad characters is not parsed."""
with self.assertRaises(WNotFound):
self.adapter.match("/{}_/".format("a" * 25), method="GET")
def test_invalid_storage_index_is_not_parsed(self):
"""An invalid storage_index string is not parsed."""
with self.assertRaises(WNotFound):
self.adapter.match("/nomd2a65ylxjbqzsw7gcfh4ivr/", method="GET")
# TODO should be actual swissnum
SWISSNUM_FOR_TEST = b"abcd"
@ -249,7 +323,7 @@ class RoutingTests(SyncTestCase):
"""
# Without secret, get a 400 error.
response = result_of(
self.client._request(
self.client.request(
"GET",
"http://127.0.0.1/upload_secret",
)
@ -258,7 +332,7 @@ class RoutingTests(SyncTestCase):
# With secret, we're good.
response = result_of(
self.client._request(
self.client.request(
"GET", "http://127.0.0.1/upload_secret", upload_secret=b"MAGIC"
)
)
@ -286,6 +360,38 @@ class HttpTestFixture(Fixture):
)
class StorageClientWithHeadersOverride(object):
"""Wrap ``StorageClient`` and override sent headers."""
def __init__(self, storage_client, add_headers):
self.storage_client = storage_client
self.add_headers = add_headers
def __getattr__(self, attr):
return getattr(self.storage_client, attr)
def request(self, *args, headers=None, **kwargs):
if headers is None:
headers = Headers()
for key, value in self.add_headers.items():
headers.setRawHeaders(key, [value])
return self.storage_client.request(*args, headers=headers, **kwargs)
@contextmanager
def assert_fails_with_http_code(test_case: SyncTestCase, code: int):
"""
Context manager that asserts the code fails with the given HTTP response
code.
"""
with test_case.assertRaises(ClientException) as e:
try:
yield
finally:
pass
test_case.assertEqual(e.exception.code, code)
class GenericHTTPAPITests(SyncTestCase):
"""
Tests of HTTP client talking to the HTTP server, for generic HTTP API
@ -303,14 +409,26 @@ class GenericHTTPAPITests(SyncTestCase):
If the wrong swissnum is used, an ``Unauthorized`` response code is
returned.
"""
client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
b"something wrong",
treq=StubTreq(self.http.http_server.get_resource()),
client = StorageClientGeneral(
StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
b"something wrong",
treq=StubTreq(self.http.http_server.get_resource()),
)
)
with self.assertRaises(ClientException) as e:
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
result_of(client.get_version())
def test_unsupported_mime_type(self):
"""
The client can request mime types other than CBOR, and if they are
unsupported a NOT ACCEPTABLE (406) error will be returned.
"""
client = StorageClientGeneral(
StorageClientWithHeadersOverride(self.http.client, {"accept": "image/gif"})
)
with assert_fails_with_http_code(self, http.NOT_ACCEPTABLE):
result_of(client.get_version())
self.assertEqual(e.exception.args[0], 401)
def test_version(self):
"""
@ -319,7 +437,8 @@ class GenericHTTPAPITests(SyncTestCase):
We ignore available disk space and max immutable share size, since that
might change across calls.
"""
version = result_of(self.http.client.get_version())
client = StorageClientGeneral(self.http.client)
version = result_of(client.get_version())
version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
b"available-space"
)
@ -346,6 +465,28 @@ class ImmutableHTTPAPITests(SyncTestCase):
self.skipTest("Not going to bother supporting Python 2")
super(ImmutableHTTPAPITests, self).setUp()
self.http = self.useFixture(HttpTestFixture())
self.imm_client = StorageClientImmutables(self.http.client)
def create_upload(self, share_numbers, length):
"""
Create a write bucket on server, return:
(upload_secret, lease_secret, storage_index, result)
"""
upload_secret = urandom(32)
lease_secret = urandom(32)
storage_index = urandom(16)
created = result_of(
self.imm_client.create(
storage_index,
share_numbers,
length,
upload_secret,
lease_secret,
lease_secret,
)
)
return (upload_secret, lease_secret, storage_index, created)
def test_upload_can_be_downloaded(self):
"""
@ -357,19 +498,10 @@ class ImmutableHTTPAPITests(SyncTestCase):
that's already done in test_storage.py.
"""
length = 100
expected_data = b"".join(bytes([i]) for i in range(100))
im_client = StorageClientImmutables(self.http.client)
expected_data = bytes(range(100))
# Create a upload:
upload_secret = urandom(32)
lease_secret = urandom(32)
storage_index = b"".join(bytes([i]) for i in range(16))
created = result_of(
im_client.create(
storage_index, {1}, 100, upload_secret, lease_secret, lease_secret
)
)
(upload_secret, _, storage_index, created) = self.create_upload({1}, 100)
self.assertEqual(
created, ImmutableCreateResult(already_have=set(), allocated={1})
)
@ -380,7 +512,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
def write(offset, length):
remaining.empty(offset, offset + length)
return im_client.write_share_chunk(
return self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
@ -424,31 +556,111 @@ class ImmutableHTTPAPITests(SyncTestCase):
# We can now read:
for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]:
downloaded = result_of(
im_client.read_share_chunk(storage_index, 1, offset, length)
self.imm_client.read_share_chunk(storage_index, 1, offset, length)
)
self.assertEqual(downloaded, expected_data[offset : offset + length])
def test_write_with_wrong_upload_key(self):
"""
A write with an upload key that is different than the original upload
key will fail.
"""
(upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret + b"X",
0,
b"123",
)
)
def test_allocate_buckets_second_time_different_shares(self):
"""
If allocate buckets endpoint is called second time with different
upload key on potentially different shares, that creates the buckets on
those shares that are different.
"""
# Create a upload:
(upload_secret, lease_secret, storage_index, created) = self.create_upload(
{1, 2, 3}, 100
)
# Write half of share 1
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"a" * 50,
)
)
# Add same shares with a different upload key share 1 overlaps with
# existing shares, this call shouldn't overwrite the existing
# work-in-progress.
upload_secret2 = b"x" * 2
created2 = result_of(
self.imm_client.create(
storage_index,
{1, 4, 6},
100,
upload_secret2,
lease_secret,
lease_secret,
)
)
self.assertEqual(created2.allocated, {4, 6})
# Write second half of share 1
self.assertTrue(
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
50,
b"b" * 50,
)
).finished
)
# The upload of share 1 succeeded, demonstrating that second create()
# call didn't overwrite work-in-progress.
downloaded = result_of(
self.imm_client.read_share_chunk(storage_index, 1, 0, 100)
)
self.assertEqual(downloaded, b"a" * 50 + b"b" * 50)
# We can successfully upload the shares created with the second upload secret.
self.assertTrue(
result_of(
self.imm_client.write_share_chunk(
storage_index,
4,
upload_secret2,
0,
b"x" * 100,
)
).finished
)
def test_list_shares(self):
"""
Once a share is finished uploading, it's possible to list it.
"""
im_client = StorageClientImmutables(self.http.client)
upload_secret = urandom(32)
lease_secret = urandom(32)
storage_index = b"".join(bytes([i]) for i in range(16))
result_of(
im_client.create(
storage_index, {1, 2, 3}, 10, upload_secret, lease_secret, lease_secret
)
)
(upload_secret, _, storage_index, created) = self.create_upload({1, 2, 3}, 10)
# Initially there are no shares:
self.assertEqual(result_of(im_client.list_shares(storage_index)), set())
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set())
# Upload shares 1 and 3:
for share_number in [1, 3]:
progress = result_of(
im_client.write_share_chunk(
self.imm_client.write_share_chunk(
storage_index,
share_number,
upload_secret,
@ -459,87 +671,491 @@ class ImmutableHTTPAPITests(SyncTestCase):
self.assertTrue(progress.finished)
# Now shares 1 and 3 exist:
self.assertEqual(result_of(im_client.list_shares(storage_index)), {1, 3})
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), {1, 3})
def test_upload_bad_content_range(self):
"""
Malformed or invalid Content-Range headers to the immutable upload
endpoint result in a 416 error.
"""
(upload_secret, _, storage_index, created) = self.create_upload({1}, 10)
def check_invalid(bad_content_range_value):
client = StorageClientImmutables(
StorageClientWithHeadersOverride(
self.http.client, {"content-range": bad_content_range_value}
)
)
with assert_fails_with_http_code(
self, http.REQUESTED_RANGE_NOT_SATISFIABLE
):
result_of(
client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"0123456789",
)
)
check_invalid("not a valid content-range header at all")
check_invalid("bytes -1-9/10")
check_invalid("bytes 0--9/10")
check_invalid("teapots 0-9/10")
def test_list_shares_unknown_storage_index(self):
"""
Listing unknown storage index's shares results in empty list of shares.
"""
im_client = StorageClientImmutables(self.http.client)
storage_index = b"".join(bytes([i]) for i in range(16))
self.assertEqual(result_of(im_client.list_shares(storage_index)), set())
storage_index = bytes(range(16))
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set())
def test_upload_non_existent_storage_index(self):
"""
Uploading to a non-existent storage index or share number results in
404.
"""
(upload_secret, _, storage_index, _) = self.create_upload({1}, 10)
def unknown_check(storage_index, share_number):
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.imm_client.write_share_chunk(
storage_index,
share_number,
upload_secret,
0,
b"0123456789",
)
)
# Wrong share number:
unknown_check(storage_index, 7)
# Wrong storage index:
unknown_check(b"X" * 16, 7)
def test_multiple_shares_uploaded_to_different_place(self):
"""
If a storage index has multiple shares, uploads to different shares are
stored separately and can be downloaded separately.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
def test_bucket_allocated_with_new_shares(self):
"""
If some shares already exist, allocating shares indicates only the new
ones were created.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
def test_bucket_allocation_new_upload_secret(self):
"""
If a bucket was allocated with one upload secret, and a different upload
key is used to allocate the bucket again, the second allocation fails.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
def test_upload_with_wrong_upload_secret_fails(self):
"""
Uploading with a key that doesn't match the one used to allocate the
bucket will fail.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
def test_upload_offset_cannot_be_negative(self):
"""
A negative upload offset will be rejected.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
(upload_secret, _, storage_index, _) = self.create_upload({1, 2}, 10)
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"1" * 10,
)
)
result_of(
self.imm_client.write_share_chunk(
storage_index,
2,
upload_secret,
0,
b"2" * 10,
)
)
self.assertEqual(
result_of(self.imm_client.read_share_chunk(storage_index, 1, 0, 10)),
b"1" * 10,
)
self.assertEqual(
result_of(self.imm_client.read_share_chunk(storage_index, 2, 0, 10)),
b"2" * 10,
)
def test_mismatching_upload_fails(self):
"""
If an uploaded chunk conflicts with an already uploaded chunk, a
CONFLICT error is returned.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
(upload_secret, _, storage_index, created) = self.create_upload({1}, 100)
# Write:
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"0" * 10,
)
)
# Conflicting write:
with assert_fails_with_http_code(self, http.CONFLICT):
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"0123456789",
)
)
def upload(self, share_number, data_length=26):
"""
Create a share, return (storage_index, uploaded_data).
"""
uploaded_data = (b"abcdefghijklmnopqrstuvwxyz" * ((data_length // 26) + 1))[
:data_length
]
(upload_secret, _, storage_index, _) = self.create_upload(
{share_number}, data_length
)
result_of(
self.imm_client.write_share_chunk(
storage_index,
share_number,
upload_secret,
0,
uploaded_data,
)
)
return storage_index, uploaded_data
def test_read_of_wrong_storage_index_fails(self):
"""
Reading from unknown storage index results in 404.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.imm_client.read_share_chunk(
b"1" * 16,
1,
0,
10,
)
)
def test_read_of_wrong_share_number_fails(self):
"""
Reading from unknown storage index results in 404.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
storage_index, _ = self.upload(1)
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.imm_client.read_share_chunk(
storage_index,
7, # different share number
0,
10,
)
)
def test_read_with_negative_offset_fails(self):
"""
The offset for reads cannot be negative.
Malformed or unsupported Range headers result in 416 (requested range
not satisfiable) error.
"""
storage_index, _ = self.upload(1)
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
def check_bad_range(bad_range_value):
client = StorageClientImmutables(
StorageClientWithHeadersOverride(
self.http.client, {"range": bad_range_value}
)
)
with assert_fails_with_http_code(
self, http.REQUESTED_RANGE_NOT_SATISFIABLE
):
result_of(
client.read_share_chunk(
storage_index,
1,
0,
10,
)
)
# Bad unit
check_bad_range("molluscs=0-9")
# Negative offsets
check_bad_range("bytes=-2-9")
check_bad_range("bytes=0--10")
# Negative offset no endpoint
check_bad_range("bytes=-300-")
check_bad_range("bytes=")
# Multiple ranges are currently unsupported, even if they're
# semantically valid under HTTP:
check_bad_range("bytes=0-5, 6-7")
# Ranges without an end are currently unsupported, even if they're
# semantically valid under HTTP.
check_bad_range("bytes=0-")
@given(data_length=st.integers(min_value=1, max_value=300000))
def test_read_with_no_range(self, data_length):
"""
A read with no range returns the whole immutable.
"""
storage_index, uploaded_data = self.upload(1, data_length)
response = result_of(
self.http.client.request(
"GET",
self.http.client.relative_url(
"/v1/immutable/{}/1".format(_encode_si(storage_index))
),
)
)
self.assertEqual(response.code, http.OK)
self.assertEqual(result_of(response.content()), uploaded_data)
def test_validate_content_range_response_to_read(self):
"""
The server responds to ranged reads with an appropriate Content-Range
header.
"""
storage_index, _ = self.upload(1, 26)
def check_range(requested_range, expected_response):
headers = Headers()
headers.setRawHeaders("range", [requested_range])
response = result_of(
self.http.client.request(
"GET",
self.http.client.relative_url(
"/v1/immutable/{}/1".format(_encode_si(storage_index))
),
headers=headers,
)
)
self.assertEqual(
response.headers.getRawHeaders("content-range"), [expected_response]
)
check_range("bytes=0-10", "bytes 0-10/*")
# Can't go beyond the end of the immutable!
check_range("bytes=10-100", "bytes 10-25/*")
def test_timed_out_upload_allows_reupload(self):
"""
If an in-progress upload times out, it is cancelled altogether,
allowing a new upload to occur.
"""
self._test_abort_or_timed_out_upload_to_existing_storage_index(
lambda **kwargs: self.http.clock.advance(30 * 60 + 1)
)
def test_abort_upload_allows_reupload(self):
"""
If an in-progress upload is aborted, it is cancelled altogether,
allowing a new upload to occur.
"""
def test_read_with_negative_length_fails(self):
"""
The length for reads cannot be negative.
def abort(storage_index, share_number, upload_secret):
return result_of(
self.imm_client.abort_upload(storage_index, share_number, upload_secret)
)
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
self._test_abort_or_timed_out_upload_to_existing_storage_index(abort)
def _test_abort_or_timed_out_upload_to_existing_storage_index(self, cancel_upload):
"""Start uploading to an existing storage index that then times out or aborts.
Re-uploading should work.
"""
# Start an upload:
(upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"123",
)
)
# Now, the upload is cancelled somehow:
cancel_upload(
storage_index=storage_index, upload_secret=upload_secret, share_number=1
)
# Now we can create a new share with the same storage index without
# complaint:
upload_secret = urandom(32)
lease_secret = urandom(32)
created = result_of(
self.imm_client.create(
storage_index,
{1},
100,
upload_secret,
lease_secret,
lease_secret,
)
)
self.assertEqual(created.allocated, {1})
# And write to it, too:
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"ABC",
)
)
def test_unknown_aborts(self):
"""
Aborting uploads with an unknown storage index or share number will
result 404 HTTP response code.
"""
(upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
for si, num in [(storage_index, 3), (b"x" * 16, 1)]:
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(self.imm_client.abort_upload(si, num, upload_secret))
def test_unauthorized_abort(self):
"""
An abort with the wrong key will return an unauthorized error, and will
not abort the upload.
"""
(upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
# Failed to abort becaues wrong upload secret:
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
result_of(
self.imm_client.abort_upload(storage_index, 1, upload_secret + b"X")
)
# We can still write to it:
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"ABC",
)
)
def test_too_late_abort(self):
"""
An abort of an already-fully-uploaded immutable will result in 405
error and will not affect the immutable.
"""
uploaded_data = b"123"
(upload_secret, _, storage_index, _) = self.create_upload({0}, 3)
result_of(
self.imm_client.write_share_chunk(
storage_index,
0,
upload_secret,
0,
uploaded_data,
)
)
# Can't abort, we finished upload:
with assert_fails_with_http_code(self, http.NOT_ALLOWED):
result_of(self.imm_client.abort_upload(storage_index, 0, upload_secret))
# Abort didn't prevent reading:
self.assertEqual(
uploaded_data,
result_of(
self.imm_client.read_share_chunk(
storage_index,
0,
0,
3,
)
),
)
def test_lease_renew_and_add(self):
"""
It's possible the renew the lease on an uploaded immutable, by using
the same renewal secret, or add a new lease by choosing a different
renewal secret.
"""
# Create immutable:
(upload_secret, lease_secret, storage_index, _) = self.create_upload({0}, 100)
result_of(
self.imm_client.write_share_chunk(
storage_index,
0,
upload_secret,
0,
b"A" * 100,
)
)
[lease] = self.http.storage_server.get_leases(storage_index)
initial_expiration_time = lease.get_expiration_time()
# Time passes:
self.http.clock.advance(167)
# We renew the lease:
result_of(
self.imm_client.add_or_renew_lease(
storage_index, lease_secret, lease_secret
)
)
# More time passes:
self.http.clock.advance(10)
# We create a new lease:
lease_secret2 = urandom(32)
result_of(
self.imm_client.add_or_renew_lease(
storage_index, lease_secret2, lease_secret2
)
)
[lease1, lease2] = self.http.storage_server.get_leases(storage_index)
self.assertEqual(lease1.get_expiration_time(), initial_expiration_time + 167)
self.assertEqual(lease2.get_expiration_time(), initial_expiration_time + 177)
def test_lease_on_unknown_storage_index(self):
"""
An attempt to renew an unknown storage index will result in a HTTP 404.
"""
storage_index = urandom(16)
secret = b"A" * 32
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(self.imm_client.add_or_renew_lease(storage_index, secret, secret))
def test_advise_corrupt_share(self):
"""
Advising share was corrupted succeeds from HTTP client's perspective,
and calls appropriate method on server.
"""
corrupted = []
self.http.storage_server.advise_corrupt_share = lambda *args: corrupted.append(
args
)
storage_index, _ = self.upload(13)
reason = "OHNO \u1235"
result_of(self.imm_client.advise_corrupt_share(storage_index, 13, reason))
self.assertEqual(
corrupted, [(b"immutable", storage_index, 13, reason.encode("utf-8"))]
)
def test_advise_corrupt_share_unknown(self):
"""
Advising an unknown share was corrupted results in 404.
"""
storage_index, _ = self.upload(13)
reason = "OHNO \u1235"
result_of(self.imm_client.advise_corrupt_share(storage_index, 13, reason))
for (si, share_number) in [(storage_index, 11), (urandom(16), 13)]:
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.imm_client.advise_corrupt_share(si, share_number, reason)
)

View File

@ -73,6 +73,7 @@ let
in
# Make a derivation that runs the unit test suite.
pkgs.runCommand "tahoe-lafs-tests" { } ''
export TAHOE_LAFS_HYPOTHESIS_PROFILE=ci
${python-env}/bin/python -m twisted.trial -j $NIX_BUILD_CORES allmydata
# It's not cool to put the whole _trial_temp into $out because it has weird