diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index f885baa22..50aa6ae9e 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -128,10 +128,15 @@ class StorageIndexUploads(object): """ # Map share number to BucketWriter - shares = attr.ib() # type: Dict[int,BucketWriter] + shares = attr.ib(factory=dict) # type: Dict[int,BucketWriter] - # The upload key. - upload_secret = attr.ib() # type: bytes + # Mape share number to the upload secret (different shares might have + # different upload secrets). + upload_secrets = attr.ib(factory=dict) # type: Dict[int,bytes] + + def add_upload(self, share_number, upload_secret, bucket): + self.shares[share_number] = bucket + self.upload_secrets[share_number] = upload_secret class HTTPServer(object): @@ -179,39 +184,40 @@ class HTTPServer(object): def allocate_buckets(self, request, authorization, storage_index): """Allocate buckets.""" storage_index = si_a2b(storage_index.encode("ascii")) - info = loads(request.content.read()) upload_secret = authorization[Secrets.UPLOAD] + info = loads(request.content.read()) if storage_index in self._uploads: - # Pre-existing upload. - in_progress = self._uploads[storage_index] - if timing_safe_compare(in_progress.upload_secret, upload_secret): - # Same session. - # TODO add BucketWriters only for new shares that don't already have buckets; see the HTTP spec for details. - # The backend code may already implement this logic. - pass - else: - # TODO Fail, since the secret doesnt match. - pass - else: - # New upload. - already_got, sharenum_to_bucket = self._storage_server.allocate_buckets( - storage_index, - renew_secret=authorization[Secrets.LEASE_RENEW], - cancel_secret=authorization[Secrets.LEASE_CANCEL], - sharenums=info["share-numbers"], - allocated_size=info["allocated-size"], - ) - self._uploads[storage_index] = StorageIndexUploads( - shares=sharenum_to_bucket, upload_secret=authorization[Secrets.UPLOAD] - ) - return self._cbor( - request, - { - "already-have": set(already_got), - "allocated": set(sharenum_to_bucket), - }, - ) + for share_number in info["share-numbers"]: + in_progress = self._uploads[storage_index] + # For pre-existing upload, make sure password matches. + if ( + share_number in in_progress.upload_secrets + and not timing_safe_compare( + in_progress.upload_secrets[share_number], upload_secret + ) + ): + request.setResponseCode(http.UNAUTHORIZED) + return b"" + + already_got, sharenum_to_bucket = self._storage_server.allocate_buckets( + storage_index, + renew_secret=authorization[Secrets.LEASE_RENEW], + cancel_secret=authorization[Secrets.LEASE_CANCEL], + sharenums=info["share-numbers"], + allocated_size=info["allocated-size"], + ) + uploads = self._uploads.setdefault(storage_index, StorageIndexUploads()) + for share_number, bucket in sharenum_to_bucket.items(): + uploads.add_upload(share_number, upload_secret, bucket) + + return self._cbor( + request, + { + "already-have": set(already_got), + "allocated": set(sharenum_to_bucket), + }, + ) @_authorized_route( _app, diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index 982e22859..39f07a54a 100644 --- a/src/allmydata/test/test_storage_http.py +++ b/src/allmydata/test/test_storage_http.py @@ -24,6 +24,7 @@ from klein import Klein from hyperlink import DecodedURL from collections_extended import RangeMap from twisted.internet.task import Clock +from twisted.web import http from .common import SyncTestCase from ..storage.server import StorageServer @@ -386,6 +387,55 @@ class ImmutableHTTPAPITests(SyncTestCase): ) self.assertEqual(downloaded, expected_data[offset : offset + length]) + def test_allocate_buckets_second_time_wrong_upload_key(self): + """ + If allocate buckets endpoint is called second time with wrong upload + key on the same shares, the result is an error. + """ + im_client = StorageClientImmutables(self.http.client) + + # Create a upload: + upload_secret = urandom(32) + lease_secret = urandom(32) + storage_index = b"".join(bytes([i]) for i in range(16)) + result_of( + im_client.create( + storage_index, {1, 2, 3}, 100, upload_secret, lease_secret, lease_secret + ) + ) + with self.assertRaises(ClientException) as e: + result_of( + im_client.create( + storage_index, {2, 3}, 100, b"x" * 32, lease_secret, lease_secret + ) + ) + self.assertEqual(e.exception.args[0], http.UNAUTHORIZED) + + def test_allocate_buckets_second_time_different_shares(self): + """ + If allocate buckets endpoint is called second time with different + upload key on different shares, that creates the buckets. + """ + im_client = StorageClientImmutables(self.http.client) + + # Create a upload: + upload_secret = urandom(32) + lease_secret = urandom(32) + storage_index = b"".join(bytes([i]) for i in range(16)) + result_of( + im_client.create( + storage_index, {1, 2, 3}, 100, upload_secret, lease_secret, lease_secret + ) + ) + + # Add same shares: + created2 = result_of( + im_client.create( + storage_index, {4, 6}, 100, b"x" * 2, lease_secret, lease_secret + ) + ) + self.assertEqual(created2.allocated, {4, 6}) + def test_list_shares(self): """ Once a share is finished uploading, it's possible to list it.