mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-31 16:36:20 +00:00

Merge pull request #1172 from tahoe-lafs/3855-immutable-http-apis-part-1

Immutable HTTP APIs, part 1. Fixes ticket:3855

This commit is contained in commit 5e8cc06e93
@@ -73,12 +73,13 @@ mach-nix.buildPythonPackage rec {
     # file. Tell it about them here.
     setuptools_rust

-    # mach-nix does not yet parse environment markers correctly. It misses
-    # all of our requirements which have an environment marker. Duplicate them
-    # here.
+    # mach-nix does not yet parse environment markers (e.g. "python > '3.0'")
+    # correctly. It misses all of our requirements which have an environment marker.
+    # Duplicate them here.
     foolscap
     eliot
     pyrsistent
+    collections-extended
   '';

   # Specify where mach-nix should find packages for our Python dependencies.
@@ -382,6 +382,11 @@ the server will respond with ``400 BAD REQUEST``.

 If authorization using the secret fails, then a ``401 UNAUTHORIZED`` response should be sent.

+Encoding
+~~~~~~~~
+
+* ``storage_index`` should be base32 encoded (RFC3548) in URLs.
+
 General
 ~~~~~~~
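As an illustrative aside (not part of the diff), here is roughly what that base32 rule amounts to in plain Python. Tahoe itself uses ``si_b2a()`` from ``allmydata.storage.common``; the lowercase/no-padding details below are assumptions based on Tahoe's base32 conventions::

    from base64 import b32encode

    def encode_storage_index_for_url(storage_index):  # type: (bytes) -> str
        # RFC 3548 base32; Tahoe conventionally lowercases and drops "=" padding.
        return b32encode(storage_index).decode("ascii").lower().rstrip("=")

    # A 16-byte storage index encodes to 26 characters:
    print(encode_storage_index_for_url(b"\x00" * 16))  # aaaaaaaaaaaaaaaaaaaaaaaaaa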
@@ -483,6 +488,14 @@ For example::

 The upload secret is an opaque _byte_ string.

+Handling repeat calls:
+
+* If the same API call is repeated with the same upload secret, the response is the same and no change is made to server state.
+  This is necessary to ensure retries work in the face of lost responses from the server.
+* If the API call is made with a different upload secret, this implies a new client, perhaps because the old client died.
+  In order to prevent storage servers from being able to mess with each other, this API call will fail, because the secret doesn't match.
+  The use case of restarting an upload from scratch if the client dies can be implemented by having the client persist the upload secret, as sketched below.
+
 Discussion
 ``````````
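A minimal sketch of that client-persistence idea (illustrative only, not from this PR; the file layout and names are assumptions)::

    import json
    import os

    def get_upload_secret(state_path):
        """Load the persisted upload secret, creating one on first use, so a
        restarted client can resume (or deliberately restart) its own upload
        instead of being rejected for a mismatched secret."""
        try:
            with open(state_path) as f:
                return bytes.fromhex(json.load(f)["upload-secret"])
        except FileNotFoundError:
            secret = os.urandom(32)
            with open(state_path, "w") as f:
                json.dump({"upload-secret": secret.hex()}, f)
            return secret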
@@ -627,7 +640,7 @@ For example::

 Read a contiguous sequence of bytes from one share in one bucket.
 The response body is the raw share data (i.e., ``application/octet-stream``).
-The ``Range`` header may be used to request exactly one ``bytes`` range.
+The ``Range`` header may be used to request exactly one ``bytes`` range, in which case the response code will be 206 (partial content).
 Interpretation and response behavior is as specified in RFC 7233 § 4.1.
 Multiple ranges in a single request are *not* supported.
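For concreteness, a small sketch of what such a single-range request looks like, built with the same werkzeug helper the new client code uses (the offset/length values are illustrative)::

    from werkzeug.datastructures import Range

    # Read 16 bytes starting at offset 0. werkzeug's end is exclusive,
    # while the wire format is inclusive per RFC 7233:
    print(Range("bytes", [(0, 16)]).to_header())  # bytes=0-15
    # A conforming server answers such a request with 206 (partial content).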
newsfragments/3855.minor: new file (0 lines)

setup.py: 8 lines changed
@@ -138,11 +138,15 @@ install_requires = [
     # Backported configparser for Python 2:
     "configparser ; python_version < '3.0'",

-    # For the RangeMap datastructure.
-    "collections-extended",
+    # For the RangeMap datastructure. Need 2.0.2 at least for bugfixes. Python
+    # 2 doesn't actually need this, since HTTP storage protocol isn't supported
+    # there, so we just pick whatever version so that code imports.
+    "collections-extended >= 2.0.2 ; python_version > '3.0'",
+    "collections-extended ; python_version < '3.0'",

     # HTTP server and client
     "klein",
+    "werkzeug",
     "treq",
     "cbor2"
 ]
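An aside on the environment markers used above: they are PEP 508 expressions that pip evaluates at install time. A quick sketch, using the third-party ``packaging`` library (an assumption of this example, not a dependency of the PR)::

    from packaging.markers import Marker

    print(Marker("python_version < '3.0'").evaluate())  # False on Python 3
    print(Marker("python_version > '3.0'").evaluate())  # True on Python 3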
@@ -13,23 +13,39 @@ if PY2:
     # fmt: off
     from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
     # fmt: on
+    from collections import defaultdict
+
+    Optional = Set = defaultdict(
+        lambda: None
+    )  # some garbage to just make this module import
 else:
     # typing module not available in Python 2, and we only do type checking in
     # Python 3 anyway.
-    from typing import Union
+    from typing import Union, Set, Optional
     from treq.testing import StubTreq

 from base64 import b64encode

 import attr

 # TODO Make sure to import Python version?
-from cbor2 import loads
-
-
+from cbor2 import loads, dumps
+from collections_extended import RangeMap
+from werkzeug.datastructures import Range, ContentRange
 from twisted.web.http_headers import Headers
-from twisted.internet.defer import inlineCallbacks, returnValue, fail
+from twisted.web import http
+from twisted.internet.defer import inlineCallbacks, returnValue, fail, Deferred
 from hyperlink import DecodedURL
 import treq

+from .http_common import swissnum_auth_header, Secrets
+from .common import si_b2a
+
+
+def _encode_si(si):  # type: (bytes) -> str
+    """Encode the storage index into Unicode string."""
+    return str(si_b2a(si), "ascii")
+
+
 class ClientException(Exception):
     """An unexpected error."""
@@ -42,9 +58,12 @@ def _decode_cbor(response):
     return fail(ClientException(response.code, response.phrase))


-def swissnum_auth_header(swissnum):  # type: (bytes) -> bytes
-    """Return value for ``Authentication`` header."""
-    return b"Tahoe-LAFS " + b64encode(swissnum).strip()
+@attr.s
+class ImmutableCreateResult(object):
+    """Result of creating a storage index for an immutable."""
+
+    already_have = attr.ib(type=Set[int])
+    allocated = attr.ib(type=Set[int])


 class StorageClient(object):
@@ -59,25 +78,45 @@ class StorageClient(object):
         self._swissnum = swissnum
         self._treq = treq

-    def _get_headers(self):  # type: () -> Headers
+    def _url(self, path):
+        """Get a URL relative to the base URL."""
+        return self._base_url.click(path)
+
+    def _get_headers(self, headers):  # type: (Optional[Headers]) -> Headers
         """Return the basic headers to be used by default."""
-        headers = Headers()
+        if headers is None:
+            headers = Headers()
         headers.addRawHeader(
             "Authorization",
             swissnum_auth_header(self._swissnum),
         )
         return headers

-    def _request(self, method, url, secrets, **kwargs):
+    def _request(
+        self,
+        method,
+        url,
+        lease_renew_secret=None,
+        lease_cancel_secret=None,
+        upload_secret=None,
+        headers=None,
+        **kwargs
+    ):
         """
-        Like ``treq.request()``, but additional argument of secrets mapping
-        ``http_server.Secret`` to the bytes value of the secret.
+        Like ``treq.request()``, but with optional secrets that get translated
+        into corresponding HTTP headers.
         """
-        headers = self._get_headers()
-        for key, value in secrets.items():
+        headers = self._get_headers(headers)
+        for secret, value in [
+            (Secrets.LEASE_RENEW, lease_renew_secret),
+            (Secrets.LEASE_CANCEL, lease_cancel_secret),
+            (Secrets.UPLOAD, upload_secret),
+        ]:
+            if value is None:
+                continue
             headers.addRawHeader(
                 "X-Tahoe-Authorization",
-                b"%s %s" % (key.value.encode("ascii"), b64encode(value).strip())
+                b"%s %s" % (secret.value.encode("ascii"), b64encode(value).strip()),
             )
         return self._treq.request(method, url, headers=headers, **kwargs)
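To make the header translation above concrete, a sketch of the wire-level values ``_request()`` ends up sending (the secret bytes are illustrative)::

    from base64 import b64encode

    swissnum = b"abcd"
    upload_secret = b"MAGIC"
    # Always sent:
    #   Authorization: Tahoe-LAFS YWJjZA==
    print(b"Tahoe-LAFS " + b64encode(swissnum).strip())
    # Sent only when upload_secret is not None:
    #   X-Tahoe-Authorization: upload-secret TUFHSUM=
    print(b"upload-secret " + b64encode(upload_secret).strip())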
@@ -86,7 +125,160 @@ class StorageClient(object):
         """
         Return the version metadata for the server.
         """
-        url = self._base_url.click("/v1/version")
-        response = yield self._request("GET", url, {})
+        url = self._url("/v1/version")
+        response = yield self._request("GET", url)
         decoded_response = yield _decode_cbor(response)
         returnValue(decoded_response)


+@attr.s
+class UploadProgress(object):
+    """
+    Progress of immutable upload, per the server.
+    """
+
+    # True when upload has finished.
+    finished = attr.ib(type=bool)
+    # Remaining ranges to upload.
+    required = attr.ib(type=RangeMap)
+
+
+class StorageClientImmutables(object):
+    """
+    APIs for interacting with immutables.
+    """
+
+    def __init__(self, client):  # type: (StorageClient) -> None
+        self._client = client
+
+    @inlineCallbacks
+    def create(
+        self,
+        storage_index,
+        share_numbers,
+        allocated_size,
+        upload_secret,
+        lease_renew_secret,
+        lease_cancel_secret,
+    ):  # type: (bytes, Set[int], int, bytes, bytes, bytes) -> Deferred[ImmutableCreateResult]
+        """
+        Create a new storage index for an immutable.
+
+        TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 retry
+        internally on failure, to ensure the operation fully succeeded. If a
+        sufficient number of failures occurred, the result may fire with an
+        error, but there's no expectation that user code needs to have a
+        recovery codepath; it will most likely just report an error to the
+        user.
+
+        The result fires when creating the storage index succeeds; if creating
+        the storage index fails, the result will fire with an exception.
+        """
+        url = self._client._url("/v1/immutable/" + _encode_si(storage_index))
+        message = dumps(
+            {"share-numbers": share_numbers, "allocated-size": allocated_size}
+        )
+        response = yield self._client._request(
+            "POST",
+            url,
+            lease_renew_secret=lease_renew_secret,
+            lease_cancel_secret=lease_cancel_secret,
+            upload_secret=upload_secret,
+            data=message,
+            headers=Headers({"content-type": ["application/cbor"]}),
+        )
+        decoded_response = yield _decode_cbor(response)
+        returnValue(
+            ImmutableCreateResult(
+                already_have=decoded_response["already-have"],
+                allocated=decoded_response["allocated"],
+            )
+        )
+
+    @inlineCallbacks
+    def write_share_chunk(
+        self, storage_index, share_number, upload_secret, offset, data
+    ):  # type: (bytes, int, bytes, int, bytes) -> Deferred[UploadProgress]
+        """
+        Upload a chunk of data for a specific share.
+
+        TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 The
+        implementation should retry failed uploads transparently a number of
+        times, so that if a failure percolates up, the caller can assume the
+        failure isn't a short-term blip.
+
+        The result fires when the upload succeeds, with a boolean indicating
+        whether the _complete_ share (i.e. all chunks, not just this one) has
+        been uploaded.
+        """
+        url = self._client._url(
+            "/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
+        )
+        response = yield self._client._request(
+            "PATCH",
+            url,
+            upload_secret=upload_secret,
+            data=data,
+            headers=Headers(
+                {
+                    "content-range": [
+                        ContentRange("bytes", offset, offset + len(data)).to_header()
+                    ]
+                }
+            ),
+        )
+
+        if response.code == http.OK:
+            # Upload is still unfinished.
+            finished = False
+        elif response.code == http.CREATED:
+            # Upload is done!
+            finished = True
+        else:
+            raise ClientException(
+                response.code,
+            )
+        body = loads((yield response.content()))
+        remaining = RangeMap()
+        for chunk in body["required"]:
+            remaining.set(True, chunk["begin"], chunk["end"])
+        returnValue(UploadProgress(finished=finished, required=remaining))
+
+    @inlineCallbacks
+    def read_share_chunk(
+        self, storage_index, share_number, offset, length
+    ):  # type: (bytes, int, int, int) -> Deferred[bytes]
+        """
+        Download a chunk of data from a share.
+
+        TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed
+        downloads should be transparently retried and redownloaded by the
+        implementation a few times so that if a failure percolates up, the
+        caller can assume the failure isn't a short-term blip.
+
+        NOTE: the underlying HTTP protocol is much more flexible than this API,
+        so a future refactor may expand this in order to simplify the calling
+        code and perhaps download data more efficiently. But then again maybe
+        the HTTP protocol will be simplified, see
+        https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
+        """
+        url = self._client._url(
+            "/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
+        )
+        response = yield self._client._request(
+            "GET",
+            url,
+            headers=Headers(
+                {
+                    "range": [
+                        Range("bytes", [(offset, offset + length)]).to_header()
+                    ]
+                }
+            ),
+        )
+        if response.code == http.PARTIAL_CONTENT:
+            body = yield response.content()
+            returnValue(body)
+        else:
+            raise ClientException(
+                response.code,
+            )
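A sketch of how calling code might drive these new client APIs end to end (the helper name and the single-chunk upload are illustrative, not part of the PR)::

    import os

    from twisted.internet.defer import inlineCallbacks, returnValue

    @inlineCallbacks
    def upload_and_read_back(client, storage_index, share_data):
        immutables = StorageClientImmutables(client)
        upload_secret = os.urandom(32)
        lease_secret = os.urandom(32)
        yield immutables.create(
            storage_index, {0}, len(share_data),
            upload_secret, lease_secret, lease_secret,
        )
        progress = yield immutables.write_share_chunk(
            storage_index, 0, upload_secret, 0, share_data
        )
        assert progress.finished  # one chunk covered the whole share
        data = yield immutables.read_share_chunk(
            storage_index, 0, 0, len(share_data)
        )
        returnValue(data)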
src/allmydata/storage/http_common.py: new file (25 lines)
@@ -0,0 +1,25 @@
+"""
+Common HTTP infrastructure for the storage server.
+"""
+from future.utils import PY2
+
+if PY2:
+    # fmt: off
+    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
+    # fmt: on
+
+from enum import Enum
+from base64 import b64encode
+
+
+def swissnum_auth_header(swissnum):  # type: (bytes) -> bytes
+    """Return value for the ``Authorization`` header."""
+    return b"Tahoe-LAFS " + b64encode(swissnum).strip()
+
+
+class Secrets(Enum):
+    """Different kinds of secrets the client may send."""
+
+    LEASE_RENEW = "lease-renew-secret"
+    LEASE_CANCEL = "lease-cancel-secret"
+    UPLOAD = "upload-secret"
@@ -17,28 +17,23 @@ else:
     from typing import Dict, List, Set

 from functools import wraps
-from enum import Enum
 from base64 import b64decode

 from klein import Klein
 from twisted.web import http
 import attr
+from werkzeug.http import parse_range_header, parse_content_range_header

 # TODO Make sure to use pure Python versions?
-from cbor2 import dumps
+from cbor2 import dumps, loads

 from .server import StorageServer
-from .http_client import swissnum_auth_header
+from .http_common import swissnum_auth_header, Secrets
+from .common import si_a2b
+from .immutable import BucketWriter
 from ..util.hashutil import timing_safe_compare


-class Secrets(Enum):
-    """Different kinds of secrets the client may send."""
-
-    LEASE_RENEW = "lease-renew-secret"
-    LEASE_CANCEL = "lease-cancel-secret"
-    UPLOAD = "upload-secret"
-
-
 class ClientSecretsException(Exception):
     """The client did not send the appropriate secrets."""
@@ -117,6 +112,7 @@ def _authorized_route(app, required_secrets, *route_args, **route_kwargs):
     def decorator(f):
         @app.route(*route_args, **route_kwargs)
         @_authorization_decorator(required_secrets)
+        @wraps(f)
         def handle_route(*args, **kwargs):
             return f(*args, **kwargs)
@@ -125,6 +121,19 @@ def _authorized_route(app, required_secrets, *route_args, **route_kwargs):
     return decorator


+@attr.s
+class StorageIndexUploads(object):
+    """
+    In-progress upload to storage index.
+    """
+
+    # Map share number to BucketWriter
+    shares = attr.ib()  # type: Dict[int,BucketWriter]
+
+    # The upload key.
+    upload_secret = attr.ib()  # type: bytes
+
+
 class HTTPServer(object):
     """
     A HTTP interface to the storage server.
@@ -137,6 +146,8 @@ class HTTPServer(object):
     ):  # type: (StorageServer, bytes) -> None
         self._storage_server = storage_server
         self._swissnum = swissnum
+        # Maps storage index to StorageIndexUploads:
+        self._uploads = {}  # type: Dict[bytes,StorageIndexUploads]

     def get_resource(self):
         """Return twisted.web ``Resource`` for this object."""
@@ -144,6 +155,8 @@ class HTTPServer(object):

     def _cbor(self, request, data):
         """Return CBOR-encoded data."""
+        # TODO Might want to optionally send JSON someday, based on Accept
+        # headers, see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
         request.setHeader("Content-Type", "application/cbor")
        # TODO if data is big, maybe want to use a temporary file eventually...
         return dumps(data)
@@ -154,3 +167,123 @@ class HTTPServer(object):
     def version(self, request, authorization):
         """Return version information."""
         return self._cbor(request, self._storage_server.get_version())
+
+    ##### Immutable APIs #####
+
+    @_authorized_route(
+        _app,
+        {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.UPLOAD},
+        "/v1/immutable/<string:storage_index>",
+        methods=["POST"],
+    )
+    def allocate_buckets(self, request, authorization, storage_index):
+        """Allocate buckets."""
+        storage_index = si_a2b(storage_index.encode("ascii"))
+        info = loads(request.content.read())
+        upload_secret = authorization[Secrets.UPLOAD]
+
+        if storage_index in self._uploads:
+            # Pre-existing upload.
+            in_progress = self._uploads[storage_index]
+            if timing_safe_compare(in_progress.upload_secret, upload_secret):
+                # Same session.
+                # TODO add BucketWriters only for new shares that don't already have buckets; see the HTTP spec for details.
+                # The backend code may already implement this logic.
+                pass
+            else:
+                # TODO Fail, since the secret doesn't match.
+                pass
+        else:
+            # New upload.
+            already_got, sharenum_to_bucket = self._storage_server.allocate_buckets(
+                storage_index,
+                renew_secret=authorization[Secrets.LEASE_RENEW],
+                cancel_secret=authorization[Secrets.LEASE_CANCEL],
+                sharenums=info["share-numbers"],
+                allocated_size=info["allocated-size"],
+            )
+            self._uploads[storage_index] = StorageIndexUploads(
+                shares=sharenum_to_bucket, upload_secret=authorization[Secrets.UPLOAD]
+            )
+        return self._cbor(
+            request,
+            {
+                "already-have": set(already_got),
+                "allocated": set(sharenum_to_bucket),
+            },
+        )
+
+    @_authorized_route(
+        _app,
+        {Secrets.UPLOAD},
+        "/v1/immutable/<string:storage_index>/<int:share_number>",
+        methods=["PATCH"],
+    )
+    def write_share_data(self, request, authorization, storage_index, share_number):
+        """Write data to an in-progress immutable upload."""
+        storage_index = si_a2b(storage_index.encode("ascii"))
+        content_range = parse_content_range_header(request.getHeader("content-range"))
+        # TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        # 1. A malformed header should result in error 416
+        # 2. A non-bytes unit should result in error 416
+        # 3. A missing header means the full upload arrives in one request
+        # 4. An impossible range should result in error 416
+        offset = content_range.start
+
+        # TODO basic checks on validity of start, offset, and content-range in general. Also of share_number.
+        # TODO basic check that the body isn't infinite. Require content-length? Or maybe we should require content-range (it's optional now)? If so, that needs to be reflected in the protocol spec.
+
+        data = request.content.read()
+        try:
+            bucket = self._uploads[storage_index].shares[share_number]
+        except (KeyError, IndexError):
+            # TODO return 404
+            raise
+
+        finished = bucket.write(offset, data)
+
+        # TODO if this raises ConflictingWriteError, return HTTP CONFLICT code.
+
+        if finished:
+            bucket.close()
+            request.setResponseCode(http.CREATED)
+        else:
+            request.setResponseCode(http.OK)
+
+        required = []
+        for start, end, _ in bucket.required_ranges().ranges():
+            required.append({"begin": start, "end": end})
+        return self._cbor(request, {"required": required})
+
+    @_authorized_route(
+        _app,
+        set(),
+        "/v1/immutable/<string:storage_index>/<int:share_number>",
+        methods=["GET"],
+    )
+    def read_share_chunk(self, request, authorization, storage_index, share_number):
+        """Read a chunk for an already uploaded immutable."""
+        # TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        # 1. basic checks on validity of storage index, share number
+        # 2. missing range header should have response code 200 and return the whole thing
+        # 3. malformed range header should result in error? or return everything?
+        # 4. non-bytes range results in error
+        # 5. ranges make sense semantically (positive, etc.)
+        # 6. multiple ranges fails with error
+        # 7. missing end of range means "to the end of the share"
+        storage_index = si_a2b(storage_index.encode("ascii"))
+        range_header = parse_range_header(request.getHeader("range"))
+        offset, end = range_header.ranges[0]
+        assert end is not None  # TODO support this case
+
+        # TODO if not found, 404
+        bucket = self._storage_server.get_buckets(storage_index)[share_number]
+        data = bucket.read(offset, end - offset)
+        request.setResponseCode(http.PARTIAL_CONTENT)
+        # TODO set content-range on the response. We need to expand the
+        # BucketReader interface to return the share's length.
+        #
+        # request.setHeader(
+        #     "content-range", range_header.make_content_range(share_length).to_header()
+        # )
+        return data
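For reference, a sketch of the CBOR bodies exchanged by the ``POST /v1/immutable/<storage_index>`` route above (the share numbers and size are illustrative)::

    from cbor2 import dumps, loads

    # What the client sends:
    request_body = dumps({"share-numbers": {1, 7}, "allocated-size": 100})
    # What the server sends back for a fresh storage index:
    response_body = dumps({"already-have": set(), "allocated": {1, 7}})
    # Sets round-trip through CBOR (tag 258):
    assert loads(response_body)["allocated"] == {1, 7}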
@@ -372,16 +372,29 @@ class BucketWriter(object):
         self._clock = clock
         self._timeout = clock.callLater(30 * 60, self._abort_due_to_timeout)

+    def required_ranges(self):  # type: () -> RangeMap
+        """
+        Return which ranges still need to be written.
+        """
+        result = RangeMap()
+        result.set(True, 0, self._max_size)
+        for start, end, _ in self._already_written.ranges():
+            result.delete(start, end)
+        return result
+
     def allocated_size(self):
         return self._max_size

-    def write(self, offset, data):
+    def write(self, offset, data):  # type: (int, bytes) -> bool
+        """
+        Write data at given offset, return whether the upload is complete.
+        """
         # Delay the timeout, since we received data:
         self._timeout.reset(30 * 60)
         start = self._clock.seconds()
         precondition(not self.closed)
         if self.throw_out_all_data:
-            return
+            return False

         # Make sure we're not conflicting with existing data:
         end = offset + len(data)
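An aside on the ``RangeMap`` bookkeeping that ``required_ranges()`` relies on; a standalone sketch using the same ``collections-extended`` API::

    from collections_extended import RangeMap

    result = RangeMap()
    result.set(True, 0, 100)   # initially the whole 100-byte share is required
    result.delete(10, 20)      # bytes 10-19 have been written
    print(result.get(5))       # True: still required
    print(result.get(15))      # None: already written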
@@ -399,6 +412,12 @@ class BucketWriter(object):
         self.ss.add_latency("write", self._clock.seconds() - start)
         self.ss.count("write")

+        # Return whether the whole thing has been written. See
+        # https://github.com/mlenzen/collections-extended/issues/169 and
+        # https://github.com/mlenzen/collections-extended/issues/172 for why
+        # it's done this way.
+        return sum([mr.stop - mr.start for mr in self._already_written.ranges()]) == self._max_size
+
     def close(self):
         precondition(not self.closed)
         self._timeout.cancel()
@@ -485,7 +504,7 @@ class FoolscapBucketWriter(Referenceable):  # type: ignore # warner/foolscap#78
         self._bucket_writer = bucket_writer

     def remote_write(self, offset, data):
-        return self._bucket_writer.write(offset, data)
+        self._bucket_writer.write(offset, data)

     def remote_close(self):
         return self._bucket_writer.close()
@@ -353,6 +353,9 @@ class StorageServer(service.MultiService):
                                   max_space_per_bucket, lease_info,
                                   clock=self._clock)
             if self.no_storage:
+                # Really this should be done by having a separate class for
+                # this situation; see
+                # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3862
                 bw.throw_out_all_data = True
             bucketwriters[shnum] = bw
             self._bucket_writers[incominghome] = bw
@@ -34,7 +34,7 @@ from twisted.trial import unittest
 from twisted.internet import defer
 from twisted.internet.task import Clock

-from hypothesis import given, strategies
+from hypothesis import given, strategies, example

 import itertools
 from allmydata import interfaces
@@ -230,7 +230,6 @@ class Bucket(unittest.TestCase):
         br = BucketReader(self, bw.finalhome)
         self.assertEqual(br.read(0, length), expected_data)

-
     @given(
         maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
         maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
@@ -264,6 +263,38 @@ class Bucket(unittest.TestCase):
         bw.write(40, b"1" * 10)
         bw.write(60, b"1" * 40)

+    @given(
+        offsets=strategies.lists(
+            strategies.integers(min_value=0, max_value=99),
+            min_size=20,
+            max_size=20
+        ),
+    )
+    @example(offsets=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 40, 70])
+    def test_writes_return_when_finished(
+            self, offsets
+    ):
+        """
+        ``BucketWriter.write()`` returns true if and only if the maximum
+        size has been reached via potentially overlapping writes. The
+        remaining ranges can be checked via ``BucketWriter.required_ranges()``.
+        """
+        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
+        bw = BucketWriter(
+            self, incoming, final, 100, self.make_lease(), Clock()
+        )
+        local_written = [0] * 100
+        for offset in offsets:
+            length = min(30, 100 - offset)
+            data = b"1" * length
+            for i in range(offset, offset + length):
+                local_written[i] = 1
+            finished = bw.write(offset, data)
+            self.assertEqual(finished, sum(local_written) == 100)
+            required_ranges = bw.required_ranges()
+            for i in range(0, 100):
+                self.assertEqual(local_written[i] == 1, required_ranges.get(i) is None)
+
     def test_read_past_end_of_share_data(self):
         # test vector for immutable files (hard-coded contents of an immutable share
         # file):
@@ -15,16 +15,16 @@ if PY2:
     # fmt: on

 from base64 import b64encode

-from twisted.internet.defer import inlineCallbacks
+from os import urandom

 from hypothesis import assume, given, strategies as st
 from fixtures import Fixture, TempDir
 from treq.testing import StubTreq
 from klein import Klein
 from hyperlink import DecodedURL
+from collections_extended import RangeMap

-from .common import AsyncTestCase, SyncTestCase
+from .common import SyncTestCase
 from ..storage.server import StorageServer
 from ..storage.http_server import (
     HTTPServer,
@@ -33,7 +33,13 @@ from ..storage.http_server import (
     ClientSecretsException,
     _authorized_route,
 )
-from ..storage.http_client import StorageClient, ClientException
+from ..storage.http_client import (
+    StorageClient,
+    ClientException,
+    StorageClientImmutables,
+    ImmutableCreateResult,
+    UploadProgress,
+)


 def _post_process(params):
@@ -140,6 +146,7 @@ class ExtractSecretsTests(SyncTestCase):
         _extract_secrets(["lease-cancel-secret eA=="], {Secrets.LEASE_RENEW})


+# TODO should be actual swissnum
 SWISSNUM_FOR_TEST = b"abcd"
@@ -157,7 +164,24 @@ class TestApp(object):
         return "BAD: {}".format(authorization)


-class RoutingTests(AsyncTestCase):
+def result_of(d):
+    """
+    Synchronously extract the result of a Deferred.
+    """
+    result = []
+    error = []
+    d.addCallbacks(result.append, error.append)
+    if result:
+        return result[0]
+    if error:
+        error[0].raiseException()
+    raise RuntimeError(
+        "We expected the given Deferred to have a result already, but it didn't. "
+        + "This is probably a test design issue."
+    )
+
+
+class RoutingTests(SyncTestCase):
     """
     Tests for the HTTP routing infrastructure.
     """
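A note on ``result_of()``: it only works because the ``StubTreq``-backed requests in these tests resolve synchronously; with a real reactor the Deferred would usually still be pending. For example::

    from twisted.internet.defer import succeed, Deferred

    assert result_of(succeed(42)) == 42  # already-fired Deferred: fine
    # result_of(Deferred())              # would raise the RuntimeError above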
@@ -175,24 +199,28 @@ class RoutingTests(AsyncTestCase):
             treq=StubTreq(self._http_server._app.resource()),
         )

-    @inlineCallbacks
     def test_authorization_enforcement(self):
         """
         The requirement for secrets is enforced; if they are not given, a 400
         response code is returned.
         """
         # Without secret, get a 400 error.
-        response = yield self.client._request(
-            "GET", "http://127.0.0.1/upload_secret", {}
+        response = result_of(
+            self.client._request(
+                "GET",
+                "http://127.0.0.1/upload_secret",
+            )
         )
         self.assertEqual(response.code, 400)

         # With secret, we're good.
-        response = yield self.client._request(
-            "GET", "http://127.0.0.1/upload_secret", {Secrets.UPLOAD: b"MAGIC"}
+        response = result_of(
+            self.client._request(
+                "GET", "http://127.0.0.1/upload_secret", upload_secret=b"MAGIC"
+            )
         )
         self.assertEqual(response.code, 200)
-        self.assertEqual((yield response.content()), b"GOOD SECRET")
+        self.assertEqual(result_of(response.content()), b"GOOD SECRET")


 class HttpTestFixture(Fixture):
@@ -204,7 +232,6 @@ class HttpTestFixture(Fixture):
     def _setUp(self):
         self.tempdir = self.useFixture(TempDir())
         self.storage_server = StorageServer(self.tempdir.path, b"\x00" * 20)
-        # TODO what should the swissnum _actually_ be?
         self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST)
         self.client = StorageClient(
             DecodedURL.from_text("http://127.0.0.1"),
@@ -213,7 +240,7 @@ class HttpTestFixture(Fixture):
         )


-class GenericHTTPAPITests(AsyncTestCase):
+class GenericHTTPAPITests(SyncTestCase):
     """
     Tests of HTTP client talking to the HTTP server, for generic HTTP API
     endpoints and concerns.
@@ -225,7 +252,6 @@ class GenericHTTPAPITests(SyncTestCase):
         super(GenericHTTPAPITests, self).setUp()
         self.http = self.useFixture(HttpTestFixture())

-    @inlineCallbacks
     def test_bad_authentication(self):
         """
         If the wrong swissnum is used, an ``Unauthorized`` response code is
@@ -237,10 +263,9 @@ class GenericHTTPAPITests(SyncTestCase):
             treq=StubTreq(self.http.http_server.get_resource()),
         )
         with self.assertRaises(ClientException) as e:
-            yield client.get_version()
+            result_of(client.get_version())
         self.assertEqual(e.exception.args[0], 401)

-    @inlineCallbacks
     def test_version(self):
         """
         The client can return the version.
@@ -248,7 +273,7 @@ class GenericHTTPAPITests(SyncTestCase):
         We ignore available disk space and max immutable share size, since that
         might change across calls.
         """
-        version = yield self.http.client.get_version()
+        version = result_of(self.http.client.get_version())
         version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
             b"available-space"
         )
@@ -263,3 +288,171 @@ class GenericHTTPAPITests(SyncTestCase):
             b"maximum-immutable-share-size"
         )
         self.assertEqual(version, expected_version)
+
+
+class ImmutableHTTPAPITests(SyncTestCase):
+    """
+    Tests for immutable upload/download APIs.
+    """
+
+    def setUp(self):
+        if PY2:
+            self.skipTest("Not going to bother supporting Python 2")
+        super(ImmutableHTTPAPITests, self).setUp()
+        self.http = self.useFixture(HttpTestFixture())
+
+    def test_upload_can_be_downloaded(self):
+        """
+        A single share can be uploaded in (possibly overlapping) chunks, and
+        then a random chunk can be downloaded, and it will match the original
+        file.
+
+        We don't exercise the full variation of overlapping chunks because
+        that's already done in test_storage.py.
+        """
+        length = 100
+        expected_data = b"".join(bytes([i]) for i in range(100))
+
+        im_client = StorageClientImmutables(self.http.client)
+
+        # Create an upload:
+        upload_secret = urandom(32)
+        lease_secret = urandom(32)
+        storage_index = b"".join(bytes([i]) for i in range(16))
+        created = result_of(
+            im_client.create(
+                storage_index, {1}, 100, upload_secret, lease_secret, lease_secret
+            )
+        )
+        self.assertEqual(
+            created, ImmutableCreateResult(already_have=set(), allocated={1})
+        )
+
+        remaining = RangeMap()
+        remaining.set(True, 0, 100)
+
+        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
+        def write(offset, length):
+            remaining.empty(offset, offset + length)
+            return im_client.write_share_chunk(
+                storage_index,
+                1,
+                upload_secret,
+                offset,
+                expected_data[offset : offset + length],
+            )
+
+        upload_progress = result_of(write(10, 10))
+        self.assertEqual(
+            upload_progress, UploadProgress(finished=False, required=remaining)
+        )
+        upload_progress = result_of(write(30, 10))
+        self.assertEqual(
+            upload_progress, UploadProgress(finished=False, required=remaining)
+        )
+        upload_progress = result_of(write(50, 10))
+        self.assertEqual(
+            upload_progress, UploadProgress(finished=False, required=remaining)
+        )
+
+        # Then, an overlapping write with matching data (15-35):
+        upload_progress = result_of(write(15, 20))
+        self.assertEqual(
+            upload_progress, UploadProgress(finished=False, required=remaining)
+        )
+
+        # Now fill in the holes:
+        upload_progress = result_of(write(0, 10))
+        self.assertEqual(
+            upload_progress, UploadProgress(finished=False, required=remaining)
+        )
+        upload_progress = result_of(write(40, 10))
+        self.assertEqual(
+            upload_progress, UploadProgress(finished=False, required=remaining)
+        )
+        upload_progress = result_of(write(60, 40))
+        self.assertEqual(
+            upload_progress, UploadProgress(finished=True, required=RangeMap())
+        )
+
+        # We can now read:
+        for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]:
+            downloaded = result_of(
+                im_client.read_share_chunk(storage_index, 1, offset, length)
+            )
+            self.assertEqual(downloaded, expected_data[offset : offset + length])
+
+    def test_multiple_shares_uploaded_to_different_place(self):
+        """
+        If a storage index has multiple shares, uploads to different shares are
+        stored separately and can be downloaded separately.
+
+        TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        """
+
+    def test_bucket_allocated_with_new_shares(self):
+        """
+        If some shares already exist, allocating shares indicates only the new
+        ones were created.
+
+        TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        """
+
+    def test_bucket_allocation_new_upload_secret(self):
+        """
+        If a bucket was allocated with one upload secret, and a different upload
+        key is used to allocate the bucket again, the second allocation fails.
+
+        TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        """
+
+    def test_upload_with_wrong_upload_secret_fails(self):
+        """
+        Uploading with a key that doesn't match the one used to allocate the
+        bucket will fail.
+
+        TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        """
+
+    def test_upload_offset_cannot_be_negative(self):
+        """
+        A negative upload offset will be rejected.
+
+        TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        """
+
+    def test_mismatching_upload_fails(self):
+        """
+        If an uploaded chunk conflicts with an already uploaded chunk, a
+        CONFLICT error is returned.
+
+        TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        """
+
+    def test_read_of_wrong_storage_index_fails(self):
+        """
+        Reading from an unknown storage index results in a 404.
+
+        TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        """
+
+    def test_read_of_wrong_share_number_fails(self):
+        """
+        Reading from an unknown share number results in a 404.
+
+        TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        """
+
+    def test_read_with_negative_offset_fails(self):
+        """
+        The offset for reads cannot be negative.
+
+        TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        """
+
+    def test_read_with_negative_length_fails(self):
+        """
+        The length for reads cannot be negative.
+
+        TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
+        """