Merge branch '3879-more-http-storage-immutables' into 3881-cbor-refactor

This commit is contained in:
Itamar Turner-Trauring 2022-03-14 10:20:16 -04:00
commit ecec35d2ea
8 changed files with 247 additions and 54 deletions

View File

@ -614,16 +614,19 @@ From RFC 7231::
``POST /v1/immutable/:storage_index/:share_number/corrupt`` ``POST /v1/immutable/:storage_index/:share_number/corrupt``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Advise the server the data read from the indicated share was corrupt. Advise the server the data read from the indicated share was corrupt. The
The request body includes an human-meaningful string with details about the corruption. request body includes an human-meaningful text string with details about the
It also includes potentially important details about the share. corruption. It also includes potentially important details about the share.
For example:: For example::
{"reason": "expected hash abcd, got hash efgh"} {"reason": u"expected hash abcd, got hash efgh"}
.. share-type, storage-index, and share-number are inferred from the URL .. share-type, storage-index, and share-number are inferred from the URL
The response code is OK (200) by default, or NOT FOUND (404) if the share
couldn't be found.
Reading Reading
~~~~~~~ ~~~~~~~

View File

@ -0,0 +1 @@
Share corruption reports stored on disk are now always encoded in UTF-8.

View File

@ -310,9 +310,7 @@ class StorageClientImmutables(object):
body = yield response.content() body = yield response.content()
returnValue(body) returnValue(body)
else: else:
raise ClientException( raise ClientException(response.code)
response.code,
)
@inlineCallbacks @inlineCallbacks
def list_shares(self, storage_index): # type: (bytes,) -> Deferred[Set[int]] def list_shares(self, storage_index): # type: (bytes,) -> Deferred[Set[int]]
@ -329,6 +327,57 @@ class StorageClientImmutables(object):
if response.code == http.OK: if response.code == http.OK:
body = yield _decode_cbor(response) body = yield _decode_cbor(response)
returnValue(set(body)) returnValue(set(body))
else:
raise ClientException(response.code)
@inlineCallbacks
def add_or_renew_lease(
self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
):
"""
Add or renew a lease.
If the renewal secret matches an existing lease, it is renewed.
Otherwise a new lease is added.
"""
url = self._client.relative_url(
"/v1/lease/{}".format(_encode_si(storage_index))
)
response = yield self._client.request(
"PUT",
url,
lease_renew_secret=renew_secret,
lease_cancel_secret=cancel_secret,
)
if response.code == http.NO_CONTENT:
return
else:
raise ClientException(response.code)
@inlineCallbacks
def advise_corrupt_share(
self,
storage_index: bytes,
share_number: int,
reason: str,
):
"""Indicate a share has been corrupted, with a human-readable message."""
assert isinstance(reason, str)
url = self._client.relative_url(
"/v1/immutable/{}/{}/corrupt".format(
_encode_si(storage_index), share_number
)
)
message = dumps({"reason": reason})
response = yield self._client.request(
"POST",
url,
data=message,
headers=Headers({"content-type": ["application/cbor"]}),
)
if response.code == http.OK:
return
else: else:
raise ClientException( raise ClientException(
response.code, response.code,

View File

@ -434,3 +434,41 @@ class HTTPServer(object):
ContentRange("bytes", offset, offset + len(data)).to_header(), ContentRange("bytes", offset, offset + len(data)).to_header(),
) )
return data return data
@_authorized_route(
_app,
{Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL},
"/v1/lease/<storage_index:storage_index>",
methods=["PUT"],
)
def add_or_renew_lease(self, request, authorization, storage_index):
"""Update the lease for an immutable share."""
if not self._storage_server.get_buckets(storage_index):
raise _HTTPError(http.NOT_FOUND)
# Checking of the renewal secret is done by the backend.
self._storage_server.add_lease(
storage_index,
authorization[Secrets.LEASE_RENEW],
authorization[Secrets.LEASE_CANCEL],
)
request.setResponseCode(http.NO_CONTENT)
return b""
@_authorized_route(
_app,
set(),
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"],
)
def advise_corrupt_share(self, request, authorization, storage_index, share_number):
"""Indicate that given share is corrupt, with a text reason."""
try:
bucket = self._storage_server.get_buckets(storage_index)[share_number]
except KeyError:
raise _HTTPError(http.NOT_FOUND)
info = loads(request.content.read())
bucket.advise_corrupt_share(info["reason"].encode("utf-8"))
return b""

View File

@ -743,8 +743,9 @@ class StorageServer(service.MultiService):
def advise_corrupt_share(self, share_type, storage_index, shnum, def advise_corrupt_share(self, share_type, storage_index, shnum,
reason): reason):
# This is a remote API, I believe, so this has to be bytes for legacy # Previously this had to be bytes for legacy protocol backwards
# protocol backwards compatibility reasons. # compatibility reasons. Now that Foolscap layer has been abstracted
# out, we can probably refactor this to be unicode...
assert isinstance(share_type, bytes) assert isinstance(share_type, bytes)
assert isinstance(reason, bytes), "%r is not bytes" % (reason,) assert isinstance(reason, bytes), "%r is not bytes" % (reason,)
@ -777,7 +778,7 @@ class StorageServer(service.MultiService):
si_s, si_s,
shnum, shnum,
) )
with open(report_path, "w") as f: with open(report_path, "w", encoding="utf-8") as f:
f.write(report) f.write(report)
return None return None

View File

@ -77,7 +77,7 @@ from allmydata.util.hashutil import permute_server_hash
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
from allmydata.storage.http_client import ( from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral, StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException, ClientException as HTTPClientException
) )
@ -1094,7 +1094,10 @@ class _HTTPBucketReader(object):
) )
def advise_corrupt_share(self, reason): def advise_corrupt_share(self, reason):
pass # TODO in later ticket return self.client.advise_corrupt_share(
self.storage_index, self.share_number,
str(reason, "utf-8", errors="backslashreplace")
)
# WORK IN PROGRESS, for now it doesn't actually implement whole thing. # WORK IN PROGRESS, for now it doesn't actually implement whole thing.
@ -1124,7 +1127,7 @@ class _HTTPStorageServer(object):
cancel_secret, cancel_secret,
sharenums, sharenums,
allocated_size, allocated_size,
canary, canary
): ):
upload_secret = urandom(20) upload_secret = urandom(20)
immutable_client = StorageClientImmutables(self._http_client) immutable_client = StorageClientImmutables(self._http_client)
@ -1148,7 +1151,7 @@ class _HTTPStorageServer(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_buckets( def get_buckets(
self, self,
storage_index, storage_index
): ):
immutable_client = StorageClientImmutables(self._http_client) immutable_client = StorageClientImmutables(self._http_client)
share_numbers = yield immutable_client.list_shares( share_numbers = yield immutable_client.list_shares(
@ -1160,3 +1163,29 @@ class _HTTPStorageServer(object):
)) ))
for share_num in share_numbers for share_num in share_numbers
}) })
def add_lease(
self,
storage_index,
renew_secret,
cancel_secret
):
immutable_client = StorageClientImmutables(self._http_client)
return immutable_client.add_or_renew_lease(
storage_index, renew_secret, cancel_secret
)
def advise_corrupt_share(
self,
share_type,
storage_index,
shnum,
reason: bytes
):
if share_type == b"immutable":
imm_client = StorageClientImmutables(self._http_client)
return imm_client.advise_corrupt_share(
storage_index, shnum, str(reason, "utf-8", errors="backslashreplace")
)
else:
raise NotImplementedError() # future tickets

View File

@ -194,20 +194,6 @@ class IStorageServerImmutableAPIsTestsMixin(object):
) )
yield allocated[0].callRemote("write", 0, b"2" * 1024) yield allocated[0].callRemote("write", 0, b"2" * 1024)
def test_disconnection(self):
"""
If we disconnect in the middle of writing to a bucket, all data is
wiped, and it's even possible to write different data to the bucket.
(In the real world one shouldn't do that, but writing different data is
a good way to test that the original data really was wiped.)
HTTP protocol should skip this test, since disconnection is meaningless
concept; this is more about testing implicit contract the Foolscap
implementation depends on doesn't change as we refactor things.
"""
return self.abort_or_disconnect_half_way(lambda _: self.disconnect())
@inlineCallbacks @inlineCallbacks
def test_written_shares_are_allocated(self): def test_written_shares_are_allocated(self):
""" """
@ -1062,13 +1048,6 @@ class _SharedMixin(SystemTestMixin):
AsyncTestCase.tearDown(self) AsyncTestCase.tearDown(self)
yield SystemTestMixin.tearDown(self) yield SystemTestMixin.tearDown(self)
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
raise NotImplementedError("implement in subclass")
class _FoolscapMixin(_SharedMixin): class _FoolscapMixin(_SharedMixin):
"""Run tests on Foolscap version of ``IStorageServer``.""" """Run tests on Foolscap version of ``IStorageServer``."""
@ -1081,16 +1060,6 @@ class _FoolscapMixin(_SharedMixin):
self.assertTrue(IStorageServer.providedBy(client)) self.assertTrue(IStorageServer.providedBy(client))
return succeed(client) return succeed(client)
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
current = self.storage_client
yield self.bounce_client(0)
self.storage_client = self._get_native_server().get_storage_server()
assert self.storage_client is not current
class _HTTPMixin(_SharedMixin): class _HTTPMixin(_SharedMixin):
"""Run tests on the HTTP version of ``IStorageServer``.""" """Run tests on the HTTP version of ``IStorageServer``."""
@ -1149,21 +1118,37 @@ class FoolscapImmutableAPIsTests(
): ):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs.""" """Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
def test_disconnection(self):
"""
If we disconnect in the middle of writing to a bucket, all data is
wiped, and it's even possible to write different data to the bucket.
(In the real world one shouldn't do that, but writing different data is
a good way to test that the original data really was wiped.)
HTTP protocol doesn't need this test, since disconnection is a
meaningless concept; this is more about testing the implicit contract
the Foolscap implementation depends on doesn't change as we refactor
things.
"""
return self.abort_or_disconnect_half_way(lambda _: self.disconnect())
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
current = self.storage_client
yield self.bounce_client(0)
self.storage_client = self._get_native_server().get_storage_server()
assert self.storage_client is not current
class HTTPImmutableAPIsTests( class HTTPImmutableAPIsTests(
_HTTPMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase _HTTPMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
): ):
"""HTTP-specific tests for immutable ``IStorageServer`` APIs.""" """HTTP-specific tests for immutable ``IStorageServer`` APIs."""
# These will start passing in future PRs as HTTP protocol is implemented.
SKIP_TESTS = {
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
"test_bucket_advise_corrupt_share",
"test_disconnection",
}
class FoolscapMutableAPIsTests( class FoolscapMutableAPIsTests(
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase _FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase

View File

@ -1000,3 +1000,90 @@ class ImmutableHTTPAPITests(SyncTestCase):
) )
), ),
) )
def test_lease_renew_and_add(self):
"""
It's possible the renew the lease on an uploaded immutable, by using
the same renewal secret, or add a new lease by choosing a different
renewal secret.
"""
# Create immutable:
(upload_secret, lease_secret, storage_index, _) = self.create_upload({0}, 100)
result_of(
self.imm_client.write_share_chunk(
storage_index,
0,
upload_secret,
0,
b"A" * 100,
)
)
[lease] = self.http.storage_server.get_leases(storage_index)
initial_expiration_time = lease.get_expiration_time()
# Time passes:
self.http.clock.advance(167)
# We renew the lease:
result_of(
self.imm_client.add_or_renew_lease(
storage_index, lease_secret, lease_secret
)
)
# More time passes:
self.http.clock.advance(10)
# We create a new lease:
lease_secret2 = urandom(32)
result_of(
self.imm_client.add_or_renew_lease(
storage_index, lease_secret2, lease_secret2
)
)
[lease1, lease2] = self.http.storage_server.get_leases(storage_index)
self.assertEqual(lease1.get_expiration_time(), initial_expiration_time + 167)
self.assertEqual(lease2.get_expiration_time(), initial_expiration_time + 177)
def test_lease_on_unknown_storage_index(self):
"""
An attempt to renew an unknown storage index will result in a HTTP 404.
"""
storage_index = urandom(16)
secret = b"A" * 32
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(self.imm_client.add_or_renew_lease(storage_index, secret, secret))
def test_advise_corrupt_share(self):
"""
Advising share was corrupted succeeds from HTTP client's perspective,
and calls appropriate method on server.
"""
corrupted = []
self.http.storage_server.advise_corrupt_share = lambda *args: corrupted.append(
args
)
storage_index, _ = self.upload(13)
reason = "OHNO \u1235"
result_of(self.imm_client.advise_corrupt_share(storage_index, 13, reason))
self.assertEqual(
corrupted, [(b"immutable", storage_index, 13, reason.encode("utf-8"))]
)
def test_advise_corrupt_share_unknown(self):
"""
Advising an unknown share was corrupted results in 404.
"""
storage_index, _ = self.upload(13)
reason = "OHNO \u1235"
result_of(self.imm_client.advise_corrupt_share(storage_index, 13, reason))
for (si, share_number) in [(storage_index, 11), (urandom(16), 13)]:
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.imm_client.advise_corrupt_share(si, share_number, reason)
)