Bogdan ent 2295 SNI (#1435)

* ENT-2295: added SNI support to bridge

* ENT-2295: removed unused method args, adde new line

* ENT-2295: fix checking for existing bridges

* ENT-2295: fix AMQPBridgeTest(included source x500 name in messages)

* ENT-2295: fix ProtonWrapperTests (added source id and only check for SNI if bridge is shared)

* ENT-2295: fixed issue with artemis round robin not working when autogrouping was on

* ENT-2295: adapt to use openSSL, added SNI tests

* ENT-2295: server side openSSL now uses SniHandler magic

* ENT-2295: service queues are not exclusive

* ENT-2295: remove check for nodes sharing artemis when resolving targets

* ENT-2516 SNI - Log the requested server name (if any) in the AMQPServer (#1454)

* WIP

* log server name in ssl handshake

* big fix

* handle nullable sslParameters

* ENT-2295: address PR comments

* ENT-2295: remove unused imports

* ENT-2295: fix warnings

* ENT-2295: address PR comments

* ENT-2295: added node to node intergration tests, added openssl dep to bridge capsule

* ENT-2295: message group id is unique for service queues

* ENT-2295: address PR comment
This commit is contained in:
bpaunescu
2018-10-12 12:24:54 +01:00
committed by GitHub
parent 89886ce194
commit ba271f7adc
21 changed files with 839 additions and 104 deletions

View File

@ -49,6 +49,8 @@ dependencies {
smokeTestCompile project(':test-utils') smokeTestCompile project(':test-utils')
smokeTestCompile "org.apache.curator:curator-test:${curator_version}" smokeTestCompile "org.apache.curator:curator-test:${curator_version}"
smokeTestCompile "junit:junit:$junit_version" smokeTestCompile "junit:junit:$junit_version"
// Adding native SSL library to allow using native SSL with Artemis and AMQP
smokeTestCompile "io.netty:netty-tcnative-boringssl-static:$tcnative_version"
} }

View File

@ -0,0 +1,275 @@
package net.corda.bridge
import co.paralleluniverse.fibers.Suspendable
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.div
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.artemis.BrokerJaasLoginModule
import net.corda.node.services.Permissions
import net.corda.node.services.config.EnterpriseConfiguration
import net.corda.node.services.config.MutualExclusionConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pAcceptorTcpTransport
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.crypto.loadOrCreateKeyStore
import net.corda.testing.core.*
import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.stubs.CertificateStoreStubs
import net.corda.testing.node.User
import net.corda.testing.node.internal.cordappsForPackages
import net.corda.testing.node.internal.internalDriver
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.jaas.TextFileCertificateLoginModule
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.io.File
import java.io.FileOutputStream
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyStore
import javax.security.auth.login.AppConfigurationEntry
import kotlin.test.assertEquals
class SNIBridgeTest {
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
private abstract class AbstractNodeConfiguration : NodeConfiguration
@StartableByRPC
@InitiatingFlow
class Ping(val pongParty: Party, val times: Int) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val pongSession = initiateFlow(pongParty)
pongSession.sendAndReceive<Unit>(times)
BridgeRestartTest.pingStarted.getOrPut(runId) { openFuture() }.set(Unit)
for (i in 1..times) {
logger.info("PING $i")
val j = pongSession.sendAndReceive<Int>(i).unwrap { it }
assertEquals(i, j)
}
}
}
@InitiatedBy(Ping::class)
class Pong(val pingSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val times = pingSession.sendAndReceive<Int>(Unit).unwrap { it }
for (i in 1..times) {
logger.info("PONG $i $pingSession")
val j = pingSession.sendAndReceive<Int>(i).unwrap { it }
assertEquals(i, j)
}
}
}
@Test
fun `Nodes behind all in one bridge can communicate with external node`() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<BridgeRestartTest.Ping>(), Permissions.all()))
internalDriver(startNodesInProcess = true, cordappsForAllNodes = cordappsForPackages("net.corda.bridge")) {
val artemisPort = portAllocation.nextPort()
val advertisedP2PPort = portAllocation.nextPort()
// We create a config for ALICE_NAME so we can use the dir lookup from the Driver when starting the bridge
val nodeConfigs = createNodesConfigs(listOf(DUMMY_BANK_A_NAME, DUMMY_BANK_B_NAME, ALICE_NAME))
// Remove the created trust and key stores
val bridgePath = temporaryFolder.root.path / ALICE_NAME.organisation
Files.delete(bridgePath / "node/certificates/truststore.jks")
Files.delete(bridgePath / "node/certificates/sslkeystore.jks")
// TODO: change bridge driver to use any provided base dir, not just look for one based on identity
createAggregateStores(nodeConfigs.minus(ALICE_NAME).values.toList(), baseDirectory(ALICE_NAME))
val bankAPath = temporaryFolder.root.path / DUMMY_BANK_A_NAME.organisation / "node"
val bankBPath = temporaryFolder.root.path / DUMMY_BANK_B_NAME.organisation / "node"
// Start broker
val broker = createArtemisTextCertsLogin(artemisPort, nodeConfigs[DUMMY_BANK_B_NAME]!!.p2pSslOptions)
broker.start()
println(broker.isActive)
val aFuture = startNode(
providedName = DUMMY_BANK_A_NAME,
rpcUsers = listOf(demoUser),
customOverrides = mapOf(
"baseDirectory" to "$bankAPath",
"p2pAddress" to "localhost:$advertisedP2PPort",
"messagingServerAddress" to "0.0.0.0:$artemisPort",
"messagingServerExternal" to true,
"enterpriseConfiguration" to mapOf(
"externalBridge" to true
)
)
)
val a = aFuture.getOrThrow()
println(a.nodeInfo)
val bFuture = startNode(
providedName = DUMMY_BANK_B_NAME,
rpcUsers = listOf(demoUser),
customOverrides = mapOf(
"baseDirectory" to "$bankBPath",
"p2pAddress" to "localhost:$advertisedP2PPort",
"messagingServerAddress" to "0.0.0.0:$artemisPort",
"messagingServerExternal" to true,
"enterpriseConfiguration" to mapOf(
"externalBridge" to true
)
)
)
val b = bFuture.getOrThrow()
println(b.nodeInfo)
val bridge = startBridge(ALICE_NAME, advertisedP2PPort, artemisPort, mapOf(
"outboundConfig" to mapOf(
"artemisBrokerAddress" to "localhost:$artemisPort"
),
"inboundConfig" to mapOf(
"listeningAddress" to "0.0.0.0:$advertisedP2PPort"
)
)).getOrThrow()
println(bridge.brokerPort)
// Start a node on the other side of the bridge
val c = startNode(providedName = DUMMY_BANK_C_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:${portAllocation.nextPort()}")).getOrThrow()
// BANK_C initiates flows with BANK_A and BANK_B
CordaRPCClient(c.rpcAddress).use(demoUser.username, demoUser.password) {
var handle = it.proxy.startFlow(::Ping, a.nodeInfo.singleIdentity(), 5)
handle.returnValue.getOrThrow()
handle = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 5)
handle.returnValue.getOrThrow()
}
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
val handle = it.proxy.startFlow(::Ping, c.nodeInfo.singleIdentity(), 5)
handle.returnValue.getOrThrow()
}
CordaRPCClient(b.rpcAddress).use(demoUser.username, demoUser.password) {
val handle = it.proxy.startFlow(::Ping, c.nodeInfo.singleIdentity(), 5)
handle.returnValue.getOrThrow()
}
}
}
private fun createNodesConfigs(legalNames: List<CordaX500Name>): Map<CordaX500Name, NodeConfiguration> {
val tempFolders = legalNames.map { it to temporaryFolder.root.toPath() / it.organisation }.toMap()
val baseDirectories = tempFolders.mapValues { it.value / "node" }
val certificatesDirectories = baseDirectories.mapValues { it.value / "certificates" }
val signingCertificateStores = certificatesDirectories.mapValues { CertificateStoreStubs.Signing.withCertificatesDirectory(it.value) }
val pspSslConfigurations = certificatesDirectories.mapValues { CertificateStoreStubs.P2P.withCertificatesDirectory(it.value, useOpenSsl = false) }
val nodeConfigs = legalNames.map { name ->
val serverConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectories[name]).whenever(it).baseDirectory
doReturn(certificatesDirectories[name]).whenever(it).certificatesDirectory
doReturn(name).whenever(it).myLegalName
doReturn(signingCertificateStores[name]).whenever(it).signingCertificateStore
doReturn(pspSslConfigurations[name]).whenever(it).p2pSslOptions
doReturn(true).whenever(it).crlCheckSoftFail
doReturn(true).whenever(it).messagingServerExternal
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
}
serverConfig.configureWithDevSSLCertificate()
name to serverConfig
}.toMap()
return nodeConfigs
}
private fun createAggregateStores(nodeConfigs: List<NodeConfiguration>, bridgeDirPath: Path) {
val trustStore = nodeConfigs.first().p2pSslOptions.trustStore.get(true)
val newKeyStore = loadOrCreateKeyStore(bridgeDirPath / "certificates/sslkeystore.jks", "cordacadevpass")
nodeConfigs.forEach {
mergeKeyStores(newKeyStore, it.p2pSslOptions.keyStore.get(true), it.myLegalName.toString())
}
// Save to disk and copy in the bridge directory
trustStore.writeTo(FileOutputStream(File("$bridgeDirPath/certificates/truststore.jks")))
newKeyStore.store(FileOutputStream(File("$bridgeDirPath/certificates/sslkeystore.jks")), "cordacadevpass".toCharArray())
}
private fun mergeKeyStores(newKeyStore: KeyStore, oldKeyStore: CertificateStore, newAlias: String) {
val keyStore = oldKeyStore.value.internal
keyStore.aliases().toList().forEach {
val key = keyStore.getKey(it, oldKeyStore.password.toCharArray())
val certs = keyStore.getCertificateChain(it)
newKeyStore.setKeyEntry(newAlias, key, oldKeyStore.password.toCharArray(), certs)
}
}
private fun ConfigurationImpl.configureAddressSecurity(): Configuration {
val nodeInternalRole = Role("Node", true, true, true, true, true, true, true, true, true, true)
securityRoles["${ArtemisMessagingComponent.INTERNAL_PREFIX}#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
securityRoles["${ArtemisMessagingComponent.P2P_PREFIX}#"] = setOf(nodeInternalRole, restrictedRole(BrokerJaasLoginModule.PEER_ROLE, send = true))
securityRoles["*"] = setOf(Role("guest", true, true, true, true, true, true, true, true, true, true))
return this
}
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false,
deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false): Role {
return Role(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue,
deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue)
}
private fun createArtemisTextCertsLogin(p2pPort: Int, p2pSslOptions: MutualSslConfiguration): ActiveMQServer {
val artemisDir = temporaryFolder.root.path / "artemis"
val config = ConfigurationImpl().apply {
bindingsDirectory = (artemisDir / "bindings").toString()
journalDirectory = (artemisDir / "journal").toString()
largeMessagesDirectory = (artemisDir / "large-messages").toString()
acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort("0.0.0.0", p2pPort), p2pSslOptions))
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
isPersistIDCache = true
isPopulateValidatedUser = true
journalBufferSize_NIO = MAX_MESSAGE_SIZE + ArtemisMessagingComponent.JOURNAL_HEADER_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
journalBufferSize_AIO = MAX_MESSAGE_SIZE + ArtemisMessagingComponent.JOURNAL_HEADER_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
journalFileSize = MAX_MESSAGE_SIZE + ArtemisMessagingComponent.JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB.
managementNotificationAddress = SimpleString(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS)
}.configureAddressSecurity()
val usersPropertiesFilePath = ConfigTest::class.java.getResource("/net/corda/bridge/artemis/artemis-users.properties").path
val rolesPropertiesFilePath = ConfigTest::class.java.getResource("/net/corda/bridge/artemis/artemis-roles.properties").path
val securityConfiguration = object : SecurityConfiguration() {
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
val options = mapOf(
"org.apache.activemq.jaas.textfiledn.user" to usersPropertiesFilePath,
"org.apache.activemq.jaas.textfiledn.role" to rolesPropertiesFilePath
)
return arrayOf(AppConfigurationEntry(name, AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options))
}
}
val securityManager = ActiveMQJAASSecurityManager(TextFileCertificateLoginModule::class.java.name, securityConfiguration)
return ActiveMQServerImpl(config, securityManager)
}
}

View File

@ -0,0 +1,3 @@
Node=NodeA,NodeB,SystemUsers/Node
Peer=SystemUsers/Peer
Verifier=SystemUsers/Verifier

View File

@ -0,0 +1,5 @@
NodeA=O=Bank A, L=London, C=GB
NodeB=O=Bank B, L=New York, C=US
SystemUsers/Node=SystemUsers/Node
SystemUsers/Peer=
SystemUsers/Verifier=

View File

@ -42,11 +42,12 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
private val lock = ReentrantLock() private val lock = ReentrantLock()
private val queueNamesToBridgesMap = mutableMapOf<String, MutableList<AMQPBridge>>() private val queueNamesToBridgesMap = mutableMapOf<String, MutableList<AMQPBridge>>()
private class AMQPConfigurationImpl private constructor(override val keyStore: CertificateStore, private class AMQPConfigurationImpl (override val keyStore: CertificateStore,
override val trustStore: CertificateStore, override val trustStore: CertificateStore,
override val socksProxyConfig: SocksProxyConfig?, override val socksProxyConfig: SocksProxyConfig?,
override val maxMessageSize: Int, override val maxMessageSize: Int,
override val useOpenSsl: Boolean) : AMQPConfiguration { override val useOpenSsl: Boolean,
override val sourceX500Name: String? = null) : AMQPConfiguration {
constructor(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int) : this(config.keyStore.get(), constructor(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int) : this(config.keyStore.get(),
config.trustStore.get(), config.trustStore.get(),
socksProxyConfig, socksProxyConfig,
@ -72,7 +73,8 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
* If the delivery fails the session is rolled back to prevent loss of the message. This may cause duplicate delivery, * If the delivery fails the session is rolled back to prevent loss of the message. This may cause duplicate delivery,
* however Artemis and the remote Corda instanced will deduplicate these messages. * however Artemis and the remote Corda instanced will deduplicate these messages.
*/ */
private class AMQPBridge(val queueName: String, private class AMQPBridge(val sourceX500Name: String,
val queueName: String,
val targets: List<NetworkHostAndPort>, val targets: List<NetworkHostAndPort>,
val legalNames: Set<CordaX500Name>, val legalNames: Set<CordaX500Name>,
private val amqpConfig: AMQPConfiguration, private val amqpConfig: AMQPConfiguration,
@ -87,6 +89,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
val oldMDC = MDC.getCopyOfContextMap() val oldMDC = MDC.getCopyOfContextMap()
try { try {
MDC.put("queueName", queueName) MDC.put("queueName", queueName)
MDC.put("source", amqpConfig.sourceX500Name)
MDC.put("targets", targets.joinToString(separator = ";") { it.toString() }) MDC.put("targets", targets.joinToString(separator = ";") { it.toString() })
MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() }) MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() })
MDC.put("maxMessageSize", amqpConfig.maxMessageSize.toString()) MDC.put("maxMessageSize", amqpConfig.maxMessageSize.toString())
@ -150,7 +153,8 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
val sessionFactory = artemis.started!!.sessionFactory val sessionFactory = artemis.started!!.sessionFactory
val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
this.session = session this.session = session
val consumer = session.createConsumer(queueName) // Several producers (in the case of shared bridge) can put messages in the same outbound p2p queue. The consumers are created using the source x500 name as a filter
val consumer = session.createConsumer(queueName, "hyphenated_props:sender-subject-name = '${amqpConfig.sourceX500Name}'")
this.consumer = consumer this.consumer = consumer
consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler) consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler)
session.start() session.start()
@ -219,15 +223,16 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
} }
} }
override fun deployBridge(queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) { override fun deployBridge(sourceX500Name: String, queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
val newBridge = lock.withLock { val newBridge = lock.withLock {
val bridges = queueNamesToBridgesMap.getOrPut(queueName) { mutableListOf() } val bridges = queueNamesToBridgesMap.getOrPut(queueName) { mutableListOf() }
for (target in targets) { for (target in targets) {
if (bridges.any { it.targets.contains(target) }) { if (bridges.any { it.targets.contains(target) && it.sourceX500Name == sourceX500Name }) {
return return
} }
} }
val newBridge = AMQPBridge(queueName, targets, legalNames, amqpConfig, sharedEventLoopGroup!!, artemis!!, bridgeMetricsService) val newAMQPConfig = AMQPConfigurationImpl(amqpConfig.keyStore, amqpConfig.trustStore, amqpConfig.socksProxyConfig, amqpConfig.maxMessageSize, amqpConfig.useOpenSsl, sourceX500Name)
val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig, sharedEventLoopGroup!!, artemis!!, bridgeMetricsService)
bridges += newBridge bridges += newBridge
bridgeMetricsService?.bridgeCreated(targets, legalNames) bridgeMetricsService?.bridgeCreated(targets, legalNames)
newBridge newBridge

View File

@ -159,7 +159,7 @@ class BridgeControlListener(val config: MutualSslConfiguration,
return return
} }
for (outQueue in controlMessage.sendQueues) { for (outQueue in controlMessage.sendQueues) {
bridgeManager.deployBridge(outQueue.queueName, outQueue.targets, outQueue.legalNames.toSet()) bridgeManager.deployBridge(controlMessage.nodeIdentity, outQueue.queueName, outQueue.targets, outQueue.legalNames.toSet())
} }
val wasActive = active val wasActive = active
validInboundQueues.addAll(controlMessage.inboxQueues) validInboundQueues.addAll(controlMessage.inboxQueues)
@ -175,7 +175,7 @@ class BridgeControlListener(val config: MutualSslConfiguration,
log.error("Invalid queue names in control message $controlMessage") log.error("Invalid queue names in control message $controlMessage")
return return
} }
bridgeManager.deployBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets, controlMessage.bridgeInfo.legalNames.toSet()) bridgeManager.deployBridge(controlMessage.nodeIdentity, controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets, controlMessage.bridgeInfo.legalNames.toSet())
} }
is BridgeControl.Delete -> { is BridgeControl.Delete -> {
if (!controlMessage.bridgeInfo.queueName.startsWith(PEERS_PREFIX)) { if (!controlMessage.bridgeInfo.queueName.startsWith(PEERS_PREFIX)) {

View File

@ -9,7 +9,7 @@ import net.corda.core.utilities.NetworkHostAndPort
*/ */
@VisibleForTesting @VisibleForTesting
interface BridgeManager : AutoCloseable { interface BridgeManager : AutoCloseable {
fun deployBridge(queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) fun deployBridge(sourceX500Name: String, queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>)
fun destroyBridge(queueName: String, targets: List<NetworkHostAndPort>) fun destroyBridge(queueName: String, targets: List<NetworkHostAndPort>)

View File

@ -59,6 +59,7 @@ interface CertificateStore : Iterable<Pair<String, X509Certificate>> {
forEach { (alias, certificate) -> action.invoke(alias, certificate) } forEach { (alias, certificate) -> action.invoke(alias, certificate) }
} }
fun aliases(): List<String> = value.internal.aliases().toList()
/** /**
* @throws IllegalArgumentException if no certificate for the alias is found, or if the certificate is not an [X509Certificate]. * @throws IllegalArgumentException if no certificate for the alias is found, or if the certificate is not an [X509Certificate].
*/ */

View File

@ -7,6 +7,7 @@ import io.netty.channel.ChannelPromise
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import io.netty.handler.proxy.ProxyConnectException import io.netty.handler.proxy.ProxyConnectException
import io.netty.handler.proxy.ProxyConnectionEvent import io.netty.handler.proxy.ProxyConnectionEvent
import io.netty.handler.ssl.SniCompletionEvent
import io.netty.handler.ssl.SslHandler import io.netty.handler.ssl.SslHandler
import io.netty.handler.ssl.SslHandshakeCompletionEvent import io.netty.handler.ssl.SslHandshakeCompletionEvent
import io.netty.util.ReferenceCountUtil import io.netty.util.ReferenceCountUtil
@ -25,6 +26,8 @@ import org.slf4j.MDC
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.channels.ClosedChannelException import java.nio.channels.ClosedChannelException
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import javax.net.ssl.ExtendedSSLSession
import javax.net.ssl.SNIHostName
import javax.net.ssl.SSLException import javax.net.ssl.SSLException
/** /**
@ -34,7 +37,7 @@ import javax.net.ssl.SSLException
*/ */
internal class AMQPChannelHandler(private val serverMode: Boolean, internal class AMQPChannelHandler(private val serverMode: Boolean,
private val allowedRemoteLegalNames: Set<CordaX500Name>?, private val allowedRemoteLegalNames: Set<CordaX500Name>?,
private var keyManagerFactory: CertHoldingKeyManagerFactoryWrapper, private val keyManagerFactoriesMap: Map<String, CertHoldingKeyManagerFactoryWrapper>,
private val userName: String?, private val userName: String?,
private val password: String?, private val password: String?,
private val trace: Boolean, private val trace: Boolean,
@ -51,6 +54,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
private var suppressClose: Boolean = false private var suppressClose: Boolean = false
private var badCert: Boolean = false private var badCert: Boolean = false
private var localCert: X509Certificate? = null private var localCert: X509Certificate? = null
private var requestedServerName: String? = null
private fun withMDC(block: () -> Unit) { private fun withMDC(block: () -> Unit) {
val oldMDC = MDC.getCopyOfContextMap() val oldMDC = MDC.getCopyOfContextMap()
@ -117,58 +121,43 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
} }
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is ProxyConnectionEvent) { when (evt) {
remoteAddress = evt.destinationAddress() // update address to teh real target address is ProxyConnectionEvent -> {
} // update address to the real target address
if (evt is SslHandshakeCompletionEvent) { remoteAddress = evt.destinationAddress()
if (evt.isSuccess) {
val sslHandler = ctx.pipeline().get(SslHandler::class.java)
val sslSession = sslHandler.engine().session
localCert = keyManagerFactory.getCurrentCertChain()?.get(0)
if (localCert == null) {
log.error("SSL KeyManagerFactory failed to provide a local cert")
ctx.close()
return
}
if (sslSession.peerCertificates == null || sslSession.peerCertificates.isEmpty()) {
log.error("No peer certificates")
ctx.close()
return
}
remoteCert = sslHandler.engine().session.peerCertificates[0].x509
val remoteX500Name = try {
CordaX500Name.build(remoteCert!!.subjectX500Principal)
} catch (ex: IllegalArgumentException) {
badCert = true
logErrorWithMDC("Certificate subject not a valid CordaX500Name", ex)
ctx.close()
return
}
if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) {
badCert = true
logErrorWithMDC("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames")
ctx.close()
return
}
logInfoWithMDC("Handshake completed with subject: $remoteX500Name")
createAMQPEngine(ctx)
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false)))
} else {
val cause = evt.cause()
// This happens when the peer node is closed during SSL establishment.
if (cause is ClosedChannelException) {
logWarnWithMDC("SSL Handshake closed early.")
} else if (cause is SSLException && cause.message == "handshake timed out") { // Sadly the exception thrown by Netty wrapper requires that we check the message.
logWarnWithMDC("SSL Handshake timed out")
} else {
badCert = true
}
logErrorWithMDC("Handshake failure: ${evt.cause().message}")
if (log.isTraceEnabled) {
withMDC { log.trace("Handshake failure", evt.cause()) }
}
ctx.close()
} }
is SniCompletionEvent -> {
if (evt.isSuccess) {
// The SniCompletionEvent is fired up before context is switched (after SslHandshakeCompletionEvent)
// so we save the requested server name now to be able log it once the handshake is completed successfully
// Note: this event is only triggered when using OpenSSL.
requestedServerName = evt.hostname()
logInfoWithMDC("SNI completion success.")
} else {
logErrorWithMDC("SNI completion failure: ${evt.cause().message}")
}
}
is SslHandshakeCompletionEvent -> {
if (evt.isSuccess) {
handleSuccessfulHandshake(ctx)
} else {
handleFailedHandshake(ctx, evt)
}
}
}
}
private fun SslHandler.getRequestedServerName(): String? {
return if (serverMode) {
val session = engine().session
when (session) {
// Server name can be obtained from SSL session when using JavaSSL.
is ExtendedSSLSession -> (session.requestedServerNames.firstOrNull() as? SNIHostName)?.asciiName
// For Open SSL server name is obtained from SniCompletionEvent
else -> requestedServerName
}
} else {
(engine().sslParameters?.serverNames?.firstOrNull() as? SNIHostName)?.asciiName
} }
} }
@ -234,4 +223,62 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
} }
eventProcessor!!.processEventsAsync() eventProcessor!!.processEventsAsync()
} }
private fun handleSuccessfulHandshake(ctx: ChannelHandlerContext) {
val sslHandler = ctx.pipeline().get(SslHandler::class.java)
val sslSession = sslHandler.engine().session
// Depending on what matching method is used, getting the local certificate is done by selecting the
// appropriate keyManagerFactory
val keyManagerFactory = requestedServerName?.let {
keyManagerFactoriesMap[it]
} ?: keyManagerFactoriesMap.values.single()
localCert = keyManagerFactory.getCurrentCertChain()?.first()
if (localCert == null) {
log.error("SSL KeyManagerFactory failed to provide a local cert")
ctx.close()
return
}
if (sslSession.peerCertificates == null || sslSession.peerCertificates.isEmpty()) {
log.error("No peer certificates")
ctx.close()
return
}
remoteCert = sslHandler.engine().session.peerCertificates.first().x509
val remoteX500Name = try {
CordaX500Name.build(remoteCert!!.subjectX500Principal)
} catch (ex: IllegalArgumentException) {
badCert = true
logErrorWithMDC("Certificate subject not a valid CordaX500Name", ex)
ctx.close()
return
}
if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) {
badCert = true
logErrorWithMDC("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames")
ctx.close()
return
}
logInfoWithMDC("Handshake completed with subject: $remoteX500Name, requested server name: ${sslHandler.getRequestedServerName()}.")
createAMQPEngine(ctx)
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false)))
}
private fun handleFailedHandshake(ctx: ChannelHandlerContext, evt: SslHandshakeCompletionEvent) {
val cause = evt.cause()
// This happens when the peer node is closed during SSL establishment.
when {
cause is ClosedChannelException -> logWarnWithMDC("SSL Handshake closed early.")
// Sadly the exception thrown by Netty wrapper requires that we check the message.
cause is SSLException && cause.message == "handshake timed out" -> logWarnWithMDC("SSL Handshake timed out")
else -> badCert = true
}
logErrorWithMDC("Handshake failure: ${evt.cause().message}")
if (log.isTraceEnabled) {
withMDC { log.trace("Handshake failure", evt.cause()) }
}
ctx.close()
}
} }

View File

@ -14,14 +14,18 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.crypto.x509
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
import net.corda.nodeapi.internal.requireMessageSize import net.corda.nodeapi.internal.requireMessageSize
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import sun.security.x509.X500Name
import java.lang.Long.min import java.lang.Long.min
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.security.KeyStore
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
@ -158,7 +162,7 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
} }
} }
val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory) val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, parent.configuration)
val target = parent.currentTarget val target = parent.currentTarget
val handler = if (parent.configuration.useOpenSsl){ val handler = if (parent.configuration.useOpenSsl){
createClientOpenSslHandler(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory, ch.alloc()) createClientOpenSslHandler(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory, ch.alloc())
@ -169,7 +173,8 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
pipeline.addLast(AMQPChannelHandler(false, pipeline.addLast(AMQPChannelHandler(false,
parent.allowedRemoteLegalNames, parent.allowedRemoteLegalNames,
wrappedKeyManagerFactory, // Single entry, key can be anything.
mapOf(DEFAULT to wrappedKeyManagerFactory),
conf.userName, conf.userName,
conf.password, conf.password,
conf.trace, conf.trace,

View File

@ -2,7 +2,6 @@ package net.corda.nodeapi.internal.protonwrapper.netty
import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.config.CertificateStore
import java.security.KeyStore
interface AMQPConfiguration { interface AMQPConfiguration {
/** /**
@ -56,6 +55,10 @@ interface AMQPConfiguration {
val socksProxyConfig: SocksProxyConfig? val socksProxyConfig: SocksProxyConfig?
get() = null get() = null
@JvmDefault
val sourceX500Name: String?
get() = null
/** /**
* Whether to use the tcnative open/boring SSL provider or the default Java SSL provider * Whether to use the tcnative open/boring SSL provider or the default Java SSL provider
*/ */

View File

@ -23,7 +23,6 @@ import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.net.BindException import java.net.BindException
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.security.cert.X509Certificate
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.KeyManagerFactory import javax.net.ssl.KeyManagerFactory
@ -66,18 +65,37 @@ class AMQPServer(val hostName: String,
} }
override fun initChannel(ch: SocketChannel) { override fun initChannel(ch: SocketChannel) {
val amqpConfiguration = parent.configuration
val keyStore = amqpConfiguration.keyStore
val pipeline = ch.pipeline() val pipeline = ch.pipeline()
val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory) // Used for SNI matching with javaSSL.
val handler = if (parent.configuration.useOpenSsl){ val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, amqpConfiguration)
createServerOpenSslHandler(wrappedKeyManagerFactory, trustManagerFactory, ch.alloc()) // Used to create a mapping for SNI matching with openSSL.
val keyManagerFactoriesMap = splitKeystore(amqpConfiguration)
val handler = if (amqpConfiguration.useOpenSsl){
// SNI matching needed only when multiple nodes exist behind the server.
if (keyStore.aliases().size > 1) {
createServerSNIOpenSslHandler(keyManagerFactoriesMap, trustManagerFactory)
} else {
createServerOpenSslHandler(wrappedKeyManagerFactory, trustManagerFactory, ch.alloc())
}
} else { } else {
createServerSslHelper(wrappedKeyManagerFactory, trustManagerFactory) // For javaSSL, SNI matching is handled at key manager level.
createServerSslHelper(keyStore, wrappedKeyManagerFactory, trustManagerFactory)
} }
pipeline.addLast("sslHandler", handler) pipeline.addLast("sslHandler", handler)
if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
pipeline.addLast(AMQPChannelHandler(true, pipeline.addLast(AMQPChannelHandler(true,
null, null,
wrappedKeyManagerFactory, // Passing a mapping of legal names to key managers to be able to pick the correct one after
// SNI completion event is fired up.
if (keyStore.aliases().size > 1 && amqpConfiguration.useOpenSsl)
keyManagerFactoriesMap
else
// Single entry, key can be anything.
mapOf(DEFAULT to wrappedKeyManagerFactory),
conf.userName, conf.userName,
conf.password, conf.password,
conf.trace, conf.trace,

View File

@ -5,7 +5,7 @@ import java.security.cert.X509Certificate
import javax.net.ssl.* import javax.net.ssl.*
class CertHoldingKeyManagerFactorySpiWrapper(private val factorySpi: KeyManagerFactorySpi) : KeyManagerFactorySpi() { class CertHoldingKeyManagerFactorySpiWrapper(private val factorySpi: KeyManagerFactorySpi, private val amqpConfig: AMQPConfiguration) : KeyManagerFactorySpi() {
override fun engineInit(keyStore: KeyStore?, password: CharArray?) { override fun engineInit(keyStore: KeyStore?, password: CharArray?) {
val engineInitMethod = KeyManagerFactorySpi::class.java.getDeclaredMethod("engineInit", KeyStore::class.java, CharArray::class.java) val engineInitMethod = KeyManagerFactorySpi::class.java.getDeclaredMethod("engineInit", KeyStore::class.java, CharArray::class.java)
engineInitMethod.isAccessible = true engineInitMethod.isAccessible = true
@ -23,16 +23,32 @@ class CertHoldingKeyManagerFactorySpiWrapper(private val factorySpi: KeyManagerF
engineGetKeyManagersMethod.isAccessible = true engineGetKeyManagersMethod.isAccessible = true
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
val keyManagers = engineGetKeyManagersMethod.invoke(factorySpi) as Array<KeyManager> val keyManagers = engineGetKeyManagersMethod.invoke(factorySpi) as Array<KeyManager>
return if (factorySpi is CertHoldingKeyManagerFactorySpiWrapper) keyManagers else keyManagers.mapNotNull { return if (factorySpi is CertHoldingKeyManagerFactorySpiWrapper) keyManagers else keyManagers.map {
@Suppress("USELESS_CAST") // the casts to KeyManager are not useless - without them, the typed array will be of type Any val aliasProvidingKeyManager = getDefaultKeyManager(it)
when (it) { // Use the SNIKeyManager if keystore has several entries and only for clients and non-openSSL servers.
is X509ExtendedKeyManager -> AliasProvidingExtendedKeyMangerWrapper(it) as KeyManager if (amqpConfig.keyStore.aliases().size > 1) {
is X509KeyManager -> AliasProvidingKeyMangerWrapperImpl(it) as KeyManager // Clients
else -> null if (amqpConfig.sourceX500Name != null) {
SNIKeyManager(aliasProvidingKeyManager as X509ExtendedKeyManager, amqpConfig)
} else if (!amqpConfig.useOpenSsl) { // JDK SSL servers
SNIKeyManager(aliasProvidingKeyManager as X509ExtendedKeyManager, amqpConfig)
} else {
aliasProvidingKeyManager
}
} else {
aliasProvidingKeyManager
} }
}.toTypedArray() }.toTypedArray()
} }
private fun getDefaultKeyManager(keyManager: KeyManager): KeyManager {
return when (keyManager) {
is X509ExtendedKeyManager -> AliasProvidingExtendedKeyMangerWrapper(keyManager)
is X509KeyManager -> AliasProvidingKeyMangerWrapperImpl(keyManager)
else -> throw UnsupportedOperationException("Supported key manager types are: X509ExtendedKeyManager, X509KeyManager. Provided ${keyManager::class.java.name}")
}
}
private val keyManagers = lazy { getKeyManagersImpl() } private val keyManagers = lazy { getKeyManagersImpl() }
override fun engineGetKeyManagers(): Array<KeyManager> { override fun engineGetKeyManagers(): Array<KeyManager> {
@ -46,12 +62,12 @@ class CertHoldingKeyManagerFactorySpiWrapper(private val factorySpi: KeyManagerF
* the wrapper is not thread safe as in it will return the last used alias/cert chain and has itself no notion * the wrapper is not thread safe as in it will return the last used alias/cert chain and has itself no notion
* of belonging to a certain channel. * of belonging to a certain channel.
*/ */
class CertHoldingKeyManagerFactoryWrapper(factory: KeyManagerFactory) : KeyManagerFactory(getFactorySpi(factory), factory.provider, factory.algorithm) { class CertHoldingKeyManagerFactoryWrapper(factory: KeyManagerFactory, amqpConfig: AMQPConfiguration) : KeyManagerFactory(getFactorySpi(factory, amqpConfig), factory.provider, factory.algorithm) {
companion object { companion object {
private fun getFactorySpi(factory: KeyManagerFactory): KeyManagerFactorySpi { private fun getFactorySpi(factory: KeyManagerFactory, amqpConfig: AMQPConfiguration): KeyManagerFactorySpi {
val spiField = KeyManagerFactory::class.java.getDeclaredField("factorySpi") val spiField = KeyManagerFactory::class.java.getDeclaredField("factorySpi")
spiField.isAccessible = true spiField.isAccessible = true
return CertHoldingKeyManagerFactorySpiWrapper(spiField.get(factory) as KeyManagerFactorySpi) return CertHoldingKeyManagerFactorySpiWrapper(spiField.get(factory) as KeyManagerFactorySpi, amqpConfig)
} }
} }

View File

@ -0,0 +1,109 @@
package net.corda.nodeapi.internal.protonwrapper.netty
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.crypto.x509
import org.slf4j.MDC
import sun.security.x509.X500Name
import java.net.Socket
import java.security.Principal
import javax.net.ssl.*
internal class SNIKeyManager(private val keyManager: X509ExtendedKeyManager, private val amqpConfig: AMQPConfiguration) : X509ExtendedKeyManager(), X509KeyManager by keyManager, AliasProvidingKeyMangerWrapper {
companion object {
private val log = contextLogger()
}
override var lastAlias: String? = null
private fun withMDC(block: () -> Unit) {
val oldMDC = MDC.getCopyOfContextMap()
try {
MDC.put("lastAlias", lastAlias)
MDC.put("isServer", amqpConfig.sourceX500Name.isNullOrEmpty().toString())
MDC.put("sourceX500Name", amqpConfig.sourceX500Name)
MDC.put("useOpenSSL", amqpConfig.useOpenSsl.toString())
block()
} finally {
MDC.setContextMap(oldMDC)
}
}
private fun logDebugWithMDC(msg: () -> String) {
if (log.isDebugEnabled) {
withMDC { log.debug(msg()) }
}
}
override fun chooseClientAlias(keyType: Array<out String>, issuers: Array<out Principal>, socket: Socket): String? {
return storeIfNotNull { chooseClientAlias(amqpConfig.keyStore, amqpConfig.sourceX500Name) }
}
override fun chooseEngineClientAlias(keyType: Array<out String>, issuers: Array<out Principal>, engine: SSLEngine): String? {
return storeIfNotNull { chooseClientAlias(amqpConfig.keyStore, amqpConfig.sourceX500Name) }
}
override fun chooseServerAlias(keyType: String?, issuers: Array<out Principal>?, socket: Socket): String? {
return storeIfNotNull {
val matcher = (socket as SSLSocket).sslParameters.sniMatchers.first()
chooseServerAlias(keyType, issuers, matcher)
}
}
override fun chooseEngineServerAlias(keyType: String?, issuers: Array<out Principal>?, engine: SSLEngine?): String? {
return storeIfNotNull {
val matcher = engine?.sslParameters?.sniMatchers?.first()
chooseServerAlias(keyType, issuers, matcher)
}
}
private fun chooseServerAlias(keyType: String?, issuers: Array<out Principal>?, matcher: SNIMatcher?): String? {
val aliases = keyManager.getServerAliases(keyType, issuers)
if (aliases == null || aliases.isEmpty()) {
logDebugWithMDC { "Keystore doesn't contain any aliases for key type $keyType and issuers $issuers." }
return null
}
log.debug("Checking aliases: $aliases.")
matcher?.let {
val matchedAlias = (it as ServerSNIMatcher).matchedAlias
if (aliases.contains(matchedAlias)) {
logDebugWithMDC { "Found match for $matchedAlias." }
return matchedAlias
}
}
logDebugWithMDC { "Unable to find a matching alias." }
return null
}
private fun chooseClientAlias(keyStore: CertificateStore, clientLegalName: String?): String? {
clientLegalName?.let {
val aliases = keyStore.aliases()
if (aliases.isEmpty()) {
logDebugWithMDC { "Keystore doesn't contain any entries." }
}
aliases.forEach { alias ->
val x500Name = keyStore[alias].x509.subjectDN as X500Name
val aliasCordaX500Name = CordaX500Name.build(x500Name.asX500Principal())
val clientCordaX500Name = CordaX500Name.parse(it)
if (clientCordaX500Name == aliasCordaX500Name) {
logDebugWithMDC { "Found alias $alias for $clientCordaX500Name." }
return alias
}
}
}
return null
}
private fun storeIfNotNull(func: () -> String?): String? {
val alias = func()
if (alias != null) {
lastAlias = alias
}
return alias
}
}

View File

@ -1,9 +1,8 @@
package net.corda.nodeapi.internal.protonwrapper.netty package net.corda.nodeapi.internal.protonwrapper.netty
import io.netty.buffer.ByteBufAllocator import io.netty.buffer.ByteBufAllocator
import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.*
import io.netty.handler.ssl.SslHandler import io.netty.util.DomainNameMappingBuilder
import io.netty.handler.ssl.SslProvider
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.newSecureRandom import net.corda.core.crypto.newSecureRandom
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
@ -13,15 +12,19 @@ import net.corda.core.utilities.toHex
import net.corda.nodeapi.internal.ArtemisTcpTransport import net.corda.nodeapi.internal.ArtemisTcpTransport
import net.corda.nodeapi.internal.config.CertificateStore import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.crypto.toBc import net.corda.nodeapi.internal.crypto.toBc
import net.corda.nodeapi.internal.crypto.x509
import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier
import org.bouncycastle.asn1.x509.Extension import org.bouncycastle.asn1.x509.Extension
import org.bouncycastle.asn1.x509.SubjectKeyIdentifier import org.bouncycastle.asn1.x509.SubjectKeyIdentifier
import sun.security.x509.X500Name
import java.net.Socket import java.net.Socket
import java.security.KeyStore
import java.security.cert.* import java.security.cert.*
import java.util.* import java.util.*
import javax.net.ssl.* import javax.net.ssl.*
private const val HOSTNAME_FORMAT = "%s.corda.net" private const val HOSTNAME_FORMAT = "%s.corda.net"
internal const val DEFAULT = "default"
internal class LoggingTrustManagerWrapper(val wrapped: X509ExtendedTrustManager) : X509ExtendedTrustManager() { internal class LoggingTrustManagerWrapper(val wrapped: X509ExtendedTrustManager) : X509ExtendedTrustManager() {
companion object { companion object {
@ -146,7 +149,8 @@ internal fun createClientOpenSslHandler(target: NetworkHostAndPort,
return SslHandler(sslEngine) return SslHandler(sslEngine)
} }
internal fun createServerSslHelper(keyManagerFactory: KeyManagerFactory, internal fun createServerSslHelper(keyStore: CertificateStore,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler { trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = SSLContext.getInstance("TLS") val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers val keyManagers = keyManagerFactory.keyManagers
@ -158,6 +162,9 @@ internal fun createServerSslHelper(keyManagerFactory: KeyManagerFactory,
sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray() sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray()
sslEngine.enabledCipherSuites = ArtemisTcpTransport.CIPHER_SUITES.toTypedArray() sslEngine.enabledCipherSuites = ArtemisTcpTransport.CIPHER_SUITES.toTypedArray()
sslEngine.enableSessionCreation = true sslEngine.enableSessionCreation = true
val sslParameters = sslEngine.sslParameters
sslParameters.sniMatchers = listOf(ServerSNIMatcher(keyStore))
sslEngine.sslParameters = sslParameters
return SslHandler(sslEngine) return SslHandler(sslEngine)
} }
@ -191,10 +198,55 @@ internal fun createServerOpenSslHandler(keyManagerFactory: KeyManagerFactory,
return SslHandler(sslEngine) return SslHandler(sslEngine)
} }
/**
* Creates a special SNI handler used only when openSSL is used for AMQPServer
*/
internal fun createServerSNIOpenSslHandler(keyManagerFactoriesMap: Map<String, KeyManagerFactory>,
trustManagerFactory: TrustManagerFactory): SniHandler {
// Default value can be any in the map.
val sslCtxBuilder = SslContextBuilder.forServer(keyManagerFactoriesMap.values.first())
.sslProvider(SslProvider.OPENSSL)
.trustManager(LoggingTrustManagerFactoryWrapper(trustManagerFactory))
.clientAuth(ClientAuth.REQUIRE)
.ciphers(ArtemisTcpTransport.CIPHER_SUITES)
.protocols(*ArtemisTcpTransport.TLS_VERSIONS.toTypedArray())
val mapping = DomainNameMappingBuilder(sslCtxBuilder.build())
keyManagerFactoriesMap.forEach {
mapping.add(it.key, sslCtxBuilder.keyManager(it.value).build())
}
return SniHandler(mapping.build())
}
internal fun splitKeystore(config: AMQPConfiguration): Map<String, CertHoldingKeyManagerFactoryWrapper> {
val keyStore = config.keyStore.value.internal
val password = config.keyStore.password.toCharArray()
return keyStore.aliases().toList().map { alias ->
val key = keyStore.getKey(alias, password)
val certs = keyStore.getCertificateChain(alias)
val x500Name = keyStore.getCertificate(alias).x509.subjectDN as X500Name
val cordaX500Name = CordaX500Name.build(x500Name.asX500Principal())
val newKeyStore = KeyStore.getInstance("JKS")
newKeyStore.load(null)
newKeyStore.setKeyEntry(alias, key, password, certs)
val newKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
newKeyManagerFactory.init(newKeyStore, password)
x500toHostName(cordaX500Name) to CertHoldingKeyManagerFactoryWrapper(newKeyManagerFactory, config)
}.toMap()
}
fun KeyManagerFactory.init(keyStore: CertificateStore) = init(keyStore.value.internal, keyStore.password.toCharArray()) fun KeyManagerFactory.init(keyStore: CertificateStore) = init(keyStore.value.internal, keyStore.password.toCharArray())
fun TrustManagerFactory.init(trustStore: CertificateStore) = init(trustStore.value.internal) fun TrustManagerFactory.init(trustStore: CertificateStore) = init(trustStore.value.internal)
/**
* Method that converts a [CordaX500Name] to a a valid hostname (RFC-1035). It's used for SNI to indicate the target
* when trying to communicate with nodes that reside behind the same firewall. This is a solution to TLS's extension not
* yet supporting x500 names as server names
*/
internal fun x500toHostName(x500Name: CordaX500Name): String { internal fun x500toHostName(x500Name: CordaX500Name): String {
val secureHash = SecureHash.sha256(x500Name.toString()) val secureHash = SecureHash.sha256(x500Name.toString())
// RFC 1035 specifies a limit 255 bytes for hostnames with each label being 63 bytes or less. Due to this, the string // RFC 1035 specifies a limit 255 bytes for hostnames with each label being 63 bytes or less. Due to this, the string

View File

@ -0,0 +1,36 @@
package net.corda.nodeapi.internal.protonwrapper.netty
import net.corda.core.identity.CordaX500Name
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.crypto.x509
import sun.security.x509.X500Name
import javax.net.ssl.SNIHostName
import javax.net.ssl.SNIMatcher
import javax.net.ssl.SNIServerName
import javax.net.ssl.StandardConstants
class ServerSNIMatcher(private val keyStore: CertificateStore) : SNIMatcher(0) {
var matchedAlias: String? = null
private set
var matchedServerName: String? = null
private set
override fun matches(serverName: SNIServerName): Boolean {
if (serverName.type == StandardConstants.SNI_HOST_NAME) {
keyStore.aliases().forEach { alias ->
val x500Name = keyStore[alias].x509.subjectDN as X500Name
val cordaX500Name = CordaX500Name.build(x500Name.asX500Principal())
// Convert the CordaX500Name into the expected host name and compare
// E.g. O=Corda B, L=London, C=GB becomes 3c6dd991936308edb210555103ffc1bb.corda.net
if ((serverName as SNIHostName).asciiName == x500toHostName(cordaX500Name)) {
matchedAlias = alias
matchedServerName = serverName.asciiName
return true
}
}
}
return false
}
}

View File

@ -5,7 +5,9 @@ import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.internal.div import net.corda.core.internal.div
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.internal.rigorousMock import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.stubs.CertificateStoreStubs import net.corda.testing.internal.stubs.CertificateStoreStubs
import org.junit.Rule import org.junit.Rule
@ -41,8 +43,8 @@ class TestKeyManagerFactoryWrapper {
config.configureWithDevSSLCertificate() config.configureWithDevSSLCertificate()
val underlyingKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) val underlyingKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
val amqpConfig = AMQPConfigurationImpl(config.p2pSslOptions.keyStore.get(true), config.p2pSslOptions.trustStore.get(true), MAX_MESSAGE_SIZE)
val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory) val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory, amqpConfig)
wrappedKeyManagerFactory.init(config.p2pSslOptions.keyStore.get()) wrappedKeyManagerFactory.init(config.p2pSslOptions.keyStore.get())
val keyManagers = wrappedKeyManagerFactory.keyManagers val keyManagers = wrappedKeyManagerFactory.keyManagers
assertFalse(keyManagers.isEmpty()) assertFalse(keyManagers.isEmpty())
@ -74,11 +76,11 @@ class TestKeyManagerFactoryWrapper {
config.configureWithDevSSLCertificate() config.configureWithDevSSLCertificate()
val underlyingKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) val underlyingKeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
val amqpConfig = AMQPConfigurationImpl(config.p2pSslOptions.keyStore.get(true), config.p2pSslOptions.trustStore.get(true), MAX_MESSAGE_SIZE)
val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory) val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory, amqpConfig)
wrappedKeyManagerFactory.init(config.p2pSslOptions.keyStore.get()) wrappedKeyManagerFactory.init(config.p2pSslOptions.keyStore.get())
val otherWrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory) val otherWrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(underlyingKeyManagerFactory, amqpConfig)
val keyManagers = wrappedKeyManagerFactory.keyManagers val keyManagers = wrappedKeyManagerFactory.keyManagers
assertFalse(keyManagers.isEmpty()) assertFalse(keyManagers.isEmpty())
@ -92,4 +94,5 @@ class TestKeyManagerFactoryWrapper {
assertNull(otherWrappedKeyManagerFactory.getCurrentCertChain()) assertNull(otherWrappedKeyManagerFactory.getCurrentCertChain())
} }
private class AMQPConfigurationImpl(override val keyStore: CertificateStore, override val trustStore: CertificateStore, override val maxMessageSize: Int) : AMQPConfiguration
} }

View File

@ -72,6 +72,7 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) {
val artemis = artemisClient.started!! val artemis = artemisClient.started!!
for (i in 0 until 3) { for (i in 0 until 3) {
val artemisMessage = artemis.session.createMessage(true).apply { val artemisMessage = artemis.session.createMessage(true).apply {
putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, ALICE_NAME.toString())
putIntProperty(P2PMessagingHeaders.senderUUID, i) putIntProperty(P2PMessagingHeaders.senderUUID, i)
writeBodyBufferBytes("Test$i".toByteArray()) writeBodyBufferBytes("Test$i".toByteArray())
// Use the magic deduplication property built into Artemis as our message identity too // Use the magic deduplication property built into Artemis as our message identity too
@ -149,6 +150,7 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) {
// Send a fresh item and check receive // Send a fresh item and check receive
val artemisMessage = artemis.session.createMessage(true).apply { val artemisMessage = artemis.session.createMessage(true).apply {
putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, ALICE_NAME.toString())
putIntProperty(P2PMessagingHeaders.senderUUID, 3) putIntProperty(P2PMessagingHeaders.senderUUID, 3)
writeBodyBufferBytes("Test3".toByteArray()) writeBodyBufferBytes("Test3".toByteArray())
// Use the magic deduplication property built into Artemis as our message identity too // Use the magic deduplication property built into Artemis as our message identity too
@ -287,7 +289,7 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) {
if (sourceQueueName != null) { if (sourceQueueName != null) {
// Local queue for outgoing messages // Local queue for outgoing messages
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true) artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
bridgeManager.deployBridge(sourceQueueName, listOf(amqpAddress), setOf(BOB.name)) bridgeManager.deployBridge(ALICE_NAME.toString(), sourceQueueName, listOf(amqpAddress), setOf(BOB.name))
} }
return Triple(artemisServer, artemisClient, bridgeManager) return Triple(artemisServer, artemisClient, bridgeManager)
} }

View File

@ -17,6 +17,7 @@ import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisTcpTransport import net.corda.nodeapi.internal.ArtemisTcpTransport
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
@ -26,10 +27,7 @@ import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
import net.corda.nodeapi.internal.protonwrapper.netty.init import net.corda.nodeapi.internal.protonwrapper.netty.init
import net.corda.nodeapi.internal.registerDevP2pCertificates import net.corda.nodeapi.internal.registerDevP2pCertificates
import net.corda.nodeapi.internal.registerDevSigningCertificates import net.corda.nodeapi.internal.registerDevSigningCertificates
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.*
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.driver.PortAllocation import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.createDevIntermediateCaCertPath import net.corda.testing.internal.createDevIntermediateCaCertPath
import net.corda.testing.internal.rigorousMock import net.corda.testing.internal.rigorousMock
@ -422,6 +420,76 @@ class ProtonWrapperTests(val sslSetup: SslSetup) {
server.stop() server.stop()
} }
@Test
fun `SNI AMQP client to SNI AMQP server`() {
println(sslSetup)
val amqpServer = createServerWithMultipleNames(serverPort, listOf(ALICE_NAME, CHARLIE_NAME))
amqpServer.use {
amqpServer.start()
val receiveSubs = amqpServer.onReceive.subscribe {
assertEquals(BOB_NAME.toString(), it.sourceLegalName)
assertEquals(P2P_PREFIX + "Test", it.topic)
assertEquals("Test", String(it.payload))
it.complete(true)
}
createClient(MAX_MESSAGE_SIZE, setOf(ALICE_NAME)).use { amqpClient ->
val serverConnected = amqpServer.onConnection.toFuture()
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
val serverConnect = serverConnected.get()
assertEquals(true, serverConnect.connected)
assertEquals(BOB_NAME, CordaX500Name.build(serverConnect.remoteCert!!.subjectX500Principal))
val clientConnect = clientConnected.get()
assertEquals(true, clientConnect.connected)
assertEquals(ALICE_NAME, CordaX500Name.build(clientConnect.remoteCert!!.subjectX500Principal))
val msg = amqpClient.createMessage("Test".toByteArray(),
P2P_PREFIX + "Test",
ALICE_NAME.toString(),
emptyMap())
amqpClient.write(msg)
assertEquals(MessageStatus.Acknowledged, msg.onComplete.get())
}
createClientWithMultipleCerts(listOf(BOC_NAME, BOB_NAME), BOB_NAME, setOf(ALICE_NAME)).use { amqpClient ->
val serverConnected = amqpServer.onConnection.toFuture()
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
val serverConnect = serverConnected.get()
assertEquals(true, serverConnect.connected)
assertEquals(BOB_NAME, CordaX500Name.build(serverConnect.remoteCert!!.subjectX500Principal))
val clientConnect = clientConnected.get()
assertEquals(true, clientConnect.connected)
assertEquals(ALICE_NAME, CordaX500Name.build(clientConnect.remoteCert!!.subjectX500Principal))
val msg = amqpClient.createMessage("Test".toByteArray(),
P2P_PREFIX + "Test",
ALICE_NAME.toString(),
emptyMap())
amqpClient.write(msg)
assertEquals(MessageStatus.Acknowledged, msg.onComplete.get())
}
receiveSubs.unsubscribe()
}
}
@Test
fun `non-existent SNI AMQP client to SNI AMQP server with multiple identities`() {
val amqpServer = createServerWithMultipleNames(serverPort, listOf(ALICE_NAME, CHARLIE_NAME))
amqpServer.use {
amqpServer.start()
val amqpClient = createClientWithMultipleCerts(listOf(BOC_NAME, BOB_NAME), BOB_NAME, setOf(DUMMY_BANK_A_NAME))
amqpClient.use {
val serverConnected = amqpServer.onConnection.toFuture()
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
val serverConnect = serverConnected.get()
assertEquals(false, serverConnect.connected)
val clientConnect = clientConnected.get()
assertEquals(false, clientConnect.connected)
}
}
}
private fun createArtemisServerAndClient(maxMessageSize: Int = MAX_MESSAGE_SIZE): Pair<ArtemisMessagingServer, ArtemisMessagingClient> { private fun createArtemisServerAndClient(maxMessageSize: Int = MAX_MESSAGE_SIZE): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
val baseDirectory = temporaryFolder.root.toPath() / "artemis" val baseDirectory = temporaryFolder.root.toPath() / "artemis"
val certificatesDirectory = baseDirectory / "certificates" val certificatesDirectory = baseDirectory / "certificates"
@ -447,7 +515,8 @@ class ProtonWrapperTests(val sslSetup: SslSetup) {
return Pair(server, client) return Pair(server, client)
} }
private fun createClient(maxMessageSize: Int = MAX_MESSAGE_SIZE): AMQPClient { private fun createClient(maxMessageSize: Int = MAX_MESSAGE_SIZE,
expectedRemoteLegalNames: Set<CordaX500Name> = setOf(ALICE_NAME, CHARLIE_NAME)): AMQPClient {
val baseDirectory = temporaryFolder.root.toPath() / "client" val baseDirectory = temporaryFolder.root.toPath() / "client"
val certificatesDirectory = baseDirectory / "certificates" val certificatesDirectory = baseDirectory / "certificates"
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
@ -469,13 +538,14 @@ class ProtonWrapperTests(val sslSetup: SslSetup) {
override val trustStore = clientTruststore override val trustStore = clientTruststore
override val trace: Boolean = true override val trace: Boolean = true
override val maxMessageSize: Int = maxMessageSize override val maxMessageSize: Int = maxMessageSize
override val sourceX500Name = BOB_NAME.toString()
override val useOpenSsl: Boolean = sslSetup.clientNative override val useOpenSsl: Boolean = sslSetup.clientNative
} }
return AMQPClient( return AMQPClient(
listOf(NetworkHostAndPort("localhost", serverPort), listOf(NetworkHostAndPort("localhost", serverPort),
NetworkHostAndPort("localhost", serverPort2), NetworkHostAndPort("localhost", serverPort2),
NetworkHostAndPort("localhost", artemisPort)), NetworkHostAndPort("localhost", artemisPort)),
setOf(ALICE_NAME, CHARLIE_NAME), expectedRemoteLegalNames,
amqpConfig) amqpConfig)
} }
@ -542,4 +612,75 @@ class ProtonWrapperTests(val sslSetup: SslSetup) {
port, port,
amqpConfig) amqpConfig)
} }
private fun createAmqpConfigWithMultipleCerts(legalNames: List<CordaX500Name>,
sourceLegalName: String? = null,
maxMessageSize: Int = MAX_MESSAGE_SIZE,
crlCheckSoftFail: Boolean = true,
useOpenSsl: Boolean) :AMQPConfiguration {
val tempFolders = legalNames.map { it to temporaryFolder.root.toPath() / it.organisation }.toMap()
val baseDirectories = tempFolders.mapValues { it.value / "node" }
val certificatesDirectories = baseDirectories.mapValues { it.value / "certificates" }
val signingCertificateStores = certificatesDirectories.mapValues { CertificateStoreStubs.Signing.withCertificatesDirectory(it.value) }
val pspSslConfigurations = certificatesDirectories.mapValues { CertificateStoreStubs.P2P.withCertificatesDirectory(it.value, useOpenSsl = sslSetup.serverNative) }
val serverConfigs = legalNames.map { name ->
val serverConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectories[name]).whenever(it).baseDirectory
doReturn(certificatesDirectories[name]).whenever(it).certificatesDirectory
doReturn(name).whenever(it).myLegalName
doReturn(signingCertificateStores[name]).whenever(it).signingCertificateStore
doReturn(pspSslConfigurations[name]).whenever(it).p2pSslOptions
doReturn(crlCheckSoftFail).whenever(it).crlCheckSoftFail
}
serverConfig.configureWithDevSSLCertificate()
serverConfig
}
val serverTruststore = serverConfigs.first().p2pSslOptions.trustStore.get(true)
val serverKeystore = serverConfigs.first().p2pSslOptions.keyStore.get(true)
// Merge rest of keystores into the first
serverConfigs.subList(1, serverConfigs.size).forEach {
mergeKeyStores(serverKeystore, it.p2pSslOptions.keyStore.get(true), it.myLegalName.toString())
}
return object : AMQPConfiguration {
override val keyStore: CertificateStore = serverKeystore
override val trustStore: CertificateStore = serverTruststore
override val trace: Boolean = true
override val maxMessageSize: Int = maxMessageSize
override val useOpenSsl: Boolean = useOpenSsl
override val sourceX500Name: String? = sourceLegalName
}
}
private fun createServerWithMultipleNames(port: Int,
serverNames: List<CordaX500Name>,
maxMessageSize: Int = MAX_MESSAGE_SIZE,
crlCheckSoftFail: Boolean = true): AMQPServer {
return AMQPServer(
"0.0.0.0",
port,
createAmqpConfigWithMultipleCerts(serverNames, null, maxMessageSize, crlCheckSoftFail, sslSetup.serverNative))
}
private fun createClientWithMultipleCerts(clientNames: List<CordaX500Name>,
sourceLegalName: CordaX500Name,
expectedRemoteLegalNames: Set<CordaX500Name> = setOf(ALICE_NAME, CHARLIE_NAME)): AMQPClient {
return AMQPClient(
listOf(NetworkHostAndPort("localhost", serverPort),
NetworkHostAndPort("localhost", serverPort2),
NetworkHostAndPort("localhost", artemisPort)),
expectedRemoteLegalNames,
createAmqpConfigWithMultipleCerts(clientNames, sourceLegalName.toString(), MAX_MESSAGE_SIZE, true, sslSetup.clientNative))
}
private fun mergeKeyStores(newKeyStore: CertificateStore, oldKeyStore: CertificateStore, newAlias: String) {
val keyStore = oldKeyStore.value.internal
keyStore.aliases().toList().forEach {
val key = keyStore.getKey(it, oldKeyStore.password.toCharArray())
val certs = keyStore.getCertificateChain(it)
newKeyStore.value.internal.setKeyEntry(newAlias, key, oldKeyStore.password.toCharArray(), certs)
}
}
} }

View File

@ -9,6 +9,7 @@ import net.corda.core.utilities.debug
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.node.VersionInfo import net.corda.node.VersionInfo
import net.corda.node.services.statemachine.FlowMessagingImpl import net.corda.node.services.statemachine.FlowMessagingImpl
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
@ -142,7 +143,7 @@ class MessagingExecutor(
private fun sendJob(job: Job.Send) { private fun sendJob(job: Job.Send) {
val mqAddress = resolver.resolveTargetToArtemisQueue(job.target) val mqAddress = resolver.resolveTargetToArtemisQueue(job.target)
val artemisMessage = cordaToArtemisMessage(job.message) val artemisMessage = cordaToArtemisMessage(job.message, job.target)
log.trace { log.trace {
"Send to: $mqAddress topic: ${job.message.topic} " + "Send to: $mqAddress topic: ${job.message.topic} " +
"sessionID: ${job.message.topic} id: ${job.message.uniqueMessageId}" "sessionID: ${job.message.topic} id: ${job.message.uniqueMessageId}"
@ -150,13 +151,20 @@ class MessagingExecutor(
producer.send(SimpleString(mqAddress), artemisMessage, { job.sentFuture.set(Unit) }) producer.send(SimpleString(mqAddress), artemisMessage, { job.sentFuture.set(Unit) })
} }
fun cordaToArtemisMessage(message: Message): ClientMessage? { fun cordaToArtemisMessage(message: Message, target: MessageRecipients? = null): ClientMessage? {
return session.createMessage(true).apply { return session.createMessage(true).apply {
putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor) putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor)
putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion) putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion)
putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion) putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion)
putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic)) putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic))
putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, SimpleString(myLegalName)) putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, SimpleString(myLegalName))
// Add a group ID to messages to be able to have multiple filtered consumers while preventing reordering.
// This header will be dropped off during transit through the bridge, which is fine as it's needed locally only.
if (target != null && target is ArtemisMessagingComponent.ServiceAddress) {
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, SimpleString(message.uniqueMessageId.toString))
} else {
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, SimpleString(myLegalName))
}
sendMessageSizeMetric.update(message.data.bytes.size) sendMessageSizeMetric.update(message.data.bytes.size)
writeBodyBufferBytes(message.data.bytes) writeBodyBufferBytes(message.data.bytes)
// Use the magic deduplication property built into Artemis as our message identity too // Use the magic deduplication property built into Artemis as our message identity too

View File

@ -34,6 +34,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeControl
@ -216,6 +217,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
// Create a general purpose producer. // Create a general purpose producer.
producer = producerSession!!.createProducer() producer = producerSession!!.createProducer()
inboxes += RemoteInboxAddress(myIdentity).queueName inboxes += RemoteInboxAddress(myIdentity).queueName
serviceIdentity?.let { serviceIdentity?.let {
inboxes += RemoteInboxAddress(it).queueName inboxes += RemoteInboxAddress(it).queueName
@ -543,7 +545,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
val internalTargetQueue = (address as? ArtemisAddress)?.queueName val internalTargetQueue = (address as? ArtemisAddress)?.queueName
?: throw IllegalArgumentException("Not an Artemis address") ?: throw IllegalArgumentException("Not an Artemis address")
state.locked { state.locked {
createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = address !is ServiceAddress) createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = false)
} }
internalTargetQueue internalTargetQueue
} }
@ -572,8 +574,10 @@ class P2PMessagingClient(val config: NodeConfiguration,
session.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, session.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false,
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), exclusive, null) ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), exclusive, null)
sendBridgeCreateMessage()
} }
// When there are multiple nodes sharing the firewall, the peer queue may already exist as it was created when
// another node tried communicating with the target. A bridge is still needed as there has to be one per source-queue-target
sendBridgeCreateMessage()
} }
knownQueues += queueName knownQueues += queueName
} }