diff --git a/newsfragments/3877.minor b/newsfragments/3877.minor
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/allmydata/storage/http_client.py b/src/allmydata/storage/http_client.py
index 475aa2330..d83ecbdff 100644
--- a/src/allmydata/storage/http_client.py
+++ b/src/allmydata/storage/http_client.py
@@ -161,7 +161,7 @@ class StorageClientImmutables(object):
     APIs for interacting with immutables.
     """
 
-    def __init__(self, client):  # type: (StorageClient) -> None
+    def __init__(self, client: StorageClient):
         self._client = client
 
     @inlineCallbacks
@@ -208,6 +208,27 @@ class StorageClientImmutables(object):
             )
         )
 
+    @inlineCallbacks
+    def abort_upload(
+        self, storage_index: bytes, share_number: int, upload_secret: bytes
+    ) -> Deferred[None]:
+        """Abort the upload; raise ``ClientException`` unless the response is ``http.OK``."""
+        url = self._client.relative_url(
+            "/v1/immutable/{}/{}/abort".format(_encode_si(storage_index), share_number)
+        )
+        response = yield self._client.request(
+            "PUT",
+            url,
+            upload_secret=upload_secret,
+        )
+
+        if response.code == http.OK:
+            return
+        else:
+            raise ClientException(
+                response.code,
+            )
+
     @inlineCallbacks
     def write_share_chunk(
         self, storage_index, share_number, upload_secret, offset, data
diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py
index 9b158ecfd..6a43dec8b 100644
--- a/src/allmydata/storage/http_server.py
+++ b/src/allmydata/storage/http_server.py
@@ -2,19 +2,7 @@
 HTTP server for storage.
 """
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-from future.utils import PY2
-
-if PY2:
-    # fmt: off
-    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
-    # fmt: on
-else:
-    from typing import Dict, List, Set
+from typing import Dict, List, Set, Tuple
 
 from functools import wraps
 from base64 import b64decode
@@ -138,9 +126,73 @@ class StorageIndexUploads(object):
     # different upload secrets).
     upload_secrets = attr.ib(factory=dict)  # type: Dict[int,bytes]
 
-    def add_upload(self, share_number, upload_secret, bucket):
-        self.shares[share_number] = bucket
-        self.upload_secrets[share_number] = upload_secret
+
+@attr.s
+class UploadsInProgress(object):
+    """
+    Keep track of uploads for storage indexes.
+    """
+
+    # Map storage index to corresponding uploads-in-progress
+    _uploads = attr.ib(type=Dict[bytes, StorageIndexUploads], factory=dict)
+
+    # Map BucketWriter to (storage index, share number)
+    _bucketwriters = attr.ib(type=Dict[BucketWriter, Tuple[bytes, int]], factory=dict)
+
+    def add_write_bucket(
+        self,
+        storage_index: bytes,
+        share_number: int,
+        upload_secret: bytes,
+        bucket: BucketWriter,
+    ):
+        """Add a new ``BucketWriter`` to be tracked."""
+        si_uploads = self._uploads.setdefault(storage_index, StorageIndexUploads())
+        si_uploads.shares[share_number] = bucket
+        si_uploads.upload_secrets[share_number] = upload_secret
+        self._bucketwriters[bucket] = (storage_index, share_number)
+
+    def get_write_bucket(
+        self, storage_index: bytes, share_number: int, upload_secret: bytes
+    ) -> BucketWriter:
+        """Get the given in-progress immutable share upload."""
+        self.validate_upload_secret(storage_index, share_number, upload_secret)
+        try:
+            return self._uploads[storage_index].shares[share_number]
+        except (KeyError, IndexError):
+            raise _HTTPError(http.NOT_FOUND)
+
+    def remove_write_bucket(self, bucket: BucketWriter):
+        """Stop tracking the given ``BucketWriter``; untracked writers are ignored."""
+        try:
+            storage_index, share_number = self._bucketwriters.pop(bucket)
+        except KeyError:
+            # The close handler is registered on the storage server, so it
+            # may fire for writers this tracker never added; ignore those.
+            return
+        uploads_index = self._uploads[storage_index]
+        uploads_index.shares.pop(share_number)
+        uploads_index.upload_secrets.pop(share_number)
+        if not uploads_index.shares:
+            self._uploads.pop(storage_index)
+
+    def validate_upload_secret(
+        self, storage_index: bytes, share_number: int, upload_secret: bytes
+    ):
+        """
+        Raise an unauthorized-HTTP-response exception if the given
+        storage_index+share_number have a different upload secret than the
+        given one.
+
+        If the given upload doesn't exist at all, nothing happens.
+        """
+        if storage_index in self._uploads:
+            in_progress = self._uploads[storage_index]
+            # For pre-existing upload, make sure password matches.
+            if share_number in in_progress.upload_secrets and not timing_safe_compare(
+                in_progress.upload_secrets[share_number], upload_secret
+            ):
+                raise _HTTPError(http.UNAUTHORIZED)
 
 
 class StorageIndexConverter(BaseConverter):
@@ -155,6 +207,15 @@ class StorageIndexConverter(BaseConverter):
             raise ValidationError("Invalid storage index")
 
 
+class _HTTPError(Exception):
+    """
+    Raise from ``HTTPServer`` endpoint to return the given HTTP response code.
+    """
+
+    def __init__(self, code: int):
+        self.code = code
+
+
 class HTTPServer(object):
     """
     A HTTP interface to the storage server.
@@ -163,13 +224,25 @@ class HTTPServer(object):
     _app = Klein()
     _app.url_map.converters["storage_index"] = StorageIndexConverter
 
+    @_app.handle_errors(_HTTPError)
+    def _http_error(self, request, failure):
+        """Handle ``_HTTPError`` exceptions."""
+        request.setResponseCode(failure.value.code)
+        return b""
+
     def __init__(
         self, storage_server, swissnum
     ):  # type: (StorageServer, bytes) -> None
         self._storage_server = storage_server
         self._swissnum = swissnum
         # Maps storage index to StorageIndexUploads:
-        self._uploads = {}  # type: Dict[bytes,StorageIndexUploads]
+        self._uploads = UploadsInProgress()
+
+        # When an upload finishes successfully, gets aborted, or times out,
+        # make sure it gets removed from our tracking datastructure:
+        self._storage_server.register_bucket_writer_close_handler(
+            self._uploads.remove_write_bucket
+        )
 
     def get_resource(self):
         """Return twisted.web ``Resource`` for this object."""
@@ -218,9 +291,10 @@ class HTTPServer(object):
             sharenums=info["share-numbers"],
             allocated_size=info["allocated-size"],
         )
-        uploads = self._uploads.setdefault(storage_index, StorageIndexUploads())
         for share_number, bucket in sharenum_to_bucket.items():
-            uploads.add_upload(share_number, upload_secret, bucket)
+            self._uploads.add_write_bucket(
+                storage_index, share_number, upload_secret, bucket
+            )
 
         return self._cbor(
             request,
@@ -230,6 +304,37 @@ class HTTPServer(object):
             },
         )
 
+    @_authorized_route(
+        _app,
+        {Secrets.UPLOAD},
+        "/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/abort",
+        methods=["PUT"],
+    )
+    def abort_share_upload(self, request, authorization, storage_index, share_number):
+        """Abort an in-progress immutable share upload."""
+        try:
+            bucket = self._uploads.get_write_bucket(
+                storage_index, share_number, authorization[Secrets.UPLOAD]
+            )
+        except _HTTPError as e:
+            if e.code == http.NOT_FOUND:
+                # It may be we've already uploaded this, in which case error
+                # should be method not allowed (405).
+                try:
+                    self._storage_server.get_buckets(storage_index)[share_number]
+                except KeyError:
+                    pass
+                else:
+                    # Already uploaded, so we can't abort.
+                    raise _HTTPError(http.NOT_ALLOWED)
+            raise
+
+        # Abort the upload; this should close it which will eventually result
+        # in self._uploads.remove_write_bucket() being called.
+        bucket.abort()
+
+        return b""
+
     @_authorized_route(
         _app,
         {Secrets.UPLOAD},
@@ -248,11 +353,9 @@ class HTTPServer(object):
         # TODO limit memory usage
         # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
         data = request.content.read(content_range.stop - content_range.start + 1)
-        try:
-            bucket = self._uploads[storage_index].shares[share_number]
-        except (KeyError, IndexError):
-            request.setResponseCode(http.NOT_FOUND)
-            return b""
+        bucket = self._uploads.get_write_bucket(
+            storage_index, share_number, authorization[Secrets.UPLOAD]
+        )
 
         try:
             finished = bucket.write(offset, data)
diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py
index 2c7e13890..0d3159b55 100644
--- a/src/allmydata/storage_client.py
+++ b/src/allmydata/storage_client.py
@@ -1059,7 +1059,8 @@ class _HTTPBucketWriter(object):
     finished = attr.ib(type=bool, default=False)
 
     def abort(self):
-        pass  # TODO in later ticket
+        return self.client.abort_upload(self.storage_index, self.share_number,
+                                        self.upload_secret)
 
     @defer.inlineCallbacks
     def write(self, offset, data):
diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py
index 95261ddb2..668eeecc5 100644
--- a/src/allmydata/test/test_istorageserver.py
+++ b/src/allmydata/test/test_istorageserver.py
@@ -176,8 +176,9 @@ class IStorageServerImmutableAPIsTestsMixin(object):
             canary=Referenceable(),
         )
 
-        # Bucket 1 is fully written in one go.
-        yield allocated[0].callRemote("write", 0, b"1" * 1024)
+        # Bucket 1 gets some data written (but not all, or HTTP implicitly
+        # finishes the upload)
+        yield allocated[0].callRemote("write", 0, b"1" * 1023)
 
         # Disconnect or abort, depending on the test:
         yield abort_or_disconnect(allocated[0])
@@ -1156,7 +1157,6 @@ class HTTPImmutableAPIsTests(
 
     # These will start passing in future PRs as HTTP protocol is implemented.
     SKIP_TESTS = {
-        "test_abort",
         "test_add_lease_renewal",
         "test_add_new_lease",
         "test_advise_corrupt_share",
diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py
index ae003c65f..e062864e2 100644
--- a/src/allmydata/test/test_storage_http.py
+++ b/src/allmydata/test/test_storage_http.py
@@ -15,6 +15,7 @@ if PY2:
     # fmt: on
 
 from base64 import b64encode
+from contextlib import contextmanager
 from os import urandom
 
 from hypothesis import assume, given, strategies as st
@@ -316,6 +317,20 @@ class StorageClientWithHeadersOverride(object):
         return self.storage_client.request(*args, headers=headers, **kwargs)
 
 
+@contextmanager
+def assert_fails_with_http_code(test_case: SyncTestCase, code: int):
+    """
+    Context manager that asserts the code fails with the given HTTP response
+    code.
+    """
+    with test_case.assertRaises(ClientException) as e:
+        try:
+            yield
+        finally:
+            pass
+    test_case.assertEqual(e.exception.code, code)
+
+
 class GenericHTTPAPITests(SyncTestCase):
     """
     Tests of HTTP client talking to the HTTP server, for generic HTTP API
@@ -340,9 +355,8 @@ class GenericHTTPAPITests(SyncTestCase):
                 treq=StubTreq(self.http.http_server.get_resource()),
             )
         )
-        with self.assertRaises(ClientException) as e:
+        with assert_fails_with_http_code(self, http.UNAUTHORIZED):
             result_of(client.get_version())
-        self.assertEqual(e.exception.args[0], 401)
 
     def test_version(self):
         """
@@ -474,6 +488,23 @@ class ImmutableHTTPAPITests(SyncTestCase):
             )
             self.assertEqual(downloaded, expected_data[offset : offset + length])
 
+    def test_write_with_wrong_upload_key(self):
+        """
+        A write with an upload key that is different than the original upload
+        key will fail.
+        """
+        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
+        with assert_fails_with_http_code(self, http.UNAUTHORIZED):
+            result_of(
+                self.imm_client.write_share_chunk(
+                    storage_index,
+                    1,
+                    upload_secret + b"X",
+                    0,
+                    b"123",
+                )
+            )
+
     def test_allocate_buckets_second_time_different_shares(self):
         """
         If allocate buckets endpoint is called second time with different
@@ -583,7 +614,9 @@ class ImmutableHTTPAPITests(SyncTestCase):
                     self.http.client, {"content-range": bad_content_range_value}
                 )
             )
-            with self.assertRaises(ClientException) as e:
+            with assert_fails_with_http_code(
+                self, http.REQUESTED_RANGE_NOT_SATISFIABLE
+            ):
                 result_of(
                     client.write_share_chunk(
                         storage_index,
@@ -593,7 +626,6 @@ class ImmutableHTTPAPITests(SyncTestCase):
                         b"0123456789",
                     )
                 )
-            self.assertEqual(e.exception.code, http.REQUESTED_RANGE_NOT_SATISFIABLE)
 
         check_invalid("not a valid content-range header at all")
         check_invalid("bytes -1-9/10")
@@ -615,7 +647,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
         (upload_secret, _, storage_index, _) = self.create_upload({1}, 10)
 
         def unknown_check(storage_index, share_number):
-            with self.assertRaises(ClientException) as e:
+            with assert_fails_with_http_code(self, http.NOT_FOUND):
                 result_of(
                     self.imm_client.write_share_chunk(
                         storage_index,
@@ -625,7 +657,6 @@ class ImmutableHTTPAPITests(SyncTestCase):
                         b"0123456789",
                     )
                 )
-            self.assertEqual(e.exception.code, http.NOT_FOUND)
 
         # Wrong share number:
         unknown_check(storage_index, 7)
@@ -684,7 +715,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
         )
 
         # Conflicting write:
-        with self.assertRaises(ClientException) as e:
+        with assert_fails_with_http_code(self, http.CONFLICT):
             result_of(
                 self.imm_client.write_share_chunk(
                     storage_index,
@@ -694,7 +725,6 @@ class ImmutableHTTPAPITests(SyncTestCase):
                     b"0123456789",
                 )
             )
-        self.assertEqual(e.exception.code, http.NOT_FOUND)
 
     def upload(self, share_number, data_length=26):
         """
@@ -721,7 +751,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
         """
         Reading from unknown storage index results in 404.
         """
-        with self.assertRaises(ClientException) as e:
+        with assert_fails_with_http_code(self, http.NOT_FOUND):
             result_of(
                 self.imm_client.read_share_chunk(
                     b"1" * 16,
@@ -730,14 +760,13 @@ class ImmutableHTTPAPITests(SyncTestCase):
                     10,
                 )
             )
-        self.assertEqual(e.exception.code, http.NOT_FOUND)
 
     def test_read_of_wrong_share_number_fails(self):
         """
         Reading from unknown storage index results in 404.
         """
         storage_index, _ = self.upload(1)
-        with self.assertRaises(ClientException) as e:
+        with assert_fails_with_http_code(self, http.NOT_FOUND):
             result_of(
                 self.imm_client.read_share_chunk(
                     storage_index,
@@ -746,7 +775,6 @@ class ImmutableHTTPAPITests(SyncTestCase):
                     10,
                 )
             )
-        self.assertEqual(e.exception.code, http.NOT_FOUND)
 
     def test_read_with_negative_offset_fails(self):
         """
@@ -762,7 +790,9 @@ class ImmutableHTTPAPITests(SyncTestCase):
                 )
             )
 
-            with self.assertRaises(ClientException) as e:
+            with assert_fails_with_http_code(
+                self, http.REQUESTED_RANGE_NOT_SATISFIABLE
+            ):
                 result_of(
                     client.read_share_chunk(
                         storage_index,
@@ -771,7 +801,6 @@ class ImmutableHTTPAPITests(SyncTestCase):
                         10,
                     )
                 )
-            self.assertEqual(e.exception.code, http.REQUESTED_RANGE_NOT_SATISFIABLE)
 
         # Bad unit
         check_bad_range("molluscs=0-9")
@@ -831,3 +860,143 @@ class ImmutableHTTPAPITests(SyncTestCase):
         check_range("bytes=0-10", "bytes 0-10/*")
         # Can't go beyond the end of the immutable!
         check_range("bytes=10-100", "bytes 10-25/*")
+
+    def test_timed_out_upload_allows_reupload(self):
+        """
+        If an in-progress upload times out, it is cancelled altogether,
+        allowing a new upload to occur.
+        """
+        self._test_abort_or_timed_out_upload_to_existing_storage_index(
+            lambda **kwargs: self.http.clock.advance(30 * 60 + 1)
+        )
+
+    def test_abort_upload_allows_reupload(self):
+        """
+        If an in-progress upload is aborted, it is cancelled altogether,
+        allowing a new upload to occur.
+        """
+
+        def abort(storage_index, share_number, upload_secret):
+            return result_of(
+                self.imm_client.abort_upload(storage_index, share_number, upload_secret)
+            )
+
+        self._test_abort_or_timed_out_upload_to_existing_storage_index(abort)
+
+    def _test_abort_or_timed_out_upload_to_existing_storage_index(self, cancel_upload):
+        """Start uploading to an existing storage index that then times out or aborts.
+
+        Re-uploading should work.
+        """
+        # Start an upload:
+        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
+        result_of(
+            self.imm_client.write_share_chunk(
+                storage_index,
+                1,
+                upload_secret,
+                0,
+                b"123",
+            )
+        )
+
+        # Now, the upload is cancelled somehow:
+        cancel_upload(
+            storage_index=storage_index, upload_secret=upload_secret, share_number=1
+        )
+
+        # Now we can create a new share with the same storage index without
+        # complaint:
+        upload_secret = urandom(32)
+        lease_secret = urandom(32)
+        created = result_of(
+            self.imm_client.create(
+                storage_index,
+                {1},
+                100,
+                upload_secret,
+                lease_secret,
+                lease_secret,
+            )
+        )
+        self.assertEqual(created.allocated, {1})
+
+        # And write to it, too:
+        result_of(
+            self.imm_client.write_share_chunk(
+                storage_index,
+                1,
+                upload_secret,
+                0,
+                b"ABC",
+            )
+        )
+
+    def test_unknown_aborts(self):
+        """
+        Aborting uploads with an unknown storage index or share number will
+        result in a 404 HTTP response code.
+        """
+        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
+
+        for si, num in [(storage_index, 3), (b"x" * 16, 1)]:
+            with assert_fails_with_http_code(self, http.NOT_FOUND):
+                result_of(self.imm_client.abort_upload(si, num, upload_secret))
+
+    def test_unauthorized_abort(self):
+        """
+        An abort with the wrong key will return an unauthorized error, and will
+        not abort the upload.
+        """
+        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
+
+        # Failed to abort because wrong upload secret:
+        with assert_fails_with_http_code(self, http.UNAUTHORIZED):
+            result_of(
+                self.imm_client.abort_upload(storage_index, 1, upload_secret + b"X")
+            )
+
+        # We can still write to it:
+        result_of(
+            self.imm_client.write_share_chunk(
+                storage_index,
+                1,
+                upload_secret,
+                0,
+                b"ABC",
+            )
+        )
+
+    def test_too_late_abort(self):
+        """
+        An abort of an already-fully-uploaded immutable will result in 405
+        error and will not affect the immutable.
+        """
+        uploaded_data = b"123"
+        (upload_secret, _, storage_index, _) = self.create_upload({0}, 3)
+        result_of(
+            self.imm_client.write_share_chunk(
+                storage_index,
+                0,
+                upload_secret,
+                0,
+                uploaded_data,
+            )
+        )
+
+        # Can't abort, we finished upload:
+        with assert_fails_with_http_code(self, http.NOT_ALLOWED):
+            result_of(self.imm_client.abort_upload(storage_index, 0, upload_secret))
+
+        # Abort didn't prevent reading:
+        self.assertEqual(
+            uploaded_data,
+            result_of(
+                self.imm_client.read_share_chunk(
+                    storage_index,
+                    0,
+                    0,
+                    3,
+                )
+            ),
+        )