Merge branch 'release/os/4.10' into shams-4.11-fwrd-merge-046ed0ac

# Conflicts:
#	node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt
This commit is contained in:
Shams Asari 2023-05-22 09:55:19 +01:00
commit e6336666c2
21 changed files with 343 additions and 128 deletions

View File

@ -24,7 +24,9 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
private val confirmationWindowSize: Int = -1,
private val messagingServerConnectionConfig: MessagingServerConnectionConfiguration? = null,
private val backupServerAddressPool: List<NetworkHostAndPort> = emptyList(),
private val failoverCallback: ((FailoverEventType) -> Unit)? = null
private val failoverCallback: ((FailoverEventType) -> Unit)? = null,
private val threadPoolName: String = "ArtemisClient",
private val trace: Boolean = false
) : ArtemisSessionProvider {
companion object {
private val log = loggerFor<ArtemisMessagingClient>()
@ -39,8 +41,10 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
override fun start(): Started = synchronized(this) {
check(started == null) { "start can't be called twice" }
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config)
val backupTransports = backupServerAddressPool.map { p2pConnectorTcpTransport(it, config) }
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config, threadPoolName = threadPoolName, trace = trace)
val backupTransports = backupServerAddressPool.map {
p2pConnectorTcpTransport(it, config, threadPoolName = threadPoolName, trace = trace)
}
log.info("Connecting to message broker: $serverAddress")
if (backupTransports.isNotEmpty()) {
@ -49,8 +53,6 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
// If back-up artemis addresses are configured, the locator will be created using HA mode.
@Suppress("SpreadOperator")
val locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted.
connectionTTL = 60000
clientFailureCheckPeriod = 30000
callFailoverTimeout = java.lang.Long.getLong(CORDA_ARTEMIS_CALL_TIMEOUT_PROP_NAME, CORDA_ARTEMIS_CALL_TIMEOUT_DEFAULT)

View File

@ -9,10 +9,10 @@ import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.config.SslConfiguration
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import java.nio.file.Path
@Suppress("LongParameterList")
class ArtemisTcpTransport {
companion object {
val CIPHER_SUITES = listOf(
@ -22,8 +22,9 @@ class ArtemisTcpTransport {
val TLS_VERSIONS = listOf("TLSv1.2")
const val SSL_HANDSHAKE_TIMEOUT_NAME = "SSLHandshakeTimeout"
const val TRACE_NAME = "trace"
const val SSL_HANDSHAKE_TIMEOUT_NAME = "Corda-SSLHandshakeTimeout"
const val TRACE_NAME = "Corda-Trace"
const val THREAD_POOL_NAME_NAME = "Corda-ThreadPoolName"
// Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop.
@ -94,24 +95,25 @@ class ArtemisTcpTransport {
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
config: MutualSslConfiguration?,
enableSSL: Boolean = true,
threadPoolName: String = "P2PServer",
trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>()
if (enableSSL) {
config?.addToTransportOptions(options)
}
return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, trace)
return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace)
}
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
config: MutualSslConfiguration?,
enableSSL: Boolean = true,
keyStoreType: String? = null): TransportConfiguration {
threadPoolName: String = "P2PClient",
trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>()
if (enableSSL) {
config?.addToTransportOptions(options)
options += asMap(keyStoreType)
}
return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL)
return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace)
}
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
@ -123,65 +125,89 @@ class ArtemisTcpTransport {
config.keyStorePath.requireOnDefaultFileSystem()
options.putAll(config.toTransportOptions())
}
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, trace)
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCServer", trace)
}
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: ClientRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration {
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
config: ClientRpcSslOptions?,
enableSSL: Boolean = true,
trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>()
if (config != null && enableSSL) {
config.trustStorePath.requireOnDefaultFileSystem()
options.putAll(config.toTransportOptions())
}
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL)
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCClient", trace)
}
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration {
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort,
config: SslConfiguration,
trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>()
config.addToTransportOptions(options)
options += asMap(keyStoreProvider)
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true)
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCClient", trace)
}
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
config: SslConfiguration,
keyStoreType: String? = null,
trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>()
config.addToTransportOptions(options)
options += asMap(keyStoreType)
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true, trace = trace)
}
private fun asMap(keyStoreType: String?): Map<String, String> {
return keyStoreType?.let { mutableMapOf(TransportConstants.KEYSTORE_TYPE_PROP_NAME to it) } ?: emptyMap()
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCServer", trace)
}
private fun createAcceptorTransport(hostAndPort: NetworkHostAndPort,
protocols: String,
options: MutableMap<String, Any>,
enableSSL: Boolean,
threadPoolName: String,
trace: Boolean): TransportConfiguration {
options += defaultArtemisOptions(hostAndPort, protocols)
if (enableSSL) {
options += defaultSSLOptions
}
// Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections
options[TransportConstants.HANDSHAKE_TIMEOUT] = 0
options[TRACE_NAME] = trace
return TransportConfiguration("net.corda.node.services.messaging.NodeNettyAcceptorFactory", options)
return createTransport(
"net.corda.node.services.messaging.NodeNettyAcceptorFactory",
hostAndPort,
protocols,
options,
enableSSL,
threadPoolName,
trace
)
}
private fun createConnectorTransport(hostAndPort: NetworkHostAndPort,
protocols: String,
options: MutableMap<String, Any>,
enableSSL: Boolean): TransportConfiguration {
enableSSL: Boolean,
threadPoolName: String,
trace: Boolean): TransportConfiguration {
return createTransport(
NodeNettyConnectorFactory::class.java.name,
hostAndPort,
protocols,
options,
enableSSL,
threadPoolName,
trace
)
}
private fun createTransport(className: String,
hostAndPort: NetworkHostAndPort,
protocols: String,
options: MutableMap<String, Any>,
enableSSL: Boolean,
threadPoolName: String,
trace: Boolean): TransportConfiguration {
options += defaultArtemisOptions(hostAndPort, protocols)
if (enableSSL) {
options += defaultSSLOptions
// This is required to stop Client checking URL address vs. Server provided certificate
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
}
return TransportConfiguration(NettyConnectorFactory::class.java.name, options)
options[THREAD_POOL_NAME_NAME] = threadPoolName
options[TRACE_NAME] = trace
return TransportConfiguration(className, options)
}
}
}

View File

@ -0,0 +1,61 @@
package net.corda.nodeapi.internal
import io.netty.channel.ChannelPipeline
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager
import org.apache.activemq.artemis.spi.core.remoting.Connector
import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory
import org.apache.activemq.artemis.utils.ConfigurationHelper
import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService
class NodeNettyConnectorFactory : ConnectorFactory {
override fun createConnector(configuration: MutableMap<String, Any>?,
handler: BufferHandler?,
listener: ClientConnectionLifeCycleListener?,
closeExecutor: Executor?,
threadPool: Executor?,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?): Connector {
val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Connector", configuration)
val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
return NettyConnector(
configuration,
handler,
listener,
closeExecutor,
threadPool,
scheduledThreadPool,
MyClientProtocolManager(threadPoolName, trace)
)
}
override fun isReliable(): Boolean = false
override fun getDefaults(): Map<String?, Any?> = NettyConnector.DEFAULT_CONFIG
private class MyClientProtocolManager(private val threadPoolName: String, private val trace: Boolean) : ActiveMQClientProtocolManager() {
override fun addChannelHandlers(pipeline: ChannelPipeline) {
applyThreadPoolName()
super.addChannelHandlers(pipeline)
if (trace) {
pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
}
}
/**
* [NettyConnector.start] does not provide a way to configure the thread pool name, so we modify the thread name accordingly.
*/
private fun applyThreadPoolName() {
with(Thread.currentThread()) {
name = name.replace("nioEventLoopGroup", threadPoolName) // pool and thread numbers are preserved
}
}
}
}

View File

@ -5,16 +5,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import io.netty.channel.EventLoop
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.util.concurrent.DefaultThreadFactory
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.VisibleForTesting
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
@ -503,7 +504,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
}
override fun start() {
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS)
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("AMQPBridge", Thread.MAX_PRIORITY))
val artemis = artemisMessageClientFactory()
this.artemis = artemis
artemis.start()

View File

@ -1,3 +1,5 @@
@file:Suppress("MagicNumber", "TooGenericExceptionCaught")
package net.corda.nodeapi.internal.crypto
import net.corda.core.CordaOID
@ -12,6 +14,8 @@ import net.corda.core.internal.validate
import net.corda.core.internal.writer
import net.corda.core.utilities.days
import net.corda.core.utilities.millis
import net.corda.core.utilities.toHex
import net.corda.nodeapi.internal.protonwrapper.netty.distributionPointsToString
import org.bouncycastle.asn1.ASN1EncodableVector
import org.bouncycastle.asn1.ASN1ObjectIdentifier
import org.bouncycastle.asn1.ASN1Sequence
@ -393,7 +397,6 @@ object X509Utilities {
}
}
@Suppress("MagicNumber")
private fun generateCertificateSerialNumber(): BigInteger {
val bytes = ByteArray(CERTIFICATE_SERIAL_NUMBER_LENGTH)
newSecureRandom().nextBytes(bytes)
@ -433,6 +436,29 @@ fun PKCS10CertificationRequest.isSignatureValid(): Boolean {
return this.isSignatureValid(JcaContentVerifierProviderBuilder().build(this.subjectPublicKeyInfo))
}
fun X509Certificate.toSimpleString(): String {
val bcCert = toBc()
val keyIdentifier = try {
SubjectKeyIdentifier.getInstance(bcCert.getExtension(Extension.subjectKeyIdentifier).parsedValue).keyIdentifier.toHex()
} catch (e: Exception) {
"null"
}
val authorityKeyIdentifier = try {
AuthorityKeyIdentifier.getInstance(bcCert.getExtension(Extension.authorityKeyIdentifier).parsedValue).keyIdentifier.toHex()
} catch (e: Exception) {
"null"
}
val subject = bcCert.subject
val issuer = bcCert.issuer
val role = CertRole.extract(this)
return "$subject[$keyIdentifier] issued by $issuer[$authorityKeyIdentifier] $role $serialNumber [${distributionPointsToString()}]"
}
fun X509CRL.toSimpleString(): String {
val revokedSerialNumbers = revokedCertificates?.map { it.serialNumber }
return "$issuerX500Principal ${thisUpdate.toInstant()} ${nextUpdate.toInstant()} ${revokedSerialNumbers ?: "[]"}"
}
/**
* Check certificate validity or print warning if expiry is within 30 days
*/

View File

@ -115,11 +115,10 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
val transport = connection.transport as ProtonJTransport
transport.protocolTracer = object : ProtocolTracer {
override fun sentFrame(transportFrame: TransportFrame) {
logInfoWithMDC { "${transportFrame.body}" }
logInfoWithMDC { "sentFrame: ${transportFrame.body}" }
}
override fun receivedFrame(transportFrame: TransportFrame) {
logInfoWithMDC { "${transportFrame.body}" }
logInfoWithMDC { "receivedFrame: ${transportFrame.body}" }
}
}
}

View File

@ -1,7 +1,11 @@
package net.corda.nodeapi.internal.protonwrapper.netty
import io.netty.bootstrap.Bootstrap
import io.netty.channel.*
import io.netty.channel.Channel
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelInitializer
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
@ -11,6 +15,7 @@ import io.netty.handler.proxy.HttpProxyHandler
import io.netty.handler.proxy.Socks4ProxyHandler
import io.netty.handler.proxy.Socks5ProxyHandler
import io.netty.resolver.NoopAddressResolverGroup
import io.netty.util.concurrent.DefaultThreadFactory
import io.netty.util.internal.logging.InternalLoggerFactory
import io.netty.util.internal.logging.Slf4JLoggerFactory
import net.corda.core.identity.CordaX500Name
@ -58,7 +63,8 @@ data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostA
class AMQPClient(private val targets: List<NetworkHostAndPort>,
val allowedRemoteLegalNames: Set<CordaX500Name>,
private val configuration: AMQPConfiguration,
private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable {
private val sharedThreadPool: EventLoopGroup? = null,
private val threadPoolName: String = "AMQPClient") : AutoCloseable {
companion object {
init {
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
@ -303,7 +309,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
return
}
log.info("Connect to: $currentTarget")
workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS)
workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS, DefaultThreadFactory(threadPoolName, Thread.MAX_PRIORITY))
started = true
restart()
}

View File

@ -11,6 +11,7 @@ import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.util.concurrent.DefaultThreadFactory
import io.netty.util.internal.logging.InternalLoggerFactory
import io.netty.util.internal.logging.Slf4JLoggerFactory
import net.corda.core.utilities.NetworkHostAndPort
@ -37,8 +38,8 @@ import kotlin.concurrent.withLock
*/
class AMQPServer(val hostName: String,
val port: Int,
private val configuration: AMQPConfiguration) : AutoCloseable {
private val configuration: AMQPConfiguration,
private val threadPoolName: String = "AMQPServer") : AutoCloseable {
companion object {
init {
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
@ -131,8 +132,8 @@ class AMQPServer(val hostName: String,
lock.withLock {
stop()
bossGroup = NioEventLoopGroup(1)
workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS)
bossGroup = NioEventLoopGroup(1, DefaultThreadFactory("$threadPoolName-boss", Thread.MAX_PRIORITY))
workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS, DefaultThreadFactory("$threadPoolName-worker", Thread.MAX_PRIORITY))
val server = ServerBootstrap()
// TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux

View File

@ -14,33 +14,38 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.toHex
import net.corda.nodeapi.internal.ArtemisTcpTransport
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.crypto.toBc
import net.corda.nodeapi.internal.crypto.toSimpleString
import net.corda.nodeapi.internal.crypto.x509
import org.bouncycastle.asn1.ASN1InputStream
import org.bouncycastle.asn1.ASN1Primitive
import org.bouncycastle.asn1.ASN1IA5String
import org.bouncycastle.asn1.DEROctetString
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier
import org.bouncycastle.asn1.x509.CRLDistPoint
import org.bouncycastle.asn1.x509.DistributionPointName
import org.bouncycastle.asn1.x509.Extension
import org.bouncycastle.asn1.x509.GeneralName
import org.bouncycastle.asn1.x509.GeneralNames
import org.bouncycastle.asn1.x509.SubjectKeyIdentifier
import org.slf4j.LoggerFactory
import java.net.Socket
import java.net.URI
import java.security.KeyStore
import java.security.cert.*
import java.util.*
import java.security.cert.CertificateException
import java.security.cert.PKIXBuilderParameters
import java.security.cert.PKIXRevocationChecker
import java.security.cert.X509CertSelector
import java.security.cert.X509Certificate
import java.util.concurrent.Executor
import javax.net.ssl.*
import javax.net.ssl.CertPathTrustManagerParameters
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.SNIHostName
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLEngine
import javax.net.ssl.TrustManagerFactory
import javax.net.ssl.X509ExtendedTrustManager
import javax.security.auth.x500.X500Principal
import kotlin.collections.HashMap
import kotlin.system.measureTimeMillis
private const val HOSTNAME_FORMAT = "%s.corda.net"
@ -109,23 +114,7 @@ fun certPathToString(certPath: Array<out X509Certificate>?): String {
if (certPath == null) {
return "<empty certpath>"
}
val certs = certPath.map {
val bcCert = it.toBc()
val subject = bcCert.subject.toString()
val issuer = bcCert.issuer.toString()
val keyIdentifier = try {
SubjectKeyIdentifier.getInstance(bcCert.getExtension(Extension.subjectKeyIdentifier).parsedValue).keyIdentifier.toHex()
} catch (ex: Exception) {
"null"
}
val authorityKeyIdentifier = try {
AuthorityKeyIdentifier.getInstance(bcCert.getExtension(Extension.authorityKeyIdentifier).parsedValue).keyIdentifier.toHex()
} catch (ex: Exception) {
"null"
}
" $subject[$keyIdentifier] issued by $issuer[$authorityKeyIdentifier] [${it.distributionPointsToString()}]"
}
return certs.joinToString("\r\n")
return certPath.joinToString(System.lineSeparator()) { " ${it.toSimpleString()}" }
}
@VisibleForTesting

View File

@ -3,7 +3,10 @@ package net.corda.nodeapi.internal.revocation
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import net.corda.core.internal.readFully
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
import net.corda.nodeapi.internal.crypto.toSimpleString
import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource
import net.corda.nodeapi.internal.protonwrapper.netty.distributionPoints
import java.net.URI
@ -15,8 +18,11 @@ import javax.security.auth.x500.X500Principal
/**
* [CrlSource] which downloads CRLs from the distribution points in the X509 certificate.
*/
@Suppress("TooGenericExceptionCaught")
class CertDistPointCrlSource : CrlSource {
companion object {
private val logger = contextLogger()
// The default SSL handshake timeout is 60s (DEFAULT_SSL_HANDSHAKE_TIMEOUT). Considering there are 3 CRLs endpoints to check in a
// node handshake, we want to keep the total timeout within that.
private const val DEFAULT_CONNECT_TIMEOUT = 9_000
@ -33,7 +39,8 @@ class CertDistPointCrlSource : CrlSource {
private val readTimeout = Integer.getInteger("net.corda.dpcrl.read.timeout", DEFAULT_READ_TIMEOUT)
private fun retrieveCRL(uri: URI): X509CRL {
val bytes = run {
val start = System.currentTimeMillis()
val bytes = try {
val conn = uri.toURL().openConnection()
conn.connectTimeout = connectTimeout
conn.readTimeout = readTimeout
@ -41,12 +48,26 @@ class CertDistPointCrlSource : CrlSource {
// in an InputStream, but the JDK implementation (sun.security.provider.X509Factory.engineGenerateCRL) converts any IOException
// into CRLException and drops the cause chain.
conn.getInputStream().readFully()
} catch (e: Exception) {
if (logger.isDebugEnabled) {
logger.debug("Unable to download CRL from $uri (${System.currentTimeMillis() - start}ms)", e)
}
throw e
}
return X509CertificateFactory().generateCRL(bytes.inputStream())
val duration = System.currentTimeMillis() - start
val crl = try {
X509CertificateFactory().generateCRL(bytes.inputStream())
} catch (e: Exception) {
if (logger.isDebugEnabled) {
logger.debug("Invalid CRL from $uri (${duration}ms)", e)
}
throw e
}
logger.debug { "CRL from $uri (${duration}ms): ${crl.toSimpleString()}" }
return crl
}
}
@Suppress("TooGenericExceptionCaught")
override fun fetch(certificate: X509Certificate): Set<X509CRL> {
val approvedCRLs = HashSet<X509CRL>()
var exception: Exception? = null

View File

@ -1,6 +1,8 @@
package net.corda.nodeapi.internal.revocation
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.nodeapi.internal.crypto.toSimpleString
import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource
import org.bouncycastle.asn1.x509.Extension
import java.security.cert.CRLReason
@ -27,8 +29,8 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
private val softFailExceptions = ArrayList<CertPathValidatorException>()
override fun check(cert: Certificate, unresolvedCritExts: MutableCollection<String>?) {
val x509Certificate = cert as X509Certificate
checkApprovedCRLs(x509Certificate, getCRLs(x509Certificate))
cert as X509Certificate
checkApprovedCRLs(cert, getCRLs(cert))
}
@Suppress("TooGenericExceptionCaught")
@ -40,30 +42,27 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
addSoftFailException(e)
return emptySet()
} else {
throw undeterminedRevocationException("Unable to retrieve CRLs", e)
throw undeterminedRevocationException("Unable to retrieve CRLs for cert ${cert.serialNumber}", e)
}
}
if (crls.isNotEmpty() || softFail) {
return crls
}
// Note, the JDK tries to find a valid CRL from a different signing key before giving up (RevocationChecker.verifyWithSeparateSigningKey)
throw undeterminedRevocationException("Could not find any valid CRLs", null)
throw undeterminedRevocationException("Could not find any valid CRLs for cert ${cert.serialNumber}", null)
}
/**
* Borrowed from `RevocationChecker.checkApprovedCRLs()`
*/
@Suppress("NestedBlockDepth")
@Throws(CertPathValidatorException::class)
private fun checkApprovedCRLs(cert: X509Certificate, approvedCRLs: Set<X509CRL>) {
// See if the cert is in the set of approved crls.
logger.debug("ExternalSourceRevocationChecker.checkApprovedCRLs() cert SN: ${cert.serialNumber}")
logger.debug { "Check cert ${cert.serialNumber} against CRLs ${approvedCRLs.map { it.toSimpleString() }}" }
for (crl in approvedCRLs) {
val entry = crl.getRevokedCertificate(cert)
if (entry != null) {
logger.debug("ExternalSourceRevocationChecker.checkApprovedCRLs() CRL entry: $entry")
/*
* Abort CRL validation and throw exception if there are any
* unrecognized critical CRL entry extensions (see section
@ -75,19 +74,15 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
unresCritExts.remove(Extension.cRLDistributionPoints.id)
unresCritExts.remove(Extension.certificateIssuer.id)
if (unresCritExts.isNotEmpty()) {
throw CertPathValidatorException(
"Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts")
throw CertPathValidatorException("Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts")
}
}
val reasonCode = entry.revocationReason ?: CRLReason.UNSPECIFIED
val revocationDate = entry.revocationDate
if (revocationDate.before(dateSource())) {
val t = CertificateRevokedException(
revocationDate, reasonCode,
crl.issuerX500Principal, mutableMapOf())
throw CertPathValidatorException(
t.message, t, null, -1, BasicReason.REVOKED)
val t = CertificateRevokedException(revocationDate, reasonCode, crl.issuerX500Principal, emptyMap())
throw CertPathValidatorException(t.message, t, null, -1, BasicReason.REVOKED)
}
}
}
@ -105,15 +100,18 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
return false
}
override fun getSupportedExtensions(): MutableSet<String>? {
override fun getSupportedExtensions(): Set<String>? {
return null
}
override fun init(forward: Boolean) {
if (forward) {
throw CertPathValidatorException("Forward checking not allowed")
}
softFailExceptions.clear()
}
override fun getSoftFailExceptions(): MutableList<CertPathValidatorException> {
override fun getSoftFailExceptions(): List<CertPathValidatorException> {
return Collections.unmodifiableList(softFailExceptions)
}
@ -125,4 +123,4 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
private fun undeterminedRevocationException(message: String?, cause: Throwable?): CertPathValidatorException {
return CertPathValidatorException(message, cause, null, -1, BasicReason.UNDETERMINED_REVOCATION_STATUS)
}
}
}

View File

@ -205,7 +205,7 @@ class AMQPBridgeTest {
doReturn(null).whenever(it).jmxMonitoringHttpPort
}
artemisConfig.configureWithDevSSLCertificate()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE, null)
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE)
artemisServer.start()
artemisClient.start()

View File

@ -25,7 +25,6 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.driver.internal.incrementalPortAllocation
@ -50,6 +49,7 @@ import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
@Suppress("LongParameterList")
class CertificateRevocationListNodeTests {
@Rule
@JvmField
@ -327,17 +327,18 @@ class CertificateRevocationListNodeTests {
private fun createAMQPClient(targetPort: Int,
crlCheckSoftFail: Boolean,
legalName: CordaX500Name,
nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL",
maxMessageSize: Int = MAX_MESSAGE_SIZE): X509Certificate {
val baseDirectory = temporaryFolder.root.toPath() / "client"
val baseDirectory = temporaryFolder.root.toPath() / legalName.organisation
val certificatesDirectory = baseDirectory / "certificates"
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val clientConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(BOB_NAME).whenever(it).myLegalName
doReturn(legalName).whenever(it).myLegalName
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
doReturn(crlCheckSoftFail).whenever(it).crlCheckSoftFail
@ -351,28 +352,32 @@ class CertificateRevocationListNodeTests {
override val trustStore = clientConfig.p2pSslOptions.trustStore.get()
override val maxMessageSize: Int = maxMessageSize
}
amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", targetPort)), setOf(ALICE_NAME, CHARLIE_NAME), amqpConfig)
amqpClient = AMQPClient(
listOf(NetworkHostAndPort("localhost", targetPort)),
setOf(CHARLIE_NAME),
amqpConfig,
threadPoolName = legalName.organisation
)
return nodeCert
}
@Suppress("LongParameterList")
private fun createAMQPServer(port: Int,
name: CordaX500Name = ALICE_NAME,
legalName: CordaX500Name,
crlCheckSoftFail: Boolean,
nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL",
maxMessageSize: Int = MAX_MESSAGE_SIZE,
sslHandshakeTimeout: Duration? = null): X509Certificate {
check(!::amqpServer.isInitialized)
val baseDirectory = temporaryFolder.root.toPath() / "server"
val baseDirectory = temporaryFolder.root.toPath() / legalName.organisation
val certificatesDirectory = baseDirectory / "certificates"
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val serverConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(name).whenever(it).myLegalName
doReturn(legalName).whenever(it).myLegalName
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
}
@ -386,7 +391,7 @@ class CertificateRevocationListNodeTests {
override val maxMessageSize: Int = maxMessageSize
override val sslHandshakeTimeout: Duration = sslHandshakeTimeout ?: super.sslHandshakeTimeout
}
amqpServer = AMQPServer("0.0.0.0", port, amqpConfig)
amqpServer = AMQPServer("0.0.0.0", port, amqpConfig, threadPoolName = legalName.organisation)
return nodeCert
}
@ -422,7 +427,6 @@ class CertificateRevocationListNodeTests {
return newNodeCert
}
@Suppress("LongParameterList")
private fun verifyAMQPConnection(crlCheckSoftFail: Boolean,
nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
revokeServerCert: Boolean = false,
@ -431,6 +435,7 @@ class CertificateRevocationListNodeTests {
expectedConnectStatus: Boolean) {
val serverCert = createAMQPServer(
serverPort,
CHARLIE_NAME,
crlCheckSoftFail = crlCheckSoftFail,
nodeCrlDistPoint = nodeCrlDistPoint,
sslHandshakeTimeout = sslHandshakeTimeout
@ -445,6 +450,7 @@ class CertificateRevocationListNodeTests {
val clientCert = createAMQPClient(
serverPort,
crlCheckSoftFail = crlCheckSoftFail,
legalName = ALICE_NAME,
nodeCrlDistPoint = nodeCrlDistPoint
)
if (revokeClientCert) {
@ -456,7 +462,8 @@ class CertificateRevocationListNodeTests {
assertThat(serverConnect.connected).isEqualTo(expectedConnectStatus)
}
private fun createArtemisServerAndClient(crlCheckSoftFail: Boolean,
private fun createArtemisServerAndClient(legalName: CordaX500Name,
crlCheckSoftFail: Boolean,
crlCheckArtemisServer: Boolean,
nodeCrlDistPoint: String,
sslHandshakeTimeout: Duration?): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
@ -467,7 +474,7 @@ class CertificateRevocationListNodeTests {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(CHARLIE_NAME).whenever(it).myLegalName
doReturn(legalName).whenever(it).myLegalName
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress
@ -478,14 +485,25 @@ class CertificateRevocationListNodeTests {
artemisConfig.configureWithDevSSLCertificate()
recreateNodeCaAndTlsCertificates(signingCertificateStore, p2pSslConfiguration, nodeCrlDistPoint, null)
val server = ArtemisMessagingServer(artemisConfig, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE, null)
val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE)
val server = ArtemisMessagingServer(
artemisConfig,
artemisConfig.p2pAddress,
MAX_MESSAGE_SIZE,
threadPoolName = "${legalName.organisation}-server",
trace = true
)
val client = ArtemisMessagingClient(
artemisConfig.p2pSslOptions,
artemisConfig.p2pAddress,
MAX_MESSAGE_SIZE,
threadPoolName = "${legalName.organisation}-client",
trace = true
)
server.start()
client.start()
return server to client
}
@Suppress("LongParameterList")
private fun verifyArtemisConnection(crlCheckSoftFail: Boolean,
crlCheckArtemisServer: Boolean,
expectedConnected: Boolean = true,
@ -494,13 +512,19 @@ class CertificateRevocationListNodeTests {
nodeCrlDistPoint: String = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
sslHandshakeTimeout: Duration? = null) {
val queueName = P2P_PREFIX + "Test"
val (artemisServer, artemisClient) = createArtemisServerAndClient(crlCheckSoftFail, crlCheckArtemisServer, nodeCrlDistPoint, sslHandshakeTimeout)
val (artemisServer, artemisClient) = createArtemisServerAndClient(
CHARLIE_NAME,
crlCheckSoftFail,
crlCheckArtemisServer,
nodeCrlDistPoint,
sslHandshakeTimeout
)
artemisServer.use {
artemisClient.started!!.session.createQueue(
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setDurable(true)
)
val nodeCert = createAMQPClient(serverPort, true, nodeCrlDistPoint)
val nodeCert = createAMQPClient(serverPort, true, ALICE_NAME, nodeCrlDistPoint)
if (revokedNodeCert) {
crlServer.revokedNodeCerts.add(nodeCert.serialNumber)
}

View File

@ -541,7 +541,7 @@ class ProtonWrapperTests {
}
artemisConfig.configureWithDevSSLCertificate()
val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize, null)
val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize)
val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", artemisPort), maxMessageSize)
server.start()
client.start()

View File

@ -243,7 +243,7 @@ class ArtemisMessagingTest {
}
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null, true).apply {
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, trace = true).apply {
config.configureWithDevSSLCertificate()
messagingServer = this
}

View File

@ -22,7 +22,7 @@ class SimpleMQClient(val target: NetworkHostAndPort,
lateinit var producer: ClientProducer
fun start(username: String? = null, password: String? = null, enableSSL: Boolean = true) {
val tcpTransport = p2pConnectorTcpTransport(target, config, enableSSL = enableSSL)
val tcpTransport = p2pConnectorTcpTransport(target, config, enableSSL = enableSSL, threadPoolName = "SimpleMQClient")
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
isBlockOnNonDurableSend = true
threadPoolMaxSize = 1

View File

@ -56,7 +56,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
class ArtemisMessagingServer(private val config: NodeConfiguration,
private val messagingServerAddress: NetworkHostAndPort,
private val maxMessageSize: Int,
private val journalBufferTimeout : Int?,
private val journalBufferTimeout : Int? = null,
private val threadPoolName: String = "ArtemisServer",
private val trace: Boolean = false) : ArtemisBroker, SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
@ -131,9 +132,10 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
// The transaction cache is configurable, and drives other cache sizes.
globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize)
acceptorConfigurations.add(p2pAcceptorTcpTransport(
addAcceptorConfiguration(p2pAcceptorTcpTransport(
NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port),
config.p2pSslOptions,
threadPoolName = threadPoolName,
trace = trace
))
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
@ -180,7 +182,6 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue)
}
@Throws(IOException::class, KeyStoreException::class)
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
val keyStore = config.p2pSslOptions.keyStore.get().value.internal
val trustStore = config.p2pSslOptions.trustStore.get().value.internal

View File

@ -1,11 +1,14 @@
package net.corda.node.services.messaging
import io.netty.buffer.ByteBufAllocator
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.group.ChannelGroup
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.handler.ssl.SslHandler
import io.netty.handler.ssl.SslHandshakeTimeoutException
import net.corda.core.internal.declaredField
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisTcpTransport
import org.apache.activemq.artemis.api.core.BaseInterceptor
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor
@ -16,10 +19,14 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener
import org.apache.activemq.artemis.utils.ConfigurationHelper
import org.apache.activemq.artemis.utils.actors.OrderedExecutor
import java.nio.channels.ClosedChannelException
import java.time.Duration
import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService
import java.util.regex.Pattern
import javax.net.ssl.SSLEngine
@Suppress("unused") // Used via reflection in ArtemisTcpTransport
class NodeNettyAcceptorFactory : AcceptorFactory {
@ -35,6 +42,7 @@ class NodeNettyAcceptorFactory : AcceptorFactory {
return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
}
private class NodeNettyAcceptor(name: String?,
clusterConnection: ClusterConnection?,
configuration: Map<String, Any>,
@ -45,24 +53,76 @@ class NodeNettyAcceptorFactory : AcceptorFactory {
protocolMap: MutableMap<String, ProtocolManager<BaseInterceptor<*>, RedirectHandler<*>>>?) :
NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
{
companion object {
private val defaultThreadPoolNamePattern = Pattern.compile("""Thread-(\d+) \(activemq-netty-threads\)""")
}
private val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "NodeNettyAcceptor", configuration)
private val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
@Synchronized
override fun start() {
super.start()
if (configuration[ArtemisTcpTransport.TRACE_NAME] == true) {
// Artemis does not seem to allow access to the underlying channel so we resort to reflection and get it via the
// serverChannelGroup field. This field is only available after start(), hence why we add the logger here.
if (trace) {
// Unfortunately we have to resort to reflection to be able to get access to the server channel(s)
declaredField<ChannelGroup>("serverChannelGroup").value.forEach { channel ->
channel.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO))
}
}
}
@Synchronized
override fun getSslHandler(alloc: ByteBufAllocator?, peerHost: String?, peerPort: Int): SslHandler {
val sslHandler = super.getSslHandler(alloc, peerHost, peerPort)
applyThreadPoolName()
val engine = super.getSslHandler(alloc, peerHost, peerPort).engine()
val sslHandler = NodeAcceptorSslHandler(engine, trace)
val handshakeTimeout = configuration[ArtemisTcpTransport.SSL_HANDSHAKE_TIMEOUT_NAME] as Duration?
if (handshakeTimeout != null) {
sslHandler.handshakeTimeoutMillis = handshakeTimeout.toMillis()
}
return sslHandler
}
/**
* [NettyAcceptor.start] has hardcoded the thread pool name and does not provide a way to configure it. This is a workaround.
*/
private fun applyThreadPoolName() {
val matcher = defaultThreadPoolNamePattern.matcher(Thread.currentThread().name)
if (matcher.matches()) {
Thread.currentThread().name = "$threadPoolName-${matcher.group(1)}" // Preserve the pool thread number
}
}
}
private class NodeAcceptorSslHandler(engine: SSLEngine, private val trace: Boolean) : SslHandler(engine) {
companion object {
private val logger = contextLogger()
}
override fun handlerAdded(ctx: ChannelHandlerContext) {
logHandshake()
super.handlerAdded(ctx)
// Unfortunately NettyAcceptor does not let us add extra child handlers, so we have to add our logger this way.
if (trace) {
ctx.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO))
}
}
private fun logHandshake() {
val start = System.currentTimeMillis()
handshakeFuture().addListener {
val duration = System.currentTimeMillis() - start
when {
it.isSuccess -> logger.info("SSL handshake completed in ${duration}ms with ${engine().session.peerPrincipal}")
it.isCancelled -> logger.warn("SSL handshake cancelled after ${duration}ms")
else -> when (it.cause()) {
is ClosedChannelException -> logger.warn("SSL handshake closed early after ${duration}ms")
is SslHandshakeTimeoutException -> logger.warn("SSL handshake timed out after ${duration}ms")
else -> logger.warn("SSL handshake failed after ${duration}ms", it.cause())
}
}
}
}
}
}

View File

@ -117,6 +117,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
cacheFactory: NamedCacheFactory,
private val isDrainingModeOn: () -> Boolean,
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>,
private val threadPoolName: String = "P2PClient",
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log),
private val terminateOnConnectionError: Boolean = true,
private val timeoutConfig: TimeoutConfig = TimeoutConfig.default()
@ -205,10 +206,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
started = true
log.info("Connecting to message broker: $serverAddress")
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions)
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions, threadPoolName = threadPoolName)
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted.
callTimeout = timeoutConfig.callTimeout.toMillis()
connectionTTL = timeoutConfig.serverConnectionTtl.toMillis()
clientFailureCheckPeriod = timeoutConfig.clientConnectionTtl.toMillis()

View File

@ -3,13 +3,14 @@ package net.corda.coretesting.internal
import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.ssl.SslContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.DefaultThreadFactory
import java.io.Closeable
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
@ -17,7 +18,6 @@ import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.SSLEngine
import kotlin.concurrent.thread
class NettyTestClient(
val sslContext: SslContext?,
val targetHost: String,
@ -49,7 +49,7 @@ class NettyTestClient(
private fun run() {
// Configure the client.
val group = NioEventLoopGroup()
val group = NioEventLoopGroup(DefaultThreadFactory("NettyTestClient"))
try {
val b = Bootstrap()
b.group(group)

View File

@ -11,6 +11,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.handler.ssl.SslContext
import io.netty.util.concurrent.DefaultThreadFactory
import java.io.Closeable
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
@ -45,8 +46,8 @@ class NettyTestServer(
fun run() {
// Configure the server.
val bossGroup = NioEventLoopGroup(1)
val workerGroup = NioEventLoopGroup()
val bossGroup = NioEventLoopGroup(1, DefaultThreadFactory("NettyTestServer-boss"))
val workerGroup = NioEventLoopGroup(DefaultThreadFactory("NettyTestServer-worker"))
try {
val b = ServerBootstrap()
b.group(bossGroup, workerGroup)