diff --git a/bridge/bridgecapsule/build.gradle b/bridge/bridgecapsule/build.gradle index c1e4b5f46f..cd588b5653 100644 --- a/bridge/bridgecapsule/build.gradle +++ b/bridge/bridgecapsule/build.gradle @@ -49,6 +49,8 @@ dependencies { smokeTestCompile project(':test-utils') smokeTestCompile "org.apache.curator:curator-test:${curator_version}" smokeTestCompile "junit:junit:$junit_version" + // Adding native SSL library to allow using native SSL with Artemis and AMQP + smokeTestCompile "io.netty:netty-tcnative-boringssl-static:$tcnative_version" } diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/SNIBridgeTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/SNIBridgeTest.kt new file mode 100644 index 0000000000..fb3b593776 --- /dev/null +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/SNIBridgeTest.kt @@ -0,0 +1,275 @@ +package net.corda.bridge + +import co.paralleluniverse.fibers.Suspendable +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.whenever +import net.corda.client.rpc.CordaRPCClient +import net.corda.core.flows.* +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.div +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap +import net.corda.node.internal.artemis.BrokerJaasLoginModule +import net.corda.node.services.Permissions +import net.corda.node.services.config.EnterpriseConfiguration +import net.corda.node.services.config.MutualExclusionConfiguration +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.nodeapi.internal.ArtemisMessagingComponent +import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pAcceptorTcpTransport +import net.corda.nodeapi.internal.config.CertificateStore +import net.corda.nodeapi.internal.config.MutualSslConfiguration +import net.corda.nodeapi.internal.crypto.loadOrCreateKeyStore +import net.corda.testing.core.* +import net.corda.testing.internal.rigorousMock +import net.corda.testing.internal.stubs.CertificateStoreStubs +import net.corda.testing.node.User +import net.corda.testing.node.internal.cordappsForPackages +import net.corda.testing.node.internal.internalDriver +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.core.config.Configuration +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl +import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration +import org.apache.activemq.artemis.core.security.Role +import org.apache.activemq.artemis.core.server.ActiveMQServer +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager +import org.apache.activemq.artemis.spi.core.security.jaas.TextFileCertificateLoginModule +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import java.io.File +import java.io.FileOutputStream +import java.nio.file.Files +import java.nio.file.Path +import java.security.KeyStore +import javax.security.auth.login.AppConfigurationEntry +import kotlin.test.assertEquals + +class SNIBridgeTest { + @Rule + @JvmField + val temporaryFolder = TemporaryFolder() + + private abstract class AbstractNodeConfiguration : NodeConfiguration + + @StartableByRPC + @InitiatingFlow + class Ping(val pongParty: Party, val times: Int) : FlowLogic() { + @Suspendable + override fun call() { + val pongSession = initiateFlow(pongParty) + pongSession.sendAndReceive(times) + BridgeRestartTest.pingStarted.getOrPut(runId) { openFuture() }.set(Unit) + for (i in 1..times) { + logger.info("PING $i") + val j = pongSession.sendAndReceive(i).unwrap { it } + assertEquals(i, j) + } + } + } + + @InitiatedBy(Ping::class) + class Pong(val pingSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val times = pingSession.sendAndReceive(Unit).unwrap { it } + for (i in 1..times) { + logger.info("PONG $i $pingSession") + val j = pingSession.sendAndReceive(i).unwrap { it } + assertEquals(i, j) + } + } + } + + @Test + fun `Nodes behind all in one bridge can communicate with external node`() { + val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) + internalDriver(startNodesInProcess = true, cordappsForAllNodes = cordappsForPackages("net.corda.bridge")) { + val artemisPort = portAllocation.nextPort() + val advertisedP2PPort = portAllocation.nextPort() + // We create a config for ALICE_NAME so we can use the dir lookup from the Driver when starting the bridge + val nodeConfigs = createNodesConfigs(listOf(DUMMY_BANK_A_NAME, DUMMY_BANK_B_NAME, ALICE_NAME)) + // Remove the created trust and key stores + val bridgePath = temporaryFolder.root.path / ALICE_NAME.organisation + Files.delete(bridgePath / "node/certificates/truststore.jks") + Files.delete(bridgePath / "node/certificates/sslkeystore.jks") + // TODO: change bridge driver to use any provided base dir, not just look for one based on identity + createAggregateStores(nodeConfigs.minus(ALICE_NAME).values.toList(), baseDirectory(ALICE_NAME)) + + val bankAPath = temporaryFolder.root.path / DUMMY_BANK_A_NAME.organisation / "node" + val bankBPath = temporaryFolder.root.path / DUMMY_BANK_B_NAME.organisation / "node" + // Start broker + val broker = createArtemisTextCertsLogin(artemisPort, nodeConfigs[DUMMY_BANK_B_NAME]!!.p2pSslOptions) + broker.start() + println(broker.isActive) + val aFuture = startNode( + providedName = DUMMY_BANK_A_NAME, + rpcUsers = listOf(demoUser), + customOverrides = mapOf( + "baseDirectory" to "$bankAPath", + "p2pAddress" to "localhost:$advertisedP2PPort", + "messagingServerAddress" to "0.0.0.0:$artemisPort", + "messagingServerExternal" to true, + "enterpriseConfiguration" to mapOf( + "externalBridge" to true + ) + ) + ) + + val a = aFuture.getOrThrow() + println(a.nodeInfo) + + val bFuture = startNode( + providedName = DUMMY_BANK_B_NAME, + rpcUsers = listOf(demoUser), + customOverrides = mapOf( + "baseDirectory" to "$bankBPath", + "p2pAddress" to "localhost:$advertisedP2PPort", + "messagingServerAddress" to "0.0.0.0:$artemisPort", + "messagingServerExternal" to true, + "enterpriseConfiguration" to mapOf( + "externalBridge" to true + ) + ) + ) + + val b = bFuture.getOrThrow() + println(b.nodeInfo) + + + val bridge = startBridge(ALICE_NAME, advertisedP2PPort, artemisPort, mapOf( + "outboundConfig" to mapOf( + "artemisBrokerAddress" to "localhost:$artemisPort" + ), + "inboundConfig" to mapOf( + "listeningAddress" to "0.0.0.0:$advertisedP2PPort" + ) + )).getOrThrow() + println(bridge.brokerPort) + + // Start a node on the other side of the bridge + val c = startNode(providedName = DUMMY_BANK_C_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:${portAllocation.nextPort()}")).getOrThrow() + + + // BANK_C initiates flows with BANK_A and BANK_B + CordaRPCClient(c.rpcAddress).use(demoUser.username, demoUser.password) { + var handle = it.proxy.startFlow(::Ping, a.nodeInfo.singleIdentity(), 5) + handle.returnValue.getOrThrow() + + handle = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 5) + handle.returnValue.getOrThrow() + } + + + CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { + val handle = it.proxy.startFlow(::Ping, c.nodeInfo.singleIdentity(), 5) + handle.returnValue.getOrThrow() + } + + CordaRPCClient(b.rpcAddress).use(demoUser.username, demoUser.password) { + val handle = it.proxy.startFlow(::Ping, c.nodeInfo.singleIdentity(), 5) + handle.returnValue.getOrThrow() + } + } + } + + private fun createNodesConfigs(legalNames: List): Map { + val tempFolders = legalNames.map { it to temporaryFolder.root.toPath() / it.organisation }.toMap() + val baseDirectories = tempFolders.mapValues { it.value / "node" } + val certificatesDirectories = baseDirectories.mapValues { it.value / "certificates" } + val signingCertificateStores = certificatesDirectories.mapValues { CertificateStoreStubs.Signing.withCertificatesDirectory(it.value) } + val pspSslConfigurations = certificatesDirectories.mapValues { CertificateStoreStubs.P2P.withCertificatesDirectory(it.value, useOpenSsl = false) } + val nodeConfigs = legalNames.map { name -> + val serverConfig = rigorousMock().also { + doReturn(baseDirectories[name]).whenever(it).baseDirectory + doReturn(certificatesDirectories[name]).whenever(it).certificatesDirectory + doReturn(name).whenever(it).myLegalName + doReturn(signingCertificateStores[name]).whenever(it).signingCertificateStore + doReturn(pspSslConfigurations[name]).whenever(it).p2pSslOptions + doReturn(true).whenever(it).crlCheckSoftFail + doReturn(true).whenever(it).messagingServerExternal + doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration + } + serverConfig.configureWithDevSSLCertificate() + name to serverConfig + }.toMap() + + return nodeConfigs + } + + private fun createAggregateStores(nodeConfigs: List, bridgeDirPath: Path) { + val trustStore = nodeConfigs.first().p2pSslOptions.trustStore.get(true) + val newKeyStore = loadOrCreateKeyStore(bridgeDirPath / "certificates/sslkeystore.jks", "cordacadevpass") + + nodeConfigs.forEach { + mergeKeyStores(newKeyStore, it.p2pSslOptions.keyStore.get(true), it.myLegalName.toString()) + } + + // Save to disk and copy in the bridge directory + trustStore.writeTo(FileOutputStream(File("$bridgeDirPath/certificates/truststore.jks"))) + newKeyStore.store(FileOutputStream(File("$bridgeDirPath/certificates/sslkeystore.jks")), "cordacadevpass".toCharArray()) + } + + + private fun mergeKeyStores(newKeyStore: KeyStore, oldKeyStore: CertificateStore, newAlias: String) { + val keyStore = oldKeyStore.value.internal + keyStore.aliases().toList().forEach { + val key = keyStore.getKey(it, oldKeyStore.password.toCharArray()) + val certs = keyStore.getCertificateChain(it) + newKeyStore.setKeyEntry(newAlias, key, oldKeyStore.password.toCharArray(), certs) + } + } + + + private fun ConfigurationImpl.configureAddressSecurity(): Configuration { + val nodeInternalRole = Role("Node", true, true, true, true, true, true, true, true, true, true) + securityRoles["${ArtemisMessagingComponent.INTERNAL_PREFIX}#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node + securityRoles["${ArtemisMessagingComponent.P2P_PREFIX}#"] = setOf(nodeInternalRole, restrictedRole(BrokerJaasLoginModule.PEER_ROLE, send = true)) + securityRoles["*"] = setOf(Role("guest", true, true, true, true, true, true, true, true, true, true)) + return this + } + + private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false, + deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false, + deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false): Role { + return Role(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue, + deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue) + } + + private fun createArtemisTextCertsLogin(p2pPort: Int, p2pSslOptions: MutualSslConfiguration): ActiveMQServer { + val artemisDir = temporaryFolder.root.path / "artemis" + val config = ConfigurationImpl().apply { + bindingsDirectory = (artemisDir / "bindings").toString() + journalDirectory = (artemisDir / "journal").toString() + largeMessagesDirectory = (artemisDir / "large-messages").toString() + acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort("0.0.0.0", p2pPort), p2pSslOptions)) + idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess + isPersistIDCache = true + isPopulateValidatedUser = true + journalBufferSize_NIO = MAX_MESSAGE_SIZE + ArtemisMessagingComponent.JOURNAL_HEADER_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store. + journalBufferSize_AIO = MAX_MESSAGE_SIZE + ArtemisMessagingComponent.JOURNAL_HEADER_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store. + journalFileSize = MAX_MESSAGE_SIZE + ArtemisMessagingComponent.JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB. + managementNotificationAddress = SimpleString(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS) + }.configureAddressSecurity() + + val usersPropertiesFilePath = ConfigTest::class.java.getResource("/net/corda/bridge/artemis/artemis-users.properties").path + val rolesPropertiesFilePath = ConfigTest::class.java.getResource("/net/corda/bridge/artemis/artemis-roles.properties").path + val securityConfiguration = object : SecurityConfiguration() { + override fun getAppConfigurationEntry(name: String): Array { + val options = mapOf( + "org.apache.activemq.jaas.textfiledn.user" to usersPropertiesFilePath, + "org.apache.activemq.jaas.textfiledn.role" to rolesPropertiesFilePath + ) + + return arrayOf(AppConfigurationEntry(name, AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options)) + } + } + val securityManager = ActiveMQJAASSecurityManager(TextFileCertificateLoginModule::class.java.name, securityConfiguration) + return ActiveMQServerImpl(config, securityManager) + } +} \ No newline at end of file diff --git a/bridge/src/test/resources/net/corda/bridge/artemis/artemis-roles.properties b/bridge/src/test/resources/net/corda/bridge/artemis/artemis-roles.properties new file mode 100644 index 0000000000..cfeab7b622 --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/artemis/artemis-roles.properties @@ -0,0 +1,3 @@ +Node=NodeA,NodeB,SystemUsers/Node +Peer=SystemUsers/Peer +Verifier=SystemUsers/Verifier \ No newline at end of file diff --git a/bridge/src/test/resources/net/corda/bridge/artemis/artemis-users.properties b/bridge/src/test/resources/net/corda/bridge/artemis/artemis-users.properties new file mode 100644 index 0000000000..d6f061ff85 --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/artemis/artemis-users.properties @@ -0,0 +1,5 @@ +NodeA=O=Bank A, L=London, C=GB +NodeB=O=Bank B, L=New York, C=US +SystemUsers/Node=SystemUsers/Node +SystemUsers/Peer= +SystemUsers/Verifier= \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index eecf16a95b..2b62929211 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -42,11 +42,12 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP private val lock = ReentrantLock() private val queueNamesToBridgesMap = mutableMapOf>() - private class AMQPConfigurationImpl private constructor(override val keyStore: CertificateStore, - override val trustStore: CertificateStore, - override val socksProxyConfig: SocksProxyConfig?, - override val maxMessageSize: Int, - override val useOpenSsl: Boolean) : AMQPConfiguration { + private class AMQPConfigurationImpl (override val keyStore: CertificateStore, + override val trustStore: CertificateStore, + override val socksProxyConfig: SocksProxyConfig?, + override val maxMessageSize: Int, + override val useOpenSsl: Boolean, + override val sourceX500Name: String? = null) : AMQPConfiguration { constructor(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int) : this(config.keyStore.get(), config.trustStore.get(), socksProxyConfig, @@ -72,7 +73,8 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP * If the delivery fails the session is rolled back to prevent loss of the message. This may cause duplicate delivery, * however Artemis and the remote Corda instanced will deduplicate these messages. */ - private class AMQPBridge(val queueName: String, + private class AMQPBridge(val sourceX500Name: String, + val queueName: String, val targets: List, val legalNames: Set, private val amqpConfig: AMQPConfiguration, @@ -87,6 +89,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP val oldMDC = MDC.getCopyOfContextMap() try { MDC.put("queueName", queueName) + MDC.put("source", amqpConfig.sourceX500Name) MDC.put("targets", targets.joinToString(separator = ";") { it.toString() }) MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() }) MDC.put("maxMessageSize", amqpConfig.maxMessageSize.toString()) @@ -150,7 +153,8 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP val sessionFactory = artemis.started!!.sessionFactory val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) this.session = session - val consumer = session.createConsumer(queueName) + // Several producers (in the case of shared bridge) can put messages in the same outbound p2p queue. The consumers are created using the source x500 name as a filter + val consumer = session.createConsumer(queueName, "hyphenated_props:sender-subject-name = '${amqpConfig.sourceX500Name}'") this.consumer = consumer consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler) session.start() @@ -219,15 +223,16 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP } } - override fun deployBridge(queueName: String, targets: List, legalNames: Set) { + override fun deployBridge(sourceX500Name: String, queueName: String, targets: List, legalNames: Set) { val newBridge = lock.withLock { val bridges = queueNamesToBridgesMap.getOrPut(queueName) { mutableListOf() } for (target in targets) { - if (bridges.any { it.targets.contains(target) }) { + if (bridges.any { it.targets.contains(target) && it.sourceX500Name == sourceX500Name }) { return } } - val newBridge = AMQPBridge(queueName, targets, legalNames, amqpConfig, sharedEventLoopGroup!!, artemis!!, bridgeMetricsService) + val newAMQPConfig = AMQPConfigurationImpl(amqpConfig.keyStore, amqpConfig.trustStore, amqpConfig.socksProxyConfig, amqpConfig.maxMessageSize, amqpConfig.useOpenSsl, sourceX500Name) + val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig, sharedEventLoopGroup!!, artemis!!, bridgeMetricsService) bridges += newBridge bridgeMetricsService?.bridgeCreated(targets, legalNames) newBridge diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt index c60bf8c3a1..30999ec556 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt @@ -159,7 +159,7 @@ class BridgeControlListener(val config: MutualSslConfiguration, return } for (outQueue in controlMessage.sendQueues) { - bridgeManager.deployBridge(outQueue.queueName, outQueue.targets, outQueue.legalNames.toSet()) + bridgeManager.deployBridge(controlMessage.nodeIdentity, outQueue.queueName, outQueue.targets, outQueue.legalNames.toSet()) } val wasActive = active validInboundQueues.addAll(controlMessage.inboxQueues) @@ -175,7 +175,7 @@ class BridgeControlListener(val config: MutualSslConfiguration, log.error("Invalid queue names in control message $controlMessage") return } - bridgeManager.deployBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets, controlMessage.bridgeInfo.legalNames.toSet()) + bridgeManager.deployBridge(controlMessage.nodeIdentity, controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets, controlMessage.bridgeInfo.legalNames.toSet()) } is BridgeControl.Delete -> { if (!controlMessage.bridgeInfo.queueName.startsWith(PEERS_PREFIX)) { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeManager.kt index 69b1509550..322dc72f86 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeManager.kt @@ -9,7 +9,7 @@ import net.corda.core.utilities.NetworkHostAndPort */ @VisibleForTesting interface BridgeManager : AutoCloseable { - fun deployBridge(queueName: String, targets: List, legalNames: Set) + fun deployBridge(sourceX500Name: String, queueName: String, targets: List, legalNames: Set) fun destroyBridge(queueName: String, targets: List) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/CertificateStore.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/CertificateStore.kt index 3ca6be6d6b..2a7f536a6a 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/CertificateStore.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/CertificateStore.kt @@ -59,6 +59,7 @@ interface CertificateStore : Iterable> { forEach { (alias, certificate) -> action.invoke(alias, certificate) } } + fun aliases(): List = value.internal.aliases().toList() /** * @throws IllegalArgumentException if no certificate for the alias is found, or if the certificate is not an [X509Certificate]. */ diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt index 387537fd01..06392c2cf3 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt @@ -7,6 +7,7 @@ import io.netty.channel.ChannelPromise import io.netty.channel.socket.SocketChannel import io.netty.handler.proxy.ProxyConnectException import io.netty.handler.proxy.ProxyConnectionEvent +import io.netty.handler.ssl.SniCompletionEvent import io.netty.handler.ssl.SslHandler import io.netty.handler.ssl.SslHandshakeCompletionEvent import io.netty.util.ReferenceCountUtil @@ -25,6 +26,8 @@ import org.slf4j.MDC import java.net.InetSocketAddress import java.nio.channels.ClosedChannelException import java.security.cert.X509Certificate +import javax.net.ssl.ExtendedSSLSession +import javax.net.ssl.SNIHostName import javax.net.ssl.SSLException /** @@ -34,7 +37,7 @@ import javax.net.ssl.SSLException */ internal class AMQPChannelHandler(private val serverMode: Boolean, private val allowedRemoteLegalNames: Set?, - private var keyManagerFactory: CertHoldingKeyManagerFactoryWrapper, + private val keyManagerFactoriesMap: Map, private val userName: String?, private val password: String?, private val trace: Boolean, @@ -51,6 +54,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, private var suppressClose: Boolean = false private var badCert: Boolean = false private var localCert: X509Certificate? = null + private var requestedServerName: String? = null private fun withMDC(block: () -> Unit) { val oldMDC = MDC.getCopyOfContextMap() @@ -117,58 +121,43 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, } override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { - if (evt is ProxyConnectionEvent) { - remoteAddress = evt.destinationAddress() // update address to teh real target address - } - if (evt is SslHandshakeCompletionEvent) { - if (evt.isSuccess) { - val sslHandler = ctx.pipeline().get(SslHandler::class.java) - val sslSession = sslHandler.engine().session - localCert = keyManagerFactory.getCurrentCertChain()?.get(0) - if (localCert == null) { - log.error("SSL KeyManagerFactory failed to provide a local cert") - ctx.close() - return - } - if (sslSession.peerCertificates == null || sslSession.peerCertificates.isEmpty()) { - log.error("No peer certificates") - ctx.close() - return - } - remoteCert = sslHandler.engine().session.peerCertificates[0].x509 - val remoteX500Name = try { - CordaX500Name.build(remoteCert!!.subjectX500Principal) - } catch (ex: IllegalArgumentException) { - badCert = true - logErrorWithMDC("Certificate subject not a valid CordaX500Name", ex) - ctx.close() - return - } - if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) { - badCert = true - logErrorWithMDC("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames") - ctx.close() - return - } - logInfoWithMDC("Handshake completed with subject: $remoteX500Name") - createAMQPEngine(ctx) - onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false))) - } else { - val cause = evt.cause() - // This happens when the peer node is closed during SSL establishment. - if (cause is ClosedChannelException) { - logWarnWithMDC("SSL Handshake closed early.") - } else if (cause is SSLException && cause.message == "handshake timed out") { // Sadly the exception thrown by Netty wrapper requires that we check the message. - logWarnWithMDC("SSL Handshake timed out") - } else { - badCert = true - } - logErrorWithMDC("Handshake failure: ${evt.cause().message}") - if (log.isTraceEnabled) { - withMDC { log.trace("Handshake failure", evt.cause()) } - } - ctx.close() + when (evt) { + is ProxyConnectionEvent -> { + // update address to the real target address + remoteAddress = evt.destinationAddress() } + is SniCompletionEvent -> { + if (evt.isSuccess) { + // The SniCompletionEvent is fired up before context is switched (after SslHandshakeCompletionEvent) + // so we save the requested server name now to be able log it once the handshake is completed successfully + // Note: this event is only triggered when using OpenSSL. + requestedServerName = evt.hostname() + logInfoWithMDC("SNI completion success.") + } else { + logErrorWithMDC("SNI completion failure: ${evt.cause().message}") + } + } + is SslHandshakeCompletionEvent -> { + if (evt.isSuccess) { + handleSuccessfulHandshake(ctx) + } else { + handleFailedHandshake(ctx, evt) + } + } + } + } + + private fun SslHandler.getRequestedServerName(): String? { + return if (serverMode) { + val session = engine().session + when (session) { + // Server name can be obtained from SSL session when using JavaSSL. + is ExtendedSSLSession -> (session.requestedServerNames.firstOrNull() as? SNIHostName)?.asciiName + // For Open SSL server name is obtained from SniCompletionEvent + else -> requestedServerName + } + } else { + (engine().sslParameters?.serverNames?.firstOrNull() as? SNIHostName)?.asciiName } } @@ -234,4 +223,62 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, } eventProcessor!!.processEventsAsync() } + + private fun handleSuccessfulHandshake(ctx: ChannelHandlerContext) { + val sslHandler = ctx.pipeline().get(SslHandler::class.java) + val sslSession = sslHandler.engine().session + // Depending on what matching method is used, getting the local certificate is done by selecting the + // appropriate keyManagerFactory + val keyManagerFactory = requestedServerName?.let { + keyManagerFactoriesMap[it] + } ?: keyManagerFactoriesMap.values.single() + + localCert = keyManagerFactory.getCurrentCertChain()?.first() + + if (localCert == null) { + log.error("SSL KeyManagerFactory failed to provide a local cert") + ctx.close() + return + } + if (sslSession.peerCertificates == null || sslSession.peerCertificates.isEmpty()) { + log.error("No peer certificates") + ctx.close() + return + } + remoteCert = sslHandler.engine().session.peerCertificates.first().x509 + val remoteX500Name = try { + CordaX500Name.build(remoteCert!!.subjectX500Principal) + } catch (ex: IllegalArgumentException) { + badCert = true + logErrorWithMDC("Certificate subject not a valid CordaX500Name", ex) + ctx.close() + return + } + if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) { + badCert = true + logErrorWithMDC("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames") + ctx.close() + return + } + + logInfoWithMDC("Handshake completed with subject: $remoteX500Name, requested server name: ${sslHandler.getRequestedServerName()}.") + createAMQPEngine(ctx) + onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false))) + } + + private fun handleFailedHandshake(ctx: ChannelHandlerContext, evt: SslHandshakeCompletionEvent) { + val cause = evt.cause() + // This happens when the peer node is closed during SSL establishment. + when { + cause is ClosedChannelException -> logWarnWithMDC("SSL Handshake closed early.") + // Sadly the exception thrown by Netty wrapper requires that we check the message. + cause is SSLException && cause.message == "handshake timed out" -> logWarnWithMDC("SSL Handshake timed out") + else -> badCert = true + } + logErrorWithMDC("Handshake failure: ${evt.cause().message}") + if (log.isTraceEnabled) { + withMDC { log.trace("Handshake failure", evt.cause()) } + } + ctx.close() + } } \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt index e1ca000f47..02e2c986a0 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt @@ -14,14 +14,18 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory import net.corda.core.identity.CordaX500Name import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger +import net.corda.nodeapi.internal.config.CertificateStore +import net.corda.nodeapi.internal.crypto.x509 import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl import net.corda.nodeapi.internal.requireMessageSize import rx.Observable import rx.subjects.PublishSubject +import sun.security.x509.X500Name import java.lang.Long.min import java.net.InetSocketAddress +import java.security.KeyStore import java.security.cert.X509Certificate import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock @@ -158,7 +162,7 @@ class AMQPClient(val targets: List, } } - val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory) + val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, parent.configuration) val target = parent.currentTarget val handler = if (parent.configuration.useOpenSsl){ createClientOpenSslHandler(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory, ch.alloc()) @@ -169,7 +173,8 @@ class AMQPClient(val targets: List, if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) pipeline.addLast(AMQPChannelHandler(false, parent.allowedRemoteLegalNames, - wrappedKeyManagerFactory, + // Single entry, key can be anything. + mapOf(DEFAULT to wrappedKeyManagerFactory), conf.userName, conf.password, conf.trace, diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt index 4f676ab2f3..b197e218e2 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt @@ -2,7 +2,6 @@ package net.corda.nodeapi.internal.protonwrapper.netty import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.config.CertificateStore -import java.security.KeyStore interface AMQPConfiguration { /** @@ -56,6 +55,10 @@ interface AMQPConfiguration { val socksProxyConfig: SocksProxyConfig? get() = null + @JvmDefault + val sourceX500Name: String? + get() = null + /** * Whether to use the tcnative open/boring SSL provider or the default Java SSL provider */ diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt index 2f8cff04ef..de631a77d3 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt @@ -23,7 +23,6 @@ import rx.Observable import rx.subjects.PublishSubject import java.net.BindException import java.net.InetSocketAddress -import java.security.cert.X509Certificate import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantLock import javax.net.ssl.KeyManagerFactory @@ -66,18 +65,37 @@ class AMQPServer(val hostName: String, } override fun initChannel(ch: SocketChannel) { + val amqpConfiguration = parent.configuration + val keyStore = amqpConfiguration.keyStore val pipeline = ch.pipeline() - val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory) - val handler = if (parent.configuration.useOpenSsl){ - createServerOpenSslHandler(wrappedKeyManagerFactory, trustManagerFactory, ch.alloc()) + // Used for SNI matching with javaSSL. + val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, amqpConfiguration) + // Used to create a mapping for SNI matching with openSSL. + val keyManagerFactoriesMap = splitKeystore(amqpConfiguration) + val handler = if (amqpConfiguration.useOpenSsl){ + // SNI matching needed only when multiple nodes exist behind the server. + if (keyStore.aliases().size > 1) { + createServerSNIOpenSslHandler(keyManagerFactoriesMap, trustManagerFactory) + } else { + createServerOpenSslHandler(wrappedKeyManagerFactory, trustManagerFactory, ch.alloc()) + } } else { - createServerSslHelper(wrappedKeyManagerFactory, trustManagerFactory) + // For javaSSL, SNI matching is handled at key manager level. + createServerSslHelper(keyStore, wrappedKeyManagerFactory, trustManagerFactory) } + pipeline.addLast("sslHandler", handler) + if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) pipeline.addLast(AMQPChannelHandler(true, null, - wrappedKeyManagerFactory, + // Passing a mapping of legal names to key managers to be able to pick the correct one after + // SNI completion event is fired up. + if (keyStore.aliases().size > 1 && amqpConfiguration.useOpenSsl) + keyManagerFactoriesMap + else + // Single entry, key can be anything. + mapOf(DEFAULT to wrappedKeyManagerFactory), conf.userName, conf.password, conf.trace, diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/CertHoldingKeyManagerFactoryWrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/CertHoldingKeyManagerFactoryWrapper.kt index 7ce99a79a4..8d7ed6e6ed 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/CertHoldingKeyManagerFactoryWrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/CertHoldingKeyManagerFactoryWrapper.kt @@ -5,7 +5,7 @@ import java.security.cert.X509Certificate import javax.net.ssl.* -class CertHoldingKeyManagerFactorySpiWrapper(private val factorySpi: KeyManagerFactorySpi) : KeyManagerFactorySpi() { +class CertHoldingKeyManagerFactorySpiWrapper(private val factorySpi: KeyManagerFactorySpi, private val amqpConfig: AMQPConfiguration) : KeyManagerFactorySpi() { override fun engineInit(keyStore: KeyStore?, password: CharArray?) { val engineInitMethod = KeyManagerFactorySpi::class.java.getDeclaredMethod("engineInit", KeyStore::class.java, CharArray::class.java) engineInitMethod.isAccessible = true @@ -23,16 +23,32 @@ class CertHoldingKeyManagerFactorySpiWrapper(private val factorySpi: KeyManagerF engineGetKeyManagersMethod.isAccessible = true @Suppress("UNCHECKED_CAST") val keyManagers = engineGetKeyManagersMethod.invoke(factorySpi) as Array - return if (factorySpi is CertHoldingKeyManagerFactorySpiWrapper) keyManagers else keyManagers.mapNotNull { - @Suppress("USELESS_CAST") // the casts to KeyManager are not useless - without them, the typed array will be of type Any - when (it) { - is X509ExtendedKeyManager -> AliasProvidingExtendedKeyMangerWrapper(it) as KeyManager - is X509KeyManager -> AliasProvidingKeyMangerWrapperImpl(it) as KeyManager - else -> null + return if (factorySpi is CertHoldingKeyManagerFactorySpiWrapper) keyManagers else keyManagers.map { + val aliasProvidingKeyManager = getDefaultKeyManager(it) + // Use the SNIKeyManager if keystore has several entries and only for clients and non-openSSL servers. + if (amqpConfig.keyStore.aliases().size > 1) { + // Clients + if (amqpConfig.sourceX500Name != null) { + SNIKeyManager(aliasProvidingKeyManager as X509ExtendedKeyManager, amqpConfig) + } else if (!amqpConfig.useOpenSsl) { // JDK SSL servers + SNIKeyManager(aliasProvidingKeyManager as X509ExtendedKeyManager, amqpConfig) + } else { + aliasProvidingKeyManager + } + } else { + aliasProvidingKeyManager } }.toTypedArray() } + private fun getDefaultKeyManager(keyManager: KeyManager): KeyManager { + return when (keyManager) { + is X509ExtendedKeyManager -> AliasProvidingExtendedKeyMangerWrapper(keyManager) + is X509KeyManager -> AliasProvidingKeyMangerWrapperImpl(keyManager) + else -> throw UnsupportedOperationException("Supported key manager types are: X509ExtendedKeyManager, X509KeyManager. Provided ${keyManager::class.java.name}") + } + } + private val keyManagers = lazy { getKeyManagersImpl() } override fun engineGetKeyManagers(): Array { @@ -46,12 +62,12 @@ class CertHoldingKeyManagerFactorySpiWrapper(private val factorySpi: KeyManagerF * the wrapper is not thread safe as in it will return the last used alias/cert chain and has itself no notion * of belonging to a certain channel. */ -class CertHoldingKeyManagerFactoryWrapper(factory: KeyManagerFactory) : KeyManagerFactory(getFactorySpi(factory), factory.provider, factory.algorithm) { +class CertHoldingKeyManagerFactoryWrapper(factory: KeyManagerFactory, amqpConfig: AMQPConfiguration) : KeyManagerFactory(getFactorySpi(factory, amqpConfig), factory.provider, factory.algorithm) { companion object { - private fun getFactorySpi(factory: KeyManagerFactory): KeyManagerFactorySpi { + private fun getFactorySpi(factory: KeyManagerFactory, amqpConfig: AMQPConfiguration): KeyManagerFactorySpi { val spiField = KeyManagerFactory::class.java.getDeclaredField("factorySpi") spiField.isAccessible = true - return CertHoldingKeyManagerFactorySpiWrapper(spiField.get(factory) as KeyManagerFactorySpi) + return CertHoldingKeyManagerFactorySpiWrapper(spiField.get(factory) as KeyManagerFactorySpi, amqpConfig) } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SNIKeyManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SNIKeyManager.kt new file mode 100644 index 0000000000..13e2ceb039 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SNIKeyManager.kt @@ -0,0 +1,109 @@ +package net.corda.nodeapi.internal.protonwrapper.netty + +import net.corda.core.identity.CordaX500Name +import net.corda.core.utilities.contextLogger +import net.corda.nodeapi.internal.config.CertificateStore +import net.corda.nodeapi.internal.crypto.x509 +import org.slf4j.MDC +import sun.security.x509.X500Name +import java.net.Socket +import java.security.Principal +import javax.net.ssl.* + +internal class SNIKeyManager(private val keyManager: X509ExtendedKeyManager, private val amqpConfig: AMQPConfiguration) : X509ExtendedKeyManager(), X509KeyManager by keyManager, AliasProvidingKeyMangerWrapper { + + companion object { + private val log = contextLogger() + } + + override var lastAlias: String? = null + + private fun withMDC(block: () -> Unit) { + val oldMDC = MDC.getCopyOfContextMap() + try { + MDC.put("lastAlias", lastAlias) + MDC.put("isServer", amqpConfig.sourceX500Name.isNullOrEmpty().toString()) + MDC.put("sourceX500Name", amqpConfig.sourceX500Name) + MDC.put("useOpenSSL", amqpConfig.useOpenSsl.toString()) + block() + } finally { + MDC.setContextMap(oldMDC) + } + } + + private fun logDebugWithMDC(msg: () -> String) { + if (log.isDebugEnabled) { + withMDC { log.debug(msg()) } + } + } + + override fun chooseClientAlias(keyType: Array, issuers: Array, socket: Socket): String? { + return storeIfNotNull { chooseClientAlias(amqpConfig.keyStore, amqpConfig.sourceX500Name) } + } + + override fun chooseEngineClientAlias(keyType: Array, issuers: Array, engine: SSLEngine): String? { + return storeIfNotNull { chooseClientAlias(amqpConfig.keyStore, amqpConfig.sourceX500Name) } + } + + override fun chooseServerAlias(keyType: String?, issuers: Array?, socket: Socket): String? { + return storeIfNotNull { + val matcher = (socket as SSLSocket).sslParameters.sniMatchers.first() + chooseServerAlias(keyType, issuers, matcher) + } + } + + override fun chooseEngineServerAlias(keyType: String?, issuers: Array?, engine: SSLEngine?): String? { + return storeIfNotNull { + val matcher = engine?.sslParameters?.sniMatchers?.first() + chooseServerAlias(keyType, issuers, matcher) + } + } + + private fun chooseServerAlias(keyType: String?, issuers: Array?, matcher: SNIMatcher?): String? { + val aliases = keyManager.getServerAliases(keyType, issuers) + if (aliases == null || aliases.isEmpty()) { + logDebugWithMDC { "Keystore doesn't contain any aliases for key type $keyType and issuers $issuers." } + return null + } + + log.debug("Checking aliases: $aliases.") + matcher?.let { + val matchedAlias = (it as ServerSNIMatcher).matchedAlias + if (aliases.contains(matchedAlias)) { + logDebugWithMDC { "Found match for $matchedAlias." } + return matchedAlias + } + } + + logDebugWithMDC { "Unable to find a matching alias." } + return null + } + + private fun chooseClientAlias(keyStore: CertificateStore, clientLegalName: String?): String? { + clientLegalName?.let { + val aliases = keyStore.aliases() + if (aliases.isEmpty()) { + logDebugWithMDC { "Keystore doesn't contain any entries." } + } + aliases.forEach { alias -> + val x500Name = keyStore[alias].x509.subjectDN as X500Name + val aliasCordaX500Name = CordaX500Name.build(x500Name.asX500Principal()) + val clientCordaX500Name = CordaX500Name.parse(it) + if (clientCordaX500Name == aliasCordaX500Name) { + logDebugWithMDC { "Found alias $alias for $clientCordaX500Name." } + return alias + } + } + } + + return null + } + + private fun storeIfNotNull(func: () -> String?): String? { + val alias = func() + if (alias != null) { + lastAlias = alias + } + return alias + } +} diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt index 9665ea47ec..0d462ed476 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/SSLHelper.kt @@ -1,9 +1,8 @@ package net.corda.nodeapi.internal.protonwrapper.netty import io.netty.buffer.ByteBufAllocator -import io.netty.handler.ssl.SslContextBuilder -import io.netty.handler.ssl.SslHandler -import io.netty.handler.ssl.SslProvider +import io.netty.handler.ssl.* +import io.netty.util.DomainNameMappingBuilder import net.corda.core.crypto.SecureHash import net.corda.core.crypto.newSecureRandom import net.corda.core.identity.CordaX500Name @@ -13,15 +12,19 @@ import net.corda.core.utilities.toHex import net.corda.nodeapi.internal.ArtemisTcpTransport import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.crypto.toBc +import net.corda.nodeapi.internal.crypto.x509 import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier import org.bouncycastle.asn1.x509.Extension import org.bouncycastle.asn1.x509.SubjectKeyIdentifier +import sun.security.x509.X500Name import java.net.Socket +import java.security.KeyStore import java.security.cert.* import java.util.* import javax.net.ssl.* private const val HOSTNAME_FORMAT = "%s.corda.net" +internal const val DEFAULT = "default" internal class LoggingTrustManagerWrapper(val wrapped: X509ExtendedTrustManager) : X509ExtendedTrustManager() { companion object { @@ -146,7 +149,8 @@ internal fun createClientOpenSslHandler(target: NetworkHostAndPort, return SslHandler(sslEngine) } -internal fun createServerSslHelper(keyManagerFactory: KeyManagerFactory, +internal fun createServerSslHelper(keyStore: CertificateStore, + keyManagerFactory: KeyManagerFactory, trustManagerFactory: TrustManagerFactory): SslHandler { val sslContext = SSLContext.getInstance("TLS") val keyManagers = keyManagerFactory.keyManagers @@ -158,6 +162,9 @@ internal fun createServerSslHelper(keyManagerFactory: KeyManagerFactory, sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray() sslEngine.enabledCipherSuites = ArtemisTcpTransport.CIPHER_SUITES.toTypedArray() sslEngine.enableSessionCreation = true + val sslParameters = sslEngine.sslParameters + sslParameters.sniMatchers = listOf(ServerSNIMatcher(keyStore)) + sslEngine.sslParameters = sslParameters return SslHandler(sslEngine) } @@ -191,10 +198,55 @@ internal fun createServerOpenSslHandler(keyManagerFactory: KeyManagerFactory, return SslHandler(sslEngine) } +/** + * Creates a special SNI handler used only when openSSL is used for AMQPServer + */ +internal fun createServerSNIOpenSslHandler(keyManagerFactoriesMap: Map, + trustManagerFactory: TrustManagerFactory): SniHandler { + + // Default value can be any in the map. + val sslCtxBuilder = SslContextBuilder.forServer(keyManagerFactoriesMap.values.first()) + .sslProvider(SslProvider.OPENSSL) + .trustManager(LoggingTrustManagerFactoryWrapper(trustManagerFactory)) + .clientAuth(ClientAuth.REQUIRE) + .ciphers(ArtemisTcpTransport.CIPHER_SUITES) + .protocols(*ArtemisTcpTransport.TLS_VERSIONS.toTypedArray()) + + val mapping = DomainNameMappingBuilder(sslCtxBuilder.build()) + + keyManagerFactoriesMap.forEach { + mapping.add(it.key, sslCtxBuilder.keyManager(it.value).build()) + } + + return SniHandler(mapping.build()) +} + +internal fun splitKeystore(config: AMQPConfiguration): Map { + val keyStore = config.keyStore.value.internal + val password = config.keyStore.password.toCharArray() + return keyStore.aliases().toList().map { alias -> + val key = keyStore.getKey(alias, password) + val certs = keyStore.getCertificateChain(alias) + val x500Name = keyStore.getCertificate(alias).x509.subjectDN as X500Name + val cordaX500Name = CordaX500Name.build(x500Name.asX500Principal()) + val newKeyStore = KeyStore.getInstance("JKS") + newKeyStore.load(null) + newKeyStore.setKeyEntry(alias, key, password, certs) + val newKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) + newKeyManagerFactory.init(newKeyStore, password) + x500toHostName(cordaX500Name) to CertHoldingKeyManagerFactoryWrapper(newKeyManagerFactory, config) + }.toMap() +} + fun KeyManagerFactory.init(keyStore: CertificateStore) = init(keyStore.value.internal, keyStore.password.toCharArray()) fun TrustManagerFactory.init(trustStore: CertificateStore) = init(trustStore.value.internal) +/** + * Method that converts a [CordaX500Name] to a a valid hostname (RFC-1035). It's used for SNI to indicate the target + * when trying to communicate with nodes that reside behind the same firewall. This is a solution to TLS's extension not + * yet supporting x500 names as server names + */ internal fun x500toHostName(x500Name: CordaX500Name): String { val secureHash = SecureHash.sha256(x500Name.toString()) // RFC 1035 specifies a limit 255 bytes for hostnames with each label being 63 bytes or less. Due to this, the string diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/ServerSNIMatcher.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/ServerSNIMatcher.kt new file mode 100644 index 0000000000..9014234f25 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/ServerSNIMatcher.kt @@ -0,0 +1,36 @@ +package net.corda.nodeapi.internal.protonwrapper.netty + +import net.corda.core.identity.CordaX500Name +import net.corda.nodeapi.internal.config.CertificateStore +import net.corda.nodeapi.internal.crypto.x509 +import sun.security.x509.X500Name +import javax.net.ssl.SNIHostName +import javax.net.ssl.SNIMatcher +import javax.net.ssl.SNIServerName +import javax.net.ssl.StandardConstants + +class ServerSNIMatcher(private val keyStore: CertificateStore) : SNIMatcher(0) { + + var matchedAlias: String? = null + private set + var matchedServerName: String? = null + private set + + override fun matches(serverName: SNIServerName): Boolean { + if (serverName.type == StandardConstants.SNI_HOST_NAME) { + keyStore.aliases().forEach { alias -> + val x500Name = keyStore[alias].x509.subjectDN as X500Name + val cordaX500Name = CordaX500Name.build(x500Name.asX500Principal()) + // Convert the CordaX500Name into the expected host name and compare + // E.g. O=Corda B, L=London, C=GB becomes 3c6dd991936308edb210555103ffc1bb.corda.net + if ((serverName as SNIHostName).asciiName == x500toHostName(cordaX500Name)) { + matchedAlias = alias + matchedServerName = serverName.asciiName + return true + } + } + } + + return false + } +} diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/TestKeyManagerFactoryWrapper.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/TestKeyManagerFactoryWrapper.kt index 8ca20dad62..e8c2d07f61 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/TestKeyManagerFactoryWrapper.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/TestKeyManagerFactoryWrapper.kt @@ -5,7 +5,9 @@ import com.nhaarman.mockito_kotlin.whenever import net.corda.core.internal.div import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.nodeapi.internal.config.CertificateStore import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.MAX_MESSAGE_SIZE import net.corda.testing.internal.rigorousMock import net.corda.testing.internal.stubs.CertificateStoreStubs import org.junit.Rule @@ -41,8 +43,8 @@ class TestKeyManagerFactoryWrapper { config.configureWithDevSSLCertificate() val underlyingKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) - - val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory) + val amqpConfig = AMQPConfigurationImpl(config.p2pSslOptions.keyStore.get(true), config.p2pSslOptions.trustStore.get(true), MAX_MESSAGE_SIZE) + val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory, amqpConfig) wrappedKeyManagerFactory.init(config.p2pSslOptions.keyStore.get()) val keyManagers = wrappedKeyManagerFactory.keyManagers assertFalse(keyManagers.isEmpty()) @@ -74,11 +76,11 @@ class TestKeyManagerFactoryWrapper { config.configureWithDevSSLCertificate() val underlyingKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) - - val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory) + val amqpConfig = AMQPConfigurationImpl(config.p2pSslOptions.keyStore.get(true), config.p2pSslOptions.trustStore.get(true), MAX_MESSAGE_SIZE) + val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory, amqpConfig) wrappedKeyManagerFactory.init(config.p2pSslOptions.keyStore.get()) - val otherWrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory) + val otherWrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory, amqpConfig) val keyManagers = wrappedKeyManagerFactory.keyManagers assertFalse(keyManagers.isEmpty()) @@ -92,4 +94,5 @@ class TestKeyManagerFactoryWrapper { assertNull(otherWrappedKeyManagerFactory.getCurrentCertChain()) } + private class AMQPConfigurationImpl(override val keyStore: CertificateStore, override val trustStore: CertificateStore, override val maxMessageSize: Int) : AMQPConfiguration } \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 4499bebfe6..225091a718 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -72,6 +72,7 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) { val artemis = artemisClient.started!! for (i in 0 until 3) { val artemisMessage = artemis.session.createMessage(true).apply { + putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, ALICE_NAME.toString()) putIntProperty(P2PMessagingHeaders.senderUUID, i) writeBodyBufferBytes("Test$i".toByteArray()) // Use the magic deduplication property built into Artemis as our message identity too @@ -149,6 +150,7 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) { // Send a fresh item and check receive val artemisMessage = artemis.session.createMessage(true).apply { + putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, ALICE_NAME.toString()) putIntProperty(P2PMessagingHeaders.senderUUID, 3) writeBodyBufferBytes("Test3".toByteArray()) // Use the magic deduplication property built into Artemis as our message identity too @@ -287,7 +289,7 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) { if (sourceQueueName != null) { // Local queue for outgoing messages artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true) - bridgeManager.deployBridge(sourceQueueName, listOf(amqpAddress), setOf(BOB.name)) + bridgeManager.deployBridge(ALICE_NAME.toString(), sourceQueueName, listOf(amqpAddress), setOf(BOB.name)) } return Triple(artemisServer, artemisClient, bridgeManager) } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index 5007928e50..186805fa3b 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -17,6 +17,7 @@ import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisTcpTransport +import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus @@ -26,10 +27,7 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer import net.corda.nodeapi.internal.protonwrapper.netty.init import net.corda.nodeapi.internal.registerDevP2pCertificates import net.corda.nodeapi.internal.registerDevSigningCertificates -import net.corda.testing.core.ALICE_NAME -import net.corda.testing.core.BOB_NAME -import net.corda.testing.core.CHARLIE_NAME -import net.corda.testing.core.MAX_MESSAGE_SIZE +import net.corda.testing.core.* import net.corda.testing.driver.PortAllocation import net.corda.testing.internal.createDevIntermediateCaCertPath import net.corda.testing.internal.rigorousMock @@ -422,6 +420,76 @@ class ProtonWrapperTests(val sslSetup: SslSetup) { server.stop() } + @Test + fun `SNI AMQP client to SNI AMQP server`() { + println(sslSetup) + val amqpServer = createServerWithMultipleNames(serverPort, listOf(ALICE_NAME, CHARLIE_NAME)) + amqpServer.use { + amqpServer.start() + val receiveSubs = amqpServer.onReceive.subscribe { + assertEquals(BOB_NAME.toString(), it.sourceLegalName) + assertEquals(P2P_PREFIX + "Test", it.topic) + assertEquals("Test", String(it.payload)) + it.complete(true) + } + createClient(MAX_MESSAGE_SIZE, setOf(ALICE_NAME)).use { amqpClient -> + val serverConnected = amqpServer.onConnection.toFuture() + val clientConnected = amqpClient.onConnection.toFuture() + amqpClient.start() + val serverConnect = serverConnected.get() + assertEquals(true, serverConnect.connected) + assertEquals(BOB_NAME, CordaX500Name.build(serverConnect.remoteCert!!.subjectX500Principal)) + val clientConnect = clientConnected.get() + assertEquals(true, clientConnect.connected) + assertEquals(ALICE_NAME, CordaX500Name.build(clientConnect.remoteCert!!.subjectX500Principal)) + val msg = amqpClient.createMessage("Test".toByteArray(), + P2P_PREFIX + "Test", + ALICE_NAME.toString(), + emptyMap()) + amqpClient.write(msg) + assertEquals(MessageStatus.Acknowledged, msg.onComplete.get()) + + } + + createClientWithMultipleCerts(listOf(BOC_NAME, BOB_NAME), BOB_NAME, setOf(ALICE_NAME)).use { amqpClient -> + val serverConnected = amqpServer.onConnection.toFuture() + val clientConnected = amqpClient.onConnection.toFuture() + amqpClient.start() + val serverConnect = serverConnected.get() + assertEquals(true, serverConnect.connected) + assertEquals(BOB_NAME, CordaX500Name.build(serverConnect.remoteCert!!.subjectX500Principal)) + val clientConnect = clientConnected.get() + assertEquals(true, clientConnect.connected) + assertEquals(ALICE_NAME, CordaX500Name.build(clientConnect.remoteCert!!.subjectX500Principal)) + val msg = amqpClient.createMessage("Test".toByteArray(), + P2P_PREFIX + "Test", + ALICE_NAME.toString(), + emptyMap()) + amqpClient.write(msg) + assertEquals(MessageStatus.Acknowledged, msg.onComplete.get()) + } + receiveSubs.unsubscribe() + } + } + + @Test + fun `non-existent SNI AMQP client to SNI AMQP server with multiple identities`() { + val amqpServer = createServerWithMultipleNames(serverPort, listOf(ALICE_NAME, CHARLIE_NAME)) + amqpServer.use { + amqpServer.start() + val amqpClient = createClientWithMultipleCerts(listOf(BOC_NAME, BOB_NAME), BOB_NAME, setOf(DUMMY_BANK_A_NAME)) + amqpClient.use { + val serverConnected = amqpServer.onConnection.toFuture() + val clientConnected = amqpClient.onConnection.toFuture() + amqpClient.start() + val serverConnect = serverConnected.get() + assertEquals(false, serverConnect.connected) + val clientConnect = clientConnected.get() + assertEquals(false, clientConnect.connected) + } + } + } + private fun createArtemisServerAndClient(maxMessageSize: Int = MAX_MESSAGE_SIZE): Pair { val baseDirectory = temporaryFolder.root.toPath() / "artemis" val certificatesDirectory = baseDirectory / "certificates" @@ -447,7 +515,8 @@ class ProtonWrapperTests(val sslSetup: SslSetup) { return Pair(server, client) } - private fun createClient(maxMessageSize: Int = MAX_MESSAGE_SIZE): AMQPClient { + private fun createClient(maxMessageSize: Int = MAX_MESSAGE_SIZE, + expectedRemoteLegalNames: Set = setOf(ALICE_NAME, CHARLIE_NAME)): AMQPClient { val baseDirectory = temporaryFolder.root.toPath() / "client" val certificatesDirectory = baseDirectory / "certificates" val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) @@ -469,13 +538,14 @@ class ProtonWrapperTests(val sslSetup: SslSetup) { override val trustStore = clientTruststore override val trace: Boolean = true override val maxMessageSize: Int = maxMessageSize + override val sourceX500Name = BOB_NAME.toString() override val useOpenSsl: Boolean = sslSetup.clientNative } return AMQPClient( listOf(NetworkHostAndPort("localhost", serverPort), NetworkHostAndPort("localhost", serverPort2), NetworkHostAndPort("localhost", artemisPort)), - setOf(ALICE_NAME, CHARLIE_NAME), + expectedRemoteLegalNames, amqpConfig) } @@ -542,4 +612,75 @@ class ProtonWrapperTests(val sslSetup: SslSetup) { port, amqpConfig) } + + private fun createAmqpConfigWithMultipleCerts(legalNames: List, + sourceLegalName: String? = null, + maxMessageSize: Int = MAX_MESSAGE_SIZE, + crlCheckSoftFail: Boolean = true, + useOpenSsl: Boolean) :AMQPConfiguration { + val tempFolders = legalNames.map { it to temporaryFolder.root.toPath() / it.organisation }.toMap() + val baseDirectories = tempFolders.mapValues { it.value / "node" } + val certificatesDirectories = baseDirectories.mapValues { it.value / "certificates" } + val signingCertificateStores = certificatesDirectories.mapValues { CertificateStoreStubs.Signing.withCertificatesDirectory(it.value) } + val pspSslConfigurations = certificatesDirectories.mapValues { CertificateStoreStubs.P2P.withCertificatesDirectory(it.value, useOpenSsl = sslSetup.serverNative) } + val serverConfigs = legalNames.map { name -> + val serverConfig = rigorousMock().also { + doReturn(baseDirectories[name]).whenever(it).baseDirectory + doReturn(certificatesDirectories[name]).whenever(it).certificatesDirectory + doReturn(name).whenever(it).myLegalName + doReturn(signingCertificateStores[name]).whenever(it).signingCertificateStore + doReturn(pspSslConfigurations[name]).whenever(it).p2pSslOptions + + doReturn(crlCheckSoftFail).whenever(it).crlCheckSoftFail + } + serverConfig.configureWithDevSSLCertificate() + serverConfig + } + + val serverTruststore = serverConfigs.first().p2pSslOptions.trustStore.get(true) + val serverKeystore = serverConfigs.first().p2pSslOptions.keyStore.get(true) + // Merge rest of keystores into the first + serverConfigs.subList(1, serverConfigs.size).forEach { + mergeKeyStores(serverKeystore, it.p2pSslOptions.keyStore.get(true), it.myLegalName.toString()) + } + + return object : AMQPConfiguration { + override val keyStore: CertificateStore = serverKeystore + override val trustStore: CertificateStore = serverTruststore + override val trace: Boolean = true + override val maxMessageSize: Int = maxMessageSize + override val useOpenSsl: Boolean = useOpenSsl + override val sourceX500Name: String? = sourceLegalName + } + } + + private fun createServerWithMultipleNames(port: Int, + serverNames: List, + maxMessageSize: Int = MAX_MESSAGE_SIZE, + crlCheckSoftFail: Boolean = true): AMQPServer { + return AMQPServer( + "0.0.0.0", + port, + createAmqpConfigWithMultipleCerts(serverNames, null, maxMessageSize, crlCheckSoftFail, sslSetup.serverNative)) + } + + private fun createClientWithMultipleCerts(clientNames: List, + sourceLegalName: CordaX500Name, + expectedRemoteLegalNames: Set = setOf(ALICE_NAME, CHARLIE_NAME)): AMQPClient { + return AMQPClient( + listOf(NetworkHostAndPort("localhost", serverPort), + NetworkHostAndPort("localhost", serverPort2), + NetworkHostAndPort("localhost", artemisPort)), + expectedRemoteLegalNames, + createAmqpConfigWithMultipleCerts(clientNames, sourceLegalName.toString(), MAX_MESSAGE_SIZE, true, sslSetup.clientNative)) + } + + private fun mergeKeyStores(newKeyStore: CertificateStore, oldKeyStore: CertificateStore, newAlias: String) { + val keyStore = oldKeyStore.value.internal + keyStore.aliases().toList().forEach { + val key = keyStore.getKey(it, oldKeyStore.password.toCharArray()) + val certs = keyStore.getCertificateChain(it) + newKeyStore.value.internal.setKeyEntry(newAlias, key, oldKeyStore.password.toCharArray(), certs) + } + } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index 05fd248a6d..5d9e7aa487 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -9,6 +9,7 @@ import net.corda.core.utilities.debug import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.services.statemachine.FlowMessagingImpl +import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException import org.apache.activemq.artemis.api.core.SimpleString @@ -142,7 +143,7 @@ class MessagingExecutor( private fun sendJob(job: Job.Send) { val mqAddress = resolver.resolveTargetToArtemisQueue(job.target) - val artemisMessage = cordaToArtemisMessage(job.message) + val artemisMessage = cordaToArtemisMessage(job.message, job.target) log.trace { "Send to: $mqAddress topic: ${job.message.topic} " + "sessionID: ${job.message.topic} id: ${job.message.uniqueMessageId}" @@ -150,13 +151,20 @@ class MessagingExecutor( producer.send(SimpleString(mqAddress), artemisMessage, { job.sentFuture.set(Unit) }) } - fun cordaToArtemisMessage(message: Message): ClientMessage? { + fun cordaToArtemisMessage(message: Message, target: MessageRecipients? = null): ClientMessage? { return session.createMessage(true).apply { putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor) putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion) putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion) putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic)) putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, SimpleString(myLegalName)) + // Add a group ID to messages to be able to have multiple filtered consumers while preventing reordering. + // This header will be dropped off during transit through the bridge, which is fine as it's needed locally only. + if (target != null && target is ArtemisMessagingComponent.ServiceAddress) { + putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, SimpleString(message.uniqueMessageId.toString)) + } else { + putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, SimpleString(myLegalName)) + } sendMessageSizeMetric.update(message.data.bytes.size) writeBodyBufferBytes(message.data.bytes) // Use the magic deduplication property built into Artemis as our message identity too diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 84722f4680..a7c106d0f9 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -34,6 +34,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport import net.corda.nodeapi.internal.bridging.BridgeControl @@ -216,6 +217,7 @@ class P2PMessagingClient(val config: NodeConfiguration, // Create a general purpose producer. producer = producerSession!!.createProducer() + inboxes += RemoteInboxAddress(myIdentity).queueName serviceIdentity?.let { inboxes += RemoteInboxAddress(it).queueName @@ -543,7 +545,7 @@ class P2PMessagingClient(val config: NodeConfiguration, val internalTargetQueue = (address as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address") state.locked { - createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = address !is ServiceAddress) + createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = false) } internalTargetQueue } @@ -572,8 +574,10 @@ class P2PMessagingClient(val config: NodeConfiguration, session.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), exclusive, null) - sendBridgeCreateMessage() } + // When there are multiple nodes sharing the firewall, the peer queue may already exist as it was created when + // another node tried communicating with the target. A bridge is still needed as there has to be one per source-queue-target + sendBridgeCreateMessage() } knownQueues += queueName }