mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
NO-TICK Cap the default size of the external operation thread pool (#5878)
Cap the default size of the external operation thread pool to 10 or the maximum number of available processors, whichever is smaller. Set the minimum size of the thread pool to 1. Meaning that only a single thread is used unless the node actually starts to use `FlowExternalOperation` which consumes threads from this pool.
This commit is contained in:
parent
b512a2981d
commit
4f1777adb4
@ -314,10 +314,10 @@ extraNetworkMapKeys
|
|||||||
.. _corda_configuration_flow_external_operation_thread_pool_size:
|
.. _corda_configuration_flow_external_operation_thread_pool_size:
|
||||||
|
|
||||||
flowExternalOperationThreadPoolSize
|
flowExternalOperationThreadPoolSize
|
||||||
The number of threads available to execute external operations called from flows. See the documentation on
|
The number of threads available to execute external operations that have been called from flows. See the documentation on
|
||||||
:ref:`calling external systems inside of flows <api_flows_external_operations>` for more information.
|
:ref:`calling external systems inside flows <api_flows_external_operations>` for more information.
|
||||||
|
|
||||||
*Default:* Set to the number of available cores on the machine the node is running on
|
*Default:* Set to the lesser of either the maximum number of cores allocated to the node, or 10.
|
||||||
|
|
||||||
flowMonitorPeriodMillis
|
flowMonitorPeriodMillis
|
||||||
Duration of the period suspended flows waiting for IO are logged.
|
Duration of the period suspended flows waiting for IO are logged.
|
||||||
|
@ -49,10 +49,10 @@ import net.corda.core.node.ServiceHub
|
|||||||
import net.corda.core.node.ServicesForResolution
|
import net.corda.core.node.ServicesForResolution
|
||||||
import net.corda.core.node.services.ContractUpgradeService
|
import net.corda.core.node.services.ContractUpgradeService
|
||||||
import net.corda.core.node.services.CordaService
|
import net.corda.core.node.services.CordaService
|
||||||
import net.corda.core.node.services.diagnostics.DiagnosticsService
|
|
||||||
import net.corda.core.node.services.IdentityService
|
import net.corda.core.node.services.IdentityService
|
||||||
import net.corda.core.node.services.KeyManagementService
|
import net.corda.core.node.services.KeyManagementService
|
||||||
import net.corda.core.node.services.TransactionVerifierService
|
import net.corda.core.node.services.TransactionVerifierService
|
||||||
|
import net.corda.core.node.services.diagnostics.DiagnosticsService
|
||||||
import net.corda.core.schemas.MappedSchema
|
import net.corda.core.schemas.MappedSchema
|
||||||
import net.corda.core.serialization.SerializationWhitelist
|
import net.corda.core.serialization.SerializationWhitelist
|
||||||
import net.corda.core.serialization.SerializeAsToken
|
import net.corda.core.serialization.SerializeAsToken
|
||||||
@ -61,11 +61,9 @@ import net.corda.core.transactions.LedgerTransaction
|
|||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.days
|
import net.corda.core.utilities.days
|
||||||
import net.corda.core.utilities.minutes
|
import net.corda.core.utilities.minutes
|
||||||
import net.corda.nodeapi.internal.lifecycle.NodeServicesContext
|
|
||||||
import net.corda.djvm.source.ApiSource
|
import net.corda.djvm.source.ApiSource
|
||||||
import net.corda.djvm.source.EmptyApi
|
import net.corda.djvm.source.EmptyApi
|
||||||
import net.corda.djvm.source.UserSource
|
import net.corda.djvm.source.UserSource
|
||||||
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent
|
|
||||||
import net.corda.node.CordaClock
|
import net.corda.node.CordaClock
|
||||||
import net.corda.node.VersionInfo
|
import net.corda.node.VersionInfo
|
||||||
import net.corda.node.internal.classloading.requireAnnotation
|
import net.corda.node.internal.classloading.requireAnnotation
|
||||||
@ -85,7 +83,6 @@ import net.corda.node.services.api.FlowStarter
|
|||||||
import net.corda.node.services.api.MonitoringService
|
import net.corda.node.services.api.MonitoringService
|
||||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||||
import net.corda.node.services.api.NodePropertiesStore
|
import net.corda.node.services.api.NodePropertiesStore
|
||||||
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEventsDistributor
|
|
||||||
import net.corda.node.services.api.SchemaService
|
import net.corda.node.services.api.SchemaService
|
||||||
import net.corda.node.services.api.ServiceHubInternal
|
import net.corda.node.services.api.ServiceHubInternal
|
||||||
import net.corda.node.services.api.VaultServiceInternal
|
import net.corda.node.services.api.VaultServiceInternal
|
||||||
@ -155,8 +152,11 @@ import net.corda.nodeapi.internal.crypto.X509Utilities.NODE_IDENTITY_KEY_ALIAS
|
|||||||
import net.corda.nodeapi.internal.cryptoservice.CryptoServiceFactory
|
import net.corda.nodeapi.internal.cryptoservice.CryptoServiceFactory
|
||||||
import net.corda.nodeapi.internal.cryptoservice.SupportedCryptoServices
|
import net.corda.nodeapi.internal.cryptoservice.SupportedCryptoServices
|
||||||
import net.corda.nodeapi.internal.cryptoservice.bouncycastle.BCCryptoService
|
import net.corda.nodeapi.internal.cryptoservice.bouncycastle.BCCryptoService
|
||||||
import net.corda.nodeapi.internal.persistence.CordaTransactionSupportImpl
|
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent
|
||||||
|
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEventsDistributor
|
||||||
|
import net.corda.nodeapi.internal.lifecycle.NodeServicesContext
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
|
import net.corda.nodeapi.internal.persistence.CordaTransactionSupportImpl
|
||||||
import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
|
import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseIncompatibleException
|
import net.corda.nodeapi.internal.persistence.DatabaseIncompatibleException
|
||||||
@ -178,9 +178,12 @@ import java.sql.Connection
|
|||||||
import java.time.Clock
|
import java.time.Clock
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.time.format.DateTimeParseException
|
import java.time.format.DateTimeParseException
|
||||||
import java.util.Properties
|
import java.util.*
|
||||||
import java.util.concurrent.ExecutorService
|
import java.util.concurrent.ExecutorService
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.TimeUnit.MINUTES
|
import java.util.concurrent.TimeUnit.MINUTES
|
||||||
import java.util.concurrent.TimeUnit.SECONDS
|
import java.util.concurrent.TimeUnit.SECONDS
|
||||||
import java.util.function.Consumer
|
import java.util.function.Consumer
|
||||||
@ -737,11 +740,17 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
|||||||
private fun createExternalOperationExecutor(numberOfThreads: Int): ExecutorService {
|
private fun createExternalOperationExecutor(numberOfThreads: Int): ExecutorService {
|
||||||
when (numberOfThreads) {
|
when (numberOfThreads) {
|
||||||
1 -> log.info("Flow external operation executor has $numberOfThreads thread")
|
1 -> log.info("Flow external operation executor has $numberOfThreads thread")
|
||||||
else -> log.info("Flow external operation executor has $numberOfThreads threads")
|
else -> log.info("Flow external operation executor has a max of $numberOfThreads threads")
|
||||||
}
|
}
|
||||||
return Executors.newFixedThreadPool(
|
// Start with 1 thread and scale up to the configured thread pool size if needed
|
||||||
|
// Parameters of [ThreadPoolExecutor] based on [Executors.newFixedThreadPool]
|
||||||
|
return ThreadPoolExecutor(
|
||||||
|
1,
|
||||||
numberOfThreads,
|
numberOfThreads,
|
||||||
ThreadFactoryBuilder().setNameFormat("flow-external-operation-thread").build()
|
0L,
|
||||||
|
TimeUnit.MILLISECONDS,
|
||||||
|
LinkedBlockingQueue<Runnable>(),
|
||||||
|
ThreadFactoryBuilder().setNameFormat("flow-external-operation-thread").setDaemon(true).build()
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,12 +23,15 @@ import net.corda.nodeapi.internal.loadDevCaTrustStore
|
|||||||
import net.corda.nodeapi.internal.registerDevP2pCertificates
|
import net.corda.nodeapi.internal.registerDevP2pCertificates
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
import kotlin.math.min
|
||||||
|
|
||||||
fun configOf(vararg pairs: Pair<String, Any?>): Config = ConfigFactory.parseMap(mapOf(*pairs))
|
fun configOf(vararg pairs: Pair<String, Any?>): Config = ConfigFactory.parseMap(mapOf(*pairs))
|
||||||
operator fun Config.plus(overrides: Map<String, Any?>): Config = ConfigFactory.parseMap(overrides).withFallback(this)
|
operator fun Config.plus(overrides: Map<String, Any?>): Config = ConfigFactory.parseMap(overrides).withFallback(this)
|
||||||
|
|
||||||
object ConfigHelper {
|
object ConfigHelper {
|
||||||
|
|
||||||
|
private const val FLOW_EXTERNAL_OPERATION_THREAD_POOL_SIZE_MAX = 10
|
||||||
|
|
||||||
private const val CORDA_PROPERTY_PREFIX = "corda."
|
private const val CORDA_PROPERTY_PREFIX = "corda."
|
||||||
private const val UPPERCASE_PROPERTY_PREFIX = "CORDA."
|
private const val UPPERCASE_PROPERTY_PREFIX = "CORDA."
|
||||||
|
|
||||||
@ -47,7 +50,9 @@ object ConfigHelper {
|
|||||||
|
|
||||||
// Detect the number of cores
|
// Detect the number of cores
|
||||||
val coreCount = Runtime.getRuntime().availableProcessors()
|
val coreCount = Runtime.getRuntime().availableProcessors()
|
||||||
val multiThreadingConfig = configOf("flowExternalOperationThreadPoolSize" to coreCount.toString())
|
val multiThreadingConfig = configOf(
|
||||||
|
"flowExternalOperationThreadPoolSize" to min(coreCount, FLOW_EXTERNAL_OPERATION_THREAD_POOL_SIZE_MAX).toString()
|
||||||
|
)
|
||||||
|
|
||||||
val systemOverrides = ConfigFactory.systemProperties().cordaEntriesOnly()
|
val systemOverrides = ConfigFactory.systemProperties().cordaEntriesOnly()
|
||||||
val environmentOverrides = ConfigFactory.systemEnvironment().cordaEntriesOnly()
|
val environmentOverrides = ConfigFactory.systemEnvironment().cordaEntriesOnly()
|
||||||
|
Loading…
Reference in New Issue
Block a user