Run blocking code in a thread

This commit is contained in:
Itamar Turner-Trauring 2023-10-16 10:38:05 -04:00
parent 2ccdd183c1
commit b60e53b3fb
3 changed files with 26 additions and 12 deletions

View File

@ -83,9 +83,12 @@ class CRSDecoder(object):
len(some_shares), len(their_shareids))
precondition(len(some_shares) == self.required_shares,
len(some_shares), self.required_shares)
data = self.decoder.decode(some_shares,
[int(s) for s in their_shareids])
return defer.succeed(data)
return defer_to_thread(
reactor,
self.decoder.decode,
some_shares,
[int(s) for s in their_shareids]
)
def parse_params(serializedparams):
pieces = serializedparams.split(b"-")

View File

@ -7,7 +7,7 @@ import time
from itertools import count
from zope.interface import implementer
from twisted.internet import defer
from twisted.internet import defer, reactor
from twisted.python import failure
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually, DeadReferenceError, \
@ -20,6 +20,7 @@ from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
from allmydata.util.assertutil import _assert, precondition
from allmydata.util import hashutil, log, mathutil, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata.util.cputhreadpool import defer_to_thread
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
@ -734,7 +735,8 @@ class Retrieve(object):
return None
def _validate_block(self, results, segnum, reader, server, started):
@deferredutil.async_to_deferred
async def _validate_block(self, results, segnum, reader, server, started):
"""
I validate a block from one share on a remote server.
"""
@ -767,9 +769,9 @@ class Retrieve(object):
"block hash tree failure: %s" % e)
if self._version == MDMF_VERSION:
blockhash = hashutil.block_hash(salt + block)
blockhash = await defer_to_thread(reactor, hashutil.block_hash, salt + block)
else:
blockhash = hashutil.block_hash(block)
blockhash = await defer_to_thread(reactor, hashutil.block_hash, block)
# If this works without an error, then validation is
# successful.
try:

View File

@ -39,6 +39,7 @@ from twisted.internet.interfaces import (
IOpenSSLClientConnectionCreator,
IReactorTime,
IDelayedCall,
IReactorFromThreads,
)
from twisted.internet.ssl import CertificateOptions
from twisted.protocols.tls import TLSMemoryBIOProtocol
@ -64,6 +65,7 @@ from .common import si_b2a, si_to_human_readable
from ..util.hashutil import timing_safe_compare
from ..util.deferredutil import async_to_deferred
from ..util.tor_provider import _Provider as TorProvider
from ..util.cputhreadpool import defer_to_thread
try:
from txtorcon import Tor # type: ignore
@ -436,7 +438,7 @@ class StorageClient(object):
_swissnum: bytes
_treq: Union[treq, StubTreq, HTTPClient]
_pool: HTTPConnectionPool
_clock: IReactorTime
_clock: Union[IReactorTime, IReactorFromThreads]
# Are we running unit tests?
_analyze_response: Callable[[IResponse], None] = lambda _: None
@ -539,7 +541,9 @@ class StorageClient(object):
"Can't use both `message_to_serialize` and `data` "
"as keyword arguments at the same time"
)
kwargs["data"] = dumps(message_to_serialize)
kwargs["data"] = await defer_to_thread(
self._clock, dumps, message_to_serialize
)
headers.addRawHeader("Content-Type", CBOR_MIME_TYPE)
response = await self._treq.request(
@ -557,8 +561,12 @@ class StorageClient(object):
if content_type == CBOR_MIME_TYPE:
f = await limited_content(response, self._clock)
data = f.read()
schema.validate_cbor(data)
return loads(data)
def validate_and_decode():
schema.validate_cbor(data)
return loads(data)
return await defer_to_thread(self._clock, validate_and_decode)
else:
raise ClientException(
-1,
@ -1232,7 +1240,8 @@ class StorageClientMutables:
return cast(
Set[int],
await self._client.decode_cbor(
response, _SCHEMAS["mutable_list_shares"]
response,
_SCHEMAS["mutable_list_shares"],
),
)
else: