Merge pull request #1194 from tahoe-lafs/3891-mutable-http-protocol-part-2

Mutable storage HTTP protocol, part 2

Fixes ticket:3891
This commit is contained in:
Itamar Turner-Trauring 2022-05-18 17:06:30 -04:00 committed by GitHub
commit 8da7f95579
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 137 additions and 47 deletions

View File

@ -350,8 +350,10 @@ Because of the simple types used throughout
and the equivalence described in `RFC 7049`_
these examples should be representative regardless of which of these two encodings is chosen.
The one exception is sets.
For CBOR messages, any sequence that is semantically a set (i.e. no repeated values allowed, order doesn't matter, and elements are hashable in Python) should be sent as a set.
Tag 6.258 is used to indicate sets in CBOR; see `the CBOR registry <https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml>`_ for more details.
Sets will be represented as JSON lists in examples because JSON doesn't support sets.
HTTP Design
~~~~~~~~~~~
@ -738,8 +740,8 @@ Reading
``GET /v1/mutable/:storage_index/shares``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Retrieve a list indicating all shares available for the indicated storage index.
For example::
Retrieve a set indicating all shares available for the indicated storage index.
For example (this is shown as list, since it will be list for JSON, but will be set for CBOR)::
[1, 5]

0
newsfragments/3891.minor Normal file
View File

View File

@ -114,7 +114,7 @@ install_requires = [
"attrs >= 18.2.0",
# WebSocket library for twisted and asyncio
"autobahn < 22.4.1", # remove this when https://github.com/crossbario/autobahn-python/issues/1566 is fixed
"autobahn < 22.4.1", # remove this when 22.4.3 is released
# Support for Python 3 transition
"future >= 0.18.2",

View File

@ -106,6 +106,11 @@ _SCHEMAS = {
share_number = uint
"""
),
"mutable_list_shares": Schema(
"""
response = #6.258([* uint])
"""
),
}
@ -375,16 +380,14 @@ def read_share_chunk(
"""
Download a chunk of data from a share.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed
downloads should be transparently retried and redownloaded by the
implementation a few times so that if a failure percolates up, the
caller can assume the failure isn't a short-term blip.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed downloads
should be transparently retried and redownloaded by the implementation a
few times so that if a failure percolates up, the caller can assume the
failure isn't a short-term blip.
NOTE: the underlying HTTP protocol is much more flexible than this API,
so a future refactor may expand this in order to simplify the calling
code and perhaps download data more efficiently. But then again maybe
the HTTP protocol will be simplified, see
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
NOTE: the underlying HTTP protocol is somewhat more flexible than this API,
insofar as it doesn't always require a range. In practice a range is
always provided by the current callers.
"""
url = client.relative_url(
"/v1/{}/{}/{}".format(share_type, _encode_si(storage_index), share_number)
@ -712,7 +715,7 @@ class StorageClientMutables:
share_number: int,
offset: int,
length: int,
) -> bytes:
) -> Deferred[bytes]:
"""
Download a chunk of data from a share.
"""
@ -720,3 +723,18 @@ class StorageClientMutables:
return read_share_chunk(
self._client, "mutable", storage_index, share_number, offset, length
)
@async_to_deferred
async def list_shares(self, storage_index: bytes) -> set[int]:
"""
List the share numbers for a given storage index.
"""
# TODO unit test all the things
url = self._client.relative_url(
"/v1/mutable/{}/shares".format(_encode_si(storage_index))
)
response = await self._client.request("GET", url)
if response.code == http.OK:
return await _decode_cbor(response, _SCHEMAS["mutable_list_shares"])
else:
raise ClientException(response.code)

View File

@ -46,6 +46,7 @@ from .common import si_a2b
from .immutable import BucketWriter, ConflictingWriteError
from ..util.hashutil import timing_safe_compare
from ..util.base32 import rfc3548_alphabet
from allmydata.interfaces import BadWriteEnablerError
class ClientSecretsException(Exception):
@ -587,19 +588,25 @@ class HTTPServer(object):
authorization[Secrets.LEASE_RENEW],
authorization[Secrets.LEASE_CANCEL],
)
success, read_data = self._storage_server.slot_testv_and_readv_and_writev(
storage_index,
secrets,
{
k: (
[(d["offset"], d["size"], b"eq", d["specimen"]) for d in v["test"]],
[(d["offset"], d["data"]) for d in v["write"]],
v["new-length"],
)
for (k, v) in rtw_request["test-write-vectors"].items()
},
[(d["offset"], d["size"]) for d in rtw_request["read-vector"]],
)
try:
success, read_data = self._storage_server.slot_testv_and_readv_and_writev(
storage_index,
secrets,
{
k: (
[
(d["offset"], d["size"], b"eq", d["specimen"])
for d in v["test"]
],
[(d["offset"], d["data"]) for d in v["write"]],
v["new-length"],
)
for (k, v) in rtw_request["test-write-vectors"].items()
},
[(d["offset"], d["size"]) for d in rtw_request["read-vector"]],
)
except BadWriteEnablerError:
raise _HTTPError(http.UNAUTHORIZED)
return self._send_encoded(request, {"success": success, "data": read_data})
@_authorized_route(
@ -645,6 +652,17 @@ class HTTPServer(object):
)
return data
@_authorized_route(
_app,
set(),
"/v1/mutable/<storage_index:storage_index>/shares",
methods=["GET"],
)
def enumerate_mutable_shares(self, request, authorization, storage_index):
"""List mutable shares for a storage index."""
shares = self._storage_server.enumerate_mutable_shares(storage_index)
return self._send_encoded(request, shares)
@implementer(IStreamServerEndpoint)
@attr.s

View File

@ -1,18 +1,9 @@
"""
Ported to Python 3.
"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import bytes_to_native_str, PY2
if PY2:
# Omit open() to get native behavior where open("w") always accepts native
# strings. Omit bytes so we don't leak future's custom bytes.
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401
else:
from typing import Dict, Tuple
from __future__ import annotations
from future.utils import bytes_to_native_str
from typing import Dict, Tuple
import os, re
@ -699,6 +690,21 @@ class StorageServer(service.MultiService):
self)
return share
def enumerate_mutable_shares(self, storage_index: bytes) -> set[int]:
"""Return all share numbers for the given mutable."""
si_dir = storage_index_to_dir(storage_index)
# shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_dir)
if not os.path.isdir(bucketdir):
return set()
result = set()
for sharenum_s in os.listdir(bucketdir):
try:
result.add(int(sharenum_s))
except ValueError:
continue
return result
def slot_readv(self, storage_index, shares, readv):
start = self._clock.seconds()
self.count("readv")

View File

@ -50,6 +50,7 @@ from zope.interface import (
Interface,
implementer,
)
from twisted.web import http
from twisted.internet import defer
from twisted.application import service
from twisted.plugin import (
@ -78,7 +79,7 @@ from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException, StorageClientMutables,
ReadVector, TestWriteVectors, WriteVector, TestVector
ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException
)
@ -1196,9 +1197,10 @@ class _HTTPStorageServer(object):
mutable_client = StorageClientMutables(self._http_client)
pending_reads = {}
reads = {}
# TODO if shares list is empty, that means list all shares, so we need
# If shares list is empty, that means list all shares, so we need
# to do a query to get that.
assert shares # TODO replace with call to list shares if and only if it's empty
if not shares:
shares = yield mutable_client.list_shares(storage_index)
# Start all the queries in parallel:
for share_number in shares:
@ -1246,8 +1248,13 @@ class _HTTPStorageServer(object):
ReadVector(offset=offset, size=size)
for (offset, size) in r_vector
]
client_result = yield mutable_client.read_test_write_chunks(
storage_index, we_secret, lr_secret, lc_secret, client_tw_vectors,
client_read_vectors,
)
try:
client_result = yield mutable_client.read_test_write_chunks(
storage_index, we_secret, lr_secret, lc_secret, client_tw_vectors,
client_read_vectors,
)
except ClientException as e:
if e.code == http.UNAUTHORIZED:
raise RemoteException("Unauthorized write, possibly you passed the wrong write enabler?")
raise
return (client_result.success, client_result.reads)

View File

@ -854,6 +854,22 @@ class IStorageServerMutableAPIsTestsMixin(object):
{0: [b"abcdefg"], 1: [b"0123456"], 2: [b"9876543"]},
)
@inlineCallbacks
def test_slot_readv_unknown_storage_index(self):
"""
With unknown storage index, ``IStorageServer.slot_readv()`` TODO.
"""
storage_index = new_storage_index()
reads = yield self.storage_client.slot_readv(
storage_index,
shares=[],
readv=[(0, 7)],
)
self.assertEqual(
reads,
{},
)
@inlineCallbacks
def create_slot(self):
"""Create a slot with sharenum 0."""
@ -1150,9 +1166,7 @@ class HTTPMutableAPIsTests(
# TODO will be implemented in later tickets
SKIP_TESTS = {
"test_STARAW_write_enabler_must_match",
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
"test_slot_readv_no_shares",
}

View File

@ -1315,6 +1315,31 @@ class MutableServer(unittest.TestCase):
self.failUnless(isinstance(readv_data, dict))
self.failUnlessEqual(len(readv_data), 0)
def test_enumerate_mutable_shares(self):
"""
``StorageServer.enumerate_mutable_shares()`` returns a set of share
numbers for the given storage index, or an empty set if it does not
exist at all.
"""
ss = self.create("test_enumerate_mutable_shares")
# Initially, nothing exists:
empty = ss.enumerate_mutable_shares(b"si1")
self.allocate(ss, b"si1", b"we1", b"le1", [0, 1, 4, 2], 12)
shares0_1_2_4 = ss.enumerate_mutable_shares(b"si1")
# Remove share 2, by setting size to 0:
secrets = (self.write_enabler(b"we1"),
self.renew_secret(b"le1"),
self.cancel_secret(b"le1"))
ss.slot_testv_and_readv_and_writev(b"si1", secrets, {2: ([], [], 0)}, [])
shares0_1_4 = ss.enumerate_mutable_shares(b"si1")
self.assertEqual(
(empty, shares0_1_2_4, shares0_1_4),
(set(), {0, 1, 2, 4}, {0, 1, 4})
)
def test_bad_magic(self):
ss = self.create("test_bad_magic")
self.allocate(ss, b"si1", b"we1", next(self._lease_secret), set([0]), 10)