diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeRestartTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeRestartTest.kt index ed60857ac9..e58530cd53 100644 --- a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeRestartTest.kt +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeRestartTest.kt @@ -22,15 +22,22 @@ import net.corda.testing.node.internal.cordappsForPackages import net.corda.testing.node.internal.internalDriver import org.junit.ClassRule import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized import java.util.* import java.util.concurrent.ConcurrentHashMap import kotlin.concurrent.thread import kotlin.test.assertEquals -class BridgeRestartTest : IntegrationTest() { +@RunWith(Parameterized::class) +class BridgeRestartTest(private val enableSNI: Boolean) : IntegrationTest() { companion object { val pingStarted = ConcurrentHashMap>() + @JvmStatic + @Parameterized.Parameters(name = "enableSNI = {0}") + fun data() = listOf(false, true) + @ClassRule @JvmField val databaseSchemas = IntegrationTestSchemas(DUMMY_BANK_A_NAME.toDatabaseSchemaName(), DUMMY_BANK_B_NAME.toDatabaseSchemaName(), DUMMY_NOTARY_NAME.toDatabaseSchemaName()) @@ -44,7 +51,7 @@ class BridgeRestartTest : IntegrationTest() { val pongSession = initiateFlow(pongParty) pongSession.sendAndReceive(times) pingStarted.getOrPut(runId) { openFuture() }.set(Unit) - for (i in 1 .. times) { + for (i in 1..times) { logger.info("PING $i") val j = pongSession.sendAndReceive(i).unwrap { it } assertEquals(i, j) @@ -57,7 +64,7 @@ class BridgeRestartTest : IntegrationTest() { @Suspendable override fun call() { val times = pingSession.sendAndReceive(Unit).unwrap { it } - for (i in 1 .. times) { + for (i in 1..times) { logger.info("PONG $i") val j = pingSession.sendAndReceive(i).unwrap { it } assertEquals(i, j) @@ -68,7 +75,7 @@ class BridgeRestartTest : IntegrationTest() { @Test fun restartLongPingPongFlowRandomly() { val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) - internalDriver(startNodesInProcess = true, cordappsForAllNodes = cordappsForPackages("net.corda.bridge")) { + internalDriver(startNodesInProcess = true, cordappsForAllNodes = cordappsForPackages("net.corda.bridge"), enableSNI = enableSNI) { val bFuture = startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) val bridgePort = 20005 val brokerPort = 21005 @@ -121,7 +128,7 @@ class BridgeRestartTest : IntegrationTest() { @Test fun restartSeveralPingPongFlowsRandomly() { val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) - internalDriver(startNodesInProcess = true, cordappsForAllNodes = cordappsForPackages("net.corda.bridge")) { + internalDriver(startNodesInProcess = true, cordappsForAllNodes = cordappsForPackages("net.corda.bridge"), enableSNI = enableSNI) { val bFuture = startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) val bridgePort = 20005 val brokerPort = 21005 @@ -154,7 +161,7 @@ class BridgeRestartTest : IntegrationTest() { // We kill -9 and restart the bridge after a random sleep CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { connection -> - val handles = (1 .. 10).map { + val handles = (1..10).map { connection.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100) } diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/SNIBridgeTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/SNIBridgeTest.kt index 2ddd21e436..5300b19d0c 100644 --- a/bridge/src/integration-test/kotlin/net/corda/bridge/SNIBridgeTest.kt +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/SNIBridgeTest.kt @@ -116,7 +116,6 @@ class SNIBridgeTest : IntegrationTest() { // Start broker val broker = createArtemisTextCertsLogin(artemisPort, nodeConfigs[DUMMY_BANK_B_NAME]!!.p2pSslOptions) broker.start() - println(broker.isActive) val aFuture = startNode( providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), @@ -125,14 +124,11 @@ class SNIBridgeTest : IntegrationTest() { "p2pAddress" to "localhost:$advertisedP2PPort", "messagingServerAddress" to "0.0.0.0:$artemisPort", "messagingServerExternal" to true, - "enterpriseConfiguration" to mapOf( - "externalBridge" to true - ) + "enterpriseConfiguration" to mapOf("externalBridge" to true) ) ) val a = aFuture.getOrThrow() - println(a.nodeInfo) val bFuture = startNode( providedName = DUMMY_BANK_B_NAME, @@ -142,25 +138,14 @@ class SNIBridgeTest : IntegrationTest() { "p2pAddress" to "localhost:$advertisedP2PPort", "messagingServerAddress" to "0.0.0.0:$artemisPort", "messagingServerExternal" to true, - "enterpriseConfiguration" to mapOf( - "externalBridge" to true - ) + "enterpriseConfiguration" to mapOf("externalBridge" to true) ) ) val b = bFuture.getOrThrow() - println(b.nodeInfo) - - val bridge = startBridge(ALICE_NAME, advertisedP2PPort, artemisPort, mapOf( - "outboundConfig" to mapOf( - "artemisBrokerAddress" to "localhost:$artemisPort" - ), - "inboundConfig" to mapOf( - "listeningAddress" to "0.0.0.0:$advertisedP2PPort" - ) + val bridge = startBridge(ALICE_NAME, advertisedP2PPort, artemisPort, emptyMap( )).getOrThrow() - println(bridge.brokerPort) // Start a node on the other side of the bridge val c = startNode(providedName = DUMMY_BANK_C_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:${portAllocation.nextPort()}")).getOrThrow() diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt index 3ed2ead932..deef7c2661 100644 --- a/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt @@ -83,6 +83,7 @@ class AMQPListenerTest { override val trustStore = clientTrustStore override val maxMessageSize: Int = maxMessageSize override val trace: Boolean = true + override val enableSNI: Boolean = clientConfig.bridgeInnerConfig?.enableSNI ?: true } // create and connect a real client val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)), diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallConfiguration.kt b/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallConfiguration.kt index 322581d514..3454ec5288 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallConfiguration.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallConfiguration.kt @@ -66,6 +66,7 @@ interface BridgeInnerConfiguration { val customSSLConfiguration: BridgeSSLConfiguration? // The SSL keystores to provision into the Float in DMZ val customFloatOuterSSLConfiguration: BridgeSSLConfiguration? + val enableSNI: Boolean } interface BridgeHAConfig { diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/config/FirewallConfigurationImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/config/FirewallConfigurationImpl.kt index 1a1d82fff1..9c5a164fa4 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/config/FirewallConfigurationImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/config/FirewallConfigurationImpl.kt @@ -7,8 +7,8 @@ import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier -import net.corda.nodeapi.internal.config.SslConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration +import net.corda.nodeapi.internal.config.SslConfiguration import net.corda.nodeapi.internal.config.parseAs import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig import java.nio.file.Path @@ -39,7 +39,8 @@ data class BridgeInboundConfigurationImpl(override val listeningAddress: Network data class BridgeInnerConfigurationImpl(override val floatAddresses: List, override val expectedCertificateSubject: CordaX500Name, override val customSSLConfiguration: BridgeSSLConfigurationImpl?, - override val customFloatOuterSSLConfiguration: BridgeSSLConfigurationImpl?) : BridgeInnerConfiguration + override val customFloatOuterSSLConfiguration: BridgeSSLConfigurationImpl?, + override val enableSNI: Boolean = true) : BridgeInnerConfiguration data class FloatOuterConfigurationImpl(override val floatAddress: NetworkHostAndPort, override val expectedCertificateSubject: CordaX500Name, @@ -71,15 +72,12 @@ data class FirewallConfigurationImpl( override val p2pConfirmationWindowSize: Int = 1048576, override val whitelistedHeaders: List = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList(), override val auditServiceConfiguration: AuditServiceConfigurationImpl, - override val healthCheckPhrase: String? = null -) : FirewallConfiguration { + override val healthCheckPhrase: String? = null) : FirewallConfiguration { init { - if (firewallMode == FirewallMode.SenderReceiver) { - require(inboundConfig != null && outboundConfig != null) { "Missing required configuration" } - } else if (firewallMode == FirewallMode.BridgeInner) { - require(bridgeInnerConfig != null && outboundConfig != null) { "Missing required configuration" } - } else if (firewallMode == FirewallMode.FloatOuter) { - require(inboundConfig != null && floatOuterConfig != null) { "Missing required configuration" } + when (firewallMode) { + FirewallMode.SenderReceiver -> require(inboundConfig != null && outboundConfig != null) { "Missing required configuration" } + FirewallMode.BridgeInner -> require(bridgeInnerConfig != null && outboundConfig != null) { "Missing required configuration" } + FirewallMode.FloatOuter -> require(inboundConfig != null && floatOuterConfig != null) { "Missing required configuration" } } } diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt index ea7459d8cf..6a1c1b8cad 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt @@ -16,6 +16,7 @@ import rx.Observable import rx.Subscription import rx.subjects.PublishSubject import java.io.ByteArrayInputStream +import java.lang.String.valueOf import java.security.KeyStore import java.util.* @@ -28,7 +29,7 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration, private val consoleLogger = LoggerFactory.getLogger("BasicInfo") } - private val statusFollower: ServiceStateCombiner + private val statusFollower: ServiceStateCombiner = ServiceStateCombiner(listOf(auditService)) private var statusSubscriber: Subscription? = null private var amqpServer: AMQPServer? = null private var keyStorePrivateKeyPassword: CharArray? = null @@ -36,10 +37,6 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration, private var onConnectAuditSubscription: Subscription? = null private var onReceiveSubscription: Subscription? = null - init { - statusFollower = ServiceStateCombiner(listOf(auditService)) - } - override fun provisionKeysAndActivate(keyStoreBytes: ByteArray, keyStorePassword: CharArray, keyStorePrivateKeyPassword: CharArray, @@ -59,6 +56,7 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration, override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail override val maxMessageSize: Int = maximumMessageSize override val trace: Boolean = conf.enableAMQPPacketTrace + override val enableSNI: Boolean = conf.bridgeInnerConfig?.enableSNI ?: true override val healthCheckPhrase = conf.healthCheckPhrase } val server = AMQPServer(bindAddress.host, @@ -93,7 +91,7 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration, ByteArrayInputStream(keyStoreBytes).use { keyStore.load(it, keyStorePassword) } - return X509KeyStore(keyStore, java.lang.String.valueOf(keyStorePassword)) + return X509KeyStore(keyStore, valueOf(keyStorePassword)) } override fun wipeKeysAndDeactivate() { diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt index f80a201fb5..8b32f07ea7 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt @@ -38,7 +38,8 @@ class FloatControlListenerService(val conf: FirewallConfiguration, private var connectSubscriber: Subscription? = null private var receiveSubscriber: Subscription? = null private var amqpControlServer: AMQPServer? = null - private val sslConfiguration: MutualSslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: conf.p2pSslOptions + private val sslConfiguration: MutualSslConfiguration = conf.floatOuterConfig?.customSSLConfiguration + ?: conf.p2pSslOptions private val floatControlAddress = conf.floatOuterConfig!!.floatAddress private val floatClientName = conf.floatOuterConfig!!.expectedCertificateSubject private var activeConnectionInfo: ConnectionChange? = null diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt index a081a1ac3e..8f7bc90a93 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt @@ -59,6 +59,7 @@ class TunnelingBridgeReceiverService(val conf: FirewallConfiguration, override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail override val maxMessageSize: Int = maximumMessageSize override val trace: Boolean = conf.enableAMQPPacketTrace + override val enableSNI: Boolean = conf.bridgeInnerConfig!!.enableSNI override val healthCheckPhrase = conf.healthCheckPhrase } val controlClient = AMQPClient(floatAddresses, diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt index 1826d5870a..8c0c545565 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt @@ -28,7 +28,11 @@ class DirectBridgeSenderService(val conf: FirewallConfiguration, private val statusFollower: ServiceStateCombiner = ServiceStateCombiner(listOf(auditService, artemisConnectionService, haService)) private var statusSubscriber: Subscription? = null private var listenerActiveSubscriber: Subscription? = null - private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf.p2pSslOptions, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) }, + private var bridgeControlListener = BridgeControlListener(conf.p2pSslOptions, + conf.outboundConfig!!.socksProxyConfig, + maxMessageSize, + conf.bridgeInnerConfig?.enableSNI ?: true, + { ForwardingArtemisMessageClient(artemisConnectionService) }, BridgeAuditServiceAdaptor(auditService)) private class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider { diff --git a/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerStartStopTest.kt b/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerStartStopTest.kt index 8c722a7cc1..5b2cf35e8f 100644 --- a/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerStartStopTest.kt +++ b/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerStartStopTest.kt @@ -208,7 +208,7 @@ class FlowWorkerStartStopTest { } private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener { - val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize) + val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize, enableSNI = true) bridgeControlListener.start() return bridgeControlListener } diff --git a/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerTest.kt b/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerTest.kt index 29a1cdc7ed..7e19892cb0 100644 --- a/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerTest.kt +++ b/experimental/flow-worker/src/integration-test/kotlin/net/corda/flowworker/FlowWorkerTest.kt @@ -247,7 +247,7 @@ class FlowWorkerTest { } private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener { - val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize) + val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize, enableSNI = true) bridgeControlListener.start() return bridgeControlListener } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 520962fcea..c65238cdd1 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -35,9 +35,12 @@ import kotlin.concurrent.withLock * The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager. */ @VisibleForTesting -class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig? = null, maxMessageSize: Int, - private val artemisMessageClientFactory: () -> ArtemisSessionProvider, - private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager { +open class AMQPBridgeManager(config: MutualSslConfiguration, + socksProxyConfig: SocksProxyConfig? = null, + maxMessageSize: Int, + enableSNI: Boolean, + private val artemisMessageClientFactory: () -> ArtemisSessionProvider, + private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager { private val lock = ReentrantLock() private val queueNamesToBridgesMap = mutableMapOf>() @@ -47,19 +50,21 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP override val socksProxyConfig: SocksProxyConfig?, override val maxMessageSize: Int, override val useOpenSsl: Boolean, + override val enableSNI: Boolean, override val sourceX500Name: String? = null) : AMQPConfiguration { - constructor(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int) : this(config.keyStore.get(), + constructor(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int, enableSNI: Boolean) : this(config.keyStore.get(), config.trustStore.get(), socksProxyConfig, maxMessageSize, - config.useOpenSsl) + config.useOpenSsl, + enableSNI) } - private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, socksProxyConfig, maxMessageSize) + private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, socksProxyConfig, maxMessageSize, enableSNI) private var sharedEventLoopGroup: EventLoopGroup? = null private var artemis: ArtemisSessionProvider? = null - constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, socksProxyConfig: SocksProxyConfig? = null) : this(config, socksProxyConfig, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) }) + constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, enableSNI: Boolean, socksProxyConfig: SocksProxyConfig? = null) : this(config, socksProxyConfig, maxMessageSize, enableSNI, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) }) companion object { private const val NUM_BRIDGE_THREADS = 0 // Default sized pool @@ -154,7 +159,11 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) this.session = session // Several producers (in the case of shared bridge) can put messages in the same outbound p2p queue. The consumers are created using the source x500 name as a filter - val consumer = session.createConsumer(queueName, "hyphenated_props:sender-subject-name = '${amqpConfig.sourceX500Name}'") + val consumer = if (amqpConfig.enableSNI) { + session.createConsumer(queueName, "hyphenated_props:sender-subject-name = '${amqpConfig.sourceX500Name}'") + } else { + session.createConsumer(queueName) + } this.consumer = consumer consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler) session.start() @@ -230,7 +239,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP return } } - val newAMQPConfig = AMQPConfigurationImpl(amqpConfig.keyStore, amqpConfig.trustStore, amqpConfig.socksProxyConfig, amqpConfig.maxMessageSize, amqpConfig.useOpenSsl, sourceX500Name) + val newAMQPConfig = with(amqpConfig) { AMQPConfigurationImpl(keyStore, trustStore, socksProxyConfig, maxMessageSize, useOpenSsl, enableSNI, sourceX500Name) } val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig, sharedEventLoopGroup!!, artemis!!, bridgeMetricsService) bridges += newBridge bridgeMetricsService?.bridgeCreated(targets, legalNames) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt index e7fd2a0328..59cc625880 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt @@ -1,6 +1,5 @@ package net.corda.nodeapi.internal.bridging -import net.corda.core.identity.CordaX500Name import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize @@ -11,7 +10,6 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress import net.corda.nodeapi.internal.ArtemisSessionProvider import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig @@ -28,13 +26,18 @@ import java.util.* class BridgeControlListener(val config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig? = null, maxMessageSize: Int, + enableSNI: Boolean, private val artemisMessageClientFactory: () -> ArtemisSessionProvider, bridgeMetricsService: BridgeMetricsService? = null) : AutoCloseable { private val bridgeId: String = UUID.randomUUID().toString() private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId" private val bridgeNotifyQueue = "$BRIDGE_NOTIFY.$bridgeId" private val validInboundQueues = mutableSetOf() - private val bridgeManager = LoopbackBridgeManagerWrapper(config, socksProxyConfig, maxMessageSize, artemisMessageClientFactory, bridgeMetricsService, this::validateReceiveTopic) + private val bridgeManager = if (enableSNI) { + LoopbackBridgeManager(config, socksProxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService, this::validateReceiveTopic) + } else { + AMQPBridgeManager(config, socksProxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService) + } private var artemis: ArtemisSessionProvider? = null private var controlConsumer: ClientConsumer? = null private var notifyConsumer: ClientConsumer? = null @@ -42,7 +45,8 @@ class BridgeControlListener(val config: MutualSslConfiguration, constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, - socksProxy: SocksProxyConfig? = null) : this(config, socksProxy, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) }) + enableSNI: Boolean, + socksProxy: SocksProxyConfig? = null) : this(config, socksProxy, maxMessageSize, enableSNI, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) }) companion object { private val log = contextLogger() @@ -163,7 +167,10 @@ class BridgeControlListener(val config: MutualSslConfiguration, val wasActive = active validInboundQueues.addAll(controlMessage.inboxQueues) log.info("Added inbox: ${controlMessage.inboxQueues}") - bridgeManager.inboxesAdded(controlMessage.inboxQueues) + if (bridgeManager is LoopbackBridgeManager) { + // Notify loopback bridge manager inboxes has changed. + bridgeManager.inboxesAdded(controlMessage.inboxQueues) + } if (!wasActive && active) { _activeChange.onNext(true) } @@ -187,54 +194,4 @@ class BridgeControlListener(val config: MutualSslConfiguration, } } } - - private class LoopbackBridgeManagerWrapper(config: MutualSslConfiguration, - socksProxyConfig: SocksProxyConfig? = null, - maxMessageSize: Int, - artemisMessageClientFactory: () -> ArtemisSessionProvider, - bridgeMetricsService: BridgeMetricsService? = null, - private val isLocalInbox: (String) -> Boolean) : BridgeManager { - - private val bridgeManager = AMQPBridgeManager(config, socksProxyConfig, maxMessageSize, artemisMessageClientFactory, bridgeMetricsService) - private val loopbackBridgeManager = LoopbackBridgeManager(artemisMessageClientFactory, bridgeMetricsService) - - override fun deployBridge(sourceX500Name: String, queueName: String, targets: List, legalNames: Set) { - val inboxAddress = translateLocalQueueToInboxAddress(queueName) - if (isLocalInbox(inboxAddress)) { - log.info("Deploying loopback bridge for $queueName, source $sourceX500Name") - loopbackBridgeManager.deployBridge(sourceX500Name, queueName, targets, legalNames) - } else { - log.info("Deploying AMQP bridge for $queueName, source $sourceX500Name") - bridgeManager.deployBridge(sourceX500Name, queueName, targets, legalNames) - } - } - - override fun destroyBridge(queueName: String, targets: List) { - bridgeManager.destroyBridge(queueName, targets) - loopbackBridgeManager.destroyBridge(queueName, targets) - } - - override fun start() { - bridgeManager.start() - loopbackBridgeManager.start() - } - - override fun stop() { - bridgeManager.stop() - loopbackBridgeManager.stop() - } - - override fun close() = stop() - - /** - * Remove any AMQP bridge for the local inbox and create a loopback bridge for that queue. - */ - fun inboxesAdded(inboxes: List) { - for (inbox in inboxes) { - bridgeManager.destroyAllBridge(inbox).forEach { source, bridgeEntry -> - loopbackBridgeManager.deployBridge(source, bridgeEntry.queueName, bridgeEntry.targets, bridgeEntry.legalNames.toSet()) - } - } - } - } } \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt index 3ca3e3f5e6..8657c2515f 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt @@ -9,7 +9,9 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress import net.corda.nodeapi.internal.ArtemisSessionProvider +import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl +import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE import org.apache.activemq.artemis.api.core.client.ClientConsumer @@ -23,8 +25,17 @@ import org.slf4j.MDC * inboxes. */ @VisibleForTesting -class LoopbackBridgeManager(private val artemisMessageClientFactory: () -> ArtemisSessionProvider, - private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager { +class LoopbackBridgeManager(config: MutualSslConfiguration, + socksProxyConfig: SocksProxyConfig? = null, + maxMessageSize: Int, + enableSNI: Boolean, + private val artemisMessageClientFactory: () -> ArtemisSessionProvider, + private val bridgeMetricsService: BridgeMetricsService? = null, + private val isLocalInbox: (String) -> Boolean) : AMQPBridgeManager(config, socksProxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService) { + + companion object { + private val log = contextLogger() + } private val queueNamesToBridgesMap = ConcurrentBox(mutableMapOf>()) private var artemis: ArtemisSessionProvider? = null @@ -118,21 +129,29 @@ class LoopbackBridgeManager(private val artemisMessageClientFactory: () -> Artem } override fun deployBridge(sourceX500Name: String, queueName: String, targets: List, legalNames: Set) { - queueNamesToBridgesMap.exclusive { - val bridges = getOrPut(queueName) { mutableListOf() } - for (target in targets) { - if (bridges.any { it.targets.contains(target) && it.sourceX500Name == sourceX500Name }) { - return + val inboxAddress = translateLocalQueueToInboxAddress(queueName) + if (isLocalInbox(inboxAddress)) { + log.info("Deploying loopback bridge for $queueName, source $sourceX500Name") + queueNamesToBridgesMap.exclusive { + val bridges = getOrPut(queueName) { mutableListOf() } + for (target in targets) { + if (bridges.any { it.targets.contains(target) && it.sourceX500Name == sourceX500Name }) { + return + } } - } - val newBridge = LoopbackBridge(sourceX500Name, queueName, targets, legalNames, artemis!!, bridgeMetricsService) - bridges += newBridge - bridgeMetricsService?.bridgeCreated(targets, legalNames) - newBridge - }.start() + val newBridge = LoopbackBridge(sourceX500Name, queueName, targets, legalNames, artemis!!, bridgeMetricsService) + bridges += newBridge + bridgeMetricsService?.bridgeCreated(targets, legalNames) + newBridge + }.start() + } else { + log.info("Deploying AMQP bridge for $queueName, source $sourceX500Name") + super.deployBridge(sourceX500Name, queueName, targets, legalNames) + } } override fun destroyBridge(queueName: String, targets: List) { + super.destroyBridge(queueName, targets) queueNamesToBridgesMap.exclusive { val bridges = this[queueName] ?: mutableListOf() for (target in targets) { @@ -149,7 +168,19 @@ class LoopbackBridgeManager(private val artemisMessageClientFactory: () -> Artem } } + /** + * Remove any AMQP bridge for the local inbox and create a loopback bridge for that queue. + */ + fun inboxesAdded(inboxes: List) { + for (inbox in inboxes) { + super.destroyAllBridge(inbox).forEach { source, bridgeEntry -> + deployBridge(source, bridgeEntry.queueName, bridgeEntry.targets, bridgeEntry.legalNames.toSet()) + } + } + } + override fun start() { + super.start() val artemis = artemisMessageClientFactory() this.artemis = artemis artemis.start() @@ -158,6 +189,7 @@ class LoopbackBridgeManager(private val artemisMessageClientFactory: () -> Artem override fun stop() = close() override fun close() { + super.close() queueNamesToBridgesMap.exclusive { for (bridge in values.flatten()) { bridge.stop() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt index 06392c2cf3..fbe1536b26 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt @@ -41,8 +41,8 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, private val userName: String?, private val password: String?, private val trace: Boolean, - private val onOpen: (Pair) -> Unit, - private val onClose: (Pair) -> Unit, + private val onOpen: (SocketChannel, ConnectionChange) -> Unit, + private val onClose: (SocketChannel, ConnectionChange) -> Unit, private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() { companion object { private val log = contextLogger() @@ -114,7 +114,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, val ch = ctx.channel() logInfoWithMDC("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}") if (!suppressClose) { - onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false, badCert))) + onClose(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false, badCert)) } eventProcessor?.close() ctx.fireChannelInactive() @@ -263,7 +263,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, logInfoWithMDC("Handshake completed with subject: $remoteX500Name, requested server name: ${sslHandler.getRequestedServerName()}.") createAMQPEngine(ctx) - onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false))) + onOpen(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false)) } private fun handleFailedHandshake(ctx: ChannelHandlerContext, evt: SslHandshakeCompletionEvent) { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt index 02e2c986a0..1e49766b61 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt @@ -14,19 +14,14 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory import net.corda.core.identity.CordaX500Name import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger -import net.corda.nodeapi.internal.config.CertificateStore -import net.corda.nodeapi.internal.crypto.x509 import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl import net.corda.nodeapi.internal.requireMessageSize import rx.Observable import rx.subjects.PublishSubject -import sun.security.x509.X500Name import java.lang.Long.min import java.net.InetSocketAddress -import java.security.KeyStore -import java.security.cert.X509Certificate import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import javax.net.ssl.KeyManagerFactory @@ -99,23 +94,21 @@ class AMQPClient(val targets: List, retryInterval = min(MAX_RETRY_INTERVAL, retryInterval * BACKOFF_MULTIPLIER) } - private val connectListener = object : ChannelFutureListener { - override fun operationComplete(future: ChannelFuture) { - if (!future.isSuccess) { - log.info("Failed to connect to $currentTarget") + private val connectListener = ChannelFutureListener { future -> + if (!future.isSuccess) { + log.info("Failed to connect to $currentTarget") - if (!stopping) { - workerGroup?.schedule({ - nextTarget() - restart() - }, retryInterval, TimeUnit.MILLISECONDS) - } - } else { - log.info("Connected to $currentTarget") - // Connection established successfully - clientChannel = future.channel() - clientChannel?.closeFuture()?.addListener(closeListener) + if (!stopping) { + workerGroup?.schedule({ + nextTarget() + restart() + }, retryInterval, TimeUnit.MILLISECONDS) } + } else { + log.info("Connected to $currentTarget") + // Connection established successfully + clientChannel = future.channel() + clientChannel?.closeFuture()?.addListener(closeListener) } } @@ -164,7 +157,7 @@ class AMQPClient(val targets: List, val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, parent.configuration) val target = parent.currentTarget - val handler = if (parent.configuration.useOpenSsl){ + val handler = if (parent.configuration.useOpenSsl) { createClientOpenSslHandler(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory, ch.alloc()) } else { createClientSslHelper(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory) @@ -178,13 +171,13 @@ class AMQPClient(val targets: List, conf.userName, conf.password, conf.trace, - { + { _, change -> parent.retryInterval = MIN_RETRY_INTERVAL // reset to fast reconnect if we connect properly - parent._onConnection.onNext(it.second) + parent._onConnection.onNext(change) }, - { - parent._onConnection.onNext(it.second) - if (it.second.badCert) { + { _, change -> + parent._onConnection.onNext(change) + if (change.badCert) { log.error("Blocking future connection attempts to $target due to bad certificate on endpoint") parent.badCertTargets += target } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt index 42cce2caf0..24d5e9fc8a 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt @@ -71,5 +71,9 @@ interface AMQPConfiguration { */ val healthCheckPhrase: String? get() = null + + @JvmDefault + val enableSNI: Boolean + get() = true } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt index 26f2f06f22..c87718057c 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt @@ -1,10 +1,7 @@ package net.corda.nodeapi.internal.protonwrapper.netty import io.netty.bootstrap.ServerBootstrap -import io.netty.channel.Channel -import io.netty.channel.ChannelInitializer -import io.netty.channel.ChannelOption -import io.netty.channel.EventLoopGroup +import io.netty.channel.* import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel @@ -67,52 +64,45 @@ class AMQPServer(val hostName: String, override fun initChannel(ch: SocketChannel) { val amqpConfiguration = parent.configuration val pipeline = ch.pipeline() - amqpConfiguration.healthCheckPhrase?.let { pipeline.addLast(ModeSelectingChannel.NAME, ModeSelectingChannel(it)) } - - val keyStore = amqpConfiguration.keyStore - - // Used for SNI matching with javaSSL. - val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, amqpConfiguration) - // Used to create a mapping for SNI matching with openSSL. - val keyManagerFactoriesMap = splitKeystore(amqpConfiguration) - val handler = if (amqpConfiguration.useOpenSsl){ - // SNI matching needed only when multiple nodes exist behind the server. - if (keyStore.aliases().size > 1) { - createServerSNIOpenSslHandler(keyManagerFactoriesMap, trustManagerFactory) - } else { - createServerOpenSslHandler(wrappedKeyManagerFactory, trustManagerFactory, ch.alloc()) - } - } else { - // For javaSSL, SNI matching is handled at key manager level. - createServerSslHelper(keyStore, wrappedKeyManagerFactory, trustManagerFactory) - } - - pipeline.addLast("sslHandler", handler) - + val (sslHandler, keyManagerFactoriesMap) = createSSLHandler(amqpConfiguration, ch) + pipeline.addLast("sslHandler", sslHandler) if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) pipeline.addLast(AMQPChannelHandler(true, null, // Passing a mapping of legal names to key managers to be able to pick the correct one after // SNI completion event is fired up. - if (keyStore.aliases().size > 1 && amqpConfiguration.useOpenSsl) - keyManagerFactoriesMap - else - // Single entry, key can be anything. - mapOf(DEFAULT to wrappedKeyManagerFactory), + keyManagerFactoriesMap, conf.userName, conf.password, conf.trace, - { - parent.clientChannels[it.first.remoteAddress()] = it.first - parent._onConnection.onNext(it.second) + { channel, change -> + parent.clientChannels[channel.remoteAddress()] = channel + parent._onConnection.onNext(change) }, - { - parent.clientChannels.remove(it.first.remoteAddress()) - parent._onConnection.onNext(it.second) + { channel, change -> + parent.clientChannels.remove(channel.remoteAddress()) + parent._onConnection.onNext(change) }, { rcv -> parent._onReceive.onNext(rcv) })) } + + private fun createSSLHandler(amqpConfig: AMQPConfiguration, ch: SocketChannel): Pair> { + return if (amqpConfig.useOpenSsl && amqpConfig.enableSNI && amqpConfig.keyStore.aliases().size > 1) { + val keyManagerFactoriesMap = splitKeystore(amqpConfig) + // SNI matching needed only when multiple nodes exist behind the server. + Pair(createServerSNIOpenSslHandler(keyManagerFactoriesMap, trustManagerFactory), keyManagerFactoriesMap) + } else { + val keyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, amqpConfig) + val handler = if (amqpConfig.useOpenSsl) { + createServerOpenSslHandler(keyManagerFactory, trustManagerFactory, ch.alloc()) + } else { + // For javaSSL, SNI matching is handled at key manager level. + createServerSslHandler(amqpConfig.keyStore, keyManagerFactory, trustManagerFactory) + } + Pair(handler, mapOf(DEFAULT to keyManagerFactory)) + } + } } fun start() { @@ -189,10 +179,7 @@ class AMQPServer(val hostName: String, } fun dropConnection(connectionRemoteHost: InetSocketAddress) { - val channel = clientChannels[connectionRemoteHost] - if (channel != null) { - channel.close() - } + clientChannels[connectionRemoteHost]?.close() } fun complete(delivery: Delivery, target: InetSocketAddress) { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/CertHoldingKeyManagerFactoryWrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/CertHoldingKeyManagerFactoryWrapper.kt index 8d7ed6e6ed..a001b5525d 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/CertHoldingKeyManagerFactoryWrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/CertHoldingKeyManagerFactoryWrapper.kt @@ -26,15 +26,11 @@ class CertHoldingKeyManagerFactorySpiWrapper(private val factorySpi: KeyManagerF return if (factorySpi is CertHoldingKeyManagerFactorySpiWrapper) keyManagers else keyManagers.map { val aliasProvidingKeyManager = getDefaultKeyManager(it) // Use the SNIKeyManager if keystore has several entries and only for clients and non-openSSL servers. - if (amqpConfig.keyStore.aliases().size > 1) { - // Clients - if (amqpConfig.sourceX500Name != null) { - SNIKeyManager(aliasProvidingKeyManager as X509ExtendedKeyManager, amqpConfig) - } else if (!amqpConfig.useOpenSsl) { // JDK SSL servers - SNIKeyManager(aliasProvidingKeyManager as X509ExtendedKeyManager, amqpConfig) - } else { - aliasProvidingKeyManager - } + // Condition of using SNIKeyManager: if its client, or JDKSsl server. + val isClient = amqpConfig.sourceX500Name != null + val enableSNI = amqpConfig.enableSNI && amqpConfig.keyStore.aliases().size > 1 + if (enableSNI && (isClient || !amqpConfig.useOpenSsl)) { + SNIKeyManager(aliasProvidingKeyManager as X509ExtendedKeyManager, amqpConfig) } else { aliasProvidingKeyManager } @@ -78,5 +74,4 @@ class CertHoldingKeyManagerFactoryWrapper(factory: KeyManagerFactory, amqpConfig keyManager.getCertificateChain(alias) } else null } - } \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt index f93e3dc1d9..e594c45e2f 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt @@ -149,9 +149,9 @@ internal fun createClientOpenSslHandler(target: NetworkHostAndPort, return SslHandler(sslEngine) } -internal fun createServerSslHelper(keyStore: CertificateStore, - keyManagerFactory: KeyManagerFactory, - trustManagerFactory: TrustManagerFactory): SslHandler { +internal fun createServerSslHandler(keyStore: CertificateStore, + keyManagerFactory: KeyManagerFactory, + trustManagerFactory: TrustManagerFactory): SslHandler { val sslContext = SSLContext.getInstance("TLS") val keyManagers = keyManagerFactory.keyManagers val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java).map { LoggingTrustManagerWrapper(it) }.toTypedArray() @@ -189,12 +189,10 @@ internal fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateSto internal fun createServerOpenSslHandler(keyManagerFactory: KeyManagerFactory, trustManagerFactory: TrustManagerFactory, alloc: ByteBufAllocator): SslHandler { - val sslContext = SslContextBuilder.forServer(keyManagerFactory).sslProvider(SslProvider.OPENSSL).trustManager(LoggingTrustManagerFactoryWrapper(trustManagerFactory)).build() + + val sslContext = getServerSslContextBuilder(keyManagerFactory, trustManagerFactory).build() val sslEngine = sslContext.newEngine(alloc) sslEngine.useClientMode = false - sslEngine.needClientAuth = true - sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray() - sslEngine.enabledCipherSuites = ArtemisTcpTransport.CIPHER_SUITES.toTypedArray() return SslHandler(sslEngine) } @@ -205,20 +203,21 @@ internal fun createServerSNIOpenSslHandler(keyManagerFactoriesMap: Map { diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/TestKeyManagerFactoryWrapper.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/TestKeyManagerFactoryWrapper.kt index e8c2d07f61..d3fcdbb676 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/TestKeyManagerFactoryWrapper.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/TestKeyManagerFactoryWrapper.kt @@ -94,5 +94,5 @@ class TestKeyManagerFactoryWrapper { assertNull(otherWrappedKeyManagerFactory.getCurrentCertChain()) } - private class AMQPConfigurationImpl(override val keyStore: CertificateStore, override val trustStore: CertificateStore, override val maxMessageSize: Int) : AMQPConfiguration + private class AMQPConfigurationImpl(override val keyStore: CertificateStore, override val trustStore: CertificateStore, override val maxMessageSize: Int, override val enableSNI: Boolean = true) : AMQPConfiguration } \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 99cf6c5f83..5c1b9437fb 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -17,10 +17,7 @@ import net.corda.nodeapi.internal.bridging.AMQPBridgeManager import net.corda.nodeapi.internal.bridging.BridgeManager import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer -import net.corda.testing.core.ALICE_NAME -import net.corda.testing.core.BOB_NAME -import net.corda.testing.core.MAX_MESSAGE_SIZE -import net.corda.testing.core.TestIdentity +import net.corda.testing.core.* import net.corda.testing.driver.PortAllocation import net.corda.testing.internal.rigorousMock import net.corda.testing.internal.stubs.CertificateStoreStubs @@ -45,7 +42,7 @@ import kotlin.system.measureTimeMillis import kotlin.test.assertEquals @RunWith(Parameterized::class) -class AMQPBridgeTest(private val useOpenSsl: Boolean) { +class AMQPBridgeTest(private val useOpenSsl: Boolean, private val enableSNI: Boolean) { companion object { private val logger = contextLogger() @@ -53,8 +50,8 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) { const val echoPhrase = "Hello!" @JvmStatic - @Parameterized.Parameters(name = "useOpenSsl = {0}") - fun data(): Collection = listOf(false, true) + @Parameterized.Parameters(name = "useOpenSsl = {0}, enableSNI = {1}") + fun data() = listOf(false, true).product(listOf(false, true)) private fun String.assertEchoResponse(address: InetSocketAddress, drip: Boolean = false) { SocketChannel.open(address).use { @@ -357,7 +354,7 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) { artemisServer.start() artemisClient.start() - val bridgeManager = AMQPBridgeManager(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE) + val bridgeManager = AMQPBridgeManager(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, enableSNI) bridgeManager.start() val artemis = artemisClient.started!! if (sourceQueueName != null) { @@ -413,10 +410,11 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) { val keyStore = serverConfig.p2pSslOptions.keyStore.get() val amqpConfig = object : AMQPConfiguration { override val keyStore = keyStore - override val trustStore = serverConfig.p2pSslOptions.trustStore.get() + override val trustStore = serverConfig.p2pSslOptions.trustStore.get() //override val trace: Boolean = true override val maxMessageSize: Int = maxMessageSize override val useOpenSsl = serverConfig.p2pSslOptions.useOpenSsl + override val enableSNI: Boolean = this@AMQPBridgeTest.enableSNI override val healthCheckPhrase = echoPhrase } return AMQPServer("0.0.0.0", diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/LoopbackBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/LoopbackBridgeTest.kt index 7d76717db0..c50a246d89 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/LoopbackBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/LoopbackBridgeTest.kt @@ -193,6 +193,7 @@ class LoopbackBridgeTest(private val useOpenSsl: Boolean) { doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions doReturn(true).whenever(it).crlCheckSoftFail doReturn(artemisAddress).whenever(it).p2pAddress + doReturn(true).whenever(it).enableSNI doReturn(null).whenever(it).jmxMonitoringHttpPort doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration } @@ -204,7 +205,13 @@ class LoopbackBridgeTest(private val useOpenSsl: Boolean) { artemisServer.start() artemisClient.start() - val bridgeManager = LoopbackBridgeManager({ ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize) }) + val bridgeManager = LoopbackBridgeManager(artemisConfig.p2pSslOptions, + null, + MAX_MESSAGE_SIZE, + artemisConfig.enableSNI, + { ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize) }, + null, + { true }) bridgeManager.start() val artemis = artemisClient.started!! diff --git a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 9e2b93f8b2..9e107fb7c3 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -87,6 +87,7 @@ class ArtemisMessagingTest { doReturn(null).whenever(it).jmxMonitoringHttpPort doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration doReturn(false).whenever(it).messagingServerExternal + doReturn(true).whenever(it).enableSNI doReturn(FlowTimeoutConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).flowTimeout } LogHelper.setLevel(PersistentUniquenessProvider::class) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 5da15dc78e..7280b41ee9 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -265,7 +265,7 @@ open class Node(configuration: NodeConfiguration, configuration.enterpriseConfiguration.externalBrokerConnectionConfiguration, configuration.enterpriseConfiguration.externalBrokerBackupAddresses) } - BridgeControlListener(configuration.p2pSslOptions, null, networkParameters.maxMessageSize, artemisClient) + BridgeControlListener(configuration.p2pSslOptions, null, networkParameters.maxMessageSize, configuration.enableSNI, artemisClient) } else { null } diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 8a9ab7629c..6e57e50669 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -83,6 +83,7 @@ interface NodeConfiguration { val p2pSslOptions: MutualSslConfiguration val cordappDirectories: List + val enableSNI: Boolean val flowOverrides: FlowOverrideConfig? val cordappSignerKeyFingerprintBlacklist: List @@ -236,6 +237,7 @@ data class NodeConfigurationImpl( override val flowMonitorSuspensionLoggingThresholdMillis: Duration = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS, override val cordappDirectories: List = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT), override val jmxReporterType: JmxReporterType? = JmxReporterType.JOLOKIA, + override val enableSNI: Boolean = true, private val useOpenSsl: Boolean = false, override val flowOverrides: FlowOverrideConfig?, override val cordappSignerKeyFingerprintBlacklist: List = DEV_PUB_KEY_HASHES.map { it.toString() } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index 68b99e12b7..6e4090893c 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -14,6 +14,7 @@ import net.corda.node.services.statemachine.FlowMessagingImpl import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException +import org.apache.activemq.artemis.api.core.Message.* import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ClientMessage import org.apache.activemq.artemis.api.core.client.ClientProducer @@ -44,7 +45,8 @@ class MessagingExecutor( metricRegistry: MetricRegistry, val ourSenderUUID: String, queueBound: Int, - val myLegalName: String + val myLegalName: String, + private val enableSNI: Boolean ) { private sealed class Job { data class Acknowledge(val message: ClientMessage) : Job() @@ -56,7 +58,10 @@ class MessagingExecutor( ) : Job() { override fun toString() = "Send(${message.uniqueMessageId}, target=$target)" } - object Shutdown : Job() { override fun toString() = "Shutdown" } + + object Shutdown : Job() { + override fun toString() = "Shutdown" + } } private val queue = ArrayBlockingQueue(queueBound) @@ -68,9 +73,7 @@ class MessagingExecutor( private val sendQueueSizeOnInsert = metricRegistry.histogram("P2P.SendQueueSizeOnInsert") init { - metricRegistry.register("P2P.SendQueueSize", Gauge { - queue.size - }) + metricRegistry.register("P2P.SendQueueSize", Gauge { queue.size }) } private val ourSenderSeqNo = AtomicLong() @@ -125,7 +128,7 @@ class MessagingExecutor( } } Job.Shutdown -> { - if(session.stillOpen()) { + if (session.stillOpen()) { session.commit() } break@eventLoop @@ -157,10 +160,10 @@ class MessagingExecutor( "Send to: $mqAddress topic: ${job.message.topic} " + "sessionID: ${job.message.topic} id: ${job.message.uniqueMessageId}" } - producer.send(SimpleString(mqAddress), artemisMessage, { + producer.send(SimpleString(mqAddress), artemisMessage) { job.timer.stop() job.sentFuture.set(Unit) - }) + } } fun cordaToArtemisMessage(message: Message, target: MessageRecipients? = null): ClientMessage? { @@ -172,15 +175,17 @@ class MessagingExecutor( putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, SimpleString(myLegalName)) // Add a group ID to messages to be able to have multiple filtered consumers while preventing reordering. // This header will be dropped off during transit through the bridge, which is fine as it's needed locally only. - if (target != null && target is ArtemisMessagingComponent.ServiceAddress) { - putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, SimpleString(message.uniqueMessageId.toString)) - } else { - putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, SimpleString(myLegalName)) + if (enableSNI) { + if (target is ArtemisMessagingComponent.ServiceAddress) { + putStringProperty(HDR_GROUP_ID, SimpleString(message.uniqueMessageId.toString)) + } else { + putStringProperty(HDR_GROUP_ID, SimpleString(myLegalName)) + } } sendMessageSizeMetric.update(message.data.bytes.size) writeBodyBufferBytes(message.data.bytes) // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString)) + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString)) // If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut. if (ourSenderUUID == message.senderUUID) { putStringProperty(P2PMessagingHeaders.senderUUID, SimpleString(ourSenderUUID)) @@ -188,7 +193,7 @@ class MessagingExecutor( } // For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) { - putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis) + putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis) } message.additionalHeaders.forEach { key, value -> putStringProperty(key, value) } } @@ -196,7 +201,7 @@ class MessagingExecutor( private fun acknowledgeJob(job: Job.Acknowledge) { log.debug { - val id = job.message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID) + val id = job.message.getStringProperty(HDR_DUPLICATE_DETECTION_ID) "Acking $id" } job.message.individualAcknowledge() diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 720f3149ec..2ead2c5a30 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -248,7 +248,8 @@ class P2PMessagingClient(val config: NodeConfiguration, metricRegistry, queueBound = config.enterpriseConfiguration.tuning.maximumMessagingBatchSize, ourSenderUUID = ourSenderUUID, - myLegalName = legalName + myLegalName = legalName, + enableSNI = config.enableSNI ) this@P2PMessagingClient.messagingExecutor = messagingExecutor messagingExecutor.start() @@ -574,7 +575,9 @@ class P2PMessagingClient(val config: NodeConfiguration, val internalTargetQueue = (address as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address") state.locked { - createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = false, isServiceAddress = address is ServiceAddress) + val isServiceAddress = address is ServiceAddress + val exclusive = if (config.enableSNI) false else !isServiceAddress + createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = exclusive, isServiceAddress = isServiceAddress) } internalTargetQueue } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt index 14d7037398..dea3e7b33b 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -344,7 +344,8 @@ fun driver(defaultParameters: DriverParameters = DriverParameters(), dsl: Dr networkParameters = defaultParameters.networkParameters, notaryCustomOverrides = defaultParameters.notaryCustomOverrides, inMemoryDB = defaultParameters.inMemoryDB, - cordappsForAllNodes = defaultParameters.cordappsForAllNodes() + cordappsForAllNodes = defaultParameters.cordappsForAllNodes(), + enableSNI = defaultParameters.enableSNI ), coerce = { it }, dsl = dsl, @@ -400,7 +401,8 @@ data class DriverParameters( val notaryCustomOverrides: Map = emptyMap(), val initialiseSerialization: Boolean = true, val inMemoryDB: Boolean = true, - val cordappsForAllNodes: Collection? = null + val cordappsForAllNodes: Collection? = null, + val enableSNI: Boolean = true ) { constructor( isDebug: Boolean = false, diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt index 2552a6a155..098a3fd0b9 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt @@ -89,7 +89,8 @@ class DriverDSLImpl( val networkParameters: NetworkParameters, val notaryCustomOverrides: Map, val inMemoryDB: Boolean, - val cordappsForAllNodes: Collection + val cordappsForAllNodes: Collection, + val enableSNI: Boolean ) : InternalDriverDSL { private var _executorService: ScheduledExecutorService? = null @@ -308,7 +309,8 @@ class DriverDSLImpl( NodeConfiguration::rpcUsers.name to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() }, NodeConfiguration::verifierType.name to verifierType.name, "enterpriseConfiguration.tuning.flowThreadPoolSize" to "1", - NodeConfiguration::flowOverrides.name to flowOverrideConfig.toConfig().root().unwrapped() + NodeConfiguration::flowOverrides.name to flowOverrideConfig.toConfig().root().unwrapped(), + NodeConfiguration::enableSNI.name to enableSNI ) + czUrlConfig + customOverrides val config = NodeConfig(ConfigHelper.loadConfig( baseDirectory = baseDirectory(name), @@ -1116,7 +1118,8 @@ fun genericDriver( networkParameters = defaultParameters.networkParameters, notaryCustomOverrides = defaultParameters.notaryCustomOverrides, inMemoryDB = defaultParameters.inMemoryDB, - cordappsForAllNodes = defaultParameters.cordappsForAllNodes() + cordappsForAllNodes = defaultParameters.cordappsForAllNodes(), + enableSNI = defaultParameters.enableSNI ) ) val shutdownHook = addShutdownHook(driverDsl::shutdown) @@ -1210,6 +1213,7 @@ fun internalDriver( notaryCustomOverrides: Map = DriverParameters().notaryCustomOverrides, inMemoryDB: Boolean = DriverParameters().inMemoryDB, cordappsForAllNodes: Collection = DriverParameters().cordappsForAllNodes(), + enableSNI: Boolean = DriverParameters().enableSNI, dsl: DriverDSLImpl.() -> A ): A { return genericDriver( @@ -1228,7 +1232,8 @@ fun internalDriver( networkParameters = networkParameters, notaryCustomOverrides = notaryCustomOverrides, inMemoryDB = inMemoryDB, - cordappsForAllNodes = cordappsForAllNodes + cordappsForAllNodes = cordappsForAllNodes, + enableSNI = enableSNI ), coerce = { it }, dsl = dsl, diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 98ab766aaa..2bcc6ad7bd 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -121,6 +121,7 @@ fun rpcDriver( notaryCustomOverrides: Map = emptyMap(), inMemoryDB: Boolean = true, cordappsForAllNodes: Collection = cordappsInCurrentAndAdditionalPackages(), + enableSNI:Boolean = true, dsl: RPCDriverDSL.() -> A ): A { return genericDriver( @@ -140,7 +141,8 @@ fun rpcDriver( networkParameters = networkParameters, notaryCustomOverrides = notaryCustomOverrides, inMemoryDB = inMemoryDB, - cordappsForAllNodes = cordappsForAllNodes + cordappsForAllNodes = cordappsForAllNodes, + enableSNI = enableSNI ), externalTrace ), coerce = { it }, diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/core/TestUtils.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/core/TestUtils.kt index 485e7671c2..1b07eda223 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/core/TestUtils.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/core/TestUtils.kt @@ -5,7 +5,10 @@ package net.corda.testing.core import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.StateRef -import net.corda.core.crypto.* +import net.corda.core.crypto.Crypto +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.SignatureScheme +import net.corda.core.crypto.toStringShort import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate @@ -161,3 +164,7 @@ fun NodeInfo.singleIdentityAndCert(): PartyAndCertificate = legalIdentitiesAndCe * Extract a single identity from the node info. Throws an error if the node has multiple identities. */ fun NodeInfo.singleIdentity(): Party = singleIdentityAndCert().party + +fun Collection.product(p2: Collection): Collection> = flatMap { param1 -> + p2.map { param2 -> arrayOf(param1, param2) } +}