From b221954946c2b2e93d8b36124c17c3a21ee4c0ed Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 6 Feb 2023 12:09:55 -0500 Subject: [PATCH] A working thread pool. --- src/allmydata/test/test_util.py | 34 +++++++++++++++++++++ src/allmydata/util/cputhreadpool.py | 47 +++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 src/allmydata/util/cputhreadpool.py diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index 9a0af1e06..f4e7d21e0 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -15,8 +15,10 @@ import six import os, time, sys import yaml import json +from threading import current_thread from twisted.trial import unittest +from twisted.internet import reactor from foolscap.api import Violation, RemoteException from allmydata.util import idlib, mathutil @@ -26,6 +28,8 @@ from allmydata.util import pollmixin from allmydata.util import yamlutil from allmydata.util import rrefutil from allmydata.util.fileutil import EncryptedTemporaryFile +from allmydata.util.cputhreadpool import defer_to_thread +from allmydata.util.deferredutil import async_to_deferred from allmydata.test.common_util import ReallyEqualMixin from .no_network import fireNow, LocalWrapper @@ -588,3 +592,33 @@ class RrefUtilTests(unittest.TestCase): ) self.assertEqual(result.version, "Default") self.assertIdentical(result, rref) + + +class CPUThreadPool(unittest.TestCase): + """Tests for cputhreadpool.""" + + @async_to_deferred + async def test_runs_in_thread(self): + """The given function runs in a thread.""" + def f(*args, **kwargs): + time.sleep(0.1) + return current_thread(), args, kwargs + + this_thread = current_thread().ident + result = defer_to_thread(reactor, f, 1, 3, key=4, value=5) + + # Callbacks run in the correct thread: + callback_thread_ident = [] + def passthrough(result): + callback_thread_ident.append(current_thread().ident) + return result + + result.addCallback(passthrough) + + # The task ran in a different thread: + thread, args, kwargs = await result + self.assertEqual(callback_thread_ident[0], this_thread) + self.assertNotEqual(thread.ident, this_thread) + self.assertEqual(args, (1, 3)) + self.assertEqual(kwargs, {"key": 4, "value": 5}) + diff --git a/src/allmydata/util/cputhreadpool.py b/src/allmydata/util/cputhreadpool.py new file mode 100644 index 000000000..a7f9d8bd6 --- /dev/null +++ b/src/allmydata/util/cputhreadpool.py @@ -0,0 +1,47 @@ +""" +A global thread pool for CPU-intensive tasks. + +Motivation: + +* Certain tasks are blocking on CPU, and so should be run in a thread. +* The Twisted thread pool is used for operations that don't necessarily block + on CPU, like DNS lookups. CPU processing should not block DNS lookups! +* The number of threads should be fixed, and tied to the number of available + CPUs. + +As a first pass, this uses ``os.cpu_count()`` to determine the max number of +threads. This may create too many threads, as it doesn't cover things like +scheduler affinity or cgroups, but that's not the end of the world. +""" + +import os +from typing import TypeVar, Callable, cast +from functools import partial + +from twisted.python.threadpool import ThreadPool +from twisted.internet.defer import Deferred +from twisted.internet.threads import deferToThreadPool +from twisted.internet.interfaces import IReactorFromThreads + + +_CPU_THREAD_POOL = ThreadPool(minthreads=0, maxthreads=os.cpu_count(), name="TahoeCPU") +# Daemon threads allow shutdown to happen: +_CPU_THREAD_POOL.threadFactory = partial(_CPU_THREAD_POOL.threadFactory, daemon=True) +_CPU_THREAD_POOL.start() + + +# Eventually type annotations should use PEP 612, but that requires Python +# 3.10. +R = TypeVar("R") + + +def defer_to_thread( + reactor: IReactorFromThreads, f: Callable[..., R], *args, **kwargs +) -> Deferred[R]: + """Run the function in a thread, return the result as a ``Deferred``.""" + # deferToThreadPool has no type annotations... + result = deferToThreadPool(reactor, _CPU_THREAD_POOL, f, *args, **kwargs) + return cast(Deferred[R], result) + + +__all__ = ["defer_to_thread"]