From d3ca02fa3f386afea76d992efee1f85a1547e5c8 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 16 Oct 2023 10:07:36 -0400 Subject: [PATCH] Run blocking operations in thread pool --- src/allmydata/codec.py | 9 +++++---- src/allmydata/mutable/publish.py | 9 ++++++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/allmydata/codec.py b/src/allmydata/codec.py index 19345959e..a63f0a8c0 100644 --- a/src/allmydata/codec.py +++ b/src/allmydata/codec.py @@ -13,9 +13,10 @@ if PY2: from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 from zope.interface import implementer -from twisted.internet import defer +from twisted.internet import defer, reactor from allmydata.util import mathutil from allmydata.util.assertutil import precondition +from allmydata.util.cputhreadpool import defer_to_thread from allmydata.interfaces import ICodecEncoder, ICodecDecoder import zfec @@ -53,9 +54,9 @@ class CRSEncoder(object): for inshare in inshares: assert len(inshare) == self.share_size, (len(inshare), self.share_size, self.data_size, self.required_shares) - shares = self.encoder.encode(inshares, desired_share_ids) - - return defer.succeed((shares, desired_share_ids)) + d = defer_to_thread(reactor, self.encoder.encode, inshares, desired_share_ids) + d.addCallback(lambda shares: (shares, desired_share_ids)) + return d def encode_proposal(self, data, desired_share_ids=None): raise NotImplementedError() diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py index 3dcbe2dc5..e262ab967 100644 --- a/src/allmydata/mutable/publish.py +++ b/src/allmydata/mutable/publish.py @@ -14,7 +14,7 @@ import os, time from io import BytesIO from itertools import count from zope.interface import implementer -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.python import failure from allmydata.crypto import aes @@ -23,6 +23,8 @@ from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \ IMutableUploadable from allmydata.util import base32, hashutil, mathutil, log from allmydata.util.dictutil import DictOfSets +from allmydata.util.deferredutil import async_to_deferred +from allmydata.util.cputhreadpool import defer_to_thread from allmydata import hashtree, codec from allmydata.storage.server import si_b2a from foolscap.api import eventually, fireEventually @@ -762,7 +764,8 @@ class Publish(object): return d - def _push_segment(self, encoded_and_salt, segnum): + @async_to_deferred + async def _push_segment(self, encoded_and_salt, segnum): """ I push (data, salt) as segment number segnum. """ @@ -776,7 +779,7 @@ class Publish(object): hashed = salt + sharedata else: hashed = sharedata - block_hash = hashutil.block_hash(hashed) + block_hash = await defer_to_thread(reactor, hashutil.block_hash, hashed) self.blockhashes[shareid][segnum] = block_hash # find the writer for this share writers = self.writers[shareid]