Merge pull request #1183 from tahoe-lafs/3877-immutable-storage-apis-continued

HTTP immutable storage APIs, continued

Fixes ticket:3877
Itamar Turner-Trauring 2022-03-09 13:00:32 -05:00 committed by GitHub
commit 2ac57cd567
6 changed files with 332 additions and 43 deletions

newsfragments/3877.minor Normal file
View File

View File

@@ -161,7 +161,7 @@ class StorageClientImmutables(object):
APIs for interacting with immutables.
"""
def __init__(self, client): # type: (StorageClient) -> None
def __init__(self, client: StorageClient):
self._client = client
@inlineCallbacks
@@ -208,6 +208,27 @@ class StorageClientImmutables(object):
)
)
@inlineCallbacks
def abort_upload(
self, storage_index: bytes, share_number: int, upload_secret: bytes
) -> Deferred[None]:
"""Abort the upload."""
url = self._client.relative_url(
"/v1/immutable/{}/{}/abort".format(_encode_si(storage_index), share_number)
)
response = yield self._client.request(
"PUT",
url,
upload_secret=upload_secret,
)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,
)
@inlineCallbacks
def write_share_chunk(
self, storage_index, share_number, upload_secret, offset, data

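The client-side abort_upload above PUTs to /v1/immutable/<storage-index>/<share-number>/abort and either returns cleanly on a 200 or raises ClientException carrying the response code. A hedged usage sketch, assuming an already-constructed StorageClientImmutables (imm_client) and a storage index, share number, and upload secret from an earlier create() call; the helper name give_up_on_share and the import path for ClientException are assumptions, not part of this PR:

from twisted.internet.defer import inlineCallbacks
# Assumed import path; ClientException is the error type the tests later in
# this diff catch when the server answers with a non-success code.
from allmydata.storage.http_client import ClientException

@inlineCallbacks
def give_up_on_share(imm_client, storage_index, share_number, upload_secret):
    """Illustrative helper: abort a partially uploaded share and report why it failed."""
    try:
        yield imm_client.abort_upload(storage_index, share_number, upload_secret)
    except ClientException as e:
        # Status codes exercised by the server-side tests below: 404 (nothing
        # to abort), 405 (the share already finished uploading), 401 (wrong
        # upload secret).
        print("abort refused, HTTP status:", e.code)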
View File

@@ -2,19 +2,7 @@
HTTP server for storage.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
else:
from typing import Dict, List, Set
from typing import Dict, List, Set, Tuple
from functools import wraps
from base64 import b64decode
@@ -138,9 +126,68 @@ class StorageIndexUploads(object):
# different upload secrets).
upload_secrets = attr.ib(factory=dict) # type: Dict[int,bytes]
def add_upload(self, share_number, upload_secret, bucket):
self.shares[share_number] = bucket
self.upload_secrets[share_number] = upload_secret
@attr.s
class UploadsInProgress(object):
"""
Keep track of uploads for storage indexes.
"""
# Map storage index to corresponding uploads-in-progress
_uploads = attr.ib(type=Dict[bytes, StorageIndexUploads], factory=dict)
# Map BucketWriter to (storage index, share number)
_bucketwriters = attr.ib(type=Dict[BucketWriter, Tuple[bytes, int]], factory=dict)
def add_write_bucket(
self,
storage_index: bytes,
share_number: int,
upload_secret: bytes,
bucket: BucketWriter,
):
"""Add a new ``BucketWriter`` to be tracked."""
si_uploads = self._uploads.setdefault(storage_index, StorageIndexUploads())
si_uploads.shares[share_number] = bucket
si_uploads.upload_secrets[share_number] = upload_secret
self._bucketwriters[bucket] = (storage_index, share_number)
def get_write_bucket(
self, storage_index: bytes, share_number: int, upload_secret: bytes
) -> BucketWriter:
"""Get the given in-progress immutable share upload."""
self.validate_upload_secret(storage_index, share_number, upload_secret)
try:
return self._uploads[storage_index].shares[share_number]
except (KeyError, IndexError):
raise _HTTPError(http.NOT_FOUND)
def remove_write_bucket(self, bucket: BucketWriter):
"""Stop tracking the given ``BucketWriter``."""
storage_index, share_number = self._bucketwriters.pop(bucket)
uploads_index = self._uploads[storage_index]
uploads_index.shares.pop(share_number)
uploads_index.upload_secrets.pop(share_number)
if not uploads_index.shares:
self._uploads.pop(storage_index)
def validate_upload_secret(
self, storage_index: bytes, share_number: int, upload_secret: bytes
):
"""
Raise an unauthorized-HTTP-response exception if the given
storage_index+share_number have a different upload secret than the
given one.
If the given upload doesn't exist at all, nothing happens.
"""
if storage_index in self._uploads:
in_progress = self._uploads[storage_index]
# For pre-existing upload, make sure password matches.
if share_number in in_progress.upload_secrets and not timing_safe_compare(
in_progress.upload_secrets[share_number], upload_secret
):
raise _HTTPError(http.UNAUTHORIZED)
class StorageIndexConverter(BaseConverter):
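UploadsInProgress above keeps two mappings because the two lookups are keyed differently: the HTTP routes need to find a bucket from (storage index, share number) plus an upload secret, while the close handler registered further down only ever receives the BucketWriter itself. A minimal self-contained sketch of that double bookkeeping, with hypothetical names (InProgressTracker is not a Tahoe-LAFS class):

from typing import Dict, Tuple

class InProgressTracker:
    """Toy version of the two-way bookkeeping used by UploadsInProgress."""

    def __init__(self) -> None:
        # storage index -> share number -> writer, for lookups by the HTTP routes.
        self._by_index: Dict[bytes, Dict[int, object]] = {}
        # writer -> (storage index, share number), for the close handler.
        self._by_writer: Dict[object, Tuple[bytes, int]] = {}

    def add(self, storage_index: bytes, share_number: int, writer: object) -> None:
        self._by_index.setdefault(storage_index, {})[share_number] = writer
        self._by_writer[writer] = (storage_index, share_number)

    def remove(self, writer: object) -> None:
        # Called with only the writer, e.g. when it is closed or aborted.
        storage_index, share_number = self._by_writer.pop(writer)
        shares = self._by_index[storage_index]
        del shares[share_number]
        if not shares:
            del self._by_index[storage_index]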
@@ -155,6 +202,15 @@ class StorageIndexConverter(BaseConverter):
raise ValidationError("Invalid storage index")
class _HTTPError(Exception):
"""
Raise from ``HTTPServer`` endpoint to return the given HTTP response code.
"""
def __init__(self, code: int):
self.code = code
class HTTPServer(object):
"""
An HTTP interface to the storage server.
@@ -163,13 +219,25 @@ class HTTPServer(object):
_app = Klein()
_app.url_map.converters["storage_index"] = StorageIndexConverter
@_app.handle_errors(_HTTPError)
def _http_error(self, request, failure):
"""Handle ``_HTTPError`` exceptions."""
request.setResponseCode(failure.value.code)
return b""
def __init__(
self, storage_server, swissnum
): # type: (StorageServer, bytes) -> None
self._storage_server = storage_server
self._swissnum = swissnum
# Maps storage index to StorageIndexUploads:
self._uploads = {} # type: Dict[bytes,StorageIndexUploads]
self._uploads = UploadsInProgress()
# When an upload finishes successfully, gets aborted, or times out,
# make sure it gets removed from our tracking datastructure:
self._storage_server.register_bucket_writer_close_handler(
self._uploads.remove_write_bucket
)
def get_resource(self):
"""Return twisted.web ``Resource`` for this object."""
@@ -218,9 +286,10 @@ class HTTPServer(object):
sharenums=info["share-numbers"],
allocated_size=info["allocated-size"],
)
uploads = self._uploads.setdefault(storage_index, StorageIndexUploads())
for share_number, bucket in sharenum_to_bucket.items():
uploads.add_upload(share_number, upload_secret, bucket)
self._uploads.add_write_bucket(
storage_index, share_number, upload_secret, bucket
)
return self._cbor(
request,
@@ -230,6 +299,37 @@ class HTTPServer(object):
},
)
@_authorized_route(
_app,
{Secrets.UPLOAD},
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/abort",
methods=["PUT"],
)
def abort_share_upload(self, request, authorization, storage_index, share_number):
"""Abort an in-progress immutable share upload."""
try:
bucket = self._uploads.get_write_bucket(
storage_index, share_number, authorization[Secrets.UPLOAD]
)
except _HTTPError as e:
if e.code == http.NOT_FOUND:
# It may be that we've already uploaded this, in which case the error
# should be Method Not Allowed (405).
try:
self._storage_server.get_buckets(storage_index)[share_number]
except KeyError:
pass
else:
# Already uploaded, so we can't abort.
raise _HTTPError(http.NOT_ALLOWED)
raise
# Abort the upload; this should close it which will eventually result
# in self._uploads.remove_write_bucket() being called.
bucket.abort()
return b""
@_authorized_route(
_app,
{Secrets.UPLOAD},
@@ -248,11 +348,9 @@ class HTTPServer(object):
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = request.content.read(content_range.stop - content_range.start + 1)
try:
bucket = self._uploads[storage_index].shares[share_number]
except (KeyError, IndexError):
request.setResponseCode(http.NOT_FOUND)
return b""
bucket = self._uploads.get_write_bucket(
storage_index, share_number, authorization[Secrets.UPLOAD]
)
try:
finished = bucket.write(offset, data)

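Error handling in this file now funnels through Klein's handle_errors: any route (or a helper such as UploadsInProgress.get_write_bucket) raises _HTTPError(code), and the single _http_error handler converts it into an empty response with that status code. A standalone sketch of the same pattern using only Klein and Twisted; AppError, get_thing, and the /things route are hypothetical, not part of this PR:

from klein import Klein
from twisted.web import http

class AppError(Exception):
    """Carries an HTTP status code up to the error handler (hypothetical name)."""
    def __init__(self, code):
        self.code = code

app = Klein()

@app.handle_errors(AppError)
def app_error(request, failure):
    # failure.value is the AppError raised by a route (or anything it calls).
    request.setResponseCode(failure.value.code)
    return b""

@app.route("/things/<int:thing_id>", methods=["GET"])
def get_thing(request, thing_id):
    if thing_id != 1:
        # Deeply nested helpers can raise too; Klein routes it to app_error.
        raise AppError(http.NOT_FOUND)
    return b"the thing"

# app.run("localhost", 8080)  # uncomment to serve the sketch locally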
View File

@@ -1059,7 +1059,8 @@ class _HTTPBucketWriter(object):
finished = attr.ib(type=bool, default=False)
def abort(self):
pass # TODO in later ticket
return self.client.abort_upload(self.storage_index, self.share_number,
self.upload_secret)
@defer.inlineCallbacks
def write(self, offset, data):

View File

@@ -176,8 +176,9 @@ class IStorageServerImmutableAPIsTestsMixin(object):
canary=Referenceable(),
)
# Bucket 1 is fully written in one go.
yield allocated[0].callRemote("write", 0, b"1" * 1024)
# Bucket 1 gets some data written (but not all of it, otherwise HTTP
# implicitly finishes the upload).
yield allocated[0].callRemote("write", 0, b"1" * 1023)
# Disconnect or abort, depending on the test:
yield abort_or_disconnect(allocated[0])
@@ -1156,7 +1157,6 @@ class HTTPImmutableAPIsTests(
# These will start passing in future PRs as HTTP protocol is implemented.
SKIP_TESTS = {
"test_abort",
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",

View File

@@ -15,6 +15,7 @@ if PY2:
# fmt: on
from base64 import b64encode
from contextlib import contextmanager
from os import urandom
from hypothesis import assume, given, strategies as st
@@ -316,6 +317,20 @@ class StorageClientWithHeadersOverride(object):
return self.storage_client.request(*args, headers=headers, **kwargs)
@contextmanager
def assert_fails_with_http_code(test_case: SyncTestCase, code: int):
"""
Context manager that asserts the code in its body fails with the given
HTTP response code.
"""
with test_case.assertRaises(ClientException) as e:
try:
yield
finally:
pass
test_case.assertEqual(e.exception.code, code)
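The test changes below replace the two-step assertRaises(ClientException) / assertEqual(code) pattern with this context manager, so the status-code assertion lives inside the helper and always runs. A self-contained toy reconstruction showing how it behaves under unittest; FakeClientException and the Example test case are stand-ins, not the real client exception:

import unittest
from contextlib import contextmanager

class FakeClientException(Exception):
    """Stand-in for the real ClientException; carries an HTTP status code."""
    def __init__(self, code):
        super().__init__(code)
        self.code = code

@contextmanager
def assert_fails_with_http_code(test_case, code):
    # Toy reconstruction of the helper above, against the stand-in exception.
    with test_case.assertRaises(FakeClientException) as e:
        yield
    test_case.assertEqual(e.exception.code, code)

class Example(unittest.TestCase):
    def test_not_found(self):
        with assert_fails_with_http_code(self, 404):
            # In the real tests this would be a client call such as
            # result_of(self.imm_client.read_share_chunk(...)).
            raise FakeClientException(404)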
class GenericHTTPAPITests(SyncTestCase):
"""
Tests of HTTP client talking to the HTTP server, for generic HTTP API
@@ -340,9 +355,8 @@ class GenericHTTPAPITests(SyncTestCase):
treq=StubTreq(self.http.http_server.get_resource()),
)
)
with self.assertRaises(ClientException) as e:
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
result_of(client.get_version())
self.assertEqual(e.exception.args[0], 401)
def test_version(self):
"""
@@ -474,6 +488,23 @@ class ImmutableHTTPAPITests(SyncTestCase):
)
self.assertEqual(downloaded, expected_data[offset : offset + length])
def test_write_with_wrong_upload_key(self):
"""
A write with an upload key that is different from the original upload
key will fail.
"""
(upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret + b"X",
0,
b"123",
)
)
def test_allocate_buckets_second_time_different_shares(self):
"""
If allocate buckets endpoint is called second time with different
@@ -583,7 +614,9 @@ class ImmutableHTTPAPITests(SyncTestCase):
self.http.client, {"content-range": bad_content_range_value}
)
)
with self.assertRaises(ClientException) as e:
with assert_fails_with_http_code(
self, http.REQUESTED_RANGE_NOT_SATISFIABLE
):
result_of(
client.write_share_chunk(
storage_index,
@@ -593,7 +626,6 @@ class ImmutableHTTPAPITests(SyncTestCase):
b"0123456789",
)
)
self.assertEqual(e.exception.code, http.REQUESTED_RANGE_NOT_SATISFIABLE)
check_invalid("not a valid content-range header at all")
check_invalid("bytes -1-9/10")
@@ -615,7 +647,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
(upload_secret, _, storage_index, _) = self.create_upload({1}, 10)
def unknown_check(storage_index, share_number):
with self.assertRaises(ClientException) as e:
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.imm_client.write_share_chunk(
storage_index,
@@ -625,7 +657,6 @@ class ImmutableHTTPAPITests(SyncTestCase):
b"0123456789",
)
)
self.assertEqual(e.exception.code, http.NOT_FOUND)
# Wrong share number:
unknown_check(storage_index, 7)
@@ -684,7 +715,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
)
# Conflicting write:
with self.assertRaises(ClientException) as e:
with assert_fails_with_http_code(self, http.CONFLICT):
result_of(
self.imm_client.write_share_chunk(
storage_index,
@@ -694,7 +725,6 @@ class ImmutableHTTPAPITests(SyncTestCase):
b"0123456789",
)
)
self.assertEqual(e.exception.code, http.NOT_FOUND)
def upload(self, share_number, data_length=26):
"""
@@ -721,7 +751,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
"""
Reading from an unknown storage index results in 404.
"""
with self.assertRaises(ClientException) as e:
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.imm_client.read_share_chunk(
b"1" * 16,
@@ -730,14 +760,13 @@ class ImmutableHTTPAPITests(SyncTestCase):
10,
)
)
self.assertEqual(e.exception.code, http.NOT_FOUND)
def test_read_of_wrong_share_number_fails(self):
"""
Reading from an unknown share number results in 404.
"""
storage_index, _ = self.upload(1)
with self.assertRaises(ClientException) as e:
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.imm_client.read_share_chunk(
storage_index,
@@ -746,7 +775,6 @@ class ImmutableHTTPAPITests(SyncTestCase):
10,
)
)
self.assertEqual(e.exception.code, http.NOT_FOUND)
def test_read_with_negative_offset_fails(self):
"""
@@ -762,7 +790,9 @@ class ImmutableHTTPAPITests(SyncTestCase):
)
)
with self.assertRaises(ClientException) as e:
with assert_fails_with_http_code(
self, http.REQUESTED_RANGE_NOT_SATISFIABLE
):
result_of(
client.read_share_chunk(
storage_index,
@@ -771,7 +801,6 @@ class ImmutableHTTPAPITests(SyncTestCase):
10,
)
)
self.assertEqual(e.exception.code, http.REQUESTED_RANGE_NOT_SATISFIABLE)
# Bad unit
check_bad_range("molluscs=0-9")
@@ -831,3 +860,143 @@ class ImmutableHTTPAPITests(SyncTestCase):
check_range("bytes=0-10", "bytes 0-10/*")
# Can't go beyond the end of the immutable!
check_range("bytes=10-100", "bytes 10-25/*")
def test_timed_out_upload_allows_reupload(self):
"""
If an in-progress upload times out, it is cancelled altogether,
allowing a new upload to occur.
"""
self._test_abort_or_timed_out_upload_to_existing_storage_index(
lambda **kwargs: self.http.clock.advance(30 * 60 + 1)
)
def test_abort_upload_allows_reupload(self):
"""
If an in-progress upload is aborted, it is cancelled altogether,
allowing a new upload to occur.
"""
def abort(storage_index, share_number, upload_secret):
return result_of(
self.imm_client.abort_upload(storage_index, share_number, upload_secret)
)
self._test_abort_or_timed_out_upload_to_existing_storage_index(abort)
def _test_abort_or_timed_out_upload_to_existing_storage_index(self, cancel_upload):
"""Start uploading to an existing storage index that then times out or aborts.
Re-uploading should work.
"""
# Start an upload:
(upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"123",
)
)
# Now, the upload is cancelled somehow:
cancel_upload(
storage_index=storage_index, upload_secret=upload_secret, share_number=1
)
# Now we can create a new share with the same storage index without
# complaint:
upload_secret = urandom(32)
lease_secret = urandom(32)
created = result_of(
self.imm_client.create(
storage_index,
{1},
100,
upload_secret,
lease_secret,
lease_secret,
)
)
self.assertEqual(created.allocated, {1})
# And write to it, too:
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"ABC",
)
)
def test_unknown_aborts(self):
"""
Aborting an upload with an unknown storage index or share number will
result in a 404 HTTP response code.
"""
(upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
for si, num in [(storage_index, 3), (b"x" * 16, 1)]:
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(self.imm_client.abort_upload(si, num, upload_secret))
def test_unauthorized_abort(self):
"""
An abort with the wrong key will return an unauthorized error, and will
not abort the upload.
"""
(upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
# Abort fails because of the wrong upload secret:
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
result_of(
self.imm_client.abort_upload(storage_index, 1, upload_secret + b"X")
)
# We can still write to it:
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"ABC",
)
)
def test_too_late_abort(self):
"""
An abort of an already-fully-uploaded immutable will result in a 405
error and will not affect the immutable.
"""
uploaded_data = b"123"
(upload_secret, _, storage_index, _) = self.create_upload({0}, 3)
result_of(
self.imm_client.write_share_chunk(
storage_index,
0,
upload_secret,
0,
uploaded_data,
)
)
# Can't abort, we finished upload:
with assert_fails_with_http_code(self, http.NOT_ALLOWED):
result_of(self.imm_client.abort_upload(storage_index, 0, upload_secret))
# Abort didn't prevent reading:
self.assertEqual(
uploaded_data,
result_of(
self.imm_client.read_share_chunk(
storage_index,
0,
0,
3,
)
),
)