mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
Merge pull request #7370 from corda/shams-4.10-fwrd-merge-7b7e3847
ENT-6515, ENT-9941: 4.9 to 4.10 forward merge
This commit is contained in:
commit
046ed0ac3b
@ -24,7 +24,9 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
|
||||
private val confirmationWindowSize: Int = -1,
|
||||
private val messagingServerConnectionConfig: MessagingServerConnectionConfiguration? = null,
|
||||
private val backupServerAddressPool: List<NetworkHostAndPort> = emptyList(),
|
||||
private val failoverCallback: ((FailoverEventType) -> Unit)? = null
|
||||
private val failoverCallback: ((FailoverEventType) -> Unit)? = null,
|
||||
private val threadPoolName: String = "ArtemisClient",
|
||||
private val trace: Boolean = false
|
||||
) : ArtemisSessionProvider {
|
||||
companion object {
|
||||
private val log = loggerFor<ArtemisMessagingClient>()
|
||||
@ -39,8 +41,10 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
|
||||
|
||||
override fun start(): Started = synchronized(this) {
|
||||
check(started == null) { "start can't be called twice" }
|
||||
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config)
|
||||
val backupTransports = backupServerAddressPool.map { p2pConnectorTcpTransport(it, config) }
|
||||
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config, threadPoolName = threadPoolName, trace = trace)
|
||||
val backupTransports = backupServerAddressPool.map {
|
||||
p2pConnectorTcpTransport(it, config, threadPoolName = threadPoolName, trace = trace)
|
||||
}
|
||||
|
||||
log.info("Connecting to message broker: $serverAddress")
|
||||
if (backupTransports.isNotEmpty()) {
|
||||
@ -49,8 +53,6 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
|
||||
// If back-up artemis addresses are configured, the locator will be created using HA mode.
|
||||
@Suppress("SpreadOperator")
|
||||
val locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply {
|
||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||
// would be the default and the two lines below can be deleted.
|
||||
connectionTTL = 60000
|
||||
clientFailureCheckPeriod = 30000
|
||||
callFailoverTimeout = java.lang.Long.getLong(CORDA_ARTEMIS_CALL_TIMEOUT_PROP_NAME, CORDA_ARTEMIS_CALL_TIMEOUT_DEFAULT)
|
||||
|
@ -9,10 +9,10 @@ import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.config.SslConfiguration
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
||||
import java.nio.file.Path
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
class ArtemisTcpTransport {
|
||||
companion object {
|
||||
val CIPHER_SUITES = listOf(
|
||||
@ -22,8 +22,9 @@ class ArtemisTcpTransport {
|
||||
|
||||
val TLS_VERSIONS = listOf("TLSv1.2")
|
||||
|
||||
const val SSL_HANDSHAKE_TIMEOUT_NAME = "SSLHandshakeTimeout"
|
||||
const val TRACE_NAME = "trace"
|
||||
const val SSL_HANDSHAKE_TIMEOUT_NAME = "Corda-SSLHandshakeTimeout"
|
||||
const val TRACE_NAME = "Corda-Trace"
|
||||
const val THREAD_POOL_NAME_NAME = "Corda-ThreadPoolName"
|
||||
|
||||
// Turn on AMQP support, which needs the protocol jar on the classpath.
|
||||
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop.
|
||||
@ -94,24 +95,25 @@ class ArtemisTcpTransport {
|
||||
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
config: MutualSslConfiguration?,
|
||||
enableSSL: Boolean = true,
|
||||
threadPoolName: String = "P2PServer",
|
||||
trace: Boolean = false): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any>()
|
||||
if (enableSSL) {
|
||||
config?.addToTransportOptions(options)
|
||||
}
|
||||
return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, trace)
|
||||
return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace)
|
||||
}
|
||||
|
||||
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
config: MutualSslConfiguration?,
|
||||
enableSSL: Boolean = true,
|
||||
keyStoreType: String? = null): TransportConfiguration {
|
||||
threadPoolName: String = "P2PClient",
|
||||
trace: Boolean = false): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any>()
|
||||
if (enableSSL) {
|
||||
config?.addToTransportOptions(options)
|
||||
options += asMap(keyStoreType)
|
||||
}
|
||||
return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL)
|
||||
return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, threadPoolName, trace)
|
||||
}
|
||||
|
||||
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
@ -123,65 +125,89 @@ class ArtemisTcpTransport {
|
||||
config.keyStorePath.requireOnDefaultFileSystem()
|
||||
options.putAll(config.toTransportOptions())
|
||||
}
|
||||
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, trace)
|
||||
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCServer", trace)
|
||||
}
|
||||
|
||||
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: ClientRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration {
|
||||
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
config: ClientRpcSslOptions?,
|
||||
enableSSL: Boolean = true,
|
||||
trace: Boolean = false): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any>()
|
||||
if (config != null && enableSSL) {
|
||||
config.trustStorePath.requireOnDefaultFileSystem()
|
||||
options.putAll(config.toTransportOptions())
|
||||
}
|
||||
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL)
|
||||
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, "RPCClient", trace)
|
||||
}
|
||||
|
||||
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration {
|
||||
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
config: SslConfiguration,
|
||||
trace: Boolean = false): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any>()
|
||||
config.addToTransportOptions(options)
|
||||
options += asMap(keyStoreProvider)
|
||||
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true)
|
||||
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCClient", trace)
|
||||
}
|
||||
|
||||
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
|
||||
config: SslConfiguration,
|
||||
keyStoreType: String? = null,
|
||||
trace: Boolean = false): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any>()
|
||||
config.addToTransportOptions(options)
|
||||
options += asMap(keyStoreType)
|
||||
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true, trace = trace)
|
||||
}
|
||||
|
||||
private fun asMap(keyStoreType: String?): Map<String, String> {
|
||||
return keyStoreType?.let { mutableMapOf(TransportConstants.KEYSTORE_TYPE_PROP_NAME to it) } ?: emptyMap()
|
||||
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, true, "Internal-RPCServer", trace)
|
||||
}
|
||||
|
||||
private fun createAcceptorTransport(hostAndPort: NetworkHostAndPort,
|
||||
protocols: String,
|
||||
options: MutableMap<String, Any>,
|
||||
enableSSL: Boolean,
|
||||
threadPoolName: String,
|
||||
trace: Boolean): TransportConfiguration {
|
||||
options += defaultArtemisOptions(hostAndPort, protocols)
|
||||
if (enableSSL) {
|
||||
options += defaultSSLOptions
|
||||
}
|
||||
// Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections
|
||||
options[TransportConstants.HANDSHAKE_TIMEOUT] = 0
|
||||
options[TRACE_NAME] = trace
|
||||
return TransportConfiguration("net.corda.node.services.messaging.NodeNettyAcceptorFactory", options)
|
||||
return createTransport(
|
||||
"net.corda.node.services.messaging.NodeNettyAcceptorFactory",
|
||||
hostAndPort,
|
||||
protocols,
|
||||
options,
|
||||
enableSSL,
|
||||
threadPoolName,
|
||||
trace
|
||||
)
|
||||
}
|
||||
|
||||
private fun createConnectorTransport(hostAndPort: NetworkHostAndPort,
|
||||
protocols: String,
|
||||
options: MutableMap<String, Any>,
|
||||
enableSSL: Boolean): TransportConfiguration {
|
||||
enableSSL: Boolean,
|
||||
threadPoolName: String,
|
||||
trace: Boolean): TransportConfiguration {
|
||||
return createTransport(
|
||||
NodeNettyConnectorFactory::class.java.name,
|
||||
hostAndPort,
|
||||
protocols,
|
||||
options,
|
||||
enableSSL,
|
||||
threadPoolName,
|
||||
trace
|
||||
)
|
||||
}
|
||||
|
||||
private fun createTransport(className: String,
|
||||
hostAndPort: NetworkHostAndPort,
|
||||
protocols: String,
|
||||
options: MutableMap<String, Any>,
|
||||
enableSSL: Boolean,
|
||||
threadPoolName: String,
|
||||
trace: Boolean): TransportConfiguration {
|
||||
options += defaultArtemisOptions(hostAndPort, protocols)
|
||||
if (enableSSL) {
|
||||
options += defaultSSLOptions
|
||||
// This is required to stop Client checking URL address vs. Server provided certificate
|
||||
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
|
||||
}
|
||||
return TransportConfiguration(NettyConnectorFactory::class.java.name, options)
|
||||
options[THREAD_POOL_NAME_NAME] = threadPoolName
|
||||
options[TRACE_NAME] = trace
|
||||
return TransportConfiguration(className, options)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,61 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import io.netty.channel.ChannelPipeline
|
||||
import io.netty.handler.logging.LogLevel
|
||||
import io.netty.handler.logging.LoggingHandler
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector
|
||||
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connector
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory
|
||||
import org.apache.activemq.artemis.utils.ConfigurationHelper
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
|
||||
class NodeNettyConnectorFactory : ConnectorFactory {
|
||||
override fun createConnector(configuration: MutableMap<String, Any>?,
|
||||
handler: BufferHandler?,
|
||||
listener: ClientConnectionLifeCycleListener?,
|
||||
closeExecutor: Executor?,
|
||||
threadPool: Executor?,
|
||||
scheduledThreadPool: ScheduledExecutorService?,
|
||||
protocolManager: ClientProtocolManager?): Connector {
|
||||
val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "Connector", configuration)
|
||||
val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
|
||||
return NettyConnector(
|
||||
configuration,
|
||||
handler,
|
||||
listener,
|
||||
closeExecutor,
|
||||
threadPool,
|
||||
scheduledThreadPool,
|
||||
MyClientProtocolManager(threadPoolName, trace)
|
||||
)
|
||||
}
|
||||
|
||||
override fun isReliable(): Boolean = false
|
||||
|
||||
override fun getDefaults(): Map<String?, Any?> = NettyConnector.DEFAULT_CONFIG
|
||||
|
||||
|
||||
private class MyClientProtocolManager(private val threadPoolName: String, private val trace: Boolean) : ActiveMQClientProtocolManager() {
|
||||
override fun addChannelHandlers(pipeline: ChannelPipeline) {
|
||||
applyThreadPoolName()
|
||||
super.addChannelHandlers(pipeline)
|
||||
if (trace) {
|
||||
pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* [NettyConnector.start] does not provide a way to configure the thread pool name, so we modify the thread name accordingly.
|
||||
*/
|
||||
private fun applyThreadPoolName() {
|
||||
with(Thread.currentThread()) {
|
||||
name = name.replace("nioEventLoopGroup", threadPoolName) // pool and thread numbers are preserved
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -5,16 +5,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
|
||||
import io.netty.channel.EventLoop
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.nio.NioEventLoopGroup
|
||||
import io.netty.util.concurrent.DefaultThreadFactory
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
||||
import net.corda.nodeapi.internal.ArtemisSessionProvider
|
||||
import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY
|
||||
import net.corda.nodeapi.internal.config.CertificateStore
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||
@ -503,7 +504,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS)
|
||||
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS, DefaultThreadFactory("AMQPBridge", Thread.MAX_PRIORITY))
|
||||
val artemis = artemisMessageClientFactory()
|
||||
this.artemis = artemis
|
||||
artemis.start()
|
||||
|
@ -1,3 +1,5 @@
|
||||
@file:Suppress("MagicNumber", "TooGenericExceptionCaught")
|
||||
|
||||
package net.corda.nodeapi.internal.crypto
|
||||
|
||||
import net.corda.core.CordaOID
|
||||
@ -6,6 +8,8 @@ import net.corda.core.crypto.newSecureRandom
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.core.utilities.toHex
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.distributionPointsToString
|
||||
import org.bouncycastle.asn1.*
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.bouncycastle.asn1.x500.style.BCStyle
|
||||
@ -368,7 +372,6 @@ object X509Utilities {
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("MagicNumber")
|
||||
private fun generateCertificateSerialNumber(): BigInteger {
|
||||
val bytes = ByteArray(CERTIFICATE_SERIAL_NUMBER_LENGTH)
|
||||
newSecureRandom().nextBytes(bytes)
|
||||
@ -408,6 +411,29 @@ fun PKCS10CertificationRequest.isSignatureValid(): Boolean {
|
||||
return this.isSignatureValid(JcaContentVerifierProviderBuilder().build(this.subjectPublicKeyInfo))
|
||||
}
|
||||
|
||||
fun X509Certificate.toSimpleString(): String {
|
||||
val bcCert = toBc()
|
||||
val keyIdentifier = try {
|
||||
SubjectKeyIdentifier.getInstance(bcCert.getExtension(Extension.subjectKeyIdentifier).parsedValue).keyIdentifier.toHex()
|
||||
} catch (e: Exception) {
|
||||
"null"
|
||||
}
|
||||
val authorityKeyIdentifier = try {
|
||||
AuthorityKeyIdentifier.getInstance(bcCert.getExtension(Extension.authorityKeyIdentifier).parsedValue).keyIdentifier.toHex()
|
||||
} catch (e: Exception) {
|
||||
"null"
|
||||
}
|
||||
val subject = bcCert.subject
|
||||
val issuer = bcCert.issuer
|
||||
val role = CertRole.extract(this)
|
||||
return "$subject[$keyIdentifier] issued by $issuer[$authorityKeyIdentifier] $role $serialNumber [${distributionPointsToString()}]"
|
||||
}
|
||||
|
||||
fun X509CRL.toSimpleString(): String {
|
||||
val revokedSerialNumbers = revokedCertificates?.map { it.serialNumber }
|
||||
return "$issuerX500Principal ${thisUpdate.toInstant()} ${nextUpdate.toInstant()} ${revokedSerialNumbers ?: "[]"}"
|
||||
}
|
||||
|
||||
/**
|
||||
* Check certificate validity or print warning if expiry is within 30 days
|
||||
*/
|
||||
|
@ -115,11 +115,10 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
val transport = connection.transport as ProtonJTransport
|
||||
transport.protocolTracer = object : ProtocolTracer {
|
||||
override fun sentFrame(transportFrame: TransportFrame) {
|
||||
logInfoWithMDC { "${transportFrame.body}" }
|
||||
logInfoWithMDC { "sentFrame: ${transportFrame.body}" }
|
||||
}
|
||||
|
||||
override fun receivedFrame(transportFrame: TransportFrame) {
|
||||
logInfoWithMDC { "${transportFrame.body}" }
|
||||
logInfoWithMDC { "receivedFrame: ${transportFrame.body}" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,11 @@
|
||||
package net.corda.nodeapi.internal.protonwrapper.netty
|
||||
|
||||
import io.netty.bootstrap.Bootstrap
|
||||
import io.netty.channel.*
|
||||
import io.netty.channel.Channel
|
||||
import io.netty.channel.ChannelFutureListener
|
||||
import io.netty.channel.ChannelHandler
|
||||
import io.netty.channel.ChannelInitializer
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.nio.NioEventLoopGroup
|
||||
import io.netty.channel.socket.SocketChannel
|
||||
import io.netty.channel.socket.nio.NioSocketChannel
|
||||
@ -11,6 +15,7 @@ import io.netty.handler.proxy.HttpProxyHandler
|
||||
import io.netty.handler.proxy.Socks4ProxyHandler
|
||||
import io.netty.handler.proxy.Socks5ProxyHandler
|
||||
import io.netty.resolver.NoopAddressResolverGroup
|
||||
import io.netty.util.concurrent.DefaultThreadFactory
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory
|
||||
import io.netty.util.internal.logging.Slf4JLoggerFactory
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
@ -58,7 +63,8 @@ data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostA
|
||||
class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
||||
val allowedRemoteLegalNames: Set<CordaX500Name>,
|
||||
private val configuration: AMQPConfiguration,
|
||||
private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable {
|
||||
private val sharedThreadPool: EventLoopGroup? = null,
|
||||
private val threadPoolName: String = "AMQPClient") : AutoCloseable {
|
||||
companion object {
|
||||
init {
|
||||
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
|
||||
@ -303,7 +309,7 @@ class AMQPClient(private val targets: List<NetworkHostAndPort>,
|
||||
return
|
||||
}
|
||||
log.info("Connect to: $currentTarget")
|
||||
workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS)
|
||||
workerGroup = sharedThreadPool ?: NioEventLoopGroup(NUM_CLIENT_THREADS, DefaultThreadFactory(threadPoolName, Thread.MAX_PRIORITY))
|
||||
started = true
|
||||
restart()
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import io.netty.channel.socket.SocketChannel
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel
|
||||
import io.netty.handler.logging.LogLevel
|
||||
import io.netty.handler.logging.LoggingHandler
|
||||
import io.netty.util.concurrent.DefaultThreadFactory
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory
|
||||
import io.netty.util.internal.logging.Slf4JLoggerFactory
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
@ -37,8 +38,8 @@ import kotlin.concurrent.withLock
|
||||
*/
|
||||
class AMQPServer(val hostName: String,
|
||||
val port: Int,
|
||||
private val configuration: AMQPConfiguration) : AutoCloseable {
|
||||
|
||||
private val configuration: AMQPConfiguration,
|
||||
private val threadPoolName: String = "AMQPServer") : AutoCloseable {
|
||||
companion object {
|
||||
init {
|
||||
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
|
||||
@ -131,8 +132,8 @@ class AMQPServer(val hostName: String,
|
||||
lock.withLock {
|
||||
stop()
|
||||
|
||||
bossGroup = NioEventLoopGroup(1)
|
||||
workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS)
|
||||
bossGroup = NioEventLoopGroup(1, DefaultThreadFactory("$threadPoolName-boss", Thread.MAX_PRIORITY))
|
||||
workerGroup = NioEventLoopGroup(NUM_SERVER_THREADS, DefaultThreadFactory("$threadPoolName-worker", Thread.MAX_PRIORITY))
|
||||
|
||||
val server = ServerBootstrap()
|
||||
// TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux
|
||||
|
@ -14,33 +14,38 @@ import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.toHex
|
||||
import net.corda.nodeapi.internal.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.internal.config.CertificateStore
|
||||
import net.corda.nodeapi.internal.crypto.toBc
|
||||
import net.corda.nodeapi.internal.crypto.toSimpleString
|
||||
import net.corda.nodeapi.internal.crypto.x509
|
||||
import org.bouncycastle.asn1.ASN1InputStream
|
||||
import org.bouncycastle.asn1.ASN1Primitive
|
||||
import org.bouncycastle.asn1.ASN1IA5String
|
||||
import org.bouncycastle.asn1.DEROctetString
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier
|
||||
import org.bouncycastle.asn1.x509.CRLDistPoint
|
||||
import org.bouncycastle.asn1.x509.DistributionPointName
|
||||
import org.bouncycastle.asn1.x509.Extension
|
||||
import org.bouncycastle.asn1.x509.GeneralName
|
||||
import org.bouncycastle.asn1.x509.GeneralNames
|
||||
import org.bouncycastle.asn1.x509.SubjectKeyIdentifier
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.net.Socket
|
||||
import java.net.URI
|
||||
import java.security.KeyStore
|
||||
import java.security.cert.*
|
||||
import java.util.*
|
||||
import java.security.cert.CertificateException
|
||||
import java.security.cert.PKIXBuilderParameters
|
||||
import java.security.cert.PKIXRevocationChecker
|
||||
import java.security.cert.X509CertSelector
|
||||
import java.security.cert.X509Certificate
|
||||
import java.util.concurrent.Executor
|
||||
import javax.net.ssl.*
|
||||
import javax.net.ssl.CertPathTrustManagerParameters
|
||||
import javax.net.ssl.KeyManagerFactory
|
||||
import javax.net.ssl.SNIHostName
|
||||
import javax.net.ssl.SSLContext
|
||||
import javax.net.ssl.SSLEngine
|
||||
import javax.net.ssl.TrustManagerFactory
|
||||
import javax.net.ssl.X509ExtendedTrustManager
|
||||
import javax.security.auth.x500.X500Principal
|
||||
import kotlin.collections.HashMap
|
||||
import kotlin.system.measureTimeMillis
|
||||
|
||||
private const val HOSTNAME_FORMAT = "%s.corda.net"
|
||||
@ -109,23 +114,7 @@ fun certPathToString(certPath: Array<out X509Certificate>?): String {
|
||||
if (certPath == null) {
|
||||
return "<empty certpath>"
|
||||
}
|
||||
val certs = certPath.map {
|
||||
val bcCert = it.toBc()
|
||||
val subject = bcCert.subject.toString()
|
||||
val issuer = bcCert.issuer.toString()
|
||||
val keyIdentifier = try {
|
||||
SubjectKeyIdentifier.getInstance(bcCert.getExtension(Extension.subjectKeyIdentifier).parsedValue).keyIdentifier.toHex()
|
||||
} catch (ex: Exception) {
|
||||
"null"
|
||||
}
|
||||
val authorityKeyIdentifier = try {
|
||||
AuthorityKeyIdentifier.getInstance(bcCert.getExtension(Extension.authorityKeyIdentifier).parsedValue).keyIdentifier.toHex()
|
||||
} catch (ex: Exception) {
|
||||
"null"
|
||||
}
|
||||
" $subject[$keyIdentifier] issued by $issuer[$authorityKeyIdentifier] [${it.distributionPointsToString()}]"
|
||||
}
|
||||
return certs.joinToString("\r\n")
|
||||
return certPath.joinToString(System.lineSeparator()) { " ${it.toSimpleString()}" }
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -3,7 +3,10 @@ package net.corda.nodeapi.internal.revocation
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import net.corda.core.internal.readFully
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
|
||||
import net.corda.nodeapi.internal.crypto.toSimpleString
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.distributionPoints
|
||||
import java.net.URI
|
||||
@ -15,8 +18,11 @@ import javax.security.auth.x500.X500Principal
|
||||
/**
|
||||
* [CrlSource] which downloads CRLs from the distribution points in the X509 certificate.
|
||||
*/
|
||||
@Suppress("TooGenericExceptionCaught")
|
||||
class CertDistPointCrlSource : CrlSource {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
|
||||
// The default SSL handshake timeout is 60s (DEFAULT_SSL_HANDSHAKE_TIMEOUT). Considering there are 3 CRLs endpoints to check in a
|
||||
// node handshake, we want to keep the total timeout within that.
|
||||
private const val DEFAULT_CONNECT_TIMEOUT = 9_000
|
||||
@ -33,7 +39,8 @@ class CertDistPointCrlSource : CrlSource {
|
||||
private val readTimeout = Integer.getInteger("net.corda.dpcrl.read.timeout", DEFAULT_READ_TIMEOUT)
|
||||
|
||||
private fun retrieveCRL(uri: URI): X509CRL {
|
||||
val bytes = run {
|
||||
val start = System.currentTimeMillis()
|
||||
val bytes = try {
|
||||
val conn = uri.toURL().openConnection()
|
||||
conn.connectTimeout = connectTimeout
|
||||
conn.readTimeout = readTimeout
|
||||
@ -41,12 +48,26 @@ class CertDistPointCrlSource : CrlSource {
|
||||
// in an InputStream, but the JDK implementation (sun.security.provider.X509Factory.engineGenerateCRL) converts any IOException
|
||||
// into CRLException and drops the cause chain.
|
||||
conn.getInputStream().readFully()
|
||||
} catch (e: Exception) {
|
||||
if (logger.isDebugEnabled) {
|
||||
logger.debug("Unable to download CRL from $uri (${System.currentTimeMillis() - start}ms)", e)
|
||||
}
|
||||
throw e
|
||||
}
|
||||
return X509CertificateFactory().generateCRL(bytes.inputStream())
|
||||
val duration = System.currentTimeMillis() - start
|
||||
val crl = try {
|
||||
X509CertificateFactory().generateCRL(bytes.inputStream())
|
||||
} catch (e: Exception) {
|
||||
if (logger.isDebugEnabled) {
|
||||
logger.debug("Invalid CRL from $uri (${duration}ms)", e)
|
||||
}
|
||||
throw e
|
||||
}
|
||||
logger.debug { "CRL from $uri (${duration}ms): ${crl.toSimpleString()}" }
|
||||
return crl
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("TooGenericExceptionCaught")
|
||||
override fun fetch(certificate: X509Certificate): Set<X509CRL> {
|
||||
val approvedCRLs = HashSet<X509CRL>()
|
||||
var exception: Exception? = null
|
||||
|
@ -1,6 +1,8 @@
|
||||
package net.corda.nodeapi.internal.revocation
|
||||
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.nodeapi.internal.crypto.toSimpleString
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource
|
||||
import org.bouncycastle.asn1.x509.Extension
|
||||
import java.security.cert.CRLReason
|
||||
@ -27,8 +29,8 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
|
||||
private val softFailExceptions = ArrayList<CertPathValidatorException>()
|
||||
|
||||
override fun check(cert: Certificate, unresolvedCritExts: MutableCollection<String>?) {
|
||||
val x509Certificate = cert as X509Certificate
|
||||
checkApprovedCRLs(x509Certificate, getCRLs(x509Certificate))
|
||||
cert as X509Certificate
|
||||
checkApprovedCRLs(cert, getCRLs(cert))
|
||||
}
|
||||
|
||||
@Suppress("TooGenericExceptionCaught")
|
||||
@ -40,30 +42,27 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
|
||||
addSoftFailException(e)
|
||||
return emptySet()
|
||||
} else {
|
||||
throw undeterminedRevocationException("Unable to retrieve CRLs", e)
|
||||
throw undeterminedRevocationException("Unable to retrieve CRLs for cert ${cert.serialNumber}", e)
|
||||
}
|
||||
}
|
||||
if (crls.isNotEmpty() || softFail) {
|
||||
return crls
|
||||
}
|
||||
// Note, the JDK tries to find a valid CRL from a different signing key before giving up (RevocationChecker.verifyWithSeparateSigningKey)
|
||||
throw undeterminedRevocationException("Could not find any valid CRLs", null)
|
||||
throw undeterminedRevocationException("Could not find any valid CRLs for cert ${cert.serialNumber}", null)
|
||||
}
|
||||
|
||||
/**
|
||||
* Borrowed from `RevocationChecker.checkApprovedCRLs()`
|
||||
*/
|
||||
@Suppress("NestedBlockDepth")
|
||||
@Throws(CertPathValidatorException::class)
|
||||
private fun checkApprovedCRLs(cert: X509Certificate, approvedCRLs: Set<X509CRL>) {
|
||||
// See if the cert is in the set of approved crls.
|
||||
logger.debug("ExternalSourceRevocationChecker.checkApprovedCRLs() cert SN: ${cert.serialNumber}")
|
||||
logger.debug { "Check cert ${cert.serialNumber} against CRLs ${approvedCRLs.map { it.toSimpleString() }}" }
|
||||
|
||||
for (crl in approvedCRLs) {
|
||||
val entry = crl.getRevokedCertificate(cert)
|
||||
if (entry != null) {
|
||||
logger.debug("ExternalSourceRevocationChecker.checkApprovedCRLs() CRL entry: $entry")
|
||||
|
||||
/*
|
||||
* Abort CRL validation and throw exception if there are any
|
||||
* unrecognized critical CRL entry extensions (see section
|
||||
@ -75,19 +74,15 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
|
||||
unresCritExts.remove(Extension.cRLDistributionPoints.id)
|
||||
unresCritExts.remove(Extension.certificateIssuer.id)
|
||||
if (unresCritExts.isNotEmpty()) {
|
||||
throw CertPathValidatorException(
|
||||
"Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts")
|
||||
throw CertPathValidatorException("Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts")
|
||||
}
|
||||
}
|
||||
|
||||
val reasonCode = entry.revocationReason ?: CRLReason.UNSPECIFIED
|
||||
val revocationDate = entry.revocationDate
|
||||
if (revocationDate.before(dateSource())) {
|
||||
val t = CertificateRevokedException(
|
||||
revocationDate, reasonCode,
|
||||
crl.issuerX500Principal, mutableMapOf())
|
||||
throw CertPathValidatorException(
|
||||
t.message, t, null, -1, BasicReason.REVOKED)
|
||||
val t = CertificateRevokedException(revocationDate, reasonCode, crl.issuerX500Principal, emptyMap())
|
||||
throw CertPathValidatorException(t.message, t, null, -1, BasicReason.REVOKED)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -105,15 +100,18 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
|
||||
return false
|
||||
}
|
||||
|
||||
override fun getSupportedExtensions(): MutableSet<String>? {
|
||||
override fun getSupportedExtensions(): Set<String>? {
|
||||
return null
|
||||
}
|
||||
|
||||
override fun init(forward: Boolean) {
|
||||
if (forward) {
|
||||
throw CertPathValidatorException("Forward checking not allowed")
|
||||
}
|
||||
softFailExceptions.clear()
|
||||
}
|
||||
|
||||
override fun getSoftFailExceptions(): MutableList<CertPathValidatorException> {
|
||||
override fun getSoftFailExceptions(): List<CertPathValidatorException> {
|
||||
return Collections.unmodifiableList(softFailExceptions)
|
||||
}
|
||||
|
||||
@ -125,4 +123,4 @@ class CordaRevocationChecker(private val crlSource: CrlSource,
|
||||
private fun undeterminedRevocationException(message: String?, cause: Throwable?): CertPathValidatorException {
|
||||
return CertPathValidatorException(message, cause, null, -1, BasicReason.UNDETERMINED_REVOCATION_STATUS)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -205,7 +205,7 @@ class AMQPBridgeTest {
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
}
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE, null)
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE)
|
||||
artemisServer.start()
|
||||
artemisClient.start()
|
||||
|
@ -25,7 +25,6 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.CHARLIE_NAME
|
||||
import net.corda.testing.core.MAX_MESSAGE_SIZE
|
||||
import net.corda.testing.driver.internal.incrementalPortAllocation
|
||||
@ -50,6 +49,7 @@ import java.time.Duration
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
class CertificateRevocationListNodeTests {
|
||||
@Rule
|
||||
@JvmField
|
||||
@ -327,17 +327,18 @@ class CertificateRevocationListNodeTests {
|
||||
|
||||
private fun createAMQPClient(targetPort: Int,
|
||||
crlCheckSoftFail: Boolean,
|
||||
legalName: CordaX500Name,
|
||||
nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
|
||||
tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL",
|
||||
maxMessageSize: Int = MAX_MESSAGE_SIZE): X509Certificate {
|
||||
val baseDirectory = temporaryFolder.root.toPath() / "client"
|
||||
val baseDirectory = temporaryFolder.root.toPath() / legalName.organisation
|
||||
val certificatesDirectory = baseDirectory / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
val clientConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(baseDirectory).whenever(it).baseDirectory
|
||||
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
|
||||
doReturn(BOB_NAME).whenever(it).myLegalName
|
||||
doReturn(legalName).whenever(it).myLegalName
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(crlCheckSoftFail).whenever(it).crlCheckSoftFail
|
||||
@ -351,28 +352,32 @@ class CertificateRevocationListNodeTests {
|
||||
override val trustStore = clientConfig.p2pSslOptions.trustStore.get()
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
}
|
||||
amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", targetPort)), setOf(ALICE_NAME, CHARLIE_NAME), amqpConfig)
|
||||
amqpClient = AMQPClient(
|
||||
listOf(NetworkHostAndPort("localhost", targetPort)),
|
||||
setOf(CHARLIE_NAME),
|
||||
amqpConfig,
|
||||
threadPoolName = legalName.organisation
|
||||
)
|
||||
|
||||
return nodeCert
|
||||
}
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
private fun createAMQPServer(port: Int,
|
||||
name: CordaX500Name = ALICE_NAME,
|
||||
legalName: CordaX500Name,
|
||||
crlCheckSoftFail: Boolean,
|
||||
nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
|
||||
tlsCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$EMPTY_CRL",
|
||||
maxMessageSize: Int = MAX_MESSAGE_SIZE,
|
||||
sslHandshakeTimeout: Duration? = null): X509Certificate {
|
||||
check(!::amqpServer.isInitialized)
|
||||
val baseDirectory = temporaryFolder.root.toPath() / "server"
|
||||
val baseDirectory = temporaryFolder.root.toPath() / legalName.organisation
|
||||
val certificatesDirectory = baseDirectory / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
val serverConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(baseDirectory).whenever(it).baseDirectory
|
||||
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
|
||||
doReturn(name).whenever(it).myLegalName
|
||||
doReturn(legalName).whenever(it).myLegalName
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
}
|
||||
@ -386,7 +391,7 @@ class CertificateRevocationListNodeTests {
|
||||
override val maxMessageSize: Int = maxMessageSize
|
||||
override val sslHandshakeTimeout: Duration = sslHandshakeTimeout ?: super.sslHandshakeTimeout
|
||||
}
|
||||
amqpServer = AMQPServer("0.0.0.0", port, amqpConfig)
|
||||
amqpServer = AMQPServer("0.0.0.0", port, amqpConfig, threadPoolName = legalName.organisation)
|
||||
return nodeCert
|
||||
}
|
||||
|
||||
@ -422,7 +427,6 @@ class CertificateRevocationListNodeTests {
|
||||
return newNodeCert
|
||||
}
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
private fun verifyAMQPConnection(crlCheckSoftFail: Boolean,
|
||||
nodeCrlDistPoint: String? = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
|
||||
revokeServerCert: Boolean = false,
|
||||
@ -431,6 +435,7 @@ class CertificateRevocationListNodeTests {
|
||||
expectedConnectStatus: Boolean) {
|
||||
val serverCert = createAMQPServer(
|
||||
serverPort,
|
||||
CHARLIE_NAME,
|
||||
crlCheckSoftFail = crlCheckSoftFail,
|
||||
nodeCrlDistPoint = nodeCrlDistPoint,
|
||||
sslHandshakeTimeout = sslHandshakeTimeout
|
||||
@ -445,6 +450,7 @@ class CertificateRevocationListNodeTests {
|
||||
val clientCert = createAMQPClient(
|
||||
serverPort,
|
||||
crlCheckSoftFail = crlCheckSoftFail,
|
||||
legalName = ALICE_NAME,
|
||||
nodeCrlDistPoint = nodeCrlDistPoint
|
||||
)
|
||||
if (revokeClientCert) {
|
||||
@ -456,7 +462,8 @@ class CertificateRevocationListNodeTests {
|
||||
assertThat(serverConnect.connected).isEqualTo(expectedConnectStatus)
|
||||
}
|
||||
|
||||
private fun createArtemisServerAndClient(crlCheckSoftFail: Boolean,
|
||||
private fun createArtemisServerAndClient(legalName: CordaX500Name,
|
||||
crlCheckSoftFail: Boolean,
|
||||
crlCheckArtemisServer: Boolean,
|
||||
nodeCrlDistPoint: String,
|
||||
sslHandshakeTimeout: Duration?): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
||||
@ -467,7 +474,7 @@ class CertificateRevocationListNodeTests {
|
||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(baseDirectory).whenever(it).baseDirectory
|
||||
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
|
||||
doReturn(CHARLIE_NAME).whenever(it).myLegalName
|
||||
doReturn(legalName).whenever(it).myLegalName
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress
|
||||
@ -478,14 +485,25 @@ class CertificateRevocationListNodeTests {
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
recreateNodeCaAndTlsCertificates(signingCertificateStore, p2pSslConfiguration, nodeCrlDistPoint, null)
|
||||
|
||||
val server = ArtemisMessagingServer(artemisConfig, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE, null)
|
||||
val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE)
|
||||
val server = ArtemisMessagingServer(
|
||||
artemisConfig,
|
||||
artemisConfig.p2pAddress,
|
||||
MAX_MESSAGE_SIZE,
|
||||
threadPoolName = "${legalName.organisation}-server",
|
||||
trace = true
|
||||
)
|
||||
val client = ArtemisMessagingClient(
|
||||
artemisConfig.p2pSslOptions,
|
||||
artemisConfig.p2pAddress,
|
||||
MAX_MESSAGE_SIZE,
|
||||
threadPoolName = "${legalName.organisation}-client",
|
||||
trace = true
|
||||
)
|
||||
server.start()
|
||||
client.start()
|
||||
return server to client
|
||||
}
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
private fun verifyArtemisConnection(crlCheckSoftFail: Boolean,
|
||||
crlCheckArtemisServer: Boolean,
|
||||
expectedConnected: Boolean = true,
|
||||
@ -494,13 +512,19 @@ class CertificateRevocationListNodeTests {
|
||||
nodeCrlDistPoint: String = "http://${crlServer.hostAndPort}/crl/$NODE_CRL",
|
||||
sslHandshakeTimeout: Duration? = null) {
|
||||
val queueName = P2P_PREFIX + "Test"
|
||||
val (artemisServer, artemisClient) = createArtemisServerAndClient(crlCheckSoftFail, crlCheckArtemisServer, nodeCrlDistPoint, sslHandshakeTimeout)
|
||||
val (artemisServer, artemisClient) = createArtemisServerAndClient(
|
||||
CHARLIE_NAME,
|
||||
crlCheckSoftFail,
|
||||
crlCheckArtemisServer,
|
||||
nodeCrlDistPoint,
|
||||
sslHandshakeTimeout
|
||||
)
|
||||
artemisServer.use {
|
||||
artemisClient.started!!.session.createQueue(
|
||||
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setDurable(true)
|
||||
)
|
||||
|
||||
val nodeCert = createAMQPClient(serverPort, true, nodeCrlDistPoint)
|
||||
val nodeCert = createAMQPClient(serverPort, true, ALICE_NAME, nodeCrlDistPoint)
|
||||
if (revokedNodeCert) {
|
||||
crlServer.revokedNodeCerts.add(nodeCert.serialNumber)
|
||||
}
|
||||
|
@ -541,7 +541,7 @@ class ProtonWrapperTests {
|
||||
}
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
|
||||
val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize, null)
|
||||
val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize)
|
||||
val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", artemisPort), maxMessageSize)
|
||||
server.start()
|
||||
client.start()
|
||||
|
@ -243,7 +243,7 @@ class ArtemisMessagingTest {
|
||||
}
|
||||
|
||||
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
|
||||
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null, true).apply {
|
||||
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, trace = true).apply {
|
||||
config.configureWithDevSSLCertificate()
|
||||
messagingServer = this
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ class SimpleMQClient(val target: NetworkHostAndPort,
|
||||
lateinit var producer: ClientProducer
|
||||
|
||||
fun start(username: String? = null, password: String? = null, enableSSL: Boolean = true) {
|
||||
val tcpTransport = p2pConnectorTcpTransport(target, config, enableSSL = enableSSL)
|
||||
val tcpTransport = p2pConnectorTcpTransport(target, config, enableSSL = enableSSL, threadPoolName = "SimpleMQClient")
|
||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||
isBlockOnNonDurableSend = true
|
||||
threadPoolMaxSize = 1
|
||||
|
@ -56,7 +56,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
|
||||
class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
private val messagingServerAddress: NetworkHostAndPort,
|
||||
private val maxMessageSize: Int,
|
||||
private val journalBufferTimeout : Int?,
|
||||
private val journalBufferTimeout : Int? = null,
|
||||
private val threadPoolName: String = "ArtemisServer",
|
||||
private val trace: Boolean = false) : ArtemisBroker, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
@ -131,9 +132,10 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
// The transaction cache is configurable, and drives other cache sizes.
|
||||
globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize)
|
||||
|
||||
acceptorConfigurations.add(p2pAcceptorTcpTransport(
|
||||
addAcceptorConfiguration(p2pAcceptorTcpTransport(
|
||||
NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port),
|
||||
config.p2pSslOptions,
|
||||
threadPoolName = threadPoolName,
|
||||
trace = trace
|
||||
))
|
||||
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
|
||||
@ -180,7 +182,6 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue)
|
||||
}
|
||||
|
||||
@Throws(IOException::class, KeyStoreException::class)
|
||||
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
|
||||
val keyStore = config.p2pSslOptions.keyStore.get().value.internal
|
||||
val trustStore = config.p2pSslOptions.trustStore.get().value.internal
|
||||
|
@ -1,11 +1,14 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import io.netty.buffer.ByteBufAllocator
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.group.ChannelGroup
|
||||
import io.netty.handler.logging.LogLevel
|
||||
import io.netty.handler.logging.LoggingHandler
|
||||
import io.netty.handler.ssl.SslHandler
|
||||
import io.netty.handler.ssl.SslHandshakeTimeoutException
|
||||
import net.corda.core.internal.declaredField
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.ArtemisTcpTransport
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor
|
||||
@ -16,10 +19,14 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor
|
||||
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory
|
||||
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener
|
||||
import org.apache.activemq.artemis.utils.ConfigurationHelper
|
||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutor
|
||||
import java.nio.channels.ClosedChannelException
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
import java.util.regex.Pattern
|
||||
import javax.net.ssl.SSLEngine
|
||||
|
||||
@Suppress("unused") // Used via reflection in ArtemisTcpTransport
|
||||
class NodeNettyAcceptorFactory : AcceptorFactory {
|
||||
@ -35,6 +42,7 @@ class NodeNettyAcceptorFactory : AcceptorFactory {
|
||||
return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
|
||||
}
|
||||
|
||||
|
||||
private class NodeNettyAcceptor(name: String?,
|
||||
clusterConnection: ClusterConnection?,
|
||||
configuration: Map<String, Any>,
|
||||
@ -45,24 +53,76 @@ class NodeNettyAcceptorFactory : AcceptorFactory {
|
||||
protocolMap: MutableMap<String, ProtocolManager<BaseInterceptor<*>, RedirectHandler<*>>>?) :
|
||||
NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
|
||||
{
|
||||
companion object {
|
||||
private val defaultThreadPoolNamePattern = Pattern.compile("""Thread-(\d+) \(activemq-netty-threads\)""")
|
||||
}
|
||||
|
||||
private val threadPoolName = ConfigurationHelper.getStringProperty(ArtemisTcpTransport.THREAD_POOL_NAME_NAME, "NodeNettyAcceptor", configuration)
|
||||
private val trace = ConfigurationHelper.getBooleanProperty(ArtemisTcpTransport.TRACE_NAME, false, configuration)
|
||||
|
||||
@Synchronized
|
||||
override fun start() {
|
||||
super.start()
|
||||
if (configuration[ArtemisTcpTransport.TRACE_NAME] == true) {
|
||||
// Artemis does not seem to allow access to the underlying channel so we resort to reflection and get it via the
|
||||
// serverChannelGroup field. This field is only available after start(), hence why we add the logger here.
|
||||
if (trace) {
|
||||
// Unfortunately we have to resort to reflection to be able to get access to the server channel(s)
|
||||
declaredField<ChannelGroup>("serverChannelGroup").value.forEach { channel ->
|
||||
channel.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun getSslHandler(alloc: ByteBufAllocator?, peerHost: String?, peerPort: Int): SslHandler {
|
||||
val sslHandler = super.getSslHandler(alloc, peerHost, peerPort)
|
||||
applyThreadPoolName()
|
||||
val engine = super.getSslHandler(alloc, peerHost, peerPort).engine()
|
||||
val sslHandler = NodeAcceptorSslHandler(engine, trace)
|
||||
val handshakeTimeout = configuration[ArtemisTcpTransport.SSL_HANDSHAKE_TIMEOUT_NAME] as Duration?
|
||||
if (handshakeTimeout != null) {
|
||||
sslHandler.handshakeTimeoutMillis = handshakeTimeout.toMillis()
|
||||
}
|
||||
return sslHandler
|
||||
}
|
||||
|
||||
/**
|
||||
* [NettyAcceptor.start] has hardcoded the thread pool name and does not provide a way to configure it. This is a workaround.
|
||||
*/
|
||||
private fun applyThreadPoolName() {
|
||||
val matcher = defaultThreadPoolNamePattern.matcher(Thread.currentThread().name)
|
||||
if (matcher.matches()) {
|
||||
Thread.currentThread().name = "$threadPoolName-${matcher.group(1)}" // Preserve the pool thread number
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private class NodeAcceptorSslHandler(engine: SSLEngine, private val trace: Boolean) : SslHandler(engine) {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
override fun handlerAdded(ctx: ChannelHandlerContext) {
|
||||
logHandshake()
|
||||
super.handlerAdded(ctx)
|
||||
// Unfortunately NettyAcceptor does not let us add extra child handlers, so we have to add our logger this way.
|
||||
if (trace) {
|
||||
ctx.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO))
|
||||
}
|
||||
}
|
||||
|
||||
private fun logHandshake() {
|
||||
val start = System.currentTimeMillis()
|
||||
handshakeFuture().addListener {
|
||||
val duration = System.currentTimeMillis() - start
|
||||
when {
|
||||
it.isSuccess -> logger.info("SSL handshake completed in ${duration}ms with ${engine().session.peerPrincipal}")
|
||||
it.isCancelled -> logger.warn("SSL handshake cancelled after ${duration}ms")
|
||||
else -> when (it.cause()) {
|
||||
is ClosedChannelException -> logger.warn("SSL handshake closed early after ${duration}ms")
|
||||
is SslHandshakeTimeoutException -> logger.warn("SSL handshake timed out after ${duration}ms")
|
||||
else -> logger.warn("SSL handshake failed after ${duration}ms", it.cause())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -117,6 +117,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
cacheFactory: NamedCacheFactory,
|
||||
private val isDrainingModeOn: () -> Boolean,
|
||||
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>,
|
||||
private val threadPoolName: String = "P2PClient",
|
||||
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log),
|
||||
private val terminateOnConnectionError: Boolean = true,
|
||||
private val timeoutConfig: TimeoutConfig = TimeoutConfig.default()
|
||||
@ -205,10 +206,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
started = true
|
||||
log.info("Connecting to message broker: $serverAddress")
|
||||
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
|
||||
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions)
|
||||
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions, threadPoolName = threadPoolName)
|
||||
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||
// would be the default and the two lines below can be deleted.
|
||||
callTimeout = timeoutConfig.callTimeout.toMillis()
|
||||
connectionTTL = timeoutConfig.serverConnectionTtl.toMillis()
|
||||
clientFailureCheckPeriod = timeoutConfig.clientConnectionTtl.toMillis()
|
||||
|
@ -3,13 +3,14 @@ package net.corda.coretesting.internal
|
||||
import io.netty.bootstrap.Bootstrap
|
||||
import io.netty.channel.ChannelFuture
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter
|
||||
import io.netty.handler.ssl.SslContext
|
||||
import io.netty.channel.ChannelInitializer
|
||||
import io.netty.channel.ChannelOption
|
||||
import io.netty.channel.nio.NioEventLoopGroup
|
||||
import io.netty.channel.socket.SocketChannel
|
||||
import io.netty.channel.socket.nio.NioSocketChannel
|
||||
import io.netty.handler.ssl.SslContext
|
||||
import io.netty.handler.ssl.SslHandler
|
||||
import io.netty.util.concurrent.DefaultThreadFactory
|
||||
import java.io.Closeable
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.TimeoutException
|
||||
@ -17,7 +18,6 @@ import java.util.concurrent.locks.ReentrantLock
|
||||
import javax.net.ssl.SSLEngine
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
|
||||
class NettyTestClient(
|
||||
val sslContext: SslContext?,
|
||||
val targetHost: String,
|
||||
@ -49,7 +49,7 @@ class NettyTestClient(
|
||||
|
||||
private fun run() {
|
||||
// Configure the client.
|
||||
val group = NioEventLoopGroup()
|
||||
val group = NioEventLoopGroup(DefaultThreadFactory("NettyTestClient"))
|
||||
try {
|
||||
val b = Bootstrap()
|
||||
b.group(group)
|
||||
|
@ -11,6 +11,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel
|
||||
import io.netty.handler.logging.LogLevel
|
||||
import io.netty.handler.logging.LoggingHandler
|
||||
import io.netty.handler.ssl.SslContext
|
||||
import io.netty.util.concurrent.DefaultThreadFactory
|
||||
import java.io.Closeable
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.TimeoutException
|
||||
@ -45,8 +46,8 @@ class NettyTestServer(
|
||||
|
||||
fun run() {
|
||||
// Configure the server.
|
||||
val bossGroup = NioEventLoopGroup(1)
|
||||
val workerGroup = NioEventLoopGroup()
|
||||
val bossGroup = NioEventLoopGroup(1, DefaultThreadFactory("NettyTestServer-boss"))
|
||||
val workerGroup = NioEventLoopGroup(DefaultThreadFactory("NettyTestServer-worker"))
|
||||
try {
|
||||
val b = ServerBootstrap()
|
||||
b.group(bossGroup, workerGroup)
|
||||
|
Loading…
Reference in New Issue
Block a user