diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt index 750b5857f5..7be0ac6229 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt @@ -24,7 +24,9 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, private val confirmationWindowSize: Int = -1, private val messagingServerConnectionConfig: MessagingServerConnectionConfiguration? = null, private val backupServerAddressPool: List = emptyList(), - private val failoverCallback: ((FailoverEventType) -> Unit)? = null + private val failoverCallback: ((FailoverEventType) -> Unit)? = null, + private val threadPoolName: String = "ArtemisClient", + private val trace: Boolean = false ) : ArtemisSessionProvider { companion object { private val log = loggerFor() @@ -39,8 +41,10 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, override fun start(): Started = synchronized(this) { check(started == null) { "start can't be called twice" } - val tcpTransport = p2pConnectorTcpTransport(serverAddress, config) - val backupTransports = backupServerAddressPool.map { p2pConnectorTcpTransport(it, config) } + val tcpTransport = p2pConnectorTcpTransport(serverAddress, config, threadPoolName = threadPoolName, trace = trace) + val backupTransports = backupServerAddressPool.map { + p2pConnectorTcpTransport(it, config, threadPoolName = threadPoolName, trace = trace) + } log.info("Connecting to message broker: $serverAddress") if (backupTransports.isNotEmpty()) { @@ -49,8 +53,6 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, // If back-up artemis addresses are configured, the locator will be created using HA mode. @Suppress("SpreadOperator") val locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply { - // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this - // would be the default and the two lines below can be deleted. connectionTTL = 60000 clientFailureCheckPeriod = 30000 callFailoverTimeout = java.lang.Long.getLong(CORDA_ARTEMIS_CALL_TIMEOUT_PROP_NAME, CORDA_ARTEMIS_CALL_TIMEOUT_DEFAULT) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt index bf2ff89bf9..f46202cc07 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt @@ -9,10 +9,10 @@ import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.SslConfiguration import org.apache.activemq.artemis.api.core.TransportConfiguration -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants import java.nio.file.Path +@Suppress("LongParameterList") class ArtemisTcpTransport { companion object { val CIPHER_SUITES = listOf( @@ -22,8 +22,9 @@ class ArtemisTcpTransport { val TLS_VERSIONS = listOf("TLSv1.2") - const val SSL_HANDSHAKE_TIMEOUT_NAME = "SSLHandshakeTimeout" - const val TRACE_NAME = "trace" + const val SSL_HANDSHAKE_TIMEOUT_NAME = "Corda-SSLHandshakeTimeout" + const val TRACE_NAME = "Corda-Trace" + const val THREAD_POOL_NAME_NAME = "Corda-ThreadPoolName" // Turn on AMQP support, which needs the protocol jar on the classpath. // Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop. @@ -94,24 +95,25 @@ class ArtemisTcpTransport { fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true, + threadPoolName: String = "P2PServer", trace: Boolean = false): TransportConfiguration { val options = mutableMapOf() if (enableSSL) { config?.addToTransportOptions(options) } - return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, trace) + return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace) } fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true, - keyStoreType: String? = null): TransportConfiguration { + threadPoolName: String = "P2PClient", + trace: Boolean = false): TransportConfiguration { val options = mutableMapOf() if (enableSSL) { config?.addToTransportOptions(options) - options += asMap(keyStoreType) } - return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL) + return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace) } fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, @@ -123,65 +125,89 @@ class ArtemisTcpTransport { config.keyStorePath.requireOnDefaultFileSystem() options.putAll(config.toTransportOptions()) } - return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, trace) + return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCServer", trace) } - fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: ClientRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration { + fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, + config: ClientRpcSslOptions?, + enableSSL: Boolean = true, + trace: Boolean = false): TransportConfiguration { val options = mutableMapOf() if (config != null && enableSSL) { config.trustStorePath.requireOnDefaultFileSystem() options.putAll(config.toTransportOptions()) } - return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL) + return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCClient", trace) } - fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration { + fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, + config: SslConfiguration, + trace: Boolean = false): TransportConfiguration { val options = mutableMapOf() config.addToTransportOptions(options) - options += asMap(keyStoreProvider) - return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true) + return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCClient", trace) } fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, - keyStoreType: String? = null, trace: Boolean = false): TransportConfiguration { val options = mutableMapOf() config.addToTransportOptions(options) - options += asMap(keyStoreType) - return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true, trace = trace) - } - - private fun asMap(keyStoreType: String?): Map { - return keyStoreType?.let { mutableMapOf(TransportConstants.KEYSTORE_TYPE_PROP_NAME to it) } ?: emptyMap() + return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCServer", trace) } private fun createAcceptorTransport(hostAndPort: NetworkHostAndPort, protocols: String, options: MutableMap, enableSSL: Boolean, + threadPoolName: String, trace: Boolean): TransportConfiguration { - options += defaultArtemisOptions(hostAndPort, protocols) - if (enableSSL) { - options += defaultSSLOptions - } // Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections options[TransportConstants.HANDSHAKE_TIMEOUT] = 0 - options[TRACE_NAME] = trace - return TransportConfiguration("net.corda.node.services.messaging.NodeNettyAcceptorFactory", options) + return createTransport( + "net.corda.node.services.messaging.NodeNettyAcceptorFactory", + hostAndPort, + protocols, + options, + enableSSL, + threadPoolName, + trace + ) } private fun createConnectorTransport(hostAndPort: NetworkHostAndPort, protocols: String, options: MutableMap, - enableSSL: Boolean): TransportConfiguration { + enableSSL: Boolean, + threadPoolName: String, + trace: Boolean): TransportConfiguration { + return createTransport( + NodeNettyConnectorFactory::class.java.name, + hostAndPort, + protocols, + options, + enableSSL, + threadPoolName, + trace + ) + } + + private fun createTransport(className: String, + hostAndPort: NetworkHostAndPort, + protocols: String, + options: MutableMap, + enableSSL: Boolean, + threadPoolName: String, + trace: Boolean): TransportConfiguration { options += defaultArtemisOptions(hostAndPort, protocols) if (enableSSL) { options += defaultSSLOptions // This is required to stop Client checking URL address vs. Server provided certificate options[TransportConstants.VERIFY_HOST_PROP_NAME] = false } - return TransportConfiguration(NettyConnectorFactory::class.java.name, options) + options[THREAD_POOL_NAME_NAME] = threadPoolName + options[TRACE_NAME] = trace + return TransportConfiguration(className, options) } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/NodeNettyConnectorFactory.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/NodeNettyConnectorFactory.kt new file mode 100644 index 0000000000..47e046566e --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/NodeNettyConnectorFactory.kt @@ -0,0 +1,61 @@ +package net.corda.nodeapi.internal + +import io.netty.channel.ChannelPipeline +import io.netty.handler.logging.LogLevel +import io.netty.handler.logging.LoggingHandler +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector +import org.apache.activemq.artemis.spi.core.remoting.BufferHandler +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager +import org.apache.activemq.artemis.spi.core.remoting.Connector +import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory +import org.apache.activemq.artemis.utils.ConfigurationHelper +import java.util.concurrent.Executor +import java.util.concurrent.ScheduledExecutorService + +class NodeNettyConnectorFactory : ConnectorFactory { + override fun createConnector(configuration: MutableMap?, + handler: BufferHandler?, + listener: ClientConnectionLifeCycleListener?, + closeExecutor: Executor?, + threadPool: Executor?, + scheduledThreadPool: ScheduledExecutorService?, + protocolManager: ClientProtocolManager?): Connector { + val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Connector", configuration) + val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration) + return NettyConnector( + configuration, + handler, + listener, + closeExecutor, + threadPool, + scheduledThreadPool, + MyClientProtocolManager(threadPoolName, trace) + ) + } + + override fun isReliable(): Boolean = false + + override fun getDefaults(): Map = NettyConnector.DEFAULT_CONFIG + + + private class MyClientProtocolManager(private val threadPoolName: String, private val trace: Boolean) : ActiveMQClientProtocolManager() { + override fun addChannelHandlers(pipeline: ChannelPipeline) { + applyThreadPoolName() + super.addChannelHandlers(pipeline) + if (trace) { + pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) + } + } + + /** + * [NettyConnector.start] does not provide a way to configure the thread pool name, so we modify the thread name accordingly. + */ + private fun applyThreadPoolName() { + with(Thread.currentThread()) { + name = name.replace("nioEventLoopGroup", threadPoolName) // pool and thread numbers are preserved + } + } + } +} diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 9b24adc538..ee09e640ad 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -5,16 +5,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder import io.netty.channel.EventLoop import io.netty.channel.EventLoopGroup import io.netty.channel.nio.NioEventLoopGroup +import io.netty.util.concurrent.DefaultThreadFactory import net.corda.core.identity.CordaX500Name import net.corda.core.internal.VisibleForTesting import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger +import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress import net.corda.nodeapi.internal.ArtemisSessionProvider -import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient @@ -503,7 +504,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore, } override fun start() { - sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS) + sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("AMQPBridge", Thread.MAX_PRIORITY)) val artemis = artemisMessageClientFactory() this.artemis = artemis artemis.start() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt index 60d9136f30..0908e5322b 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt @@ -1,3 +1,5 @@ +@file:Suppress("MagicNumber", "TooGenericExceptionCaught") + package net.corda.nodeapi.internal.crypto import net.corda.core.CordaOID @@ -12,6 +14,8 @@ import net.corda.core.internal.validate import net.corda.core.internal.writer import net.corda.core.utilities.days import net.corda.core.utilities.millis +import net.corda.core.utilities.toHex +import net.corda.nodeapi.internal.protonwrapper.netty.distributionPointsToString import org.bouncycastle.asn1.ASN1EncodableVector import org.bouncycastle.asn1.ASN1ObjectIdentifier import org.bouncycastle.asn1.ASN1Sequence @@ -393,7 +397,6 @@ object X509Utilities { } } - @Suppress("MagicNumber") private fun generateCertificateSerialNumber(): BigInteger { val bytes = ByteArray(CERTIFICATE_SERIAL_NUMBER_LENGTH) newSecureRandom().nextBytes(bytes) @@ -433,6 +436,29 @@ fun PKCS10CertificationRequest.isSignatureValid(): Boolean { return this.isSignatureValid(JcaContentVerifierProviderBuilder().build(this.subjectPublicKeyInfo)) } +fun X509Certificate.toSimpleString(): String { + val bcCert = toBc() + val keyIdentifier = try { + SubjectKeyIdentifier.getInstance(bcCert.getExtension(Extension.subjectKeyIdentifier).parsedValue).keyIdentifier.toHex() + } catch (e: Exception) { + "null" + } + val authorityKeyIdentifier = try { + AuthorityKeyIdentifier.getInstance(bcCert.getExtension(Extension.authorityKeyIdentifier).parsedValue).keyIdentifier.toHex() + } catch (e: Exception) { + "null" + } + val subject = bcCert.subject + val issuer = bcCert.issuer + val role = CertRole.extract(this) + return "$subject[$keyIdentifier] issued by $issuer[$authorityKeyIdentifier] $role $serialNumber [${distributionPointsToString()}]" +} + +fun X509CRL.toSimpleString(): String { + val revokedSerialNumbers = revokedCertificates?.map { it.serialNumber } + return "$issuerX500Principal ${thisUpdate.toInstant()} ${nextUpdate.toInstant()} ${revokedSerialNumbers ?: "[]"}" +} + /** * Check certificate validity or print warning if expiry is within 30 days */ diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt index e42cedcbb3..41e38251d3 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt @@ -115,11 +115,10 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, val transport = connection.transport as ProtonJTransport transport.protocolTracer = object : ProtocolTracer { override fun sentFrame(transportFrame: TransportFrame) { - logInfoWithMDC { "${transportFrame.body}" } + logInfoWithMDC { "sentFrame: ${transportFrame.body}" } } - override fun receivedFrame(transportFrame: TransportFrame) { - logInfoWithMDC { "${transportFrame.body}" } + logInfoWithMDC { "receivedFrame: ${transportFrame.body}" } } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt index c14ce5e820..7920f6f3e9 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt @@ -1,7 +1,11 @@ package net.corda.nodeapi.internal.protonwrapper.netty import io.netty.bootstrap.Bootstrap -import io.netty.channel.* +import io.netty.channel.Channel +import io.netty.channel.ChannelFutureListener +import io.netty.channel.ChannelHandler +import io.netty.channel.ChannelInitializer +import io.netty.channel.EventLoopGroup import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioSocketChannel @@ -11,6 +15,7 @@ import io.netty.handler.proxy.HttpProxyHandler import io.netty.handler.proxy.Socks4ProxyHandler import io.netty.handler.proxy.Socks5ProxyHandler import io.netty.resolver.NoopAddressResolverGroup +import io.netty.util.concurrent.DefaultThreadFactory import io.netty.util.internal.logging.InternalLoggerFactory import io.netty.util.internal.logging.Slf4JLoggerFactory import net.corda.core.identity.CordaX500Name @@ -58,7 +63,8 @@ data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostA class AMQPClient(private val targets: List, val allowedRemoteLegalNames: Set, private val configuration: AMQPConfiguration, - private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable { + private val sharedThreadPool: EventLoopGroup? = null, + private val threadPoolName: String = "AMQPClient") : AutoCloseable { companion object { init { InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE) @@ -303,7 +309,7 @@ class AMQPClient(private val targets: List, return } log.info("Connect to: $currentTarget") - workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS) + workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS, DefaultThreadFactory(threadPoolName, Thread.MAX_PRIORITY)) started = true restart() } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt index 126e47a3e6..cbeb2562b4 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt @@ -11,6 +11,7 @@ import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.handler.logging.LogLevel import io.netty.handler.logging.LoggingHandler +import io.netty.util.concurrent.DefaultThreadFactory import io.netty.util.internal.logging.InternalLoggerFactory import io.netty.util.internal.logging.Slf4JLoggerFactory import net.corda.core.utilities.NetworkHostAndPort @@ -37,8 +38,8 @@ import kotlin.concurrent.withLock */ class AMQPServer(val hostName: String, val port: Int, - private val configuration: AMQPConfiguration) : AutoCloseable { - + private val configuration: AMQPConfiguration, + private val threadPoolName: String = "AMQPServer") : AutoCloseable { companion object { init { InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE) @@ -131,8 +132,8 @@ class AMQPServer(val hostName: String, lock.withLock { stop() - bossGroup = NioEventLoopGroup(1) - workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS) + bossGroup = NioEventLoopGroup(1, DefaultThreadFactory("$threadPoolName-boss", Thread.MAX_PRIORITY)) + workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS, DefaultThreadFactory("$threadPoolName-worker", Thread.MAX_PRIORITY)) val server = ServerBootstrap() // TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt index ab13048d67..ad5260f9a6 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt @@ -14,33 +14,38 @@ import net.corda.core.internal.VisibleForTesting import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug -import net.corda.core.utilities.toHex import net.corda.nodeapi.internal.ArtemisTcpTransport import net.corda.nodeapi.internal.config.CertificateStore -import net.corda.nodeapi.internal.crypto.toBc +import net.corda.nodeapi.internal.crypto.toSimpleString import net.corda.nodeapi.internal.crypto.x509 import org.bouncycastle.asn1.ASN1InputStream import org.bouncycastle.asn1.ASN1Primitive import org.bouncycastle.asn1.ASN1IA5String import org.bouncycastle.asn1.DEROctetString import org.bouncycastle.asn1.x500.X500Name -import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier import org.bouncycastle.asn1.x509.CRLDistPoint import org.bouncycastle.asn1.x509.DistributionPointName import org.bouncycastle.asn1.x509.Extension import org.bouncycastle.asn1.x509.GeneralName import org.bouncycastle.asn1.x509.GeneralNames -import org.bouncycastle.asn1.x509.SubjectKeyIdentifier import org.slf4j.LoggerFactory import java.net.Socket import java.net.URI import java.security.KeyStore -import java.security.cert.* -import java.util.* +import java.security.cert.CertificateException +import java.security.cert.PKIXBuilderParameters +import java.security.cert.PKIXRevocationChecker +import java.security.cert.X509CertSelector +import java.security.cert.X509Certificate import java.util.concurrent.Executor -import javax.net.ssl.* +import javax.net.ssl.CertPathTrustManagerParameters +import javax.net.ssl.KeyManagerFactory +import javax.net.ssl.SNIHostName +import javax.net.ssl.SSLContext +import javax.net.ssl.SSLEngine +import javax.net.ssl.TrustManagerFactory +import javax.net.ssl.X509ExtendedTrustManager import javax.security.auth.x500.X500Principal -import kotlin.collections.HashMap import kotlin.system.measureTimeMillis private const val HOSTNAME_FORMAT = "%s.corda.net" @@ -109,23 +114,7 @@ fun certPathToString(certPath: Array?): String { if (certPath == null) { return "" } - val certs = certPath.map { - val bcCert = it.toBc() - val subject = bcCert.subject.toString() - val issuer = bcCert.issuer.toString() - val keyIdentifier = try { - SubjectKeyIdentifier.getInstance(bcCert.getExtension(Extension.subjectKeyIdentifier).parsedValue).keyIdentifier.toHex() - } catch (ex: Exception) { - "null" - } - val authorityKeyIdentifier = try { - AuthorityKeyIdentifier.getInstance(bcCert.getExtension(Extension.authorityKeyIdentifier).parsedValue).keyIdentifier.toHex() - } catch (ex: Exception) { - "null" - } - " $subject[$keyIdentifier] issued by $issuer[$authorityKeyIdentifier] [${it.distributionPointsToString()}]" - } - return certs.joinToString("\r\n") + return certPath.joinToString(System.lineSeparator()) { " ${it.toSimpleString()}" } } @VisibleForTesting diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CertDistPointCrlSource.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CertDistPointCrlSource.kt index cb8c9e3174..db984f11b8 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CertDistPointCrlSource.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CertDistPointCrlSource.kt @@ -3,7 +3,10 @@ package net.corda.nodeapi.internal.revocation import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.LoadingCache import net.corda.core.internal.readFully +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.nodeapi.internal.crypto.X509CertificateFactory +import net.corda.nodeapi.internal.crypto.toSimpleString import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource import net.corda.nodeapi.internal.protonwrapper.netty.distributionPoints import java.net.URI @@ -15,8 +18,11 @@ import javax.security.auth.x500.X500Principal /** * [CrlSource] which downloads CRLs from the distribution points in the X509 certificate. */ +@Suppress("TooGenericExceptionCaught") class CertDistPointCrlSource : CrlSource { companion object { + private val logger = contextLogger() + // The default SSL handshake timeout is 60s (DEFAULT_SSL_HANDSHAKE_TIMEOUT). Considering there are 3 CRLs endpoints to check in a // node handshake, we want to keep the total timeout within that. private const val DEFAULT_CONNECT_TIMEOUT = 9_000 @@ -33,7 +39,8 @@ class CertDistPointCrlSource : CrlSource { private val readTimeout = Integer.getInteger("net.corda.dpcrl.read.timeout", DEFAULT_READ_TIMEOUT) private fun retrieveCRL(uri: URI): X509CRL { - val bytes = run { + val start = System.currentTimeMillis() + val bytes = try { val conn = uri.toURL().openConnection() conn.connectTimeout = connectTimeout conn.readTimeout = readTimeout @@ -41,12 +48,26 @@ class CertDistPointCrlSource : CrlSource { // in an InputStream, but the JDK implementation (sun.security.provider.X509Factory.engineGenerateCRL) converts any IOException // into CRLException and drops the cause chain. conn.getInputStream().readFully() + } catch (e: Exception) { + if (logger.isDebugEnabled) { + logger.debug("Unable to download CRL from $uri (${System.currentTimeMillis() - start}ms)", e) + } + throw e } - return X509CertificateFactory().generateCRL(bytes.inputStream()) + val duration = System.currentTimeMillis() - start + val crl = try { + X509CertificateFactory().generateCRL(bytes.inputStream()) + } catch (e: Exception) { + if (logger.isDebugEnabled) { + logger.debug("Invalid CRL from $uri (${duration}ms)", e) + } + throw e + } + logger.debug { "CRL from $uri (${duration}ms): ${crl.toSimpleString()}" } + return crl } } - @Suppress("TooGenericExceptionCaught") override fun fetch(certificate: X509Certificate): Set { val approvedCRLs = HashSet() var exception: Exception? = null diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CordaRevocationChecker.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CordaRevocationChecker.kt index b90bde624e..1e0a3ecf53 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CordaRevocationChecker.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/revocation/CordaRevocationChecker.kt @@ -1,6 +1,8 @@ package net.corda.nodeapi.internal.revocation import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug +import net.corda.nodeapi.internal.crypto.toSimpleString import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource import org.bouncycastle.asn1.x509.Extension import java.security.cert.CRLReason @@ -27,8 +29,8 @@ class CordaRevocationChecker(private val crlSource: CrlSource, private val softFailExceptions = ArrayList() override fun check(cert: Certificate, unresolvedCritExts: MutableCollection?) { - val x509Certificate = cert as X509Certificate - checkApprovedCRLs(x509Certificate, getCRLs(x509Certificate)) + cert as X509Certificate + checkApprovedCRLs(cert, getCRLs(cert)) } @Suppress("TooGenericExceptionCaught") @@ -40,30 +42,27 @@ class CordaRevocationChecker(private val crlSource: CrlSource, addSoftFailException(e) return emptySet() } else { - throw undeterminedRevocationException("Unable to retrieve CRLs", e) + throw undeterminedRevocationException("Unable to retrieve CRLs for cert ${cert.serialNumber}", e) } } if (crls.isNotEmpty() || softFail) { return crls } // Note, the JDK tries to find a valid CRL from a different signing key before giving up (RevocationChecker.verifyWithSeparateSigningKey) - throw undeterminedRevocationException("Could not find any valid CRLs", null) + throw undeterminedRevocationException("Could not find any valid CRLs for cert ${cert.serialNumber}", null) } /** * Borrowed from `RevocationChecker.checkApprovedCRLs()` */ @Suppress("NestedBlockDepth") - @Throws(CertPathValidatorException::class) private fun checkApprovedCRLs(cert: X509Certificate, approvedCRLs: Set) { // See if the cert is in the set of approved crls. - logger.debug("ExternalSourceRevocationChecker.checkApprovedCRLs() cert SN: ${cert.serialNumber}") + logger.debug { "Check cert ${cert.serialNumber} against CRLs ${approvedCRLs.map { it.toSimpleString() }}" } for (crl in approvedCRLs) { val entry = crl.getRevokedCertificate(cert) if (entry != null) { - logger.debug("ExternalSourceRevocationChecker.checkApprovedCRLs() CRL entry: $entry") - /* * Abort CRL validation and throw exception if there are any * unrecognized critical CRL entry extensions (see section @@ -75,19 +74,15 @@ class CordaRevocationChecker(private val crlSource: CrlSource, unresCritExts.remove(Extension.cRLDistributionPoints.id) unresCritExts.remove(Extension.certificateIssuer.id) if (unresCritExts.isNotEmpty()) { - throw CertPathValidatorException( - "Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts") + throw CertPathValidatorException("Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts") } } val reasonCode = entry.revocationReason ?: CRLReason.UNSPECIFIED val revocationDate = entry.revocationDate if (revocationDate.before(dateSource())) { - val t = CertificateRevokedException( - revocationDate, reasonCode, - crl.issuerX500Principal, mutableMapOf()) - throw CertPathValidatorException( - t.message, t, null, -1, BasicReason.REVOKED) + val t = CertificateRevokedException(revocationDate, reasonCode, crl.issuerX500Principal, emptyMap()) + throw CertPathValidatorException(t.message, t, null, -1, BasicReason.REVOKED) } } } @@ -105,15 +100,18 @@ class CordaRevocationChecker(private val crlSource: CrlSource, return false } - override fun getSupportedExtensions(): MutableSet? { + override fun getSupportedExtensions(): Set? { return null } override fun init(forward: Boolean) { + if (forward) { + throw CertPathValidatorException("Forward checking not allowed") + } softFailExceptions.clear() } - override fun getSoftFailExceptions(): MutableList { + override fun getSoftFailExceptions(): List { return Collections.unmodifiableList(softFailExceptions) } @@ -125,4 +123,4 @@ class CordaRevocationChecker(private val crlSource: CrlSource, private fun undeterminedRevocationException(message: String?, cause: Throwable?): CertPathValidatorException { return CertPathValidatorException(message, cause, null, -1, BasicReason.UNDETERMINED_REVOCATION_STATUS) } -} \ No newline at end of file +} diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 2c96dfb238..c099cf3e7c 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -205,7 +205,7 @@ class AMQPBridgeTest { doReturn(null).whenever(it).jmxMonitoringHttpPort } artemisConfig.configureWithDevSSLCertificate() - val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE, null) + val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE) val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE) artemisServer.start() artemisClient.start() diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt index 36b1a14ae1..788874f436 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt @@ -25,7 +25,6 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig import net.corda.testing.core.ALICE_NAME -import net.corda.testing.core.BOB_NAME import net.corda.testing.core.CHARLIE_NAME import net.corda.testing.core.MAX_MESSAGE_SIZE import net.corda.testing.driver.internal.incrementalPortAllocation @@ -50,6 +49,7 @@ import java.time.Duration import java.util.concurrent.atomic.AtomicInteger import kotlin.test.assertEquals +@Suppress("LongParameterList") class CertificateRevocationListNodeTests { @Rule @JvmField @@ -327,17 +327,18 @@ class CertificateRevocationListNodeTests { private fun createAMQPClient(targetPort: Int, crlCheckSoftFail: Boolean, + legalName: CordaX500Name, nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL", maxMessageSize: Int = MAX_MESSAGE_SIZE): X509Certificate { - val baseDirectory = temporaryFolder.root.toPath() / "client" + val baseDirectory = temporaryFolder.root.toPath() / legalName.organisation val certificatesDirectory = baseDirectory / "certificates" val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory) val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) val clientConfig = rigorousMock().also { doReturn(baseDirectory).whenever(it).baseDirectory doReturn(certificatesDirectory).whenever(it).certificatesDirectory - doReturn(BOB_NAME).whenever(it).myLegalName + doReturn(legalName).whenever(it).myLegalName doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions doReturn(signingCertificateStore).whenever(it).signingCertificateStore doReturn(crlCheckSoftFail).whenever(it).crlCheckSoftFail @@ -351,28 +352,32 @@ class CertificateRevocationListNodeTests { override val trustStore = clientConfig.p2pSslOptions.trustStore.get() override val maxMessageSize: Int = maxMessageSize } - amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", targetPort)), setOf(ALICE_NAME, CHARLIE_NAME), amqpConfig) + amqpClient = AMQPClient( + listOf(NetworkHostAndPort("localhost", targetPort)), + setOf(CHARLIE_NAME), + amqpConfig, + threadPoolName = legalName.organisation + ) return nodeCert } - @Suppress("LongParameterList") private fun createAMQPServer(port: Int, - name: CordaX500Name = ALICE_NAME, + legalName: CordaX500Name, crlCheckSoftFail: Boolean, nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL", maxMessageSize: Int = MAX_MESSAGE_SIZE, sslHandshakeTimeout: Duration? = null): X509Certificate { check(!::amqpServer.isInitialized) - val baseDirectory = temporaryFolder.root.toPath() / "server" + val baseDirectory = temporaryFolder.root.toPath() / legalName.organisation val certificatesDirectory = baseDirectory / "certificates" val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory) val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) val serverConfig = rigorousMock().also { doReturn(baseDirectory).whenever(it).baseDirectory doReturn(certificatesDirectory).whenever(it).certificatesDirectory - doReturn(name).whenever(it).myLegalName + doReturn(legalName).whenever(it).myLegalName doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions doReturn(signingCertificateStore).whenever(it).signingCertificateStore } @@ -386,7 +391,7 @@ class CertificateRevocationListNodeTests { override val maxMessageSize: Int = maxMessageSize override val sslHandshakeTimeout: Duration = sslHandshakeTimeout ?: super.sslHandshakeTimeout } - amqpServer = AMQPServer("0.0.0.0", port, amqpConfig) + amqpServer = AMQPServer("0.0.0.0", port, amqpConfig, threadPoolName = legalName.organisation) return nodeCert } @@ -422,7 +427,6 @@ class CertificateRevocationListNodeTests { return newNodeCert } - @Suppress("LongParameterList") private fun verifyAMQPConnection(crlCheckSoftFail: Boolean, nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", revokeServerCert: Boolean = false, @@ -431,6 +435,7 @@ class CertificateRevocationListNodeTests { expectedConnectStatus: Boolean) { val serverCert = createAMQPServer( serverPort, + CHARLIE_NAME, crlCheckSoftFail = crlCheckSoftFail, nodeCrlDistPoint = nodeCrlDistPoint, sslHandshakeTimeout = sslHandshakeTimeout @@ -445,6 +450,7 @@ class CertificateRevocationListNodeTests { val clientCert = createAMQPClient( serverPort, crlCheckSoftFail = crlCheckSoftFail, + legalName = ALICE_NAME, nodeCrlDistPoint = nodeCrlDistPoint ) if (revokeClientCert) { @@ -456,7 +462,8 @@ class CertificateRevocationListNodeTests { assertThat(serverConnect.connected).isEqualTo(expectedConnectStatus) } - private fun createArtemisServerAndClient(crlCheckSoftFail: Boolean, + private fun createArtemisServerAndClient(legalName: CordaX500Name, + crlCheckSoftFail: Boolean, crlCheckArtemisServer: Boolean, nodeCrlDistPoint: String, sslHandshakeTimeout: Duration?): Pair { @@ -467,7 +474,7 @@ class CertificateRevocationListNodeTests { val artemisConfig = rigorousMock().also { doReturn(baseDirectory).whenever(it).baseDirectory doReturn(certificatesDirectory).whenever(it).certificatesDirectory - doReturn(CHARLIE_NAME).whenever(it).myLegalName + doReturn(legalName).whenever(it).myLegalName doReturn(signingCertificateStore).whenever(it).signingCertificateStore doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress @@ -478,14 +485,25 @@ class CertificateRevocationListNodeTests { artemisConfig.configureWithDevSSLCertificate() recreateNodeCaAndTlsCertificates(signingCertificateStore, p2pSslConfiguration, nodeCrlDistPoint, null) - val server = ArtemisMessagingServer(artemisConfig, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE, null) - val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE) + val server = ArtemisMessagingServer( + artemisConfig, + artemisConfig.p2pAddress, + MAX_MESSAGE_SIZE, + threadPoolName = "${legalName.organisation}-server", + trace = true + ) + val client = ArtemisMessagingClient( + artemisConfig.p2pSslOptions, + artemisConfig.p2pAddress, + MAX_MESSAGE_SIZE, + threadPoolName = "${legalName.organisation}-client", + trace = true + ) server.start() client.start() return server to client } - @Suppress("LongParameterList") private fun verifyArtemisConnection(crlCheckSoftFail: Boolean, crlCheckArtemisServer: Boolean, expectedConnected: Boolean = true, @@ -494,13 +512,19 @@ class CertificateRevocationListNodeTests { nodeCrlDistPoint: String = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", sslHandshakeTimeout: Duration? = null) { val queueName = P2P_PREFIX + "Test" - val (artemisServer, artemisClient) = createArtemisServerAndClient(crlCheckSoftFail, crlCheckArtemisServer, nodeCrlDistPoint, sslHandshakeTimeout) + val (artemisServer, artemisClient) = createArtemisServerAndClient( + CHARLIE_NAME, + crlCheckSoftFail, + crlCheckArtemisServer, + nodeCrlDistPoint, + sslHandshakeTimeout + ) artemisServer.use { artemisClient.started!!.session.createQueue( QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setDurable(true) ) - val nodeCert = createAMQPClient(serverPort, true, nodeCrlDistPoint) + val nodeCert = createAMQPClient(serverPort, true, ALICE_NAME, nodeCrlDistPoint) if (revokedNodeCert) { crlServer.revokedNodeCerts.add(nodeCert.serialNumber) } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index b067f24b40..6be8cc1002 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -541,7 +541,7 @@ class ProtonWrapperTests { } artemisConfig.configureWithDevSSLCertificate() - val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize, null) + val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize) val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", artemisPort), maxMessageSize) server.start() client.start() diff --git a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 873d1a1795..cf9d022bff 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -243,7 +243,7 @@ class ArtemisMessagingTest { } private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer { - return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null, true).apply { + return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, trace = true).apply { config.configureWithDevSSLCertificate() messagingServer = this } diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/SimpleMQClient.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/SimpleMQClient.kt index fa5fc09d53..b52422fff0 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/SimpleMQClient.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/SimpleMQClient.kt @@ -22,7 +22,7 @@ class SimpleMQClient(val target: NetworkHostAndPort, lateinit var producer: ClientProducer fun start(username: String? = null, password: String? = null, enableSSL: Boolean = true) { - val tcpTransport = p2pConnectorTcpTransport(target, config, enableSSL = enableSSL) + val tcpTransport = p2pConnectorTcpTransport(target, config, enableSSL = enableSSL, threadPoolName = "SimpleMQClient") val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { isBlockOnNonDurableSend = true threadPoolMaxSize = 1 diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 3e79bc5966..4b0689d18e 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -56,7 +56,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE class ArtemisMessagingServer(private val config: NodeConfiguration, private val messagingServerAddress: NetworkHostAndPort, private val maxMessageSize: Int, - private val journalBufferTimeout : Int?, + private val journalBufferTimeout : Int? = null, + private val threadPoolName: String = "ArtemisServer", private val trace: Boolean = false) : ArtemisBroker, SingletonSerializeAsToken() { companion object { private val log = contextLogger() @@ -131,9 +132,10 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, // The transaction cache is configurable, and drives other cache sizes. globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize) - acceptorConfigurations.add(p2pAcceptorTcpTransport( + addAcceptorConfiguration(p2pAcceptorTcpTransport( NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config.p2pSslOptions, + threadPoolName = threadPoolName, trace = trace )) // Enable built in message deduplication. Note we still have to do our own as the delayed commits @@ -180,7 +182,6 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue) } - @Throws(IOException::class, KeyStoreException::class) private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager { val keyStore = config.p2pSslOptions.keyStore.get().value.internal val trustStore = config.p2pSslOptions.trustStore.get().value.internal diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt index 5d10099f29..4143b20273 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeNettyAcceptorFactory.kt @@ -1,11 +1,14 @@ package net.corda.node.services.messaging import io.netty.buffer.ByteBufAllocator +import io.netty.channel.ChannelHandlerContext import io.netty.channel.group.ChannelGroup import io.netty.handler.logging.LogLevel import io.netty.handler.logging.LoggingHandler import io.netty.handler.ssl.SslHandler +import io.netty.handler.ssl.SslHandshakeTimeoutException import net.corda.core.internal.declaredField +import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.ArtemisTcpTransport import org.apache.activemq.artemis.api.core.BaseInterceptor import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor @@ -16,10 +19,14 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory import org.apache.activemq.artemis.spi.core.remoting.BufferHandler import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener +import org.apache.activemq.artemis.utils.ConfigurationHelper import org.apache.activemq.artemis.utils.actors.OrderedExecutor +import java.nio.channels.ClosedChannelException import java.time.Duration import java.util.concurrent.Executor import java.util.concurrent.ScheduledExecutorService +import java.util.regex.Pattern +import javax.net.ssl.SSLEngine @Suppress("unused") // Used via reflection in ArtemisTcpTransport class NodeNettyAcceptorFactory : AcceptorFactory { @@ -35,6 +42,7 @@ class NodeNettyAcceptorFactory : AcceptorFactory { return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap) } + private class NodeNettyAcceptor(name: String?, clusterConnection: ClusterConnection?, configuration: Map, @@ -45,24 +53,76 @@ class NodeNettyAcceptorFactory : AcceptorFactory { protocolMap: MutableMap, RedirectHandler<*>>>?) : NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap) { + companion object { + private val defaultThreadPoolNamePattern = Pattern.compile("""Thread-(\d+) \(activemq-netty-threads\)""") + } + + private val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "NodeNettyAcceptor", configuration) + private val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration) + + @Synchronized override fun start() { super.start() - if (configuration[ArtemisTcpTransport.TRACE_NAME] == true) { - // Artemis does not seem to allow access to the underlying channel so we resort to reflection and get it via the - // serverChannelGroup field. This field is only available after start(), hence why we add the logger here. + if (trace) { + // Unfortunately we have to resort to reflection to be able to get access to the server channel(s) declaredField("serverChannelGroup").value.forEach { channel -> channel.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO)) } } } + @Synchronized override fun getSslHandler(alloc: ByteBufAllocator?, peerHost: String?, peerPort: Int): SslHandler { - val sslHandler = super.getSslHandler(alloc, peerHost, peerPort) + applyThreadPoolName() + val engine = super.getSslHandler(alloc, peerHost, peerPort).engine() + val sslHandler = NodeAcceptorSslHandler(engine, trace) val handshakeTimeout = configuration[ArtemisTcpTransport.SSL_HANDSHAKE_TIMEOUT_NAME] as Duration? if (handshakeTimeout != null) { sslHandler.handshakeTimeoutMillis = handshakeTimeout.toMillis() } return sslHandler } + + /** + * [NettyAcceptor.start] has hardcoded the thread pool name and does not provide a way to configure it. This is a workaround. + */ + private fun applyThreadPoolName() { + val matcher = defaultThreadPoolNamePattern.matcher(Thread.currentThread().name) + if (matcher.matches()) { + Thread.currentThread().name = "$threadPoolName-${matcher.group(1)}" // Preserve the pool thread number + } + } + } + + + private class NodeAcceptorSslHandler(engine: SSLEngine, private val trace: Boolean) : SslHandler(engine) { + companion object { + private val logger = contextLogger() + } + + override fun handlerAdded(ctx: ChannelHandlerContext) { + logHandshake() + super.handlerAdded(ctx) + // Unfortunately NettyAcceptor does not let us add extra child handlers, so we have to add our logger this way. + if (trace) { + ctx.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO)) + } + } + + private fun logHandshake() { + val start = System.currentTimeMillis() + handshakeFuture().addListener { + val duration = System.currentTimeMillis() - start + when { + it.isSuccess -> logger.info("SSL handshake completed in ${duration}ms with ${engine().session.peerPrincipal}") + it.isCancelled -> logger.warn("SSL handshake cancelled after ${duration}ms") + else -> when (it.cause()) { + is ClosedChannelException -> logger.warn("SSL handshake closed early after ${duration}ms") + is SslHandshakeTimeoutException -> logger.warn("SSL handshake timed out after ${duration}ms") + else -> logger.warn("SSL handshake failed after ${duration}ms", it.cause()) + } + } + } + } } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index ce0fcfc9f8..ea132130f1 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -117,6 +117,7 @@ class P2PMessagingClient(val config: NodeConfiguration, cacheFactory: NamedCacheFactory, private val isDrainingModeOn: () -> Boolean, private val drainingModeWasChangedEvents: Observable>, + private val threadPoolName: String = "P2PClient", private val stateHelper: ServiceStateHelper = ServiceStateHelper(log), private val terminateOnConnectionError: Boolean = true, private val timeoutConfig: TimeoutConfig = TimeoutConfig.default() @@ -205,10 +206,8 @@ class P2PMessagingClient(val config: NodeConfiguration, started = true log.info("Connecting to message broker: $serverAddress") // TODO Add broker CN to config for host verification in case the embedded broker isn't used - val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions) + val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions, threadPoolName = threadPoolName) locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { - // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this - // would be the default and the two lines below can be deleted. callTimeout = timeoutConfig.callTimeout.toMillis() connectionTTL = timeoutConfig.serverConnectionTtl.toMillis() clientFailureCheckPeriod = timeoutConfig.clientConnectionTtl.toMillis() diff --git a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestClient.kt b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestClient.kt index 185a289472..581172a788 100644 --- a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestClient.kt +++ b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestClient.kt @@ -3,13 +3,14 @@ package net.corda.coretesting.internal import io.netty.bootstrap.Bootstrap import io.netty.channel.ChannelFuture import io.netty.channel.ChannelInboundHandlerAdapter -import io.netty.handler.ssl.SslContext import io.netty.channel.ChannelInitializer import io.netty.channel.ChannelOption import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslHandler +import io.netty.util.concurrent.DefaultThreadFactory import java.io.Closeable import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException @@ -17,7 +18,6 @@ import java.util.concurrent.locks.ReentrantLock import javax.net.ssl.SSLEngine import kotlin.concurrent.thread - class NettyTestClient( val sslContext: SslContext?, val targetHost: String, @@ -49,7 +49,7 @@ class NettyTestClient( private fun run() { // Configure the client. - val group = NioEventLoopGroup() + val group = NioEventLoopGroup(DefaultThreadFactory("NettyTestClient")) try { val b = Bootstrap() b.group(group) diff --git a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestServer.kt b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestServer.kt index 8fa9d23057..1abc3f5c7b 100644 --- a/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestServer.kt +++ b/testing/core-test-utils/src/main/kotlin/net/corda/coretesting/internal/NettyTestServer.kt @@ -11,6 +11,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.handler.logging.LogLevel import io.netty.handler.logging.LoggingHandler import io.netty.handler.ssl.SslContext +import io.netty.util.concurrent.DefaultThreadFactory import java.io.Closeable import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException @@ -45,8 +46,8 @@ class NettyTestServer( fun run() { // Configure the server. - val bossGroup = NioEventLoopGroup(1) - val workerGroup = NioEventLoopGroup() + val bossGroup = NioEventLoopGroup(1, DefaultThreadFactory("NettyTestServer-boss")) + val workerGroup = NioEventLoopGroup(DefaultThreadFactory("NettyTestServer-worker")) try { val b = ServerBootstrap() b.group(bossGroup, workerGroup)