From b60e53b3fbed63aefb6264807414abe79be43a1b Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 16 Oct 2023 10:38:05 -0400 Subject: [PATCH] Run blocking code in a thread --- src/allmydata/codec.py | 9 ++++++--- src/allmydata/mutable/retrieve.py | 10 ++++++---- src/allmydata/storage/http_client.py | 19 ++++++++++++++----- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/allmydata/codec.py b/src/allmydata/codec.py index a63f0a8c0..51dc74a8a 100644 --- a/src/allmydata/codec.py +++ b/src/allmydata/codec.py @@ -83,9 +83,12 @@ class CRSDecoder(object): len(some_shares), len(their_shareids)) precondition(len(some_shares) == self.required_shares, len(some_shares), self.required_shares) - data = self.decoder.decode(some_shares, - [int(s) for s in their_shareids]) - return defer.succeed(data) + return defer_to_thread( + reactor, + self.decoder.decode, + some_shares, + [int(s) for s in their_shareids] + ) def parse_params(serializedparams): pieces = serializedparams.split(b"-") diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index 64573a49a..93d0a410f 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -7,7 +7,7 @@ import time from itertools import count from zope.interface import implementer -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.python import failure from twisted.internet.interfaces import IPushProducer, IConsumer from foolscap.api import eventually, fireEventually, DeadReferenceError, \ @@ -20,6 +20,7 @@ from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \ from allmydata.util.assertutil import _assert, precondition from allmydata.util import hashutil, log, mathutil, deferredutil from allmydata.util.dictutil import DictOfSets +from allmydata.util.cputhreadpool import defer_to_thread from allmydata import hashtree, codec from allmydata.storage.server import si_b2a @@ -734,7 +735,8 @@ class Retrieve(object): return None - def _validate_block(self, results, segnum, reader, server, started): + @deferredutil.async_to_deferred + async def _validate_block(self, results, segnum, reader, server, started): """ I validate a block from one share on a remote server. """ @@ -767,9 +769,9 @@ class Retrieve(object): "block hash tree failure: %s" % e) if self._version == MDMF_VERSION: - blockhash = hashutil.block_hash(salt + block) + blockhash = await defer_to_thread(reactor, hashutil.block_hash, salt + block) else: - blockhash = hashutil.block_hash(block) + blockhash = await defer_to_thread(reactor, hashutil.block_hash, block) # If this works without an error, then validation is # successful. try: diff --git a/src/allmydata/storage/http_client.py b/src/allmydata/storage/http_client.py index b508c07fd..41c97bfb6 100644 --- a/src/allmydata/storage/http_client.py +++ b/src/allmydata/storage/http_client.py @@ -39,6 +39,7 @@ from twisted.internet.interfaces import ( IOpenSSLClientConnectionCreator, IReactorTime, IDelayedCall, + IReactorFromThreads, ) from twisted.internet.ssl import CertificateOptions from twisted.protocols.tls import TLSMemoryBIOProtocol @@ -64,6 +65,7 @@ from .common import si_b2a, si_to_human_readable from ..util.hashutil import timing_safe_compare from ..util.deferredutil import async_to_deferred from ..util.tor_provider import _Provider as TorProvider +from ..util.cputhreadpool import defer_to_thread try: from txtorcon import Tor # type: ignore @@ -436,7 +438,7 @@ class StorageClient(object): _swissnum: bytes _treq: Union[treq, StubTreq, HTTPClient] _pool: HTTPConnectionPool - _clock: IReactorTime + _clock: Union[IReactorTime, IReactorFromThreads] # Are we running unit tests? _analyze_response: Callable[[IResponse], None] = lambda _: None @@ -539,7 +541,9 @@ class StorageClient(object): "Can't use both `message_to_serialize` and `data` " "as keyword arguments at the same time" ) - kwargs["data"] = dumps(message_to_serialize) + kwargs["data"] = await defer_to_thread( + self._clock, dumps, message_to_serialize + ) headers.addRawHeader("Content-Type", CBOR_MIME_TYPE) response = await self._treq.request( @@ -557,8 +561,12 @@ class StorageClient(object): if content_type == CBOR_MIME_TYPE: f = await limited_content(response, self._clock) data = f.read() - schema.validate_cbor(data) - return loads(data) + + def validate_and_decode(): + schema.validate_cbor(data) + return loads(data) + + return await defer_to_thread(self._clock, validate_and_decode) else: raise ClientException( -1, @@ -1232,7 +1240,8 @@ class StorageClientMutables: return cast( Set[int], await self._client.decode_cbor( - response, _SCHEMAS["mutable_list_shares"] + response, + _SCHEMAS["mutable_list_shares"], ), ) else: