A working thread pool.

This commit is contained in:
Itamar Turner-Trauring 2023-02-06 12:09:55 -05:00
parent c371a1f6b3
commit b221954946
2 changed files with 81 additions and 0 deletions

View File

@ -15,8 +15,10 @@ import six
import os, time, sys
import yaml
import json
from threading import current_thread
from twisted.trial import unittest
from twisted.internet import reactor
from foolscap.api import Violation, RemoteException
from allmydata.util import idlib, mathutil
@ -26,6 +28,8 @@ from allmydata.util import pollmixin
from allmydata.util import yamlutil
from allmydata.util import rrefutil
from allmydata.util.fileutil import EncryptedTemporaryFile
from allmydata.util.cputhreadpool import defer_to_thread
from allmydata.util.deferredutil import async_to_deferred
from allmydata.test.common_util import ReallyEqualMixin
from .no_network import fireNow, LocalWrapper
@ -588,3 +592,33 @@ class RrefUtilTests(unittest.TestCase):
)
self.assertEqual(result.version, "Default")
self.assertIdentical(result, rref)
class CPUThreadPool(unittest.TestCase):
"""Tests for cputhreadpool."""
@async_to_deferred
async def test_runs_in_thread(self):
"""The given function runs in a thread."""
def f(*args, **kwargs):
time.sleep(0.1)
return current_thread(), args, kwargs
this_thread = current_thread().ident
result = defer_to_thread(reactor, f, 1, 3, key=4, value=5)
# Callbacks run in the correct thread:
callback_thread_ident = []
def passthrough(result):
callback_thread_ident.append(current_thread().ident)
return result
result.addCallback(passthrough)
# The task ran in a different thread:
thread, args, kwargs = await result
self.assertEqual(callback_thread_ident[0], this_thread)
self.assertNotEqual(thread.ident, this_thread)
self.assertEqual(args, (1, 3))
self.assertEqual(kwargs, {"key": 4, "value": 5})

View File

@ -0,0 +1,47 @@
"""
A global thread pool for CPU-intensive tasks.
Motivation:
* Certain tasks are blocking on CPU, and so should be run in a thread.
* The Twisted thread pool is used for operations that don't necessarily block
on CPU, like DNS lookups. CPU processing should not block DNS lookups!
* The number of threads should be fixed, and tied to the number of available
CPUs.
As a first pass, this uses ``os.cpu_count()`` to determine the max number of
threads. This may create too many threads, as it doesn't cover things like
scheduler affinity or cgroups, but that's not the end of the world.
"""
import os
from typing import TypeVar, Callable, cast
from functools import partial
from twisted.python.threadpool import ThreadPool
from twisted.internet.defer import Deferred
from twisted.internet.threads import deferToThreadPool
from twisted.internet.interfaces import IReactorFromThreads
_CPU_THREAD_POOL = ThreadPool(minthreads=0, maxthreads=os.cpu_count(), name="TahoeCPU")
# Daemon threads allow shutdown to happen:
_CPU_THREAD_POOL.threadFactory = partial(_CPU_THREAD_POOL.threadFactory, daemon=True)
_CPU_THREAD_POOL.start()
# Eventually type annotations should use PEP 612, but that requires Python
# 3.10.
R = TypeVar("R")
def defer_to_thread(
reactor: IReactorFromThreads, f: Callable[..., R], *args, **kwargs
) -> Deferred[R]:
"""Run the function in a thread, return the result as a ``Deferred``."""
# deferToThreadPool has no type annotations...
result = deferToThreadPool(reactor, _CPU_THREAD_POOL, f, *args, **kwargs)
return cast(Deferred[R], result)
__all__ = ["defer_to_thread"]