diff --git a/.circleci/config.yml b/.circleci/config.yml index cf0c66aff..79ce57ed0 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -48,8 +48,6 @@ workflows: {} - "pyinstaller": {} - - "deprecations": - {} - "c-locale": {} # Any locale other than C or UTF-8. @@ -297,20 +295,6 @@ jobs: # aka "Latin 1" LANG: "en_US.ISO-8859-1" - - deprecations: - <<: *DEBIAN - - environment: - <<: *UTF_8_ENVIRONMENT - # Select the deprecations tox environments. - TAHOE_LAFS_TOX_ENVIRONMENT: "deprecations,upcoming-deprecations" - # Put the logs somewhere we can report them. - TAHOE_LAFS_WARNINGS_LOG: "/tmp/artifacts/deprecation-warnings.log" - # The deprecations tox environments don't do coverage measurement. - UPLOAD_COVERAGE: "" - - integration: <<: *DEBIAN diff --git a/docs/proposed/http-storage-node-protocol.rst b/docs/proposed/http-storage-node-protocol.rst index 9354bc185..3926d9f4a 100644 --- a/docs/proposed/http-storage-node-protocol.rst +++ b/docs/proposed/http-storage-node-protocol.rst @@ -743,11 +743,15 @@ For example:: [1, 5] -``GET /v1/mutable/:storage_index?share=:s0&share=:sN&offset=:o1&size=:z0&offset=:oN&size=:zN`` +``GET /v1/mutable/:storage_index/:share_number`` !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -Read data from the indicated mutable shares. -Just like ``GET /v1/mutable/:storage_index``. +Read data from the indicated mutable shares, just like ``GET /v1/immutable/:storage_index`` + +The ``Range`` header may be used to request exactly one ``bytes`` range, in which case the response code will be 206 (partial content). +Interpretation and response behavior is as specified in RFC 7233 ยง 4.1. +Multiple ranges in a single request are *not* supported; open-ended ranges are also not supported. + ``POST /v1/mutable/:storage_index/:share_number/corrupt`` !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! diff --git a/newsfragments/3890.minor b/newsfragments/3890.minor new file mode 100644 index 000000000..e69de29bb diff --git a/src/allmydata/storage/http_client.py b/src/allmydata/storage/http_client.py index 65bcd1c4b..da350e0c6 100644 --- a/src/allmydata/storage/http_client.py +++ b/src/allmydata/storage/http_client.py @@ -4,11 +4,10 @@ HTTP client that talks to the HTTP storage server. from __future__ import annotations -from typing import Union, Set, Optional - +from typing import Union, Optional, Sequence, Mapping from base64 import b64encode -import attr +from attrs import define, asdict, frozen # TODO Make sure to import Python version? from cbor2 import loads, dumps @@ -39,6 +38,7 @@ from .http_common import ( ) from .common import si_b2a from ..util.hashutil import timing_safe_compare +from ..util.deferredutil import async_to_deferred _OPENSSL = Binding().lib @@ -64,7 +64,7 @@ class ClientException(Exception): _SCHEMAS = { "get_version": Schema( """ - message = {'http://allmydata.org/tahoe/protocols/storage/v1' => { + response = {'http://allmydata.org/tahoe/protocols/storage/v1' => { 'maximum-immutable-share-size' => uint 'maximum-mutable-share-size' => uint 'available-space' => uint @@ -79,7 +79,7 @@ _SCHEMAS = { ), "allocate_buckets": Schema( """ - message = { + response = { already-have: #6.258([* uint]) allocated: #6.258([* uint]) } @@ -87,16 +87,25 @@ _SCHEMAS = { ), "immutable_write_share_chunk": Schema( """ - message = { + response = { required: [* {begin: uint, end: uint}] } """ ), "list_shares": Schema( """ - message = #6.258([* uint]) + response = #6.258([* uint]) """ ), + "mutable_read_test_write": Schema( + """ + response = { + "success": bool, + "data": {* share_number: [* bstr]} + } + share_number = uint + """ + ), } @@ -121,12 +130,12 @@ def _decode_cbor(response, schema: Schema): ) -@attr.s +@define class ImmutableCreateResult(object): """Result of creating a storage index for an immutable.""" - already_have = attr.ib(type=Set[int]) - allocated = attr.ib(type=Set[int]) + already_have: set[int] + allocated: set[int] class _TLSContextFactory(CertificateOptions): @@ -200,14 +209,14 @@ class _TLSContextFactory(CertificateOptions): @implementer(IPolicyForHTTPS) @implementer(IOpenSSLClientConnectionCreator) -@attr.s +@define class _StorageClientHTTPSPolicy: """ A HTTPS policy that ensures the SPKI hash of the public key matches a known hash, i.e. pinning-based validation. """ - expected_spki_hash = attr.ib(type=bytes) + expected_spki_hash: bytes # IPolicyForHTTPS def creatorForNetloc(self, hostname, port): @@ -220,24 +229,22 @@ class _StorageClientHTTPSPolicy: ) +@define class StorageClient(object): """ Low-level HTTP client that talks to the HTTP storage server. """ - def __init__( - self, url, swissnum, treq=treq - ): # type: (DecodedURL, bytes, Union[treq,StubTreq,HTTPClient]) -> None - """ - The URL is a HTTPS URL ("https://..."). To construct from a NURL, use - ``StorageClient.from_nurl()``. - """ - self._base_url = url - self._swissnum = swissnum - self._treq = treq + # The URL is a HTTPS URL ("https://..."). To construct from a NURL, use + # ``StorageClient.from_nurl()``. + _base_url: DecodedURL + _swissnum: bytes + _treq: Union[treq, StubTreq, HTTPClient] @classmethod - def from_nurl(cls, nurl: DecodedURL, reactor, persistent: bool = True) -> StorageClient: + def from_nurl( + cls, nurl: DecodedURL, reactor, persistent: bool = True + ) -> StorageClient: """ Create a ``StorageClient`` for the given NURL. @@ -280,6 +287,7 @@ class StorageClient(object): lease_renew_secret=None, lease_cancel_secret=None, upload_secret=None, + write_enabler_secret=None, headers=None, message_to_serialize=None, **kwargs @@ -298,6 +306,7 @@ class StorageClient(object): (Secrets.LEASE_RENEW, lease_renew_secret), (Secrets.LEASE_CANCEL, lease_cancel_secret), (Secrets.UPLOAD, upload_secret), + (Secrets.WRITE_ENABLER, write_enabler_secret), ]: if value is None: continue @@ -342,25 +351,65 @@ class StorageClientGeneral(object): returnValue(decoded_response) -@attr.s +@define class UploadProgress(object): """ Progress of immutable upload, per the server. """ # True when upload has finished. - finished = attr.ib(type=bool) + finished: bool # Remaining ranges to upload. - required = attr.ib(type=RangeMap) + required: RangeMap +@inlineCallbacks +def read_share_chunk( + client: StorageClient, + share_type: str, + storage_index: bytes, + share_number: int, + offset: int, + length: int, +) -> Deferred[bytes]: + """ + Download a chunk of data from a share. + + TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed + downloads should be transparently retried and redownloaded by the + implementation a few times so that if a failure percolates up, the + caller can assume the failure isn't a short-term blip. + + NOTE: the underlying HTTP protocol is much more flexible than this API, + so a future refactor may expand this in order to simplify the calling + code and perhaps download data more efficiently. But then again maybe + the HTTP protocol will be simplified, see + https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777 + """ + url = client.relative_url( + "/v1/{}/{}/{}".format(share_type, _encode_si(storage_index), share_number) + ) + response = yield client.request( + "GET", + url, + headers=Headers( + {"range": [Range("bytes", [(offset, offset + length)]).to_header()]} + ), + ) + if response.code == http.PARTIAL_CONTENT: + body = yield response.content() + returnValue(body) + else: + raise ClientException(response.code) + + +@define class StorageClientImmutables(object): """ APIs for interacting with immutables. """ - def __init__(self, client: StorageClient): - self._client = client + _client: StorageClient @inlineCallbacks def create( @@ -371,7 +420,7 @@ class StorageClientImmutables(object): upload_secret, lease_renew_secret, lease_cancel_secret, - ): # type: (bytes, Set[int], int, bytes, bytes, bytes) -> Deferred[ImmutableCreateResult] + ): # type: (bytes, set[int], int, bytes, bytes, bytes) -> Deferred[ImmutableCreateResult] """ Create a new storage index for an immutable. @@ -474,42 +523,18 @@ class StorageClientImmutables(object): remaining.set(True, chunk["begin"], chunk["end"]) returnValue(UploadProgress(finished=finished, required=remaining)) - @inlineCallbacks def read_share_chunk( self, storage_index, share_number, offset, length ): # type: (bytes, int, int, int) -> Deferred[bytes] """ Download a chunk of data from a share. - - TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed - downloads should be transparently retried and redownloaded by the - implementation a few times so that if a failure percolates up, the - caller can assume the failure isn't a short-term blip. - - NOTE: the underlying HTTP protocol is much more flexible than this API, - so a future refactor may expand this in order to simplify the calling - code and perhaps download data more efficiently. But then again maybe - the HTTP protocol will be simplified, see - https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777 """ - url = self._client.relative_url( - "/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number) + return read_share_chunk( + self._client, "immutable", storage_index, share_number, offset, length ) - response = yield self._client.request( - "GET", - url, - headers=Headers( - {"range": [Range("bytes", [(offset, offset + length)]).to_header()]} - ), - ) - if response.code == http.PARTIAL_CONTENT: - body = yield response.content() - returnValue(body) - else: - raise ClientException(response.code) @inlineCallbacks - def list_shares(self, storage_index): # type: (bytes,) -> Deferred[Set[int]] + def list_shares(self, storage_index): # type: (bytes,) -> Deferred[set[int]] """ Return the set of shares for a given storage index. """ @@ -573,3 +598,125 @@ class StorageClientImmutables(object): raise ClientException( response.code, ) + + +@frozen +class WriteVector: + """Data to write to a chunk.""" + + offset: int + data: bytes + + +@frozen +class TestVector: + """Checks to make on a chunk before writing to it.""" + + offset: int + size: int + specimen: bytes + + +@frozen +class ReadVector: + """ + Reads to do on chunks, as part of a read/test/write operation. + """ + + offset: int + size: int + + +@frozen +class TestWriteVectors: + """Test and write vectors for a specific share.""" + + test_vectors: Sequence[TestVector] + write_vectors: Sequence[WriteVector] + new_length: Optional[int] = None + + def asdict(self) -> dict: + """Return dictionary suitable for sending over CBOR.""" + d = asdict(self) + d["test"] = d.pop("test_vectors") + d["write"] = d.pop("write_vectors") + d["new-length"] = d.pop("new_length") + return d + + +@frozen +class ReadTestWriteResult: + """Result of sending read-test-write vectors.""" + + success: bool + # Map share numbers to reads corresponding to the request's list of + # ReadVectors: + reads: Mapping[int, Sequence[bytes]] + + +@frozen +class StorageClientMutables: + """ + APIs for interacting with mutables. + """ + + _client: StorageClient + + @async_to_deferred + async def read_test_write_chunks( + self, + storage_index: bytes, + write_enabler_secret: bytes, + lease_renew_secret: bytes, + lease_cancel_secret: bytes, + testwrite_vectors: dict[int, TestWriteVectors], + read_vector: list[ReadVector], + ) -> ReadTestWriteResult: + """ + Read, test, and possibly write chunks to a particular mutable storage + index. + + Reads are done before writes. + + Given a mapping between share numbers and test/write vectors, the tests + are done and if they are valid the writes are done. + """ + # TODO unit test all the things + url = self._client.relative_url( + "/v1/mutable/{}/read-test-write".format(_encode_si(storage_index)) + ) + message = { + "test-write-vectors": { + share_number: twv.asdict() + for (share_number, twv) in testwrite_vectors.items() + }, + "read-vector": [asdict(r) for r in read_vector], + } + response = await self._client.request( + "POST", + url, + write_enabler_secret=write_enabler_secret, + lease_renew_secret=lease_renew_secret, + lease_cancel_secret=lease_cancel_secret, + message_to_serialize=message, + ) + if response.code == http.OK: + result = await _decode_cbor(response, _SCHEMAS["mutable_read_test_write"]) + return ReadTestWriteResult(success=result["success"], reads=result["data"]) + else: + raise ClientException(response.code, (await response.content())) + + def read_share_chunk( + self, + storage_index: bytes, + share_number: int, + offset: int, + length: int, + ) -> bytes: + """ + Download a chunk of data from a share. + """ + # TODO unit test all the things + return read_share_chunk( + self._client, "mutable", storage_index, share_number, offset, length + ) diff --git a/src/allmydata/storage/http_common.py b/src/allmydata/storage/http_common.py index addd926d1..123ce403b 100644 --- a/src/allmydata/storage/http_common.py +++ b/src/allmydata/storage/http_common.py @@ -38,6 +38,7 @@ class Secrets(Enum): LEASE_RENEW = "lease-renew-secret" LEASE_CANCEL = "lease-cancel-secret" UPLOAD = "upload-secret" + WRITE_ENABLER = "write-enabler" def get_spki_hash(certificate: Certificate) -> bytes: diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index 935390d10..06bc0d934 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -258,7 +258,7 @@ class _HTTPError(Exception): _SCHEMAS = { "allocate_buckets": Schema( """ - message = { + request = { share-numbers: #6.258([* uint]) allocated-size: uint } @@ -266,11 +266,26 @@ _SCHEMAS = { ), "advise_corrupt_share": Schema( """ - message = { + request = { reason: tstr } """ ), + "mutable_read_test_write": Schema( + """ + request = { + "test-write-vectors": { + * share_number: { + "test": [* {"offset": uint, "size": uint, "specimen": bstr}] + "write": [* {"offset": uint, "data": bstr}] + "new-length": uint // null + } + } + "read-vector": [* {"offset": uint, "size": uint}] + } + share_number = uint + """ + ), } @@ -559,7 +574,9 @@ class HTTPServer(object): "/v1/immutable///corrupt", methods=["POST"], ) - def advise_corrupt_share(self, request, authorization, storage_index, share_number): + def advise_corrupt_share_immutable( + self, request, authorization, storage_index, share_number + ): """Indicate that given share is corrupt, with a text reason.""" try: bucket = self._storage_server.get_buckets(storage_index)[share_number] @@ -570,6 +587,81 @@ class HTTPServer(object): bucket.advise_corrupt_share(info["reason"].encode("utf-8")) return b"" + ##### Mutable APIs ##### + + @_authorized_route( + _app, + {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.WRITE_ENABLER}, + "/v1/mutable//read-test-write", + methods=["POST"], + ) + def mutable_read_test_write(self, request, authorization, storage_index): + """Read/test/write combined operation for mutables.""" + # TODO unit tests + rtw_request = self._read_encoded(request, _SCHEMAS["mutable_read_test_write"]) + secrets = ( + authorization[Secrets.WRITE_ENABLER], + authorization[Secrets.LEASE_RENEW], + authorization[Secrets.LEASE_CANCEL], + ) + success, read_data = self._storage_server.slot_testv_and_readv_and_writev( + storage_index, + secrets, + { + k: ( + [(d["offset"], d["size"], b"eq", d["specimen"]) for d in v["test"]], + [(d["offset"], d["data"]) for d in v["write"]], + v["new-length"], + ) + for (k, v) in rtw_request["test-write-vectors"].items() + }, + [(d["offset"], d["size"]) for d in rtw_request["read-vector"]], + ) + return self._send_encoded(request, {"success": success, "data": read_data}) + + @_authorized_route( + _app, + set(), + "/v1/mutable//", + methods=["GET"], + ) + def read_mutable_chunk(self, request, authorization, storage_index, share_number): + """Read a chunk from a mutable.""" + if request.getHeader("range") is None: + # TODO in follow-up ticket + raise NotImplementedError() + + # TODO reduce duplication with immutable reads? + # TODO unit tests, perhaps shared if possible + range_header = parse_range_header(request.getHeader("range")) + if ( + range_header is None + or range_header.units != "bytes" + or len(range_header.ranges) > 1 # more than one range + or range_header.ranges[0][1] is None # range without end + ): + request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE) + return b"" + + offset, end = range_header.ranges[0] + + # TODO limit memory usage + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872 + data = self._storage_server.slot_readv( + storage_index, [share_number], [(offset, end - offset)] + )[share_number][0] + + # TODO reduce duplication? + request.setResponseCode(http.PARTIAL_CONTENT) + if len(data): + # For empty bodies the content-range header makes no sense since + # the end of the range is inclusive. + request.setHeader( + "content-range", + ContentRange("bytes", offset, offset + len(data)).to_header(), + ) + return data + @implementer(IStreamServerEndpoint) @attr.s diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 55b6cfb05..68164e697 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -77,7 +77,8 @@ from allmydata.util.hashutil import permute_server_hash from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict from allmydata.storage.http_client import ( StorageClient, StorageClientImmutables, StorageClientGeneral, - ClientException as HTTPClientException + ClientException as HTTPClientException, StorageClientMutables, + ReadVector, TestWriteVectors, WriteVector, TestVector ) @@ -1189,3 +1190,64 @@ class _HTTPStorageServer(object): ) else: raise NotImplementedError() # future tickets + + @defer.inlineCallbacks + def slot_readv(self, storage_index, shares, readv): + mutable_client = StorageClientMutables(self._http_client) + pending_reads = {} + reads = {} + # TODO if shares list is empty, that means list all shares, so we need + # to do a query to get that. + assert shares # TODO replace with call to list shares if and only if it's empty + + # Start all the queries in parallel: + for share_number in shares: + share_reads = defer.gatherResults( + [ + mutable_client.read_share_chunk( + storage_index, share_number, offset, length + ) + for (offset, length) in readv + ] + ) + pending_reads[share_number] = share_reads + + # Wait for all the queries to finish: + for share_number, pending_result in pending_reads.items(): + reads[share_number] = yield pending_result + + return reads + + @defer.inlineCallbacks + def slot_testv_and_readv_and_writev( + self, + storage_index, + secrets, + tw_vectors, + r_vector, + ): + mutable_client = StorageClientMutables(self._http_client) + we_secret, lr_secret, lc_secret = secrets + client_tw_vectors = {} + for share_num, (test_vector, data_vector, new_length) in tw_vectors.items(): + client_test_vectors = [ + TestVector(offset=offset, size=size, specimen=specimen) + for (offset, size, specimen) in test_vector + ] + client_write_vectors = [ + WriteVector(offset=offset, data=data) for (offset, data) in data_vector + ] + client_tw_vectors[share_num] = TestWriteVectors( + test_vectors=client_test_vectors, + write_vectors=client_write_vectors, + new_length=new_length + ) + client_read_vectors = [ + ReadVector(offset=offset, size=size) + for (offset, size) in r_vector + ] + client_result = yield mutable_client.read_test_write_chunks( + storage_index, we_secret, lr_secret, lc_secret, client_tw_vectors, + client_read_vectors, + ) + return (client_result.success, client_result.reads) diff --git a/src/allmydata/test/test_deferredutil.py b/src/allmydata/test/test_deferredutil.py index 2a155089f..a37dfdd6f 100644 --- a/src/allmydata/test/test_deferredutil.py +++ b/src/allmydata/test/test_deferredutil.py @@ -129,3 +129,31 @@ class UntilTests(unittest.TestCase): self.assertEqual([1], counter) r1.callback(None) self.assertEqual([2], counter) + + +class AsyncToDeferred(unittest.TestCase): + """Tests for ``deferredutil.async_to_deferred.``""" + + def test_async_to_deferred_success(self): + """ + Normal results from a ``@async_to_deferred``-wrapped function get + turned into a ``Deferred`` with that value. + """ + @deferredutil.async_to_deferred + async def f(x, y): + return x + y + + result = f(1, y=2) + self.assertEqual(self.successResultOf(result), 3) + + def test_async_to_deferred_exception(self): + """ + Exceptions from a ``@async_to_deferred``-wrapped function get + turned into a ``Deferred`` with that value. + """ + @deferredutil.async_to_deferred + async def f(x, y): + return x/y + + result = f(1, 0) + self.assertIsInstance(self.failureResultOf(result).value, ZeroDivisionError) diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index 3d6f610be..e7b869713 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -1140,4 +1140,19 @@ class HTTPImmutableAPIsTests( class FoolscapMutableAPIsTests( _FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase ): - """Foolscap-specific tests for immutable ``IStorageServer`` APIs.""" + """Foolscap-specific tests for mutable ``IStorageServer`` APIs.""" + + +class HTTPMutableAPIsTests( + _HTTPMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase +): + """HTTP-specific tests for mutable ``IStorageServer`` APIs.""" + + # TODO will be implemented in later tickets + SKIP_TESTS = { + "test_STARAW_write_enabler_must_match", + "test_add_lease_renewal", + "test_add_new_lease", + "test_advise_corrupt_share", + "test_slot_readv_no_shares", + } diff --git a/src/allmydata/test/test_storage_https.py b/src/allmydata/test/test_storage_https.py index 73c99725a..3b41e8308 100644 --- a/src/allmydata/test/test_storage_https.py +++ b/src/allmydata/test/test_storage_https.py @@ -6,14 +6,12 @@ server authentication logic, which may one day apply outside of HTTP Storage Protocol. """ -from functools import wraps from contextlib import asynccontextmanager from cryptography import x509 from twisted.internet.endpoints import serverFromString from twisted.internet import reactor -from twisted.internet.defer import Deferred from twisted.internet.task import deferLater from twisted.web.server import Site from twisted.web.static import Data @@ -31,6 +29,7 @@ from .certs import ( from ..storage.http_common import get_spki_hash from ..storage.http_client import _StorageClientHTTPSPolicy from ..storage.http_server import _TLSEndpointWrapper +from ..util.deferredutil import async_to_deferred class HTTPSNurlTests(SyncTestCase): @@ -73,20 +72,6 @@ ox5zO3LrQmQw11OaIAs2/kviKAoKTFFxeyYcpS5RuKNDZfHQCXlLwt9bySxG self.assertEqual(get_spki_hash(certificate), expected_hash) -def async_to_deferred(f): - """ - Wrap an async function to return a Deferred instead. - - Maybe solution to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3886 - """ - - @wraps(f) - def not_async(*args, **kwargs): - return Deferred.fromCoroutine(f(*args, **kwargs)) - - return not_async - - class PinningHTTPSValidation(AsyncTestCase): """ Test client-side validation logic of HTTPS certificates that uses diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py index ed2a11ee4..782663e8b 100644 --- a/src/allmydata/util/deferredutil.py +++ b/src/allmydata/util/deferredutil.py @@ -4,24 +4,13 @@ Utilities for working with Twisted Deferreds. Ported to Python 3. """ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals - -from future.utils import PY2 -if PY2: - from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 - import time +from functools import wraps -try: - from typing import ( - Callable, - Any, - ) -except ImportError: - pass +from typing import ( + Callable, + Any, +) from foolscap.api import eventually from eliot.twisted import ( @@ -231,3 +220,17 @@ def until( yield action() if condition(): break + + +def async_to_deferred(f): + """ + Wrap an async function to return a Deferred instead. + + Maybe solution to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3886 + """ + + @wraps(f) + def not_async(*args, **kwargs): + return defer.Deferred.fromCoroutine(f(*args, **kwargs)) + + return not_async