From efb901dcb4662bc32f1643e8e9eecc894b757c99 Mon Sep 17 00:00:00 2001 From: Adel El-Beik Date: Tue, 2 May 2023 11:24:39 +0100 Subject: [PATCH 1/5] ENT-9883: Updated CODEOWNERS file. --- .github/CODEOWNERS | 45 +++++++++++++-------------------------------- 1 file changed, 13 insertions(+), 32 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index e824a836e0..8a8dff99e1 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -2,10 +2,10 @@ *.md @corda/technical-writers # By default anything under core or node-api is the Kernel team -core @corda/kernel -node-api @corda/kernel -node/src/main/kotlin/net/corda/node/internal @corda/kernel -node/src/main/kotlin/net/corda/node/services @corda/kernel +core @rick-r3 +node-api @rick-r3 +node/src/main/kotlin/net/corda/node/internal @rick-r3 +node/src/main/kotlin/net/corda/node/services @rick-r3 # Determinstic components core-deterministic @chrisr3 @@ -17,46 +17,27 @@ serialization-tests @chrisr3 # Demobench defaults to Chris, but Viktor for the main code tools/demobench @chrisr3 -tools/demobench/src/main/kotlin/net/corda/demobench @vkolomeyko # General Corda code -client/rpc @vkolomeyko - -core/src/main/kotlin/net/corda/core/flows @dimosr +core/src/main/kotlin/net/corda/core/flows @rick-r3 core/src/main/kotlin/net/corda/core/internal/notary @corda/notaries -core/src/main/kotlin/net/corda/core/messaging @vkolomeyko -node/src/integration-test/kotlin/net/corda/node/persistence @blsemo -node/src/integration-test/kotlin/net/corda/node/services/persistence @blsemo -node/src/main/kotlin/net/corda/node/internal/artemis @rekalov -node/src/main/kotlin/net/corda/node/services/identity @rekalov -node/src/main/kotlin/net/corda/node/services/keys @rekalov -node/src/main/kotlin/net/corda/node/services/messaging @dimosr -node/src/main/kotlin/net/corda/node/services/network @rekalov -node/src/main/kotlin/net/corda/node/services/persistence @blsemo -node/src/main/kotlin/net/corda/node/services/rpc @vkolomeyko -node/src/main/kotlin/net/corda/node/services/statemachine @lankydan -node/src/main/kotlin/net/corda/node/utilities/registration @rekalov +node/src/integration-test/kotlin/net/corda/node/persistence @chriscochrane +node/src/integration-test/kotlin/net/corda/node/services/persistence @chriscochrane +node/src/main/kotlin/net/corda/node/services/messaging @rick-r3 +node/src/main/kotlin/net/corda/node/services/persistence @rick-r3 +node/src/main/kotlin/net/corda/node/services/statemachine @rick-r3 node/src/main/kotlin/net/corda/notary @corda/notaries -node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging @vkolomeyko -node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto @rekalov -node-api/src/main/kotlin/net/corda/nodeapi/internal/cryptoservice @rekalov -node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle @vkolomeyko -node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence @blsemo -node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper @vkolomeyko -node-api/src/test/kotlin/net/corda/nodeapi/internal/bridging @rekalov +node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence @rick-r3 -common/logging/src/main/kotlin/net/corda/common/logging/errorReporting @JamesHR3 -common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting @JamesHR3 +common/logging/src/main/kotlin/net/corda/common/logging/errorReporting @chriscochrane +common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting @chriscochrane # Single file ownerships go at the end, as they are most specific and take precedence over other ownerships core/src/main/kotlin/net/corda/core/internal/AbstractAttachment.kt @adelel1 core/src/main/kotlin/net/corda/core/internal/AttachmentTrustCalculator.kt @adelel1 core/src/main/kotlin/net/corda/core/internal/AttachmentWithContext.kt @adelel1 -core/src/main/kotlin/net/corda/core/internal/CertRole.kt @rekalov core/src/main/kotlin/net/corda/core/node/services/AttachmentStorage.kt @adelel1 -core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt @rekalov -core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt @rekalov From 3d9c682ce8c2abc7f80bec7ab896256ffbfbc365 Mon Sep 17 00:00:00 2001 From: Adel El-Beik Date: Tue, 2 May 2023 11:24:39 +0100 Subject: [PATCH 2/5] ENT-9883: Updated CODEOWNERS file. --- .github/CODEOWNERS | 45 +++++++++++++-------------------------------- 1 file changed, 13 insertions(+), 32 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index e824a836e0..8a8dff99e1 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -2,10 +2,10 @@ *.md @corda/technical-writers # By default anything under core or node-api is the Kernel team -core @corda/kernel -node-api @corda/kernel -node/src/main/kotlin/net/corda/node/internal @corda/kernel -node/src/main/kotlin/net/corda/node/services @corda/kernel +core @rick-r3 +node-api @rick-r3 +node/src/main/kotlin/net/corda/node/internal @rick-r3 +node/src/main/kotlin/net/corda/node/services @rick-r3 # Determinstic components core-deterministic @chrisr3 @@ -17,46 +17,27 @@ serialization-tests @chrisr3 # Demobench defaults to Chris, but Viktor for the main code tools/demobench @chrisr3 -tools/demobench/src/main/kotlin/net/corda/demobench @vkolomeyko # General Corda code -client/rpc @vkolomeyko - -core/src/main/kotlin/net/corda/core/flows @dimosr +core/src/main/kotlin/net/corda/core/flows @rick-r3 core/src/main/kotlin/net/corda/core/internal/notary @corda/notaries -core/src/main/kotlin/net/corda/core/messaging @vkolomeyko -node/src/integration-test/kotlin/net/corda/node/persistence @blsemo -node/src/integration-test/kotlin/net/corda/node/services/persistence @blsemo -node/src/main/kotlin/net/corda/node/internal/artemis @rekalov -node/src/main/kotlin/net/corda/node/services/identity @rekalov -node/src/main/kotlin/net/corda/node/services/keys @rekalov -node/src/main/kotlin/net/corda/node/services/messaging @dimosr -node/src/main/kotlin/net/corda/node/services/network @rekalov -node/src/main/kotlin/net/corda/node/services/persistence @blsemo -node/src/main/kotlin/net/corda/node/services/rpc @vkolomeyko -node/src/main/kotlin/net/corda/node/services/statemachine @lankydan -node/src/main/kotlin/net/corda/node/utilities/registration @rekalov +node/src/integration-test/kotlin/net/corda/node/persistence @chriscochrane +node/src/integration-test/kotlin/net/corda/node/services/persistence @chriscochrane +node/src/main/kotlin/net/corda/node/services/messaging @rick-r3 +node/src/main/kotlin/net/corda/node/services/persistence @rick-r3 +node/src/main/kotlin/net/corda/node/services/statemachine @rick-r3 node/src/main/kotlin/net/corda/notary @corda/notaries -node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging @vkolomeyko -node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto @rekalov -node-api/src/main/kotlin/net/corda/nodeapi/internal/cryptoservice @rekalov -node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle @vkolomeyko -node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence @blsemo -node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper @vkolomeyko -node-api/src/test/kotlin/net/corda/nodeapi/internal/bridging @rekalov +node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence @rick-r3 -common/logging/src/main/kotlin/net/corda/common/logging/errorReporting @JamesHR3 -common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting @JamesHR3 +common/logging/src/main/kotlin/net/corda/common/logging/errorReporting @chriscochrane +common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting @chriscochrane # Single file ownerships go at the end, as they are most specific and take precedence over other ownerships core/src/main/kotlin/net/corda/core/internal/AbstractAttachment.kt @adelel1 core/src/main/kotlin/net/corda/core/internal/AttachmentTrustCalculator.kt @adelel1 core/src/main/kotlin/net/corda/core/internal/AttachmentWithContext.kt @adelel1 -core/src/main/kotlin/net/corda/core/internal/CertRole.kt @rekalov core/src/main/kotlin/net/corda/core/node/services/AttachmentStorage.kt @adelel1 -core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt @rekalov -core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt @rekalov From 09518532077affe290f33daf879481fb46f421fe Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Thu, 11 May 2023 09:49:40 +0100 Subject: [PATCH 3/5] ENT-6515: Cherry-pick of ENT-6315 - Allow dumping of paused flows (#7008) (#7363) This is a cherry-pick of https://github.com/corda/corda/pull/7008, which also resolves ENT-6515 Co-authored-by: Dan Newton --- .../node/services/rpc/DumpCheckpointsTest.kt | 59 +++++++++++++++++-- .../node/services/rpc/CheckpointDumperImpl.kt | 14 ++++- .../statemachine/StateMachineState.kt | 44 +++++++++++--- 3 files changed, 101 insertions(+), 16 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt index 21ac40a96d..a4582f6740 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt @@ -14,8 +14,11 @@ import net.corda.core.internal.list import net.corda.core.internal.readFully import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.minutes +import net.corda.core.utilities.seconds import net.corda.node.internal.NodeStartup import net.corda.node.services.Permissions +import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.CountUpDownLatch import net.corda.testing.core.ALICE_NAME import net.corda.testing.driver.DriverParameters @@ -36,8 +39,8 @@ class DumpCheckpointsTest { private val flowProceedLatch = CountUpDownLatch(1) } - @Test(timeout=300_000) - fun `verify checkpoint dump via RPC`() { + @Test(timeout = 300_000) + fun `verify checkpoint dump via RPC`() { val user = User("mark", "dadada", setOf(Permissions.all())) driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) { @@ -55,20 +58,44 @@ class DumpCheckpointsTest { flowProceedLatch.countDown() assertEquals(1, checkPointCountFuture.get()) - checkDumpFile(logDirPath) + checkDumpFile(logDirPath, GetNumberOfCheckpointsFlow::class.java, Checkpoint.FlowStatus.RUNNABLE) } } } - private fun checkDumpFile(dir: Path) { + @Test(timeout = 300_000) + fun `paused flows included in checkpoint dump output`() { + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) { + + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + + it.proxy.startFlow(::EasyFlow) + + // Hack to get the flow to show as paused + it.proxy.startFlow(::SetAllFlowsToPausedFlow).returnValue.getOrThrow(10.seconds) + + val logDirPath = nodeAHandle.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME + logDirPath.createDirectories() + nodeAHandle.checkpointsRpc.use { checkpointRPCOps -> checkpointRPCOps.dumpCheckpoints() } + + checkDumpFile(logDirPath, EasyFlow::class.java, Checkpoint.FlowStatus.PAUSED) + } + } + } + + private fun checkDumpFile(dir: Path, containsClass: Class>, flowStatus: Checkpoint.FlowStatus) { // The directory supposed to contain a single ZIP file val file = dir.list().single { it.isRegularFile() } ZipInputStream(file.inputStream()).use { zip -> val entry = zip.nextEntry assertThat(entry.name, containsSubstring("json")) - val content = zip.readFully() - assertThat(String(content), containsSubstring(GetNumberOfCheckpointsFlow::class.java.name)) + val content = String(zip.readFully()) + assertThat(content, containsSubstring(containsClass.name)) + assertThat(content, containsSubstring(flowStatus.name)) } } @@ -94,4 +121,24 @@ class DumpCheckpointsTest { flowProceedLatch.await() } } + + @StartableByRPC + class EasyFlow : FlowLogic() { + @Suspendable + override fun call(): Int { + sleep(2.minutes) + return 1 + } + } + + @StartableByRPC + class SetAllFlowsToPausedFlow : FlowLogic() { + @Suspendable + override fun call(): Int { + return serviceHub + .jdbcSession() + .prepareStatement("UPDATE node_checkpoints SET status = '${Checkpoint.FlowStatus.PAUSED.ordinal}'") + .use { ps -> ps.executeUpdate() } + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt index 0849fdf919..a9aceab60b 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt @@ -221,7 +221,10 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri instrumentCheckpointAgent(runId) val (bytes, fileName) = try { - val checkpoint = serialisedCheckpoint.deserialize(checkpointSerializationContext) + val checkpoint = serialisedCheckpoint.deserialize( + checkpointSerializationContext, + alwaysDeserializeFlowState = true + ) val json = checkpoint.toJson(runId.uuid, now) val jsonBytes = writer.writeValueAsBytes(json) jsonBytes to "${json.topLevelFlowClass.simpleName}-${runId.uuid}.json" @@ -259,7 +262,12 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri //Dump checkpoints in "fibers" folder for((runId, serializedCheckpoint) in stream) { - val flowState = serializedCheckpoint.deserialize(checkpointSerializationContext).flowState + val flowState = serializedCheckpoint.deserialize( + checkpointSerializationContext, + alwaysDeserializeFlowState = true + ).flowState + // This includes paused flows because we have forced the deserialization of the checkpoint's flow state + // which will show as started. if(flowState is FlowState.Started) writeFiber2Zip(zip, checkpointSerializationContext, runId, flowState) } @@ -354,6 +362,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri topLevelFlowLogic = flowLogic, flowCallStackSummary = flowCallStack.toSummary(), flowCallStack = flowCallStack, + status = status, suspendedOn = (flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn( timestamp, now @@ -436,6 +445,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri val topLevelFlowClass: Class>, val topLevelFlowLogic: FlowLogic<*>, val flowCallStackSummary: List, + val status: Checkpoint.FlowStatus, val suspendedOn: SuspendedOn?, val flowCallStack: List, val origin: Origin, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 5bd7cd8e9c..a2bc214675 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -215,17 +215,28 @@ data class Checkpoint( /** * Deserializes the serialized fields contained in [Checkpoint.Serialized]. * - * @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized] + * Depending on the [FlowStatus] of the [Checkpoint.Serialized], the deserialized [Checkpoint] may or may not have its [flowState] + * properly deserialized. This is to optimise the process's memory footprint by not holding the checkpoints of flows that are not + * running in-memory. + * + * The [flowState] will not be deserialized when the [FlowStatus] is: + * + * - [FlowStatus.PAUSED] + * - [FlowStatus.COMPLETED] + * - [FlowStatus.FAILED] + * + * Any other status returns a [FlowState.Unstarted] or [FlowState.Started] depending on the content of [serializedFlowState]. + * + * @param checkpointSerializationContext The [CheckpointSerializationContext] to deserialize the checkpoint's serialized content with. + * @param alwaysDeserializeFlowState A flag to specify if [flowState] should be deserialized, disregarding the [FlowStatus] of the + * checkpoint and ignoring the memory optimisation. + * + * @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized]. */ - fun deserialize(checkpointSerializationContext: CheckpointSerializationContext): Checkpoint { - val flowState = when(status) { - FlowStatus.PAUSED -> FlowState.Paused - FlowStatus.COMPLETED, FlowStatus.FAILED -> FlowState.Finished - else -> serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext) - } + fun deserialize(checkpointSerializationContext: CheckpointSerializationContext, alwaysDeserializeFlowState: Boolean = false): Checkpoint { return Checkpoint( checkpointState = serializedCheckpointState.checkpointDeserialize(checkpointSerializationContext), - flowState = flowState, + flowState = getFlowState(checkpointSerializationContext, alwaysDeserializeFlowState), errorState = errorState, result = result?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT), status = status, @@ -234,6 +245,23 @@ data class Checkpoint( compatible = compatible ) } + + private fun getFlowState( + checkpointSerializationContext: CheckpointSerializationContext, + alwaysDeserializeFlowState: Boolean + ): FlowState { + return when { + alwaysDeserializeFlowState -> deserializeFlowState(checkpointSerializationContext) + status == FlowStatus.PAUSED -> FlowState.Paused + status == FlowStatus.COMPLETED -> FlowState.Finished + status == FlowStatus.FAILED -> FlowState.Finished + else -> deserializeFlowState(checkpointSerializationContext) + } + } + + private fun deserializeFlowState(checkpointSerializationContext: CheckpointSerializationContext): FlowState { + return serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext) + } } } From 31a34e5a5c26985642949597d62be965cc018c00 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Fri, 12 May 2023 10:11:09 +0100 Subject: [PATCH 4/5] ENT-9941: Improved Netty logging, especially of the embedded broker (#7365) --- .../internal/ArtemisMessagingClient.kt | 12 +-- .../nodeapi/internal/ArtemisTcpTransport.kt | 82 ++++++++++++------- .../internal/bridging/AMQPBridgeManager.kt | 5 +- .../nodeapi/internal/crypto/X509Utilities.kt | 28 ++++++- .../protonwrapper/netty/AMQPChannelHandler.kt | 5 +- .../protonwrapper/netty/AMQPClient.kt | 12 ++- .../protonwrapper/netty/AMQPServer.kt | 9 +- .../internal/protonwrapper/netty/SSLHelper.kt | 39 ++++----- .../revocation/CertDistPointCrlSource.kt | 27 +++++- .../revocation/CordaRevocationChecker.kt | 34 ++++---- .../net/corda/node/amqp/AMQPBridgeTest.kt | 2 +- .../CertificateRevocationListNodeTests.kt | 58 +++++++++---- .../net/corda/node/amqp/ProtonWrapperTests.kt | 2 +- .../messaging/ArtemisMessagingTest.kt | 2 +- .../services/messaging/SimpleMQClient.kt | 2 +- .../messaging/ArtemisMessagingServer.kt | 7 +- .../messaging/NodeNettyAcceptorFactory.kt | 68 ++++++++++++++- .../messaging/NodeNettyConnectorFactory.kt | 63 ++++++++++++++ .../services/messaging/P2PMessagingClient.kt | 5 +- .../coretesting/internal/NettyTestClient.kt | 6 +- .../coretesting/internal/NettyTestServer.kt | 5 +- 21 files changed, 345 insertions(+), 128 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyConnectorFactory.kt diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt index 2c585a4b48..721b856fdd 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt @@ -24,7 +24,9 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, private val confirmationWindowSize: Int = -1, private val messagingServerConnectionConfig: MessagingServerConnectionConfiguration? = null, private val backupServerAddressPool: List = emptyList(), - private val failoverCallback: ((FailoverEventType) -> Unit)? = null + private val failoverCallback: ((FailoverEventType) -> Unit)? = null, + private val threadPoolName: String = "ArtemisClient", + private val trace: Boolean = false ) : ArtemisSessionProvider { companion object { private val log = loggerFor() @@ -39,8 +41,10 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, override fun start(): Started = synchronized(this) { check(started == null) { "start can't be called twice" } - val tcpTransport = p2pConnectorTcpTransport(serverAddress, config) - val backupTransports = backupServerAddressPool.map { p2pConnectorTcpTransport(it, config) } + val tcpTransport = p2pConnectorTcpTransport(serverAddress, config, threadPoolName = threadPoolName, trace = trace) + val backupTransports = backupServerAddressPool.map { + p2pConnectorTcpTransport(it, config, threadPoolName = threadPoolName, trace = trace) + } log.info("Connecting to message broker: $serverAddress") if (backupTransports.isNotEmpty()) { @@ -49,8 +53,6 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, // If back-up artemis addresses are configured, the locator will be created using HA mode. @Suppress("SpreadOperator") val locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply { - // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this - // would be the default and the two lines below can be deleted. connectionTTL = 60000 clientFailureCheckPeriod = 30000 callFailoverTimeout = java.lang.Long.getLong(CORDA_ARTEMIS_CALL_TIMEOUT_PROP_NAME, CORDA_ARTEMIS_CALL_TIMEOUT_DEFAULT) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt index 8e5c132307..6183cfe818 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt @@ -9,10 +9,10 @@ import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.SslConfiguration import org.apache.activemq.artemis.api.core.TransportConfiguration -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants import java.nio.file.Path +@Suppress("LongParameterList") class ArtemisTcpTransport { companion object { val CIPHER_SUITES = listOf( @@ -22,8 +22,9 @@ class ArtemisTcpTransport { val TLS_VERSIONS = listOf("TLSv1.2") - const val SSL_HANDSHAKE_TIMEOUT_NAME = "SSLHandshakeTimeout" - const val TRACE_NAME = "trace" + const val SSL_HANDSHAKE_TIMEOUT_NAME = "Corda-SSLHandshakeTimeout" + const val TRACE_NAME = "Corda-Trace" + const val THREAD_POOL_NAME_NAME = "Corda-ThreadPoolName" // Turn on AMQP support, which needs the protocol jar on the classpath. // Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop. @@ -94,24 +95,25 @@ class ArtemisTcpTransport { fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true, + threadPoolName: String = "P2PServer", trace: Boolean = false): TransportConfiguration { val options = mutableMapOf() if (enableSSL) { config?.addToTransportOptions(options) } - return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, trace) + return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace) } fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true, - keyStoreProvider: String? = null): TransportConfiguration { + threadPoolName: String = "P2PClient", + trace: Boolean = false): TransportConfiguration { val options = mutableMapOf() if (enableSSL) { config?.addToTransportOptions(options) - options += asMap(keyStoreProvider) } - return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL) + return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace) } fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, @@ -123,63 +125,87 @@ class ArtemisTcpTransport { config.keyStorePath.requireOnDefaultFileSystem() options.putAll(config.toTransportOptions()) } - return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, trace) + return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCServer", trace) } - fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: ClientRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration { + fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, + config: ClientRpcSslOptions?, + enableSSL: Boolean = true, + trace: Boolean = false): TransportConfiguration { val options = mutableMapOf() if (config != null && enableSSL) { config.trustStorePath.requireOnDefaultFileSystem() options.putAll(config.toTransportOptions()) } - return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL) + return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCClient", trace) } - fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration { + fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, + config: SslConfiguration, + trace: Boolean = false): TransportConfiguration { val options = mutableMapOf() config.addToTransportOptions(options) - options += asMap(keyStoreProvider) - return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true) + return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCClient", trace) } fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, - keyStoreProvider: String? = null, trace: Boolean = false): TransportConfiguration { val options = mutableMapOf() config.addToTransportOptions(options) - options += asMap(keyStoreProvider) - return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true, trace = trace) - } - - private fun asMap(keyStoreProvider: String?): Map { - return keyStoreProvider?.let {mutableMapOf(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to it)} ?: emptyMap() + return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCServer", trace) } private fun createAcceptorTransport(hostAndPort: NetworkHostAndPort, protocols: String, options: MutableMap, enableSSL: Boolean, + threadPoolName: String, trace: Boolean): TransportConfiguration { - options += defaultArtemisOptions(hostAndPort, protocols) - if (enableSSL) { - options += defaultSSLOptions - } // Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections options[TransportConstants.HANDSHAKE_TIMEOUT] = 0 - options[TRACE_NAME] = trace - return TransportConfiguration("net.corda.node.services.messaging.NodeNettyAcceptorFactory", options) + return createTransport( + "net.corda.node.services.messaging.NodeNettyAcceptorFactory", + hostAndPort, + protocols, + options, + enableSSL, + threadPoolName, + trace + ) } private fun createConnectorTransport(hostAndPort: NetworkHostAndPort, protocols: String, options: MutableMap, - enableSSL: Boolean): TransportConfiguration { + enableSSL: Boolean, + threadPoolName: String, + trace: Boolean): TransportConfiguration { + return createTransport( + "net.corda.node.services.messaging.NodeNettyConnectorFactory", + hostAndPort, + protocols, + options, + enableSSL, + threadPoolName, + trace + ) + } + + private fun createTransport(className: String, + hostAndPort: NetworkHostAndPort, + protocols: String, + options: MutableMap, + enableSSL: Boolean, + threadPoolName: String, + trace: Boolean): TransportConfiguration { options += defaultArtemisOptions(hostAndPort, protocols) if (enableSSL) { options += defaultSSLOptions } - return TransportConfiguration(NettyConnectorFactory::class.java.name, options) + options[THREAD_POOL_NAME_NAME] = threadPoolName + options[TRACE_NAME] = trace + return TransportConfiguration(className, options) } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 9b24adc538..ee09e640ad 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -5,16 +5,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder import io.netty.channel.EventLoop import io.netty.channel.EventLoopGroup import io.netty.channel.nio.NioEventLoopGroup +import io.netty.util.concurrent.DefaultThreadFactory import net.corda.core.identity.CordaX500Name import net.corda.core.internal.VisibleForTesting import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger +import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress import net.corda.nodeapi.internal.ArtemisSessionProvider -import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient @@ -503,7 +504,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore, } override fun start() { - sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS) + sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("AMQPBridge", Thread.MAX_PRIORITY)) val artemis = artemisMessageClientFactory() this.artemis = artemis artemis.start() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt index ff115ec78c..9bc577e831 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt @@ -1,3 +1,5 @@ +@file:Suppress("MagicNumber", "TooGenericExceptionCaught") + package net.corda.nodeapi.internal.crypto import net.corda.core.CordaOID @@ -6,6 +8,8 @@ import net.corda.core.crypto.newSecureRandom import net.corda.core.internal.* import net.corda.core.utilities.days import net.corda.core.utilities.millis +import net.corda.core.utilities.toHex +import net.corda.nodeapi.internal.protonwrapper.netty.distributionPointsToString import org.bouncycastle.asn1.* import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.style.BCStyle @@ -368,7 +372,6 @@ object X509Utilities { } } - @Suppress("MagicNumber") private fun generateCertificateSerialNumber(): BigInteger { val bytes = ByteArray(CERTIFICATE_SERIAL_NUMBER_LENGTH) newSecureRandom().nextBytes(bytes) @@ -408,6 +411,29 @@ fun PKCS10CertificationRequest.isSignatureValid(): Boolean { return this.isSignatureValid(JcaContentVerifierProviderBuilder().build(this.subjectPublicKeyInfo)) } +fun X509Certificate.toSimpleString(): String { + val bcCert = toBc() + val keyIdentifier = try { + SubjectKeyIdentifier.getInstance(bcCert.getExtension(Extension.subjectKeyIdentifier).parsedValue).keyIdentifier.toHex() + } catch (e: Exception) { + "null" + } + val authorityKeyIdentifier = try { + AuthorityKeyIdentifier.getInstance(bcCert.getExtension(Extension.authorityKeyIdentifier).parsedValue).keyIdentifier.toHex() + } catch (e: Exception) { + "null" + } + val subject = bcCert.subject + val issuer = bcCert.issuer + val role = CertRole.extract(this) + return "$subject[$keyIdentifier] issued by $issuer[$authorityKeyIdentifier] $role $serialNumber [${distributionPointsToString()}]" +} + +fun X509CRL.toSimpleString(): String { + val revokedSerialNumbers = revokedCertificates?.map { it.serialNumber } + return "$issuerX500Principal ${thisUpdate.toInstant()} ${nextUpdate.toInstant()} ${revokedSerialNumbers ?: "[]"}" +} + /** * Check certificate validity or print warning if expiry is within 30 days */ diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt index 7d294dbee6..7bb8e9ad39 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt @@ -115,11 +115,10 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, val transport = connection.transport as ProtonJTransport transport.protocolTracer = object : ProtocolTracer { override fun sentFrame(transportFrame: TransportFrame) { - logInfoWithMDC { "${transportFrame.body}" } + logInfoWithMDC { "sentFrame: ${transportFrame.body}" } } - override fun receivedFrame(transportFrame: TransportFrame) { - logInfoWithMDC { "${transportFrame.body}" } + logInfoWithMDC { "receivedFrame: ${transportFrame.body}" } } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt index 29a8ae26f9..3e8fc485ba 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt @@ -1,7 +1,11 @@ package net.corda.nodeapi.internal.protonwrapper.netty import io.netty.bootstrap.Bootstrap -import io.netty.channel.* +import io.netty.channel.Channel +import io.netty.channel.ChannelFutureListener +import io.netty.channel.ChannelHandler +import io.netty.channel.ChannelInitializer +import io.netty.channel.EventLoopGroup import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioSocketChannel @@ -11,6 +15,7 @@ import io.netty.handler.proxy.HttpProxyHandler import io.netty.handler.proxy.Socks4ProxyHandler import io.netty.handler.proxy.Socks5ProxyHandler import io.netty.resolver.NoopAddressResolverGroup +import io.netty.util.concurrent.DefaultThreadFactory import io.netty.util.internal.logging.InternalLoggerFactory import io.netty.util.internal.logging.Slf4JLoggerFactory import net.corda.core.identity.CordaX500Name @@ -57,7 +62,8 @@ data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostA class AMQPClient(private val targets: List, val allowedRemoteLegalNames: Set, private val configuration: AMQPConfiguration, - private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable { + private val sharedThreadPool: EventLoopGroup? = null, + private val threadPoolName: String = "AMQPClient") : AutoCloseable { companion object { init { InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE) @@ -254,7 +260,7 @@ class AMQPClient(private val targets: List, return } log.info("Connect to: $currentTarget") - workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS) + workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS, DefaultThreadFactory(threadPoolName, Thread.MAX_PRIORITY)) started = true restart() } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt index 126e47a3e6..cbeb2562b4 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt @@ -11,6 +11,7 @@ import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.handler.logging.LogLevel import io.netty.handler.logging.LoggingHandler +import io.netty.util.concurrent.DefaultThreadFactory import io.netty.util.internal.logging.InternalLoggerFactory import io.netty.util.internal.logging.Slf4JLoggerFactory import net.corda.core.utilities.NetworkHostAndPort @@ -37,8 +38,8 @@ import kotlin.concurrent.withLock */ class AMQPServer(val hostName: String, val port: Int, - private val configuration: AMQPConfiguration) : AutoCloseable { - + private val configuration: AMQPConfiguration, + private val threadPoolName: String = "AMQPServer") : AutoCloseable { companion object { init { InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE) @@ -131,8 +132,8 @@ class AMQPServer(val hostName: String, lock.withLock { stop() - bossGroup = NioEventLoopGroup(1) - workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS) + bossGroup = NioEventLoopGroup(1, DefaultThreadFactory("$threadPoolName-boss", Thread.MAX_PRIORITY)) + workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS, DefaultThreadFactory("$threadPoolName-worker", Thread.MAX_PRIORITY)) val server = ServerBootstrap() // TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt index b531988a7f..705fbc2905 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt @@ -14,33 +14,38 @@ import net.corda.core.internal.VisibleForTesting import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug -import net.corda.core.utilities.toHex import net.corda.nodeapi.internal.ArtemisTcpTransport import net.corda.nodeapi.internal.config.CertificateStore -import net.corda.nodeapi.internal.crypto.toBc +import net.corda.nodeapi.internal.crypto.toSimpleString import net.corda.nodeapi.internal.crypto.x509 import org.bouncycastle.asn1.ASN1InputStream import org.bouncycastle.asn1.ASN1Primitive import org.bouncycastle.asn1.DERIA5String import org.bouncycastle.asn1.DEROctetString import org.bouncycastle.asn1.x500.X500Name -import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier import org.bouncycastle.asn1.x509.CRLDistPoint import org.bouncycastle.asn1.x509.DistributionPointName import org.bouncycastle.asn1.x509.Extension import org.bouncycastle.asn1.x509.GeneralName import org.bouncycastle.asn1.x509.GeneralNames -import org.bouncycastle.asn1.x509.SubjectKeyIdentifier import org.slf4j.LoggerFactory import java.net.Socket import java.net.URI import java.security.KeyStore -import java.security.cert.* -import java.util.* +import java.security.cert.CertificateException +import java.security.cert.PKIXBuilderParameters +import java.security.cert.PKIXRevocationChecker +import java.security.cert.X509CertSelector +import java.security.cert.X509Certificate import java.util.concurrent.Executor -import javax.net.ssl.* +import javax.net.ssl.CertPathTrustManagerParameters +import javax.net.ssl.KeyManagerFactory +import javax.net.ssl.SNIHostName +import javax.net.ssl.SSLContext +import javax.net.ssl.SSLEngine +import javax.net.ssl.TrustManagerFactory +import javax.net.ssl.X509ExtendedTrustManager import javax.security.auth.x500.X500Principal -import kotlin.collections.HashMap import kotlin.system.measureTimeMillis private const val HOSTNAME_FORMAT = "%s.corda.net" @@ -109,23 +114,7 @@ fun certPathToString(certPath: Array?): String { if (certPath == null) { return "" } - val certs = certPath.map { - val bcCert = it.toBc() - val subject = bcCert.subject.toString() - val issuer = bcCert.issuer.toString() - val keyIdentifier = try { - SubjectKeyIdentifier.getInstance(bcCert.getExtension(Extension.subjectKeyIdentifier).parsedValue).keyIdentifier.toHex() - } catch (ex: Exception) { - "null" - } - val authorityKeyIdentifier = try { - AuthorityKeyIdentifier.getInstance(bcCert.getExtension(Extension.authorityKeyIdentifier).parsedValue).keyIdentifier.toHex() - } catch (ex: Exception) { - "null" - } - " $subject[$keyIdentifier] issued by $issuer[$authorityKeyIdentifier] [${it.distributionPointsToString()}]" - } - return certs.joinToString("\r\n") + return certPath.joinToString(System.lineSeparator()) { " ${it.toSimpleString()}" } } @VisibleForTesting diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CertDistPointCrlSource.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CertDistPointCrlSource.kt index cb8c9e3174..db984f11b8 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CertDistPointCrlSource.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CertDistPointCrlSource.kt @@ -3,7 +3,10 @@ package net.corda.nodeapi.internal.revocation import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.LoadingCache import net.corda.core.internal.readFully +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.nodeapi.internal.crypto.X509CertificateFactory +import net.corda.nodeapi.internal.crypto.toSimpleString import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource import net.corda.nodeapi.internal.protonwrapper.netty.distributionPoints import java.net.URI @@ -15,8 +18,11 @@ import javax.security.auth.x500.X500Principal /** * [CrlSource] which downloads CRLs from the distribution points in the X509 certificate. */ +@Suppress("TooGenericExceptionCaught") class CertDistPointCrlSource : CrlSource { companion object { + private val logger = contextLogger() + // The default SSL handshake timeout is 60s (DEFAULT_SSL_HANDSHAKE_TIMEOUT). Considering there are 3 CRLs endpoints to check in a // node handshake, we want to keep the total timeout within that. private const val DEFAULT_CONNECT_TIMEOUT = 9_000 @@ -33,7 +39,8 @@ class CertDistPointCrlSource : CrlSource { private val readTimeout = Integer.getInteger("net.corda.dpcrl.read.timeout", DEFAULT_READ_TIMEOUT) private fun retrieveCRL(uri: URI): X509CRL { - val bytes = run { + val start = System.currentTimeMillis() + val bytes = try { val conn = uri.toURL().openConnection() conn.connectTimeout = connectTimeout conn.readTimeout = readTimeout @@ -41,12 +48,26 @@ class CertDistPointCrlSource : CrlSource { // in an InputStream, but the JDK implementation (sun.security.provider.X509Factory.engineGenerateCRL) converts any IOException // into CRLException and drops the cause chain. conn.getInputStream().readFully() + } catch (e: Exception) { + if (logger.isDebugEnabled) { + logger.debug("Unable to download CRL from $uri (${System.currentTimeMillis() - start}ms)", e) + } + throw e } - return X509CertificateFactory().generateCRL(bytes.inputStream()) + val duration = System.currentTimeMillis() - start + val crl = try { + X509CertificateFactory().generateCRL(bytes.inputStream()) + } catch (e: Exception) { + if (logger.isDebugEnabled) { + logger.debug("Invalid CRL from $uri (${duration}ms)", e) + } + throw e + } + logger.debug { "CRL from $uri (${duration}ms): ${crl.toSimpleString()}" } + return crl } } - @Suppress("TooGenericExceptionCaught") override fun fetch(certificate: X509Certificate): Set { val approvedCRLs = HashSet() var exception: Exception? = null diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CordaRevocationChecker.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CordaRevocationChecker.kt index b90bde624e..1e0a3ecf53 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CordaRevocationChecker.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CordaRevocationChecker.kt @@ -1,6 +1,8 @@ package net.corda.nodeapi.internal.revocation import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug +import net.corda.nodeapi.internal.crypto.toSimpleString import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource import org.bouncycastle.asn1.x509.Extension import java.security.cert.CRLReason @@ -27,8 +29,8 @@ class CordaRevocationChecker(private val crlSource: CrlSource, private val softFailExceptions = ArrayList() override fun check(cert: Certificate, unresolvedCritExts: MutableCollection?) { - val x509Certificate = cert as X509Certificate - checkApprovedCRLs(x509Certificate, getCRLs(x509Certificate)) + cert as X509Certificate + checkApprovedCRLs(cert, getCRLs(cert)) } @Suppress("TooGenericExceptionCaught") @@ -40,30 +42,27 @@ class CordaRevocationChecker(private val crlSource: CrlSource, addSoftFailException(e) return emptySet() } else { - throw undeterminedRevocationException("Unable to retrieve CRLs", e) + throw undeterminedRevocationException("Unable to retrieve CRLs for cert ${cert.serialNumber}", e) } } if (crls.isNotEmpty() || softFail) { return crls } // Note, the JDK tries to find a valid CRL from a different signing key before giving up (RevocationChecker.verifyWithSeparateSigningKey) - throw undeterminedRevocationException("Could not find any valid CRLs", null) + throw undeterminedRevocationException("Could not find any valid CRLs for cert ${cert.serialNumber}", null) } /** * Borrowed from `RevocationChecker.checkApprovedCRLs()` */ @Suppress("NestedBlockDepth") - @Throws(CertPathValidatorException::class) private fun checkApprovedCRLs(cert: X509Certificate, approvedCRLs: Set) { // See if the cert is in the set of approved crls. - logger.debug("ExternalSourceRevocationChecker.checkApprovedCRLs() cert SN: ${cert.serialNumber}") + logger.debug { "Check cert ${cert.serialNumber} against CRLs ${approvedCRLs.map { it.toSimpleString() }}" } for (crl in approvedCRLs) { val entry = crl.getRevokedCertificate(cert) if (entry != null) { - logger.debug("ExternalSourceRevocationChecker.checkApprovedCRLs() CRL entry: $entry") - /* * Abort CRL validation and throw exception if there are any * unrecognized critical CRL entry extensions (see section @@ -75,19 +74,15 @@ class CordaRevocationChecker(private val crlSource: CrlSource, unresCritExts.remove(Extension.cRLDistributionPoints.id) unresCritExts.remove(Extension.certificateIssuer.id) if (unresCritExts.isNotEmpty()) { - throw CertPathValidatorException( - "Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts") + throw CertPathValidatorException("Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts") } } val reasonCode = entry.revocationReason ?: CRLReason.UNSPECIFIED val revocationDate = entry.revocationDate if (revocationDate.before(dateSource())) { - val t = CertificateRevokedException( - revocationDate, reasonCode, - crl.issuerX500Principal, mutableMapOf()) - throw CertPathValidatorException( - t.message, t, null, -1, BasicReason.REVOKED) + val t = CertificateRevokedException(revocationDate, reasonCode, crl.issuerX500Principal, emptyMap()) + throw CertPathValidatorException(t.message, t, null, -1, BasicReason.REVOKED) } } } @@ -105,15 +100,18 @@ class CordaRevocationChecker(private val crlSource: CrlSource, return false } - override fun getSupportedExtensions(): MutableSet? { + override fun getSupportedExtensions(): Set? { return null } override fun init(forward: Boolean) { + if (forward) { + throw CertPathValidatorException("Forward checking not allowed") + } softFailExceptions.clear() } - override fun getSoftFailExceptions(): MutableList { + override fun getSoftFailExceptions(): List { return Collections.unmodifiableList(softFailExceptions) } @@ -125,4 +123,4 @@ class CordaRevocationChecker(private val crlSource: CrlSource, private fun undeterminedRevocationException(message: String?, cause: Throwable?): CertPathValidatorException { return CertPathValidatorException(message, cause, null, -1, BasicReason.UNDETERMINED_REVOCATION_STATUS) } -} \ No newline at end of file +} diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 72d2c1bd47..a6d03bc337 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -204,7 +204,7 @@ class AMQPBridgeTest { doReturn(null).whenever(it).jmxMonitoringHttpPort } artemisConfig.configureWithDevSSLCertificate() - val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE, null) + val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE) val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE) artemisServer.start() artemisClient.start() diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt index b99060a732..4bfdfb2a93 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt @@ -25,7 +25,6 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig import net.corda.testing.core.ALICE_NAME -import net.corda.testing.core.BOB_NAME import net.corda.testing.core.CHARLIE_NAME import net.corda.testing.core.MAX_MESSAGE_SIZE import net.corda.testing.driver.internal.incrementalPortAllocation @@ -49,6 +48,7 @@ import java.time.Duration import java.util.concurrent.atomic.AtomicInteger import kotlin.test.assertEquals +@Suppress("LongParameterList") class CertificateRevocationListNodeTests { @Rule @JvmField @@ -326,17 +326,18 @@ class CertificateRevocationListNodeTests { private fun createAMQPClient(targetPort: Int, crlCheckSoftFail: Boolean, + legalName: CordaX500Name, nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL", maxMessageSize: Int = MAX_MESSAGE_SIZE): X509Certificate { - val baseDirectory = temporaryFolder.root.toPath() / "client" + val baseDirectory = temporaryFolder.root.toPath() / legalName.organisation val certificatesDirectory = baseDirectory / "certificates" val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory) val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) val clientConfig = rigorousMock().also { doReturn(baseDirectory).whenever(it).baseDirectory doReturn(certificatesDirectory).whenever(it).certificatesDirectory - doReturn(BOB_NAME).whenever(it).myLegalName + doReturn(legalName).whenever(it).myLegalName doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions doReturn(signingCertificateStore).whenever(it).signingCertificateStore doReturn(crlCheckSoftFail).whenever(it).crlCheckSoftFail @@ -350,28 +351,32 @@ class CertificateRevocationListNodeTests { override val trustStore = clientConfig.p2pSslOptions.trustStore.get() override val maxMessageSize: Int = maxMessageSize } - amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", targetPort)), setOf(ALICE_NAME, CHARLIE_NAME), amqpConfig) + amqpClient = AMQPClient( + listOf(NetworkHostAndPort("localhost", targetPort)), + setOf(CHARLIE_NAME), + amqpConfig, + threadPoolName = legalName.organisation + ) return nodeCert } - @Suppress("LongParameterList") private fun createAMQPServer(port: Int, - name: CordaX500Name = ALICE_NAME, + legalName: CordaX500Name, crlCheckSoftFail: Boolean, nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL", maxMessageSize: Int = MAX_MESSAGE_SIZE, sslHandshakeTimeout: Duration? = null): X509Certificate { check(!::amqpServer.isInitialized) - val baseDirectory = temporaryFolder.root.toPath() / "server" + val baseDirectory = temporaryFolder.root.toPath() / legalName.organisation val certificatesDirectory = baseDirectory / "certificates" val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory) val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) val serverConfig = rigorousMock().also { doReturn(baseDirectory).whenever(it).baseDirectory doReturn(certificatesDirectory).whenever(it).certificatesDirectory - doReturn(name).whenever(it).myLegalName + doReturn(legalName).whenever(it).myLegalName doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions doReturn(signingCertificateStore).whenever(it).signingCertificateStore } @@ -385,7 +390,7 @@ class CertificateRevocationListNodeTests { override val maxMessageSize: Int = maxMessageSize override val sslHandshakeTimeout: Duration = sslHandshakeTimeout ?: super.sslHandshakeTimeout } - amqpServer = AMQPServer("0.0.0.0", port, amqpConfig) + amqpServer = AMQPServer("0.0.0.0", port, amqpConfig, threadPoolName = legalName.organisation) return nodeCert } @@ -421,7 +426,6 @@ class CertificateRevocationListNodeTests { return newNodeCert } - @Suppress("LongParameterList") private fun verifyAMQPConnection(crlCheckSoftFail: Boolean, nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", revokeServerCert: Boolean = false, @@ -430,6 +434,7 @@ class CertificateRevocationListNodeTests { expectedConnectStatus: Boolean) { val serverCert = createAMQPServer( serverPort, + CHARLIE_NAME, crlCheckSoftFail = crlCheckSoftFail, nodeCrlDistPoint = nodeCrlDistPoint, sslHandshakeTimeout = sslHandshakeTimeout @@ -444,6 +449,7 @@ class CertificateRevocationListNodeTests { val clientCert = createAMQPClient( serverPort, crlCheckSoftFail = crlCheckSoftFail, + legalName = ALICE_NAME, nodeCrlDistPoint = nodeCrlDistPoint ) if (revokeClientCert) { @@ -455,7 +461,8 @@ class CertificateRevocationListNodeTests { assertThat(serverConnect.connected).isEqualTo(expectedConnectStatus) } - private fun createArtemisServerAndClient(crlCheckSoftFail: Boolean, + private fun createArtemisServerAndClient(legalName: CordaX500Name, + crlCheckSoftFail: Boolean, crlCheckArtemisServer: Boolean, nodeCrlDistPoint: String, sslHandshakeTimeout: Duration?): Pair { @@ -466,7 +473,7 @@ class CertificateRevocationListNodeTests { val artemisConfig = rigorousMock().also { doReturn(baseDirectory).whenever(it).baseDirectory doReturn(certificatesDirectory).whenever(it).certificatesDirectory - doReturn(CHARLIE_NAME).whenever(it).myLegalName + doReturn(legalName).whenever(it).myLegalName doReturn(signingCertificateStore).whenever(it).signingCertificateStore doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress @@ -477,14 +484,25 @@ class CertificateRevocationListNodeTests { artemisConfig.configureWithDevSSLCertificate() recreateNodeCaAndTlsCertificates(signingCertificateStore, p2pSslConfiguration, nodeCrlDistPoint, null) - val server = ArtemisMessagingServer(artemisConfig, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE, null) - val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE) + val server = ArtemisMessagingServer( + artemisConfig, + artemisConfig.p2pAddress, + MAX_MESSAGE_SIZE, + threadPoolName = "${legalName.organisation}-server", + trace = true + ) + val client = ArtemisMessagingClient( + artemisConfig.p2pSslOptions, + artemisConfig.p2pAddress, + MAX_MESSAGE_SIZE, + threadPoolName = "${legalName.organisation}-client", + trace = true + ) server.start() client.start() return server to client } - @Suppress("LongParameterList") private fun verifyArtemisConnection(crlCheckSoftFail: Boolean, crlCheckArtemisServer: Boolean, expectedConnected: Boolean = true, @@ -493,11 +511,17 @@ class CertificateRevocationListNodeTests { nodeCrlDistPoint: String = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", sslHandshakeTimeout: Duration? = null) { val queueName = P2P_PREFIX + "Test" - val (artemisServer, artemisClient) = createArtemisServerAndClient(crlCheckSoftFail, crlCheckArtemisServer, nodeCrlDistPoint, sslHandshakeTimeout) + val (artemisServer, artemisClient) = createArtemisServerAndClient( + CHARLIE_NAME, + crlCheckSoftFail, + crlCheckArtemisServer, + nodeCrlDistPoint, + sslHandshakeTimeout + ) artemisServer.use { artemisClient.started!!.session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) - val nodeCert = createAMQPClient(serverPort, true, nodeCrlDistPoint) + val nodeCert = createAMQPClient(serverPort, true, ALICE_NAME, nodeCrlDistPoint) if (revokedNodeCert) { crlServer.revokedNodeCerts.add(nodeCert.serialNumber) } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index 1fd59b9704..c69538aae5 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -437,7 +437,7 @@ class ProtonWrapperTests { } artemisConfig.configureWithDevSSLCertificate() - val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize, null) + val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize) val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", artemisPort), maxMessageSize) server.start() client.start() diff --git a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 302d1def96..56af87c83f 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -240,7 +240,7 @@ class ArtemisMessagingTest { } private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer { - return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null, true).apply { + return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, trace = true).apply { config.configureWithDevSSLCertificate() messagingServer = this } diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/SimpleMQClient.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/SimpleMQClient.kt index fa5fc09d53..b52422fff0 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/SimpleMQClient.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/SimpleMQClient.kt @@ -22,7 +22,7 @@ class SimpleMQClient(val target: NetworkHostAndPort, lateinit var producer: ClientProducer fun start(username: String? = null, password: String? = null, enableSSL: Boolean = true) { - val tcpTransport = p2pConnectorTcpTransport(target, config, enableSSL = enableSSL) + val tcpTransport = p2pConnectorTcpTransport(target, config, enableSSL = enableSSL, threadPoolName = "SimpleMQClient") val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { isBlockOnNonDurableSend = true threadPoolMaxSize = 1 diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index d6b7214f1b..d1e0a7ea16 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -55,7 +55,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE class ArtemisMessagingServer(private val config: NodeConfiguration, private val messagingServerAddress: NetworkHostAndPort, private val maxMessageSize: Int, - private val journalBufferTimeout : Int?, + private val journalBufferTimeout : Int? = null, + private val threadPoolName: String = "ArtemisServer", private val trace: Boolean = false) : ArtemisBroker, SingletonSerializeAsToken() { companion object { private val log = contextLogger() @@ -132,9 +133,10 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, // The transaction cache is configurable, and drives other cache sizes. globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize) - acceptorConfigurations.add(p2pAcceptorTcpTransport( + addAcceptorConfiguration(p2pAcceptorTcpTransport( NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config.p2pSslOptions, + threadPoolName = threadPoolName, trace = trace )) // Enable built in message deduplication. Note we still have to do our own as the delayed commits @@ -181,7 +183,6 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue) } - @Throws(IOException::class, KeyStoreException::class) private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager { val keyStore = config.p2pSslOptions.keyStore.get().value.internal val trustStore = config.p2pSslOptions.trustStore.get().value.internal diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt index eb0e6afd16..d8ba1ddbb2 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt @@ -1,11 +1,14 @@ package net.corda.node.services.messaging import io.netty.buffer.ByteBufAllocator +import io.netty.channel.ChannelHandlerContext import io.netty.channel.group.ChannelGroup import io.netty.handler.logging.LogLevel import io.netty.handler.logging.LoggingHandler import io.netty.handler.ssl.SslHandler +import io.netty.handler.ssl.SslHandshakeTimeoutException import net.corda.core.internal.declaredField +import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.ArtemisTcpTransport import org.apache.activemq.artemis.api.core.BaseInterceptor import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor @@ -15,10 +18,14 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory import org.apache.activemq.artemis.spi.core.remoting.BufferHandler import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener +import org.apache.activemq.artemis.utils.ConfigurationHelper import org.apache.activemq.artemis.utils.actors.OrderedExecutor +import java.nio.channels.ClosedChannelException import java.time.Duration import java.util.concurrent.Executor import java.util.concurrent.ScheduledExecutorService +import java.util.regex.Pattern +import javax.net.ssl.SSLEngine @Suppress("unused") // Used via reflection in ArtemisTcpTransport class NodeNettyAcceptorFactory : AcceptorFactory { @@ -34,6 +41,7 @@ class NodeNettyAcceptorFactory : AcceptorFactory { return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap) } + private class NodeNettyAcceptor(name: String?, clusterConnection: ClusterConnection?, configuration: Map, @@ -44,24 +52,76 @@ class NodeNettyAcceptorFactory : AcceptorFactory { protocolMap: Map>>?) : NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap) { + companion object { + private val defaultThreadPoolNamePattern = Pattern.compile("""Thread-(\d+) \(activemq-netty-threads\)""") + } + + private val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "NodeNettyAcceptor", configuration) + private val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration) + + @Synchronized override fun start() { super.start() - if (configuration[ArtemisTcpTransport.TRACE_NAME] == true) { - // Artemis does not seem to allow access to the underlying channel so we resort to reflection and get it via the - // serverChannelGroup field. This field is only available after start(), hence why we add the logger here. + if (trace) { + // Unfortunately we have to resort to reflection to be able to get access to the server channel(s) declaredField("serverChannelGroup").value.forEach { channel -> channel.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO)) } } } + @Synchronized override fun getSslHandler(alloc: ByteBufAllocator?): SslHandler { - val sslHandler = super.getSslHandler(alloc) + applyThreadPoolName() + val engine = super.getSslHandler(alloc).engine() + val sslHandler = NodeAcceptorSslHandler(engine, trace) val handshakeTimeout = configuration[ArtemisTcpTransport.SSL_HANDSHAKE_TIMEOUT_NAME] as Duration? if (handshakeTimeout != null) { sslHandler.handshakeTimeoutMillis = handshakeTimeout.toMillis() } return sslHandler } + + /** + * [NettyAcceptor.start] has hardcoded the thread pool name and does not provide a way to configure it. This is a workaround. + */ + private fun applyThreadPoolName() { + val matcher = defaultThreadPoolNamePattern.matcher(Thread.currentThread().name) + if (matcher.matches()) { + Thread.currentThread().name = "$threadPoolName-${matcher.group(1)}" // Preserve the pool thread number + } + } + } + + + private class NodeAcceptorSslHandler(engine: SSLEngine, private val trace: Boolean) : SslHandler(engine) { + companion object { + private val logger = contextLogger() + } + + override fun handlerAdded(ctx: ChannelHandlerContext) { + logHandshake() + super.handlerAdded(ctx) + // Unfortunately NettyAcceptor does not let us add extra child handlers, so we have to add our logger this way. + if (trace) { + ctx.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO)) + } + } + + private fun logHandshake() { + val start = System.currentTimeMillis() + handshakeFuture().addListener { + val duration = System.currentTimeMillis() - start + when { + it.isSuccess -> logger.info("SSL handshake completed in ${duration}ms with ${engine().session.peerPrincipal}") + it.isCancelled -> logger.warn("SSL handshake cancelled after ${duration}ms") + else -> when (it.cause()) { + is ClosedChannelException -> logger.warn("SSL handshake closed early after ${duration}ms") + is SslHandshakeTimeoutException -> logger.warn("SSL handshake timed out after ${duration}ms") + else -> logger.warn("SSL handshake failed after ${duration}ms", it.cause()) + } + } + } + } } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyConnectorFactory.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyConnectorFactory.kt new file mode 100644 index 0000000000..1672031a11 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyConnectorFactory.kt @@ -0,0 +1,63 @@ +package net.corda.node.services.messaging + +import io.netty.channel.ChannelPipeline +import io.netty.handler.logging.LogLevel +import io.netty.handler.logging.LoggingHandler +import net.corda.nodeapi.internal.ArtemisTcpTransport +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector +import org.apache.activemq.artemis.spi.core.remoting.BufferHandler +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager +import org.apache.activemq.artemis.spi.core.remoting.Connector +import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory +import org.apache.activemq.artemis.utils.ConfigurationHelper +import java.util.concurrent.Executor +import java.util.concurrent.ScheduledExecutorService + +@Suppress("unused") +class NodeNettyConnectorFactory : ConnectorFactory { + override fun createConnector(configuration: MutableMap?, + handler: BufferHandler?, + listener: ClientConnectionLifeCycleListener?, + closeExecutor: Executor?, + threadPool: Executor?, + scheduledThreadPool: ScheduledExecutorService?, + protocolManager: ClientProtocolManager?): Connector { + val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Connector", configuration) + val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration) + return NettyConnector( + configuration, + handler, + listener, + closeExecutor, + threadPool, + scheduledThreadPool, + MyClientProtocolManager(threadPoolName, trace) + ) + } + + override fun isReliable(): Boolean = false + + override fun getDefaults(): Map = NettyConnector.DEFAULT_CONFIG + + + private class MyClientProtocolManager(private val threadPoolName: String, private val trace: Boolean) : ActiveMQClientProtocolManager() { + override fun addChannelHandlers(pipeline: ChannelPipeline) { + applyThreadPoolName() + super.addChannelHandlers(pipeline) + if (trace) { + pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) + } + } + + /** + * [NettyConnector.start] does not provide a way to configure the thread pool name, so we modify the thread name accordingly. + */ + private fun applyThreadPoolName() { + with(Thread.currentThread()) { + name = name.replace("nioEventLoopGroup", threadPoolName) // pool and thread numbers are preserved + } + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 10a22f5aed..b9caae56aa 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -93,6 +93,7 @@ class P2PMessagingClient(val config: NodeConfiguration, cacheFactory: NamedCacheFactory, private val isDrainingModeOn: () -> Boolean, private val drainingModeWasChangedEvents: Observable>, + private val threadPoolName: String = "P2PClient", private val stateHelper: ServiceStateHelper = ServiceStateHelper(log) ) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, ServiceStateSupport by stateHelper { companion object { @@ -164,10 +165,8 @@ class P2PMessagingClient(val config: NodeConfiguration, started = true log.info("Connecting to message broker: $serverAddress") // TODO Add broker CN to config for host verification in case the embedded broker isn't used - val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions) + val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions, threadPoolName = threadPoolName) locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { - // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this - // would be the default and the two lines below can be deleted. connectionTTL = 60000 clientFailureCheckPeriod = 30000 minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE diff --git a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestClient.kt b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestClient.kt index 185a289472..581172a788 100644 --- a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestClient.kt +++ b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestClient.kt @@ -3,13 +3,14 @@ package net.corda.coretesting.internal import io.netty.bootstrap.Bootstrap import io.netty.channel.ChannelFuture import io.netty.channel.ChannelInboundHandlerAdapter -import io.netty.handler.ssl.SslContext import io.netty.channel.ChannelInitializer import io.netty.channel.ChannelOption import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslHandler +import io.netty.util.concurrent.DefaultThreadFactory import java.io.Closeable import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException @@ -17,7 +18,6 @@ import java.util.concurrent.locks.ReentrantLock import javax.net.ssl.SSLEngine import kotlin.concurrent.thread - class NettyTestClient( val sslContext: SslContext?, val targetHost: String, @@ -49,7 +49,7 @@ class NettyTestClient( private fun run() { // Configure the client. - val group = NioEventLoopGroup() + val group = NioEventLoopGroup(DefaultThreadFactory("NettyTestClient")) try { val b = Bootstrap() b.group(group) diff --git a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestServer.kt b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestServer.kt index 8fa9d23057..1abc3f5c7b 100644 --- a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestServer.kt +++ b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestServer.kt @@ -11,6 +11,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.handler.logging.LogLevel import io.netty.handler.logging.LoggingHandler import io.netty.handler.ssl.SslContext +import io.netty.util.concurrent.DefaultThreadFactory import java.io.Closeable import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException @@ -45,8 +46,8 @@ class NettyTestServer( fun run() { // Configure the server. - val bossGroup = NioEventLoopGroup(1) - val workerGroup = NioEventLoopGroup() + val bossGroup = NioEventLoopGroup(1, DefaultThreadFactory("NettyTestServer-boss")) + val workerGroup = NioEventLoopGroup(DefaultThreadFactory("NettyTestServer-worker")) try { val b = ServerBootstrap() b.group(bossGroup, workerGroup) From 13ca00deeabb3f146b1a377e9ff6fa099032d696 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Fri, 12 May 2023 17:42:27 +0100 Subject: [PATCH 5/5] ENT-9941: Moved new connector factory to node-api --- .../kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt | 2 +- .../net/corda/nodeapi/internal}/NodeNettyConnectorFactory.kt | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) rename {node/src/main/kotlin/net/corda/node/services/messaging => node-api/src/main/kotlin/net/corda/nodeapi/internal}/NodeNettyConnectorFactory.kt (96%) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt index acf7922cae..f46202cc07 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt @@ -182,7 +182,7 @@ class ArtemisTcpTransport { threadPoolName: String, trace: Boolean): TransportConfiguration { return createTransport( - "net.corda.node.services.messaging.NodeNettyConnectorFactory", + NodeNettyConnectorFactory::class.java.name, hostAndPort, protocols, options, diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyConnectorFactory.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/NodeNettyConnectorFactory.kt similarity index 96% rename from node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyConnectorFactory.kt rename to node-api/src/main/kotlin/net/corda/nodeapi/internal/NodeNettyConnectorFactory.kt index 1672031a11..47e046566e 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyConnectorFactory.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/NodeNettyConnectorFactory.kt @@ -1,9 +1,8 @@ -package net.corda.node.services.messaging +package net.corda.nodeapi.internal import io.netty.channel.ChannelPipeline import io.netty.handler.logging.LogLevel import io.netty.handler.logging.LoggingHandler -import net.corda.nodeapi.internal.ArtemisTcpTransport import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector import org.apache.activemq.artemis.spi.core.remoting.BufferHandler @@ -15,7 +14,6 @@ import org.apache.activemq.artemis.utils.ConfigurationHelper import java.util.concurrent.Executor import java.util.concurrent.ScheduledExecutorService -@Suppress("unused") class NodeNettyConnectorFactory : ConnectorFactory { override fun createConnector(configuration: MutableMap?, handler: BufferHandler?,