ENT-10016: Give all node threads descriptive names
commit 5b3180bf7b
parent 746e16bca0
@@ -1,5 +1,6 @@
 package net.corda.client.rpc

+import io.netty.util.concurrent.DefaultThreadFactory
 import net.corda.client.rpc.internal.RPCClient
 import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
 import net.corda.client.rpc.internal.SerializationEnvironmentHelper
@@ -52,7 +53,7 @@ class CordaRPCConnection private constructor(
                 sslConfiguration: ClientRpcSslOptions? = null,
                 classLoader: ClassLoader? = null
         ): CordaRPCConnection {
-            val observersPool: ExecutorService = Executors.newCachedThreadPool()
+            val observersPool: ExecutorService = Executors.newCachedThreadPool(DefaultThreadFactory("RPCObserver"))
            return CordaRPCConnection(null, observersPool, ReconnectingCordaRPCOps(
                    addresses,
                    username,
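Netty's DefaultThreadFactory, used for the observer pool above, derives thread names from the pool name, so these threads show up in thread dumps as RPCObserver-<poolId>-<threadId> instead of the JDK default pool-N-thread-M. A minimal standalone sketch of that behaviour (illustrative only, not part of the commit):

import io.netty.util.concurrent.DefaultThreadFactory
import java.util.concurrent.Executors

fun main() {
    // Same factory style as the RPCObserver pool above; the pool name is only an example.
    val pool = Executors.newCachedThreadPool(DefaultThreadFactory("RPCObserver"))
    pool.execute { println(Thread.currentThread().name) } // e.g. "RPCObserver-1-1"
    pool.shutdown()
}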
@@ -1,5 +1,6 @@
 package net.corda.client.rpc.internal

+import io.netty.util.concurrent.DefaultThreadFactory
 import net.corda.client.rpc.ConnectionFailureException
 import net.corda.client.rpc.CordaRPCClient
 import net.corda.client.rpc.CordaRPCClientConfiguration
@@ -99,7 +100,8 @@ class ReconnectingCordaRPCOps private constructor(
                 ErrorInterceptingHandler(reconnectingRPCConnection)) as CordaRPCOps
        }
    }
-    private val retryFlowsPool = Executors.newScheduledThreadPool(1)
+    private val retryFlowsPool = Executors.newScheduledThreadPool(1, DefaultThreadFactory("FlowRetry"))
+
    /**
     * This function runs a flow and retries until it completes successfully.
     *
@@ -42,8 +42,8 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
     override fun start(): Started = synchronized(this) {
         check(started == null) { "start can't be called twice" }
         val tcpTransport = p2pConnectorTcpTransport(serverAddress, config, threadPoolName = threadPoolName, trace = trace)
-        val backupTransports = backupServerAddressPool.map {
-            p2pConnectorTcpTransport(it, config, threadPoolName = threadPoolName, trace = trace)
+        val backupTransports = backupServerAddressPool.mapIndexed { index, address ->
+            p2pConnectorTcpTransport(address, config, threadPoolName = "$threadPoolName-backup${index+1}", trace = trace)
         }

         log.info("Connecting to message broker: $serverAddress")
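Each backup broker connector now gets a numbered pool name, so threads talking to different backup servers can be told apart. A trivial, illustrative check of the naming scheme (the base name and addresses below are made up):

fun main() {
    val threadPoolName = "P2PClient"                                        // assumed base name, for illustration
    val backupServerAddressPool = listOf("backupA:11005", "backupB:11005")
    val names = backupServerAddressPool.mapIndexed { index, _ -> "$threadPoolName-backup${index + 1}" }
    println(names)                                                          // [P2PClient-backup1, P2PClient-backup2]
}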
@@ -122,6 +122,7 @@ class ArtemisTcpTransport {
         fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
                                     config: BrokerRpcSslOptions?,
                                     enableSSL: Boolean = true,
+                                    threadPoolName: String = "RPCServer",
                                     trace: Boolean = false,
                                     remotingThreads: Int? = null): TransportConfiguration {
             val options = mutableMapOf<String, Any>()
@@ -129,7 +130,7 @@
                 config.keyStorePath.requireOnDefaultFileSystem()
                 options.putAll(config.toTransportOptions())
             }
-            return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, null, enableSSL, "RPCServer", trace, remotingThreads)
+            return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, null, enableSSL, threadPoolName, trace, remotingThreads)
         }

         fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
@@ -147,14 +148,16 @@

         fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort,
                                           config: SslConfiguration,
+                                          threadPoolName: String = "Internal-RPCClient",
                                           trace: Boolean = false): TransportConfiguration {
             val options = mutableMapOf<String, Any>()
             config.addToTransportOptions(options)
-            return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCClient", trace, null)
+            return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, threadPoolName, trace, null)
         }

         fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
                                             config: SslConfiguration,
+                                            threadPoolName: String = "Internal-RPCServer",
                                             trace: Boolean = false,
                                             remotingThreads: Int? = null): TransportConfiguration {
             val options = mutableMapOf<String, Any>()
@@ -165,7 +168,7 @@
                     options,
                     trustManagerFactory(requireNotNull(config.trustStore).get()),
                     true,
-                    "Internal-RPCServer",
+                    threadPoolName,
                     trace,
                     remotingThreads
             )
@@ -209,7 +212,7 @@
                              trace: Boolean,
                              remotingThreads: Int?): TransportConfiguration {
             return createTransport(
-                    NodeNettyConnectorFactory::class.java.name,
+                    CordaNettyConnectorFactory::class.java.name,
                     hostAndPort,
                     protocols,
                     options,
@@ -1,8 +1,14 @@
 @file:JvmName("ArtemisUtils")
 package net.corda.nodeapi.internal

+import net.corda.core.internal.declaredField
+import org.apache.activemq.artemis.utils.actors.ProcessorBase
 import java.nio.file.FileSystems
 import java.nio.file.Path
+import java.util.concurrent.Executor
+import java.util.concurrent.ThreadFactory
+import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.atomic.AtomicInteger

 /**
  * Require that the [Path] is on a default file system, and therefore is one that Artemis is willing to use.
@@ -16,3 +22,29 @@ fun requireMessageSize(messageSize: Int, limit: Int) {
     require(messageSize <= limit) { "Message exceeds maxMessageSize network parameter, maxMessageSize: [$limit], message size: [$messageSize]" }
 }

+val Executor.rootExecutor: Executor get() {
+    var executor: Executor = this
+    while (executor is ProcessorBase<*>) {
+        executor = executor.declaredField<Executor>("delegate").value
+    }
+    return executor
+}
+
+fun Executor.setThreadPoolName(threadPoolName: String) {
+    (rootExecutor as? ThreadPoolExecutor)?.let { it.threadFactory = NamedThreadFactory(threadPoolName, it.threadFactory) }
+}
+
+private class NamedThreadFactory(poolName: String, private val delegate: ThreadFactory) : ThreadFactory {
+    companion object {
+        private val poolId = AtomicInteger(0)
+    }
+
+    private val prefix = "$poolName-${poolId.incrementAndGet()}-"
+    private val nextId = AtomicInteger(0)
+
+    override fun newThread(r: Runnable): Thread {
+        val thread = delegate.newThread(r)
+        thread.name = "$prefix${nextId.incrementAndGet()}"
+        return thread
+    }
+}
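setThreadPoolName above only swaps the executor's thread factory, so threads created from that point on pick up the descriptive prefix while threads that already exist keep their old names. A self-contained sketch of the same delegating-factory idea (the class and pool names below are illustrative, not the private NamedThreadFactory added in this commit):

import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.atomic.AtomicInteger

// Illustrative delegating factory mirroring the approach above.
class RenamingThreadFactory(poolName: String, private val delegate: ThreadFactory) : ThreadFactory {
    private val prefix = "$poolName-"
    private val nextId = AtomicInteger(0)
    override fun newThread(r: Runnable): Thread = delegate.newThread(r).apply { name = "$prefix${nextId.incrementAndGet()}" }
}

fun main() {
    val pool = Executors.newFixedThreadPool(2) as ThreadPoolExecutor
    pool.threadFactory = RenamingThreadFactory("P2PServer-artemis", pool.threadFactory)
    pool.execute { println(Thread.currentThread().name) } // e.g. "P2PServer-artemis-1"
    pool.shutdown()
}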
@@ -14,15 +14,16 @@ import org.apache.activemq.artemis.utils.ConfigurationHelper
 import java.util.concurrent.Executor
 import java.util.concurrent.ScheduledExecutorService

-class NodeNettyConnectorFactory : ConnectorFactory {
+class CordaNettyConnectorFactory : ConnectorFactory {
     override fun createConnector(configuration: MutableMap<String, Any>?,
                                  handler: BufferHandler?,
                                  listener: ClientConnectionLifeCycleListener?,
-                                 closeExecutor: Executor?,
-                                 threadPool: Executor?,
-                                 scheduledThreadPool: ScheduledExecutorService?,
+                                 closeExecutor: Executor,
+                                 threadPool: Executor,
+                                 scheduledThreadPool: ScheduledExecutorService,
                                  protocolManager: ClientProtocolManager?): Connector {
         val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Connector", configuration)
+        setThreadPoolName(threadPool, closeExecutor, scheduledThreadPool, threadPoolName)
         val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
         return NettyConnector(
                 configuration,
@@ -31,7 +32,7 @@ class NodeNettyConnectorFactory : ConnectorFactory {
                 closeExecutor,
                 threadPool,
                 scheduledThreadPool,
-                MyClientProtocolManager(threadPoolName, trace)
+                MyClientProtocolManager("$threadPoolName-netty", trace)
         )
     }

@@ -39,6 +40,17 @@ class NodeNettyConnectorFactory : ConnectorFactory {

     override fun getDefaults(): Map<String?, Any?> = NettyConnector.DEFAULT_CONFIG

+    private fun setThreadPoolName(threadPool: Executor, closeExecutor: Executor, scheduledThreadPool: ScheduledExecutorService, name: String) {
+        threadPool.setThreadPoolName("$name-artemis")
+        // Artemis will actually wrap the same backing Executor to create multiple "OrderedExecutors". In this scenario both the threadPool
+        // and the closeExecutor are the same when it comes to the pool names. If however they are different then give them separate names.
+        if (threadPool.rootExecutor !== closeExecutor.rootExecutor) {
+            closeExecutor.setThreadPoolName("$name-artemis-closer")
+        }
+        // The scheduler is separate
+        scheduledThreadPool.setThreadPoolName("$name-artemis-scheduler")
+    }
+

     private class MyClientProtocolManager(private val threadPoolName: String, private val trace: Boolean) : ActiveMQClientProtocolManager() {
         override fun addChannelHandlers(pipeline: ChannelPipeline) {
@@ -22,6 +22,7 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
 import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
 import net.corda.nodeapi.internal.protonwrapper.netty.ProxyConfig
 import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig
+import net.corda.nodeapi.internal.protonwrapper.netty.sslDelegatedTaskExecutor
 import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
 import org.apache.activemq.artemis.api.core.SimpleString
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
@@ -31,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession
 import org.slf4j.MDC
 import rx.Subscription
 import java.time.Duration
+import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
 import java.util.concurrent.ScheduledExecutorService
 import java.util.concurrent.ScheduledFuture
@@ -53,7 +55,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
                              maxMessageSize: Int,
                              revocationConfig: RevocationConfig,
                              enableSNI: Boolean,
-                             private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
+                             private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider,
                              private val bridgeMetricsService: BridgeMetricsService? = null,
                              trace: Boolean,
                              sslHandshakeTimeout: Duration?,
@@ -78,9 +80,11 @@ open class AMQPBridgeManager(keyStore: CertificateStore,

     private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize, revocationConfig,useOpenSSL, enableSNI, trace = trace, _sslHandshakeTimeout = sslHandshakeTimeout)
     private var sharedEventLoopGroup: EventLoopGroup? = null
+    private var sslDelegatedTaskExecutor: ExecutorService? = null
     private var artemis: ArtemisSessionProvider? = null

     companion object {
         private val log = contextLogger()

         private const val CORDA_NUM_BRIDGE_THREADS_PROP_NAME = "net.corda.nodeapi.amqpbridgemanager.NumBridgeThreads"

@@ -97,18 +101,11 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
      * however Artemis and the remote Corda instanced will deduplicate these messages.
      */
     @Suppress("TooManyFunctions")
-    private class AMQPBridge(val sourceX500Name: String,
+    private inner class AMQPBridge(val sourceX500Name: String,
                              val queueName: String,
                              val targets: List<NetworkHostAndPort>,
                              val allowedRemoteLegalNames: Set<CordaX500Name>,
-                             private val amqpConfig: AMQPConfiguration,
-                             sharedEventGroup: EventLoopGroup,
-                             private val artemis: ArtemisSessionProvider,
-                             private val bridgeMetricsService: BridgeMetricsService?,
-                             private val bridgeConnectionTTLSeconds: Int) {
-        companion object {
-            private val log = contextLogger()
-        }
+                             private val amqpConfig: AMQPConfiguration) {

         private fun withMDC(block: () -> Unit) {
             val oldMDC = MDC.getCopyOfContextMap() ?: emptyMap<String, String>()
@@ -134,13 +131,18 @@ open class AMQPBridgeManager(keyStore: CertificateStore,

         private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }

-        val amqpClient = AMQPClient(targets, allowedRemoteLegalNames, amqpConfig, sharedThreadPool = sharedEventGroup)
+        val amqpClient = AMQPClient(
+                targets,
+                allowedRemoteLegalNames,
+                amqpConfig,
+                AMQPClient.NettyThreading.Shared(sharedEventLoopGroup!!, sslDelegatedTaskExecutor!!)
+        )
         private var session: ClientSession? = null
         private var consumer: ClientConsumer? = null
         private var connectedSubscription: Subscription? = null
         @Volatile
         private var messagesReceived: Boolean = false
-        private val eventLoop: EventLoop = sharedEventGroup.next()
+        private val eventLoop: EventLoop = sharedEventLoopGroup!!.next()
         private var artemisState: ArtemisState = ArtemisState.STOPPED
             set(value) {
                 logDebugWithMDC { "State change $field to $value" }
@@ -152,32 +154,9 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
         private var scheduledExecutorService: ScheduledExecutorService
                 = Executors.newSingleThreadScheduledExecutor(ThreadFactoryBuilder().setNameFormat("bridge-connection-reset-%d").build())

-        @Suppress("ClassNaming")
-        private sealed class ArtemisState {
-            object STARTING : ArtemisState()
-            data class STARTED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
-
-            object CHECKING : ArtemisState()
-            object RESTARTED : ArtemisState()
-            object RECEIVING : ArtemisState()
-
-            object AMQP_STOPPED : ArtemisState()
-            object AMQP_STARTING : ArtemisState()
-            object AMQP_STARTED : ArtemisState()
-            object AMQP_RESTARTED : ArtemisState()
-
-            object STOPPING : ArtemisState()
-            object STOPPED : ArtemisState()
-            data class STOPPED_AMQP_START_SCHEDULED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
-
-            open val pending: ScheduledFuture<Unit>? = null
-
-            override fun toString(): String = javaClass.simpleName
-        }
-
         private fun artemis(inProgress: ArtemisState, block: (precedingState: ArtemisState) -> ArtemisState) {
             val runnable = {
-                synchronized(artemis) {
+                synchronized(artemis!!) {
                     try {
                         val precedingState = artemisState
                         artemisState.pending?.cancel(false)
@@ -253,7 +232,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
                 }
             }
             artemis(ArtemisState.STARTING) {
-                val startedArtemis = artemis.started
+                val startedArtemis = artemis!!.started
                 if (startedArtemis == null) {
                     logInfoWithMDC("Bridge Connected but Artemis is disconnected")
                     ArtemisState.STOPPED
@@ -457,6 +436,29 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
         }
     }

+    @Suppress("ClassNaming")
+    private sealed class ArtemisState {
+        object STARTING : ArtemisState()
+        data class STARTED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
+
+        object CHECKING : ArtemisState()
+        object RESTARTED : ArtemisState()
+        object RECEIVING : ArtemisState()
+
+        object AMQP_STOPPED : ArtemisState()
+        object AMQP_STARTING : ArtemisState()
+        object AMQP_STARTED : ArtemisState()
+        object AMQP_RESTARTED : ArtemisState()
+
+        object STOPPING : ArtemisState()
+        object STOPPED : ArtemisState()
+        data class STOPPED_AMQP_START_SCHEDULED(override val pending: ScheduledFuture<Unit>) : ArtemisState()
+
+        open val pending: ScheduledFuture<Unit>? = null
+
+        override fun toString(): String = javaClass.simpleName
+    }
+
     override fun deployBridge(sourceX500Name: String, queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
         lock.withLock {
             val bridges = queueNamesToBridgesMap.getOrPut(queueName) { mutableListOf() }
@@ -467,8 +469,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
             }
             val newAMQPConfig = with(amqpConfig) { AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize,
                     revocationConfig, useOpenSsl, enableSNI, sourceX500Name, trace, sslHandshakeTimeout) }
-            val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig, sharedEventLoopGroup!!, artemis!!,
-                    bridgeMetricsService, bridgeConnectionTTLSeconds)
+            val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig)
             bridges += newBridge
             bridgeMetricsService?.bridgeCreated(targets, legalNames)
             newBridge
@@ -497,15 +498,16 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
             // queueNamesToBridgesMap returns a mutable list, .toList converts it to a immutable list so it won't be changed by the [destroyBridge] method.
             val bridges = queueNamesToBridgesMap[queueName]?.toList()
             destroyBridge(queueName, bridges?.flatMap { it.targets } ?: emptyList())
-            bridges?.map {
+            bridges?.associate {
                 it.sourceX500Name to BridgeEntry(it.queueName, it.targets, it.allowedRemoteLegalNames.toList(), serviceAddress = false)
-            }?.toMap() ?: emptyMap()
+            } ?: emptyMap()
         }
     }

     override fun start() {
-        sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("AMQPBridge", Thread.MAX_PRIORITY))
-        val artemis = artemisMessageClientFactory()
+        sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("NettyBridge", Thread.MAX_PRIORITY))
+        sslDelegatedTaskExecutor = sslDelegatedTaskExecutor("NettyBridge")
+        val artemis = artemisMessageClientFactory("ArtemisBridge")
         this.artemis = artemis
         artemis.start()
     }
@@ -522,6 +524,8 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
             sharedEventLoopGroup = null
             queueNamesToBridgesMap.clear()
             artemis?.stop()
+            sslDelegatedTaskExecutor?.shutdown()
+            sslDelegatedTaskExecutor = null
         }
     }
 }
@@ -34,7 +34,7 @@ class BridgeControlListener(private val keyStore: CertificateStore,
                             maxMessageSize: Int,
                             revocationConfig: RevocationConfig,
                             enableSNI: Boolean,
-                            private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
+                            private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider,
                             bridgeMetricsService: BridgeMetricsService? = null,
                             trace: Boolean = false,
                             sslHandshakeTimeout: Duration? = null,
@@ -79,7 +79,7 @@ class BridgeControlListener(private val keyStore: CertificateStore,
         bridgeNotifyQueue = "$BRIDGE_NOTIFY.$queueDisambiguityId"

         bridgeManager.start()
-        val artemis = artemisMessageClientFactory()
+        val artemis = artemisMessageClientFactory("BridgeControl")
         this.artemis = artemis
         artemis.start()
         val artemisClient = artemis.started!!
@@ -37,7 +37,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore,
                             maxMessageSize: Int,
                             revocationConfig: RevocationConfig,
                             enableSNI: Boolean,
-                            private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
+                            private val artemisMessageClientFactory: (String) -> ArtemisSessionProvider,
                             private val bridgeMetricsService: BridgeMetricsService? = null,
                             private val isLocalInbox: (String) -> Boolean,
                             trace: Boolean,
@@ -204,7 +204,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore,

     override fun start() {
         super.start()
-        val artemis = artemisMessageClientFactory()
+        val artemis = artemisMessageClientFactory("LoopbackBridge")
         this.artemis = artemis
         artemis.start()
     }
@@ -32,7 +32,9 @@ import rx.Observable
 import rx.subjects.PublishSubject
 import java.lang.Long.min
 import java.net.InetSocketAddress
+import java.util.concurrent.Executor
 import java.util.concurrent.ExecutorService
+import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 import kotlin.concurrent.withLock
@@ -61,8 +63,7 @@ data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostA
 class AMQPClient(private val targets: List<NetworkHostAndPort>,
                  val allowedRemoteLegalNames: Set<CordaX500Name>,
                  private val configuration: AMQPConfiguration,
-                 private val sharedThreadPool: EventLoopGroup? = null,
-                 private val threadPoolName: String = "AMQPClient",
+                 private val nettyThreading: NettyThreading = NettyThreading.NonShared("AMQPClient"),
                  private val distPointCrlSource: CertDistPointCrlSource = CertDistPointCrlSource.SINGLETON) : AutoCloseable {
     companion object {
         init {
@@ -82,7 +83,6 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
     private val lock = ReentrantLock()
     @Volatile
     private var started: Boolean = false
-    private var workerGroup: EventLoopGroup? = null
     @Volatile
     private var clientChannel: Channel? = null
     // Offset into the list of targets, so that we can implement round-robin reconnect logic.
@@ -94,7 +94,6 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
     private var amqpActive = false
     @Volatile
     private var amqpChannelHandler: ChannelHandler? = null
-    private var sslDelegatedTaskExecutor: ExecutorService? = null

     val localAddressString: String
         get() = clientChannel?.localAddress()?.toString() ?: "<unknownLocalAddress>"
@@ -123,7 +122,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
             log.info("Failed to connect to $currentTarget", future.cause())

             if (started) {
-                workerGroup?.schedule({
+                nettyThreading.eventLoopGroup.schedule({
                     nextTarget()
                     restart()
                 }, retryInterval, TimeUnit.MILLISECONDS)
@@ -142,7 +141,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
             clientChannel = null
             if (started && !amqpActive) {
                 log.debug { "Scheduling restart of $currentTarget (AMQP inactive)" }
-                workerGroup?.schedule({
+                nettyThreading.eventLoopGroup.schedule({
                     nextTarget()
                     restart()
                 }, retryInterval, TimeUnit.MILLISECONDS)
@@ -198,7 +197,6 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,

             val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, parent.configuration)
             val target = parent.currentTarget
-            val delegatedTaskExecutor = checkNotNull(parent.sslDelegatedTaskExecutor)
             val handler = if (parent.configuration.useOpenSsl) {
                 createClientOpenSslHandler(
                         target,
@@ -206,7 +204,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
                         wrappedKeyManagerFactory,
                         trustManagerFactory,
                         ch.alloc(),
-                        delegatedTaskExecutor
+                        parent.nettyThreading.sslDelegatedTaskExecutor
                 )
             } else {
                 createClientSslHandler(
@@ -214,7 +212,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
                         parent.allowedRemoteLegalNames,
                         wrappedKeyManagerFactory,
                         trustManagerFactory,
-                        delegatedTaskExecutor
+                        parent.nettyThreading.sslDelegatedTaskExecutor
                 )
             }
             handler.handshakeTimeoutMillis = conf.sslHandshakeTimeout.toMillis()
@@ -256,7 +254,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,

             if (started && amqpActive) {
                 log.debug { "Scheduling restart of $currentTarget (AMQP active)" }
-                workerGroup?.schedule({
+                nettyThreading.eventLoopGroup.schedule({
                     nextTarget()
                     restart()
                 }, retryInterval, TimeUnit.MILLISECONDS)
@@ -273,8 +271,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
                 return
             }
             log.info("Connect to: $currentTarget")
-            sslDelegatedTaskExecutor = sslDelegatedTaskExecutor(threadPoolName)
-            workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS, DefaultThreadFactory(threadPoolName, Thread.MAX_PRIORITY))
+            (nettyThreading as? NettyThreading.NonShared)?.start()
             started = true
             restart()
         }
@@ -286,7 +283,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
         }
         val bootstrap = Bootstrap()
         // TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux
-        bootstrap.group(workerGroup).channel(NioSocketChannel::class.java).handler(ClientChannelInitializer(this))
+        bootstrap.group(nettyThreading.eventLoopGroup).channel(NioSocketChannel::class.java).handler(ClientChannelInitializer(this))
         // Delegate DNS Resolution to the proxy side, if we are using proxy.
         if (configuration.proxyConfig != null) {
             bootstrap.resolver(NoopAddressResolverGroup.INSTANCE)
@@ -300,16 +297,12 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
         lock.withLock {
             log.info("Stopping connection to: $currentTarget, Local address: $localAddressString")
             started = false
-            if (sharedThreadPool == null) {
-                workerGroup?.shutdownGracefully()
-                workerGroup?.terminationFuture()?.sync()
+            if (nettyThreading is NettyThreading.NonShared) {
+                nettyThreading.stop()
             } else {
                 clientChannel?.close()?.sync()
             }
             clientChannel = null
-            workerGroup = null
-            sslDelegatedTaskExecutor?.shutdown()
-            sslDelegatedTaskExecutor = null
             log.info("Stopped connection to $currentTarget")
         }
     }
@@ -350,4 +343,36 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
     private val _onConnection = PublishSubject.create<ConnectionChange>().toSerialized()
     val onConnection: Observable<ConnectionChange>
         get() = _onConnection
+
+
+    sealed class NettyThreading {
+        abstract val eventLoopGroup: EventLoopGroup
+        abstract val sslDelegatedTaskExecutor: Executor
+
+        class Shared(override val eventLoopGroup: EventLoopGroup,
+                     override val sslDelegatedTaskExecutor: ExecutorService = sslDelegatedTaskExecutor("AMQPClient")) : NettyThreading()
+
+        class NonShared(val threadPoolName: String) : NettyThreading() {
+            private var _eventLoopGroup: NioEventLoopGroup? = null
+            override val eventLoopGroup: EventLoopGroup get() = checkNotNull(_eventLoopGroup)
+
+            private var _sslDelegatedTaskExecutor: ThreadPoolExecutor? = null
+            override val sslDelegatedTaskExecutor: ExecutorService get() = checkNotNull(_sslDelegatedTaskExecutor)
+
+            fun start() {
+                check(_eventLoopGroup == null)
+                check(_sslDelegatedTaskExecutor == null)
+                _eventLoopGroup = NioEventLoopGroup(NUM_CLIENT_THREADS, DefaultThreadFactory(threadPoolName, Thread.MAX_PRIORITY))
+                _sslDelegatedTaskExecutor = sslDelegatedTaskExecutor(threadPoolName)
+            }
+
+            fun stop() {
+                eventLoopGroup.shutdownGracefully()
+                eventLoopGroup.terminationFuture().sync()
+                sslDelegatedTaskExecutor.shutdown()
+                _eventLoopGroup = null
+                _sslDelegatedTaskExecutor = null
+            }
+        }
+    }
 }
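With the NettyThreading sealed class, callers now state explicitly whether the client owns its Netty resources (NonShared, as the revocation tests below use) or borrows them from the bridge manager (Shared, as the bridges and ProtonWrapperTests use), and only owned resources are torn down in stop(). A hedged, standalone sketch of that ownership rule (names are illustrative; this is not the AMQPClient API):

import io.netty.channel.nio.NioEventLoopGroup
import io.netty.util.concurrent.DefaultThreadFactory

sealed class Threading {
    class Shared(val group: NioEventLoopGroup) : Threading()
    class Owned(poolName: String) : Threading() {
        val group = NioEventLoopGroup(2, DefaultThreadFactory(poolName, Thread.MAX_PRIORITY))
    }
}

fun stop(threading: Threading) {
    // Only a group this component created itself is shut down; a Shared group belongs to the caller.
    if (threading is Threading.Owned) {
        threading.group.shutdownGracefully().sync()
    }
}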
@@ -300,7 +300,7 @@ abstract class AbstractServerRevocationTest {
                 listOf(NetworkHostAndPort("localhost", targetPort)),
                 setOf(CHARLIE_NAME),
                 amqpConfig,
-                threadPoolName = legalName.organisation,
+                nettyThreading = AMQPClient.NettyThreading.NonShared(legalName.organisation),
                 distPointCrlSource = CertDistPointCrlSource(connectTimeout = crlConnectTimeout)
         )
         amqpClients += amqpClient
@@ -503,7 +503,7 @@ class ProtonWrapperTests {
             listOf(NetworkHostAndPort("localhost", serverPort)),
             setOf(ALICE_NAME),
             amqpConfig,
-            sharedThreadPool = sharedEventGroup)
+            nettyThreading = AMQPClient.NettyThreading.Shared(sharedEventGroup))
     }

     private fun createServer(port: Int,
@@ -5,6 +5,7 @@ import com.codahale.metrics.MetricRegistry
 import com.google.common.collect.MutableClassToInstanceMap
 import com.google.common.util.concurrent.MoreExecutors
 import com.zaxxer.hikari.pool.HikariPool
+import io.netty.util.concurrent.DefaultThreadFactory
 import net.corda.common.logging.errorReporting.NodeDatabaseErrors
 import net.corda.confidential.SwapIdentitiesFlow
 import net.corda.core.CordaException
@@ -334,7 +335,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
     private val schedulerService = makeNodeSchedulerService()

     private val cordappServices = MutableClassToInstanceMap.create<SerializeAsToken>()
-    private val shutdownExecutor = Executors.newSingleThreadExecutor()
+    private val shutdownExecutor = Executors.newSingleThreadExecutor(DefaultThreadFactory("Shutdown"))

     protected abstract val transactionVerifierWorkerCount: Int
     /**
@@ -770,7 +771,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
         } else {
             1.days
         }
-        val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater"))
+        val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("NetworkMapPublisher"))
         executor.submit(object : Runnable {
             override fun run() {
                 val republishInterval = try {
@@ -415,12 +415,13 @@ open class Node(configuration: NodeConfiguration,
     }

     private fun makeBridgeControlListener(serverAddress: NetworkHostAndPort, networkParameters: NetworkParameters): BridgeControlListener {
-        val artemisMessagingClientFactory = {
+        val artemisMessagingClientFactory = { threadPoolName: String ->
            ArtemisMessagingClient(
                    configuration.p2pSslOptions,
                    serverAddress,
                    networkParameters.maxMessageSize,
-                    failoverCallback = { errorAndTerminate("ArtemisMessagingClient failed. Shutting down.", null) }
+                    failoverCallback = { errorAndTerminate("ArtemisMessagingClient failed. Shutting down.", null) },
+                    threadPoolName = threadPoolName
            )
        }
         return BridgeControlListener(
@@ -431,7 +432,8 @@ open class Node(configuration: NodeConfiguration,
                 networkParameters.maxMessageSize,
                 configuration.crlCheckSoftFail.toRevocationConfig(),
                 false,
-                artemisMessagingClientFactory)
+                artemisMessagingClientFactory
+        )
     }

     private fun startLocalRpcBroker(securityManager: RPCSecurityManager): BrokerAddresses? {
@@ -2,6 +2,7 @@ package net.corda.node.services.events

 import co.paralleluniverse.fibers.Suspendable
 import com.google.common.util.concurrent.ListenableFuture
+import io.netty.util.concurrent.DefaultThreadFactory
 import net.corda.core.concurrent.CordaFuture
 import net.corda.core.context.InvocationContext
 import net.corda.core.context.InvocationOrigin
@@ -148,7 +149,7 @@ class NodeSchedulerService(private val clock: CordaClock,
     // from the database
     private val startingStateRefs: MutableSet<ScheduledStateRef> = ConcurrentHashMap.newKeySet<ScheduledStateRef>()
     private val mutex = ThreadBox(InnerState())
-    private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
+    private val schedulerTimerExecutor = Executors.newSingleThreadExecutor(DefaultThreadFactory("SchedulerService"))

     // if there's nothing to do, check every minute if something fell through the cracks.
     // any new state should trigger a reschedule immediately if nothing is scheduled, so I would not expect
@@ -65,7 +65,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
                              private val messagingServerAddress: NetworkHostAndPort,
                              private val maxMessageSize: Int,
                              private val journalBufferTimeout : Int? = null,
-                             private val threadPoolName: String = "ArtemisServer",
+                             private val threadPoolName: String = "P2PServer",
                              private val trace: Boolean = false,
                              private val distPointCrlSource: CertDistPointCrlSource = CertDistPointCrlSource.SINGLETON,
                              private val remotingThreads: Int? = null) : ArtemisBroker, SingletonSerializeAsToken() {
@@ -17,6 +17,7 @@ import net.corda.nodeapi.internal.config.CertificateStore
 import net.corda.nodeapi.internal.protonwrapper.netty.createAndInitSslContext
 import net.corda.nodeapi.internal.protonwrapper.netty.keyManagerFactory
 import net.corda.nodeapi.internal.protonwrapper.netty.sslDelegatedTaskExecutor
+import net.corda.nodeapi.internal.setThreadPoolName
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
 import org.apache.activemq.artemis.api.core.BaseInterceptor
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor
@@ -54,10 +55,23 @@ class NodeNettyAcceptorFactory : AcceptorFactory {
                               handler: BufferHandler?,
                               listener: ServerConnectionLifeCycleListener?,
                               threadPool: Executor,
-                              scheduledThreadPool: ScheduledExecutorService?,
+                              scheduledThreadPool: ScheduledExecutorService,
                               protocolMap: Map<String, ProtocolManager<BaseInterceptor<*>>>?): Acceptor {
+        val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Acceptor", configuration)
+        threadPool.setThreadPoolName("$threadPoolName-artemis")
+        scheduledThreadPool.setThreadPoolName("$threadPoolName-artemis-scheduler")
         val failureExecutor = OrderedExecutor(threadPool)
-        return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
+        return NodeNettyAcceptor(
+                name,
+                clusterConnection,
+                configuration,
+                handler,
+                listener,
+                scheduledThreadPool,
+                failureExecutor,
+                protocolMap,
+                "$threadPoolName-netty"
+        )
     }


@@ -68,14 +82,14 @@ class NodeNettyAcceptorFactory : AcceptorFactory {
                                   listener: ServerConnectionLifeCycleListener?,
                                   scheduledThreadPool: ScheduledExecutorService?,
                                   failureExecutor: Executor,
-                                  protocolMap: Map<String, ProtocolManager<BaseInterceptor<*>>>?) :
+                                  protocolMap: Map<String, ProtocolManager<BaseInterceptor<*>>>?,
+                                  private val threadPoolName: String) :
             NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
     {
         companion object {
             private val defaultThreadPoolNamePattern = Pattern.compile("""Thread-(\d+) \(activemq-netty-threads\)""")
         }

-        private val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "NodeNettyAcceptor", configuration)
         private val sslDelegatedTaskExecutor = sslDelegatedTaskExecutor(threadPoolName)
         private val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)

@@ -74,7 +74,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
     }

     private val parametersUpdatesTrack = PublishSubject.create<ParametersUpdateInfo>()
-    private val networkMapPoller = ScheduledThreadPoolExecutor(1, NamedThreadFactory("Network Map Updater Thread")).apply {
+    private val networkMapPoller = ScheduledThreadPoolExecutor(1, NamedThreadFactory("NetworkMapUpdater")).apply {
         executeExistingDelayedTasksAfterShutdownPolicy = false
     }
     private var newNetworkParameters: Pair<ParametersUpdate, SignedNetworkParameters>? = null
@@ -261,9 +261,12 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
         //as HTTP GET is mostly IO bound, use more threads than CPU's
         //maximum threads to use = 24, as if we did not limit this on large machines it could result in 100's of concurrent requests
         val threadsToUseForNetworkMapDownload = min(Runtime.getRuntime().availableProcessors() * 4, 24)
-        val executorToUseForDownloadingNodeInfos = Executors.newFixedThreadPool(threadsToUseForNetworkMapDownload, NamedThreadFactory("NetworkMapUpdaterNodeInfoDownloadThread"))
+        val executorToUseForDownloadingNodeInfos = Executors.newFixedThreadPool(
+                threadsToUseForNetworkMapDownload,
+                NamedThreadFactory("NetworkMapUpdaterNodeInfoDownload")
+        )
         //DB insert is single threaded - use a single threaded executor for it.
-        val executorToUseForInsertionIntoDB = Executors.newSingleThreadExecutor(NamedThreadFactory("NetworkMapUpdateDBInsertThread"))
+        val executorToUseForInsertionIntoDB = Executors.newSingleThreadExecutor(NamedThreadFactory("NetworkMapUpdateDBInsert"))
         val hashesToFetch = (allHashesFromNetworkMap - allNodeHashes)
         val networkMapDownloadStartTime = System.currentTimeMillis()
         if (hashesToFetch.isNotEmpty()) {
@@ -22,8 +22,7 @@ class InternalRPCMessagingClient(val sslConfig: MutualSslConfiguration, val serv
     private var rpcServer: RPCServer? = null

     fun init(rpcOps: List<RPCOps>, securityManager: RPCSecurityManager, cacheFactory: NamedCacheFactory) = synchronized(this) {
-
-        val tcpTransport = ArtemisTcpTransport.rpcInternalClientTcpTransport(serverAddress, sslConfig)
+        val tcpTransport = ArtemisTcpTransport.rpcInternalClientTcpTransport(serverAddress, sslConfig, threadPoolName = "RPCClient")
         locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
             // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
             // would be the default and the two lines below can be deleted.
@@ -30,10 +30,10 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
         setDirectories(baseDirectory)

         val acceptorConfigurationsSet = mutableSetOf(
-            rpcAcceptorTcpTransport(address, sslOptions, enableSSL = useSsl)
+            rpcAcceptorTcpTransport(address, sslOptions, enableSSL = useSsl, threadPoolName = "RPCServer")
         )
         adminAddress?.let {
-            acceptorConfigurationsSet += rpcInternalAcceptorTcpTransport(it, nodeConfiguration)
+            acceptorConfigurationsSet += rpcInternalAcceptorTcpTransport(it, nodeConfiguration, threadPoolName = "RPCServerAdmin")
         }
         acceptorConfigurations = acceptorConfigurationsSet

@@ -1,5 +1,6 @@
 package net.corda.node.services.statemachine

+import io.netty.util.concurrent.DefaultThreadFactory
 import net.corda.core.flows.FlowSession
 import net.corda.core.internal.FlowIORequest
 import net.corda.core.internal.FlowStateMachine
@@ -22,10 +23,6 @@ internal class FlowMonitor(
 ) : LifecycleSupport {

     private companion object {
-        private fun defaultScheduler(): ScheduledExecutorService {
-            return Executors.newSingleThreadScheduledExecutor()
-        }
-
         private val logger = loggerFor<FlowMonitor>()
     }

@@ -36,7 +33,7 @@ internal class FlowMonitor(
     override fun start() {
         synchronized(this) {
             if (scheduler == null) {
-                scheduler = defaultScheduler()
+                scheduler = Executors.newSingleThreadScheduledExecutor(DefaultThreadFactory("FlowMonitor"))
                 shutdownScheduler = true
             }
             scheduler!!.scheduleAtFixedRate({ logFlowsWaitingForParty() }, 0, monitoringPeriod.toMillis(), TimeUnit.MILLISECONDS)
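The practical effect of the commit is visible in thread dumps: the node's pools carry descriptive names rather than the JDK's pool-N-thread-M or Artemis's Thread-N (activemq-netty-threads). A quick, illustrative way to list thread names from a test or a running JVM (the exact names depend on which subsystems have started):

fun main() {
    Thread.getAllStackTraces().keys
            .map { it.name }
            .sorted()
            .forEach(::println) // in a node you would look for names like "FlowMonitor-1-1" or "SchedulerService-1-1"
}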