Closer to running end-to-end mutable tests.

This commit is contained in:
Itamar Turner-Trauring 2022-04-19 13:18:31 -04:00
parent 2ca5e22af9
commit 898fe0bc0e
4 changed files with 122 additions and 51 deletions

View File

@ -364,6 +364,46 @@ class UploadProgress(object):
required: RangeMap
@inlineCallbacks
def read_share_chunk(
client: StorageClient,
share_type: str,
storage_index: bytes,
share_number: int,
offset: int,
length: int,
) -> Deferred[bytes]:
"""
Download a chunk of data from a share.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed
downloads should be transparently retried and redownloaded by the
implementation a few times so that if a failure percolates up, the
caller can assume the failure isn't a short-term blip.
NOTE: the underlying HTTP protocol is much more flexible than this API,
so a future refactor may expand this in order to simplify the calling
code and perhaps download data more efficiently. But then again maybe
the HTTP protocol will be simplified, see
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
"""
url = client.relative_url(
"/v1/{}/{}/{}".format(share_type, _encode_si(storage_index), share_number)
)
response = yield client.request(
"GET",
url,
headers=Headers(
{"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
),
)
if response.code == http.PARTIAL_CONTENT:
body = yield response.content()
returnValue(body)
else:
raise ClientException(response.code)
@define
class StorageClientImmutables(object):
"""
@ -484,39 +524,15 @@ class StorageClientImmutables(object):
remaining.set(True, chunk["begin"], chunk["end"])
returnValue(UploadProgress(finished=finished, required=remaining))
@inlineCallbacks
def read_share_chunk(
self, storage_index, share_number, offset, length
): # type: (bytes, int, int, int) -> Deferred[bytes]
"""
Download a chunk of data from a share.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed
downloads should be transparently retried and redownloaded by the
implementation a few times so that if a failure percolates up, the
caller can assume the failure isn't a short-term blip.
NOTE: the underlying HTTP protocol is much more flexible than this API,
so a future refactor may expand this in order to simplify the calling
code and perhaps download data more efficiently. But then again maybe
the HTTP protocol will be simplified, see
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
"""
url = self._client.relative_url(
"/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
return read_share_chunk(
self._client, "immutable", storage_index, share_number, offset, length
)
response = yield self._client.request(
"GET",
url,
headers=Headers(
{"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
),
)
if response.code == http.PARTIAL_CONTENT:
body = yield response.content()
returnValue(body)
else:
raise ClientException(response.code)
@inlineCallbacks
def list_shares(self, storage_index): # type: (bytes,) -> Deferred[Set[int]]
@ -610,7 +626,7 @@ class TestVector:
offset: int
size: int
operator: TestVectorOperator = field(default=TestVectorOperator.EQ)
operator: TestVectorOperator
specimen: bytes
@ -632,6 +648,14 @@ class TestWriteVectors:
write_vectors: list[WriteVector]
new_length: Optional[int] = field(default=None)
def asdict(self) -> dict:
"""Return dictionary suitable for sending over CBOR."""
d = asdict(self)
d["test"] = d.pop("test_vectors")
d["write"] = d.pop("write_vectors")
d["new-length"] = d.pop("new_length")
return d
@define
class ReadTestWriteResult:
@ -676,12 +700,12 @@ class StorageClientMutables:
)
message = {
"test-write-vectors": {
share_number: asdict(twv)
share_number: twv.asdict()
for (share_number, twv) in testwrite_vectors.items()
},
"read-vector": [asdict(r) for r in read_vector],
}
response = yield self._client.request(
response = await self._client.request(
"POST",
url,
write_enabler_secret=write_enabler_secret,
@ -692,31 +716,20 @@ class StorageClientMutables:
if response.code == http.OK:
return _decode_cbor(response, _SCHEMAS["mutable_test_read_write"])
else:
raise ClientException(
response.code,
)
raise ClientException(response.code, (await response.content()))
@async_to_deferred
async def read_share_chunk(
self,
storage_index: bytes,
share_number: int,
# TODO is this really optional?
# TODO if yes, test non-optional variants
offset: Optional[int],
length: Optional[int],
offset: int,
length: int,
) -> bytes:
"""
Download a chunk of data from a share.
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed
downloads should be transparently retried and redownloaded by the
implementation a few times so that if a failure percolates up, the
caller can assume the failure isn't a short-term blip.
NOTE: the underlying HTTP protocol is much more flexible than this API,
so a future refactor may expand this in order to simplify the calling
code and perhaps download data more efficiently. But then again maybe
the HTTP protocol will be simplified, see
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
"""
# TODO unit test all the things
return read_share_chunk(
self._client, "mutable", storage_index, share_number, offset, length
)

View File

@ -261,7 +261,7 @@ _SCHEMAS = {
* share_number: {
"test": [* {"offset": uint, "size": uint, "specimen": bstr}]
"write": [* {"offset": uint, "data": bstr}]
"new-length": (uint // null)
"new-length": uint // null
}
}
"read-vector": [* {"offset": uint, "size": uint}]
@ -590,7 +590,7 @@ class HTTPServer(object):
storage_index,
secrets,
rtw_request["test-write-vectors"],
rtw_request["read-vectors"],
rtw_request["read-vector"],
)
return self._send_encoded(request, {"success": success, "data": read_data})

View File

@ -77,7 +77,8 @@ from allmydata.util.hashutil import permute_server_hash
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException
ClientException as HTTPClientException, StorageClientMutables,
ReadVector, TestWriteVectors, WriteVector, TestVector, TestVectorOperator
)
@ -1189,3 +1190,54 @@ class _HTTPStorageServer(object):
)
else:
raise NotImplementedError() # future tickets
@defer.inlineCallbacks
def slot_readv(self, storage_index, shares, readv):
mutable_client = StorageClientMutables(self._http_client)
reads = {}
for share_number in shares:
share_reads = reads[share_number] = []
for (offset, length) in readv:
d = mutable_client.read_share_chunk(
storage_index, share_number, offset, length
)
share_reads.append(d)
result = {
share_number: [(yield d) for d in share_reads]
for (share_number, reads) in reads.items()
}
defer.returnValue(result)
@defer.inlineCallbacks
def slot_testv_and_readv_and_writev(
self,
storage_index,
secrets,
tw_vectors,
r_vector,
):
mutable_client = StorageClientMutables(self._http_client)
we_secret, lr_secret, lc_secret = secrets
client_tw_vectors = {}
for share_num, (test_vector, data_vector, new_length) in tw_vectors.items():
client_test_vectors = [
TestVector(offset=offset, size=size, operator=TestVectorOperator[op], specimen=specimen)
for (offset, size, op, specimen) in test_vector
]
client_write_vectors = [
WriteVector(offset=offset, data=data) for (offset, data) in data_vector
]
client_tw_vectors[share_num] = TestWriteVectors(
test_vectors=client_test_vectors,
write_vectors=client_write_vectors,
new_length=new_length
)
client_read_vectors = [
ReadVector(offset=offset, size=size)
for (offset, size) in r_vector
]
client_result = yield mutable_client.read_test_write_chunks(
storage_index, we_secret, lr_secret, lc_secret, client_tw_vectors,
client_read_vectors,
)
defer.returnValue((client_result.success, client_result.reads))

View File

@ -1140,4 +1140,10 @@ class HTTPImmutableAPIsTests(
class FoolscapMutableAPIsTests(
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
"""Foolscap-specific tests for mutable ``IStorageServer`` APIs."""
class HTTPMutableAPIsTests(
_HTTPMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for mutable ``IStorageServer`` APIs."""