From 898fe0bc0e48489668cf58981a18a252c9ed587f Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 19 Apr 2022 13:18:31 -0400 Subject: [PATCH] Closer to running end-to-end mutable tests. --- src/allmydata/storage/http_client.py | 107 ++++++++++++---------- src/allmydata/storage/http_server.py | 4 +- src/allmydata/storage_client.py | 54 ++++++++++- src/allmydata/test/test_istorageserver.py | 8 +- 4 files changed, 122 insertions(+), 51 deletions(-) diff --git a/src/allmydata/storage/http_client.py b/src/allmydata/storage/http_client.py index 8899614b8..52177f401 100644 --- a/src/allmydata/storage/http_client.py +++ b/src/allmydata/storage/http_client.py @@ -364,6 +364,46 @@ class UploadProgress(object): required: RangeMap +@inlineCallbacks +def read_share_chunk( + client: StorageClient, + share_type: str, + storage_index: bytes, + share_number: int, + offset: int, + length: int, +) -> Deferred[bytes]: + """ + Download a chunk of data from a share. + + TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed + downloads should be transparently retried and redownloaded by the + implementation a few times so that if a failure percolates up, the + caller can assume the failure isn't a short-term blip. + + NOTE: the underlying HTTP protocol is much more flexible than this API, + so a future refactor may expand this in order to simplify the calling + code and perhaps download data more efficiently. But then again maybe + the HTTP protocol will be simplified, see + https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777 + """ + url = client.relative_url( + "/v1/{}/{}/{}".format(share_type, _encode_si(storage_index), share_number) + ) + response = yield client.request( + "GET", + url, + headers=Headers( + {"range": [Range("bytes", [(offset, offset + length)]).to_header()]} + ), + ) + if response.code == http.PARTIAL_CONTENT: + body = yield response.content() + returnValue(body) + else: + raise ClientException(response.code) + + @define class StorageClientImmutables(object): """ @@ -484,39 +524,15 @@ class StorageClientImmutables(object): remaining.set(True, chunk["begin"], chunk["end"]) returnValue(UploadProgress(finished=finished, required=remaining)) - @inlineCallbacks def read_share_chunk( self, storage_index, share_number, offset, length ): # type: (bytes, int, int, int) -> Deferred[bytes] """ Download a chunk of data from a share. - - TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed - downloads should be transparently retried and redownloaded by the - implementation a few times so that if a failure percolates up, the - caller can assume the failure isn't a short-term blip. - - NOTE: the underlying HTTP protocol is much more flexible than this API, - so a future refactor may expand this in order to simplify the calling - code and perhaps download data more efficiently. But then again maybe - the HTTP protocol will be simplified, see - https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777 """ - url = self._client.relative_url( - "/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number) + return read_share_chunk( + self._client, "immutable", storage_index, share_number, offset, length ) - response = yield self._client.request( - "GET", - url, - headers=Headers( - {"range": [Range("bytes", [(offset, offset + length)]).to_header()]} - ), - ) - if response.code == http.PARTIAL_CONTENT: - body = yield response.content() - returnValue(body) - else: - raise ClientException(response.code) @inlineCallbacks def list_shares(self, storage_index): # type: (bytes,) -> Deferred[Set[int]] @@ -610,7 +626,7 @@ class TestVector: offset: int size: int - operator: TestVectorOperator = field(default=TestVectorOperator.EQ) + operator: TestVectorOperator specimen: bytes @@ -632,6 +648,14 @@ class TestWriteVectors: write_vectors: list[WriteVector] new_length: Optional[int] = field(default=None) + def asdict(self) -> dict: + """Return dictionary suitable for sending over CBOR.""" + d = asdict(self) + d["test"] = d.pop("test_vectors") + d["write"] = d.pop("write_vectors") + d["new-length"] = d.pop("new_length") + return d + @define class ReadTestWriteResult: @@ -676,12 +700,12 @@ class StorageClientMutables: ) message = { "test-write-vectors": { - share_number: asdict(twv) + share_number: twv.asdict() for (share_number, twv) in testwrite_vectors.items() }, "read-vector": [asdict(r) for r in read_vector], } - response = yield self._client.request( + response = await self._client.request( "POST", url, write_enabler_secret=write_enabler_secret, @@ -692,31 +716,20 @@ class StorageClientMutables: if response.code == http.OK: return _decode_cbor(response, _SCHEMAS["mutable_test_read_write"]) else: - raise ClientException( - response.code, - ) + raise ClientException(response.code, (await response.content())) @async_to_deferred async def read_share_chunk( self, storage_index: bytes, share_number: int, - # TODO is this really optional? - # TODO if yes, test non-optional variants - offset: Optional[int], - length: Optional[int], + offset: int, + length: int, ) -> bytes: """ Download a chunk of data from a share. - - TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed - downloads should be transparently retried and redownloaded by the - implementation a few times so that if a failure percolates up, the - caller can assume the failure isn't a short-term blip. - - NOTE: the underlying HTTP protocol is much more flexible than this API, - so a future refactor may expand this in order to simplify the calling - code and perhaps download data more efficiently. But then again maybe - the HTTP protocol will be simplified, see - https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777 """ + # TODO unit test all the things + return read_share_chunk( + self._client, "mutable", storage_index, share_number, offset, length + ) diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index 7f279580b..6def5aeeb 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -261,7 +261,7 @@ _SCHEMAS = { * share_number: { "test": [* {"offset": uint, "size": uint, "specimen": bstr}] "write": [* {"offset": uint, "data": bstr}] - "new-length": (uint // null) + "new-length": uint // null } } "read-vector": [* {"offset": uint, "size": uint}] @@ -590,7 +590,7 @@ class HTTPServer(object): storage_index, secrets, rtw_request["test-write-vectors"], - rtw_request["read-vectors"], + rtw_request["read-vector"], ) return self._send_encoded(request, {"success": success, "data": read_data}) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 55b6cfb05..afed0e274 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -77,7 +77,8 @@ from allmydata.util.hashutil import permute_server_hash from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict from allmydata.storage.http_client import ( StorageClient, StorageClientImmutables, StorageClientGeneral, - ClientException as HTTPClientException + ClientException as HTTPClientException, StorageClientMutables, + ReadVector, TestWriteVectors, WriteVector, TestVector, TestVectorOperator ) @@ -1189,3 +1190,54 @@ class _HTTPStorageServer(object): ) else: raise NotImplementedError() # future tickets + + @defer.inlineCallbacks + def slot_readv(self, storage_index, shares, readv): + mutable_client = StorageClientMutables(self._http_client) + reads = {} + for share_number in shares: + share_reads = reads[share_number] = [] + for (offset, length) in readv: + d = mutable_client.read_share_chunk( + storage_index, share_number, offset, length + ) + share_reads.append(d) + result = { + share_number: [(yield d) for d in share_reads] + for (share_number, reads) in reads.items() + } + defer.returnValue(result) + + @defer.inlineCallbacks + def slot_testv_and_readv_and_writev( + self, + storage_index, + secrets, + tw_vectors, + r_vector, + ): + mutable_client = StorageClientMutables(self._http_client) + we_secret, lr_secret, lc_secret = secrets + client_tw_vectors = {} + for share_num, (test_vector, data_vector, new_length) in tw_vectors.items(): + client_test_vectors = [ + TestVector(offset=offset, size=size, operator=TestVectorOperator[op], specimen=specimen) + for (offset, size, op, specimen) in test_vector + ] + client_write_vectors = [ + WriteVector(offset=offset, data=data) for (offset, data) in data_vector + ] + client_tw_vectors[share_num] = TestWriteVectors( + test_vectors=client_test_vectors, + write_vectors=client_write_vectors, + new_length=new_length + ) + client_read_vectors = [ + ReadVector(offset=offset, size=size) + for (offset, size) in r_vector + ] + client_result = yield mutable_client.read_test_write_chunks( + storage_index, we_secret, lr_secret, lc_secret, client_tw_vectors, + client_read_vectors, + ) + defer.returnValue((client_result.success, client_result.reads)) diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index 3d6f610be..702c66952 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -1140,4 +1140,10 @@ class HTTPImmutableAPIsTests( class FoolscapMutableAPIsTests( _FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase ): - """Foolscap-specific tests for immutable ``IStorageServer`` APIs.""" + """Foolscap-specific tests for mutable ``IStorageServer`` APIs.""" + + +class HTTPMutableAPIsTests( + _HTTPMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase +): + """HTTP-specific tests for mutable ``IStorageServer`` APIs."""