ENT-9569: Apply the 60s SSL handshake timeout to the embedded Artemis server (#7315)

This commit is contained in:
Shams Asari 2023-03-22 13:22:12 +00:00 committed by GitHub
parent ae953885d6
commit 0213861d22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 845 additions and 749 deletions

View File

@ -47,6 +47,7 @@ pipeline {
GRADLE_USER_HOME = "/host_tmp/gradle"
}
steps {
authenticateGradleWrapper()
sh 'mkdir -p ${GRADLE_USER_HOME}'
snykDeltaScan(env.SNYK_API_TOKEN, env.C4_OS_SNYK_ORG_ID)
}

View File

@ -17,7 +17,6 @@ import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcConnectorTcpTransportsFromList
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcInternalClientTcpTransport
import net.corda.nodeapi.internal.RoundRobinConnectionPolicy
import net.corda.nodeapi.internal.config.SslConfiguration
@ -61,8 +60,12 @@ class RPCClient<I : RPCOps>(
sslConfiguration: ClientRpcSslOptions? = null,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
) : this(rpcConnectorTcpTransport(haAddressPool.first(), sslConfiguration),
configuration, serializationContext, rpcConnectorTcpTransportsFromList(haAddressPool, sslConfiguration))
) : this(
rpcConnectorTcpTransport(haAddressPool.first(), sslConfiguration),
configuration,
serializationContext,
haAddressPool.map { rpcConnectorTcpTransport(it, sslConfiguration) }
)
companion object {
private val log = contextLogger()

Binary file not shown.

View File

@ -5,7 +5,6 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransportFromList
import net.corda.nodeapi.internal.config.MessagingServerConnectionConfiguration
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import org.apache.activemq.artemis.api.core.client.*
@ -41,7 +40,7 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
override fun start(): Started = synchronized(this) {
check(started == null) { "start can't be called twice" }
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config)
val backupTransports = p2pConnectorTcpTransportFromList(backupServerAddressPool, config)
val backupTransports = backupServerAddressPool.map { p2pConnectorTcpTransport(it, config) }
log.info("Connecting to message broker: $serverAddress")
if (backupTransports.isNotEmpty()) {

View File

@ -5,16 +5,14 @@ import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.BrokerRpcSslOptions
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.config.SslConfiguration
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import java.nio.file.Path
// This avoids internal types from leaking in the public API. The "external" ArtemisTcpTransport delegates to this internal one.
class ArtemisTcpTransport {
companion object {
val CIPHER_SUITES = listOf(
@ -24,6 +22,9 @@ class ArtemisTcpTransport {
val TLS_VERSIONS = listOf("TLSv1.2")
const val SSL_HANDSHAKE_TIMEOUT_NAME = "SSLHandshakeTimeout"
const val TRACE_NAME = "trace"
// Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop.
// It does not use AMQP messages for its own messages e.g. topology and heartbeats.
@ -46,17 +47,7 @@ class ArtemisTcpTransport {
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to TLS_VERSIONS.joinToString(","))
private fun SslConfiguration.toTransportOptions(): Map<String, Any> {
val options = mutableMapOf<String, Any>()
(keyStore to trustStore).addToTransportOptions(options)
return options
}
private fun Pair<FileBasedCertificateStoreSupplier?, FileBasedCertificateStoreSupplier?>.addToTransportOptions(options: MutableMap<String, Any>) {
val keyStore = first
val trustStore = second
private fun SslConfiguration.addToTransportOptions(options: MutableMap<String, Any>) {
keyStore?.let {
with (it) {
path.requireOnDefaultFileSystem()
@ -69,6 +60,8 @@ class ArtemisTcpTransport {
options.putAll(get().toTrustStoreTransportOptions(path))
}
}
options[TransportConstants.SSL_PROVIDER] = if (useOpenSsl) TransportConstants.OPENSSL_PROVIDER else TransportConstants.DEFAULT_SSL_PROVIDER
options[SSL_HANDSHAKE_TIMEOUT_NAME] = handshakeTimeout ?: DEFAULT_SSL_HANDSHAKE_TIMEOUT
}
private fun CertificateStore.toKeyStoreTransportOptions(path: Path) = mapOf(
@ -98,86 +91,95 @@ class ArtemisTcpTransport {
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to keyStorePassword,
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to false)
private val acceptorFactoryClassName = NettyAcceptorFactory::class.java.name
private val connectorFactoryClassName = NettyConnectorFactory::class.java.name
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true): TransportConfiguration {
return p2pAcceptorTcpTransport(hostAndPort, config?.keyStore, config?.trustStore, enableSSL = enableSSL, useOpenSsl = config?.useOpenSsl ?: false)
}
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreProvider: String? = null): TransportConfiguration {
return p2pConnectorTcpTransport(hostAndPort, config?.keyStore, config?.trustStore, enableSSL = enableSSL, useOpenSsl = config?.useOpenSsl ?: false, keyStoreProvider = keyStoreProvider)
}
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort, P2P_PROTOCOLS).toMutableMap()
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
config: MutualSslConfiguration?,
enableSSL: Boolean = true,
trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>()
if (enableSSL) {
options.putAll(defaultSSLOptions)
(keyStore to trustStore).addToTransportOptions(options)
options[TransportConstants.SSL_PROVIDER] = if (useOpenSsl) TransportConstants.OPENSSL_PROVIDER else TransportConstants.DEFAULT_SSL_PROVIDER
config?.addToTransportOptions(options)
}
options[TransportConstants.HANDSHAKE_TIMEOUT] = 0 // Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections
return TransportConfiguration(acceptorFactoryClassName, options)
return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, trace)
}
@Suppress("LongParameterList")
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false, keyStoreProvider: String? = null): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort, P2P_PROTOCOLS).toMutableMap()
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
config: MutualSslConfiguration?,
enableSSL: Boolean = true,
keyStoreProvider: String? = null): TransportConfiguration {
val options = mutableMapOf<String, Any>()
if (enableSSL) {
options.putAll(defaultSSLOptions)
(keyStore to trustStore).addToTransportOptions(options)
options[TransportConstants.SSL_PROVIDER] = if (useOpenSsl) TransportConstants.OPENSSL_PROVIDER else TransportConstants.DEFAULT_SSL_PROVIDER
keyStoreProvider?.let { options.put(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME, keyStoreProvider) }
config?.addToTransportOptions(options)
options += asMap(keyStoreProvider)
}
return TransportConfiguration(connectorFactoryClassName, options)
return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL)
}
fun p2pConnectorTcpTransportFromList(hostAndPortList: List<NetworkHostAndPort>, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreProvider: String? = null): List<TransportConfiguration> = hostAndPortList.map {
p2pConnectorTcpTransport(it, config, enableSSL, keyStoreProvider)
}
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: BrokerRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS).toMutableMap()
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
config: BrokerRpcSslOptions?,
enableSSL: Boolean = true,
trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>()
if (config != null && enableSSL) {
config.keyStorePath.requireOnDefaultFileSystem()
options.putAll(config.toTransportOptions())
options.putAll(defaultSSLOptions)
}
options[TransportConstants.HANDSHAKE_TIMEOUT] = 0 // Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections
return TransportConfiguration(acceptorFactoryClassName, options)
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, trace)
}
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: ClientRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS).toMutableMap()
val options = mutableMapOf<String, Any>()
if (config != null && enableSSL) {
config.trustStorePath.requireOnDefaultFileSystem()
options.putAll(config.toTransportOptions())
options.putAll(defaultSSLOptions)
}
return TransportConfiguration(connectorFactoryClassName, options)
}
fun rpcConnectorTcpTransportsFromList(hostAndPortList: List<NetworkHostAndPort>, config: ClientRpcSslOptions?, enableSSL: Boolean = true): List<TransportConfiguration> = hostAndPortList.map {
rpcConnectorTcpTransport(it, config, enableSSL)
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL)
}
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration {
return TransportConfiguration(connectorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions + config.toTransportOptions() + asMap(keyStoreProvider))
val options = mutableMapOf<String, Any>()
config.addToTransportOptions(options)
options += asMap(keyStoreProvider)
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true)
}
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration {
return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions +
config.toTransportOptions() + (TransportConstants.HANDSHAKE_TIMEOUT to 0) + asMap(keyStoreProvider))
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
config: SslConfiguration,
keyStoreProvider: String? = null,
trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>()
config.addToTransportOptions(options)
options += asMap(keyStoreProvider)
return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true, trace = trace)
}
private fun asMap(keyStoreProvider: String?): Map<String, String> {
return keyStoreProvider?.let {mutableMapOf(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to it)} ?: emptyMap()
}
private fun createAcceptorTransport(hostAndPort: NetworkHostAndPort,
protocols: String,
options: MutableMap<String, Any>,
enableSSL: Boolean,
trace: Boolean): TransportConfiguration {
options += defaultArtemisOptions(hostAndPort, protocols)
if (enableSSL) {
options += defaultSSLOptions
}
// Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections
options[TransportConstants.HANDSHAKE_TIMEOUT] = 0
options[TRACE_NAME] = trace
return TransportConfiguration("net.corda.node.services.messaging.NodeNettyAcceptorFactory", options)
}
private fun createConnectorTransport(hostAndPort: NetworkHostAndPort,
protocols: String,
options: MutableMap<String, Any>,
enableSSL: Boolean): TransportConfiguration {
options += defaultArtemisOptions(hostAndPort, protocols)
if (enableSSL) {
options += defaultSSLOptions
}
return TransportConfiguration(NettyConnectorFactory::class.java.name, options)
}
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.slf4j.MDC
import rx.Subscription
import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
@ -54,7 +55,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
private val bridgeMetricsService: BridgeMetricsService? = null,
trace: Boolean,
sslHandshakeTimeout: Long?,
sslHandshakeTimeout: Duration?,
private val bridgeConnectionTTLSeconds: Int) : BridgeManager {
private val lock = ReentrantLock()
@ -69,8 +70,8 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
override val enableSNI: Boolean,
override val sourceX500Name: String? = null,
override val trace: Boolean,
private val _sslHandshakeTimeout: Long?) : AMQPConfiguration {
override val sslHandshakeTimeout: Long
private val _sslHandshakeTimeout: Duration?) : AMQPConfiguration {
override val sslHandshakeTimeout: Duration
get() = _sslHandshakeTimeout ?: super.sslHandshakeTimeout
}

View File

@ -5,16 +5,13 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.crypto.x509
import net.corda.nodeapi.internal.protonwrapper.netty.ProxyConfig
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig
@ -27,6 +24,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession
import rx.Observable
import rx.subjects.PublishSubject
import java.time.Duration
import java.util.*
class BridgeControlListener(private val keyStore: CertificateStore,
@ -39,7 +37,7 @@ class BridgeControlListener(private val keyStore: CertificateStore,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
bridgeMetricsService: BridgeMetricsService? = null,
trace: Boolean = false,
sslHandshakeTimeout: Long? = null,
sslHandshakeTimeout: Duration? = null,
bridgeConnectionTTLSeconds: Int = 0) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString()
private var bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
@ -57,13 +55,6 @@ class BridgeControlListener(private val keyStore: CertificateStore,
private var controlConsumer: ClientConsumer? = null
private var notifyConsumer: ClientConsumer? = null
constructor(config: MutualSslConfiguration,
p2pAddress: NetworkHostAndPort,
maxMessageSize: Int,
revocationConfig: RevocationConfig,
enableSNI: Boolean,
proxy: ProxyConfig? = null) : this(config.keyStore.get(), config.trustStore.get(), config.useOpenSsl, proxy, maxMessageSize, revocationConfig, enableSNI, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
companion object {
private val log = contextLogger()
}

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.slf4j.MDC
import java.time.Duration
/**
* The LoopbackBridgeManager holds the list of independent LoopbackBridge objects that actively loopback messages to local Artemis
@ -40,7 +41,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore,
private val bridgeMetricsService: BridgeMetricsService? = null,
private val isLocalInbox: (String) -> Boolean,
trace: Boolean,
sslHandshakeTimeout: Long? = null,
sslHandshakeTimeout: Duration? = null,
bridgeConnectionTTLSeconds: Int = 0) : AMQPBridgeManager(keyStore, trustStore, useOpenSSL, proxyConfig,
maxMessageSize, revocationConfig, enableSNI,
artemisMessageClientFactory, bridgeMetricsService,

View File

@ -1,16 +1,20 @@
package net.corda.nodeapi.internal.config
import net.corda.core.utilities.seconds
import java.time.Duration
interface SslConfiguration {
val keyStore: FileBasedCertificateStoreSupplier?
val trustStore: FileBasedCertificateStoreSupplier?
val useOpenSsl: Boolean
val handshakeTimeout: Duration?
companion object {
fun mutual(keyStore: FileBasedCertificateStoreSupplier, trustStore: FileBasedCertificateStoreSupplier): MutualSslConfiguration {
return MutualSslOptions(keyStore, trustStore)
fun mutual(keyStore: FileBasedCertificateStoreSupplier,
trustStore: FileBasedCertificateStoreSupplier,
handshakeTimeout: Duration? = null): MutualSslConfiguration {
return MutualSslOptions(keyStore, trustStore, handshakeTimeout)
}
}
}
@ -21,9 +25,10 @@ interface MutualSslConfiguration : SslConfiguration {
}
private class MutualSslOptions(override val keyStore: FileBasedCertificateStoreSupplier,
override val trustStore: FileBasedCertificateStoreSupplier) : MutualSslConfiguration {
override val trustStore: FileBasedCertificateStoreSupplier,
override val handshakeTimeout: Duration?) : MutualSslConfiguration {
override val useOpenSsl: Boolean = false
}
const val DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS = 60000L // Set at least 3 times higher than sun.security.provider.certpath.URICertStore.DEFAULT_CRL_CONNECT_TIMEOUT which is 15 sec
@Suppress("MagicNumber")
val DEFAULT_SSL_HANDSHAKE_TIMEOUT: Duration = 60.seconds // Set at least 3 times higher than sun.security.provider.certpath.URICertStore.DEFAULT_CRL_CONNECT_TIMEOUT which is 15 sec

View File

@ -28,6 +28,7 @@ import org.apache.qpid.proton.framing.TransportFrame
import org.slf4j.MDC
import java.net.InetSocketAddress
import java.nio.channels.ClosedChannelException
import java.security.cert.PKIXRevocationChecker
import java.security.cert.X509Certificate
import javax.net.ssl.ExtendedSSLSession
import javax.net.ssl.SNIHostName
@ -46,6 +47,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
private val password: String?,
private val trace: Boolean,
private val suppressLogs: Boolean,
private val revocationChecker: PKIXRevocationChecker,
private val onOpen: (SocketChannel, ConnectionChange) -> Unit,
private val onClose: (SocketChannel, ConnectionChange) -> Unit,
private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() {
@ -168,6 +170,13 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
} else {
handleFailedHandshake(ctx, evt)
}
if (log.isDebugEnabled) {
withMDC {
revocationChecker.softFailExceptions.forEachIndexed { index, e ->
log.debug("Revocation soft fail exception (${index + 1}/${revocationChecker.softFailExceptions.size})", e)
}
}
}
}
}
}
@ -186,7 +195,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
}
}
@Suppress("OverridingDeprecatedMember")
@Deprecated("Deprecated in Java")
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
logWarnWithMDC("Closing channel due to nonrecoverable exception ${cause.message}")
if (log.isTraceEnabled) {
@ -298,16 +307,15 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
cause is ClosedChannelException -> logWarnWithMDC("SSL Handshake closed early.")
cause is SslHandshakeTimeoutException -> logWarnWithMDC("SSL Handshake timed out")
// Sadly the exception thrown by Netty wrapper requires that we check the message.
cause is SSLException && (cause.message?.contains("close_notify") == true)
-> logWarnWithMDC("Received close_notify during handshake")
cause is SSLException && (cause.message?.contains("close_notify") == true) -> logWarnWithMDC("Received close_notify during handshake")
// io.netty.handler.ssl.SslHandler.setHandshakeFailureTransportFailure()
cause is SSLException && (cause.message?.contains("writing TLS control frames") == true) -> logWarnWithMDC(cause.message!!)
else -> badCert = true
}
logWarnWithMDC("Handshake failure: ${evt.cause().message}")
if (log.isTraceEnabled) {
withMDC { log.trace("Handshake failure", evt.cause()) }
withMDC { log.trace("Handshake failure", cause) }
} else {
logWarnWithMDC("Handshake failure: ${cause.message}")
}
ctx.close()
}

View File

@ -26,6 +26,7 @@ import rx.Observable
import rx.subjects.PublishSubject
import java.lang.Long.min
import java.net.InetSocketAddress
import java.security.cert.PKIXRevocationChecker
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.KeyManagerFactory
@ -53,7 +54,7 @@ data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostA
* otherwise it creates a self-contained Netty thraed pool and socket objects.
* Once connected it can accept application packets to send via the AMQP protocol.
*/
class AMQPClient(val targets: List<NetworkHostAndPort>,
class AMQPClient(private val targets: List<NetworkHostAndPort>,
val allowedRemoteLegalNames: Set<CordaX500Name>,
private val configuration: AMQPConfiguration,
private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable {
@ -109,24 +110,22 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
retryInterval = min(MAX_RETRY_INTERVAL, retryInterval * BACKOFF_MULTIPLIER)
}
private val connectListener = object : ChannelFutureListener {
override fun operationComplete(future: ChannelFuture) {
amqpActive = false
if (!future.isSuccess) {
log.info("Failed to connect to $currentTarget", future.cause())
private val connectListener = ChannelFutureListener { future ->
amqpActive = false
if (!future.isSuccess) {
log.info("Failed to connect to $currentTarget", future.cause())
if (started) {
workerGroup?.schedule({
nextTarget()
restart()
}, retryInterval, TimeUnit.MILLISECONDS)
}
} else {
// Connection established successfully
clientChannel = future.channel()
clientChannel?.closeFuture()?.addListener(closeListener)
log.info("Connected to $currentTarget, Local address: $localAddressString")
if (started) {
workerGroup?.schedule({
nextTarget()
restart()
}, retryInterval, TimeUnit.MILLISECONDS)
}
} else {
// Connection established successfully
clientChannel = future.channel()
clientChannel?.closeFuture()?.addListener(closeListener)
log.info("Connected to $currentTarget, Local address: $localAddressString")
}
}
@ -146,13 +145,15 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
private class ClientChannelInitializer(val parent: AMQPClient) : ChannelInitializer<SocketChannel>() {
private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
private val revocationChecker: PKIXRevocationChecker
private val conf = parent.configuration
@Volatile
private lateinit var amqpChannelHandler: AMQPChannelHandler
init {
keyManagerFactory.init(conf.keyStore)
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, conf.revocationConfig))
revocationChecker = createPKIXRevocationChecker(conf.revocationConfig)
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, revocationChecker))
}
@Suppress("ComplexMethod")
@ -196,12 +197,13 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
val handler = if (parent.configuration.useOpenSsl) {
createClientOpenSslHandler(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory, ch.alloc())
} else {
createClientSslHelper(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory)
createClientSslHandler(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory)
}
handler.handshakeTimeoutMillis = conf.sslHandshakeTimeout
handler.handshakeTimeoutMillis = conf.sslHandshakeTimeout.toMillis()
pipeline.addLast("sslHandler", handler)
if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
amqpChannelHandler = AMQPChannelHandler(false,
amqpChannelHandler = AMQPChannelHandler(
false,
parent.allowedRemoteLegalNames,
// Single entry, key can be anything.
mapOf(DEFAULT to wrappedKeyManagerFactory),
@ -209,37 +211,42 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
conf.password,
conf.trace,
false,
onOpen = { _, change ->
parent.run {
amqpActive = true
retryInterval = MIN_RETRY_INTERVAL // reset to fast reconnect if we connect properly
_onConnection.onNext(change)
}
},
onClose = { _, change ->
if (parent.amqpChannelHandler == amqpChannelHandler) {
parent.run {
_onConnection.onNext(change)
if (change.badCert) {
log.error("Blocking future connection attempts to $target due to bad certificate on endpoint")
badCertTargets += target
}
if (started && amqpActive) {
log.debug { "Scheduling restart of $currentTarget (AMQP active)" }
workerGroup?.schedule({
nextTarget()
restart()
}, retryInterval, TimeUnit.MILLISECONDS)
}
amqpActive = false
}
}
},
onReceive = { rcv -> parent._onReceive.onNext(rcv) })
revocationChecker,
onOpen = { _, change -> onChannelOpen(change) },
onClose = { _, change -> onChannelClose(change, target) },
onReceive = parent._onReceive::onNext
)
parent.amqpChannelHandler = amqpChannelHandler
pipeline.addLast(amqpChannelHandler)
}
private fun onChannelOpen(change: ConnectionChange) {
parent.run {
amqpActive = true
retryInterval = MIN_RETRY_INTERVAL // reset to fast reconnect if we connect properly
_onConnection.onNext(change)
}
}
private fun onChannelClose(change: ConnectionChange, target: NetworkHostAndPort) {
if (parent.amqpChannelHandler != amqpChannelHandler) return
parent.run {
_onConnection.onNext(change)
if (change.badCert) {
log.error("Blocking future connection attempts to $target due to bad certificate on endpoint")
badCertTargets += target
}
if (started && amqpActive) {
log.debug { "Scheduling restart of $currentTarget (AMQP active)" }
workerGroup?.schedule({
nextTarget()
restart()
}, retryInterval, TimeUnit.MILLISECONDS)
}
amqpActive = false
}
}
}
fun start() {

View File

@ -2,7 +2,8 @@ package net.corda.nodeapi.internal.protonwrapper.netty
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS
import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT
import java.time.Duration
interface AMQPConfiguration {
/**
@ -67,8 +68,8 @@ interface AMQPConfiguration {
get() = false
@JvmDefault
val sslHandshakeTimeout: Long
get() = DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS // Aligned with sun.security.provider.certpath.URICertStore.DEFAULT_CRL_CONNECT_TIMEOUT
val sslHandshakeTimeout: Duration
get() = DEFAULT_SSL_HANDSHAKE_TIMEOUT // Aligned with sun.security.provider.certpath.URICertStore.DEFAULT_CRL_CONNECT_TIMEOUT
/**
* An optional Health Check Phrase which if passed through the channel will cause AMQP Server to echo it back instead of doing normal pipeline processing

View File

@ -25,6 +25,7 @@ import rx.Observable
import rx.subjects.PublishSubject
import java.net.BindException
import java.net.InetSocketAddress
import java.security.cert.PKIXRevocationChecker
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.KeyManagerFactory
@ -60,11 +61,13 @@ class AMQPServer(val hostName: String,
private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer<SocketChannel>() {
private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
private val revocationChecker: PKIXRevocationChecker
private val conf = parent.configuration
init {
keyManagerFactory.init(conf.keyStore.value.internal, conf.keyStore.entryPassword.toCharArray())
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, conf.revocationConfig))
revocationChecker = createPKIXRevocationChecker(conf.revocationConfig)
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, revocationChecker))
}
override fun initChannel(ch: SocketChannel) {
@ -75,7 +78,8 @@ class AMQPServer(val hostName: String,
pipeline.addLast("sslHandler", sslHandler)
if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
val suppressLogs = ch.remoteAddress()?.hostString in amqpConfiguration.silencedIPs
pipeline.addLast(AMQPChannelHandler(true,
pipeline.addLast(AMQPChannelHandler(
true,
null,
// Passing a mapping of legal names to key managers to be able to pick the correct one after
// SNI completion event is fired up.
@ -84,27 +88,33 @@ class AMQPServer(val hostName: String,
conf.password,
conf.trace,
suppressLogs,
onOpen = { channel, change ->
parent.run {
clientChannels[channel.remoteAddress()] = channel
_onConnection.onNext(change)
}
},
onClose = { channel, change ->
parent.run {
val remoteAddress = channel.remoteAddress()
clientChannels.remove(remoteAddress)
_onConnection.onNext(change)
}
},
onReceive = { rcv -> parent._onReceive.onNext(rcv) }))
revocationChecker,
onOpen = ::onChannelOpen,
onClose = ::onChannelClose,
onReceive = parent._onReceive::onNext
))
}
private fun onChannelOpen(channel: SocketChannel, change: ConnectionChange) {
parent.run {
clientChannels[channel.remoteAddress()] = channel
_onConnection.onNext(change)
}
}
private fun onChannelClose(channel: SocketChannel, change: ConnectionChange) {
parent.run {
val remoteAddress = channel.remoteAddress()
clientChannels.remove(remoteAddress)
_onConnection.onNext(change)
}
}
private fun createSSLHandler(amqpConfig: AMQPConfiguration, ch: SocketChannel): Pair<ChannelHandler, Map<String, CertHoldingKeyManagerFactoryWrapper>> {
return if (amqpConfig.useOpenSsl && amqpConfig.enableSNI && amqpConfig.keyStore.aliases().size > 1) {
val keyManagerFactoriesMap = splitKeystore(amqpConfig)
// SNI matching needed only when multiple nodes exist behind the server.
Pair(createServerSNIOpenSslHandler(keyManagerFactoriesMap, trustManagerFactory), keyManagerFactoriesMap)
Pair(createServerSNIOpenSniHandler(keyManagerFactoriesMap, trustManagerFactory), keyManagerFactoriesMap)
} else {
val keyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, amqpConfig)
val handler = if (amqpConfig.useOpenSsl) {
@ -113,7 +123,7 @@ class AMQPServer(val hostName: String,
// For javaSSL, SNI matching is handled at key manager level.
createServerSslHandler(amqpConfig.keyStore, keyManagerFactory, trustManagerFactory)
}
handler.handshakeTimeoutMillis = amqpConfig.sslHandshakeTimeout
handler.handshakeTimeoutMillis = amqpConfig.sslHandshakeTimeout.toMillis()
Pair(handler, mapOf(DEFAULT to keyManagerFactory))
}
}

View File

@ -11,7 +11,7 @@ object AllowAllRevocationChecker : PKIXRevocationChecker() {
private val logger = LoggerFactory.getLogger(AllowAllRevocationChecker::class.java)
override fun check(cert: Certificate?, unresolvedCritExts: MutableCollection<String>?) {
override fun check(cert: Certificate, unresolvedCritExts: Collection<String>) {
logger.debug {"Passing certificate check for: $cert"}
// Nothing to do
}
@ -20,7 +20,7 @@ object AllowAllRevocationChecker : PKIXRevocationChecker() {
return true
}
override fun getSupportedExtensions(): MutableSet<String>? {
override fun getSupportedExtensions(): Set<String>? {
return null
}
@ -28,7 +28,7 @@ object AllowAllRevocationChecker : PKIXRevocationChecker() {
// Nothing to do
}
override fun getSoftFailExceptions(): MutableList<CertPathValidatorException> {
return LinkedList()
override fun getSoftFailExceptions(): List<CertPathValidatorException> {
return Collections.emptyList()
}
}

View File

@ -46,7 +46,7 @@ internal const val DP_DEFAULT_ANSWER = "NO CRLDP ext"
internal val logger = LoggerFactory.getLogger("net.corda.nodeapi.internal.protonwrapper.netty.SSLHelper")
fun X509Certificate.distributionPoints() : Set<String>? {
fun X509Certificate.distributionPoints(): Set<String> {
logger.debug("Checking CRLDPs for $subjectX500Principal")
val crldpExtBytes = getExtensionValue(Extension.cRLDistributionPoints.id)
@ -74,13 +74,9 @@ fun X509Certificate.distributionPoints() : Set<String>? {
return generalNames.filter { it.tagNo == GeneralName.uniformResourceIdentifier}.map { DERIA5String.getInstance(it.name).string }.toSet()
}
fun X509Certificate.distributionPointsToString() : String {
fun X509Certificate.distributionPointsToString(): String {
return with(distributionPoints()) {
if(this == null || isEmpty()) {
DP_DEFAULT_ANSWER
} else {
sorted().joinToString()
}
if (isEmpty()) DP_DEFAULT_ANSWER else sorted().joinToString()
}
}
@ -117,7 +113,7 @@ class LoggingTrustManagerWrapper(val wrapped: X509ExtendedTrustManager) : X509Ex
if (chain == null) {
return "<empty certpath>"
}
return chain.map { it.toString() }.joinToString(", ")
return chain.joinToString(", ") { it.toString() }
}
private fun logErrors(chain: Array<out X509Certificate>?, block: () -> Unit) {
@ -171,14 +167,9 @@ class LoggingTrustManagerWrapper(val wrapped: X509ExtendedTrustManager) : X509Ex
private object LoggingImmediateExecutor : Executor {
override fun execute(command: Runnable?) {
override fun execute(command: Runnable) {
val log = LoggerFactory.getLogger(javaClass)
if (command == null) {
log.error("SSL handler executor called with a null command")
throw NullPointerException("command")
}
@Suppress("TooGenericExceptionCaught", "MagicNumber") // log and rethrow all exceptions
try {
val commandName = command::class.qualifiedName?.let { "[$it]" } ?: ""
@ -196,10 +187,10 @@ private object LoggingImmediateExecutor : Executor {
}
}
internal fun createClientSslHelper(target: NetworkHostAndPort,
expectedRemoteLegalNames: Set<CordaX500Name>,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
internal fun createClientSslHandler(target: NetworkHostAndPort,
expectedRemoteLegalNames: Set<CordaX500Name>,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = createAndInitSslContext(keyManagerFactory, trustManagerFactory)
val sslEngine = sslContext.createSSLEngine(target.host, target.port)
sslEngine.useClientMode = true
@ -211,7 +202,6 @@ internal fun createClientSslHelper(target: NetworkHostAndPort,
sslParameters.serverNames = listOf(SNIHostName(x500toHostName(expectedRemoteLegalNames.single())))
sslEngine.sslParameters = sslParameters
}
@Suppress("DEPRECATION")
return SslHandler(sslEngine, false, LoggingImmediateExecutor)
}
@ -229,7 +219,6 @@ internal fun createClientOpenSslHandler(target: NetworkHostAndPort,
sslParameters.serverNames = listOf(SNIHostName(x500toHostName(expectedRemoteLegalNames.single())))
sslEngine.sslParameters = sslParameters
}
@Suppress("DEPRECATION")
return SslHandler(sslEngine, false, LoggingImmediateExecutor)
}
@ -246,7 +235,15 @@ internal fun createServerSslHandler(keyStore: CertificateStore,
val sslParameters = sslEngine.sslParameters
sslParameters.sniMatchers = listOf(ServerSNIMatcher(keyStore))
sslEngine.sslParameters = sslParameters
@Suppress("DEPRECATION")
return SslHandler(sslEngine, false, LoggingImmediateExecutor)
}
internal fun createServerOpenSslHandler(keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory,
alloc: ByteBufAllocator): SslHandler {
val sslContext = getServerSslContextBuilder(keyManagerFactory, trustManagerFactory).build()
val sslEngine = sslContext.newEngine(alloc)
sslEngine.useClientMode = false
return SslHandler(sslEngine, false, LoggingImmediateExecutor)
}
@ -260,9 +257,20 @@ fun createAndInitSslContext(keyManagerFactory: KeyManagerFactory, trustManagerFa
}
@VisibleForTesting
fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateStore, revocationConfig: RevocationConfig): ManagerFactoryParameters {
fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateStore,
revocationConfig: RevocationConfig): CertPathTrustManagerParameters {
return initialiseTrustStoreAndEnableCrlChecking(trustStore, createPKIXRevocationChecker(revocationConfig))
}
fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateStore,
revocationChecker: PKIXRevocationChecker): CertPathTrustManagerParameters {
val pkixParams = PKIXBuilderParameters(trustStore.value.internal, X509CertSelector())
val revocationChecker = when (revocationConfig.mode) {
pkixParams.addCertPathChecker(revocationChecker)
return CertPathTrustManagerParameters(pkixParams)
}
fun createPKIXRevocationChecker(revocationConfig: RevocationConfig): PKIXRevocationChecker {
return when (revocationConfig.mode) {
RevocationConfig.Mode.OFF -> AllowAllRevocationChecker // Custom PKIXRevocationChecker skipping CRL check
RevocationConfig.Mode.EXTERNAL_SOURCE -> {
require(revocationConfig.externalCrlSource != null) { "externalCrlSource must not be null" }
@ -271,40 +279,28 @@ fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateStore, revoc
else -> {
val certPathBuilder = CertPathBuilder.getInstance("PKIX")
val pkixRevocationChecker = certPathBuilder.revocationChecker as PKIXRevocationChecker
pkixRevocationChecker.options = EnumSet.of(
val options = EnumSet.of(
// Prefer CRL over OCSP
PKIXRevocationChecker.Option.PREFER_CRLS,
// Don't fall back to OCSP checking
PKIXRevocationChecker.Option.NO_FALLBACK)
PKIXRevocationChecker.Option.NO_FALLBACK
)
if (revocationConfig.mode == RevocationConfig.Mode.SOFT_FAIL) {
// Allow revocation check to succeed if the revocation status cannot be determined for one of
// the following reasons: The CRL or OCSP response cannot be obtained because of a network error.
pkixRevocationChecker.options = pkixRevocationChecker.options + PKIXRevocationChecker.Option.SOFT_FAIL
options += PKIXRevocationChecker.Option.SOFT_FAIL
}
pkixRevocationChecker.options = options
pkixRevocationChecker
}
}
pkixParams.addCertPathChecker(revocationChecker)
return CertPathTrustManagerParameters(pkixParams)
}
internal fun createServerOpenSslHandler(keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory,
alloc: ByteBufAllocator): SslHandler {
val sslContext = getServerSslContextBuilder(keyManagerFactory, trustManagerFactory).build()
val sslEngine = sslContext.newEngine(alloc)
sslEngine.useClientMode = false
@Suppress("DEPRECATION")
return SslHandler(sslEngine, false, LoggingImmediateExecutor)
}
/**
* Creates a special SNI handler used only when openSSL is used for AMQPServer
*/
internal fun createServerSNIOpenSslHandler(keyManagerFactoriesMap: Map<String, KeyManagerFactory>,
internal fun createServerSNIOpenSniHandler(keyManagerFactoriesMap: Map<String, KeyManagerFactory>,
trustManagerFactory: TrustManagerFactory): SniHandler {
// Default value can be any in the map.
val sslCtxBuilder = getServerSslContextBuilder(keyManagerFactoriesMap.values.first(), trustManagerFactory)
val mapping = DomainWildcardMappingBuilder(sslCtxBuilder.build())
@ -327,7 +323,7 @@ private fun getServerSslContextBuilder(keyManagerFactory: KeyManagerFactory, tru
internal fun splitKeystore(config: AMQPConfiguration): Map<String, CertHoldingKeyManagerFactoryWrapper> {
val keyStore = config.keyStore.value.internal
val password = config.keyStore.entryPassword.toCharArray()
return keyStore.aliases().toList().map { alias ->
return keyStore.aliases().toList().associate { alias ->
val key = keyStore.getKey(alias, password)
val certs = keyStore.getCertificateChain(alias)
val x500Name = keyStore.getCertificate(alias).x509.subjectX500Principal
@ -338,7 +334,7 @@ internal fun splitKeystore(config: AMQPConfiguration): Map<String, CertHoldingKe
val newKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
newKeyManagerFactory.init(newKeyStore, password)
x500toHostName(cordaX500Name) to CertHoldingKeyManagerFactoryWrapper(newKeyManagerFactory, config)
}.toMap()
}
}
// As per Javadoc in: https://docs.oracle.com/javase/8/docs/api/javax/net/ssl/KeyManagerFactory.html `init` method

View File

@ -28,7 +28,7 @@ class SSLHelperTest {
val trustStore = sslConfig.trustStore
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(CertificateStore.fromFile(trustStore.path, trustStore.storePassword, trustStore.entryPassword, false), RevocationConfigImpl(RevocationConfig.Mode.HARD_FAIL)))
val sslHandler = createClientSslHelper(NetworkHostAndPort("localhost", 1234), setOf(legalName), keyManagerFactory, trustManagerFactory)
val sslHandler = createClientSslHandler(NetworkHostAndPort("localhost", 1234), setOf(legalName), keyManagerFactory, trustManagerFactory)
val legalNameHash = SecureHash.sha256(legalName.toString()).toString().take(32).toLowerCase()
// These hardcoded values must not be changed, something is broken if you have to change these hardcoded values.

View File

@ -264,6 +264,8 @@ tasks.register('integrationTest', Test) {
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath
maxParallelForks = (System.env.CORDA_NODE_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_NODE_INT_TESTING_FORKS".toInteger()
// CertificateRevocationListNodeTests
systemProperty 'com.sun.security.crl.timeout', '4'
}
tasks.register('slowIntegrationTest', Test) {

View File

@ -27,6 +27,7 @@ import org.junit.Test
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.time.Duration
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.TrustManagerFactory
import kotlin.test.assertFalse
@ -123,7 +124,7 @@ class AMQPClientSslErrorsTest(@Suppress("unused") private val iteration: Int) {
override val keyStore = keyStore
override val trustStore = clientConfig.p2pSslOptions.trustStore.get()
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
override val sslHandshakeTimeout: Long = 3000
override val sslHandshakeTimeout: Duration = 3.seconds
}
clientKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())

View File

@ -7,6 +7,8 @@ import net.corda.core.crypto.generateKeyPair
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.coretesting.internal.rigorousMock
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
@ -22,8 +24,6 @@ import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase
import net.corda.coretesting.internal.rigorousMock
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.internal.MOCK_VERSION_INFO
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
@ -57,7 +57,6 @@ class ArtemisMessagingTest {
@JvmField
val temporaryFolder = TemporaryFolder()
// THe
private val portAllocation = incrementalPortAllocation()
private val serverPort = portAllocation.nextPort()
private val identity = generateKeyPair()
@ -200,7 +199,9 @@ class ArtemisMessagingTest {
messagingClient!!.start(identity.public, null, maxMessageSize)
}
private fun createAndStartClientAndServer(platformVersion: Int = 1, serverMaxMessageSize: Int = MAX_MESSAGE_SIZE, clientMaxMessageSize: Int = MAX_MESSAGE_SIZE): Pair<P2PMessagingClient, BlockingQueue<ReceivedMessage>> {
private fun createAndStartClientAndServer(platformVersion: Int = 1,
serverMaxMessageSize: Int = MAX_MESSAGE_SIZE,
clientMaxMessageSize: Int = MAX_MESSAGE_SIZE): Pair<P2PMessagingClient, BlockingQueue<ReceivedMessage>> {
val receivedMessages = LinkedBlockingQueue<ReceivedMessage>()
createMessagingServer(maxMessageSize = serverMaxMessageSize).start()
@ -239,7 +240,7 @@ class ArtemisMessagingTest {
}
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null).apply {
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null, true).apply {
config.configureWithDevSSLCertificate()
messagingServer = this
}

View File

@ -55,7 +55,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
class ArtemisMessagingServer(private val config: NodeConfiguration,
private val messagingServerAddress: NetworkHostAndPort,
private val maxMessageSize: Int,
private val journalBufferTimeout : Int?) : ArtemisBroker, SingletonSerializeAsToken() {
private val journalBufferTimeout : Int?,
private val trace: Boolean = false) : ArtemisBroker, SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
}
@ -131,7 +132,11 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
// The transaction cache is configurable, and drives other cache sizes.
globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize)
acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config.p2pSslOptions))
acceptorConfigurations.add(p2pAcceptorTcpTransport(
NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port),
config.p2pSslOptions,
trace = trace
))
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess

View File

@ -0,0 +1,67 @@
package net.corda.node.services.messaging
import io.netty.buffer.ByteBufAllocator
import io.netty.channel.group.ChannelGroup
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.handler.ssl.SslHandler
import net.corda.core.internal.declaredField
import net.corda.nodeapi.internal.ArtemisTcpTransport
import org.apache.activemq.artemis.api.core.BaseInterceptor
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager
import org.apache.activemq.artemis.spi.core.remoting.Acceptor
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener
import org.apache.activemq.artemis.utils.actors.OrderedExecutor
import java.time.Duration
import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService
@Suppress("unused") // Used via reflection in ArtemisTcpTransport
class NodeNettyAcceptorFactory : AcceptorFactory {
override fun createAcceptor(name: String?,
clusterConnection: ClusterConnection?,
configuration: Map<String, Any>,
handler: BufferHandler?,
listener: ServerConnectionLifeCycleListener?,
threadPool: Executor,
scheduledThreadPool: ScheduledExecutorService?,
protocolMap: Map<String, ProtocolManager<BaseInterceptor<*>>>?): Acceptor {
val failureExecutor = OrderedExecutor(threadPool)
return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
}
private class NodeNettyAcceptor(name: String?,
clusterConnection: ClusterConnection?,
configuration: Map<String, Any>,
handler: BufferHandler?,
listener: ServerConnectionLifeCycleListener?,
scheduledThreadPool: ScheduledExecutorService?,
failureExecutor: Executor,
protocolMap: Map<String, ProtocolManager<BaseInterceptor<*>>>?) :
NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
{
override fun start() {
super.start()
if (configuration[ArtemisTcpTransport.TRACE_NAME] == true) {
// Artemis does not seem to allow access to the underlying channel so we resort to reflection and get it via the
// serverChannelGroup field. This field is only available after start(), hence why we add the logger here.
declaredField<ChannelGroup>("serverChannelGroup").value.forEach { channel ->
channel.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO))
}
}
}
override fun getSslHandler(alloc: ByteBufAllocator?): SslHandler {
val sslHandler = super.getSslHandler(alloc)
val handshakeTimeout = configuration[ArtemisTcpTransport.SSL_HANDSHAKE_TIMEOUT_NAME] as Duration?
if (handshakeTimeout != null) {
sslHandler.handshakeTimeoutMillis = handshakeTimeout.toMillis()
}
return sslHandler
}
}
}

View File

@ -8,6 +8,7 @@ import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.config.SslConfiguration
import java.nio.file.Path
import java.time.Duration
class CertificateStoreStubs {
@ -49,11 +50,11 @@ class CertificateStoreStubs {
keyStorePassword: String = KeyStore.DEFAULT_STORE_PASSWORD, keyPassword: String = keyStorePassword,
trustStoreFileName: String = TrustStore.DEFAULT_STORE_FILE_NAME,
trustStorePassword: String = TrustStore.DEFAULT_STORE_PASSWORD,
trustStoreKeyPassword: String = TrustStore.DEFAULT_KEY_PASSWORD): MutualSslConfiguration {
trustStoreKeyPassword: String = TrustStore.DEFAULT_KEY_PASSWORD,
sslHandshakeTimeout: Duration? = null): MutualSslConfiguration {
val keyStore = FileBasedCertificateStoreSupplier(certificatesDirectory / keyStoreFileName, keyStorePassword, keyPassword)
val trustStore = FileBasedCertificateStoreSupplier(certificatesDirectory / trustStoreFileName, trustStorePassword, trustStoreKeyPassword)
return SslConfiguration.mutual(keyStore, trustStore)
return SslConfiguration.mutual(keyStore, trustStore, sslHandshakeTimeout)
}
@JvmStatic

View File

@ -0,0 +1,192 @@
@file:Suppress("MagicNumber")
package net.corda.testing.node.internal.network
import net.corda.core.crypto.Crypto
import net.corda.core.internal.CertRole
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.days
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.coretesting.internal.DEV_INTERMEDIATE_CA
import net.corda.coretesting.internal.DEV_ROOT_CA
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.nodeapi.internal.crypto.ContentSignerBuilder
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.crypto.certificateType
import net.corda.nodeapi.internal.crypto.toJca
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x509.CRLDistPoint
import org.bouncycastle.asn1.x509.DistributionPoint
import org.bouncycastle.asn1.x509.DistributionPointName
import org.bouncycastle.asn1.x509.Extension
import org.bouncycastle.asn1.x509.GeneralName
import org.bouncycastle.asn1.x509.GeneralNames
import org.bouncycastle.asn1.x509.IssuingDistributionPoint
import org.bouncycastle.asn1.x509.ReasonFlags
import org.bouncycastle.cert.jcajce.JcaX509CRLConverter
import org.bouncycastle.cert.jcajce.JcaX509ExtensionUtils
import org.bouncycastle.cert.jcajce.JcaX509v2CRLBuilder
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.ServerConnector
import org.eclipse.jetty.server.handler.HandlerCollection
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.servlet.ServletContainer
import java.io.Closeable
import java.math.BigInteger
import java.net.InetSocketAddress
import java.security.KeyPair
import java.security.cert.X509CRL
import java.security.cert.X509Certificate
import java.util.*
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.Response
class CrlServer(hostAndPort: NetworkHostAndPort,
private val revokedNodeCerts: List<BigInteger>,
private val revokedIntermediateCerts: List<BigInteger>) : Closeable {
companion object {
private const val SIGNATURE_ALGORITHM = "SHA256withECDSA"
const val FORBIDDEN_CRL = "forbidden.crl"
fun X509Certificate.withCrlDistPoint(issuerKeyPair: KeyPair, crlDistPoint: String?, crlIssuer: X500Name? = null): X509Certificate {
val signatureScheme = Crypto.findSignatureScheme(issuerKeyPair.private)
val provider = Crypto.findProvider(signatureScheme.providerName)
val issuerSigner = ContentSignerBuilder.build(signatureScheme, issuerKeyPair.private, provider)
val builder = X509Utilities.createPartialCertificate(
CertRole.extract(this)!!.certificateType,
issuerX500Principal,
issuerKeyPair.public,
subjectX500Principal,
publicKey,
Pair(Date(System.currentTimeMillis() - 5.minutes.toMillis()), Date(System.currentTimeMillis() + 10.days.toMillis())),
null
)
if (crlDistPoint != null) {
val distPointName = DistributionPointName(GeneralNames(GeneralName(GeneralName.uniformResourceIdentifier, crlDistPoint)))
val crlIssuerGeneralNames = crlIssuer?.let { GeneralNames(GeneralName(it)) }
val distPoint = DistributionPoint(distPointName, null, crlIssuerGeneralNames)
builder.addExtension(Extension.cRLDistributionPoints, false, CRLDistPoint(arrayOf(distPoint)))
}
return builder.build(issuerSigner).toJca()
}
}
private val server: Server = Server(InetSocketAddress(hostAndPort.host, hostAndPort.port)).apply {
handler = HandlerCollection().apply {
addHandler(buildServletContextHandler())
}
}
val rootCa: CertificateAndKeyPair = DEV_ROOT_CA
private lateinit var _intermediateCa: CertificateAndKeyPair
val intermediateCa: CertificateAndKeyPair get() = _intermediateCa
val hostAndPort: NetworkHostAndPort
get() = server.connectors.mapNotNull { it as? ServerConnector }
.map { NetworkHostAndPort(it.host, it.localPort) }
.first()
fun start() {
server.start()
_intermediateCa = CertificateAndKeyPair(
DEV_INTERMEDIATE_CA.certificate.withCrlDistPoint(rootCa.keyPair, "http://$hostAndPort/crl/intermediate.crl"),
DEV_INTERMEDIATE_CA.keyPair
)
println("Network management web services started on $hostAndPort")
}
fun createRevocationList(signatureAlgorithm: String,
ca: CertificateAndKeyPair,
endpoint: String,
indirect: Boolean,
serialNumbers: List<BigInteger>): X509CRL {
println("Generating CRL for $endpoint")
val builder = JcaX509v2CRLBuilder(ca.certificate.subjectX500Principal, Date(System.currentTimeMillis() - 1.minutes.toMillis()))
val extensionUtils = JcaX509ExtensionUtils()
builder.addExtension(Extension.authorityKeyIdentifier, false, extensionUtils.createAuthorityKeyIdentifier(ca.certificate))
val issuingDistPointName = GeneralName(GeneralName.uniformResourceIdentifier, "http://$hostAndPort/crl/$endpoint")
// This is required and needs to match the certificate settings with respect to being indirect
val issuingDistPoint = IssuingDistributionPoint(DistributionPointName(GeneralNames(issuingDistPointName)), indirect, false)
builder.addExtension(Extension.issuingDistributionPoint, true, issuingDistPoint)
builder.setNextUpdate(Date(System.currentTimeMillis() + 1.seconds.toMillis()))
serialNumbers.forEach {
builder.addCRLEntry(it, Date(System.currentTimeMillis() - 10.minutes.toMillis()), ReasonFlags.certificateHold)
}
val signer = JcaContentSignerBuilder(signatureAlgorithm).setProvider(Crypto.findProvider("BC")).build(ca.keyPair.private)
return JcaX509CRLConverter().setProvider(Crypto.findProvider("BC")).getCRL(builder.build(signer))
}
override fun close() {
println("Shutting down network management web services...")
server.stop()
server.join()
}
private fun buildServletContextHandler(): ServletContextHandler {
return ServletContextHandler().apply {
contextPath = "/"
val resourceConfig = ResourceConfig().apply {
register(CrlServlet(this@CrlServer))
}
val jerseyServlet = ServletHolder(ServletContainer(resourceConfig)).apply { initOrder = 0 }
addServlet(jerseyServlet, "/*")
}
}
@Path("crl")
class CrlServlet(private val crlServer: CrlServer) {
@GET
@Path("node.crl")
@Produces("application/pkcs7-crl")
fun getNodeCRL(): Response {
return Response.ok(crlServer.createRevocationList(
SIGNATURE_ALGORITHM,
crlServer.intermediateCa,
"node.crl",
false,
crlServer.revokedNodeCerts
).encoded).build()
}
@GET
@Path(FORBIDDEN_CRL)
@Produces("application/pkcs7-crl")
fun getNodeSlowCRL(): Response {
return Response.status(Response.Status.FORBIDDEN).build()
}
@GET
@Path("intermediate.crl")
@Produces("application/pkcs7-crl")
fun getIntermediateCRL(): Response {
return Response.ok(crlServer.createRevocationList(
SIGNATURE_ALGORITHM,
crlServer.rootCa,
"intermediate.crl",
false,
crlServer.revokedIntermediateCerts
).encoded).build()
}
@GET
@Path("empty.crl")
@Produces("application/pkcs7-crl")
fun getEmptyCRL(): Response {
return Response.ok(crlServer.createRevocationList(
SIGNATURE_ALGORITHM,
crlServer.rootCa,
"empty.crl",
true, emptyList()
).encoded).build()
}
}
}