Run blocking operations in thread pool

This commit is contained in:
Itamar Turner-Trauring 2023-10-16 10:07:36 -04:00
parent 08e8dd308f
commit d3ca02fa3f
2 changed files with 11 additions and 7 deletions

View File

@ -13,9 +13,10 @@ if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from zope.interface import implementer
from twisted.internet import defer
from twisted.internet import defer, reactor
from allmydata.util import mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.cputhreadpool import defer_to_thread
from allmydata.interfaces import ICodecEncoder, ICodecDecoder
import zfec
@ -53,9 +54,9 @@ class CRSEncoder(object):
for inshare in inshares:
assert len(inshare) == self.share_size, (len(inshare), self.share_size, self.data_size, self.required_shares)
shares = self.encoder.encode(inshares, desired_share_ids)
return defer.succeed((shares, desired_share_ids))
d = defer_to_thread(reactor, self.encoder.encode, inshares, desired_share_ids)
d.addCallback(lambda shares: (shares, desired_share_ids))
return d
def encode_proposal(self, data, desired_share_ids=None):
raise NotImplementedError()

View File

@ -14,7 +14,7 @@ import os, time
from io import BytesIO
from itertools import count
from zope.interface import implementer
from twisted.internet import defer
from twisted.internet import defer, reactor
from twisted.python import failure
from allmydata.crypto import aes
@ -23,6 +23,8 @@ from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
IMutableUploadable
from allmydata.util import base32, hashutil, mathutil, log
from allmydata.util.dictutil import DictOfSets
from allmydata.util.deferredutil import async_to_deferred
from allmydata.util.cputhreadpool import defer_to_thread
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from foolscap.api import eventually, fireEventually
@ -762,7 +764,8 @@ class Publish(object):
return d
def _push_segment(self, encoded_and_salt, segnum):
@async_to_deferred
async def _push_segment(self, encoded_and_salt, segnum):
"""
I push (data, salt) as segment number segnum.
"""
@ -776,7 +779,7 @@ class Publish(object):
hashed = salt + sharedata
else:
hashed = sharedata
block_hash = hashutil.block_hash(hashed)
block_hash = await defer_to_thread(reactor, hashutil.block_hash, hashed)
self.blockhashes[shareid][segnum] = block_hash
# find the writer for this share
writers = self.writers[shareid]