mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-13 22:03:04 +00:00
Merge pull request #1178 from tahoe-lafs/3860-http-more-immutables
More immutable support for HTTP storage API Fixes ticket:3860
This commit is contained in:
commit
40e5ab1661
@ -540,7 +540,7 @@ Rejected designs for upload secrets:
|
||||
Write data for the indicated share.
|
||||
The share number must belong to the storage index.
|
||||
The request body is the raw share data (i.e., ``application/octet-stream``).
|
||||
*Content-Range* requests are encouraged for large transfers to allow partially complete uploads to be resumed.
|
||||
*Content-Range* requests are required; for large transfers this allows partially complete uploads to be resumed.
|
||||
For example,
|
||||
a 1MiB share can be divided in to eight separate 128KiB chunks.
|
||||
Each chunk can be uploaded in a separate request.
|
||||
@ -644,7 +644,7 @@ Read a contiguous sequence of bytes from one share in one bucket.
|
||||
The response body is the raw share data (i.e., ``application/octet-stream``).
|
||||
The ``Range`` header may be used to request exactly one ``bytes`` range, in which case the response code will be 206 (partial content).
|
||||
Interpretation and response behavior is as specified in RFC 7233 § 4.1.
|
||||
Multiple ranges in a single request are *not* supported.
|
||||
Multiple ranges in a single request are *not* supported; open-ended ranges are also not supported.
|
||||
|
||||
Discussion
|
||||
``````````
|
||||
|
0
newsfragments/3860.minor
Normal file
0
newsfragments/3860.minor
Normal file
@ -48,7 +48,11 @@ def _encode_si(si): # type: (bytes) -> str
|
||||
|
||||
|
||||
class ClientException(Exception):
|
||||
"""An unexpected error."""
|
||||
"""An unexpected response code from the server."""
|
||||
|
||||
def __init__(self, code, *additional_args):
|
||||
Exception.__init__(self, code, *additional_args)
|
||||
self.code = code
|
||||
|
||||
|
||||
def _decode_cbor(response):
|
||||
@ -68,7 +72,7 @@ class ImmutableCreateResult(object):
|
||||
|
||||
class StorageClient(object):
|
||||
"""
|
||||
HTTP client that talks to the HTTP storage server.
|
||||
Low-level HTTP client that talks to the HTTP storage server.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@ -78,7 +82,7 @@ class StorageClient(object):
|
||||
self._swissnum = swissnum
|
||||
self._treq = treq
|
||||
|
||||
def _url(self, path):
|
||||
def relative_url(self, path):
|
||||
"""Get a URL relative to the base URL."""
|
||||
return self._base_url.click(path)
|
||||
|
||||
@ -92,7 +96,7 @@ class StorageClient(object):
|
||||
)
|
||||
return headers
|
||||
|
||||
def _request(
|
||||
def request(
|
||||
self,
|
||||
method,
|
||||
url,
|
||||
@ -120,13 +124,22 @@ class StorageClient(object):
|
||||
)
|
||||
return self._treq.request(method, url, headers=headers, **kwargs)
|
||||
|
||||
|
||||
class StorageClientGeneral(object):
|
||||
"""
|
||||
High-level HTTP APIs that aren't immutable- or mutable-specific.
|
||||
"""
|
||||
|
||||
def __init__(self, client): # type: (StorageClient) -> None
|
||||
self._client = client
|
||||
|
||||
@inlineCallbacks
|
||||
def get_version(self):
|
||||
"""
|
||||
Return the version metadata for the server.
|
||||
"""
|
||||
url = self._url("/v1/version")
|
||||
response = yield self._request("GET", url)
|
||||
url = self._client.relative_url("/v1/version")
|
||||
response = yield self._client.request("GET", url)
|
||||
decoded_response = yield _decode_cbor(response)
|
||||
returnValue(decoded_response)
|
||||
|
||||
@ -174,11 +187,11 @@ class StorageClientImmutables(object):
|
||||
Result fires when creating the storage index succeeded, if creating the
|
||||
storage index failed the result will fire with an exception.
|
||||
"""
|
||||
url = self._client._url("/v1/immutable/" + _encode_si(storage_index))
|
||||
url = self._client.relative_url("/v1/immutable/" + _encode_si(storage_index))
|
||||
message = dumps(
|
||||
{"share-numbers": share_numbers, "allocated-size": allocated_size}
|
||||
)
|
||||
response = yield self._client._request(
|
||||
response = yield self._client.request(
|
||||
"POST",
|
||||
url,
|
||||
lease_renew_secret=lease_renew_secret,
|
||||
@ -211,10 +224,10 @@ class StorageClientImmutables(object):
|
||||
whether the _complete_ share (i.e. all chunks, not just this one) has
|
||||
been uploaded.
|
||||
"""
|
||||
url = self._client._url(
|
||||
url = self._client.relative_url(
|
||||
"/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
|
||||
)
|
||||
response = yield self._client._request(
|
||||
response = yield self._client.request(
|
||||
"PATCH",
|
||||
url,
|
||||
upload_secret=upload_secret,
|
||||
@ -262,10 +275,10 @@ class StorageClientImmutables(object):
|
||||
the HTTP protocol will be simplified, see
|
||||
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
|
||||
"""
|
||||
url = self._client._url(
|
||||
url = self._client.relative_url(
|
||||
"/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
|
||||
)
|
||||
response = yield self._client._request(
|
||||
response = yield self._client.request(
|
||||
"GET",
|
||||
url,
|
||||
headers=Headers(
|
||||
@ -285,10 +298,10 @@ class StorageClientImmutables(object):
|
||||
"""
|
||||
Return the set of shares for a given storage index.
|
||||
"""
|
||||
url = self._client._url(
|
||||
url = self._client.relative_url(
|
||||
"/v1/immutable/{}/shares".format(_encode_si(storage_index))
|
||||
)
|
||||
response = yield self._client._request(
|
||||
response = yield self._client.request(
|
||||
"GET",
|
||||
url,
|
||||
)
|
||||
|
@ -18,11 +18,14 @@ else:
|
||||
|
||||
from functools import wraps
|
||||
from base64 import b64decode
|
||||
import binascii
|
||||
|
||||
from klein import Klein
|
||||
from twisted.web import http
|
||||
import attr
|
||||
from werkzeug.http import parse_range_header, parse_content_range_header
|
||||
from werkzeug.routing import BaseConverter, ValidationError
|
||||
from werkzeug.datastructures import ContentRange
|
||||
|
||||
# TODO Make sure to use pure Python versions?
|
||||
from cbor2 import dumps, loads
|
||||
@ -30,8 +33,9 @@ from cbor2 import dumps, loads
|
||||
from .server import StorageServer
|
||||
from .http_common import swissnum_auth_header, Secrets
|
||||
from .common import si_a2b
|
||||
from .immutable import BucketWriter
|
||||
from .immutable import BucketWriter, ConflictingWriteError
|
||||
from ..util.hashutil import timing_safe_compare
|
||||
from ..util.base32 import rfc3548_alphabet
|
||||
|
||||
|
||||
class ClientSecretsException(Exception):
|
||||
@ -128,10 +132,27 @@ class StorageIndexUploads(object):
|
||||
"""
|
||||
|
||||
# Map share number to BucketWriter
|
||||
shares = attr.ib() # type: Dict[int,BucketWriter]
|
||||
shares = attr.ib(factory=dict) # type: Dict[int,BucketWriter]
|
||||
|
||||
# The upload key.
|
||||
upload_secret = attr.ib() # type: bytes
|
||||
# Map share number to the upload secret (different shares might have
|
||||
# different upload secrets).
|
||||
upload_secrets = attr.ib(factory=dict) # type: Dict[int,bytes]
|
||||
|
||||
def add_upload(self, share_number, upload_secret, bucket):
|
||||
self.shares[share_number] = bucket
|
||||
self.upload_secrets[share_number] = upload_secret
|
||||
|
||||
|
||||
class StorageIndexConverter(BaseConverter):
|
||||
"""Parser/validator for storage index URL path segments."""
|
||||
|
||||
regex = "[" + str(rfc3548_alphabet, "ascii") + "]{26}"
|
||||
|
||||
def to_python(self, value):
|
||||
try:
|
||||
return si_a2b(value.encode("ascii"))
|
||||
except (AssertionError, binascii.Error, ValueError):
|
||||
raise ValidationError("Invalid storage index")
|
||||
|
||||
|
||||
class HTTPServer(object):
|
||||
@ -140,6 +161,7 @@ class HTTPServer(object):
|
||||
"""
|
||||
|
||||
_app = Klein()
|
||||
_app.url_map.converters["storage_index"] = StorageIndexConverter
|
||||
|
||||
def __init__(
|
||||
self, storage_server, swissnum
|
||||
@ -173,76 +195,75 @@ class HTTPServer(object):
|
||||
@_authorized_route(
|
||||
_app,
|
||||
{Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.UPLOAD},
|
||||
"/v1/immutable/<string:storage_index>",
|
||||
"/v1/immutable/<storage_index:storage_index>",
|
||||
methods=["POST"],
|
||||
)
|
||||
def allocate_buckets(self, request, authorization, storage_index):
|
||||
"""Allocate buckets."""
|
||||
storage_index = si_a2b(storage_index.encode("ascii"))
|
||||
info = loads(request.content.read())
|
||||
upload_secret = authorization[Secrets.UPLOAD]
|
||||
info = loads(request.content.read())
|
||||
|
||||
if storage_index in self._uploads:
|
||||
# Pre-existing upload.
|
||||
in_progress = self._uploads[storage_index]
|
||||
if timing_safe_compare(in_progress.upload_secret, upload_secret):
|
||||
# Same session.
|
||||
# TODO add BucketWriters only for new shares that don't already have buckets; see the HTTP spec for details.
|
||||
# The backend code may already implement this logic.
|
||||
pass
|
||||
else:
|
||||
# TODO Fail, since the secret doesnt match.
|
||||
pass
|
||||
else:
|
||||
# New upload.
|
||||
already_got, sharenum_to_bucket = self._storage_server.allocate_buckets(
|
||||
storage_index,
|
||||
renew_secret=authorization[Secrets.LEASE_RENEW],
|
||||
cancel_secret=authorization[Secrets.LEASE_CANCEL],
|
||||
sharenums=info["share-numbers"],
|
||||
allocated_size=info["allocated-size"],
|
||||
)
|
||||
self._uploads[storage_index] = StorageIndexUploads(
|
||||
shares=sharenum_to_bucket, upload_secret=authorization[Secrets.UPLOAD]
|
||||
)
|
||||
return self._cbor(
|
||||
request,
|
||||
{
|
||||
"already-have": set(already_got),
|
||||
"allocated": set(sharenum_to_bucket),
|
||||
},
|
||||
)
|
||||
for share_number in info["share-numbers"]:
|
||||
in_progress = self._uploads[storage_index]
|
||||
# For pre-existing upload, make sure password matches.
|
||||
if (
|
||||
share_number in in_progress.upload_secrets
|
||||
and not timing_safe_compare(
|
||||
in_progress.upload_secrets[share_number], upload_secret
|
||||
)
|
||||
):
|
||||
request.setResponseCode(http.UNAUTHORIZED)
|
||||
return b""
|
||||
|
||||
already_got, sharenum_to_bucket = self._storage_server.allocate_buckets(
|
||||
storage_index,
|
||||
renew_secret=authorization[Secrets.LEASE_RENEW],
|
||||
cancel_secret=authorization[Secrets.LEASE_CANCEL],
|
||||
sharenums=info["share-numbers"],
|
||||
allocated_size=info["allocated-size"],
|
||||
)
|
||||
uploads = self._uploads.setdefault(storage_index, StorageIndexUploads())
|
||||
for share_number, bucket in sharenum_to_bucket.items():
|
||||
uploads.add_upload(share_number, upload_secret, bucket)
|
||||
|
||||
return self._cbor(
|
||||
request,
|
||||
{
|
||||
"already-have": set(already_got),
|
||||
"allocated": set(sharenum_to_bucket),
|
||||
},
|
||||
)
|
||||
|
||||
@_authorized_route(
|
||||
_app,
|
||||
{Secrets.UPLOAD},
|
||||
"/v1/immutable/<string:storage_index>/<int:share_number>",
|
||||
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
|
||||
methods=["PATCH"],
|
||||
)
|
||||
def write_share_data(self, request, authorization, storage_index, share_number):
|
||||
"""Write data to an in-progress immutable upload."""
|
||||
storage_index = si_a2b(storage_index.encode("ascii"))
|
||||
content_range = parse_content_range_header(request.getHeader("content-range"))
|
||||
# TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
# 1. Malformed header should result in error 416
|
||||
# 2. Non-bytes unit should result in error 416
|
||||
# 3. Missing header means full upload in one request
|
||||
# 4. Impossible range should resul tin error 416
|
||||
if content_range is None or content_range.units != "bytes":
|
||||
request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
|
||||
return b""
|
||||
|
||||
offset = content_range.start
|
||||
|
||||
# TODO basic checks on validity of start, offset, and content-range in general. also of share_number.
|
||||
# TODO basic check that body isn't infinite. require content-length? or maybe we should require content-range (it's optional now)? if so, needs to be rflected in protocol spec.
|
||||
|
||||
data = request.content.read()
|
||||
# TODO limit memory usage
|
||||
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
|
||||
data = request.content.read(content_range.stop - content_range.start + 1)
|
||||
try:
|
||||
bucket = self._uploads[storage_index].shares[share_number]
|
||||
except (KeyError, IndexError):
|
||||
# TODO return 404
|
||||
raise
|
||||
request.setResponseCode(http.NOT_FOUND)
|
||||
return b""
|
||||
|
||||
finished = bucket.write(offset, data)
|
||||
|
||||
# TODO if raises ConflictingWriteError, return HTTP CONFLICT code.
|
||||
try:
|
||||
finished = bucket.write(offset, data)
|
||||
except ConflictingWriteError:
|
||||
request.setResponseCode(http.CONFLICT)
|
||||
return b""
|
||||
|
||||
if finished:
|
||||
bucket.close()
|
||||
@ -258,46 +279,65 @@ class HTTPServer(object):
|
||||
@_authorized_route(
|
||||
_app,
|
||||
set(),
|
||||
"/v1/immutable/<string:storage_index>/shares",
|
||||
"/v1/immutable/<storage_index:storage_index>/shares",
|
||||
methods=["GET"],
|
||||
)
|
||||
def list_shares(self, request, authorization, storage_index):
|
||||
"""
|
||||
List shares for the given storage index.
|
||||
"""
|
||||
storage_index = si_a2b(storage_index.encode("ascii"))
|
||||
share_numbers = list(self._storage_server.get_buckets(storage_index).keys())
|
||||
return self._cbor(request, share_numbers)
|
||||
|
||||
@_authorized_route(
|
||||
_app,
|
||||
set(),
|
||||
"/v1/immutable/<string:storage_index>/<int:share_number>",
|
||||
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
|
||||
methods=["GET"],
|
||||
)
|
||||
def read_share_chunk(self, request, authorization, storage_index, share_number):
|
||||
"""Read a chunk for an already uploaded immutable."""
|
||||
# TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
# 1. basic checks on validity on storage index, share number
|
||||
# 2. missing range header should have response code 200 and return whole thing
|
||||
# 3. malformed range header should result in error? or return everything?
|
||||
# 4. non-bytes range results in error
|
||||
# 5. ranges make sense semantically (positive, etc.)
|
||||
# 6. multiple ranges fails with error
|
||||
# 7. missing end of range means "to the end of share"
|
||||
storage_index = si_a2b(storage_index.encode("ascii"))
|
||||
range_header = parse_range_header(request.getHeader("range"))
|
||||
offset, end = range_header.ranges[0]
|
||||
assert end != None # TODO support this case
|
||||
try:
|
||||
bucket = self._storage_server.get_buckets(storage_index)[share_number]
|
||||
except KeyError:
|
||||
request.setResponseCode(http.NOT_FOUND)
|
||||
return b""
|
||||
|
||||
# TODO if not found, 404
|
||||
bucket = self._storage_server.get_buckets(storage_index)[share_number]
|
||||
if request.getHeader("range") is None:
|
||||
# Return the whole thing.
|
||||
start = 0
|
||||
while True:
|
||||
# TODO should probably yield to event loop occasionally...
|
||||
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
|
||||
data = bucket.read(start, start + 65536)
|
||||
if not data:
|
||||
request.finish()
|
||||
return
|
||||
request.write(data)
|
||||
start += len(data)
|
||||
|
||||
range_header = parse_range_header(request.getHeader("range"))
|
||||
if (
|
||||
range_header is None
|
||||
or range_header.units != "bytes"
|
||||
or len(range_header.ranges) > 1 # more than one range
|
||||
or range_header.ranges[0][1] is None # range without end
|
||||
):
|
||||
request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
|
||||
return b""
|
||||
|
||||
offset, end = range_header.ranges[0]
|
||||
|
||||
# TODO limit memory usage
|
||||
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
|
||||
data = bucket.read(offset, end - offset)
|
||||
|
||||
request.setResponseCode(http.PARTIAL_CONTENT)
|
||||
# TODO set content-range on response. We we need to expand the
|
||||
# BucketReader interface to return share's length.
|
||||
#
|
||||
# request.setHeader(
|
||||
# "content-range", range_header.make_content_range(share_length).to_header()
|
||||
# )
|
||||
if len(data):
|
||||
# For empty bodies the content-range header makes no sense since
|
||||
# the end of the range is inclusive.
|
||||
request.setHeader(
|
||||
"content-range",
|
||||
ContentRange("bytes", offset, offset + len(data)).to_header(),
|
||||
)
|
||||
return data
|
||||
|
@ -58,7 +58,7 @@ from twisted.plugin import (
|
||||
from eliot import (
|
||||
log_call,
|
||||
)
|
||||
from foolscap.api import eventually
|
||||
from foolscap.api import eventually, RemoteException
|
||||
from foolscap.reconnector import (
|
||||
ReconnectionInfo,
|
||||
)
|
||||
@ -75,7 +75,10 @@ from allmydata.util.observer import ObserverList
|
||||
from allmydata.util.rrefutil import add_version_to_remote_reference
|
||||
from allmydata.util.hashutil import permute_server_hash
|
||||
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
|
||||
from allmydata.storage.http_client import StorageClient, StorageClientImmutables
|
||||
from allmydata.storage.http_client import (
|
||||
StorageClient, StorageClientImmutables, StorageClientGeneral,
|
||||
ClientException as HTTPClientException,
|
||||
)
|
||||
|
||||
|
||||
# who is responsible for de-duplication?
|
||||
@ -1035,8 +1038,13 @@ class _FakeRemoteReference(object):
|
||||
"""
|
||||
local_object = attr.ib(type=object)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def callRemote(self, action, *args, **kwargs):
|
||||
return getattr(self.local_object, action)(*args, **kwargs)
|
||||
try:
|
||||
result = yield getattr(self.local_object, action)(*args, **kwargs)
|
||||
defer.returnValue(result)
|
||||
except HTTPClientException as e:
|
||||
raise RemoteException(e.args)
|
||||
|
||||
|
||||
@attr.s
|
||||
@ -1094,18 +1102,21 @@ class _HTTPBucketReader(object):
|
||||
class _HTTPStorageServer(object):
|
||||
"""
|
||||
Talk to remote storage server over HTTP.
|
||||
|
||||
The same upload key is used for all communication.
|
||||
"""
|
||||
_http_client = attr.ib(type=StorageClient)
|
||||
_upload_secret = attr.ib(type=bytes)
|
||||
|
||||
@staticmethod
|
||||
def from_http_client(http_client): # type: (StorageClient) -> _HTTPStorageServer
|
||||
"""
|
||||
Create an ``IStorageServer`` from a HTTP ``StorageClient``.
|
||||
"""
|
||||
return _HTTPStorageServer(http_client=http_client)
|
||||
return _HTTPStorageServer(http_client=http_client, upload_secret=urandom(20))
|
||||
|
||||
def get_version(self):
|
||||
return self._http_client.get_version()
|
||||
return StorageClientGeneral(self._http_client).get_version()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def allocate_buckets(
|
||||
@ -1117,10 +1128,9 @@ class _HTTPStorageServer(object):
|
||||
allocated_size,
|
||||
canary,
|
||||
):
|
||||
upload_secret = urandom(20)
|
||||
immutable_client = StorageClientImmutables(self._http_client)
|
||||
result = immutable_client.create(
|
||||
storage_index, sharenums, allocated_size, upload_secret, renew_secret,
|
||||
storage_index, sharenums, allocated_size, self._upload_secret, renew_secret,
|
||||
cancel_secret
|
||||
)
|
||||
result = yield result
|
||||
@ -1130,7 +1140,7 @@ class _HTTPStorageServer(object):
|
||||
client=immutable_client,
|
||||
storage_index=storage_index,
|
||||
share_number=share_num,
|
||||
upload_secret=upload_secret
|
||||
upload_secret=self._upload_secret
|
||||
))
|
||||
for share_num in result.allocated
|
||||
})
|
||||
|
@ -1160,13 +1160,8 @@ class HTTPImmutableAPIsTests(
|
||||
"test_add_lease_renewal",
|
||||
"test_add_new_lease",
|
||||
"test_advise_corrupt_share",
|
||||
"test_allocate_buckets_repeat",
|
||||
"test_bucket_advise_corrupt_share",
|
||||
"test_disconnection",
|
||||
"test_get_buckets_skips_unfinished_buckets",
|
||||
"test_matching_overlapping_writes",
|
||||
"test_non_matching_overlapping_writes",
|
||||
"test_written_shares_are_allocated",
|
||||
}
|
||||
|
||||
|
||||
|
@ -24,6 +24,10 @@ from klein import Klein
|
||||
from hyperlink import DecodedURL
|
||||
from collections_extended import RangeMap
|
||||
from twisted.internet.task import Clock
|
||||
from twisted.web import http
|
||||
from twisted.web.http_headers import Headers
|
||||
from werkzeug import routing
|
||||
from werkzeug.exceptions import NotFound as WNotFound
|
||||
|
||||
from .common import SyncTestCase
|
||||
from ..storage.server import StorageServer
|
||||
@ -33,6 +37,7 @@ from ..storage.http_server import (
|
||||
Secrets,
|
||||
ClientSecretsException,
|
||||
_authorized_route,
|
||||
StorageIndexConverter,
|
||||
)
|
||||
from ..storage.http_client import (
|
||||
StorageClient,
|
||||
@ -40,7 +45,10 @@ from ..storage.http_client import (
|
||||
StorageClientImmutables,
|
||||
ImmutableCreateResult,
|
||||
UploadProgress,
|
||||
StorageClientGeneral,
|
||||
_encode_si,
|
||||
)
|
||||
from ..storage.common import si_b2a
|
||||
|
||||
|
||||
def _post_process(params):
|
||||
@ -147,6 +155,52 @@ class ExtractSecretsTests(SyncTestCase):
|
||||
_extract_secrets(["lease-cancel-secret eA=="], {Secrets.LEASE_RENEW})
|
||||
|
||||
|
||||
class RouteConverterTests(SyncTestCase):
|
||||
"""Tests for custom werkzeug path segment converters."""
|
||||
|
||||
adapter = routing.Map(
|
||||
[
|
||||
routing.Rule(
|
||||
"/<storage_index:storage_index>/", endpoint="si", methods=["GET"]
|
||||
)
|
||||
],
|
||||
converters={"storage_index": StorageIndexConverter},
|
||||
).bind("example.com", "/")
|
||||
|
||||
@given(storage_index=st.binary(min_size=16, max_size=16))
|
||||
def test_good_storage_index_is_parsed(self, storage_index):
|
||||
"""
|
||||
A valid storage index is accepted and parsed back out by
|
||||
StorageIndexConverter.
|
||||
"""
|
||||
self.assertEqual(
|
||||
self.adapter.match(
|
||||
"/{}/".format(str(si_b2a(storage_index), "ascii")), method="GET"
|
||||
),
|
||||
("si", {"storage_index": storage_index}),
|
||||
)
|
||||
|
||||
def test_long_storage_index_is_not_parsed(self):
|
||||
"""An overly long storage_index string is not parsed."""
|
||||
with self.assertRaises(WNotFound):
|
||||
self.adapter.match("/{}/".format("a" * 27), method="GET")
|
||||
|
||||
def test_short_storage_index_is_not_parsed(self):
|
||||
"""An overly short storage_index string is not parsed."""
|
||||
with self.assertRaises(WNotFound):
|
||||
self.adapter.match("/{}/".format("a" * 25), method="GET")
|
||||
|
||||
def test_bad_characters_storage_index_is_not_parsed(self):
|
||||
"""A storage_index string with bad characters is not parsed."""
|
||||
with self.assertRaises(WNotFound):
|
||||
self.adapter.match("/{}_/".format("a" * 25), method="GET")
|
||||
|
||||
def test_invalid_storage_index_is_not_parsed(self):
|
||||
"""An invalid storage_index string is not parsed."""
|
||||
with self.assertRaises(WNotFound):
|
||||
self.adapter.match("/nomd2a65ylxjbqzsw7gcfh4ivr/", method="GET")
|
||||
|
||||
|
||||
# TODO should be actual swissnum
|
||||
SWISSNUM_FOR_TEST = b"abcd"
|
||||
|
||||
@ -207,7 +261,7 @@ class RoutingTests(SyncTestCase):
|
||||
"""
|
||||
# Without secret, get a 400 error.
|
||||
response = result_of(
|
||||
self.client._request(
|
||||
self.client.request(
|
||||
"GET",
|
||||
"http://127.0.0.1/upload_secret",
|
||||
)
|
||||
@ -216,7 +270,7 @@ class RoutingTests(SyncTestCase):
|
||||
|
||||
# With secret, we're good.
|
||||
response = result_of(
|
||||
self.client._request(
|
||||
self.client.request(
|
||||
"GET", "http://127.0.0.1/upload_secret", upload_secret=b"MAGIC"
|
||||
)
|
||||
)
|
||||
@ -244,6 +298,24 @@ class HttpTestFixture(Fixture):
|
||||
)
|
||||
|
||||
|
||||
class StorageClientWithHeadersOverride(object):
|
||||
"""Wrap ``StorageClient`` and override sent headers."""
|
||||
|
||||
def __init__(self, storage_client, add_headers):
|
||||
self.storage_client = storage_client
|
||||
self.add_headers = add_headers
|
||||
|
||||
def __getattr__(self, attr):
|
||||
return getattr(self.storage_client, attr)
|
||||
|
||||
def request(self, *args, headers=None, **kwargs):
|
||||
if headers is None:
|
||||
headers = Headers()
|
||||
for key, value in self.add_headers.items():
|
||||
headers.setRawHeaders(key, [value])
|
||||
return self.storage_client.request(*args, headers=headers, **kwargs)
|
||||
|
||||
|
||||
class GenericHTTPAPITests(SyncTestCase):
|
||||
"""
|
||||
Tests of HTTP client talking to the HTTP server, for generic HTTP API
|
||||
@ -261,10 +333,12 @@ class GenericHTTPAPITests(SyncTestCase):
|
||||
If the wrong swissnum is used, an ``Unauthorized`` response code is
|
||||
returned.
|
||||
"""
|
||||
client = StorageClient(
|
||||
DecodedURL.from_text("http://127.0.0.1"),
|
||||
b"something wrong",
|
||||
treq=StubTreq(self.http.http_server.get_resource()),
|
||||
client = StorageClientGeneral(
|
||||
StorageClient(
|
||||
DecodedURL.from_text("http://127.0.0.1"),
|
||||
b"something wrong",
|
||||
treq=StubTreq(self.http.http_server.get_resource()),
|
||||
)
|
||||
)
|
||||
with self.assertRaises(ClientException) as e:
|
||||
result_of(client.get_version())
|
||||
@ -277,7 +351,8 @@ class GenericHTTPAPITests(SyncTestCase):
|
||||
We ignore available disk space and max immutable share size, since that
|
||||
might change across calls.
|
||||
"""
|
||||
version = result_of(self.http.client.get_version())
|
||||
client = StorageClientGeneral(self.http.client)
|
||||
version = result_of(client.get_version())
|
||||
version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
|
||||
b"available-space"
|
||||
)
|
||||
@ -304,6 +379,28 @@ class ImmutableHTTPAPITests(SyncTestCase):
|
||||
self.skipTest("Not going to bother supporting Python 2")
|
||||
super(ImmutableHTTPAPITests, self).setUp()
|
||||
self.http = self.useFixture(HttpTestFixture())
|
||||
self.imm_client = StorageClientImmutables(self.http.client)
|
||||
|
||||
def create_upload(self, share_numbers, length):
|
||||
"""
|
||||
Create a write bucket on server, return:
|
||||
|
||||
(upload_secret, lease_secret, storage_index, result)
|
||||
"""
|
||||
upload_secret = urandom(32)
|
||||
lease_secret = urandom(32)
|
||||
storage_index = urandom(16)
|
||||
created = result_of(
|
||||
self.imm_client.create(
|
||||
storage_index,
|
||||
share_numbers,
|
||||
length,
|
||||
upload_secret,
|
||||
lease_secret,
|
||||
lease_secret,
|
||||
)
|
||||
)
|
||||
return (upload_secret, lease_secret, storage_index, created)
|
||||
|
||||
def test_upload_can_be_downloaded(self):
|
||||
"""
|
||||
@ -315,19 +412,10 @@ class ImmutableHTTPAPITests(SyncTestCase):
|
||||
that's already done in test_storage.py.
|
||||
"""
|
||||
length = 100
|
||||
expected_data = b"".join(bytes([i]) for i in range(100))
|
||||
|
||||
im_client = StorageClientImmutables(self.http.client)
|
||||
expected_data = bytes(range(100))
|
||||
|
||||
# Create a upload:
|
||||
upload_secret = urandom(32)
|
||||
lease_secret = urandom(32)
|
||||
storage_index = b"".join(bytes([i]) for i in range(16))
|
||||
created = result_of(
|
||||
im_client.create(
|
||||
storage_index, {1}, 100, upload_secret, lease_secret, lease_secret
|
||||
)
|
||||
)
|
||||
(upload_secret, _, storage_index, created) = self.create_upload({1}, 100)
|
||||
self.assertEqual(
|
||||
created, ImmutableCreateResult(already_have=set(), allocated={1})
|
||||
)
|
||||
@ -338,7 +426,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
|
||||
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
|
||||
def write(offset, length):
|
||||
remaining.empty(offset, offset + length)
|
||||
return im_client.write_share_chunk(
|
||||
return self.imm_client.write_share_chunk(
|
||||
storage_index,
|
||||
1,
|
||||
upload_secret,
|
||||
@ -382,31 +470,58 @@ class ImmutableHTTPAPITests(SyncTestCase):
|
||||
# We can now read:
|
||||
for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]:
|
||||
downloaded = result_of(
|
||||
im_client.read_share_chunk(storage_index, 1, offset, length)
|
||||
self.imm_client.read_share_chunk(storage_index, 1, offset, length)
|
||||
)
|
||||
self.assertEqual(downloaded, expected_data[offset : offset + length])
|
||||
|
||||
def test_allocate_buckets_second_time_wrong_upload_key(self):
|
||||
"""
|
||||
If allocate buckets endpoint is called second time with wrong upload
|
||||
key on the same shares, the result is an error.
|
||||
"""
|
||||
# Create a upload:
|
||||
(upload_secret, lease_secret, storage_index, _) = self.create_upload(
|
||||
{1, 2, 3}, 100
|
||||
)
|
||||
with self.assertRaises(ClientException) as e:
|
||||
result_of(
|
||||
self.imm_client.create(
|
||||
storage_index, {2, 3}, 100, b"x" * 32, lease_secret, lease_secret
|
||||
)
|
||||
)
|
||||
self.assertEqual(e.exception.args[0], http.UNAUTHORIZED)
|
||||
|
||||
def test_allocate_buckets_second_time_different_shares(self):
|
||||
"""
|
||||
If allocate buckets endpoint is called second time with different
|
||||
upload key on different shares, that creates the buckets.
|
||||
"""
|
||||
# Create a upload:
|
||||
(upload_secret, lease_secret, storage_index, created) = self.create_upload(
|
||||
{1, 2, 3}, 100
|
||||
)
|
||||
|
||||
# Add same shares:
|
||||
created2 = result_of(
|
||||
self.imm_client.create(
|
||||
storage_index, {4, 6}, 100, b"x" * 2, lease_secret, lease_secret
|
||||
)
|
||||
)
|
||||
self.assertEqual(created2.allocated, {4, 6})
|
||||
|
||||
def test_list_shares(self):
|
||||
"""
|
||||
Once a share is finished uploading, it's possible to list it.
|
||||
"""
|
||||
im_client = StorageClientImmutables(self.http.client)
|
||||
upload_secret = urandom(32)
|
||||
lease_secret = urandom(32)
|
||||
storage_index = b"".join(bytes([i]) for i in range(16))
|
||||
result_of(
|
||||
im_client.create(
|
||||
storage_index, {1, 2, 3}, 10, upload_secret, lease_secret, lease_secret
|
||||
)
|
||||
)
|
||||
(upload_secret, _, storage_index, created) = self.create_upload({1, 2, 3}, 10)
|
||||
|
||||
# Initially there are no shares:
|
||||
self.assertEqual(result_of(im_client.list_shares(storage_index)), set())
|
||||
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set())
|
||||
|
||||
# Upload shares 1 and 3:
|
||||
for share_number in [1, 3]:
|
||||
progress = result_of(
|
||||
im_client.write_share_chunk(
|
||||
self.imm_client.write_share_chunk(
|
||||
storage_index,
|
||||
share_number,
|
||||
upload_secret,
|
||||
@ -417,87 +532,266 @@ class ImmutableHTTPAPITests(SyncTestCase):
|
||||
self.assertTrue(progress.finished)
|
||||
|
||||
# Now shares 1 and 3 exist:
|
||||
self.assertEqual(result_of(im_client.list_shares(storage_index)), {1, 3})
|
||||
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), {1, 3})
|
||||
|
||||
def test_upload_bad_content_range(self):
|
||||
"""
|
||||
Malformed or invalid Content-Range headers to the immutable upload
|
||||
endpoint result in a 416 error.
|
||||
"""
|
||||
(upload_secret, _, storage_index, created) = self.create_upload({1}, 10)
|
||||
|
||||
def check_invalid(bad_content_range_value):
|
||||
client = StorageClientImmutables(
|
||||
StorageClientWithHeadersOverride(
|
||||
self.http.client, {"content-range": bad_content_range_value}
|
||||
)
|
||||
)
|
||||
with self.assertRaises(ClientException) as e:
|
||||
result_of(
|
||||
client.write_share_chunk(
|
||||
storage_index,
|
||||
1,
|
||||
upload_secret,
|
||||
0,
|
||||
b"0123456789",
|
||||
)
|
||||
)
|
||||
self.assertEqual(e.exception.code, http.REQUESTED_RANGE_NOT_SATISFIABLE)
|
||||
|
||||
check_invalid("not a valid content-range header at all")
|
||||
check_invalid("bytes -1-9/10")
|
||||
check_invalid("bytes 0--9/10")
|
||||
check_invalid("teapots 0-9/10")
|
||||
|
||||
def test_list_shares_unknown_storage_index(self):
|
||||
"""
|
||||
Listing unknown storage index's shares results in empty list of shares.
|
||||
"""
|
||||
im_client = StorageClientImmutables(self.http.client)
|
||||
storage_index = b"".join(bytes([i]) for i in range(16))
|
||||
self.assertEqual(result_of(im_client.list_shares(storage_index)), set())
|
||||
storage_index = bytes(range(16))
|
||||
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set())
|
||||
|
||||
def test_upload_non_existent_storage_index(self):
|
||||
"""
|
||||
Uploading to a non-existent storage index or share number results in
|
||||
404.
|
||||
"""
|
||||
(upload_secret, _, storage_index, _) = self.create_upload({1}, 10)
|
||||
|
||||
def unknown_check(storage_index, share_number):
|
||||
with self.assertRaises(ClientException) as e:
|
||||
result_of(
|
||||
self.imm_client.write_share_chunk(
|
||||
storage_index,
|
||||
share_number,
|
||||
upload_secret,
|
||||
0,
|
||||
b"0123456789",
|
||||
)
|
||||
)
|
||||
self.assertEqual(e.exception.code, http.NOT_FOUND)
|
||||
|
||||
# Wrong share number:
|
||||
unknown_check(storage_index, 7)
|
||||
# Wrong storage index:
|
||||
unknown_check(b"X" * 16, 7)
|
||||
|
||||
def test_multiple_shares_uploaded_to_different_place(self):
|
||||
"""
|
||||
If a storage index has multiple shares, uploads to different shares are
|
||||
stored separately and can be downloaded separately.
|
||||
|
||||
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
"""
|
||||
|
||||
def test_bucket_allocated_with_new_shares(self):
|
||||
"""
|
||||
If some shares already exist, allocating shares indicates only the new
|
||||
ones were created.
|
||||
|
||||
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
"""
|
||||
|
||||
def test_bucket_allocation_new_upload_secret(self):
|
||||
"""
|
||||
If a bucket was allocated with one upload secret, and a different upload
|
||||
key is used to allocate the bucket again, the second allocation fails.
|
||||
|
||||
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
"""
|
||||
|
||||
def test_upload_with_wrong_upload_secret_fails(self):
|
||||
"""
|
||||
Uploading with a key that doesn't match the one used to allocate the
|
||||
bucket will fail.
|
||||
|
||||
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
"""
|
||||
|
||||
def test_upload_offset_cannot_be_negative(self):
|
||||
"""
|
||||
A negative upload offset will be rejected.
|
||||
|
||||
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
"""
|
||||
(upload_secret, _, storage_index, _) = self.create_upload({1, 2}, 10)
|
||||
result_of(
|
||||
self.imm_client.write_share_chunk(
|
||||
storage_index,
|
||||
1,
|
||||
upload_secret,
|
||||
0,
|
||||
b"1" * 10,
|
||||
)
|
||||
)
|
||||
result_of(
|
||||
self.imm_client.write_share_chunk(
|
||||
storage_index,
|
||||
2,
|
||||
upload_secret,
|
||||
0,
|
||||
b"2" * 10,
|
||||
)
|
||||
)
|
||||
self.assertEqual(
|
||||
result_of(self.imm_client.read_share_chunk(storage_index, 1, 0, 10)),
|
||||
b"1" * 10,
|
||||
)
|
||||
self.assertEqual(
|
||||
result_of(self.imm_client.read_share_chunk(storage_index, 2, 0, 10)),
|
||||
b"2" * 10,
|
||||
)
|
||||
|
||||
def test_mismatching_upload_fails(self):
|
||||
"""
|
||||
If an uploaded chunk conflicts with an already uploaded chunk, a
|
||||
CONFLICT error is returned.
|
||||
|
||||
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
"""
|
||||
(upload_secret, _, storage_index, created) = self.create_upload({1}, 100)
|
||||
|
||||
# Write:
|
||||
result_of(
|
||||
self.imm_client.write_share_chunk(
|
||||
storage_index,
|
||||
1,
|
||||
upload_secret,
|
||||
0,
|
||||
b"0" * 10,
|
||||
)
|
||||
)
|
||||
|
||||
# Conflicting write:
|
||||
with self.assertRaises(ClientException) as e:
|
||||
result_of(
|
||||
self.imm_client.write_share_chunk(
|
||||
storage_index,
|
||||
1,
|
||||
upload_secret,
|
||||
0,
|
||||
b"0123456789",
|
||||
)
|
||||
)
|
||||
self.assertEqual(e.exception.code, http.NOT_FOUND)
|
||||
|
||||
def upload(self, share_number, data_length=26):
|
||||
"""
|
||||
Create a share, return (storage_index, uploaded_data).
|
||||
"""
|
||||
uploaded_data = (b"abcdefghijklmnopqrstuvwxyz" * ((data_length // 26) + 1))[
|
||||
:data_length
|
||||
]
|
||||
(upload_secret, _, storage_index, _) = self.create_upload(
|
||||
{share_number}, data_length
|
||||
)
|
||||
result_of(
|
||||
self.imm_client.write_share_chunk(
|
||||
storage_index,
|
||||
share_number,
|
||||
upload_secret,
|
||||
0,
|
||||
uploaded_data,
|
||||
)
|
||||
)
|
||||
return storage_index, uploaded_data
|
||||
|
||||
def test_read_of_wrong_storage_index_fails(self):
|
||||
"""
|
||||
Reading from unknown storage index results in 404.
|
||||
|
||||
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
"""
|
||||
with self.assertRaises(ClientException) as e:
|
||||
result_of(
|
||||
self.imm_client.read_share_chunk(
|
||||
b"1" * 16,
|
||||
1,
|
||||
0,
|
||||
10,
|
||||
)
|
||||
)
|
||||
self.assertEqual(e.exception.code, http.NOT_FOUND)
|
||||
|
||||
def test_read_of_wrong_share_number_fails(self):
|
||||
"""
|
||||
Reading from unknown storage index results in 404.
|
||||
|
||||
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
"""
|
||||
storage_index, _ = self.upload(1)
|
||||
with self.assertRaises(ClientException) as e:
|
||||
result_of(
|
||||
self.imm_client.read_share_chunk(
|
||||
storage_index,
|
||||
7, # different share number
|
||||
0,
|
||||
10,
|
||||
)
|
||||
)
|
||||
self.assertEqual(e.exception.code, http.NOT_FOUND)
|
||||
|
||||
def test_read_with_negative_offset_fails(self):
|
||||
"""
|
||||
The offset for reads cannot be negative.
|
||||
|
||||
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
Malformed or unsupported Range headers result in 416 (requested range
|
||||
not satisfiable) error.
|
||||
"""
|
||||
storage_index, _ = self.upload(1)
|
||||
|
||||
def test_read_with_negative_length_fails(self):
|
||||
"""
|
||||
The length for reads cannot be negative.
|
||||
def check_bad_range(bad_range_value):
|
||||
client = StorageClientImmutables(
|
||||
StorageClientWithHeadersOverride(
|
||||
self.http.client, {"range": bad_range_value}
|
||||
)
|
||||
)
|
||||
|
||||
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
|
||||
with self.assertRaises(ClientException) as e:
|
||||
result_of(
|
||||
client.read_share_chunk(
|
||||
storage_index,
|
||||
1,
|
||||
0,
|
||||
10,
|
||||
)
|
||||
)
|
||||
self.assertEqual(e.exception.code, http.REQUESTED_RANGE_NOT_SATISFIABLE)
|
||||
|
||||
# Bad unit
|
||||
check_bad_range("molluscs=0-9")
|
||||
# Negative offsets
|
||||
check_bad_range("bytes=-2-9")
|
||||
check_bad_range("bytes=0--10")
|
||||
# Negative offset no endpoint
|
||||
check_bad_range("bytes=-300-")
|
||||
check_bad_range("bytes=")
|
||||
# Multiple ranges are currently unsupported, even if they're
|
||||
# semantically valid under HTTP:
|
||||
check_bad_range("bytes=0-5, 6-7")
|
||||
# Ranges without an end are currently unsupported, even if they're
|
||||
# semantically valid under HTTP.
|
||||
check_bad_range("bytes=0-")
|
||||
|
||||
@given(data_length=st.integers(min_value=1, max_value=300000))
|
||||
def test_read_with_no_range(self, data_length):
|
||||
"""
|
||||
A read with no range returns the whole immutable.
|
||||
"""
|
||||
storage_index, uploaded_data = self.upload(1, data_length)
|
||||
response = result_of(
|
||||
self.http.client.request(
|
||||
"GET",
|
||||
self.http.client.relative_url(
|
||||
"/v1/immutable/{}/1".format(_encode_si(storage_index))
|
||||
),
|
||||
)
|
||||
)
|
||||
self.assertEqual(response.code, http.OK)
|
||||
self.assertEqual(result_of(response.content()), uploaded_data)
|
||||
|
||||
def test_validate_content_range_response_to_read(self):
|
||||
"""
|
||||
The server responds to ranged reads with an appropriate Content-Range
|
||||
header.
|
||||
"""
|
||||
storage_index, _ = self.upload(1, 26)
|
||||
|
||||
def check_range(requested_range, expected_response):
|
||||
headers = Headers()
|
||||
headers.setRawHeaders("range", [requested_range])
|
||||
response = result_of(
|
||||
self.http.client.request(
|
||||
"GET",
|
||||
self.http.client.relative_url(
|
||||
"/v1/immutable/{}/1".format(_encode_si(storage_index))
|
||||
),
|
||||
headers=headers,
|
||||
)
|
||||
)
|
||||
self.assertEqual(
|
||||
response.headers.getRawHeaders("content-range"), [expected_response]
|
||||
)
|
||||
|
||||
check_range("bytes=0-10", "bytes 0-10/*")
|
||||
# Can't go beyond the end of the immutable!
|
||||
check_range("bytes=10-100", "bytes 10-25/*")
|
||||
|
Loading…
x
Reference in New Issue
Block a user