ENT-9941: Improved Netty logging, especially of the embedded broker (#7365)

This commit is contained in:
Shams Asari 2023-05-12 10:11:09 +01:00 committed by GitHub
parent 0951853207
commit 31a34e5a5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 345 additions and 128 deletions

View File

@ -24,7 +24,9 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
private val confirmationWindowSize: Int = -1, private val confirmationWindowSize: Int = -1,
private val messagingServerConnectionConfig: MessagingServerConnectionConfiguration? = null, private val messagingServerConnectionConfig: MessagingServerConnectionConfiguration? = null,
private val backupServerAddressPool: List<NetworkHostAndPort> = emptyList(), private val backupServerAddressPool: List<NetworkHostAndPort> = emptyList(),
private val failoverCallback: ((FailoverEventType) -> Unit)? = null private val failoverCallback: ((FailoverEventType) -> Unit)? = null,
private val threadPoolName: String = "ArtemisClient",
private val trace: Boolean = false
) : ArtemisSessionProvider { ) : ArtemisSessionProvider {
companion object { companion object {
private val log = loggerFor<ArtemisMessagingClient>() private val log = loggerFor<ArtemisMessagingClient>()
@ -39,8 +41,10 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
override fun start(): Started = synchronized(this) { override fun start(): Started = synchronized(this) {
check(started == null) { "start can't be called twice" } check(started == null) { "start can't be called twice" }
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config) val tcpTransport = p2pConnectorTcpTransport(serverAddress, config, threadPoolName = threadPoolName, trace = trace)
val backupTransports = backupServerAddressPool.map { p2pConnectorTcpTransport(it, config) } val backupTransports = backupServerAddressPool.map {
p2pConnectorTcpTransport(it, config, threadPoolName = threadPoolName, trace = trace)
}
log.info("Connecting to message broker: $serverAddress") log.info("Connecting to message broker: $serverAddress")
if (backupTransports.isNotEmpty()) { if (backupTransports.isNotEmpty()) {
@ -49,8 +53,6 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
// If back-up artemis addresses are configured, the locator will be created using HA mode. // If back-up artemis addresses are configured, the locator will be created using HA mode.
@Suppress("SpreadOperator") @Suppress("SpreadOperator")
val locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply { val locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted.
connectionTTL = 60000 connectionTTL = 60000
clientFailureCheckPeriod = 30000 clientFailureCheckPeriod = 30000
callFailoverTimeout = java.lang.Long.getLong(CORDA_ARTEMIS_CALL_TIMEOUT_PROP_NAME, CORDA_ARTEMIS_CALL_TIMEOUT_DEFAULT) callFailoverTimeout = java.lang.Long.getLong(CORDA_ARTEMIS_CALL_TIMEOUT_PROP_NAME, CORDA_ARTEMIS_CALL_TIMEOUT_DEFAULT)

View File

@ -9,10 +9,10 @@ import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT
import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.config.SslConfiguration import net.corda.nodeapi.internal.config.SslConfiguration
import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import java.nio.file.Path import java.nio.file.Path
@Suppress("LongParameterList")
class ArtemisTcpTransport { class ArtemisTcpTransport {
companion object { companion object {
val CIPHER_SUITES = listOf( val CIPHER_SUITES = listOf(
@ -22,8 +22,9 @@ class ArtemisTcpTransport {
val TLS_VERSIONS = listOf("TLSv1.2") val TLS_VERSIONS = listOf("TLSv1.2")
const val SSL_HANDSHAKE_TIMEOUT_NAME = "SSLHandshakeTimeout" const val SSL_HANDSHAKE_TIMEOUT_NAME = "Corda-SSLHandshakeTimeout"
const val TRACE_NAME = "trace" const val TRACE_NAME = "Corda-Trace"
const val THREAD_POOL_NAME_NAME = "Corda-ThreadPoolName"
// Turn on AMQP support, which needs the protocol jar on the classpath. // Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop. // Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop.
@ -94,24 +95,25 @@ class ArtemisTcpTransport {
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
config: MutualSslConfiguration?, config: MutualSslConfiguration?,
enableSSL: Boolean = true, enableSSL: Boolean = true,
threadPoolName: String = "P2PServer",
trace: Boolean = false): TransportConfiguration { trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>() val options = mutableMapOf<String, Any>()
if (enableSSL) { if (enableSSL) {
config?.addToTransportOptions(options) config?.addToTransportOptions(options)
} }
return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, trace) return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace)
} }
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
config: MutualSslConfiguration?, config: MutualSslConfiguration?,
enableSSL: Boolean = true, enableSSL: Boolean = true,
keyStoreProvider: String? = null): TransportConfiguration { threadPoolName: String = "P2PClient",
trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>() val options = mutableMapOf<String, Any>()
if (enableSSL) { if (enableSSL) {
config?.addToTransportOptions(options) config?.addToTransportOptions(options)
options += asMap(keyStoreProvider)
} }
return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL) return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace)
} }
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
@ -123,63 +125,87 @@ class ArtemisTcpTransport {
config.keyStorePath.requireOnDefaultFileSystem() config.keyStorePath.requireOnDefaultFileSystem()
options.putAll(config.toTransportOptions()) options.putAll(config.toTransportOptions())
} }
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, trace) return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCServer", trace)
} }
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: ClientRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration { fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
config: ClientRpcSslOptions?,
enableSSL: Boolean = true,
trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>() val options = mutableMapOf<String, Any>()
if (config != null && enableSSL) { if (config != null && enableSSL) {
config.trustStorePath.requireOnDefaultFileSystem() config.trustStorePath.requireOnDefaultFileSystem()
options.putAll(config.toTransportOptions()) options.putAll(config.toTransportOptions())
} }
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL) return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCClient", trace)
} }
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration { fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort,
config: SslConfiguration,
trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>() val options = mutableMapOf<String, Any>()
config.addToTransportOptions(options) config.addToTransportOptions(options)
options += asMap(keyStoreProvider) return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCClient", trace)
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true)
} }
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
config: SslConfiguration, config: SslConfiguration,
keyStoreProvider: String? = null,
trace: Boolean = false): TransportConfiguration { trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>() val options = mutableMapOf<String, Any>()
config.addToTransportOptions(options) config.addToTransportOptions(options)
options += asMap(keyStoreProvider) return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCServer", trace)
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true, trace = trace)
}
private fun asMap(keyStoreProvider: String?): Map<String, String> {
return keyStoreProvider?.let {mutableMapOf(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to it)} ?: emptyMap()
} }
private fun createAcceptorTransport(hostAndPort: NetworkHostAndPort, private fun createAcceptorTransport(hostAndPort: NetworkHostAndPort,
protocols: String, protocols: String,
options: MutableMap<String, Any>, options: MutableMap<String, Any>,
enableSSL: Boolean, enableSSL: Boolean,
threadPoolName: String,
trace: Boolean): TransportConfiguration { trace: Boolean): TransportConfiguration {
options += defaultArtemisOptions(hostAndPort, protocols)
if (enableSSL) {
options += defaultSSLOptions
}
// Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections // Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections
options[TransportConstants.HANDSHAKE_TIMEOUT] = 0 options[TransportConstants.HANDSHAKE_TIMEOUT] = 0
options[TRACE_NAME] = trace return createTransport(
return TransportConfiguration("net.corda.node.services.messaging.NodeNettyAcceptorFactory", options) "net.corda.node.services.messaging.NodeNettyAcceptorFactory",
hostAndPort,
protocols,
options,
enableSSL,
threadPoolName,
trace
)
} }
private fun createConnectorTransport(hostAndPort: NetworkHostAndPort, private fun createConnectorTransport(hostAndPort: NetworkHostAndPort,
protocols: String, protocols: String,
options: MutableMap<String, Any>, options: MutableMap<String, Any>,
enableSSL: Boolean): TransportConfiguration { enableSSL: Boolean,
threadPoolName: String,
trace: Boolean): TransportConfiguration {
return createTransport(
"net.corda.node.services.messaging.NodeNettyConnectorFactory",
hostAndPort,
protocols,
options,
enableSSL,
threadPoolName,
trace
)
}
private fun createTransport(className: String,
hostAndPort: NetworkHostAndPort,
protocols: String,
options: MutableMap<String, Any>,
enableSSL: Boolean,
threadPoolName: String,
trace: Boolean): TransportConfiguration {
options += defaultArtemisOptions(hostAndPort, protocols) options += defaultArtemisOptions(hostAndPort, protocols)
if (enableSSL) { if (enableSSL) {
options += defaultSSLOptions options += defaultSSLOptions
} }
return TransportConfiguration(NettyConnectorFactory::class.java.name, options) options[THREAD_POOL_NAME_NAME] = threadPoolName
options[TRACE_NAME] = trace
return TransportConfiguration(className, options)
} }
} }
} }

View File

@ -5,16 +5,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import io.netty.channel.EventLoop import io.netty.channel.EventLoop
import io.netty.channel.EventLoopGroup import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.nio.NioEventLoopGroup
import io.netty.util.concurrent.DefaultThreadFactory
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.VisibleForTesting
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY
import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
import net.corda.nodeapi.internal.ArtemisSessionProvider import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY
import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
@ -503,7 +504,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
} }
override fun start() { override fun start() {
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS) sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("AMQPBridge", Thread.MAX_PRIORITY))
val artemis = artemisMessageClientFactory() val artemis = artemisMessageClientFactory()
this.artemis = artemis this.artemis = artemis
artemis.start() artemis.start()

View File

@ -1,3 +1,5 @@
@file:Suppress("MagicNumber", "TooGenericExceptionCaught")
package net.corda.nodeapi.internal.crypto package net.corda.nodeapi.internal.crypto
import net.corda.core.CordaOID import net.corda.core.CordaOID
@ -6,6 +8,8 @@ import net.corda.core.crypto.newSecureRandom
import net.corda.core.internal.* import net.corda.core.internal.*
import net.corda.core.utilities.days import net.corda.core.utilities.days
import net.corda.core.utilities.millis import net.corda.core.utilities.millis
import net.corda.core.utilities.toHex
import net.corda.nodeapi.internal.protonwrapper.netty.distributionPointsToString
import org.bouncycastle.asn1.* import org.bouncycastle.asn1.*
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x500.style.BCStyle import org.bouncycastle.asn1.x500.style.BCStyle
@ -368,7 +372,6 @@ object X509Utilities {
} }
} }
@Suppress("MagicNumber")
private fun generateCertificateSerialNumber(): BigInteger { private fun generateCertificateSerialNumber(): BigInteger {
val bytes = ByteArray(CERTIFICATE_SERIAL_NUMBER_LENGTH) val bytes = ByteArray(CERTIFICATE_SERIAL_NUMBER_LENGTH)
newSecureRandom().nextBytes(bytes) newSecureRandom().nextBytes(bytes)
@ -408,6 +411,29 @@ fun PKCS10CertificationRequest.isSignatureValid(): Boolean {
return this.isSignatureValid(JcaContentVerifierProviderBuilder().build(this.subjectPublicKeyInfo)) return this.isSignatureValid(JcaContentVerifierProviderBuilder().build(this.subjectPublicKeyInfo))
} }
fun X509Certificate.toSimpleString(): String {
val bcCert = toBc()
val keyIdentifier = try {
SubjectKeyIdentifier.getInstance(bcCert.getExtension(Extension.subjectKeyIdentifier).parsedValue).keyIdentifier.toHex()
} catch (e: Exception) {
"null"
}
val authorityKeyIdentifier = try {
AuthorityKeyIdentifier.getInstance(bcCert.getExtension(Extension.authorityKeyIdentifier).parsedValue).keyIdentifier.toHex()
} catch (e: Exception) {
"null"
}
val subject = bcCert.subject
val issuer = bcCert.issuer
val role = CertRole.extract(this)
return "$subject[$keyIdentifier] issued by $issuer[$authorityKeyIdentifier] $role $serialNumber [${distributionPointsToString()}]"
}
fun X509CRL.toSimpleString(): String {
val revokedSerialNumbers = revokedCertificates?.map { it.serialNumber }
return "$issuerX500Principal ${thisUpdate.toInstant()} ${nextUpdate.toInstant()} ${revokedSerialNumbers ?: "[]"}"
}
/** /**
* Check certificate validity or print warning if expiry is within 30 days * Check certificate validity or print warning if expiry is within 30 days
*/ */

View File

@ -115,11 +115,10 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
val transport = connection.transport as ProtonJTransport val transport = connection.transport as ProtonJTransport
transport.protocolTracer = object : ProtocolTracer { transport.protocolTracer = object : ProtocolTracer {
override fun sentFrame(transportFrame: TransportFrame) { override fun sentFrame(transportFrame: TransportFrame) {
logInfoWithMDC { "${transportFrame.body}" } logInfoWithMDC { "sentFrame: ${transportFrame.body}" }
} }
override fun receivedFrame(transportFrame: TransportFrame) { override fun receivedFrame(transportFrame: TransportFrame) {
logInfoWithMDC { "${transportFrame.body}" } logInfoWithMDC { "receivedFrame: ${transportFrame.body}" }
} }
} }
} }

View File

@ -1,7 +1,11 @@
package net.corda.nodeapi.internal.protonwrapper.netty package net.corda.nodeapi.internal.protonwrapper.netty
import io.netty.bootstrap.Bootstrap import io.netty.bootstrap.Bootstrap
import io.netty.channel.* import io.netty.channel.Channel
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelInitializer
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel import io.netty.channel.socket.nio.NioSocketChannel
@ -11,6 +15,7 @@ import io.netty.handler.proxy.HttpProxyHandler
import io.netty.handler.proxy.Socks4ProxyHandler import io.netty.handler.proxy.Socks4ProxyHandler
import io.netty.handler.proxy.Socks5ProxyHandler import io.netty.handler.proxy.Socks5ProxyHandler
import io.netty.resolver.NoopAddressResolverGroup import io.netty.resolver.NoopAddressResolverGroup
import io.netty.util.concurrent.DefaultThreadFactory
import io.netty.util.internal.logging.InternalLoggerFactory import io.netty.util.internal.logging.InternalLoggerFactory
import io.netty.util.internal.logging.Slf4JLoggerFactory import io.netty.util.internal.logging.Slf4JLoggerFactory
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
@ -57,7 +62,8 @@ data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostA
class AMQPClient(private val targets: List<NetworkHostAndPort>, class AMQPClient(private val targets: List<NetworkHostAndPort>,
val allowedRemoteLegalNames: Set<CordaX500Name>, val allowedRemoteLegalNames: Set<CordaX500Name>,
private val configuration: AMQPConfiguration, private val configuration: AMQPConfiguration,
private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable { private val sharedThreadPool: EventLoopGroup? = null,
private val threadPoolName: String = "AMQPClient") : AutoCloseable {
companion object { companion object {
init { init {
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE) InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
@ -254,7 +260,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
return return
} }
log.info("Connect to: $currentTarget") log.info("Connect to: $currentTarget")
workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS) workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS, DefaultThreadFactory(threadPoolName, Thread.MAX_PRIORITY))
started = true started = true
restart() restart()
} }

View File

@ -11,6 +11,7 @@ import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.logging.LogLevel import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler import io.netty.handler.logging.LoggingHandler
import io.netty.util.concurrent.DefaultThreadFactory
import io.netty.util.internal.logging.InternalLoggerFactory import io.netty.util.internal.logging.InternalLoggerFactory
import io.netty.util.internal.logging.Slf4JLoggerFactory import io.netty.util.internal.logging.Slf4JLoggerFactory
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
@ -37,8 +38,8 @@ import kotlin.concurrent.withLock
*/ */
class AMQPServer(val hostName: String, class AMQPServer(val hostName: String,
val port: Int, val port: Int,
private val configuration: AMQPConfiguration) : AutoCloseable { private val configuration: AMQPConfiguration,
private val threadPoolName: String = "AMQPServer") : AutoCloseable {
companion object { companion object {
init { init {
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE) InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
@ -131,8 +132,8 @@ class AMQPServer(val hostName: String,
lock.withLock { lock.withLock {
stop() stop()
bossGroup = NioEventLoopGroup(1) bossGroup = NioEventLoopGroup(1, DefaultThreadFactory("$threadPoolName-boss", Thread.MAX_PRIORITY))
workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS) workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS, DefaultThreadFactory("$threadPoolName-worker", Thread.MAX_PRIORITY))
val server = ServerBootstrap() val server = ServerBootstrap()
// TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux // TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux

View File

@ -14,33 +14,38 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.core.utilities.toHex
import net.corda.nodeapi.internal.ArtemisTcpTransport import net.corda.nodeapi.internal.ArtemisTcpTransport
import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.crypto.toBc import net.corda.nodeapi.internal.crypto.toSimpleString
import net.corda.nodeapi.internal.crypto.x509 import net.corda.nodeapi.internal.crypto.x509
import org.bouncycastle.asn1.ASN1InputStream import org.bouncycastle.asn1.ASN1InputStream
import org.bouncycastle.asn1.ASN1Primitive import org.bouncycastle.asn1.ASN1Primitive
import org.bouncycastle.asn1.DERIA5String import org.bouncycastle.asn1.DERIA5String
import org.bouncycastle.asn1.DEROctetString import org.bouncycastle.asn1.DEROctetString
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier
import org.bouncycastle.asn1.x509.CRLDistPoint import org.bouncycastle.asn1.x509.CRLDistPoint
import org.bouncycastle.asn1.x509.DistributionPointName import org.bouncycastle.asn1.x509.DistributionPointName
import org.bouncycastle.asn1.x509.Extension import org.bouncycastle.asn1.x509.Extension
import org.bouncycastle.asn1.x509.GeneralName import org.bouncycastle.asn1.x509.GeneralName
import org.bouncycastle.asn1.x509.GeneralNames import org.bouncycastle.asn1.x509.GeneralNames
import org.bouncycastle.asn1.x509.SubjectKeyIdentifier
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.net.Socket import java.net.Socket
import java.net.URI import java.net.URI
import java.security.KeyStore import java.security.KeyStore
import java.security.cert.* import java.security.cert.CertificateException
import java.util.* import java.security.cert.PKIXBuilderParameters
import java.security.cert.PKIXRevocationChecker
import java.security.cert.X509CertSelector
import java.security.cert.X509Certificate
import java.util.concurrent.Executor import java.util.concurrent.Executor
import javax.net.ssl.* import javax.net.ssl.CertPathTrustManagerParameters
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.SNIHostName
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLEngine
import javax.net.ssl.TrustManagerFactory
import javax.net.ssl.X509ExtendedTrustManager
import javax.security.auth.x500.X500Principal import javax.security.auth.x500.X500Principal
import kotlin.collections.HashMap
import kotlin.system.measureTimeMillis import kotlin.system.measureTimeMillis
private const val HOSTNAME_FORMAT = "%s.corda.net" private const val HOSTNAME_FORMAT = "%s.corda.net"
@ -109,23 +114,7 @@ fun certPathToString(certPath: Array<out X509Certificate>?): String {
if (certPath == null) { if (certPath == null) {
return "<empty certpath>" return "<empty certpath>"
} }
val certs = certPath.map { return certPath.joinToString(System.lineSeparator()) { " ${it.toSimpleString()}" }
val bcCert = it.toBc()
val subject = bcCert.subject.toString()
val issuer = bcCert.issuer.toString()
val keyIdentifier = try {
SubjectKeyIdentifier.getInstance(bcCert.getExtension(Extension.subjectKeyIdentifier).parsedValue).keyIdentifier.toHex()
} catch (ex: Exception) {
"null"
}
val authorityKeyIdentifier = try {
AuthorityKeyIdentifier.getInstance(bcCert.getExtension(Extension.authorityKeyIdentifier).parsedValue).keyIdentifier.toHex()
} catch (ex: Exception) {
"null"
}
" $subject[$keyIdentifier] issued by $issuer[$authorityKeyIdentifier] [${it.distributionPointsToString()}]"
}
return certs.joinToString("\r\n")
} }
@VisibleForTesting @VisibleForTesting

View File

@ -3,7 +3,10 @@ package net.corda.nodeapi.internal.revocation
import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache import com.github.benmanes.caffeine.cache.LoadingCache
import net.corda.core.internal.readFully import net.corda.core.internal.readFully
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.nodeapi.internal.crypto.X509CertificateFactory import net.corda.nodeapi.internal.crypto.X509CertificateFactory
import net.corda.nodeapi.internal.crypto.toSimpleString
import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource
import net.corda.nodeapi.internal.protonwrapper.netty.distributionPoints import net.corda.nodeapi.internal.protonwrapper.netty.distributionPoints
import java.net.URI import java.net.URI
@ -15,8 +18,11 @@ import javax.security.auth.x500.X500Principal
/** /**
* [CrlSource] which downloads CRLs from the distribution points in the X509 certificate. * [CrlSource] which downloads CRLs from the distribution points in the X509 certificate.
*/ */
@Suppress("TooGenericExceptionCaught")
class CertDistPointCrlSource : CrlSource { class CertDistPointCrlSource : CrlSource {
companion object { companion object {
private val logger = contextLogger()
// The default SSL handshake timeout is 60s (DEFAULT_SSL_HANDSHAKE_TIMEOUT). Considering there are 3 CRLs endpoints to check in a // The default SSL handshake timeout is 60s (DEFAULT_SSL_HANDSHAKE_TIMEOUT). Considering there are 3 CRLs endpoints to check in a
// node handshake, we want to keep the total timeout within that. // node handshake, we want to keep the total timeout within that.
private const val DEFAULT_CONNECT_TIMEOUT = 9_000 private const val DEFAULT_CONNECT_TIMEOUT = 9_000
@ -33,7 +39,8 @@ class CertDistPointCrlSource : CrlSource {
private val readTimeout = Integer.getInteger("net.corda.dpcrl.read.timeout", DEFAULT_READ_TIMEOUT) private val readTimeout = Integer.getInteger("net.corda.dpcrl.read.timeout", DEFAULT_READ_TIMEOUT)
private fun retrieveCRL(uri: URI): X509CRL { private fun retrieveCRL(uri: URI): X509CRL {
val bytes = run { val start = System.currentTimeMillis()
val bytes = try {
val conn = uri.toURL().openConnection() val conn = uri.toURL().openConnection()
conn.connectTimeout = connectTimeout conn.connectTimeout = connectTimeout
conn.readTimeout = readTimeout conn.readTimeout = readTimeout
@ -41,12 +48,26 @@ class CertDistPointCrlSource : CrlSource {
// in an InputStream, but the JDK implementation (sun.security.provider.X509Factory.engineGenerateCRL) converts any IOException // in an InputStream, but the JDK implementation (sun.security.provider.X509Factory.engineGenerateCRL) converts any IOException
// into CRLException and drops the cause chain. // into CRLException and drops the cause chain.
conn.getInputStream().readFully() conn.getInputStream().readFully()
} catch (e: Exception) {
if (logger.isDebugEnabled) {
logger.debug("Unable to download CRL from $uri (${System.currentTimeMillis() - start}ms)", e)
}
throw e
} }
return X509CertificateFactory().generateCRL(bytes.inputStream()) val duration = System.currentTimeMillis() - start
val crl = try {
X509CertificateFactory().generateCRL(bytes.inputStream())
} catch (e: Exception) {
if (logger.isDebugEnabled) {
logger.debug("Invalid CRL from $uri (${duration}ms)", e)
}
throw e
}
logger.debug { "CRL from $uri (${duration}ms): ${crl.toSimpleString()}" }
return crl
} }
} }
@Suppress("TooGenericExceptionCaught")
override fun fetch(certificate: X509Certificate): Set<X509CRL> { override fun fetch(certificate: X509Certificate): Set<X509CRL> {
val approvedCRLs = HashSet<X509CRL>() val approvedCRLs = HashSet<X509CRL>()
var exception: Exception? = null var exception: Exception? = null

View File

@ -1,6 +1,8 @@
package net.corda.nodeapi.internal.revocation package net.corda.nodeapi.internal.revocation
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.nodeapi.internal.crypto.toSimpleString
import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource
import org.bouncycastle.asn1.x509.Extension import org.bouncycastle.asn1.x509.Extension
import java.security.cert.CRLReason import java.security.cert.CRLReason
@ -27,8 +29,8 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
private val softFailExceptions = ArrayList<CertPathValidatorException>() private val softFailExceptions = ArrayList<CertPathValidatorException>()
override fun check(cert: Certificate, unresolvedCritExts: MutableCollection<String>?) { override fun check(cert: Certificate, unresolvedCritExts: MutableCollection<String>?) {
val x509Certificate = cert as X509Certificate cert as X509Certificate
checkApprovedCRLs(x509Certificate, getCRLs(x509Certificate)) checkApprovedCRLs(cert, getCRLs(cert))
} }
@Suppress("TooGenericExceptionCaught") @Suppress("TooGenericExceptionCaught")
@ -40,30 +42,27 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
addSoftFailException(e) addSoftFailException(e)
return emptySet() return emptySet()
} else { } else {
throw undeterminedRevocationException("Unable to retrieve CRLs", e) throw undeterminedRevocationException("Unable to retrieve CRLs for cert ${cert.serialNumber}", e)
} }
} }
if (crls.isNotEmpty() || softFail) { if (crls.isNotEmpty() || softFail) {
return crls return crls
} }
// Note, the JDK tries to find a valid CRL from a different signing key before giving up (RevocationChecker.verifyWithSeparateSigningKey) // Note, the JDK tries to find a valid CRL from a different signing key before giving up (RevocationChecker.verifyWithSeparateSigningKey)
throw undeterminedRevocationException("Could not find any valid CRLs", null) throw undeterminedRevocationException("Could not find any valid CRLs for cert ${cert.serialNumber}", null)
} }
/** /**
* Borrowed from `RevocationChecker.checkApprovedCRLs()` * Borrowed from `RevocationChecker.checkApprovedCRLs()`
*/ */
@Suppress("NestedBlockDepth") @Suppress("NestedBlockDepth")
@Throws(CertPathValidatorException::class)
private fun checkApprovedCRLs(cert: X509Certificate, approvedCRLs: Set<X509CRL>) { private fun checkApprovedCRLs(cert: X509Certificate, approvedCRLs: Set<X509CRL>) {
// See if the cert is in the set of approved crls. // See if the cert is in the set of approved crls.
logger.debug("ExternalSourceRevocationChecker.checkApprovedCRLs() cert SN: ${cert.serialNumber}") logger.debug { "Check cert ${cert.serialNumber} against CRLs ${approvedCRLs.map { it.toSimpleString() }}" }
for (crl in approvedCRLs) { for (crl in approvedCRLs) {
val entry = crl.getRevokedCertificate(cert) val entry = crl.getRevokedCertificate(cert)
if (entry != null) { if (entry != null) {
logger.debug("ExternalSourceRevocationChecker.checkApprovedCRLs() CRL entry: $entry")
/* /*
* Abort CRL validation and throw exception if there are any * Abort CRL validation and throw exception if there are any
* unrecognized critical CRL entry extensions (see section * unrecognized critical CRL entry extensions (see section
@ -75,19 +74,15 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
unresCritExts.remove(Extension.cRLDistributionPoints.id) unresCritExts.remove(Extension.cRLDistributionPoints.id)
unresCritExts.remove(Extension.certificateIssuer.id) unresCritExts.remove(Extension.certificateIssuer.id)
if (unresCritExts.isNotEmpty()) { if (unresCritExts.isNotEmpty()) {
throw CertPathValidatorException( throw CertPathValidatorException("Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts")
"Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts")
} }
} }
val reasonCode = entry.revocationReason ?: CRLReason.UNSPECIFIED val reasonCode = entry.revocationReason ?: CRLReason.UNSPECIFIED
val revocationDate = entry.revocationDate val revocationDate = entry.revocationDate
if (revocationDate.before(dateSource())) { if (revocationDate.before(dateSource())) {
val t = CertificateRevokedException( val t = CertificateRevokedException(revocationDate, reasonCode, crl.issuerX500Principal, emptyMap())
revocationDate, reasonCode, throw CertPathValidatorException(t.message, t, null, -1, BasicReason.REVOKED)
crl.issuerX500Principal, mutableMapOf())
throw CertPathValidatorException(
t.message, t, null, -1, BasicReason.REVOKED)
} }
} }
} }
@ -105,15 +100,18 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
return false return false
} }
override fun getSupportedExtensions(): MutableSet<String>? { override fun getSupportedExtensions(): Set<String>? {
return null return null
} }
override fun init(forward: Boolean) { override fun init(forward: Boolean) {
if (forward) {
throw CertPathValidatorException("Forward checking not allowed")
}
softFailExceptions.clear() softFailExceptions.clear()
} }
override fun getSoftFailExceptions(): MutableList<CertPathValidatorException> { override fun getSoftFailExceptions(): List<CertPathValidatorException> {
return Collections.unmodifiableList(softFailExceptions) return Collections.unmodifiableList(softFailExceptions)
} }
@ -125,4 +123,4 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
private fun undeterminedRevocationException(message: String?, cause: Throwable?): CertPathValidatorException { private fun undeterminedRevocationException(message: String?, cause: Throwable?): CertPathValidatorException {
return CertPathValidatorException(message, cause, null, -1, BasicReason.UNDETERMINED_REVOCATION_STATUS) return CertPathValidatorException(message, cause, null, -1, BasicReason.UNDETERMINED_REVOCATION_STATUS)
} }
} }

View File

@ -204,7 +204,7 @@ class AMQPBridgeTest {
doReturn(null).whenever(it).jmxMonitoringHttpPort doReturn(null).whenever(it).jmxMonitoringHttpPort
} }
artemisConfig.configureWithDevSSLCertificate() artemisConfig.configureWithDevSSLCertificate()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE, null) val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE) val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE)
artemisServer.start() artemisServer.start()
artemisClient.start() artemisClient.start()

View File

@ -25,7 +25,6 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.driver.internal.incrementalPortAllocation import net.corda.testing.driver.internal.incrementalPortAllocation
@ -49,6 +48,7 @@ import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals import kotlin.test.assertEquals
@Suppress("LongParameterList")
class CertificateRevocationListNodeTests { class CertificateRevocationListNodeTests {
@Rule @Rule
@JvmField @JvmField
@ -326,17 +326,18 @@ class CertificateRevocationListNodeTests {
private fun createAMQPClient(targetPort: Int, private fun createAMQPClient(targetPort: Int,
crlCheckSoftFail: Boolean, crlCheckSoftFail: Boolean,
legalName: CordaX500Name,
nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL", tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL",
maxMessageSize: Int = MAX_MESSAGE_SIZE): X509Certificate { maxMessageSize: Int = MAX_MESSAGE_SIZE): X509Certificate {
val baseDirectory = temporaryFolder.root.toPath() / "client" val baseDirectory = temporaryFolder.root.toPath() / legalName.organisation
val certificatesDirectory = baseDirectory / "certificates" val certificatesDirectory = baseDirectory / "certificates"
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory) val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val clientConfig = rigorousMock<AbstractNodeConfiguration>().also { val clientConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(BOB_NAME).whenever(it).myLegalName doReturn(legalName).whenever(it).myLegalName
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
doReturn(signingCertificateStore).whenever(it).signingCertificateStore doReturn(signingCertificateStore).whenever(it).signingCertificateStore
doReturn(crlCheckSoftFail).whenever(it).crlCheckSoftFail doReturn(crlCheckSoftFail).whenever(it).crlCheckSoftFail
@ -350,28 +351,32 @@ class CertificateRevocationListNodeTests {
override val trustStore = clientConfig.p2pSslOptions.trustStore.get() override val trustStore = clientConfig.p2pSslOptions.trustStore.get()
override val maxMessageSize: Int = maxMessageSize override val maxMessageSize: Int = maxMessageSize
} }
amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", targetPort)), setOf(ALICE_NAME, CHARLIE_NAME), amqpConfig) amqpClient = AMQPClient(
listOf(NetworkHostAndPort("localhost", targetPort)),
setOf(CHARLIE_NAME),
amqpConfig,
threadPoolName = legalName.organisation
)
return nodeCert return nodeCert
} }
@Suppress("LongParameterList")
private fun createAMQPServer(port: Int, private fun createAMQPServer(port: Int,
name: CordaX500Name = ALICE_NAME, legalName: CordaX500Name,
crlCheckSoftFail: Boolean, crlCheckSoftFail: Boolean,
nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL", tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL",
maxMessageSize: Int = MAX_MESSAGE_SIZE, maxMessageSize: Int = MAX_MESSAGE_SIZE,
sslHandshakeTimeout: Duration? = null): X509Certificate { sslHandshakeTimeout: Duration? = null): X509Certificate {
check(!::amqpServer.isInitialized) check(!::amqpServer.isInitialized)
val baseDirectory = temporaryFolder.root.toPath() / "server" val baseDirectory = temporaryFolder.root.toPath() / legalName.organisation
val certificatesDirectory = baseDirectory / "certificates" val certificatesDirectory = baseDirectory / "certificates"
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory) val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val serverConfig = rigorousMock<AbstractNodeConfiguration>().also { val serverConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(name).whenever(it).myLegalName doReturn(legalName).whenever(it).myLegalName
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
doReturn(signingCertificateStore).whenever(it).signingCertificateStore doReturn(signingCertificateStore).whenever(it).signingCertificateStore
} }
@ -385,7 +390,7 @@ class CertificateRevocationListNodeTests {
override val maxMessageSize: Int = maxMessageSize override val maxMessageSize: Int = maxMessageSize
override val sslHandshakeTimeout: Duration = sslHandshakeTimeout ?: super.sslHandshakeTimeout override val sslHandshakeTimeout: Duration = sslHandshakeTimeout ?: super.sslHandshakeTimeout
} }
amqpServer = AMQPServer("0.0.0.0", port, amqpConfig) amqpServer = AMQPServer("0.0.0.0", port, amqpConfig, threadPoolName = legalName.organisation)
return nodeCert return nodeCert
} }
@ -421,7 +426,6 @@ class CertificateRevocationListNodeTests {
return newNodeCert return newNodeCert
} }
@Suppress("LongParameterList")
private fun verifyAMQPConnection(crlCheckSoftFail: Boolean, private fun verifyAMQPConnection(crlCheckSoftFail: Boolean,
nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
revokeServerCert: Boolean = false, revokeServerCert: Boolean = false,
@ -430,6 +434,7 @@ class CertificateRevocationListNodeTests {
expectedConnectStatus: Boolean) { expectedConnectStatus: Boolean) {
val serverCert = createAMQPServer( val serverCert = createAMQPServer(
serverPort, serverPort,
CHARLIE_NAME,
crlCheckSoftFail = crlCheckSoftFail, crlCheckSoftFail = crlCheckSoftFail,
nodeCrlDistPoint = nodeCrlDistPoint, nodeCrlDistPoint = nodeCrlDistPoint,
sslHandshakeTimeout = sslHandshakeTimeout sslHandshakeTimeout = sslHandshakeTimeout
@ -444,6 +449,7 @@ class CertificateRevocationListNodeTests {
val clientCert = createAMQPClient( val clientCert = createAMQPClient(
serverPort, serverPort,
crlCheckSoftFail = crlCheckSoftFail, crlCheckSoftFail = crlCheckSoftFail,
legalName = ALICE_NAME,
nodeCrlDistPoint = nodeCrlDistPoint nodeCrlDistPoint = nodeCrlDistPoint
) )
if (revokeClientCert) { if (revokeClientCert) {
@ -455,7 +461,8 @@ class CertificateRevocationListNodeTests {
assertThat(serverConnect.connected).isEqualTo(expectedConnectStatus) assertThat(serverConnect.connected).isEqualTo(expectedConnectStatus)
} }
private fun createArtemisServerAndClient(crlCheckSoftFail: Boolean, private fun createArtemisServerAndClient(legalName: CordaX500Name,
crlCheckSoftFail: Boolean,
crlCheckArtemisServer: Boolean, crlCheckArtemisServer: Boolean,
nodeCrlDistPoint: String, nodeCrlDistPoint: String,
sslHandshakeTimeout: Duration?): Pair<ArtemisMessagingServer, ArtemisMessagingClient> { sslHandshakeTimeout: Duration?): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
@ -466,7 +473,7 @@ class CertificateRevocationListNodeTests {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also { val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(CHARLIE_NAME).whenever(it).myLegalName doReturn(legalName).whenever(it).myLegalName
doReturn(signingCertificateStore).whenever(it).signingCertificateStore doReturn(signingCertificateStore).whenever(it).signingCertificateStore
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress
@ -477,14 +484,25 @@ class CertificateRevocationListNodeTests {
artemisConfig.configureWithDevSSLCertificate() artemisConfig.configureWithDevSSLCertificate()
recreateNodeCaAndTlsCertificates(signingCertificateStore, p2pSslConfiguration, nodeCrlDistPoint, null) recreateNodeCaAndTlsCertificates(signingCertificateStore, p2pSslConfiguration, nodeCrlDistPoint, null)
val server = ArtemisMessagingServer(artemisConfig, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE, null) val server = ArtemisMessagingServer(
val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE) artemisConfig,
artemisConfig.p2pAddress,
MAX_MESSAGE_SIZE,
threadPoolName = "${legalName.organisation}-server",
trace = true
)
val client = ArtemisMessagingClient(
artemisConfig.p2pSslOptions,
artemisConfig.p2pAddress,
MAX_MESSAGE_SIZE,
threadPoolName = "${legalName.organisation}-client",
trace = true
)
server.start() server.start()
client.start() client.start()
return server to client return server to client
} }
@Suppress("LongParameterList")
private fun verifyArtemisConnection(crlCheckSoftFail: Boolean, private fun verifyArtemisConnection(crlCheckSoftFail: Boolean,
crlCheckArtemisServer: Boolean, crlCheckArtemisServer: Boolean,
expectedConnected: Boolean = true, expectedConnected: Boolean = true,
@ -493,11 +511,17 @@ class CertificateRevocationListNodeTests {
nodeCrlDistPoint: String = "http://${crlServer.hostAndPort}/crl/$NODE_CRL", nodeCrlDistPoint: String = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
sslHandshakeTimeout: Duration? = null) { sslHandshakeTimeout: Duration? = null) {
val queueName = P2P_PREFIX + "Test" val queueName = P2P_PREFIX + "Test"
val (artemisServer, artemisClient) = createArtemisServerAndClient(crlCheckSoftFail, crlCheckArtemisServer, nodeCrlDistPoint, sslHandshakeTimeout) val (artemisServer, artemisClient) = createArtemisServerAndClient(
CHARLIE_NAME,
crlCheckSoftFail,
crlCheckArtemisServer,
nodeCrlDistPoint,
sslHandshakeTimeout
)
artemisServer.use { artemisServer.use {
artemisClient.started!!.session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) artemisClient.started!!.session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
val nodeCert = createAMQPClient(serverPort, true, nodeCrlDistPoint) val nodeCert = createAMQPClient(serverPort, true, ALICE_NAME, nodeCrlDistPoint)
if (revokedNodeCert) { if (revokedNodeCert) {
crlServer.revokedNodeCerts.add(nodeCert.serialNumber) crlServer.revokedNodeCerts.add(nodeCert.serialNumber)
} }

View File

@ -437,7 +437,7 @@ class ProtonWrapperTests {
} }
artemisConfig.configureWithDevSSLCertificate() artemisConfig.configureWithDevSSLCertificate()
val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize, null) val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize)
val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", artemisPort), maxMessageSize) val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", artemisPort), maxMessageSize)
server.start() server.start()
client.start() client.start()

View File

@ -240,7 +240,7 @@ class ArtemisMessagingTest {
} }
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer { private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null, true).apply { return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, trace = true).apply {
config.configureWithDevSSLCertificate() config.configureWithDevSSLCertificate()
messagingServer = this messagingServer = this
} }

View File

@ -22,7 +22,7 @@ class SimpleMQClient(val target: NetworkHostAndPort,
lateinit var producer: ClientProducer lateinit var producer: ClientProducer
fun start(username: String? = null, password: String? = null, enableSSL: Boolean = true) { fun start(username: String? = null, password: String? = null, enableSSL: Boolean = true) {
val tcpTransport = p2pConnectorTcpTransport(target, config, enableSSL = enableSSL) val tcpTransport = p2pConnectorTcpTransport(target, config, enableSSL = enableSSL, threadPoolName = "SimpleMQClient")
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
isBlockOnNonDurableSend = true isBlockOnNonDurableSend = true
threadPoolMaxSize = 1 threadPoolMaxSize = 1

View File

@ -55,7 +55,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
class ArtemisMessagingServer(private val config: NodeConfiguration, class ArtemisMessagingServer(private val config: NodeConfiguration,
private val messagingServerAddress: NetworkHostAndPort, private val messagingServerAddress: NetworkHostAndPort,
private val maxMessageSize: Int, private val maxMessageSize: Int,
private val journalBufferTimeout : Int?, private val journalBufferTimeout : Int? = null,
private val threadPoolName: String = "ArtemisServer",
private val trace: Boolean = false) : ArtemisBroker, SingletonSerializeAsToken() { private val trace: Boolean = false) : ArtemisBroker, SingletonSerializeAsToken() {
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
@ -132,9 +133,10 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
// The transaction cache is configurable, and drives other cache sizes. // The transaction cache is configurable, and drives other cache sizes.
globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize) globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize)
acceptorConfigurations.add(p2pAcceptorTcpTransport( addAcceptorConfiguration(p2pAcceptorTcpTransport(
NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port),
config.p2pSslOptions, config.p2pSslOptions,
threadPoolName = threadPoolName,
trace = trace trace = trace
)) ))
// Enable built in message deduplication. Note we still have to do our own as the delayed commits // Enable built in message deduplication. Note we still have to do our own as the delayed commits
@ -181,7 +183,6 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue) deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue)
} }
@Throws(IOException::class, KeyStoreException::class)
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager { private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
val keyStore = config.p2pSslOptions.keyStore.get().value.internal val keyStore = config.p2pSslOptions.keyStore.get().value.internal
val trustStore = config.p2pSslOptions.trustStore.get().value.internal val trustStore = config.p2pSslOptions.trustStore.get().value.internal

View File

@ -1,11 +1,14 @@
package net.corda.node.services.messaging package net.corda.node.services.messaging
import io.netty.buffer.ByteBufAllocator import io.netty.buffer.ByteBufAllocator
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.group.ChannelGroup import io.netty.channel.group.ChannelGroup
import io.netty.handler.logging.LogLevel import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler import io.netty.handler.logging.LoggingHandler
import io.netty.handler.ssl.SslHandler import io.netty.handler.ssl.SslHandler
import io.netty.handler.ssl.SslHandshakeTimeoutException
import net.corda.core.internal.declaredField import net.corda.core.internal.declaredField
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisTcpTransport import net.corda.nodeapi.internal.ArtemisTcpTransport
import org.apache.activemq.artemis.api.core.BaseInterceptor import org.apache.activemq.artemis.api.core.BaseInterceptor
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor
@ -15,10 +18,14 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler import org.apache.activemq.artemis.spi.core.remoting.BufferHandler
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener
import org.apache.activemq.artemis.utils.ConfigurationHelper
import org.apache.activemq.artemis.utils.actors.OrderedExecutor import org.apache.activemq.artemis.utils.actors.OrderedExecutor
import java.nio.channels.ClosedChannelException
import java.time.Duration import java.time.Duration
import java.util.concurrent.Executor import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledExecutorService
import java.util.regex.Pattern
import javax.net.ssl.SSLEngine
@Suppress("unused") // Used via reflection in ArtemisTcpTransport @Suppress("unused") // Used via reflection in ArtemisTcpTransport
class NodeNettyAcceptorFactory : AcceptorFactory { class NodeNettyAcceptorFactory : AcceptorFactory {
@ -34,6 +41,7 @@ class NodeNettyAcceptorFactory : AcceptorFactory {
return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap) return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
} }
private class NodeNettyAcceptor(name: String?, private class NodeNettyAcceptor(name: String?,
clusterConnection: ClusterConnection?, clusterConnection: ClusterConnection?,
configuration: Map<String, Any>, configuration: Map<String, Any>,
@ -44,24 +52,76 @@ class NodeNettyAcceptorFactory : AcceptorFactory {
protocolMap: Map<String, ProtocolManager<BaseInterceptor<*>>>?) : protocolMap: Map<String, ProtocolManager<BaseInterceptor<*>>>?) :
NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap) NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
{ {
companion object {
private val defaultThreadPoolNamePattern = Pattern.compile("""Thread-(\d+) \(activemq-netty-threads\)""")
}
private val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "NodeNettyAcceptor", configuration)
private val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
@Synchronized
override fun start() { override fun start() {
super.start() super.start()
if (configuration[ArtemisTcpTransport.TRACE_NAME] == true) { if (trace) {
// Artemis does not seem to allow access to the underlying channel so we resort to reflection and get it via the // Unfortunately we have to resort to reflection to be able to get access to the server channel(s)
// serverChannelGroup field. This field is only available after start(), hence why we add the logger here.
declaredField<ChannelGroup>("serverChannelGroup").value.forEach { channel -> declaredField<ChannelGroup>("serverChannelGroup").value.forEach { channel ->
channel.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO)) channel.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO))
} }
} }
} }
@Synchronized
override fun getSslHandler(alloc: ByteBufAllocator?): SslHandler { override fun getSslHandler(alloc: ByteBufAllocator?): SslHandler {
val sslHandler = super.getSslHandler(alloc) applyThreadPoolName()
val engine = super.getSslHandler(alloc).engine()
val sslHandler = NodeAcceptorSslHandler(engine, trace)
val handshakeTimeout = configuration[ArtemisTcpTransport.SSL_HANDSHAKE_TIMEOUT_NAME] as Duration? val handshakeTimeout = configuration[ArtemisTcpTransport.SSL_HANDSHAKE_TIMEOUT_NAME] as Duration?
if (handshakeTimeout != null) { if (handshakeTimeout != null) {
sslHandler.handshakeTimeoutMillis = handshakeTimeout.toMillis() sslHandler.handshakeTimeoutMillis = handshakeTimeout.toMillis()
} }
return sslHandler return sslHandler
} }
/**
* [NettyAcceptor.start] has hardcoded the thread pool name and does not provide a way to configure it. This is a workaround.
*/
private fun applyThreadPoolName() {
val matcher = defaultThreadPoolNamePattern.matcher(Thread.currentThread().name)
if (matcher.matches()) {
Thread.currentThread().name = "$threadPoolName-${matcher.group(1)}" // Preserve the pool thread number
}
}
}
private class NodeAcceptorSslHandler(engine: SSLEngine, private val trace: Boolean) : SslHandler(engine) {
companion object {
private val logger = contextLogger()
}
override fun handlerAdded(ctx: ChannelHandlerContext) {
logHandshake()
super.handlerAdded(ctx)
// Unfortunately NettyAcceptor does not let us add extra child handlers, so we have to add our logger this way.
if (trace) {
ctx.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO))
}
}
private fun logHandshake() {
val start = System.currentTimeMillis()
handshakeFuture().addListener {
val duration = System.currentTimeMillis() - start
when {
it.isSuccess -> logger.info("SSL handshake completed in ${duration}ms with ${engine().session.peerPrincipal}")
it.isCancelled -> logger.warn("SSL handshake cancelled after ${duration}ms")
else -> when (it.cause()) {
is ClosedChannelException -> logger.warn("SSL handshake closed early after ${duration}ms")
is SslHandshakeTimeoutException -> logger.warn("SSL handshake timed out after ${duration}ms")
else -> logger.warn("SSL handshake failed after ${duration}ms", it.cause())
}
}
}
}
} }
} }

View File

@ -0,0 +1,63 @@
package net.corda.node.services.messaging
import io.netty.channel.ChannelPipeline
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import net.corda.nodeapi.internal.ArtemisTcpTransport
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager
import org.apache.activemq.artemis.spi.core.remoting.Connector
import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory
import org.apache.activemq.artemis.utils.ConfigurationHelper
import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService
@Suppress("unused")
class NodeNettyConnectorFactory : ConnectorFactory {
override fun createConnector(configuration: MutableMap<String, Any>?,
handler: BufferHandler?,
listener: ClientConnectionLifeCycleListener?,
closeExecutor: Executor?,
threadPool: Executor?,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?): Connector {
val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Connector", configuration)
val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
return NettyConnector(
configuration,
handler,
listener,
closeExecutor,
threadPool,
scheduledThreadPool,
MyClientProtocolManager(threadPoolName, trace)
)
}
override fun isReliable(): Boolean = false
override fun getDefaults(): Map<String?, Any?> = NettyConnector.DEFAULT_CONFIG
private class MyClientProtocolManager(private val threadPoolName: String, private val trace: Boolean) : ActiveMQClientProtocolManager() {
override fun addChannelHandlers(pipeline: ChannelPipeline) {
applyThreadPoolName()
super.addChannelHandlers(pipeline)
if (trace) {
pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
}
}
/**
* [NettyConnector.start] does not provide a way to configure the thread pool name, so we modify the thread name accordingly.
*/
private fun applyThreadPoolName() {
with(Thread.currentThread()) {
name = name.replace("nioEventLoopGroup", threadPoolName) // pool and thread numbers are preserved
}
}
}
}

View File

@ -93,6 +93,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
cacheFactory: NamedCacheFactory, cacheFactory: NamedCacheFactory,
private val isDrainingModeOn: () -> Boolean, private val isDrainingModeOn: () -> Boolean,
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>, private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>,
private val threadPoolName: String = "P2PClient",
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log) private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, ServiceStateSupport by stateHelper { ) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, ServiceStateSupport by stateHelper {
companion object { companion object {
@ -164,10 +165,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
started = true started = true
log.info("Connecting to message broker: $serverAddress") log.info("Connecting to message broker: $serverAddress")
// TODO Add broker CN to config for host verification in case the embedded broker isn't used // TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions) val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions, threadPoolName = threadPoolName)
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted.
connectionTTL = 60000 connectionTTL = 60000
clientFailureCheckPeriod = 30000 clientFailureCheckPeriod = 30000
minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE

View File

@ -3,13 +3,14 @@ package net.corda.coretesting.internal
import io.netty.bootstrap.Bootstrap import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelFuture import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.ssl.SslContext
import io.netty.channel.ChannelInitializer import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption import io.netty.channel.ChannelOption
import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslHandler import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.DefaultThreadFactory
import java.io.Closeable import java.io.Closeable
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
@ -17,7 +18,6 @@ import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.SSLEngine import javax.net.ssl.SSLEngine
import kotlin.concurrent.thread import kotlin.concurrent.thread
class NettyTestClient( class NettyTestClient(
val sslContext: SslContext?, val sslContext: SslContext?,
val targetHost: String, val targetHost: String,
@ -49,7 +49,7 @@ class NettyTestClient(
private fun run() { private fun run() {
// Configure the client. // Configure the client.
val group = NioEventLoopGroup() val group = NioEventLoopGroup(DefaultThreadFactory("NettyTestClient"))
try { try {
val b = Bootstrap() val b = Bootstrap()
b.group(group) b.group(group)

View File

@ -11,6 +11,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.logging.LogLevel import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler import io.netty.handler.logging.LoggingHandler
import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContext
import io.netty.util.concurrent.DefaultThreadFactory
import java.io.Closeable import java.io.Closeable
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
@ -45,8 +46,8 @@ class NettyTestServer(
fun run() { fun run() {
// Configure the server. // Configure the server.
val bossGroup = NioEventLoopGroup(1) val bossGroup = NioEventLoopGroup(1, DefaultThreadFactory("NettyTestServer-boss"))
val workerGroup = NioEventLoopGroup() val workerGroup = NioEventLoopGroup(DefaultThreadFactory("NettyTestServer-worker"))
try { try {
val b = ServerBootstrap() val b = ServerBootstrap()
b.group(bossGroup, workerGroup) b.group(bossGroup, workerGroup)