Run joining in a thread

This commit is contained in:
Itamar Turner-Trauring 2023-12-06 15:58:30 -05:00
parent 4c03d931bd
commit 9a1e73892e

View File

@ -4,8 +4,9 @@ Ported to Python 3.
from __future__ import annotations from __future__ import annotations
import time import time
from io import BytesIO
from itertools import count from itertools import count
from zope.interface import implementer from zope.interface import implementer
from twisted.internet import defer from twisted.internet import defer
from twisted.python import failure from twisted.python import failure
@ -873,11 +874,26 @@ class Retrieve(object):
shares = shares[:self._required_shares] shares = shares[:self._required_shares]
self.log("decoding segment %d" % segnum) self.log("decoding segment %d" % segnum)
if segnum == self._num_segments - 1: if segnum == self._num_segments - 1:
d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids) d = self._tail_decoder.decode(shares, shareids)
else: else:
d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids) d = self._segment_decoder.decode(shares, shareids)
def _process(buffers):
segment = b"".join(buffers) # For larger shares, this can take a few milliseconds. As such, we want
# to unblock the event loop. Even if it doesn't release the GIL, if it
# really takes too long it will implicitly release it.
def _join(buffers):
f = BytesIO()
for b in buffers:
f.write(b)
return f.getbuffer()
@deferredutil.async_to_deferred
async def _got_buffers(buffers):
return await defer_to_thread(_join, buffers)
d.addCallback(_got_buffers)
def _process(segment):
self.log(format="now decoding segment %(segnum)s of %(numsegs)s", self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
segnum=segnum, segnum=segnum,
numsegs=self._num_segments, numsegs=self._num_segments,