Merge pull request #1198 from tahoe-lafs/3893-mutable-http-protocol-part-3

Mutable http protocol, part 3

Fixes ticket:3893
This commit is contained in:
Itamar Turner-Trauring 2022-05-20 11:47:07 -04:00 committed by GitHub
commit f0635d592a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 146 additions and 84 deletions

0
newsfragments/3893.minor Normal file
View File

View File

@ -355,6 +355,31 @@ class StorageClientGeneral(object):
decoded_response = yield _decode_cbor(response, _SCHEMAS["get_version"]) decoded_response = yield _decode_cbor(response, _SCHEMAS["get_version"])
returnValue(decoded_response) returnValue(decoded_response)
@inlineCallbacks
def add_or_renew_lease(
self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
) -> Deferred[None]:
"""
Add or renew a lease.
If the renewal secret matches an existing lease, it is renewed.
Otherwise a new lease is added.
"""
url = self._client.relative_url(
"/v1/lease/{}".format(_encode_si(storage_index))
)
response = yield self._client.request(
"PUT",
url,
lease_renew_secret=renew_secret,
lease_cancel_secret=cancel_secret,
)
if response.code == http.NO_CONTENT:
return
else:
raise ClientException(response.code)
@define @define
class UploadProgress(object): class UploadProgress(object):
@ -406,6 +431,30 @@ def read_share_chunk(
raise ClientException(response.code) raise ClientException(response.code)
@async_to_deferred
async def advise_corrupt_share(
client: StorageClient,
share_type: str,
storage_index: bytes,
share_number: int,
reason: str,
):
assert isinstance(reason, str)
url = client.relative_url(
"/v1/{}/{}/{}/corrupt".format(
share_type, _encode_si(storage_index), share_number
)
)
message = {"reason": reason}
response = await client.request("POST", url, message_to_serialize=message)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,
)
@define @define
class StorageClientImmutables(object): class StorageClientImmutables(object):
""" """
@ -554,32 +603,6 @@ class StorageClientImmutables(object):
else: else:
raise ClientException(response.code) raise ClientException(response.code)
@inlineCallbacks
def add_or_renew_lease(
self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
):
"""
Add or renew a lease.
If the renewal secret matches an existing lease, it is renewed.
Otherwise a new lease is added.
"""
url = self._client.relative_url(
"/v1/lease/{}".format(_encode_si(storage_index))
)
response = yield self._client.request(
"PUT",
url,
lease_renew_secret=renew_secret,
lease_cancel_secret=cancel_secret,
)
if response.code == http.NO_CONTENT:
return
else:
raise ClientException(response.code)
@inlineCallbacks
def advise_corrupt_share( def advise_corrupt_share(
self, self,
storage_index: bytes, storage_index: bytes,
@ -587,20 +610,9 @@ class StorageClientImmutables(object):
reason: str, reason: str,
): ):
"""Indicate a share has been corrupted, with a human-readable message.""" """Indicate a share has been corrupted, with a human-readable message."""
assert isinstance(reason, str) return advise_corrupt_share(
url = self._client.relative_url( self._client, "immutable", storage_index, share_number, reason
"/v1/immutable/{}/{}/corrupt".format(
_encode_si(storage_index), share_number
)
) )
message = {"reason": reason}
response = yield self._client.request("POST", url, message_to_serialize=message)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,
)
@frozen @frozen
@ -738,3 +750,14 @@ class StorageClientMutables:
return await _decode_cbor(response, _SCHEMAS["mutable_list_shares"]) return await _decode_cbor(response, _SCHEMAS["mutable_list_shares"])
else: else:
raise ClientException(response.code) raise ClientException(response.code)
def advise_corrupt_share(
self,
storage_index: bytes,
share_number: int,
reason: str,
):
"""Indicate a share has been corrupted, with a human-readable message."""
return advise_corrupt_share(
self._client, "mutable", storage_index, share_number, reason
)

View File

@ -538,8 +538,8 @@ class HTTPServer(object):
methods=["PUT"], methods=["PUT"],
) )
def add_or_renew_lease(self, request, authorization, storage_index): def add_or_renew_lease(self, request, authorization, storage_index):
"""Update the lease for an immutable share.""" """Update the lease for an immutable or mutable share."""
if not self._storage_server.get_buckets(storage_index): if not list(self._storage_server.get_shares(storage_index)):
raise _HTTPError(http.NOT_FOUND) raise _HTTPError(http.NOT_FOUND)
# Checking of the renewal secret is done by the backend. # Checking of the renewal secret is done by the backend.
@ -663,6 +663,28 @@ class HTTPServer(object):
shares = self._storage_server.enumerate_mutable_shares(storage_index) shares = self._storage_server.enumerate_mutable_shares(storage_index)
return self._send_encoded(request, shares) return self._send_encoded(request, shares)
@_authorized_route(
_app,
set(),
"/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"],
)
def advise_corrupt_share_mutable(
self, request, authorization, storage_index, share_number
):
"""Indicate that given share is corrupt, with a text reason."""
# TODO unit test all the paths
if share_number not in {
shnum for (shnum, _) in self._storage_server.get_shares(storage_index)
}:
raise _HTTPError(http.NOT_FOUND)
info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
self._storage_server.advise_corrupt_share(
b"mutable", storage_index, share_number, info["reason"].encode("utf-8")
)
return b""
@implementer(IStreamServerEndpoint) @implementer(IStreamServerEndpoint)
@attr.s @attr.s

View File

@ -3,7 +3,7 @@ Ported to Python 3.
""" """
from __future__ import annotations from __future__ import annotations
from future.utils import bytes_to_native_str from future.utils import bytes_to_native_str
from typing import Dict, Tuple from typing import Dict, Tuple, Iterable
import os, re import os, re
@ -321,7 +321,7 @@ class StorageServer(service.MultiService):
# they asked about: this will save them a lot of work. Add or update # they asked about: this will save them a lot of work. Add or update
# leases for all of them: if they want us to hold shares for this # leases for all of them: if they want us to hold shares for this
# file, they'll want us to hold leases for this file. # file, they'll want us to hold leases for this file.
for (shnum, fn) in self._get_bucket_shares(storage_index): for (shnum, fn) in self.get_shares(storage_index):
alreadygot[shnum] = ShareFile(fn) alreadygot[shnum] = ShareFile(fn)
if renew_leases: if renew_leases:
self._add_or_renew_leases(alreadygot.values(), lease_info) self._add_or_renew_leases(alreadygot.values(), lease_info)
@ -363,7 +363,7 @@ class StorageServer(service.MultiService):
return set(alreadygot), bucketwriters return set(alreadygot), bucketwriters
def _iter_share_files(self, storage_index): def _iter_share_files(self, storage_index):
for shnum, filename in self._get_bucket_shares(storage_index): for shnum, filename in self.get_shares(storage_index):
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
header = f.read(32) header = f.read(32)
if MutableShareFile.is_valid_header(header): if MutableShareFile.is_valid_header(header):
@ -416,10 +416,12 @@ class StorageServer(service.MultiService):
""" """
self._call_on_bucket_writer_close.append(handler) self._call_on_bucket_writer_close.append(handler)
def _get_bucket_shares(self, storage_index): def get_shares(self, storage_index) -> Iterable[tuple[int, str]]:
"""Return a list of (shnum, pathname) tuples for files that hold """
Return an iterable of (shnum, pathname) tuples for files that hold
shares for this storage_index. In each tuple, 'shnum' will always be shares for this storage_index. In each tuple, 'shnum' will always be
the integer form of the last component of 'pathname'.""" the integer form of the last component of 'pathname'.
"""
storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index)) storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
try: try:
for f in os.listdir(storagedir): for f in os.listdir(storagedir):
@ -431,12 +433,15 @@ class StorageServer(service.MultiService):
pass pass
def get_buckets(self, storage_index): def get_buckets(self, storage_index):
"""
Get ``BucketReaders`` for an immutable.
"""
start = self._clock.seconds() start = self._clock.seconds()
self.count("get") self.count("get")
si_s = si_b2a(storage_index) si_s = si_b2a(storage_index)
log.msg("storage: get_buckets %r" % si_s) log.msg("storage: get_buckets %r" % si_s)
bucketreaders = {} # k: sharenum, v: BucketReader bucketreaders = {} # k: sharenum, v: BucketReader
for shnum, filename in self._get_bucket_shares(storage_index): for shnum, filename in self.get_shares(storage_index):
bucketreaders[shnum] = BucketReader(self, filename, bucketreaders[shnum] = BucketReader(self, filename,
storage_index, shnum) storage_index, shnum)
self.add_latency("get", self._clock.seconds() - start) self.add_latency("get", self._clock.seconds() - start)
@ -453,7 +458,7 @@ class StorageServer(service.MultiService):
# since all shares get the same lease data, we just grab the leases # since all shares get the same lease data, we just grab the leases
# from the first share # from the first share
try: try:
shnum, filename = next(self._get_bucket_shares(storage_index)) shnum, filename = next(self.get_shares(storage_index))
sf = ShareFile(filename) sf = ShareFile(filename)
return sf.get_leases() return sf.get_leases()
except StopIteration: except StopIteration:
@ -467,7 +472,7 @@ class StorageServer(service.MultiService):
:return: An iterable of the leases attached to this slot. :return: An iterable of the leases attached to this slot.
""" """
for _, share_filename in self._get_bucket_shares(storage_index): for _, share_filename in self.get_shares(storage_index):
share = MutableShareFile(share_filename) share = MutableShareFile(share_filename)
return share.get_leases() return share.get_leases()
return [] return []
@ -742,7 +747,7 @@ class StorageServer(service.MultiService):
:return bool: ``True`` if a share with the given number exists at the :return bool: ``True`` if a share with the given number exists at the
given storage index, ``False`` otherwise. given storage index, ``False`` otherwise.
""" """
for existing_sharenum, ignored in self._get_bucket_shares(storage_index): for existing_sharenum, ignored in self.get_shares(storage_index):
if existing_sharenum == shnum: if existing_sharenum == shnum:
return True return True
return False return False

View File

@ -5,10 +5,6 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
Ported to Python 3. Ported to Python 3.
""" """
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
# roadmap: # roadmap:
# #
@ -34,14 +30,10 @@ from __future__ import unicode_literals
# #
# 6: implement other sorts of IStorageClient classes: S3, etc # 6: implement other sorts of IStorageClient classes: S3, etc
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from six import ensure_text from six import ensure_text
from typing import Union
import re, time, hashlib import re, time, hashlib
from os import urandom from os import urandom
# On Python 2 this will be the backport.
from configparser import NoSectionError from configparser import NoSectionError
import attr import attr
@ -76,6 +68,7 @@ from allmydata.util.observer import ObserverList
from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import permute_server_hash from allmydata.util.hashutil import permute_server_hash
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
from allmydata.util.deferredutil import async_to_deferred
from allmydata.storage.http_client import ( from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral, StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException, StorageClientMutables, ClientException as HTTPClientException, StorageClientMutables,
@ -1166,16 +1159,23 @@ class _HTTPStorageServer(object):
for share_num in share_numbers for share_num in share_numbers
}) })
def add_lease( @async_to_deferred
async def add_lease(
self, self,
storage_index, storage_index,
renew_secret, renew_secret,
cancel_secret cancel_secret
): ):
immutable_client = StorageClientImmutables(self._http_client) client = StorageClientGeneral(self._http_client)
return immutable_client.add_or_renew_lease( try:
storage_index, renew_secret, cancel_secret await client.add_or_renew_lease(
) storage_index, renew_secret, cancel_secret
)
except ClientException as e:
if e.code == http.NOT_FOUND:
# Silently do nothing, as is the case for the Foolscap client
return
raise
def advise_corrupt_share( def advise_corrupt_share(
self, self,
@ -1185,12 +1185,14 @@ class _HTTPStorageServer(object):
reason: bytes reason: bytes
): ):
if share_type == b"immutable": if share_type == b"immutable":
imm_client = StorageClientImmutables(self._http_client) client : Union[StorageClientImmutables, StorageClientMutables] = StorageClientImmutables(self._http_client)
return imm_client.advise_corrupt_share( elif share_type == b"mutable":
storage_index, shnum, str(reason, "utf-8", errors="backslashreplace") client = StorageClientMutables(self._http_client)
)
else: else:
raise NotImplementedError() # future tickets raise ValueError("Unknown share type")
return client.advise_corrupt_share(
storage_index, shnum, str(reason, "utf-8", errors="backslashreplace")
)
@defer.inlineCallbacks @defer.inlineCallbacks
def slot_readv(self, storage_index, shares, readv): def slot_readv(self, storage_index, shares, readv):

View File

@ -459,6 +459,21 @@ class IStorageServerImmutableAPIsTestsMixin(object):
lease.get_expiration_time() - self.fake_time() > (31 * 24 * 60 * 60 - 10) lease.get_expiration_time() - self.fake_time() > (31 * 24 * 60 * 60 - 10)
) )
@inlineCallbacks
def test_add_lease_non_existent(self):
"""
If the storage index doesn't exist, adding the lease silently does nothing.
"""
storage_index = new_storage_index()
self.assertEqual(list(self.server.get_leases(storage_index)), [])
renew_secret = new_secret()
cancel_secret = new_secret()
# Add a lease:
yield self.storage_client.add_lease(storage_index, renew_secret, cancel_secret)
self.assertEqual(list(self.server.get_leases(storage_index)), [])
@inlineCallbacks @inlineCallbacks
def test_add_lease_renewal(self): def test_add_lease_renewal(self):
""" """
@ -857,7 +872,8 @@ class IStorageServerMutableAPIsTestsMixin(object):
@inlineCallbacks @inlineCallbacks
def test_slot_readv_unknown_storage_index(self): def test_slot_readv_unknown_storage_index(self):
""" """
With unknown storage index, ``IStorageServer.slot_readv()`` TODO. With unknown storage index, ``IStorageServer.slot_readv()`` returns
empty dict.
""" """
storage_index = new_storage_index() storage_index = new_storage_index()
reads = yield self.storage_client.slot_readv( reads = yield self.storage_client.slot_readv(
@ -1163,10 +1179,3 @@ class HTTPMutableAPIsTests(
_HTTPMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase _HTTPMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
): ):
"""HTTP-specific tests for mutable ``IStorageServer`` APIs.""" """HTTP-specific tests for mutable ``IStorageServer`` APIs."""
# TODO will be implemented in later tickets
SKIP_TESTS = {
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
}

View File

@ -717,7 +717,7 @@ class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
ss = self.g.servers_by_number[0] ss = self.g.servers_by_number[0]
# we want to delete the share corresponding to the server # we want to delete the share corresponding to the server
# we're making not-respond # we're making not-respond
share = next(ss._get_bucket_shares(self.c0_filenode.get_storage_index()))[0] share = next(ss.get_shares(self.c0_filenode.get_storage_index()))[0]
self.delete_shares_numbered(self.uri, [share]) self.delete_shares_numbered(self.uri, [share])
return self.c0_filenode.check_and_repair(Monitor()) return self.c0_filenode.check_and_repair(Monitor())
d.addCallback(_then) d.addCallback(_then)

View File

@ -766,7 +766,7 @@ class Server(unittest.TestCase):
writer.close() writer.close()
# It should have a lease granted at the current time. # It should have a lease granted at the current time.
shares = dict(ss._get_bucket_shares(storage_index)) shares = dict(ss.get_shares(storage_index))
self.assertEqual( self.assertEqual(
[first_lease], [first_lease],
list( list(
@ -789,7 +789,7 @@ class Server(unittest.TestCase):
writer.close() writer.close()
# The first share's lease expiration time is unchanged. # The first share's lease expiration time is unchanged.
shares = dict(ss._get_bucket_shares(storage_index)) shares = dict(ss.get_shares(storage_index))
self.assertEqual( self.assertEqual(
[first_lease], [first_lease],
list( list(

View File

@ -448,6 +448,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
super(ImmutableHTTPAPITests, self).setUp() super(ImmutableHTTPAPITests, self).setUp()
self.http = self.useFixture(HttpTestFixture()) self.http = self.useFixture(HttpTestFixture())
self.imm_client = StorageClientImmutables(self.http.client) self.imm_client = StorageClientImmutables(self.http.client)
self.general_client = StorageClientGeneral(self.http.client)
def create_upload(self, share_numbers, length): def create_upload(self, share_numbers, length):
""" """
@ -1081,7 +1082,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
# We renew the lease: # We renew the lease:
result_of( result_of(
self.imm_client.add_or_renew_lease( self.general_client.add_or_renew_lease(
storage_index, lease_secret, lease_secret storage_index, lease_secret, lease_secret
) )
) )
@ -1092,7 +1093,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
# We create a new lease: # We create a new lease:
lease_secret2 = urandom(32) lease_secret2 = urandom(32)
result_of( result_of(
self.imm_client.add_or_renew_lease( self.general_client.add_or_renew_lease(
storage_index, lease_secret2, lease_secret2 storage_index, lease_secret2, lease_secret2
) )
) )
@ -1108,7 +1109,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
storage_index = urandom(16) storage_index = urandom(16)
secret = b"A" * 32 secret = b"A" * 32
with assert_fails_with_http_code(self, http.NOT_FOUND): with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(self.imm_client.add_or_renew_lease(storage_index, secret, secret)) result_of(self.general_client.add_or_renew_lease(storage_index, secret, secret))
def test_advise_corrupt_share(self): def test_advise_corrupt_share(self):
""" """