Merge pull request #1177 from tahoe-lafs/3871-http-list-storage-index-shares

HTTP API can list storage index shares

Fixes ticket:3871
This commit is contained in:
Itamar Turner-Trauring 2022-02-04 10:36:29 -05:00 committed by GitHub
commit 296bc3e68b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 119 additions and 14 deletions

View File

@ -630,11 +630,13 @@ Reading
``GET /v1/immutable/:storage_index/shares``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Retrieve a list indicating all shares available for the indicated storage index.
For example::
Retrieve a list (semantically, a set) indicating all shares available for the
indicated storage index. For example::
[1, 5]
An unknown storage index results in an empty list.
``GET /v1/immutable/:storage_index/:share_number``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

0
newsfragments/3871.minor Normal file
View File

View File

@ -136,6 +136,7 @@ class UploadProgress(object):
"""
Progress of immutable upload, per the server.
"""
# True when upload has finished.
finished = attr.ib(type=bool)
# Remaining ranges to upload.
@ -221,7 +222,7 @@ class StorageClientImmutables(object):
headers=Headers(
{
"content-range": [
ContentRange("bytes", offset, offset+len(data)).to_header()
ContentRange("bytes", offset, offset + len(data)).to_header()
]
}
),
@ -237,7 +238,7 @@ class StorageClientImmutables(object):
raise ClientException(
response.code,
)
body = loads((yield response.content()))
body = yield _decode_cbor(response)
remaining = RangeMap()
for chunk in body["required"]:
remaining.set(True, chunk["begin"], chunk["end"])
@ -268,11 +269,7 @@ class StorageClientImmutables(object):
"GET",
url,
headers=Headers(
{
"range": [
Range("bytes", [(offset, offset + length)]).to_header()
]
}
{"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
),
)
if response.code == http.PARTIAL_CONTENT:
@ -282,3 +279,23 @@ class StorageClientImmutables(object):
raise ClientException(
response.code,
)
@inlineCallbacks
def list_shares(self, storage_index): # type: (bytes,) -> Deferred[Set[int]]
"""
Return the set of shares for a given storage index.
"""
url = self._client._url(
"/v1/immutable/{}/shares".format(_encode_si(storage_index))
)
response = yield self._client._request(
"GET",
url,
)
if response.code == http.OK:
body = yield _decode_cbor(response)
returnValue(set(body))
else:
raise ClientException(
response.code,
)

View File

@ -255,6 +255,20 @@ class HTTPServer(object):
required.append({"begin": start, "end": end})
return self._cbor(request, {"required": required})
@_authorized_route(
_app,
set(),
"/v1/immutable/<string:storage_index>/shares",
methods=["GET"],
)
def list_shares(self, request, authorization, storage_index):
"""
List shares for the given storage index.
"""
storage_index = si_a2b(storage_index.encode("ascii"))
share_numbers = list(self._storage_server.get_buckets(storage_index).keys())
return self._cbor(request, share_numbers)
@_authorized_route(
_app,
set(),

View File

@ -1042,7 +1042,7 @@ class _FakeRemoteReference(object):
@attr.s
class _HTTPBucketWriter(object):
"""
Emulate a ``RIBucketWriter``.
Emulate a ``RIBucketWriter``, but use HTTP protocol underneath.
"""
client = attr.ib(type=StorageClientImmutables)
storage_index = attr.ib(type=bytes)
@ -1069,6 +1069,25 @@ class _HTTPBucketWriter(object):
return defer.succeed(None)
@attr.s
class _HTTPBucketReader(object):
"""
Emulate a ``RIBucketReader``, but use HTTP protocol underneath.
"""
client = attr.ib(type=StorageClientImmutables)
storage_index = attr.ib(type=bytes)
share_number = attr.ib(type=int)
def read(self, offset, length):
return self.client.read_share_chunk(
self.storage_index, self.share_number, offset, length
)
def advise_corrupt_share(self, reason):
pass # TODO in later ticket
# WORK IN PROGRESS, for now it doesn't actually implement whole thing.
@implementer(IStorageServer) # type: ignore
@attr.s
@ -1117,8 +1136,18 @@ class _HTTPStorageServer(object):
})
)
@defer.inlineCallbacks
def get_buckets(
self,
storage_index,
):
pass
immutable_client = StorageClientImmutables(self._http_client)
share_numbers = yield immutable_client.list_shares(
storage_index
)
defer.returnValue({
share_num: _FakeRemoteReference(_HTTPBucketReader(
immutable_client, storage_index, share_num
))
for share_num in share_numbers
})

View File

@ -1166,8 +1166,6 @@ class HTTPImmutableAPIsTests(
"test_get_buckets_skips_unfinished_buckets",
"test_matching_overlapping_writes",
"test_non_matching_overlapping_writes",
"test_read_bucket_at_offset",
"test_written_shares_are_readable",
"test_written_shares_are_allocated",
}

View File

@ -23,6 +23,7 @@ from treq.testing import StubTreq
from klein import Klein
from hyperlink import DecodedURL
from collections_extended import RangeMap
from twisted.internet.task import Clock
from .common import SyncTestCase
from ..storage.server import StorageServer
@ -230,8 +231,11 @@ class HttpTestFixture(Fixture):
"""
def _setUp(self):
self.clock = Clock()
self.tempdir = self.useFixture(TempDir())
self.storage_server = StorageServer(self.tempdir.path, b"\x00" * 20)
self.storage_server = StorageServer(
self.tempdir.path, b"\x00" * 20, clock=self.clock
)
self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST)
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
@ -382,6 +386,47 @@ class ImmutableHTTPAPITests(SyncTestCase):
)
self.assertEqual(downloaded, expected_data[offset : offset + length])
def test_list_shares(self):
"""
Once a share is finished uploading, it's possible to list it.
"""
im_client = StorageClientImmutables(self.http.client)
upload_secret = urandom(32)
lease_secret = urandom(32)
storage_index = b"".join(bytes([i]) for i in range(16))
result_of(
im_client.create(
storage_index, {1, 2, 3}, 10, upload_secret, lease_secret, lease_secret
)
)
# Initially there are no shares:
self.assertEqual(result_of(im_client.list_shares(storage_index)), set())
# Upload shares 1 and 3:
for share_number in [1, 3]:
progress = result_of(
im_client.write_share_chunk(
storage_index,
share_number,
upload_secret,
0,
b"0123456789",
)
)
self.assertTrue(progress.finished)
# Now shares 1 and 3 exist:
self.assertEqual(result_of(im_client.list_shares(storage_index)), {1, 3})
def test_list_shares_unknown_storage_index(self):
"""
Listing unknown storage index's shares results in empty list of shares.
"""
im_client = StorageClientImmutables(self.http.client)
storage_index = b"".join(bytes([i]) for i in range(16))
self.assertEqual(result_of(im_client.list_shares(storage_index)), set())
def test_multiple_shares_uploaded_to_different_place(self):
"""
If a storage index has multiple shares, uploads to different shares are