Merge branch '3890-mutable-http-api' into 3880-http-storage-logging

Itamar Turner-Trauring 2022-04-29 10:18:55 -04:00
commit 48831792cf
11 changed files with 433 additions and 112 deletions

View File

@ -48,8 +48,6 @@ workflows:
{}
- "pyinstaller":
{}
- "deprecations":
{}
- "c-locale":
{}
# Any locale other than C or UTF-8.
@ -297,20 +295,6 @@ jobs:
# aka "Latin 1"
LANG: "en_US.ISO-8859-1"
deprecations:
<<: *DEBIAN
environment:
<<: *UTF_8_ENVIRONMENT
# Select the deprecations tox environments.
TAHOE_LAFS_TOX_ENVIRONMENT: "deprecations,upcoming-deprecations"
# Put the logs somewhere we can report them.
TAHOE_LAFS_WARNINGS_LOG: "/tmp/artifacts/deprecation-warnings.log"
# The deprecations tox environments don't do coverage measurement.
UPLOAD_COVERAGE: ""
integration:
<<: *DEBIAN

View File

@ -743,11 +743,15 @@ For example::
[1, 5]
``GET /v1/mutable/:storage_index?share=:s0&share=:sN&offset=:o1&size=:z0&offset=:oN&size=:zN``
``GET /v1/mutable/:storage_index/:share_number``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Read data from the indicated mutable shares.
Just like ``GET /v1/mutable/:storage_index``.
Read data from the indicated mutable shares, just like ``GET /v1/immutable/:storage_index``.
The ``Range`` header may be used to request exactly one ``bytes`` range, in which case the response code will be 206 (partial content).
Interpretation and response behavior is as specified in RFC 7233 § 4.1.
Multiple ranges in a single request are *not* supported; open-ended ranges are also not supported.
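As a hedged illustration (storage index, share number, offset, and size invented here), such a ranged read can be driven from Python with the same werkzeug ``Range`` helper the HTTP client uses:

from werkzeug.datastructures import Range

# Request 32 bytes starting at offset 64 from share 3 of a mutable.
header_value = Range("bytes", [(64, 64 + 32)]).to_header()  # "bytes=64-95"
# A GET of /v1/mutable/:storage_index/3 carrying "Range: bytes=64-95" should
# return 206 (partial content) with exactly those 32 bytes as the body.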
``POST /v1/mutable/:storage_index/:share_number/corrupt``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

newsfragments/3890.minor (new, empty file)
View File

View File

@ -4,11 +4,10 @@ HTTP client that talks to the HTTP storage server.
from __future__ import annotations
from typing import Union, Set, Optional
from typing import Union, Optional, Sequence, Mapping
from base64 import b64encode
import attr
from attrs import define, asdict, frozen
# TODO Make sure to import Python version?
from cbor2 import loads, dumps
@ -39,6 +38,7 @@ from .http_common import (
)
from .common import si_b2a
from ..util.hashutil import timing_safe_compare
from ..util.deferredutil import async_to_deferred
_OPENSSL = Binding().lib
@ -64,7 +64,7 @@ class ClientException(Exception):
_SCHEMAS = {
"get_version": Schema(
"""
message = {'http://allmydata.org/tahoe/protocols/storage/v1' => {
response = {'http://allmydata.org/tahoe/protocols/storage/v1' => {
'maximum-immutable-share-size' => uint
'maximum-mutable-share-size' => uint
'available-space' => uint
@ -79,7 +79,7 @@ _SCHEMAS = {
),
"allocate_buckets": Schema(
"""
message = {
response = {
already-have: #6.258([* uint])
allocated: #6.258([* uint])
}
@ -87,16 +87,25 @@ _SCHEMAS = {
),
"immutable_write_share_chunk": Schema(
"""
message = {
response = {
required: [* {begin: uint, end: uint}]
}
"""
),
"list_shares": Schema(
"""
message = #6.258([* uint])
response = #6.258([* uint])
"""
),
"mutable_read_test_write": Schema(
"""
response = {
"success": bool,
"data": {* share_number: [* bstr]}
}
share_number = uint
"""
),
}
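For orientation (share numbers and bytes invented), a decoded ``mutable_read_test_write`` response satisfying this schema would look roughly like:

example_response = {
    "success": True,
    # One list of read results per share, in the order of the request's
    # read vectors:
    "data": {0: [b"abcd"], 1: [b"abcd"]},
}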
@ -121,12 +130,12 @@ def _decode_cbor(response, schema: Schema):
)
@attr.s
@define
class ImmutableCreateResult(object):
"""Result of creating a storage index for an immutable."""
already_have = attr.ib(type=Set[int])
allocated = attr.ib(type=Set[int])
already_have: set[int]
allocated: set[int]
class _TLSContextFactory(CertificateOptions):
@ -200,14 +209,14 @@ class _TLSContextFactory(CertificateOptions):
@implementer(IPolicyForHTTPS)
@implementer(IOpenSSLClientConnectionCreator)
@attr.s
@define
class _StorageClientHTTPSPolicy:
"""
A HTTPS policy that ensures the SPKI hash of the public key matches a known
hash, i.e. pinning-based validation.
"""
expected_spki_hash = attr.ib(type=bytes)
expected_spki_hash: bytes
# IPolicyForHTTPS
def creatorForNetloc(self, hostname, port):
@ -220,24 +229,22 @@ class _StorageClientHTTPSPolicy:
)
@define
class StorageClient(object):
"""
Low-level HTTP client that talks to the HTTP storage server.
"""
def __init__(
self, url, swissnum, treq=treq
): # type: (DecodedURL, bytes, Union[treq,StubTreq,HTTPClient]) -> None
"""
The URL is a HTTPS URL ("https://..."). To construct from a NURL, use
``StorageClient.from_nurl()``.
"""
self._base_url = url
self._swissnum = swissnum
self._treq = treq
# The URL is a HTTPS URL ("https://..."). To construct from a NURL, use
# ``StorageClient.from_nurl()``.
_base_url: DecodedURL
_swissnum: bytes
_treq: Union[treq, StubTreq, HTTPClient]
@classmethod
def from_nurl(cls, nurl: DecodedURL, reactor, persistent: bool = True) -> StorageClient:
def from_nurl(
cls, nurl: DecodedURL, reactor, persistent: bool = True
) -> StorageClient:
"""
Create a ``StorageClient`` for the given NURL.
@ -280,6 +287,7 @@ class StorageClient(object):
lease_renew_secret=None,
lease_cancel_secret=None,
upload_secret=None,
write_enabler_secret=None,
headers=None,
message_to_serialize=None,
**kwargs
@ -298,6 +306,7 @@ class StorageClient(object):
(Secrets.LEASE_RENEW, lease_renew_secret),
(Secrets.LEASE_CANCEL, lease_cancel_secret),
(Secrets.UPLOAD, upload_secret),
(Secrets.WRITE_ENABLER, write_enabler_secret),
]:
if value is None:
continue
@ -342,25 +351,65 @@ class StorageClientGeneral(object):
returnValue(decoded_response)
@attr.s
@define
class UploadProgress(object):
"""
Progress of immutable upload, per the server.
"""
# True when upload has finished.
finished = attr.ib(type=bool)
finished: bool
# Remaining ranges to upload.
required = attr.ib(type=RangeMap)
required: RangeMap
@inlineCallbacks
def read_share_chunk(
client: StorageClient,
share_type: str,
storage_index: bytes,
share_number: int,
offset: int,
length: int,
) -> Deferred[bytes]:
"""
Download a chunk of data from a share.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed
downloads should be transparently retried and redownloaded by the
implementation a few times so that if a failure percolates up, the
caller can assume the failure isn't a short-term blip.
NOTE: the underlying HTTP protocol is much more flexible than this API,
so a future refactor may expand this in order to simplify the calling
code and perhaps download data more efficiently. But then again maybe
the HTTP protocol will be simplified, see
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
"""
url = client.relative_url(
"/v1/{}/{}/{}".format(share_type, _encode_si(storage_index), share_number)
)
response = yield client.request(
"GET",
url,
headers=Headers(
{"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
),
)
if response.code == http.PARTIAL_CONTENT:
body = yield response.content()
returnValue(body)
else:
raise ClientException(response.code)
@define
class StorageClientImmutables(object):
"""
APIs for interacting with immutables.
"""
def __init__(self, client: StorageClient):
self._client = client
_client: StorageClient
@inlineCallbacks
def create(
@ -371,7 +420,7 @@ class StorageClientImmutables(object):
upload_secret,
lease_renew_secret,
lease_cancel_secret,
): # type: (bytes, Set[int], int, bytes, bytes, bytes) -> Deferred[ImmutableCreateResult]
): # type: (bytes, set[int], int, bytes, bytes, bytes) -> Deferred[ImmutableCreateResult]
"""
Create a new storage index for an immutable.
@ -474,42 +523,18 @@ class StorageClientImmutables(object):
remaining.set(True, chunk["begin"], chunk["end"])
returnValue(UploadProgress(finished=finished, required=remaining))
@inlineCallbacks
def read_share_chunk(
self, storage_index, share_number, offset, length
): # type: (bytes, int, int, int) -> Deferred[bytes]
"""
Download a chunk of data from a share.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed
downloads should be transparently retried and redownloaded by the
implementation a few times so that if a failure percolates up, the
caller can assume the failure isn't a short-term blip.
NOTE: the underlying HTTP protocol is much more flexible than this API,
so a future refactor may expand this in order to simplify the calling
code and perhaps download data more efficiently. But then again maybe
the HTTP protocol will be simplified, see
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
"""
url = self._client.relative_url(
"/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
return read_share_chunk(
self._client, "immutable", storage_index, share_number, offset, length
)
response = yield self._client.request(
"GET",
url,
headers=Headers(
{"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
),
)
if response.code == http.PARTIAL_CONTENT:
body = yield response.content()
returnValue(body)
else:
raise ClientException(response.code)
@inlineCallbacks
def list_shares(self, storage_index): # type: (bytes,) -> Deferred[Set[int]]
def list_shares(self, storage_index): # type: (bytes,) -> Deferred[set[int]]
"""
Return the set of shares for a given storage index.
"""
@ -573,3 +598,125 @@ class StorageClientImmutables(object):
raise ClientException(
response.code,
)
@frozen
class WriteVector:
"""Data to write to a chunk."""
offset: int
data: bytes
@frozen
class TestVector:
"""Checks to make on a chunk before writing to it."""
offset: int
size: int
specimen: bytes
@frozen
class ReadVector:
"""
Reads to do on chunks, as part of a read/test/write operation.
"""
offset: int
size: int
@frozen
class TestWriteVectors:
"""Test and write vectors for a specific share."""
test_vectors: Sequence[TestVector]
write_vectors: Sequence[WriteVector]
new_length: Optional[int] = None
def asdict(self) -> dict:
"""Return dictionary suitable for sending over CBOR."""
d = asdict(self)
d["test"] = d.pop("test_vectors")
d["write"] = d.pop("write_vectors")
d["new-length"] = d.pop("new_length")
return d
@frozen
class ReadTestWriteResult:
"""Result of sending read-test-write vectors."""
success: bool
# Map share numbers to reads corresponding to the request's list of
# ReadVectors:
reads: Mapping[int, Sequence[bytes]]
@frozen
class StorageClientMutables:
"""
APIs for interacting with mutables.
"""
_client: StorageClient
@async_to_deferred
async def read_test_write_chunks(
self,
storage_index: bytes,
write_enabler_secret: bytes,
lease_renew_secret: bytes,
lease_cancel_secret: bytes,
testwrite_vectors: dict[int, TestWriteVectors],
read_vector: list[ReadVector],
) -> ReadTestWriteResult:
"""
Read, test, and possibly write chunks to a particular mutable storage
index.
Reads are done before writes.
Given a mapping between share numbers and test/write vectors, the tests
are done and if they are valid the writes are done.
"""
# TODO unit test all the things
url = self._client.relative_url(
"/v1/mutable/{}/read-test-write".format(_encode_si(storage_index))
)
message = {
"test-write-vectors": {
share_number: twv.asdict()
for (share_number, twv) in testwrite_vectors.items()
},
"read-vector": [asdict(r) for r in read_vector],
}
response = await self._client.request(
"POST",
url,
write_enabler_secret=write_enabler_secret,
lease_renew_secret=lease_renew_secret,
lease_cancel_secret=lease_cancel_secret,
message_to_serialize=message,
)
if response.code == http.OK:
result = await _decode_cbor(response, _SCHEMAS["mutable_read_test_write"])
return ReadTestWriteResult(success=result["success"], reads=result["data"])
else:
raise ClientException(response.code, (await response.content()))
def read_share_chunk(
self,
storage_index: bytes,
share_number: int,
offset: int,
length: int,
) -> bytes:
"""
Download a chunk of data from a share.
"""
# TODO unit test all the things
return read_share_chunk(
self._client, "mutable", storage_index, share_number, offset, length
)
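As a hedged usage sketch (secrets, storage index, and data invented; ``client`` is an already-constructed ``StorageClient``), a combined read/test/write call from inside an ``inlineCallbacks`` function might look like:

mutables = StorageClientMutables(client)
result = yield mutables.read_test_write_chunks(
    storage_index,
    write_enabler_secret,
    lease_renew_secret,
    lease_cancel_secret,
    {
        0: TestWriteVectors(
            test_vectors=[TestVector(offset=0, size=4, specimen=b"abcd")],
            write_vectors=[WriteVector(offset=0, data=b"wxyz")],
        )
    },
    [ReadVector(offset=0, size=4)],
)
# result.success is True only if every test vector matched; result.reads maps
# share numbers to the bytes read for each ReadVector (reads happen before
# any writes are applied).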

View File

@ -38,6 +38,7 @@ class Secrets(Enum):
LEASE_RENEW = "lease-renew-secret"
LEASE_CANCEL = "lease-cancel-secret"
UPLOAD = "upload-secret"
WRITE_ENABLER = "write-enabler"
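# Hypothetical illustration (not part of this diff): like the other secrets,
# the write enabler is expected to be presented via an X-Tahoe-Authorization
# header, e.g. "X-Tahoe-Authorization: write-enabler <base64-encoded-secret>",
# alongside the lease secrets on mutable read/test/write requests.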
def get_spki_hash(certificate: Certificate) -> bytes:

View File

@ -258,7 +258,7 @@ class _HTTPError(Exception):
_SCHEMAS = {
"allocate_buckets": Schema(
"""
message = {
request = {
share-numbers: #6.258([* uint])
allocated-size: uint
}
@ -266,11 +266,26 @@ _SCHEMAS = {
),
"advise_corrupt_share": Schema(
"""
message = {
request = {
reason: tstr
}
"""
),
"mutable_read_test_write": Schema(
"""
request = {
"test-write-vectors": {
* share_number: {
"test": [* {"offset": uint, "size": uint, "specimen": bstr}]
"write": [* {"offset": uint, "data": bstr}]
"new-length": uint // null
}
}
"read-vector": [* {"offset": uint, "size": uint}]
}
share_number = uint
"""
),
}
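For orientation (all values invented), a decoded request body satisfying this schema might look like the following; the per-share entries are exactly what ``TestWriteVectors.asdict()`` produces on the client side:

example_rtw_request = {
    "test-write-vectors": {
        0: {
            "test": [{"offset": 0, "size": 4, "specimen": b"abcd"}],
            "write": [{"offset": 0, "data": b"wxyz"}],
            "new-length": None,
        }
    },
    "read-vector": [{"offset": 0, "size": 4}],
}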
@ -559,7 +574,9 @@ class HTTPServer(object):
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"],
)
def advise_corrupt_share(self, request, authorization, storage_index, share_number):
def advise_corrupt_share_immutable(
self, request, authorization, storage_index, share_number
):
"""Indicate that given share is corrupt, with a text reason."""
try:
bucket = self._storage_server.get_buckets(storage_index)[share_number]
@ -570,6 +587,81 @@ class HTTPServer(object):
bucket.advise_corrupt_share(info["reason"].encode("utf-8"))
return b""
##### Mutable APIs #####
@_authorized_route(
_app,
{Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.WRITE_ENABLER},
"/v1/mutable/<storage_index:storage_index>/read-test-write",
methods=["POST"],
)
def mutable_read_test_write(self, request, authorization, storage_index):
"""Read/test/write combined operation for mutables."""
# TODO unit tests
rtw_request = self._read_encoded(request, _SCHEMAS["mutable_read_test_write"])
secrets = (
authorization[Secrets.WRITE_ENABLER],
authorization[Secrets.LEASE_RENEW],
authorization[Secrets.LEASE_CANCEL],
)
success, read_data = self._storage_server.slot_testv_and_readv_and_writev(
storage_index,
secrets,
{
k: (
[(d["offset"], d["size"], b"eq", d["specimen"]) for d in v["test"]],
[(d["offset"], d["data"]) for d in v["write"]],
v["new-length"],
)
for (k, v) in rtw_request["test-write-vectors"].items()
},
[(d["offset"], d["size"]) for d in rtw_request["read-vector"]],
)
return self._send_encoded(request, {"success": success, "data": read_data})
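# Rough illustration (values invented): for the example request body shown
# with the schema above, the comprehension here produces the storage server's
# native form
#     {0: ([(0, 4, b"eq", b"abcd")], [(0, b"wxyz")], None)}
# i.e. {share_number: (test_vector, write_vector, new_length)} entries, plus
# the read vector [(0, 4)], before calling slot_testv_and_readv_and_writev().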
@_authorized_route(
_app,
set(),
"/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>",
methods=["GET"],
)
def read_mutable_chunk(self, request, authorization, storage_index, share_number):
"""Read a chunk from a mutable."""
if request.getHeader("range") is None:
# TODO in follow-up ticket
raise NotImplementedError()
# TODO reduce duplication with immutable reads?
# TODO unit tests, perhaps shared if possible
range_header = parse_range_header(request.getHeader("range"))
if (
range_header is None
or range_header.units != "bytes"
or len(range_header.ranges) > 1 # more than one range
or range_header.ranges[0][1] is None # range without end
):
request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
return b""
offset, end = range_header.ranges[0]
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = self._storage_server.slot_readv(
storage_index, [share_number], [(offset, end - offset)]
)[share_number][0]
# TODO reduce duplication?
request.setResponseCode(http.PARTIAL_CONTENT)
if len(data):
# For empty bodies the content-range header makes no sense since
# the end of the range is inclusive.
request.setHeader(
"content-range",
ContentRange("bytes", offset, offset + len(data)).to_header(),
)
return data
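# Rough illustration (share contents invented): a request carrying
#     Range: bytes=0-9
# parses to offset=0, end=10; slot_readv() is asked for (0, 10) of that share,
# and a non-empty result comes back as a 206 response with
#     Content-Range: bytes 0-9/*
# and those ten bytes as the body.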
@implementer(IStreamServerEndpoint)
@attr.s

View File

@ -77,7 +77,8 @@ from allmydata.util.hashutil import permute_server_hash
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException
ClientException as HTTPClientException, StorageClientMutables,
ReadVector, TestWriteVectors, WriteVector, TestVector
)
@ -1189,3 +1190,64 @@ class _HTTPStorageServer(object):
)
else:
raise NotImplementedError() # future tickets
@defer.inlineCallbacks
def slot_readv(self, storage_index, shares, readv):
mutable_client = StorageClientMutables(self._http_client)
pending_reads = {}
reads = {}
# TODO if shares list is empty, that means list all shares, so we need
# to do a query to get that.
assert shares # TODO replace with call to list shares if and only if it's empty
# Start all the queries in parallel:
for share_number in shares:
share_reads = defer.gatherResults(
[
mutable_client.read_share_chunk(
storage_index, share_number, offset, length
)
for (offset, length) in readv
]
)
pending_reads[share_number] = share_reads
# Wait for all the queries to finish:
for share_number, pending_result in pending_reads.items():
reads[share_number] = yield pending_result
return reads
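# Rough illustration (values invented):
#     slot_readv(si, [0, 1], [(0, 10), (20, 5)])
# issues four HTTP range reads in parallel (two per share) and resolves to
#     {0: [<10 bytes at 0>, <5 bytes at 20>], 1: [<10 bytes at 0>, <5 bytes at 20>]}
# matching the shape the Foolscap-based slot_readv returns.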
@defer.inlineCallbacks
def slot_testv_and_readv_and_writev(
self,
storage_index,
secrets,
tw_vectors,
r_vector,
):
mutable_client = StorageClientMutables(self._http_client)
we_secret, lr_secret, lc_secret = secrets
client_tw_vectors = {}
for share_num, (test_vector, data_vector, new_length) in tw_vectors.items():
client_test_vectors = [
TestVector(offset=offset, size=size, specimen=specimen)
for (offset, size, specimen) in test_vector
]
client_write_vectors = [
WriteVector(offset=offset, data=data) for (offset, data) in data_vector
]
client_tw_vectors[share_num] = TestWriteVectors(
test_vectors=client_test_vectors,
write_vectors=client_write_vectors,
new_length=new_length
)
client_read_vectors = [
ReadVector(offset=offset, size=size)
for (offset, size) in r_vector
]
client_result = yield mutable_client.read_test_write_chunks(
storage_index, we_secret, lr_secret, lc_secret, client_tw_vectors,
client_read_vectors,
)
return (client_result.success, client_result.reads)

View File

@ -129,3 +129,31 @@ class UntilTests(unittest.TestCase):
self.assertEqual([1], counter)
r1.callback(None)
self.assertEqual([2], counter)
class AsyncToDeferred(unittest.TestCase):
"""Tests for ``deferredutil.async_to_deferred.``"""
def test_async_to_deferred_success(self):
"""
Normal results from a ``@async_to_deferred``-wrapped function get
turned into a ``Deferred`` with that value.
"""
@deferredutil.async_to_deferred
async def f(x, y):
return x + y
result = f(1, y=2)
self.assertEqual(self.successResultOf(result), 3)
def test_async_to_deferred_exception(self):
"""
Exceptions from a ``@async_to_deferred``-wrapped function get
turned into a ``Deferred`` that fires with the corresponding failure.
"""
@deferredutil.async_to_deferred
async def f(x, y):
return x/y
result = f(1, 0)
self.assertIsInstance(self.failureResultOf(result).value, ZeroDivisionError)

View File

@ -1140,4 +1140,19 @@ class HTTPImmutableAPIsTests(
class FoolscapMutableAPIsTests(
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
"""Foolscap-specific tests for mutable ``IStorageServer`` APIs."""
class HTTPMutableAPIsTests(
_HTTPMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for mutable ``IStorageServer`` APIs."""
# TODO will be implemented in later tickets
SKIP_TESTS = {
"test_STARAW_write_enabler_must_match",
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
"test_slot_readv_no_shares",
}

View File

@ -6,14 +6,12 @@ server authentication logic, which may one day apply outside of HTTP Storage
Protocol.
"""
from functools import wraps
from contextlib import asynccontextmanager
from cryptography import x509
from twisted.internet.endpoints import serverFromString
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import deferLater
from twisted.web.server import Site
from twisted.web.static import Data
@ -31,6 +29,7 @@ from .certs import (
from ..storage.http_common import get_spki_hash
from ..storage.http_client import _StorageClientHTTPSPolicy
from ..storage.http_server import _TLSEndpointWrapper
from ..util.deferredutil import async_to_deferred
class HTTPSNurlTests(SyncTestCase):
@ -73,20 +72,6 @@ ox5zO3LrQmQw11OaIAs2/kviKAoKTFFxeyYcpS5RuKNDZfHQCXlLwt9bySxG
self.assertEqual(get_spki_hash(certificate), expected_hash)
def async_to_deferred(f):
"""
Wrap an async function to return a Deferred instead.
Maybe solution to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3886
"""
@wraps(f)
def not_async(*args, **kwargs):
return Deferred.fromCoroutine(f(*args, **kwargs))
return not_async
class PinningHTTPSValidation(AsyncTestCase):
"""
Test client-side validation logic of HTTPS certificates that uses

View File

@ -4,24 +4,13 @@ Utilities for working with Twisted Deferreds.
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import time
from functools import wraps
try:
from typing import (
Callable,
Any,
)
except ImportError:
pass
from typing import (
Callable,
Any,
)
from foolscap.api import eventually
from eliot.twisted import (
@ -231,3 +220,17 @@ def until(
yield action()
if condition():
break
def async_to_deferred(f):
"""
Wrap an async function to return a Deferred instead.
Maybe a solution to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3886
"""
@wraps(f)
def not_async(*args, **kwargs):
return defer.Deferred.fromCoroutine(f(*args, **kwargs))
return not_async
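# Hypothetical usage, mirroring the tests above:
#
#     @async_to_deferred
#     async def f(x, y):
#         return x + y
#
#     d = f(1, y=2)  # a Deferred that fires with 3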