Implement advise_corrupt_share for mutables.

This commit is contained in:
Itamar Turner-Trauring 2022-05-11 10:41:36 -04:00
parent 5b0762d3a3
commit 6f5a0e43eb
4 changed files with 64 additions and 20 deletions

View File

@ -406,6 +406,30 @@ def read_share_chunk(
raise ClientException(response.code)
@async_to_deferred
async def advise_corrupt_share(
client: StorageClient,
share_type: str,
storage_index: bytes,
share_number: int,
reason: str,
):
assert isinstance(reason, str)
url = client.relative_url(
"/v1/{}/{}/{}/corrupt".format(
share_type, _encode_si(storage_index), share_number
)
)
message = {"reason": reason}
response = await client.request("POST", url, message_to_serialize=message)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,
)
@define
class StorageClientImmutables(object):
"""
@ -579,7 +603,6 @@ class StorageClientImmutables(object):
else:
raise ClientException(response.code)
@inlineCallbacks
def advise_corrupt_share(
self,
storage_index: bytes,
@ -587,20 +610,9 @@ class StorageClientImmutables(object):
reason: str,
):
"""Indicate a share has been corrupted, with a human-readable message."""
assert isinstance(reason, str)
url = self._client.relative_url(
"/v1/immutable/{}/{}/corrupt".format(
_encode_si(storage_index), share_number
)
return advise_corrupt_share(
self._client, "immutable", storage_index, share_number, reason
)
message = {"reason": reason}
response = yield self._client.request("POST", url, message_to_serialize=message)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,
)
@frozen
@ -738,3 +750,14 @@ class StorageClientMutables:
return await _decode_cbor(response, _SCHEMAS["mutable_list_shares"])
else:
raise ClientException(response.code)
def advise_corrupt_share(
self,
storage_index: bytes,
share_number: int,
reason: str,
):
"""Indicate a share has been corrupted, with a human-readable message."""
return advise_corrupt_share(
self._client, "mutable", storage_index, share_number, reason
)

View File

@ -666,6 +666,26 @@ class HTTPServer(object):
raise _HTTPError(http.NOT_FOUND)
return self._send_encoded(request, shares)
@_authorized_route(
_app,
set(),
"/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"],
)
def advise_corrupt_share_mutable(
self, request, authorization, storage_index, share_number
):
"""Indicate that given share is corrupt, with a text reason."""
# TODO unit test all the paths
if not self._storage_server._share_exists(storage_index, share_number):
raise _HTTPError(http.NOT_FOUND)
info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
self._storage_server.advise_corrupt_share(
b"mutable", storage_index, share_number, info["reason"].encode("utf-8")
)
return b""
@implementer(IStreamServerEndpoint)
@attr.s

View File

@ -1185,12 +1185,14 @@ class _HTTPStorageServer(object):
reason: bytes
):
if share_type == b"immutable":
imm_client = StorageClientImmutables(self._http_client)
return imm_client.advise_corrupt_share(
storage_index, shnum, str(reason, "utf-8", errors="backslashreplace")
)
client = StorageClientImmutables(self._http_client)
elif share_type == b"mutable":
client = StorageClientMutables(self._http_client)
else:
raise NotImplementedError() # future tickets
raise ValueError("Unknown share type")
return client.advise_corrupt_share(
storage_index, shnum, str(reason, "utf-8", errors="backslashreplace")
)
@defer.inlineCallbacks
def slot_readv(self, storage_index, shares, readv):

View File

@ -1152,5 +1152,4 @@ class HTTPMutableAPIsTests(
SKIP_TESTS = {
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
}