tahoe-lafs/src/allmydata/storage/http_client.py


"""
HTTP client that talks to the HTTP storage server.
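
A rough usage sketch (illustrative only; lowercase names such as ``nurl``,
``reactor``, ``storage_index``, ``data`` and the secrets are placeholders
supplied by the caller, inside an ``inlineCallbacks`` function)::

    client = StorageClient.from_nurl(nurl, reactor)
    immutables = StorageClientImmutables(client)
    created = yield immutables.create(
        storage_index, {0, 1}, allocated_size, upload_secret,
        lease_renew_secret, lease_cancel_secret,
    )
    progress = yield immutables.write_share_chunk(
        storage_index, 0, upload_secret, 0, data
    )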
"""
from __future__ import annotations
from typing import Union, Optional, Sequence, Mapping
from base64 import b64encode
from attrs import define, asdict, frozen
# TODO Make sure to import Python version?
from cbor2 import loads, dumps
from pycddl import Schema
from collections_extended import RangeMap
from werkzeug.datastructures import Range, ContentRange
from twisted.web.http_headers import Headers
from twisted.web import http
from twisted.web.iweb import IPolicyForHTTPS
from twisted.internet.defer import inlineCallbacks, returnValue, fail, Deferred
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
from twisted.internet.ssl import CertificateOptions
from twisted.web.client import Agent, HTTPConnectionPool
from zope.interface import implementer
from hyperlink import DecodedURL
import treq
from treq.client import HTTPClient
from treq.testing import StubTreq
from OpenSSL import SSL
from cryptography.hazmat.bindings.openssl.binding import Binding
from .http_common import (
swissnum_auth_header,
Secrets,
get_content_type,
CBOR_MIME_TYPE,
get_spki_hash,
)
from .common import si_b2a
from ..util.hashutil import timing_safe_compare
from ..util.deferredutil import async_to_deferred
_OPENSSL = Binding().lib
def _encode_si(si): # type: (bytes) -> str
"""Encode the storage index into Unicode string."""
return str(si_b2a(si), "ascii")
class ClientException(Exception):
"""An unexpected response code from the server."""
def __init__(self, code, *additional_args):
Exception.__init__(self, code, *additional_args)
self.code = code
# Schemas for server responses.
#
# Tags are of the form #6.nnn, where the number is documented at
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258
# indicates a set.
_SCHEMAS = {
"get_version": Schema(
"""
response = {'http://allmydata.org/tahoe/protocols/storage/v1' => {
'maximum-immutable-share-size' => uint
'maximum-mutable-share-size' => uint
'available-space' => uint
'tolerates-immutable-read-overrun' => bool
'delete-mutable-shares-with-zero-length-writev' => bool
'fills-holes-with-zero-bytes' => bool
'prevents-read-past-end-of-share-data' => bool
}
'application-version' => bstr
}
"""
),
"allocate_buckets": Schema(
"""
response = {
already-have: #6.258([* uint])
allocated: #6.258([* uint])
}
"""
),
"immutable_write_share_chunk": Schema(
"""
response = {
required: [* {begin: uint, end: uint}]
}
"""
),
"list_shares": Schema(
"""
response = #6.258([* uint])
"""
),
"mutable_read_test_write": Schema(
"""
response = {
"success": bool,
"data": {* share_number: [* bstr]}
}
share_number = uint
"""
),
"mutable_list_shares": Schema(
"""
response = #6.258([* uint])
"""
),
}
def _decode_cbor(response, schema: Schema):
"""Given HTTP response, return decoded CBOR body."""
def got_content(data):
schema.validate_cbor(data)
return loads(data)
    if 200 <= response.code < 300:
content_type = get_content_type(response.headers)
if content_type == CBOR_MIME_TYPE:
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
return treq.content(response).addCallback(got_content)
else:
raise ClientException(-1, "Server didn't send CBOR")
else:
return treq.content(response).addCallback(
lambda data: fail(ClientException(response.code, response.phrase, data))
)
@define
class ImmutableCreateResult(object):
"""Result of creating a storage index for an immutable."""
already_have: set[int]
allocated: set[int]
class _TLSContextFactory(CertificateOptions):
"""
Create a context that validates the way Tahoe-LAFS wants to: based on a
pinned certificate hash, rather than a certificate authority.
Originally implemented as part of Foolscap. To comply with the license,
here's the original licensing terms:
Copyright (c) 2006-2008 Brian Warner
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
def __init__(self, expected_spki_hash: bytes):
self.expected_spki_hash = expected_spki_hash
CertificateOptions.__init__(self)
def getContext(self) -> SSL.Context:
def always_validate(conn, cert, errno, depth, preverify_ok):
# This function is called to validate the certificate received by
# the other end. OpenSSL calls it multiple times, for each errno
# for each certificate.
# We do not care about certificate authorities or revocation
# lists, we just want to know that the certificate has a valid
# signature and follow the chain back to one which is
# self-signed. We need to protect against forged signatures, but
# not the usual TLS concerns about invalid CAs or revoked
# certificates.
things_are_ok = (
_OPENSSL.X509_V_OK,
_OPENSSL.X509_V_ERR_CERT_NOT_YET_VALID,
_OPENSSL.X509_V_ERR_CERT_HAS_EXPIRED,
_OPENSSL.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
_OPENSSL.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN,
)
# TODO can we do this once instead of multiple times?
if errno in things_are_ok and timing_safe_compare(
get_spki_hash(cert.to_cryptography()), self.expected_spki_hash
):
return 1
# TODO: log the details of the error, because otherwise they get
# lost in the PyOpenSSL exception that will eventually be raised
# (possibly OpenSSL.SSL.Error: certificate verify failed)
return 0
ctx = CertificateOptions.getContext(self)
        # VERIFY_PEER means we ask the other end for their certificate.
ctx.set_verify(SSL.VERIFY_PEER, always_validate)
return ctx
@implementer(IPolicyForHTTPS)
@implementer(IOpenSSLClientConnectionCreator)
@define
class _StorageClientHTTPSPolicy:
"""
    An HTTPS policy that ensures the SPKI hash of the public key matches a known
hash, i.e. pinning-based validation.
"""
expected_spki_hash: bytes
# IPolicyForHTTPS
def creatorForNetloc(self, hostname, port):
return self
# IOpenSSLClientConnectionCreator
def clientConnectionForTLS(self, tlsProtocol):
return SSL.Connection(
_TLSContextFactory(self.expected_spki_hash).getContext(), None
)
@define
class StorageClient(object):
"""
Low-level HTTP client that talks to the HTTP storage server.
"""
    # The URL is an HTTPS URL ("https://..."). To construct from a NURL, use
# ``StorageClient.from_nurl()``.
_base_url: DecodedURL
_swissnum: bytes
_treq: Union[treq, StubTreq, HTTPClient]
@classmethod
def from_nurl(
cls, nurl: DecodedURL, reactor, persistent: bool = True
) -> StorageClient:
"""
Create a ``StorageClient`` for the given NURL.
``persistent`` indicates whether to use persistent HTTP connections.
"""
assert nurl.fragment == "v=1"
assert nurl.scheme == "pb"
swissnum = nurl.path[0].encode("ascii")
certificate_hash = nurl.user.encode("ascii")
treq_client = HTTPClient(
Agent(
reactor,
_StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash),
pool=HTTPConnectionPool(reactor, persistent=persistent),
)
)
https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
return cls(https_url, swissnum, treq_client)
def relative_url(self, path):
"""Get a URL relative to the base URL."""
return self._base_url.click(path)
def _get_headers(self, headers): # type: (Optional[Headers]) -> Headers
"""Return the basic headers to be used by default."""
if headers is None:
headers = Headers()
headers.addRawHeader(
"Authorization",
swissnum_auth_header(self._swissnum),
)
return headers
def request(
self,
method,
url,
lease_renew_secret=None,
lease_cancel_secret=None,
upload_secret=None,
write_enabler_secret=None,
headers=None,
message_to_serialize=None,
**kwargs
):
"""
Like ``treq.request()``, but with optional secrets that get translated
into corresponding HTTP headers.
If ``message_to_serialize`` is set, it will be serialized (by default
with CBOR) and set as the request body.
"""
headers = self._get_headers(headers)
# Add secrets:
for secret, value in [
(Secrets.LEASE_RENEW, lease_renew_secret),
(Secrets.LEASE_CANCEL, lease_cancel_secret),
(Secrets.UPLOAD, upload_secret),
(Secrets.WRITE_ENABLER, write_enabler_secret),
]:
if value is None:
continue
headers.addRawHeader(
"X-Tahoe-Authorization",
b"%s %s" % (secret.value.encode("ascii"), b64encode(value).strip()),
)
# Note we can accept CBOR:
headers.addRawHeader("Accept", CBOR_MIME_TYPE)
# If there's a request message, serialize it and set the Content-Type
# header:
if message_to_serialize is not None:
if "data" in kwargs:
raise TypeError(
"Can't use both `message_to_serialize` and `data` "
"as keyword arguments at the same time"
)
kwargs["data"] = dumps(message_to_serialize)
headers.addRawHeader("Content-Type", CBOR_MIME_TYPE)
return self._treq.request(method, url, headers=headers, **kwargs)
class StorageClientGeneral(object):
"""
High-level HTTP APIs that aren't immutable- or mutable-specific.
"""
def __init__(self, client): # type: (StorageClient) -> None
self._client = client
@inlineCallbacks
def get_version(self):
"""
Return the version metadata for the server.
"""
url = self._client.relative_url("/v1/version")
response = yield self._client.request("GET", url)
decoded_response = yield _decode_cbor(response, _SCHEMAS["get_version"])
returnValue(decoded_response)
@define
class UploadProgress(object):
"""
Progress of immutable upload, per the server.
"""
# True when upload has finished.
finished: bool
# Remaining ranges to upload.
required: RangeMap
@inlineCallbacks
def read_share_chunk(
client: StorageClient,
share_type: str,
storage_index: bytes,
share_number: int,
offset: int,
length: int,
) -> Deferred[bytes]:
"""
Download a chunk of data from a share.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed downloads
should be transparently retried and redownloaded by the implementation a
few times so that if a failure percolates up, the caller can assume the
failure isn't a short-term blip.
NOTE: the underlying HTTP protocol is somewhat more flexible than this API,
insofar as it doesn't always require a range. In practice a range is
always provided by the current callers.
"""
url = client.relative_url(
"/v1/{}/{}/{}".format(share_type, _encode_si(storage_index), share_number)
)
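    # Illustrative only: werkzeug's Range takes an end-exclusive (begin, end)
    # tuple, so e.g. offset=0, length=10 is expected to render as the header
    # ``Range: bytes=0-9``.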
response = yield client.request(
"GET",
url,
headers=Headers(
{"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
),
)
if response.code == http.PARTIAL_CONTENT:
body = yield response.content()
returnValue(body)
else:
raise ClientException(response.code)
@async_to_deferred
async def advise_corrupt_share(
client: StorageClient,
share_type: str,
storage_index: bytes,
share_number: int,
reason: str,
):
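    """
    Advise the server that a share has been corrupted, with a human-readable
    reason.
    """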
assert isinstance(reason, str)
url = client.relative_url(
"/v1/{}/{}/{}/corrupt".format(
share_type, _encode_si(storage_index), share_number
)
)
message = {"reason": reason}
response = await client.request("POST", url, message_to_serialize=message)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,
)
@define
class StorageClientImmutables(object):
"""
APIs for interacting with immutables.
"""
_client: StorageClient
@inlineCallbacks
def create(
self,
storage_index,
share_numbers,
allocated_size,
upload_secret,
lease_renew_secret,
lease_cancel_secret,
): # type: (bytes, set[int], int, bytes, bytes, bytes) -> Deferred[ImmutableCreateResult]
"""
Create a new storage index for an immutable.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 retry
internally on failure, to ensure the operation fully succeeded. If
        a sufficient number of failures occur, the result may fire with an
error, but there's no expectation that user code needs to have a
recovery codepath; it will most likely just report an error to the
user.
        The result fires when creating the storage index succeeds; if creating
        the storage index fails, the result will fire with an exception.
"""
url = self._client.relative_url("/v1/immutable/" + _encode_si(storage_index))
message = {"share-numbers": share_numbers, "allocated-size": allocated_size}
response = yield self._client.request(
"POST",
url,
lease_renew_secret=lease_renew_secret,
lease_cancel_secret=lease_cancel_secret,
upload_secret=upload_secret,
message_to_serialize=message,
)
decoded_response = yield _decode_cbor(response, _SCHEMAS["allocate_buckets"])
returnValue(
ImmutableCreateResult(
already_have=decoded_response["already-have"],
allocated=decoded_response["allocated"],
)
)
@inlineCallbacks
def abort_upload(
self, storage_index: bytes, share_number: int, upload_secret: bytes
) -> Deferred[None]:
"""Abort the upload."""
url = self._client.relative_url(
"/v1/immutable/{}/{}/abort".format(_encode_si(storage_index), share_number)
)
response = yield self._client.request(
"PUT",
url,
upload_secret=upload_secret,
)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,
)
@inlineCallbacks
def write_share_chunk(
self, storage_index, share_number, upload_secret, offset, data
): # type: (bytes, int, bytes, int, bytes) -> Deferred[UploadProgress]
"""
Upload a chunk of data for a specific share.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 The
implementation should retry failed uploads transparently a number of
times, so that if a failure percolates up, the caller can assume the
failure isn't a short-term blip.
        The result fires when the upload succeeds, with a boolean indicating
whether the _complete_ share (i.e. all chunks, not just this one) has
been uploaded.
"""
url = self._client.relative_url(
"/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
)
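        # Illustrative only: werkzeug's ContentRange takes an end-exclusive
        # stop and (here) an unknown total length, so e.g. offset=0 with 10
        # bytes of data is expected to render as the header
        # ``Content-Range: bytes 0-9/*``.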
response = yield self._client.request(
"PATCH",
url,
upload_secret=upload_secret,
data=data,
headers=Headers(
{
"content-range": [
ContentRange("bytes", offset, offset + len(data)).to_header()
]
}
),
)
if response.code == http.OK:
# Upload is still unfinished.
finished = False
elif response.code == http.CREATED:
# Upload is done!
finished = True
else:
raise ClientException(
response.code,
)
body = yield _decode_cbor(response, _SCHEMAS["immutable_write_share_chunk"])
remaining = RangeMap()
for chunk in body["required"]:
remaining.set(True, chunk["begin"], chunk["end"])
returnValue(UploadProgress(finished=finished, required=remaining))
def read_share_chunk(
self, storage_index, share_number, offset, length
): # type: (bytes, int, int, int) -> Deferred[bytes]
"""
Download a chunk of data from a share.
"""
return read_share_chunk(
self._client, "immutable", storage_index, share_number, offset, length
)
@inlineCallbacks
def list_shares(self, storage_index): # type: (bytes,) -> Deferred[set[int]]
"""
Return the set of shares for a given storage index.
"""
url = self._client.relative_url(
"/v1/immutable/{}/shares".format(_encode_si(storage_index))
)
response = yield self._client.request(
"GET",
url,
)
if response.code == http.OK:
body = yield _decode_cbor(response, _SCHEMAS["list_shares"])
returnValue(set(body))
else:
raise ClientException(response.code)
@inlineCallbacks
def add_or_renew_lease(
self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
):
"""
Add or renew a lease.
If the renewal secret matches an existing lease, it is renewed.
Otherwise a new lease is added.
"""
url = self._client.relative_url(
"/v1/lease/{}".format(_encode_si(storage_index))
)
response = yield self._client.request(
"PUT",
url,
lease_renew_secret=renew_secret,
lease_cancel_secret=cancel_secret,
)
if response.code == http.NO_CONTENT:
return
else:
raise ClientException(response.code)
def advise_corrupt_share(
self,
storage_index: bytes,
share_number: int,
reason: str,
):
"""Indicate a share has been corrupted, with a human-readable message."""
return advise_corrupt_share(
self._client, "immutable", storage_index, share_number, reason
)
@frozen
class WriteVector:
"""Data to write to a chunk."""
offset: int
data: bytes
@frozen
class TestVector:
"""Checks to make on a chunk before writing to it."""
offset: int
size: int
specimen: bytes
@frozen
class ReadVector:
"""
Reads to do on chunks, as part of a read/test/write operation.
"""
offset: int
size: int
@frozen
class TestWriteVectors:
"""Test and write vectors for a specific share."""
test_vectors: Sequence[TestVector]
write_vectors: Sequence[WriteVector]
new_length: Optional[int] = None
def asdict(self) -> dict:
"""Return dictionary suitable for sending over CBOR."""
d = asdict(self)
d["test"] = d.pop("test_vectors")
d["write"] = d.pop("write_vectors")
d["new-length"] = d.pop("new_length")
return d
@frozen
class ReadTestWriteResult:
"""Result of sending read-test-write vectors."""
success: bool
# Map share numbers to reads corresponding to the request's list of
# ReadVectors:
reads: Mapping[int, Sequence[bytes]]
@frozen
class StorageClientMutables:
"""
APIs for interacting with mutables.
"""
_client: StorageClient
@async_to_deferred
async def read_test_write_chunks(
self,
storage_index: bytes,
write_enabler_secret: bytes,
lease_renew_secret: bytes,
lease_cancel_secret: bytes,
testwrite_vectors: dict[int, TestWriteVectors],
read_vector: list[ReadVector],
) -> ReadTestWriteResult:
"""
Read, test, and possibly write chunks to a particular mutable storage
index.
Reads are done before writes.
        Given a mapping from share numbers to test/write vectors, the tests
        are run and, if they all succeed, the writes are applied.
"""
# TODO unit test all the things
url = self._client.relative_url(
"/v1/mutable/{}/read-test-write".format(_encode_si(storage_index))
)
message = {
"test-write-vectors": {
share_number: twv.asdict()
for (share_number, twv) in testwrite_vectors.items()
},
"read-vector": [asdict(r) for r in read_vector],
}
response = await self._client.request(
"POST",
url,
write_enabler_secret=write_enabler_secret,
lease_renew_secret=lease_renew_secret,
lease_cancel_secret=lease_cancel_secret,
message_to_serialize=message,
)
if response.code == http.OK:
result = await _decode_cbor(response, _SCHEMAS["mutable_read_test_write"])
return ReadTestWriteResult(success=result["success"], reads=result["data"])
else:
raise ClientException(response.code, (await response.content()))
def read_share_chunk(
self,
storage_index: bytes,
share_number: int,
offset: int,
length: int,
) -> Deferred[bytes]:
"""
Download a chunk of data from a share.
"""
# TODO unit test all the things
return read_share_chunk(
self._client, "mutable", storage_index, share_number, offset, length
)
@async_to_deferred
async def list_shares(self, storage_index: bytes) -> set[int]:
"""
List the share numbers for a given storage index.
"""
# TODO unit test all the things
url = self._client.relative_url(
"/v1/mutable/{}/shares".format(_encode_si(storage_index))
)
response = await self._client.request("GET", url)
if response.code == http.OK:
return await _decode_cbor(response, _SCHEMAS["mutable_list_shares"])
else:
raise ClientException(response.code)
def advise_corrupt_share(
self,
storage_index: bytes,
share_number: int,
reason: str,
):
"""Indicate a share has been corrupted, with a human-readable message."""
return advise_corrupt_share(
self._client, "mutable", storage_index, share_number, reason
)