mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-21 03:55:27 +00:00
More bucket allocation logic.
This commit is contained in:
parent
f0c00fcbe4
commit
bceed6e199
@ -128,10 +128,15 @@ class StorageIndexUploads(object):
|
||||
"""
|
||||
|
||||
# Map share number to BucketWriter
|
||||
shares = attr.ib() # type: Dict[int,BucketWriter]
|
||||
shares = attr.ib(factory=dict) # type: Dict[int,BucketWriter]
|
||||
|
||||
# The upload key.
|
||||
upload_secret = attr.ib() # type: bytes
|
||||
# Mape share number to the upload secret (different shares might have
|
||||
# different upload secrets).
|
||||
upload_secrets = attr.ib(factory=dict) # type: Dict[int,bytes]
|
||||
|
||||
def add_upload(self, share_number, upload_secret, bucket):
|
||||
self.shares[share_number] = bucket
|
||||
self.upload_secrets[share_number] = upload_secret
|
||||
|
||||
|
||||
class HTTPServer(object):
|
||||
@ -179,39 +184,40 @@ class HTTPServer(object):
|
||||
def allocate_buckets(self, request, authorization, storage_index):
|
||||
"""Allocate buckets."""
|
||||
storage_index = si_a2b(storage_index.encode("ascii"))
|
||||
info = loads(request.content.read())
|
||||
upload_secret = authorization[Secrets.UPLOAD]
|
||||
info = loads(request.content.read())
|
||||
|
||||
if storage_index in self._uploads:
|
||||
# Pre-existing upload.
|
||||
in_progress = self._uploads[storage_index]
|
||||
if timing_safe_compare(in_progress.upload_secret, upload_secret):
|
||||
# Same session.
|
||||
# TODO add BucketWriters only for new shares that don't already have buckets; see the HTTP spec for details.
|
||||
# The backend code may already implement this logic.
|
||||
pass
|
||||
else:
|
||||
# TODO Fail, since the secret doesnt match.
|
||||
pass
|
||||
else:
|
||||
# New upload.
|
||||
already_got, sharenum_to_bucket = self._storage_server.allocate_buckets(
|
||||
storage_index,
|
||||
renew_secret=authorization[Secrets.LEASE_RENEW],
|
||||
cancel_secret=authorization[Secrets.LEASE_CANCEL],
|
||||
sharenums=info["share-numbers"],
|
||||
allocated_size=info["allocated-size"],
|
||||
)
|
||||
self._uploads[storage_index] = StorageIndexUploads(
|
||||
shares=sharenum_to_bucket, upload_secret=authorization[Secrets.UPLOAD]
|
||||
)
|
||||
return self._cbor(
|
||||
request,
|
||||
{
|
||||
"already-have": set(already_got),
|
||||
"allocated": set(sharenum_to_bucket),
|
||||
},
|
||||
)
|
||||
for share_number in info["share-numbers"]:
|
||||
in_progress = self._uploads[storage_index]
|
||||
# For pre-existing upload, make sure password matches.
|
||||
if (
|
||||
share_number in in_progress.upload_secrets
|
||||
and not timing_safe_compare(
|
||||
in_progress.upload_secrets[share_number], upload_secret
|
||||
)
|
||||
):
|
||||
request.setResponseCode(http.UNAUTHORIZED)
|
||||
return b""
|
||||
|
||||
already_got, sharenum_to_bucket = self._storage_server.allocate_buckets(
|
||||
storage_index,
|
||||
renew_secret=authorization[Secrets.LEASE_RENEW],
|
||||
cancel_secret=authorization[Secrets.LEASE_CANCEL],
|
||||
sharenums=info["share-numbers"],
|
||||
allocated_size=info["allocated-size"],
|
||||
)
|
||||
uploads = self._uploads.setdefault(storage_index, StorageIndexUploads())
|
||||
for share_number, bucket in sharenum_to_bucket.items():
|
||||
uploads.add_upload(share_number, upload_secret, bucket)
|
||||
|
||||
return self._cbor(
|
||||
request,
|
||||
{
|
||||
"already-have": set(already_got),
|
||||
"allocated": set(sharenum_to_bucket),
|
||||
},
|
||||
)
|
||||
|
||||
@_authorized_route(
|
||||
_app,
|
||||
|
@ -24,6 +24,7 @@ from klein import Klein
|
||||
from hyperlink import DecodedURL
|
||||
from collections_extended import RangeMap
|
||||
from twisted.internet.task import Clock
|
||||
from twisted.web import http
|
||||
|
||||
from .common import SyncTestCase
|
||||
from ..storage.server import StorageServer
|
||||
@ -386,6 +387,55 @@ class ImmutableHTTPAPITests(SyncTestCase):
|
||||
)
|
||||
self.assertEqual(downloaded, expected_data[offset : offset + length])
|
||||
|
||||
def test_allocate_buckets_second_time_wrong_upload_key(self):
|
||||
"""
|
||||
If allocate buckets endpoint is called second time with wrong upload
|
||||
key on the same shares, the result is an error.
|
||||
"""
|
||||
im_client = StorageClientImmutables(self.http.client)
|
||||
|
||||
# Create a upload:
|
||||
upload_secret = urandom(32)
|
||||
lease_secret = urandom(32)
|
||||
storage_index = b"".join(bytes([i]) for i in range(16))
|
||||
result_of(
|
||||
im_client.create(
|
||||
storage_index, {1, 2, 3}, 100, upload_secret, lease_secret, lease_secret
|
||||
)
|
||||
)
|
||||
with self.assertRaises(ClientException) as e:
|
||||
result_of(
|
||||
im_client.create(
|
||||
storage_index, {2, 3}, 100, b"x" * 32, lease_secret, lease_secret
|
||||
)
|
||||
)
|
||||
self.assertEqual(e.exception.args[0], http.UNAUTHORIZED)
|
||||
|
||||
def test_allocate_buckets_second_time_different_shares(self):
|
||||
"""
|
||||
If allocate buckets endpoint is called second time with different
|
||||
upload key on different shares, that creates the buckets.
|
||||
"""
|
||||
im_client = StorageClientImmutables(self.http.client)
|
||||
|
||||
# Create a upload:
|
||||
upload_secret = urandom(32)
|
||||
lease_secret = urandom(32)
|
||||
storage_index = b"".join(bytes([i]) for i in range(16))
|
||||
result_of(
|
||||
im_client.create(
|
||||
storage_index, {1, 2, 3}, 100, upload_secret, lease_secret, lease_secret
|
||||
)
|
||||
)
|
||||
|
||||
# Add same shares:
|
||||
created2 = result_of(
|
||||
im_client.create(
|
||||
storage_index, {4, 6}, 100, b"x" * 2, lease_secret, lease_secret
|
||||
)
|
||||
)
|
||||
self.assertEqual(created2.allocated, {4, 6})
|
||||
|
||||
def test_list_shares(self):
|
||||
"""
|
||||
Once a share is finished uploading, it's possible to list it.
|
||||
|
Loading…
Reference in New Issue
Block a user