Merge pull request #7328 from corda/shams-4.10-fwrd-merge-ceb76c56

ENT-8898 & ENT-9569: Forward merge from 4.9 to 4.10 (ceb76c56)
This commit is contained in:
Rick Parker 2023-04-13 09:21:28 +01:00 committed by GitHub
commit fc53176175
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1128 additions and 870 deletions

View File

@ -47,6 +47,7 @@ pipeline {
GRADLE_USER_HOME = "/host_tmp/gradle" GRADLE_USER_HOME = "/host_tmp/gradle"
} }
steps { steps {
authenticateGradleWrapper()
sh 'mkdir -p ${GRADLE_USER_HOME}' sh 'mkdir -p ${GRADLE_USER_HOME}'
snykDeltaScan(env.SNYK_API_TOKEN, env.C4_OS_SNYK_ORG_ID) snykDeltaScan(env.SNYK_API_TOKEN, env.C4_OS_SNYK_ORG_ID)
} }

View File

@ -17,7 +17,6 @@ import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcConnectorTcpTransportsFromList
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcInternalClientTcpTransport import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcInternalClientTcpTransport
import net.corda.nodeapi.internal.RoundRobinConnectionPolicy import net.corda.nodeapi.internal.RoundRobinConnectionPolicy
import net.corda.nodeapi.internal.config.SslConfiguration import net.corda.nodeapi.internal.config.SslConfiguration
@ -62,8 +61,12 @@ class RPCClient<I : RPCOps>(
sslConfiguration: ClientRpcSslOptions? = null, sslConfiguration: ClientRpcSslOptions? = null,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
) : this(rpcConnectorTcpTransport(haAddressPool.first(), sslConfiguration), ) : this(
configuration, serializationContext, rpcConnectorTcpTransportsFromList(haAddressPool, sslConfiguration)) rpcConnectorTcpTransport(haAddressPool.first(), sslConfiguration),
configuration,
serializationContext,
haAddressPool.map { rpcConnectorTcpTransport(it, sslConfiguration) }
)
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()

View File

@ -31,7 +31,7 @@ dependencies {
// SQL connection pooling library // SQL connection pooling library
compile "com.zaxxer:HikariCP:$hikari_version" compile "com.zaxxer:HikariCP:$hikari_version"
// ClassGraph: classpath scanning // ClassGraph: classpath scanning
compile "io.github.classgraph:classgraph:$class_graph_version" compile "io.github.classgraph:classgraph:$class_graph_version"
@ -56,6 +56,9 @@ dependencies {
testRuntimeOnly "org.junit.vintage:junit-vintage-engine:${junit_vintage_version}" testRuntimeOnly "org.junit.vintage:junit-vintage-engine:${junit_vintage_version}"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${junit_jupiter_version}" testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${junit_jupiter_version}"
testRuntimeOnly "org.junit.platform:junit-platform-launcher:${junit_platform_version}" testRuntimeOnly "org.junit.platform:junit-platform-launcher:${junit_platform_version}"
testCompile project(':node-driver')
// Unit testing helpers. // Unit testing helpers.
testCompile "org.assertj:assertj-core:$assertj_version" testCompile "org.assertj:assertj-core:$assertj_version"
testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version" testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"

View File

@ -5,7 +5,6 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransportFromList
import net.corda.nodeapi.internal.config.MessagingServerConnectionConfiguration import net.corda.nodeapi.internal.config.MessagingServerConnectionConfiguration
import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration
import org.apache.activemq.artemis.api.core.client.* import org.apache.activemq.artemis.api.core.client.*
@ -41,7 +40,7 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
override fun start(): Started = synchronized(this) { override fun start(): Started = synchronized(this) {
check(started == null) { "start can't be called twice" } check(started == null) { "start can't be called twice" }
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config) val tcpTransport = p2pConnectorTcpTransport(serverAddress, config)
val backupTransports = p2pConnectorTcpTransportFromList(backupServerAddressPool, config) val backupTransports = backupServerAddressPool.map { p2pConnectorTcpTransport(it, config) }
log.info("Connecting to message broker: $serverAddress") log.info("Connecting to message broker: $serverAddress")
if (backupTransports.isNotEmpty()) { if (backupTransports.isNotEmpty()) {

View File

@ -5,16 +5,14 @@ import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.BrokerRpcSslOptions import net.corda.nodeapi.BrokerRpcSslOptions
import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT
import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.config.SslConfiguration import net.corda.nodeapi.internal.config.SslConfiguration
import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import java.nio.file.Path import java.nio.file.Path
// This avoids internal types from leaking in the public API. The "external" ArtemisTcpTransport delegates to this internal one.
class ArtemisTcpTransport { class ArtemisTcpTransport {
companion object { companion object {
val CIPHER_SUITES = listOf( val CIPHER_SUITES = listOf(
@ -24,6 +22,9 @@ class ArtemisTcpTransport {
val TLS_VERSIONS = listOf("TLSv1.2") val TLS_VERSIONS = listOf("TLSv1.2")
const val SSL_HANDSHAKE_TIMEOUT_NAME = "SSLHandshakeTimeout"
const val TRACE_NAME = "trace"
// Turn on AMQP support, which needs the protocol jar on the classpath. // Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop. // Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop.
// It does not use AMQP messages for its own messages e.g. topology and heartbeats. // It does not use AMQP messages for its own messages e.g. topology and heartbeats.
@ -46,17 +47,7 @@ class ArtemisTcpTransport {
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","), TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to TLS_VERSIONS.joinToString(",")) TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to TLS_VERSIONS.joinToString(","))
private fun SslConfiguration.toTransportOptions(): Map<String, Any> { private fun SslConfiguration.addToTransportOptions(options: MutableMap<String, Any>) {
val options = mutableMapOf<String, Any>()
(keyStore to trustStore).addToTransportOptions(options)
return options
}
private fun Pair<FileBasedCertificateStoreSupplier?, FileBasedCertificateStoreSupplier?>.addToTransportOptions(options: MutableMap<String, Any>) {
val keyStore = first
val trustStore = second
keyStore?.let { keyStore?.let {
with (it) { with (it) {
path.requireOnDefaultFileSystem() path.requireOnDefaultFileSystem()
@ -69,6 +60,8 @@ class ArtemisTcpTransport {
options.putAll(get().toTrustStoreTransportOptions(path)) options.putAll(get().toTrustStoreTransportOptions(path))
} }
} }
options[TransportConstants.SSL_PROVIDER] = if (useOpenSsl) TransportConstants.OPENSSL_PROVIDER else TransportConstants.DEFAULT_SSL_PROVIDER
options[SSL_HANDSHAKE_TIMEOUT_NAME] = handshakeTimeout ?: DEFAULT_SSL_HANDSHAKE_TIMEOUT
} }
private fun CertificateStore.toKeyStoreTransportOptions(path: Path) = mapOf( private fun CertificateStore.toKeyStoreTransportOptions(path: Path) = mapOf(
@ -98,96 +91,97 @@ class ArtemisTcpTransport {
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to keyStorePassword, TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to keyStorePassword,
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to false) TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to false)
private val acceptorFactoryClassName = NettyAcceptorFactory::class.java.name fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
private val connectorFactoryClassName = NettyConnectorFactory::class.java.name config: MutualSslConfiguration?,
enableSSL: Boolean = true,
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true): TransportConfiguration { trace: Boolean = false): TransportConfiguration {
val options = mutableMapOf<String, Any>()
return p2pAcceptorTcpTransport(hostAndPort, config?.keyStore, config?.trustStore, enableSSL = enableSSL, useOpenSsl = config?.useOpenSsl ?: false)
}
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreType: String? = null): TransportConfiguration {
return p2pConnectorTcpTransport(hostAndPort, config?.keyStore, config?.trustStore, enableSSL = enableSSL, useOpenSsl = config?.useOpenSsl ?: false, keyStoreType = keyStoreType)
}
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort, P2P_PROTOCOLS).toMutableMap()
if (enableSSL) { if (enableSSL) {
options.putAll(defaultSSLOptions) config?.addToTransportOptions(options)
(keyStore to trustStore).addToTransportOptions(options)
options[TransportConstants.SSL_PROVIDER] = if (useOpenSsl) TransportConstants.OPENSSL_PROVIDER else TransportConstants.DEFAULT_SSL_PROVIDER
} }
options[TransportConstants.HANDSHAKE_TIMEOUT] = 0 // Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections return createAcceptorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL, trace)
return TransportConfiguration(acceptorFactoryClassName, options)
} }
@Suppress("LongParameterList") fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort,
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false, keyStoreType: String? = null): TransportConfiguration { config: MutualSslConfiguration?,
enableSSL: Boolean = true,
val options = defaultArtemisOptions(hostAndPort, P2P_PROTOCOLS).toMutableMap() keyStoreType: String? = null): TransportConfiguration {
val options = mutableMapOf<String, Any>()
if (enableSSL) { if (enableSSL) {
options.putAll(defaultSSLOptions) config?.addToTransportOptions(options)
(keyStore to trustStore).addToTransportOptions(options) options += asMap(keyStoreType)
options[TransportConstants.SSL_PROVIDER] = if (useOpenSsl) TransportConstants.OPENSSL_PROVIDER else TransportConstants.DEFAULT_SSL_PROVIDER
keyStoreType?.let { options.put(TransportConstants.KEYSTORE_TYPE_PROP_NAME, keyStoreType) }
// This is required to stop Client checking URL address vs. Server provided certificate
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
} }
return TransportConfiguration(connectorFactoryClassName, options) return createConnectorTransport(hostAndPort, P2P_PROTOCOLS, options, enableSSL)
} }
fun p2pConnectorTcpTransportFromList(hostAndPortList: List<NetworkHostAndPort>, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreType: String? = null): List<TransportConfiguration> = hostAndPortList.map { fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
p2pConnectorTcpTransport(it, config, enableSSL, keyStoreType) config: BrokerRpcSslOptions?,
} enableSSL: Boolean = true,
trace: Boolean = false): TransportConfiguration {
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: BrokerRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration { val options = mutableMapOf<String, Any>()
val options = defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS).toMutableMap()
if (config != null && enableSSL) { if (config != null && enableSSL) {
config.keyStorePath.requireOnDefaultFileSystem() config.keyStorePath.requireOnDefaultFileSystem()
options.putAll(config.toTransportOptions()) options.putAll(config.toTransportOptions())
options.putAll(defaultSSLOptions)
} }
options[TransportConstants.HANDSHAKE_TIMEOUT] = 0 // Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL, trace)
return TransportConfiguration(acceptorFactoryClassName, options)
} }
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: ClientRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration { fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: ClientRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS).toMutableMap() val options = mutableMapOf<String, Any>()
if (config != null && enableSSL) { if (config != null && enableSSL) {
config.trustStorePath.requireOnDefaultFileSystem() config.trustStorePath.requireOnDefaultFileSystem()
options.putAll(config.toTransportOptions()) options.putAll(config.toTransportOptions())
options.putAll(defaultSSLOptions)
// This is required to stop Client checking URL address vs. Server provided certificate
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
} }
return TransportConfiguration(connectorFactoryClassName, options) return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL)
} }
fun rpcConnectorTcpTransportsFromList(hostAndPortList: List<NetworkHostAndPort>, config: ClientRpcSslOptions?, enableSSL: Boolean = true): List<TransportConfiguration> = hostAndPortList.map { fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration {
rpcConnectorTcpTransport(it, config, enableSSL) val options = mutableMapOf<String, Any>()
config.addToTransportOptions(options)
options += asMap(keyStoreProvider)
return createConnectorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true)
} }
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreType: String? = null): TransportConfiguration { fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort,
val options = defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS).toMutableMap() config: SslConfiguration,
options.putAll(defaultSSLOptions) keyStoreType: String? = null,
options.putAll(config.toTransportOptions()) trace: Boolean = false): TransportConfiguration {
options.putAll(asMap(keyStoreType)) val options = mutableMapOf<String, Any>()
// This is required to stop Client checking URL address vs. Server provided certificate config.addToTransportOptions(options)
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false options += asMap(keyStoreType)
return TransportConfiguration(connectorFactoryClassName, options) return createAcceptorTransport(hostAndPort, RPC_PROTOCOLS, options, enableSSL = true, trace = trace)
}
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreType: String? = null): TransportConfiguration {
return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions +
config.toTransportOptions() + (TransportConstants.HANDSHAKE_TIMEOUT to 0) + asMap(keyStoreType))
} }
private fun asMap(keyStoreType: String?): Map<String, String> { private fun asMap(keyStoreType: String?): Map<String, String> {
return keyStoreType?.let { mutableMapOf(TransportConstants.KEYSTORE_TYPE_PROP_NAME to it) } ?: emptyMap() return keyStoreType?.let { mutableMapOf(TransportConstants.KEYSTORE_TYPE_PROP_NAME to it) } ?: emptyMap()
} }
private fun createAcceptorTransport(hostAndPort: NetworkHostAndPort,
protocols: String,
options: MutableMap<String, Any>,
enableSSL: Boolean,
trace: Boolean): TransportConfiguration {
options += defaultArtemisOptions(hostAndPort, protocols)
if (enableSSL) {
options += defaultSSLOptions
}
// Suppress core.server.lambda$channelActive$0 - AMQ224088 error from load balancer type connections
options[TransportConstants.HANDSHAKE_TIMEOUT] = 0
options[TRACE_NAME] = trace
return TransportConfiguration("net.corda.node.services.messaging.NodeNettyAcceptorFactory", options)
}
private fun createConnectorTransport(hostAndPort: NetworkHostAndPort,
protocols: String,
options: MutableMap<String, Any>,
enableSSL: Boolean): TransportConfiguration {
options += defaultArtemisOptions(hostAndPort, protocols)
if (enableSSL) {
options += defaultSSLOptions
// This is required to stop Client checking URL address vs. Server provided certificate
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
}
return TransportConfiguration(NettyConnectorFactory::class.java.name, options)
}
} }
} }

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.client.ClientSession
import org.slf4j.MDC import org.slf4j.MDC
import rx.Subscription import rx.Subscription
import java.time.Duration
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture import java.util.concurrent.ScheduledFuture
@ -54,7 +55,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider, private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
private val bridgeMetricsService: BridgeMetricsService? = null, private val bridgeMetricsService: BridgeMetricsService? = null,
trace: Boolean, trace: Boolean,
sslHandshakeTimeout: Long?, sslHandshakeTimeout: Duration?,
private val bridgeConnectionTTLSeconds: Int) : BridgeManager { private val bridgeConnectionTTLSeconds: Int) : BridgeManager {
private val lock = ReentrantLock() private val lock = ReentrantLock()
@ -69,8 +70,8 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
override val enableSNI: Boolean, override val enableSNI: Boolean,
override val sourceX500Name: String? = null, override val sourceX500Name: String? = null,
override val trace: Boolean, override val trace: Boolean,
private val _sslHandshakeTimeout: Long?) : AMQPConfiguration { private val _sslHandshakeTimeout: Duration?) : AMQPConfiguration {
override val sslHandshakeTimeout: Long override val sslHandshakeTimeout: Duration
get() = _sslHandshakeTimeout ?: super.sslHandshakeTimeout get() = _sslHandshakeTimeout ?: super.sslHandshakeTimeout
} }

View File

@ -5,16 +5,13 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisSessionProvider import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.crypto.x509 import net.corda.nodeapi.internal.crypto.x509
import net.corda.nodeapi.internal.protonwrapper.netty.ProxyConfig import net.corda.nodeapi.internal.protonwrapper.netty.ProxyConfig
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig
@ -28,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.client.ClientSession
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.time.Duration
import java.util.* import java.util.*
class BridgeControlListener(private val keyStore: CertificateStore, class BridgeControlListener(private val keyStore: CertificateStore,
@ -40,7 +38,7 @@ class BridgeControlListener(private val keyStore: CertificateStore,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider, private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
bridgeMetricsService: BridgeMetricsService? = null, bridgeMetricsService: BridgeMetricsService? = null,
trace: Boolean = false, trace: Boolean = false,
sslHandshakeTimeout: Long? = null, sslHandshakeTimeout: Duration? = null,
bridgeConnectionTTLSeconds: Int = 0) : AutoCloseable { bridgeConnectionTTLSeconds: Int = 0) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString() private val bridgeId: String = UUID.randomUUID().toString()
private var bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId" private var bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
@ -58,13 +56,6 @@ class BridgeControlListener(private val keyStore: CertificateStore,
private var controlConsumer: ClientConsumer? = null private var controlConsumer: ClientConsumer? = null
private var notifyConsumer: ClientConsumer? = null private var notifyConsumer: ClientConsumer? = null
constructor(config: MutualSslConfiguration,
p2pAddress: NetworkHostAndPort,
maxMessageSize: Int,
revocationConfig: RevocationConfig,
enableSNI: Boolean,
proxy: ProxyConfig? = null) : this(config.keyStore.get(), config.trustStore.get(), config.useOpenSsl, proxy, maxMessageSize, revocationConfig, enableSNI, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
} }

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.client.ClientSession
import org.slf4j.MDC import org.slf4j.MDC
import java.time.Duration
/** /**
* The LoopbackBridgeManager holds the list of independent LoopbackBridge objects that actively loopback messages to local Artemis * The LoopbackBridgeManager holds the list of independent LoopbackBridge objects that actively loopback messages to local Artemis
@ -40,7 +41,7 @@ class LoopbackBridgeManager(keyStore: CertificateStore,
private val bridgeMetricsService: BridgeMetricsService? = null, private val bridgeMetricsService: BridgeMetricsService? = null,
private val isLocalInbox: (String) -> Boolean, private val isLocalInbox: (String) -> Boolean,
trace: Boolean, trace: Boolean,
sslHandshakeTimeout: Long? = null, sslHandshakeTimeout: Duration? = null,
bridgeConnectionTTLSeconds: Int = 0) : AMQPBridgeManager(keyStore, trustStore, useOpenSSL, proxyConfig, bridgeConnectionTTLSeconds: Int = 0) : AMQPBridgeManager(keyStore, trustStore, useOpenSSL, proxyConfig,
maxMessageSize, revocationConfig, enableSNI, maxMessageSize, revocationConfig, enableSNI,
artemisMessageClientFactory, bridgeMetricsService, artemisMessageClientFactory, bridgeMetricsService,

View File

@ -1,16 +1,20 @@
package net.corda.nodeapi.internal.config package net.corda.nodeapi.internal.config
import net.corda.core.utilities.seconds
import java.time.Duration
interface SslConfiguration { interface SslConfiguration {
val keyStore: FileBasedCertificateStoreSupplier? val keyStore: FileBasedCertificateStoreSupplier?
val trustStore: FileBasedCertificateStoreSupplier? val trustStore: FileBasedCertificateStoreSupplier?
val useOpenSsl: Boolean val useOpenSsl: Boolean
val handshakeTimeout: Duration?
companion object { companion object {
fun mutual(keyStore: FileBasedCertificateStoreSupplier,
fun mutual(keyStore: FileBasedCertificateStoreSupplier, trustStore: FileBasedCertificateStoreSupplier): MutualSslConfiguration { trustStore: FileBasedCertificateStoreSupplier,
handshakeTimeout: Duration? = null): MutualSslConfiguration {
return MutualSslOptions(keyStore, trustStore) return MutualSslOptions(keyStore, trustStore, handshakeTimeout)
} }
} }
} }
@ -21,9 +25,10 @@ interface MutualSslConfiguration : SslConfiguration {
} }
private class MutualSslOptions(override val keyStore: FileBasedCertificateStoreSupplier, private class MutualSslOptions(override val keyStore: FileBasedCertificateStoreSupplier,
override val trustStore: FileBasedCertificateStoreSupplier) : MutualSslConfiguration { override val trustStore: FileBasedCertificateStoreSupplier,
override val handshakeTimeout: Duration?) : MutualSslConfiguration {
override val useOpenSsl: Boolean = false override val useOpenSsl: Boolean = false
} }
const val DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS = 60000L // Set at least 3 times higher than sun.security.provider.certpath.URICertStore.DEFAULT_CRL_CONNECT_TIMEOUT which is 15 sec @Suppress("MagicNumber")
val DEFAULT_SSL_HANDSHAKE_TIMEOUT: Duration = 60.seconds // Set at least 3 times higher than sun.security.provider.certpath.URICertStore.DEFAULT_CRL_CONNECT_TIMEOUT which is 15 sec

View File

@ -438,6 +438,8 @@ class X509CertificateFactory {
fun generateCertPath(vararg certificates: X509Certificate): CertPath = generateCertPath(certificates.asList()) fun generateCertPath(vararg certificates: X509Certificate): CertPath = generateCertPath(certificates.asList())
fun generateCertPath(certificates: List<X509Certificate>): CertPath = delegate.generateCertPath(certificates) fun generateCertPath(certificates: List<X509Certificate>): CertPath = delegate.generateCertPath(certificates)
fun generateCRL(input: InputStream): X509CRL = delegate.generateCRL(input) as X509CRL
} }
enum class CertificateType(val keyUsage: KeyUsage, vararg val purposes: KeyPurposeId, val isCA: Boolean, val role: CertRole?) { enum class CertificateType(val keyUsage: KeyUsage, vararg val purposes: KeyPurposeId, val isCA: Boolean, val role: CertRole?) {

View File

@ -186,7 +186,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
} }
} }
@Suppress("OverridingDeprecatedMember") @Deprecated("Deprecated in Java")
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
logWarnWithMDC("Closing channel due to nonrecoverable exception ${cause.message}") logWarnWithMDC("Closing channel due to nonrecoverable exception ${cause.message}")
if (log.isTraceEnabled) { if (log.isTraceEnabled) {
@ -298,16 +298,16 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
cause is ClosedChannelException -> logWarnWithMDC("SSL Handshake closed early.") cause is ClosedChannelException -> logWarnWithMDC("SSL Handshake closed early.")
cause is SslHandshakeTimeoutException -> logWarnWithMDC("SSL Handshake timed out") cause is SslHandshakeTimeoutException -> logWarnWithMDC("SSL Handshake timed out")
// Sadly the exception thrown by Netty wrapper requires that we check the message. // Sadly the exception thrown by Netty wrapper requires that we check the message.
cause is SSLException && (cause.message?.contains("close_notify") == true) cause is SSLException && (cause.message?.contains("close_notify") == true) -> logWarnWithMDC("Received close_notify during handshake")
-> logWarnWithMDC("Received close_notify during handshake")
// io.netty.handler.ssl.SslHandler.setHandshakeFailureTransportFailure() // io.netty.handler.ssl.SslHandler.setHandshakeFailureTransportFailure()
cause is SSLException && (cause.message?.contains("writing TLS control frames") == true) -> logWarnWithMDC(cause.message!!) cause is SSLException && (cause.message?.contains("writing TLS control frames") == true) -> logWarnWithMDC(cause.message!!)
cause is SSLException && (cause.message?.contains("internal_error") == true) -> logWarnWithMDC("Received internal_error during handshake") cause is SSLException && (cause.message?.contains("internal_error") == true) -> logWarnWithMDC("Received internal_error during handshake")
else -> connectionResult = ConnectionResult.HANDSHAKE_FAILURE else -> connectionResult = ConnectionResult.HANDSHAKE_FAILURE
} }
logWarnWithMDC("Handshake failure: ${evt.cause().message}")
if (log.isTraceEnabled) { if (log.isTraceEnabled) {
withMDC { log.trace("Handshake failure", evt.cause()) } withMDC { log.trace("Handshake failure", cause) }
} else {
logWarnWithMDC("Handshake failure: ${cause.message}")
} }
ctx.close() ctx.close()
} }

View File

@ -26,6 +26,7 @@ import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.lang.Long.min import java.lang.Long.min
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.security.cert.CertPathValidatorException
import java.time.Duration import java.time.Duration
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
@ -54,7 +55,7 @@ data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostA
* otherwise it creates a self-contained Netty thraed pool and socket objects. * otherwise it creates a self-contained Netty thraed pool and socket objects.
* Once connected it can accept application packets to send via the AMQP protocol. * Once connected it can accept application packets to send via the AMQP protocol.
*/ */
class AMQPClient(val targets: List<NetworkHostAndPort>, class AMQPClient(private val targets: List<NetworkHostAndPort>,
val allowedRemoteLegalNames: Set<CordaX500Name>, val allowedRemoteLegalNames: Set<CordaX500Name>,
private val configuration: AMQPConfiguration, private val configuration: AMQPConfiguration,
private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable { private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable {
@ -84,6 +85,7 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
private var targetIndex = 0 private var targetIndex = 0
private var currentTarget: NetworkHostAndPort = targets.first() private var currentTarget: NetworkHostAndPort = targets.first()
private var retryInterval = MIN_RETRY_INTERVAL private var retryInterval = MIN_RETRY_INTERVAL
private val revocationChecker = configuration.revocationConfig.createPKIXRevocationChecker()
private val handshakeFailureRetryTargets = mutableSetOf<NetworkHostAndPort>() private val handshakeFailureRetryTargets = mutableSetOf<NetworkHostAndPort>()
private var retryingHandshakeFailures = false private var retryingHandshakeFailures = false
private var retryOffset = 0 private var retryOffset = 0
@ -158,24 +160,22 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
log.info("Retry connect to ${targets[targetIndex]} in [$retryInterval] ms") log.info("Retry connect to ${targets[targetIndex]} in [$retryInterval] ms")
} }
private val connectListener = object : ChannelFutureListener { private val connectListener = ChannelFutureListener { future ->
override fun operationComplete(future: ChannelFuture) { amqpActive = false
amqpActive = false if (!future.isSuccess) {
if (!future.isSuccess) { log.info("Failed to connect to $currentTarget", future.cause())
log.info("Failed to connect to $currentTarget", future.cause())
if (started) { if (started) {
workerGroup?.schedule({ workerGroup?.schedule({
nextTarget() nextTarget()
restart() restart()
}, retryInterval, TimeUnit.MILLISECONDS) }, retryInterval, TimeUnit.MILLISECONDS)
}
} else {
// Connection established successfully
clientChannel = future.channel()
clientChannel?.closeFuture()?.addListener(closeListener)
log.info("Connected to $currentTarget, Local address: $localAddressString")
} }
} else {
// Connection established successfully
clientChannel = future.channel()
clientChannel?.closeFuture()?.addListener(closeListener)
log.info("Connected to $currentTarget, Local address: $localAddressString")
} }
} }
@ -201,7 +201,7 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
init { init {
keyManagerFactory.init(conf.keyStore) keyManagerFactory.init(conf.keyStore)
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, conf.revocationConfig)) trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, parent.revocationChecker))
} }
@Suppress("ComplexMethod") @Suppress("ComplexMethod")
@ -245,12 +245,13 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
val handler = if (parent.configuration.useOpenSsl) { val handler = if (parent.configuration.useOpenSsl) {
createClientOpenSslHandler(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory, ch.alloc()) createClientOpenSslHandler(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory, ch.alloc())
} else { } else {
createClientSslHelper(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory) createClientSslHandler(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory)
} }
handler.handshakeTimeoutMillis = conf.sslHandshakeTimeout handler.handshakeTimeoutMillis = conf.sslHandshakeTimeout.toMillis()
pipeline.addLast("sslHandler", handler) pipeline.addLast("sslHandler", handler)
if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
amqpChannelHandler = AMQPChannelHandler(false, amqpChannelHandler = AMQPChannelHandler(
false,
parent.allowedRemoteLegalNames, parent.allowedRemoteLegalNames,
// Single entry, key can be anything. // Single entry, key can be anything.
mapOf(DEFAULT to wrappedKeyManagerFactory), mapOf(DEFAULT to wrappedKeyManagerFactory),
@ -258,37 +259,41 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
conf.password, conf.password,
conf.trace, conf.trace,
false, false,
onOpen = { _, change -> onOpen = { _, change -> onChannelOpen(change) },
parent.run { onClose = { _, change -> onChannelClose(change, target) },
amqpActive = true onReceive = parent._onReceive::onNext
successfullyConnected() )
_onConnection.onNext(change)
}
},
onClose = { _, change ->
if (parent.amqpChannelHandler == amqpChannelHandler) {
parent.run {
_onConnection.onNext(change)
if (change.connectionResult == ConnectionResult.HANDSHAKE_FAILURE) {
log.warn("Handshake failure with $target target; will retry later")
handshakeFailureRetryTargets += target
}
if (started && amqpActive) {
log.debug { "Scheduling restart of $currentTarget (AMQP active)" }
workerGroup?.schedule({
nextTarget()
restart()
}, retryInterval, TimeUnit.MILLISECONDS)
}
amqpActive = false
}
}
},
onReceive = { rcv -> parent._onReceive.onNext(rcv) })
parent.amqpChannelHandler = amqpChannelHandler parent.amqpChannelHandler = amqpChannelHandler
pipeline.addLast(amqpChannelHandler) pipeline.addLast(amqpChannelHandler)
} }
private fun onChannelOpen(change: ConnectionChange) {
parent.run {
amqpActive = true
successfullyConnected()
_onConnection.onNext(change)
}
}
private fun onChannelClose(change: ConnectionChange, target: NetworkHostAndPort) {
if (parent.amqpChannelHandler != amqpChannelHandler) return
parent.run {
_onConnection.onNext(change)
if (change.connectionResult == ConnectionResult.HANDSHAKE_FAILURE) {
log.warn("Handshake failure with $target target; will retry later")
handshakeFailureRetryTargets += target
}
if (started && amqpActive) {
log.debug { "Scheduling restart of $currentTarget (AMQP active)" }
workerGroup?.schedule({
nextTarget()
restart()
}, retryInterval, TimeUnit.MILLISECONDS)
}
amqpActive = false
}
}
} }
fun start() { fun start() {
@ -372,4 +377,6 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
private val _onConnection = PublishSubject.create<ConnectionChange>().toSerialized() private val _onConnection = PublishSubject.create<ConnectionChange>().toSerialized()
val onConnection: Observable<ConnectionChange> val onConnection: Observable<ConnectionChange>
get() = _onConnection get() = _onConnection
val softFailExceptions: List<CertPathValidatorException> get() = revocationChecker.softFailExceptions
} }

View File

@ -2,7 +2,8 @@ package net.corda.nodeapi.internal.protonwrapper.netty
import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS import net.corda.nodeapi.internal.config.DEFAULT_SSL_HANDSHAKE_TIMEOUT
import java.time.Duration
interface AMQPConfiguration { interface AMQPConfiguration {
/** /**
@ -67,8 +68,8 @@ interface AMQPConfiguration {
get() = false get() = false
@JvmDefault @JvmDefault
val sslHandshakeTimeout: Long val sslHandshakeTimeout: Duration
get() = DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS // Aligned with sun.security.provider.certpath.URICertStore.DEFAULT_CRL_CONNECT_TIMEOUT get() = DEFAULT_SSL_HANDSHAKE_TIMEOUT // Aligned with sun.security.provider.certpath.URICertStore.DEFAULT_CRL_CONNECT_TIMEOUT
/** /**
* An optional Health Check Phrase which if passed through the channel will cause AMQP Server to echo it back instead of doing normal pipeline processing * An optional Health Check Phrase which if passed through the channel will cause AMQP Server to echo it back instead of doing normal pipeline processing

View File

@ -25,6 +25,7 @@ import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.net.BindException import java.net.BindException
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.security.cert.CertPathValidatorException
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.KeyManagerFactory import javax.net.ssl.KeyManagerFactory
@ -55,6 +56,7 @@ class AMQPServer(val hostName: String,
private var bossGroup: EventLoopGroup? = null private var bossGroup: EventLoopGroup? = null
private var workerGroup: EventLoopGroup? = null private var workerGroup: EventLoopGroup? = null
private var serverChannel: Channel? = null private var serverChannel: Channel? = null
private val revocationChecker = configuration.revocationConfig.createPKIXRevocationChecker()
private val clientChannels = ConcurrentHashMap<InetSocketAddress, SocketChannel>() private val clientChannels = ConcurrentHashMap<InetSocketAddress, SocketChannel>()
private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer<SocketChannel>() { private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer<SocketChannel>() {
@ -64,7 +66,7 @@ class AMQPServer(val hostName: String,
init { init {
keyManagerFactory.init(conf.keyStore.value.internal, conf.keyStore.entryPassword.toCharArray()) keyManagerFactory.init(conf.keyStore.value.internal, conf.keyStore.entryPassword.toCharArray())
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, conf.revocationConfig)) trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, parent.revocationChecker))
} }
override fun initChannel(ch: SocketChannel) { override fun initChannel(ch: SocketChannel) {
@ -75,7 +77,8 @@ class AMQPServer(val hostName: String,
pipeline.addLast("sslHandler", sslHandler) pipeline.addLast("sslHandler", sslHandler)
if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
val suppressLogs = ch.remoteAddress()?.hostString in amqpConfiguration.silencedIPs val suppressLogs = ch.remoteAddress()?.hostString in amqpConfiguration.silencedIPs
pipeline.addLast(AMQPChannelHandler(true, pipeline.addLast(AMQPChannelHandler(
true,
null, null,
// Passing a mapping of legal names to key managers to be able to pick the correct one after // Passing a mapping of legal names to key managers to be able to pick the correct one after
// SNI completion event is fired up. // SNI completion event is fired up.
@ -84,27 +87,32 @@ class AMQPServer(val hostName: String,
conf.password, conf.password,
conf.trace, conf.trace,
suppressLogs, suppressLogs,
onOpen = { channel, change -> onOpen = ::onChannelOpen,
parent.run { onClose = ::onChannelClose,
clientChannels[channel.remoteAddress()] = channel onReceive = parent._onReceive::onNext
_onConnection.onNext(change) ))
} }
},
onClose = { channel, change -> private fun onChannelOpen(channel: SocketChannel, change: ConnectionChange) {
parent.run { parent.run {
val remoteAddress = channel.remoteAddress() clientChannels[channel.remoteAddress()] = channel
clientChannels.remove(remoteAddress) _onConnection.onNext(change)
_onConnection.onNext(change) }
} }
},
onReceive = { rcv -> parent._onReceive.onNext(rcv) })) private fun onChannelClose(channel: SocketChannel, change: ConnectionChange) {
parent.run {
val remoteAddress = channel.remoteAddress()
clientChannels.remove(remoteAddress)
_onConnection.onNext(change)
}
} }
private fun createSSLHandler(amqpConfig: AMQPConfiguration, ch: SocketChannel): Pair<ChannelHandler, Map<String, CertHoldingKeyManagerFactoryWrapper>> { private fun createSSLHandler(amqpConfig: AMQPConfiguration, ch: SocketChannel): Pair<ChannelHandler, Map<String, CertHoldingKeyManagerFactoryWrapper>> {
return if (amqpConfig.useOpenSsl && amqpConfig.enableSNI && amqpConfig.keyStore.aliases().size > 1) { return if (amqpConfig.useOpenSsl && amqpConfig.enableSNI && amqpConfig.keyStore.aliases().size > 1) {
val keyManagerFactoriesMap = splitKeystore(amqpConfig) val keyManagerFactoriesMap = splitKeystore(amqpConfig)
// SNI matching needed only when multiple nodes exist behind the server. // SNI matching needed only when multiple nodes exist behind the server.
Pair(createServerSNIOpenSslHandler(keyManagerFactoriesMap, trustManagerFactory), keyManagerFactoriesMap) Pair(createServerSNIOpenSniHandler(keyManagerFactoriesMap, trustManagerFactory), keyManagerFactoriesMap)
} else { } else {
val keyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, amqpConfig) val keyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, amqpConfig)
val handler = if (amqpConfig.useOpenSsl) { val handler = if (amqpConfig.useOpenSsl) {
@ -113,7 +121,7 @@ class AMQPServer(val hostName: String,
// For javaSSL, SNI matching is handled at key manager level. // For javaSSL, SNI matching is handled at key manager level.
createServerSslHandler(amqpConfig.keyStore, keyManagerFactory, trustManagerFactory) createServerSslHandler(amqpConfig.keyStore, keyManagerFactory, trustManagerFactory)
} }
handler.handshakeTimeoutMillis = amqpConfig.sslHandshakeTimeout handler.handshakeTimeoutMillis = amqpConfig.sslHandshakeTimeout.toMillis()
Pair(handler, mapOf(DEFAULT to keyManagerFactory)) Pair(handler, mapOf(DEFAULT to keyManagerFactory))
} }
} }
@ -217,4 +225,6 @@ class AMQPServer(val hostName: String,
private val _onConnection = PublishSubject.create<ConnectionChange>().toSerialized() private val _onConnection = PublishSubject.create<ConnectionChange>().toSerialized()
val onConnection: Observable<ConnectionChange> val onConnection: Observable<ConnectionChange>
get() = _onConnection get() = _onConnection
val softFailExceptions: List<CertPathValidatorException> get() = revocationChecker.softFailExceptions
} }

View File

@ -11,7 +11,7 @@ object AllowAllRevocationChecker : PKIXRevocationChecker() {
private val logger = LoggerFactory.getLogger(AllowAllRevocationChecker::class.java) private val logger = LoggerFactory.getLogger(AllowAllRevocationChecker::class.java)
override fun check(cert: Certificate?, unresolvedCritExts: MutableCollection<String>?) { override fun check(cert: Certificate, unresolvedCritExts: Collection<String>) {
logger.debug {"Passing certificate check for: $cert"} logger.debug {"Passing certificate check for: $cert"}
// Nothing to do // Nothing to do
} }
@ -20,7 +20,7 @@ object AllowAllRevocationChecker : PKIXRevocationChecker() {
return true return true
} }
override fun getSupportedExtensions(): MutableSet<String>? { override fun getSupportedExtensions(): Set<String>? {
return null return null
} }
@ -28,7 +28,7 @@ object AllowAllRevocationChecker : PKIXRevocationChecker() {
// Nothing to do // Nothing to do
} }
override fun getSoftFailExceptions(): MutableList<CertPathValidatorException> { override fun getSoftFailExceptions(): List<CertPathValidatorException> {
return LinkedList() return Collections.emptyList()
} }
} }

View File

@ -3,10 +3,11 @@ package net.corda.nodeapi.internal.protonwrapper.netty
import java.security.cert.X509CRL import java.security.cert.X509CRL
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
interface ExternalCrlSource { @FunctionalInterface
interface CrlSource {
/** /**
* Given certificate provides a set of CRLs, potentially performing remote communication. * Given certificate provides a set of CRLs, potentially performing remote communication.
*/ */
fun fetch(certificate: X509Certificate) : Set<X509CRL> fun fetch(certificate: X509Certificate): Set<X509CRL>
} }

View File

@ -3,6 +3,9 @@ package net.corda.nodeapi.internal.protonwrapper.netty
import com.typesafe.config.Config import com.typesafe.config.Config
import net.corda.nodeapi.internal.config.ConfigParser import net.corda.nodeapi.internal.config.ConfigParser
import net.corda.nodeapi.internal.config.CustomConfigParser import net.corda.nodeapi.internal.config.CustomConfigParser
import net.corda.nodeapi.internal.revocation.CertDistPointCrlSource
import net.corda.nodeapi.internal.revocation.CordaRevocationChecker
import java.security.cert.PKIXRevocationChecker
/** /**
* Data structure for controlling the way how Certificate Revocation Lists are handled. * Data structure for controlling the way how Certificate Revocation Lists are handled.
@ -26,7 +29,7 @@ interface RevocationConfig {
/** /**
* CRLs are obtained from external source * CRLs are obtained from external source
* @see ExternalCrlSource * @see CrlSource
*/ */
EXTERNAL_SOURCE, EXTERNAL_SOURCE,
@ -39,14 +42,21 @@ interface RevocationConfig {
val mode: Mode val mode: Mode
/** /**
* Optional `ExternalCrlSource` which only makes sense with `mode` = `EXTERNAL_SOURCE` * Optional [CrlSource] which only makes sense with `mode` = `EXTERNAL_SOURCE`
*/ */
val externalCrlSource: ExternalCrlSource? val externalCrlSource: CrlSource?
/** fun createPKIXRevocationChecker(): PKIXRevocationChecker {
* Creates a copy of `RevocationConfig` with ExternalCrlSource enriched return when (mode) {
*/ Mode.OFF -> AllowAllRevocationChecker
fun enrichExternalCrlSource(sourceFunc: (() -> ExternalCrlSource)?): RevocationConfig Mode.EXTERNAL_SOURCE -> {
val externalCrlSource = requireNotNull(externalCrlSource) { "externalCrlSource must be specfied for EXTERNAL_SOURCE" }
CordaRevocationChecker(externalCrlSource, softFail = true)
}
Mode.SOFT_FAIL -> CordaRevocationChecker(CertDistPointCrlSource(), softFail = true)
Mode.HARD_FAIL -> CordaRevocationChecker(CertDistPointCrlSource(), softFail = false)
}
}
} }
/** /**
@ -54,16 +64,7 @@ interface RevocationConfig {
*/ */
fun Boolean.toRevocationConfig() = if(this) RevocationConfigImpl(RevocationConfig.Mode.SOFT_FAIL) else RevocationConfigImpl(RevocationConfig.Mode.HARD_FAIL) fun Boolean.toRevocationConfig() = if(this) RevocationConfigImpl(RevocationConfig.Mode.SOFT_FAIL) else RevocationConfigImpl(RevocationConfig.Mode.HARD_FAIL)
data class RevocationConfigImpl(override val mode: RevocationConfig.Mode, override val externalCrlSource: ExternalCrlSource? = null) : RevocationConfig { data class RevocationConfigImpl(override val mode: RevocationConfig.Mode, override val externalCrlSource: CrlSource? = null) : RevocationConfig
override fun enrichExternalCrlSource(sourceFunc: (() -> ExternalCrlSource)?): RevocationConfig {
return if(mode != RevocationConfig.Mode.EXTERNAL_SOURCE) {
this
} else {
assert(sourceFunc != null) { "There should be a way to obtain ExternalCrlSource" }
copy(externalCrlSource = sourceFunc!!())
}
}
}
class RevocationConfigParser : ConfigParser<RevocationConfig> { class RevocationConfigParser : ConfigParser<RevocationConfig> {
override fun parse(config: Config): RevocationConfig { override fun parse(config: Config): RevocationConfig {
@ -80,4 +81,4 @@ class RevocationConfigParser : ConfigParser<RevocationConfig> {
else -> throw IllegalArgumentException("Unsupported mode : '$mode'") else -> throw IllegalArgumentException("Unsupported mode : '$mode'")
} }
} }
} }

View File

@ -13,15 +13,17 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.VisibleForTesting
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.toHex import net.corda.core.utilities.toHex
import net.corda.nodeapi.internal.ArtemisTcpTransport import net.corda.nodeapi.internal.ArtemisTcpTransport
import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.crypto.toBc import net.corda.nodeapi.internal.crypto.toBc
import net.corda.nodeapi.internal.crypto.x509 import net.corda.nodeapi.internal.crypto.x509
import net.corda.nodeapi.internal.protonwrapper.netty.revocation.ExternalSourceRevocationChecker
import org.bouncycastle.asn1.ASN1IA5String
import org.bouncycastle.asn1.ASN1InputStream import org.bouncycastle.asn1.ASN1InputStream
import org.bouncycastle.asn1.ASN1Primitive
import org.bouncycastle.asn1.ASN1IA5String
import org.bouncycastle.asn1.DEROctetString import org.bouncycastle.asn1.DEROctetString
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier
import org.bouncycastle.asn1.x509.CRLDistPoint import org.bouncycastle.asn1.x509.CRLDistPoint
import org.bouncycastle.asn1.x509.DistributionPointName import org.bouncycastle.asn1.x509.DistributionPointName
@ -30,13 +32,15 @@ import org.bouncycastle.asn1.x509.GeneralName
import org.bouncycastle.asn1.x509.GeneralNames import org.bouncycastle.asn1.x509.GeneralNames
import org.bouncycastle.asn1.x509.SubjectKeyIdentifier import org.bouncycastle.asn1.x509.SubjectKeyIdentifier
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream
import java.net.Socket import java.net.Socket
import java.net.URI
import java.security.KeyStore import java.security.KeyStore
import java.security.cert.* import java.security.cert.*
import java.util.* import java.util.*
import java.util.concurrent.Executor import java.util.concurrent.Executor
import javax.net.ssl.* import javax.net.ssl.*
import javax.security.auth.x500.X500Principal
import kotlin.collections.HashMap
import kotlin.system.measureTimeMillis import kotlin.system.measureTimeMillis
private const val HOSTNAME_FORMAT = "%s.corda.net" private const val HOSTNAME_FORMAT = "%s.corda.net"
@ -46,44 +50,61 @@ internal const val DP_DEFAULT_ANSWER = "NO CRLDP ext"
internal val logger = LoggerFactory.getLogger("net.corda.nodeapi.internal.protonwrapper.netty.SSLHelper") internal val logger = LoggerFactory.getLogger("net.corda.nodeapi.internal.protonwrapper.netty.SSLHelper")
fun X509Certificate.distributionPoints() : Set<String>? { /**
logger.debug("Checking CRLDPs for $subjectX500Principal") * Returns all the CRL distribution points in the certificate as [URI]s along with the CRL issuer names, if any.
*/
@Suppress("ComplexMethod")
fun X509Certificate.distributionPoints(): Map<URI, List<X500Principal>?> {
logger.debug { "Checking CRLDPs for $subjectX500Principal" }
val crldpExtBytes = getExtensionValue(Extension.cRLDistributionPoints.id) val crldpExtBytes = getExtensionValue(Extension.cRLDistributionPoints.id)
if (crldpExtBytes == null) { if (crldpExtBytes == null) {
logger.debug(DP_DEFAULT_ANSWER) logger.debug(DP_DEFAULT_ANSWER)
return emptySet() return emptyMap()
} }
val derObjCrlDP = ASN1InputStream(ByteArrayInputStream(crldpExtBytes)).readObject() val derObjCrlDP = crldpExtBytes.toAsn1Object()
val dosCrlDP = derObjCrlDP as? DEROctetString val dosCrlDP = derObjCrlDP as? DEROctetString
if (dosCrlDP == null) { if (dosCrlDP == null) {
logger.error("Expected to have DEROctetString, actual type: ${derObjCrlDP.javaClass}") logger.error("Expected to have DEROctetString, actual type: ${derObjCrlDP.javaClass}")
return emptySet() return emptyMap()
} }
val crldpExtOctetsBytes = dosCrlDP.octets val dpObj = dosCrlDP.octets.toAsn1Object()
val dpObj = ASN1InputStream(ByteArrayInputStream(crldpExtOctetsBytes)).readObject() val crlDistPoint = CRLDistPoint.getInstance(dpObj)
val distPoint = CRLDistPoint.getInstance(dpObj) if (crlDistPoint == null) {
if (distPoint == null) {
logger.error("Could not instantiate CRLDistPoint, from: $dpObj") logger.error("Could not instantiate CRLDistPoint, from: $dpObj")
return emptySet() return emptyMap()
} }
val dpNames = distPoint.distributionPoints.mapNotNull { it.distributionPoint }.filter { it.type == DistributionPointName.FULL_NAME } val dpMap = HashMap<URI, List<X500Principal>?>()
val generalNames = dpNames.flatMap { GeneralNames.getInstance(it.name).names.asList() } for (distributionPoint in crlDistPoint.distributionPoints) {
return generalNames.filter { it.tagNo == GeneralName.uniformResourceIdentifier}.map { ASN1IA5String.getInstance(it.name).string }.toSet() val distributionPointName = distributionPoint.distributionPoint
} if (distributionPointName?.type != DistributionPointName.FULL_NAME) continue
val issuerNames = distributionPoint.crlIssuer?.names?.mapNotNull {
fun X509Certificate.distributionPointsToString() : String { if (it.tagNo == GeneralName.directoryName) {
return with(distributionPoints()) { X500Principal(X500Name.getInstance(it.name).encoded)
if(this == null || isEmpty()) { } else {
DP_DEFAULT_ANSWER null
} else { }
sorted().joinToString() }
for (generalName in GeneralNames.getInstance(distributionPointName.name).names) {
if (generalName.tagNo == GeneralName.uniformResourceIdentifier) {
val uri = URI(ASN1IA5String.getInstance(generalName.name).string)
dpMap[uri] = issuerNames
}
} }
} }
return dpMap
} }
fun X509Certificate.distributionPointsToString(): String {
return with(distributionPoints().keys) {
if (isEmpty()) DP_DEFAULT_ANSWER else sorted().joinToString()
}
}
fun ByteArray.toAsn1Object(): ASN1Primitive = ASN1InputStream(this).readObject()
fun certPathToString(certPath: Array<out X509Certificate>?): String { fun certPathToString(certPath: Array<out X509Certificate>?): String {
if (certPath == null) { if (certPath == null) {
return "<empty certpath>" return "<empty certpath>"
@ -117,7 +138,7 @@ class LoggingTrustManagerWrapper(val wrapped: X509ExtendedTrustManager) : X509Ex
if (chain == null) { if (chain == null) {
return "<empty certpath>" return "<empty certpath>"
} }
return chain.map { it.toString() }.joinToString(", ") return chain.joinToString(", ") { it.toString() }
} }
private fun logErrors(chain: Array<out X509Certificate>?, block: () -> Unit) { private fun logErrors(chain: Array<out X509Certificate>?, block: () -> Unit) {
@ -171,14 +192,9 @@ class LoggingTrustManagerWrapper(val wrapped: X509ExtendedTrustManager) : X509Ex
private object LoggingImmediateExecutor : Executor { private object LoggingImmediateExecutor : Executor {
override fun execute(command: Runnable?) { override fun execute(command: Runnable) {
val log = LoggerFactory.getLogger(javaClass) val log = LoggerFactory.getLogger(javaClass)
if (command == null) {
log.error("SSL handler executor called with a null command")
throw NullPointerException("command")
}
@Suppress("TooGenericExceptionCaught", "MagicNumber") // log and rethrow all exceptions @Suppress("TooGenericExceptionCaught", "MagicNumber") // log and rethrow all exceptions
try { try {
val commandName = command::class.qualifiedName?.let { "[$it]" } ?: "" val commandName = command::class.qualifiedName?.let { "[$it]" } ?: ""
@ -196,10 +212,10 @@ private object LoggingImmediateExecutor : Executor {
} }
} }
internal fun createClientSslHelper(target: NetworkHostAndPort, internal fun createClientSslHandler(target: NetworkHostAndPort,
expectedRemoteLegalNames: Set<CordaX500Name>, expectedRemoteLegalNames: Set<CordaX500Name>,
keyManagerFactory: KeyManagerFactory, keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler { trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = createAndInitSslContext(keyManagerFactory, trustManagerFactory) val sslContext = createAndInitSslContext(keyManagerFactory, trustManagerFactory)
val sslEngine = sslContext.createSSLEngine(target.host, target.port) val sslEngine = sslContext.createSSLEngine(target.host, target.port)
sslEngine.useClientMode = true sslEngine.useClientMode = true
@ -211,7 +227,6 @@ internal fun createClientSslHelper(target: NetworkHostAndPort,
sslParameters.serverNames = listOf(SNIHostName(x500toHostName(expectedRemoteLegalNames.single()))) sslParameters.serverNames = listOf(SNIHostName(x500toHostName(expectedRemoteLegalNames.single())))
sslEngine.sslParameters = sslParameters sslEngine.sslParameters = sslParameters
} }
@Suppress("DEPRECATION")
return SslHandler(sslEngine, false, LoggingImmediateExecutor) return SslHandler(sslEngine, false, LoggingImmediateExecutor)
} }
@ -229,7 +244,6 @@ internal fun createClientOpenSslHandler(target: NetworkHostAndPort,
sslParameters.serverNames = listOf(SNIHostName(x500toHostName(expectedRemoteLegalNames.single()))) sslParameters.serverNames = listOf(SNIHostName(x500toHostName(expectedRemoteLegalNames.single())))
sslEngine.sslParameters = sslParameters sslEngine.sslParameters = sslParameters
} }
@Suppress("DEPRECATION")
return SslHandler(sslEngine, false, LoggingImmediateExecutor) return SslHandler(sslEngine, false, LoggingImmediateExecutor)
} }
@ -246,7 +260,15 @@ internal fun createServerSslHandler(keyStore: CertificateStore,
val sslParameters = sslEngine.sslParameters val sslParameters = sslEngine.sslParameters
sslParameters.sniMatchers = listOf(ServerSNIMatcher(keyStore)) sslParameters.sniMatchers = listOf(ServerSNIMatcher(keyStore))
sslEngine.sslParameters = sslParameters sslEngine.sslParameters = sslParameters
@Suppress("DEPRECATION") return SslHandler(sslEngine, false, LoggingImmediateExecutor)
}
internal fun createServerOpenSslHandler(keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory,
alloc: ByteBufAllocator): SslHandler {
val sslContext = getServerSslContextBuilder(keyManagerFactory, trustManagerFactory).build()
val sslEngine = sslContext.newEngine(alloc)
sslEngine.useClientMode = false
return SslHandler(sslEngine, false, LoggingImmediateExecutor) return SslHandler(sslEngine, false, LoggingImmediateExecutor)
} }
@ -259,52 +281,23 @@ fun createAndInitSslContext(keyManagerFactory: KeyManagerFactory, trustManagerFa
return sslContext return sslContext
} }
@VisibleForTesting fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateStore,
fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateStore, revocationConfig: RevocationConfig): ManagerFactoryParameters { revocationConfig: RevocationConfig): CertPathTrustManagerParameters {
val pkixParams = PKIXBuilderParameters(trustStore.value.internal, X509CertSelector()) return initialiseTrustStoreAndEnableCrlChecking(trustStore, revocationConfig.createPKIXRevocationChecker())
val revocationChecker = when (revocationConfig.mode) {
RevocationConfig.Mode.OFF -> AllowAllRevocationChecker // Custom PKIXRevocationChecker skipping CRL check
RevocationConfig.Mode.EXTERNAL_SOURCE -> {
require(revocationConfig.externalCrlSource != null) { "externalCrlSource must not be null" }
ExternalSourceRevocationChecker(revocationConfig.externalCrlSource!!) { Date() } // Custom PKIXRevocationChecker which uses `externalCrlSource`
}
else -> {
val certPathBuilder = CertPathBuilder.getInstance("PKIX")
val pkixRevocationChecker = certPathBuilder.revocationChecker as PKIXRevocationChecker
pkixRevocationChecker.options = EnumSet.of(
// Prefer CRL over OCSP
PKIXRevocationChecker.Option.PREFER_CRLS,
// Don't fall back to OCSP checking
PKIXRevocationChecker.Option.NO_FALLBACK)
if (revocationConfig.mode == RevocationConfig.Mode.SOFT_FAIL) {
// Allow revocation check to succeed if the revocation status cannot be determined for one of
// the following reasons: The CRL or OCSP response cannot be obtained because of a network error.
pkixRevocationChecker.options = pkixRevocationChecker.options + PKIXRevocationChecker.Option.SOFT_FAIL
}
pkixRevocationChecker
}
}
pkixParams.addCertPathChecker(revocationChecker)
return CertPathTrustManagerParameters(pkixParams)
} }
internal fun createServerOpenSslHandler(keyManagerFactory: KeyManagerFactory, fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateStore,
trustManagerFactory: TrustManagerFactory, revocationChecker: PKIXRevocationChecker): CertPathTrustManagerParameters {
alloc: ByteBufAllocator): SslHandler { val pkixParams = PKIXBuilderParameters(trustStore.value.internal, X509CertSelector())
pkixParams.addCertPathChecker(revocationChecker)
val sslContext = getServerSslContextBuilder(keyManagerFactory, trustManagerFactory).build() return CertPathTrustManagerParameters(pkixParams)
val sslEngine = sslContext.newEngine(alloc)
sslEngine.useClientMode = false
@Suppress("DEPRECATION")
return SslHandler(sslEngine, false, LoggingImmediateExecutor)
} }
/** /**
* Creates a special SNI handler used only when openSSL is used for AMQPServer * Creates a special SNI handler used only when openSSL is used for AMQPServer
*/ */
internal fun createServerSNIOpenSslHandler(keyManagerFactoriesMap: Map<String, KeyManagerFactory>, internal fun createServerSNIOpenSniHandler(keyManagerFactoriesMap: Map<String, KeyManagerFactory>,
trustManagerFactory: TrustManagerFactory): SniHandler { trustManagerFactory: TrustManagerFactory): SniHandler {
// Default value can be any in the map. // Default value can be any in the map.
val sslCtxBuilder = getServerSslContextBuilder(keyManagerFactoriesMap.values.first(), trustManagerFactory) val sslCtxBuilder = getServerSslContextBuilder(keyManagerFactoriesMap.values.first(), trustManagerFactory)
val mapping = DomainWildcardMappingBuilder(sslCtxBuilder.build()) val mapping = DomainWildcardMappingBuilder(sslCtxBuilder.build())
@ -327,7 +320,7 @@ private fun getServerSslContextBuilder(keyManagerFactory: KeyManagerFactory, tru
internal fun splitKeystore(config: AMQPConfiguration): Map<String, CertHoldingKeyManagerFactoryWrapper> { internal fun splitKeystore(config: AMQPConfiguration): Map<String, CertHoldingKeyManagerFactoryWrapper> {
val keyStore = config.keyStore.value.internal val keyStore = config.keyStore.value.internal
val password = config.keyStore.entryPassword.toCharArray() val password = config.keyStore.entryPassword.toCharArray()
return keyStore.aliases().toList().map { alias -> return keyStore.aliases().toList().associate { alias ->
val key = keyStore.getKey(alias, password) val key = keyStore.getKey(alias, password)
val certs = keyStore.getCertificateChain(alias) val certs = keyStore.getCertificateChain(alias)
val x500Name = keyStore.getCertificate(alias).x509.subjectX500Principal val x500Name = keyStore.getCertificate(alias).x509.subjectX500Principal
@ -338,7 +331,7 @@ internal fun splitKeystore(config: AMQPConfiguration): Map<String, CertHoldingKe
val newKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) val newKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
newKeyManagerFactory.init(newKeyStore, password) newKeyManagerFactory.init(newKeyStore, password)
x500toHostName(cordaX500Name) to CertHoldingKeyManagerFactoryWrapper(newKeyManagerFactory, config) x500toHostName(cordaX500Name) to CertHoldingKeyManagerFactoryWrapper(newKeyManagerFactory, config)
}.toMap() }
} }
// As per Javadoc in: https://docs.oracle.com/javase/8/docs/api/javax/net/ssl/KeyManagerFactory.html `init` method // As per Javadoc in: https://docs.oracle.com/javase/8/docs/api/javax/net/ssl/KeyManagerFactory.html `init` method

View File

@ -0,0 +1,84 @@
package net.corda.nodeapi.internal.revocation
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import net.corda.core.internal.readFully
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource
import net.corda.nodeapi.internal.protonwrapper.netty.distributionPoints
import java.net.URI
import java.security.cert.X509CRL
import java.security.cert.X509Certificate
import java.util.concurrent.TimeUnit
import javax.security.auth.x500.X500Principal
/**
* [CrlSource] which downloads CRLs from the distribution points in the X509 certificate.
*/
class CertDistPointCrlSource : CrlSource {
companion object {
// The default SSL handshake timeout is 60s (DEFAULT_SSL_HANDSHAKE_TIMEOUT). Considering there are 3 CRLs endpoints to check in a
// node handshake, we want to keep the total timeout within that.
private const val DEFAULT_CONNECT_TIMEOUT = 9_000
private const val DEFAULT_READ_TIMEOUT = 9_000
private const val DEFAULT_CACHE_SIZE = 185L // Same default as the JDK (URICertStore)
private const val DEFAULT_CACHE_EXPIRY = 5 * 60 * 1000L
private val cache: LoadingCache<URI, X509CRL> = Caffeine.newBuilder()
.maximumSize(java.lang.Long.getLong("net.corda.dpcrl.cache.size", DEFAULT_CACHE_SIZE))
.expireAfterWrite(java.lang.Long.getLong("net.corda.dpcrl.cache.expiry", DEFAULT_CACHE_EXPIRY), TimeUnit.MILLISECONDS)
.build(::retrieveCRL)
private val connectTimeout = Integer.getInteger("net.corda.dpcrl.connect.timeout", DEFAULT_CONNECT_TIMEOUT)
private val readTimeout = Integer.getInteger("net.corda.dpcrl.read.timeout", DEFAULT_READ_TIMEOUT)
private fun retrieveCRL(uri: URI): X509CRL {
val bytes = run {
val conn = uri.toURL().openConnection()
conn.connectTimeout = connectTimeout
conn.readTimeout = readTimeout
// Read all bytes first and then pass them into the CertificateFactory. This may seem unnecessary when generateCRL already takes
// in an InputStream, but the JDK implementation (sun.security.provider.X509Factory.engineGenerateCRL) converts any IOException
// into CRLException and drops the cause chain.
conn.getInputStream().readFully()
}
return X509CertificateFactory().generateCRL(bytes.inputStream())
}
}
@Suppress("TooGenericExceptionCaught")
override fun fetch(certificate: X509Certificate): Set<X509CRL> {
val approvedCRLs = HashSet<X509CRL>()
var exception: Exception? = null
for ((distPointUri, issuerNames) in certificate.distributionPoints()) {
try {
val possibleCRL = getPossibleCRL(distPointUri)
if (verifyCRL(possibleCRL, certificate, issuerNames)) {
approvedCRLs += possibleCRL
}
} catch (e: Exception) {
if (exception == null) {
exception = e
} else {
exception.addSuppressed(e)
}
}
}
// Only throw if no CRLs are retrieved
if (exception != null && approvedCRLs.isEmpty()) {
throw exception
} else {
return approvedCRLs
}
}
private fun getPossibleCRL(uri: URI): X509CRL {
return cache[uri]!!
}
// DistributionPointFetcher.verifyCRL
private fun verifyCRL(crl: X509CRL, certificate: X509Certificate, distPointIssuerNames: List<X500Principal>?): Boolean {
val crlIssuer = crl.issuerX500Principal
return distPointIssuerNames?.any { it == crlIssuer } ?: (certificate.issuerX500Principal == crlIssuer)
}
}

View File

@ -1,30 +1,53 @@
package net.corda.nodeapi.internal.protonwrapper.netty.revocation package net.corda.nodeapi.internal.revocation
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.protonwrapper.netty.ExternalCrlSource import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource
import org.bouncycastle.asn1.x509.Extension import org.bouncycastle.asn1.x509.Extension
import java.security.cert.CRLReason import java.security.cert.CRLReason
import java.security.cert.CertPathValidatorException import java.security.cert.CertPathValidatorException
import java.security.cert.CertPathValidatorException.BasicReason
import java.security.cert.Certificate import java.security.cert.Certificate
import java.security.cert.CertificateRevokedException import java.security.cert.CertificateRevokedException
import java.security.cert.PKIXRevocationChecker import java.security.cert.PKIXRevocationChecker
import java.security.cert.X509CRL import java.security.cert.X509CRL
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.util.* import java.util.*
import kotlin.collections.ArrayList
/** /**
* Implementation of [PKIXRevocationChecker] which determines whether certificate is revoked using [externalCrlSource] which knows how to * Custom [PKIXRevocationChecker] which delegates to a plugable [CrlSource] to retrieve the CRLs for certificate revocation checks.
* obtain a set of CRLs for a given certificate from an external source
*/ */
class ExternalSourceRevocationChecker(private val externalCrlSource: ExternalCrlSource, private val dateSource: () -> Date) : PKIXRevocationChecker() { class CordaRevocationChecker(private val crlSource: CrlSource,
private val softFail: Boolean,
private val dateSource: () -> Date = ::Date) : PKIXRevocationChecker() {
companion object { companion object {
private val logger = contextLogger() private val logger = contextLogger()
} }
private val softFailExceptions = ArrayList<CertPathValidatorException>()
override fun check(cert: Certificate, unresolvedCritExts: MutableCollection<String>?) { override fun check(cert: Certificate, unresolvedCritExts: MutableCollection<String>?) {
val x509Certificate = cert as X509Certificate val x509Certificate = cert as X509Certificate
checkApprovedCRLs(x509Certificate, externalCrlSource.fetch(x509Certificate)) checkApprovedCRLs(x509Certificate, getCRLs(x509Certificate))
}
@Suppress("TooGenericExceptionCaught")
private fun getCRLs(cert: X509Certificate): Set<X509CRL> {
val crls = try {
crlSource.fetch(cert)
} catch (e: Exception) {
if (softFail) {
addSoftFailException(e)
return emptySet()
} else {
throw undeterminedRevocationException("Unable to retrieve CRLs", e)
}
}
if (crls.isNotEmpty() || softFail) {
return crls
}
// Note, the JDK tries to find a valid CRL from a different signing key before giving up (RevocationChecker.verifyWithSeparateSigningKey)
throw undeterminedRevocationException("Could not find any valid CRLs", null)
} }
/** /**
@ -47,11 +70,11 @@ class ExternalSourceRevocationChecker(private val externalCrlSource: ExternalCrl
* 5.3 of RFC 5280). * 5.3 of RFC 5280).
*/ */
val unresCritExts = entry.criticalExtensionOIDs val unresCritExts = entry.criticalExtensionOIDs
if (unresCritExts != null && !unresCritExts.isEmpty()) { if (unresCritExts != null && unresCritExts.isNotEmpty()) {
/* remove any that we will process */ /* remove any that we will process */
unresCritExts.remove(Extension.cRLDistributionPoints.id) unresCritExts.remove(Extension.cRLDistributionPoints.id)
unresCritExts.remove(Extension.certificateIssuer.id) unresCritExts.remove(Extension.certificateIssuer.id)
if (!unresCritExts.isEmpty()) { if (unresCritExts.isNotEmpty()) {
throw CertPathValidatorException( throw CertPathValidatorException(
"Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts") "Unrecognized critical extension(s) in revoked CRL entry: $unresCritExts")
} }
@ -64,14 +87,22 @@ class ExternalSourceRevocationChecker(private val externalCrlSource: ExternalCrl
revocationDate, reasonCode, revocationDate, reasonCode,
crl.issuerX500Principal, mutableMapOf()) crl.issuerX500Principal, mutableMapOf())
throw CertPathValidatorException( throw CertPathValidatorException(
t.message, t, null, -1, CertPathValidatorException.BasicReason.REVOKED) t.message, t, null, -1, BasicReason.REVOKED)
} }
} }
} }
} }
/**
* This is set to false intentionally for security reasons.
* It ensures that certificates are provided in reverse direction (from most-trusted CA to target certificate)
* after the necessary validation checks have already been performed.
*
* If that wasn't the case, we could be reaching out to CRL endpoints for invalid certificates, which would open security holes
* e.g. systems that are not part of a Corda network could force a Corda firewall to initiate outbound requests to systems under their control.
*/
override fun isForwardCheckingSupported(): Boolean { override fun isForwardCheckingSupported(): Boolean {
return true return false
} }
override fun getSupportedExtensions(): MutableSet<String>? { override fun getSupportedExtensions(): MutableSet<String>? {
@ -79,10 +110,19 @@ class ExternalSourceRevocationChecker(private val externalCrlSource: ExternalCrl
} }
override fun init(forward: Boolean) { override fun init(forward: Boolean) {
// Nothing to do softFailExceptions.clear()
} }
override fun getSoftFailExceptions(): MutableList<CertPathValidatorException> { override fun getSoftFailExceptions(): MutableList<CertPathValidatorException> {
return LinkedList() return Collections.unmodifiableList(softFailExceptions)
}
private fun addSoftFailException(e: Exception) {
logger.debug("Soft fail exception", e)
softFailExceptions += undeterminedRevocationException(e.message, e)
}
private fun undeterminedRevocationException(message: String?, cause: Throwable?): CertPathValidatorException {
return CertPathValidatorException(message, cause, null, -1, BasicReason.UNDETERMINED_REVOCATION_STATUS)
} }
} }

View File

@ -28,7 +28,7 @@ class SSLHelperTest {
val trustStore = sslConfig.trustStore val trustStore = sslConfig.trustStore
trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(CertificateStore.fromFile(trustStore.path, trustStore.storePassword, trustStore.entryPassword, false), RevocationConfigImpl(RevocationConfig.Mode.HARD_FAIL))) trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(CertificateStore.fromFile(trustStore.path, trustStore.storePassword, trustStore.entryPassword, false), RevocationConfigImpl(RevocationConfig.Mode.HARD_FAIL)))
val sslHandler = createClientSslHelper(NetworkHostAndPort("localhost", 1234), setOf(legalName), keyManagerFactory, trustManagerFactory) val sslHandler = createClientSslHandler(NetworkHostAndPort("localhost", 1234), setOf(legalName), keyManagerFactory, trustManagerFactory)
val legalNameHash = SecureHash.sha256(legalName.toString()).toString().take(32).toLowerCase() val legalNameHash = SecureHash.sha256(legalName.toString()).toString().take(32).toLowerCase()
// These hardcoded values must not be changed, something is broken if you have to change these hardcoded values. // These hardcoded values must not be changed, something is broken if you have to change these hardcoded values.

View File

@ -0,0 +1,51 @@
package net.corda.nodeapi.internal.revocation
import net.corda.core.crypto.Crypto
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.createDevNodeCa
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.node.internal.network.CrlServer
import org.assertj.core.api.Assertions.assertThat
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.math.BigInteger
class CertDistPointCrlSourceTest {
private lateinit var crlServer: CrlServer
@Before
fun setUp() {
// Do not use Security.addProvider(BouncyCastleProvider()) to avoid EdDSA signature disruption in other tests.
Crypto.findProvider(BouncyCastleProvider.PROVIDER_NAME)
crlServer = CrlServer(NetworkHostAndPort("localhost", 0))
crlServer.start()
}
@After
fun tearDown() {
if (::crlServer.isInitialized) {
crlServer.close()
}
}
@Test(timeout=300_000)
fun `happy path`() {
val crlSource = CertDistPointCrlSource()
with(crlSource.fetch(crlServer.intermediateCa.certificate)) {
assertThat(size).isEqualTo(1)
assertThat(single().revokedCertificates).isNull()
}
val nodeCaCert = crlServer.replaceNodeCertDistPoint(createDevNodeCa(crlServer.intermediateCa, ALICE_NAME).certificate)
crlServer.revokedNodeCerts += listOf(BigInteger.ONE, BigInteger.TEN)
with(crlSource.fetch(nodeCaCert)) { // Use a different cert to avoid the cache
assertThat(size).isEqualTo(1)
val revokedCertificates = single().revokedCertificates
assertThat(revokedCertificates.map { it.serialNumber }).containsExactlyInAnyOrder(BigInteger.ONE, BigInteger.TEN)
}
}
}

View File

@ -1,26 +1,27 @@
package net.corda.nodeapi.internal.protonwrapper.netty.revocation package net.corda.nodeapi.internal.revocation
import net.corda.core.utilities.Try import net.corda.core.utilities.Try
import net.corda.nodeapi.internal.DEV_CA_KEY_STORE_PASS import net.corda.nodeapi.internal.DEV_CA_KEY_STORE_PASS
import net.corda.nodeapi.internal.DEV_CA_PRIVATE_KEY_PASS import net.corda.nodeapi.internal.DEV_CA_PRIVATE_KEY_PASS
import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.protonwrapper.netty.ExternalCrlSource import net.corda.nodeapi.internal.protonwrapper.netty.CrlSource
import org.bouncycastle.jcajce.provider.asymmetric.x509.CertificateFactory import org.bouncycastle.jcajce.provider.asymmetric.x509.CertificateFactory
import org.junit.Test import org.junit.Test
import java.math.BigInteger import java.math.BigInteger
import java.security.cert.X509CRL import java.security.cert.X509CRL
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.sql.Date import java.time.LocalDate
import java.time.ZoneOffset
import java.util.*
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertTrue import kotlin.test.assertTrue
class ExternalSourceRevocationCheckerTest { class CordaRevocationCheckerTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun checkRevoked() { fun checkRevoked() {
val checkResult = performCheckOnDate(Date.valueOf("2019-09-27")) val checkResult = performCheckOnDate(LocalDate.of(2019, 9, 27))
val failedChecks = checkResult.filterNot { it.second.isSuccess } val failedChecks = checkResult.filterNot { it.second.isSuccess }
assertEquals(1, failedChecks.size) assertEquals(1, failedChecks.size)
assertEquals(BigInteger.valueOf(8310484079152632582), failedChecks.first().first.serialNumber) assertEquals(BigInteger.valueOf(8310484079152632582), failedChecks.first().first.serialNumber)
@ -28,11 +29,11 @@ class ExternalSourceRevocationCheckerTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun checkTooEarly() { fun checkTooEarly() {
val checkResult = performCheckOnDate(Date.valueOf("2019-08-27")) val checkResult = performCheckOnDate(LocalDate.of(2019, 8, 27))
assertTrue(checkResult.all { it.second.isSuccess }) assertTrue(checkResult.all { it.second.isSuccess })
} }
private fun performCheckOnDate(date: Date): List<Pair<X509Certificate, Try<Unit>>> { private fun performCheckOnDate(date: LocalDate): List<Pair<X509Certificate, Try<Unit>>> {
val certStore = CertificateStore.fromResource( val certStore = CertificateStore.fromResource(
"net/corda/nodeapi/internal/protonwrapper/netty/sslkeystore_Revoked.jks", "net/corda/nodeapi/internal/protonwrapper/netty/sslkeystore_Revoked.jks",
DEV_CA_KEY_STORE_PASS, DEV_CA_PRIVATE_KEY_PASS) DEV_CA_KEY_STORE_PASS, DEV_CA_PRIVATE_KEY_PASS)
@ -40,16 +41,17 @@ class ExternalSourceRevocationCheckerTest {
val resourceAsStream = javaClass.getResourceAsStream("/net/corda/nodeapi/internal/protonwrapper/netty/doorman.crl") val resourceAsStream = javaClass.getResourceAsStream("/net/corda/nodeapi/internal/protonwrapper/netty/doorman.crl")
val crl = CertificateFactory().engineGenerateCRL(resourceAsStream) as X509CRL val crl = CertificateFactory().engineGenerateCRL(resourceAsStream) as X509CRL
//val crlHolder = X509CRLHolder(resourceAsStream) val crlSource = object : CrlSource {
//crlHolder.revokedCertificates as X509CRLEntryHolder
val instance = ExternalSourceRevocationChecker(object : ExternalCrlSource {
override fun fetch(certificate: X509Certificate): Set<X509CRL> = setOf(crl) override fun fetch(certificate: X509Certificate): Set<X509CRL> = setOf(crl)
}) { date } }
val checker = CordaRevocationChecker(crlSource,
softFail = true,
dateSource = { Date.from(date.atStartOfDay().toInstant(ZoneOffset.UTC)) }
)
return certStore.query { return certStore.query {
getCertificateChain(X509Utilities.CORDA_CLIENT_TLS).map { getCertificateChain(X509Utilities.CORDA_CLIENT_TLS).map {
Pair(it, Try.on { instance.check(it, mutableListOf()) }) Pair(it, Try.on { checker.check(it, mutableListOf()) })
} }
} }
} }

View File

@ -270,6 +270,8 @@ tasks.register('integrationTest', Test) {
testClassesDirs = sourceSets.integrationTest.output.classesDirs testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath classpath = sourceSets.integrationTest.runtimeClasspath
maxParallelForks = (System.env.CORDA_NODE_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_NODE_INT_TESTING_FORKS".toInteger() maxParallelForks = (System.env.CORDA_NODE_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_NODE_INT_TESTING_FORKS".toInteger()
// CertificateRevocationListNodeTests
systemProperty 'net.corda.dpcrl.connect.timeout', '4000'
} }
tasks.register('slowIntegrationTest', Test) { tasks.register('slowIntegrationTest', Test) {

View File

@ -28,6 +28,7 @@ import org.junit.Test
import org.junit.rules.TemporaryFolder import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith import org.junit.runner.RunWith
import org.junit.runners.Parameterized import org.junit.runners.Parameterized
import java.time.Duration
import javax.net.ssl.KeyManagerFactory import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.TrustManagerFactory import javax.net.ssl.TrustManagerFactory
import kotlin.test.assertEquals import kotlin.test.assertEquals
@ -125,7 +126,7 @@ class AMQPClientSslErrorsTest(@Suppress("unused") private val iteration: Int) {
override val keyStore = keyStore override val keyStore = keyStore
override val trustStore = clientConfig.p2pSslOptions.trustStore.get() override val trustStore = clientConfig.p2pSslOptions.trustStore.get()
override val maxMessageSize: Int = MAX_MESSAGE_SIZE override val maxMessageSize: Int = MAX_MESSAGE_SIZE
override val sslHandshakeTimeout: Long = 3000 override val sslHandshakeTimeout: Duration = 3.seconds
} }
clientKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) clientKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())

View File

@ -7,6 +7,8 @@ import net.corda.core.crypto.generateKeyPair
import net.corda.core.internal.div import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.coretesting.internal.rigorousMock
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
import net.corda.node.services.config.FlowTimeoutConfiguration import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.config.configureWithDevSSLCertificate
@ -22,8 +24,6 @@ import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.internal.LogHelper import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase import net.corda.testing.internal.configureDatabase
import net.corda.coretesting.internal.rigorousMock
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.internal.MOCK_VERSION_INFO import net.corda.testing.node.internal.MOCK_VERSION_INFO
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
@ -57,7 +57,6 @@ class ArtemisMessagingTest {
@JvmField @JvmField
val temporaryFolder = TemporaryFolder() val temporaryFolder = TemporaryFolder()
// THe
private val portAllocation = incrementalPortAllocation() private val portAllocation = incrementalPortAllocation()
private val serverPort = portAllocation.nextPort() private val serverPort = portAllocation.nextPort()
private val identity = generateKeyPair() private val identity = generateKeyPair()
@ -201,7 +200,9 @@ class ArtemisMessagingTest {
messagingClient!!.start(identity.public, null, maxMessageSize) messagingClient!!.start(identity.public, null, maxMessageSize)
} }
private fun createAndStartClientAndServer(platformVersion: Int = 1, serverMaxMessageSize: Int = MAX_MESSAGE_SIZE, clientMaxMessageSize: Int = MAX_MESSAGE_SIZE): Pair<P2PMessagingClient, BlockingQueue<ReceivedMessage>> { private fun createAndStartClientAndServer(platformVersion: Int = 1,
serverMaxMessageSize: Int = MAX_MESSAGE_SIZE,
clientMaxMessageSize: Int = MAX_MESSAGE_SIZE): Pair<P2PMessagingClient, BlockingQueue<ReceivedMessage>> {
val receivedMessages = LinkedBlockingQueue<ReceivedMessage>() val receivedMessages = LinkedBlockingQueue<ReceivedMessage>()
createMessagingServer(maxMessageSize = serverMaxMessageSize).start() createMessagingServer(maxMessageSize = serverMaxMessageSize).start()
@ -242,7 +243,7 @@ class ArtemisMessagingTest {
} }
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer { private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null).apply { return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null, true).apply {
config.configureWithDevSSLCertificate() config.configureWithDevSSLCertificate()
messagingServer = this messagingServer = this
} }

View File

@ -5,15 +5,14 @@ import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.crypto.X509CertificateFactory import net.corda.nodeapi.internal.crypto.X509CertificateFactory
import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfigImpl
import net.corda.nodeapi.internal.protonwrapper.netty.certPathToString import net.corda.nodeapi.internal.protonwrapper.netty.certPathToString
import java.security.KeyStore import java.security.KeyStore
import java.security.cert.CertPathValidator import java.security.cert.CertPathValidator
import java.security.cert.CertPathValidatorException import java.security.cert.CertPathValidatorException
import java.security.cert.CertificateException import java.security.cert.CertificateException
import java.security.cert.PKIXBuilderParameters import java.security.cert.PKIXBuilderParameters
import java.security.cert.PKIXRevocationChecker
import java.security.cert.X509CertSelector import java.security.cert.X509CertSelector
import java.util.EnumSet
sealed class CertificateChainCheckPolicy { sealed class CertificateChainCheckPolicy {
companion object { companion object {
@ -94,13 +93,12 @@ sealed class CertificateChainCheckPolicy {
} }
} }
class RevocationCheck(val revocationMode: RevocationConfig.Mode) : CertificateChainCheckPolicy() { class RevocationCheck(val revocationConfig: RevocationConfig) : CertificateChainCheckPolicy() {
constructor(revocationMode: RevocationConfig.Mode) : this(RevocationConfigImpl(revocationMode))
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
return object : Check { return object : Check {
override fun checkCertificateChain(theirChain: Array<java.security.cert.X509Certificate>) { override fun checkCertificateChain(theirChain: Array<java.security.cert.X509Certificate>) {
if (revocationMode == RevocationConfig.Mode.OFF) {
return
}
// Convert javax.security.cert.X509Certificate to java.security.cert.X509Certificate. // Convert javax.security.cert.X509Certificate to java.security.cert.X509Certificate.
val chain = theirChain.map { X509CertificateFactory().generateCertificate(it.encoded.inputStream()) } val chain = theirChain.map { X509CertificateFactory().generateCertificate(it.encoded.inputStream()) }
log.info("Check Client Certpath:\r\n${certPathToString(chain.toTypedArray())}") log.info("Check Client Certpath:\r\n${certPathToString(chain.toTypedArray())}")
@ -110,17 +108,7 @@ sealed class CertificateChainCheckPolicy {
// See PKIXValidator.engineValidate() for reference implementation. // See PKIXValidator.engineValidate() for reference implementation.
val certPath = X509Utilities.buildCertPath(chain.dropLast(1)) val certPath = X509Utilities.buildCertPath(chain.dropLast(1))
val certPathValidator = CertPathValidator.getInstance("PKIX") val certPathValidator = CertPathValidator.getInstance("PKIX")
val pkixRevocationChecker = certPathValidator.revocationChecker as PKIXRevocationChecker val pkixRevocationChecker = revocationConfig.createPKIXRevocationChecker()
pkixRevocationChecker.options = EnumSet.of(
// Prefer CRL over OCSP
PKIXRevocationChecker.Option.PREFER_CRLS,
// Don't fall back to OCSP checking
PKIXRevocationChecker.Option.NO_FALLBACK)
if (revocationMode == RevocationConfig.Mode.SOFT_FAIL) {
// Allow revocation check to succeed if the revocation status cannot be determined for one of
// the following reasons: The CRL or OCSP response cannot be obtained because of a network error.
pkixRevocationChecker.options = pkixRevocationChecker.options + PKIXRevocationChecker.Option.SOFT_FAIL
}
val params = PKIXBuilderParameters(trustStore, X509CertSelector()) val params = PKIXBuilderParameters(trustStore, X509CertSelector())
params.addCertPathChecker(pkixRevocationChecker) params.addCertPathChecker(pkixRevocationChecker)
try { try {

View File

@ -56,7 +56,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
class ArtemisMessagingServer(private val config: NodeConfiguration, class ArtemisMessagingServer(private val config: NodeConfiguration,
private val messagingServerAddress: NetworkHostAndPort, private val messagingServerAddress: NetworkHostAndPort,
private val maxMessageSize: Int, private val maxMessageSize: Int,
private val journalBufferTimeout : Int?) : ArtemisBroker, SingletonSerializeAsToken() { private val journalBufferTimeout : Int?,
private val trace: Boolean = false) : ArtemisBroker, SingletonSerializeAsToken() {
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
} }
@ -130,7 +131,11 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
// The transaction cache is configurable, and drives other cache sizes. // The transaction cache is configurable, and drives other cache sizes.
globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize) globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize)
acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config.p2pSslOptions)) acceptorConfigurations.add(p2pAcceptorTcpTransport(
NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port),
config.p2pSslOptions,
trace = trace
))
// Enable built in message deduplication. Note we still have to do our own as the delayed commits // Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates. // and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess

View File

@ -0,0 +1,68 @@
package net.corda.node.services.messaging
import io.netty.buffer.ByteBufAllocator
import io.netty.channel.group.ChannelGroup
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.handler.ssl.SslHandler
import net.corda.core.internal.declaredField
import net.corda.nodeapi.internal.ArtemisTcpTransport
import org.apache.activemq.artemis.api.core.BaseInterceptor
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager
import org.apache.activemq.artemis.spi.core.remoting.Acceptor
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener
import org.apache.activemq.artemis.utils.actors.OrderedExecutor
import java.time.Duration
import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService
@Suppress("unused") // Used via reflection in ArtemisTcpTransport
class NodeNettyAcceptorFactory : AcceptorFactory {
override fun createAcceptor(name: String?,
clusterConnection: ClusterConnection?,
configuration: Map<String, Any>,
handler: BufferHandler?,
listener: ServerConnectionLifeCycleListener?,
threadPool: Executor,
scheduledThreadPool: ScheduledExecutorService?,
protocolMap: MutableMap<String, ProtocolManager<BaseInterceptor<*>, RedirectHandler<*>>>?): Acceptor {
val failureExecutor = OrderedExecutor(threadPool)
return NodeNettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
}
private class NodeNettyAcceptor(name: String?,
clusterConnection: ClusterConnection?,
configuration: Map<String, Any>,
handler: BufferHandler?,
listener: ServerConnectionLifeCycleListener?,
scheduledThreadPool: ScheduledExecutorService?,
failureExecutor: Executor,
protocolMap: MutableMap<String, ProtocolManager<BaseInterceptor<*>, RedirectHandler<*>>>?) :
NettyAcceptor(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap)
{
override fun start() {
super.start()
if (configuration[ArtemisTcpTransport.TRACE_NAME] == true) {
// Artemis does not seem to allow access to the underlying channel so we resort to reflection and get it via the
// serverChannelGroup field. This field is only available after start(), hence why we add the logger here.
declaredField<ChannelGroup>("serverChannelGroup").value.forEach { channel ->
channel.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO))
}
}
}
override fun getSslHandler(alloc: ByteBufAllocator?, peerHost: String?, peerPort: Int): SslHandler {
val sslHandler = super.getSslHandler(alloc, peerHost, peerPort)
val handshakeTimeout = configuration[ArtemisTcpTransport.SSL_HANDSHAKE_TIMEOUT_NAME] as Duration?
if (handshakeTimeout != null) {
sslHandler.handshakeTimeoutMillis = handshakeTimeout.toMillis()
}
return sslHandler
}
}
}

View File

@ -8,6 +8,7 @@ import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.config.SslConfiguration import net.corda.nodeapi.internal.config.SslConfiguration
import java.nio.file.Path import java.nio.file.Path
import java.time.Duration
class CertificateStoreStubs { class CertificateStoreStubs {
@ -49,11 +50,11 @@ class CertificateStoreStubs {
keyStorePassword: String = KeyStore.DEFAULT_STORE_PASSWORD, keyPassword: String = keyStorePassword, keyStorePassword: String = KeyStore.DEFAULT_STORE_PASSWORD, keyPassword: String = keyStorePassword,
trustStoreFileName: String = TrustStore.DEFAULT_STORE_FILE_NAME, trustStoreFileName: String = TrustStore.DEFAULT_STORE_FILE_NAME,
trustStorePassword: String = TrustStore.DEFAULT_STORE_PASSWORD, trustStorePassword: String = TrustStore.DEFAULT_STORE_PASSWORD,
trustStoreKeyPassword: String = TrustStore.DEFAULT_KEY_PASSWORD): MutualSslConfiguration { trustStoreKeyPassword: String = TrustStore.DEFAULT_KEY_PASSWORD,
sslHandshakeTimeout: Duration? = null): MutualSslConfiguration {
val keyStore = FileBasedCertificateStoreSupplier(certificatesDirectory / keyStoreFileName, keyStorePassword, keyPassword) val keyStore = FileBasedCertificateStoreSupplier(certificatesDirectory / keyStoreFileName, keyStorePassword, keyPassword)
val trustStore = FileBasedCertificateStoreSupplier(certificatesDirectory / trustStoreFileName, trustStorePassword, trustStoreKeyPassword) val trustStore = FileBasedCertificateStoreSupplier(certificatesDirectory / trustStoreFileName, trustStorePassword, trustStoreKeyPassword)
return SslConfiguration.mutual(keyStore, trustStore) return SslConfiguration.mutual(keyStore, trustStore, sslHandshakeTimeout)
} }
@JvmStatic @JvmStatic

View File

@ -0,0 +1,203 @@
@file:Suppress("MagicNumber")
package net.corda.testing.node.internal.network
import net.corda.core.crypto.Crypto
import net.corda.core.internal.CertRole
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.days
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.coretesting.internal.DEV_INTERMEDIATE_CA
import net.corda.coretesting.internal.DEV_ROOT_CA
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.nodeapi.internal.crypto.ContentSignerBuilder
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.crypto.certificateType
import net.corda.nodeapi.internal.crypto.toJca
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x509.CRLDistPoint
import org.bouncycastle.asn1.x509.DistributionPoint
import org.bouncycastle.asn1.x509.DistributionPointName
import org.bouncycastle.asn1.x509.Extension
import org.bouncycastle.asn1.x509.GeneralName
import org.bouncycastle.asn1.x509.GeneralNames
import org.bouncycastle.asn1.x509.IssuingDistributionPoint
import org.bouncycastle.asn1.x509.ReasonFlags
import org.bouncycastle.cert.jcajce.JcaX509CRLConverter
import org.bouncycastle.cert.jcajce.JcaX509ExtensionUtils
import org.bouncycastle.cert.jcajce.JcaX509v2CRLBuilder
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.ServerConnector
import org.eclipse.jetty.server.handler.HandlerCollection
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.servlet.ServletContainer
import java.io.Closeable
import java.math.BigInteger
import java.net.InetSocketAddress
import java.security.KeyPair
import java.security.cert.X509CRL
import java.security.cert.X509Certificate
import java.util.*
import javax.security.auth.x500.X500Principal
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.Response
import kotlin.collections.ArrayList
class CrlServer(hostAndPort: NetworkHostAndPort) : Closeable {
companion object {
private const val SIGNATURE_ALGORITHM = "SHA256withECDSA"
const val NODE_CRL = "node.crl"
const val FORBIDDEN_CRL = "forbidden.crl"
const val INTERMEDIATE_CRL = "intermediate.crl"
const val EMPTY_CRL = "empty.crl"
fun X509Certificate.withCrlDistPoint(issuerKeyPair: KeyPair, crlDistPoint: String?, crlIssuer: X500Principal? = null): X509Certificate {
val signatureScheme = Crypto.findSignatureScheme(issuerKeyPair.private)
val provider = Crypto.findProvider(signatureScheme.providerName)
val issuerSigner = ContentSignerBuilder.build(signatureScheme, issuerKeyPair.private, provider)
val builder = X509Utilities.createPartialCertificate(
CertRole.extract(this)!!.certificateType,
issuerX500Principal,
issuerKeyPair.public,
subjectX500Principal,
publicKey,
Pair(Date(System.currentTimeMillis() - 5.minutes.toMillis()), Date(System.currentTimeMillis() + 10.days.toMillis())),
null
)
if (crlDistPoint != null) {
val distPointName = DistributionPointName(GeneralNames(GeneralName(GeneralName.uniformResourceIdentifier, crlDistPoint)))
val crlIssuerGeneralNames = crlIssuer?.let { GeneralNames(GeneralName(X500Name.getInstance(it.encoded))) }
val distPoint = DistributionPoint(distPointName, null, crlIssuerGeneralNames)
builder.addExtension(Extension.cRLDistributionPoints, false, CRLDistPoint(arrayOf(distPoint)))
}
return builder.build(issuerSigner).toJca()
}
}
private val server: Server = Server(InetSocketAddress(hostAndPort.host, hostAndPort.port)).apply {
handler = HandlerCollection().apply {
addHandler(buildServletContextHandler())
}
}
val revokedNodeCerts: MutableList<BigInteger> = ArrayList()
val revokedIntermediateCerts: MutableList<BigInteger> = ArrayList()
val rootCa: CertificateAndKeyPair = DEV_ROOT_CA
private lateinit var _intermediateCa: CertificateAndKeyPair
val intermediateCa: CertificateAndKeyPair get() = _intermediateCa
val hostAndPort: NetworkHostAndPort
get() = server.connectors.mapNotNull { it as? ServerConnector }
.map { NetworkHostAndPort(it.host, it.localPort) }
.first()
fun start() {
server.start()
_intermediateCa = CertificateAndKeyPair(
DEV_INTERMEDIATE_CA.certificate.withCrlDistPoint(rootCa.keyPair, "http://$hostAndPort/crl/$INTERMEDIATE_CRL"),
DEV_INTERMEDIATE_CA.keyPair
)
println("Network management web services started on $hostAndPort")
}
fun replaceNodeCertDistPoint(nodeCaCert: X509Certificate,
nodeCaCrlDistPoint: String? = "http://$hostAndPort/crl/$NODE_CRL",
crlIssuer: X500Principal? = null): X509Certificate {
return nodeCaCert.withCrlDistPoint(intermediateCa.keyPair, nodeCaCrlDistPoint, crlIssuer)
}
fun createRevocationList(signatureAlgorithm: String,
ca: CertificateAndKeyPair,
endpoint: String,
indirect: Boolean,
serialNumbers: List<BigInteger>): X509CRL {
println("Generating CRL for $endpoint")
val builder = JcaX509v2CRLBuilder(ca.certificate.subjectX500Principal, Date(System.currentTimeMillis() - 1.minutes.toMillis()))
val extensionUtils = JcaX509ExtensionUtils()
builder.addExtension(Extension.authorityKeyIdentifier, false, extensionUtils.createAuthorityKeyIdentifier(ca.certificate))
val issuingDistPointName = GeneralName(GeneralName.uniformResourceIdentifier, "http://$hostAndPort/crl/$endpoint")
// This is required and needs to match the certificate settings with respect to being indirect
val issuingDistPoint = IssuingDistributionPoint(DistributionPointName(GeneralNames(issuingDistPointName)), indirect, false)
builder.addExtension(Extension.issuingDistributionPoint, true, issuingDistPoint)
builder.setNextUpdate(Date(System.currentTimeMillis() + 1.seconds.toMillis()))
serialNumbers.forEach {
builder.addCRLEntry(it, Date(System.currentTimeMillis() - 10.minutes.toMillis()), ReasonFlags.certificateHold)
}
val signer = JcaContentSignerBuilder(signatureAlgorithm).setProvider(Crypto.findProvider("BC")).build(ca.keyPair.private)
return JcaX509CRLConverter().setProvider(Crypto.findProvider("BC")).getCRL(builder.build(signer))
}
override fun close() {
println("Shutting down network management web services...")
server.stop()
server.join()
}
private fun buildServletContextHandler(): ServletContextHandler {
return ServletContextHandler().apply {
contextPath = "/"
val resourceConfig = ResourceConfig().apply {
register(CrlServlet(this@CrlServer))
}
val jerseyServlet = ServletHolder(ServletContainer(resourceConfig)).apply { initOrder = 0 }
addServlet(jerseyServlet, "/*")
}
}
@Path("crl")
class CrlServlet(private val crlServer: CrlServer) {
@GET
@Path(NODE_CRL)
@Produces("application/pkcs7-crl")
fun getNodeCRL(): Response {
return Response.ok(crlServer.createRevocationList(
SIGNATURE_ALGORITHM,
crlServer.intermediateCa,
NODE_CRL,
false,
crlServer.revokedNodeCerts
).encoded).build()
}
@GET
@Path(FORBIDDEN_CRL)
@Produces("application/pkcs7-crl")
fun getNodeSlowCRL(): Response {
return Response.status(Response.Status.FORBIDDEN).build()
}
@GET
@Path(INTERMEDIATE_CRL)
@Produces("application/pkcs7-crl")
fun getIntermediateCRL(): Response {
return Response.ok(crlServer.createRevocationList(
SIGNATURE_ALGORITHM,
crlServer.rootCa,
INTERMEDIATE_CRL,
false,
crlServer.revokedIntermediateCerts
).encoded).build()
}
@GET
@Path(EMPTY_CRL)
@Produces("application/pkcs7-crl")
fun getEmptyCRL(): Response {
return Response.ok(crlServer.createRevocationList(
SIGNATURE_ALGORITHM,
crlServer.rootCa,
EMPTY_CRL,
true, emptyList()
).encoded).build()
}
}
}