[ENT-2624] Disable switch for SNI functionality (#1487)

* [ENT-2624] Disable switch for SNI functionality

* * Add SNI switch to driver
* Make BridgeRestartTest test for both enableSNI = true and false
This commit is contained in:
Patrick Kuo 2018-11-01 11:15:24 +00:00 committed by GitHub
parent eaea6546fe
commit 5d1362bca6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 259 additions and 255 deletions

View File

@ -22,15 +22,22 @@ import net.corda.testing.node.internal.cordappsForPackages
import net.corda.testing.node.internal.internalDriver
import org.junit.ClassRule
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread
import kotlin.test.assertEquals
class BridgeRestartTest : IntegrationTest() {
@RunWith(Parameterized::class)
class BridgeRestartTest(private val enableSNI: Boolean) : IntegrationTest() {
companion object {
val pingStarted = ConcurrentHashMap<StateMachineRunId, OpenFuture<Unit>>()
@JvmStatic
@Parameterized.Parameters(name = "enableSNI = {0}")
fun data() = listOf(false, true)
@ClassRule
@JvmField
val databaseSchemas = IntegrationTestSchemas(DUMMY_BANK_A_NAME.toDatabaseSchemaName(), DUMMY_BANK_B_NAME.toDatabaseSchemaName(), DUMMY_NOTARY_NAME.toDatabaseSchemaName())
@ -44,7 +51,7 @@ class BridgeRestartTest : IntegrationTest() {
val pongSession = initiateFlow(pongParty)
pongSession.sendAndReceive<Unit>(times)
pingStarted.getOrPut(runId) { openFuture() }.set(Unit)
for (i in 1 .. times) {
for (i in 1..times) {
logger.info("PING $i")
val j = pongSession.sendAndReceive<Int>(i).unwrap { it }
assertEquals(i, j)
@ -57,7 +64,7 @@ class BridgeRestartTest : IntegrationTest() {
@Suspendable
override fun call() {
val times = pingSession.sendAndReceive<Int>(Unit).unwrap { it }
for (i in 1 .. times) {
for (i in 1..times) {
logger.info("PONG $i")
val j = pingSession.sendAndReceive<Int>(i).unwrap { it }
assertEquals(i, j)
@ -68,7 +75,7 @@ class BridgeRestartTest : IntegrationTest() {
@Test
fun restartLongPingPongFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
internalDriver(startNodesInProcess = true, cordappsForAllNodes = cordappsForPackages("net.corda.bridge")) {
internalDriver(startNodesInProcess = true, cordappsForAllNodes = cordappsForPackages("net.corda.bridge"), enableSNI = enableSNI) {
val bFuture = startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
val bridgePort = 20005
val brokerPort = 21005
@ -121,7 +128,7 @@ class BridgeRestartTest : IntegrationTest() {
@Test
fun restartSeveralPingPongFlowsRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
internalDriver(startNodesInProcess = true, cordappsForAllNodes = cordappsForPackages("net.corda.bridge")) {
internalDriver(startNodesInProcess = true, cordappsForAllNodes = cordappsForPackages("net.corda.bridge"), enableSNI = enableSNI) {
val bFuture = startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
val bridgePort = 20005
val brokerPort = 21005
@ -154,7 +161,7 @@ class BridgeRestartTest : IntegrationTest() {
// We kill -9 and restart the bridge after a random sleep
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { connection ->
val handles = (1 .. 10).map {
val handles = (1..10).map {
connection.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100)
}

View File

@ -116,7 +116,6 @@ class SNIBridgeTest : IntegrationTest() {
// Start broker
val broker = createArtemisTextCertsLogin(artemisPort, nodeConfigs[DUMMY_BANK_B_NAME]!!.p2pSslOptions)
broker.start()
println(broker.isActive)
val aFuture = startNode(
providedName = DUMMY_BANK_A_NAME,
rpcUsers = listOf(demoUser),
@ -125,14 +124,11 @@ class SNIBridgeTest : IntegrationTest() {
"p2pAddress" to "localhost:$advertisedP2PPort",
"messagingServerAddress" to "0.0.0.0:$artemisPort",
"messagingServerExternal" to true,
"enterpriseConfiguration" to mapOf(
"externalBridge" to true
)
"enterpriseConfiguration" to mapOf("externalBridge" to true)
)
)
val a = aFuture.getOrThrow()
println(a.nodeInfo)
val bFuture = startNode(
providedName = DUMMY_BANK_B_NAME,
@ -142,25 +138,14 @@ class SNIBridgeTest : IntegrationTest() {
"p2pAddress" to "localhost:$advertisedP2PPort",
"messagingServerAddress" to "0.0.0.0:$artemisPort",
"messagingServerExternal" to true,
"enterpriseConfiguration" to mapOf(
"externalBridge" to true
)
"enterpriseConfiguration" to mapOf("externalBridge" to true)
)
)
val b = bFuture.getOrThrow()
println(b.nodeInfo)
val bridge = startBridge(ALICE_NAME, advertisedP2PPort, artemisPort, mapOf(
"outboundConfig" to mapOf(
"artemisBrokerAddress" to "localhost:$artemisPort"
),
"inboundConfig" to mapOf(
"listeningAddress" to "0.0.0.0:$advertisedP2PPort"
)
val bridge = startBridge(ALICE_NAME, advertisedP2PPort, artemisPort, emptyMap(
)).getOrThrow()
println(bridge.brokerPort)
// Start a node on the other side of the bridge
val c = startNode(providedName = DUMMY_BANK_C_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:${portAllocation.nextPort()}")).getOrThrow()

View File

@ -83,6 +83,7 @@ class AMQPListenerTest {
override val trustStore = clientTrustStore
override val maxMessageSize: Int = maxMessageSize
override val trace: Boolean = true
override val enableSNI: Boolean = clientConfig.bridgeInnerConfig?.enableSNI ?: true
}
// create and connect a real client
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)),

View File

@ -66,6 +66,7 @@ interface BridgeInnerConfiguration {
val customSSLConfiguration: BridgeSSLConfiguration?
// The SSL keystores to provision into the Float in DMZ
val customFloatOuterSSLConfiguration: BridgeSSLConfiguration?
val enableSNI: Boolean
}
interface BridgeHAConfig {

View File

@ -7,8 +7,8 @@ import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
import net.corda.nodeapi.internal.config.SslConfiguration
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.config.SslConfiguration
import net.corda.nodeapi.internal.config.parseAs
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
import java.nio.file.Path
@ -39,7 +39,8 @@ data class BridgeInboundConfigurationImpl(override val listeningAddress: Network
data class BridgeInnerConfigurationImpl(override val floatAddresses: List<NetworkHostAndPort>,
override val expectedCertificateSubject: CordaX500Name,
override val customSSLConfiguration: BridgeSSLConfigurationImpl?,
override val customFloatOuterSSLConfiguration: BridgeSSLConfigurationImpl?) : BridgeInnerConfiguration
override val customFloatOuterSSLConfiguration: BridgeSSLConfigurationImpl?,
override val enableSNI: Boolean = true) : BridgeInnerConfiguration
data class FloatOuterConfigurationImpl(override val floatAddress: NetworkHostAndPort,
override val expectedCertificateSubject: CordaX500Name,
@ -71,15 +72,12 @@ data class FirewallConfigurationImpl(
override val p2pConfirmationWindowSize: Int = 1048576,
override val whitelistedHeaders: List<String> = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList(),
override val auditServiceConfiguration: AuditServiceConfigurationImpl,
override val healthCheckPhrase: String? = null
) : FirewallConfiguration {
override val healthCheckPhrase: String? = null) : FirewallConfiguration {
init {
if (firewallMode == FirewallMode.SenderReceiver) {
require(inboundConfig != null && outboundConfig != null) { "Missing required configuration" }
} else if (firewallMode == FirewallMode.BridgeInner) {
require(bridgeInnerConfig != null && outboundConfig != null) { "Missing required configuration" }
} else if (firewallMode == FirewallMode.FloatOuter) {
require(inboundConfig != null && floatOuterConfig != null) { "Missing required configuration" }
when (firewallMode) {
FirewallMode.SenderReceiver -> require(inboundConfig != null && outboundConfig != null) { "Missing required configuration" }
FirewallMode.BridgeInner -> require(bridgeInnerConfig != null && outboundConfig != null) { "Missing required configuration" }
FirewallMode.FloatOuter -> require(inboundConfig != null && floatOuterConfig != null) { "Missing required configuration" }
}
}

View File

@ -16,6 +16,7 @@ import rx.Observable
import rx.Subscription
import rx.subjects.PublishSubject
import java.io.ByteArrayInputStream
import java.lang.String.valueOf
import java.security.KeyStore
import java.util.*
@ -28,7 +29,7 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration,
private val consoleLogger = LoggerFactory.getLogger("BasicInfo")
}
private val statusFollower: ServiceStateCombiner
private val statusFollower: ServiceStateCombiner = ServiceStateCombiner(listOf(auditService))
private var statusSubscriber: Subscription? = null
private var amqpServer: AMQPServer? = null
private var keyStorePrivateKeyPassword: CharArray? = null
@ -36,10 +37,6 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration,
private var onConnectAuditSubscription: Subscription? = null
private var onReceiveSubscription: Subscription? = null
init {
statusFollower = ServiceStateCombiner(listOf(auditService))
}
override fun provisionKeysAndActivate(keyStoreBytes: ByteArray,
keyStorePassword: CharArray,
keyStorePrivateKeyPassword: CharArray,
@ -59,6 +56,7 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration,
override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail
override val maxMessageSize: Int = maximumMessageSize
override val trace: Boolean = conf.enableAMQPPacketTrace
override val enableSNI: Boolean = conf.bridgeInnerConfig?.enableSNI ?: true
override val healthCheckPhrase = conf.healthCheckPhrase
}
val server = AMQPServer(bindAddress.host,
@ -93,7 +91,7 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration,
ByteArrayInputStream(keyStoreBytes).use {
keyStore.load(it, keyStorePassword)
}
return X509KeyStore(keyStore, java.lang.String.valueOf(keyStorePassword))
return X509KeyStore(keyStore, valueOf(keyStorePassword))
}
override fun wipeKeysAndDeactivate() {

View File

@ -38,7 +38,8 @@ class FloatControlListenerService(val conf: FirewallConfiguration,
private var connectSubscriber: Subscription? = null
private var receiveSubscriber: Subscription? = null
private var amqpControlServer: AMQPServer? = null
private val sslConfiguration: MutualSslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: conf.p2pSslOptions
private val sslConfiguration: MutualSslConfiguration = conf.floatOuterConfig?.customSSLConfiguration
?: conf.p2pSslOptions
private val floatControlAddress = conf.floatOuterConfig!!.floatAddress
private val floatClientName = conf.floatOuterConfig!!.expectedCertificateSubject
private var activeConnectionInfo: ConnectionChange? = null

View File

@ -59,6 +59,7 @@ class TunnelingBridgeReceiverService(val conf: FirewallConfiguration,
override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail
override val maxMessageSize: Int = maximumMessageSize
override val trace: Boolean = conf.enableAMQPPacketTrace
override val enableSNI: Boolean = conf.bridgeInnerConfig!!.enableSNI
override val healthCheckPhrase = conf.healthCheckPhrase
}
val controlClient = AMQPClient(floatAddresses,

View File

@ -28,7 +28,11 @@ class DirectBridgeSenderService(val conf: FirewallConfiguration,
private val statusFollower: ServiceStateCombiner = ServiceStateCombiner(listOf(auditService, artemisConnectionService, haService))
private var statusSubscriber: Subscription? = null
private var listenerActiveSubscriber: Subscription? = null
private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf.p2pSslOptions, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) },
private var bridgeControlListener = BridgeControlListener(conf.p2pSslOptions,
conf.outboundConfig!!.socksProxyConfig,
maxMessageSize,
conf.bridgeInnerConfig?.enableSNI ?: true,
{ ForwardingArtemisMessageClient(artemisConnectionService) },
BridgeAuditServiceAdaptor(auditService))
private class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider {

View File

@ -208,7 +208,7 @@ class FlowWorkerStartStopTest {
}
private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener {
val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize)
val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize, enableSNI = true)
bridgeControlListener.start()
return bridgeControlListener
}

View File

@ -247,7 +247,7 @@ class FlowWorkerTest {
}
private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener {
val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize)
val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize, enableSNI = true)
bridgeControlListener.start()
return bridgeControlListener
}

View File

@ -35,9 +35,12 @@ import kotlin.concurrent.withLock
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
*/
@VisibleForTesting
class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig? = null, maxMessageSize: Int,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager {
open class AMQPBridgeManager(config: MutualSslConfiguration,
socksProxyConfig: SocksProxyConfig? = null,
maxMessageSize: Int,
enableSNI: Boolean,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager {
private val lock = ReentrantLock()
private val queueNamesToBridgesMap = mutableMapOf<String, MutableList<AMQPBridge>>()
@ -47,19 +50,21 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
override val socksProxyConfig: SocksProxyConfig?,
override val maxMessageSize: Int,
override val useOpenSsl: Boolean,
override val enableSNI: Boolean,
override val sourceX500Name: String? = null) : AMQPConfiguration {
constructor(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int) : this(config.keyStore.get(),
constructor(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int, enableSNI: Boolean) : this(config.keyStore.get(),
config.trustStore.get(),
socksProxyConfig,
maxMessageSize,
config.useOpenSsl)
config.useOpenSsl,
enableSNI)
}
private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, socksProxyConfig, maxMessageSize)
private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, socksProxyConfig, maxMessageSize, enableSNI)
private var sharedEventLoopGroup: EventLoopGroup? = null
private var artemis: ArtemisSessionProvider? = null
constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, socksProxyConfig: SocksProxyConfig? = null) : this(config, socksProxyConfig, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, enableSNI: Boolean, socksProxyConfig: SocksProxyConfig? = null) : this(config, socksProxyConfig, maxMessageSize, enableSNI, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
companion object {
private const val NUM_BRIDGE_THREADS = 0 // Default sized pool
@ -154,7 +159,11 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
this.session = session
// Several producers (in the case of shared bridge) can put messages in the same outbound p2p queue. The consumers are created using the source x500 name as a filter
val consumer = session.createConsumer(queueName, "hyphenated_props:sender-subject-name = '${amqpConfig.sourceX500Name}'")
val consumer = if (amqpConfig.enableSNI) {
session.createConsumer(queueName, "hyphenated_props:sender-subject-name = '${amqpConfig.sourceX500Name}'")
} else {
session.createConsumer(queueName)
}
this.consumer = consumer
consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler)
session.start()
@ -230,7 +239,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
return
}
}
val newAMQPConfig = AMQPConfigurationImpl(amqpConfig.keyStore, amqpConfig.trustStore, amqpConfig.socksProxyConfig, amqpConfig.maxMessageSize, amqpConfig.useOpenSsl, sourceX500Name)
val newAMQPConfig = with(amqpConfig) { AMQPConfigurationImpl(keyStore, trustStore, socksProxyConfig, maxMessageSize, useOpenSsl, enableSNI, sourceX500Name) }
val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig, sharedEventLoopGroup!!, artemis!!, bridgeMetricsService)
bridges += newBridge
bridgeMetricsService?.bridgeCreated(targets, legalNames)

View File

@ -1,6 +1,5 @@
package net.corda.nodeapi.internal.bridging
import net.corda.core.identity.CordaX500Name
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -11,7 +10,6 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
@ -28,13 +26,18 @@ import java.util.*
class BridgeControlListener(val config: MutualSslConfiguration,
socksProxyConfig: SocksProxyConfig? = null,
maxMessageSize: Int,
enableSNI: Boolean,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
bridgeMetricsService: BridgeMetricsService? = null) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString()
private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
private val bridgeNotifyQueue = "$BRIDGE_NOTIFY.$bridgeId"
private val validInboundQueues = mutableSetOf<String>()
private val bridgeManager = LoopbackBridgeManagerWrapper(config, socksProxyConfig, maxMessageSize, artemisMessageClientFactory, bridgeMetricsService, this::validateReceiveTopic)
private val bridgeManager = if (enableSNI) {
LoopbackBridgeManager(config, socksProxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService, this::validateReceiveTopic)
} else {
AMQPBridgeManager(config, socksProxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService)
}
private var artemis: ArtemisSessionProvider? = null
private var controlConsumer: ClientConsumer? = null
private var notifyConsumer: ClientConsumer? = null
@ -42,7 +45,8 @@ class BridgeControlListener(val config: MutualSslConfiguration,
constructor(config: MutualSslConfiguration,
p2pAddress: NetworkHostAndPort,
maxMessageSize: Int,
socksProxy: SocksProxyConfig? = null) : this(config, socksProxy, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
enableSNI: Boolean,
socksProxy: SocksProxyConfig? = null) : this(config, socksProxy, maxMessageSize, enableSNI, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
companion object {
private val log = contextLogger()
@ -163,7 +167,10 @@ class BridgeControlListener(val config: MutualSslConfiguration,
val wasActive = active
validInboundQueues.addAll(controlMessage.inboxQueues)
log.info("Added inbox: ${controlMessage.inboxQueues}")
bridgeManager.inboxesAdded(controlMessage.inboxQueues)
if (bridgeManager is LoopbackBridgeManager) {
// Notify loopback bridge manager inboxes has changed.
bridgeManager.inboxesAdded(controlMessage.inboxQueues)
}
if (!wasActive && active) {
_activeChange.onNext(true)
}
@ -187,54 +194,4 @@ class BridgeControlListener(val config: MutualSslConfiguration,
}
}
}
private class LoopbackBridgeManagerWrapper(config: MutualSslConfiguration,
socksProxyConfig: SocksProxyConfig? = null,
maxMessageSize: Int,
artemisMessageClientFactory: () -> ArtemisSessionProvider,
bridgeMetricsService: BridgeMetricsService? = null,
private val isLocalInbox: (String) -> Boolean) : BridgeManager {
private val bridgeManager = AMQPBridgeManager(config, socksProxyConfig, maxMessageSize, artemisMessageClientFactory, bridgeMetricsService)
private val loopbackBridgeManager = LoopbackBridgeManager(artemisMessageClientFactory, bridgeMetricsService)
override fun deployBridge(sourceX500Name: String, queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
val inboxAddress = translateLocalQueueToInboxAddress(queueName)
if (isLocalInbox(inboxAddress)) {
log.info("Deploying loopback bridge for $queueName, source $sourceX500Name")
loopbackBridgeManager.deployBridge(sourceX500Name, queueName, targets, legalNames)
} else {
log.info("Deploying AMQP bridge for $queueName, source $sourceX500Name")
bridgeManager.deployBridge(sourceX500Name, queueName, targets, legalNames)
}
}
override fun destroyBridge(queueName: String, targets: List<NetworkHostAndPort>) {
bridgeManager.destroyBridge(queueName, targets)
loopbackBridgeManager.destroyBridge(queueName, targets)
}
override fun start() {
bridgeManager.start()
loopbackBridgeManager.start()
}
override fun stop() {
bridgeManager.stop()
loopbackBridgeManager.stop()
}
override fun close() = stop()
/**
* Remove any AMQP bridge for the local inbox and create a loopback bridge for that queue.
*/
fun inboxesAdded(inboxes: List<String>) {
for (inbox in inboxes) {
bridgeManager.destroyAllBridge(inbox).forEach { source, bridgeEntry ->
loopbackBridgeManager.deployBridge(source, bridgeEntry.queueName, bridgeEntry.targets, bridgeEntry.legalNames.toSet())
}
}
}
}
}

View File

@ -9,7 +9,9 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientConsumer
@ -23,8 +25,17 @@ import org.slf4j.MDC
* inboxes.
*/
@VisibleForTesting
class LoopbackBridgeManager(private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager {
class LoopbackBridgeManager(config: MutualSslConfiguration,
socksProxyConfig: SocksProxyConfig? = null,
maxMessageSize: Int,
enableSNI: Boolean,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
private val bridgeMetricsService: BridgeMetricsService? = null,
private val isLocalInbox: (String) -> Boolean) : AMQPBridgeManager(config, socksProxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService) {
companion object {
private val log = contextLogger()
}
private val queueNamesToBridgesMap = ConcurrentBox(mutableMapOf<String, MutableList<LoopbackBridge>>())
private var artemis: ArtemisSessionProvider? = null
@ -118,21 +129,29 @@ class LoopbackBridgeManager(private val artemisMessageClientFactory: () -> Artem
}
override fun deployBridge(sourceX500Name: String, queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
queueNamesToBridgesMap.exclusive {
val bridges = getOrPut(queueName) { mutableListOf() }
for (target in targets) {
if (bridges.any { it.targets.contains(target) && it.sourceX500Name == sourceX500Name }) {
return
val inboxAddress = translateLocalQueueToInboxAddress(queueName)
if (isLocalInbox(inboxAddress)) {
log.info("Deploying loopback bridge for $queueName, source $sourceX500Name")
queueNamesToBridgesMap.exclusive {
val bridges = getOrPut(queueName) { mutableListOf() }
for (target in targets) {
if (bridges.any { it.targets.contains(target) && it.sourceX500Name == sourceX500Name }) {
return
}
}
}
val newBridge = LoopbackBridge(sourceX500Name, queueName, targets, legalNames, artemis!!, bridgeMetricsService)
bridges += newBridge
bridgeMetricsService?.bridgeCreated(targets, legalNames)
newBridge
}.start()
val newBridge = LoopbackBridge(sourceX500Name, queueName, targets, legalNames, artemis!!, bridgeMetricsService)
bridges += newBridge
bridgeMetricsService?.bridgeCreated(targets, legalNames)
newBridge
}.start()
} else {
log.info("Deploying AMQP bridge for $queueName, source $sourceX500Name")
super.deployBridge(sourceX500Name, queueName, targets, legalNames)
}
}
override fun destroyBridge(queueName: String, targets: List<NetworkHostAndPort>) {
super.destroyBridge(queueName, targets)
queueNamesToBridgesMap.exclusive {
val bridges = this[queueName] ?: mutableListOf()
for (target in targets) {
@ -149,7 +168,19 @@ class LoopbackBridgeManager(private val artemisMessageClientFactory: () -> Artem
}
}
/**
* Remove any AMQP bridge for the local inbox and create a loopback bridge for that queue.
*/
fun inboxesAdded(inboxes: List<String>) {
for (inbox in inboxes) {
super.destroyAllBridge(inbox).forEach { source, bridgeEntry ->
deployBridge(source, bridgeEntry.queueName, bridgeEntry.targets, bridgeEntry.legalNames.toSet())
}
}
}
override fun start() {
super.start()
val artemis = artemisMessageClientFactory()
this.artemis = artemis
artemis.start()
@ -158,6 +189,7 @@ class LoopbackBridgeManager(private val artemisMessageClientFactory: () -> Artem
override fun stop() = close()
override fun close() {
super.close()
queueNamesToBridgesMap.exclusive {
for (bridge in values.flatten()) {
bridge.stop()

View File

@ -41,8 +41,8 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
private val userName: String?,
private val password: String?,
private val trace: Boolean,
private val onOpen: (Pair<SocketChannel, ConnectionChange>) -> Unit,
private val onClose: (Pair<SocketChannel, ConnectionChange>) -> Unit,
private val onOpen: (SocketChannel, ConnectionChange) -> Unit,
private val onClose: (SocketChannel, ConnectionChange) -> Unit,
private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() {
companion object {
private val log = contextLogger()
@ -114,7 +114,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
val ch = ctx.channel()
logInfoWithMDC("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
if (!suppressClose) {
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false, badCert)))
onClose(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false, badCert))
}
eventProcessor?.close()
ctx.fireChannelInactive()
@ -263,7 +263,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
logInfoWithMDC("Handshake completed with subject: $remoteX500Name, requested server name: ${sslHandler.getRequestedServerName()}.")
createAMQPEngine(ctx)
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false)))
onOpen(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false))
}
private fun handleFailedHandshake(ctx: ChannelHandlerContext, evt: SslHandshakeCompletionEvent) {

View File

@ -14,19 +14,14 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.config.CertificateStore
import net.corda.nodeapi.internal.crypto.x509
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
import net.corda.nodeapi.internal.requireMessageSize
import rx.Observable
import rx.subjects.PublishSubject
import sun.security.x509.X500Name
import java.lang.Long.min
import java.net.InetSocketAddress
import java.security.KeyStore
import java.security.cert.X509Certificate
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.KeyManagerFactory
@ -99,23 +94,21 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
retryInterval = min(MAX_RETRY_INTERVAL, retryInterval * BACKOFF_MULTIPLIER)
}
private val connectListener = object : ChannelFutureListener {
override fun operationComplete(future: ChannelFuture) {
if (!future.isSuccess) {
log.info("Failed to connect to $currentTarget")
private val connectListener = ChannelFutureListener { future ->
if (!future.isSuccess) {
log.info("Failed to connect to $currentTarget")
if (!stopping) {
workerGroup?.schedule({
nextTarget()
restart()
}, retryInterval, TimeUnit.MILLISECONDS)
}
} else {
log.info("Connected to $currentTarget")
// Connection established successfully
clientChannel = future.channel()
clientChannel?.closeFuture()?.addListener(closeListener)
if (!stopping) {
workerGroup?.schedule({
nextTarget()
restart()
}, retryInterval, TimeUnit.MILLISECONDS)
}
} else {
log.info("Connected to $currentTarget")
// Connection established successfully
clientChannel = future.channel()
clientChannel?.closeFuture()?.addListener(closeListener)
}
}
@ -164,7 +157,7 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, parent.configuration)
val target = parent.currentTarget
val handler = if (parent.configuration.useOpenSsl){
val handler = if (parent.configuration.useOpenSsl) {
createClientOpenSslHandler(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory, ch.alloc())
} else {
createClientSslHelper(target, parent.allowedRemoteLegalNames, wrappedKeyManagerFactory, trustManagerFactory)
@ -178,13 +171,13 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
conf.userName,
conf.password,
conf.trace,
{
{ _, change ->
parent.retryInterval = MIN_RETRY_INTERVAL // reset to fast reconnect if we connect properly
parent._onConnection.onNext(it.second)
parent._onConnection.onNext(change)
},
{
parent._onConnection.onNext(it.second)
if (it.second.badCert) {
{ _, change ->
parent._onConnection.onNext(change)
if (change.badCert) {
log.error("Blocking future connection attempts to $target due to bad certificate on endpoint")
parent.badCertTargets += target
}

View File

@ -71,5 +71,9 @@ interface AMQPConfiguration {
*/
val healthCheckPhrase: String?
get() = null
@JvmDefault
val enableSNI: Boolean
get() = true
}

View File

@ -1,10 +1,7 @@
package net.corda.nodeapi.internal.protonwrapper.netty
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.Channel
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.EventLoopGroup
import io.netty.channel.*
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
@ -67,52 +64,45 @@ class AMQPServer(val hostName: String,
override fun initChannel(ch: SocketChannel) {
val amqpConfiguration = parent.configuration
val pipeline = ch.pipeline()
amqpConfiguration.healthCheckPhrase?.let { pipeline.addLast(ModeSelectingChannel.NAME, ModeSelectingChannel(it)) }
val keyStore = amqpConfiguration.keyStore
// Used for SNI matching with javaSSL.
val wrappedKeyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, amqpConfiguration)
// Used to create a mapping for SNI matching with openSSL.
val keyManagerFactoriesMap = splitKeystore(amqpConfiguration)
val handler = if (amqpConfiguration.useOpenSsl){
// SNI matching needed only when multiple nodes exist behind the server.
if (keyStore.aliases().size > 1) {
createServerSNIOpenSslHandler(keyManagerFactoriesMap, trustManagerFactory)
} else {
createServerOpenSslHandler(wrappedKeyManagerFactory, trustManagerFactory, ch.alloc())
}
} else {
// For javaSSL, SNI matching is handled at key manager level.
createServerSslHelper(keyStore, wrappedKeyManagerFactory, trustManagerFactory)
}
pipeline.addLast("sslHandler", handler)
val (sslHandler, keyManagerFactoriesMap) = createSSLHandler(amqpConfiguration, ch)
pipeline.addLast("sslHandler", sslHandler)
if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
pipeline.addLast(AMQPChannelHandler(true,
null,
// Passing a mapping of legal names to key managers to be able to pick the correct one after
// SNI completion event is fired up.
if (keyStore.aliases().size > 1 && amqpConfiguration.useOpenSsl)
keyManagerFactoriesMap
else
// Single entry, key can be anything.
mapOf(DEFAULT to wrappedKeyManagerFactory),
keyManagerFactoriesMap,
conf.userName,
conf.password,
conf.trace,
{
parent.clientChannels[it.first.remoteAddress()] = it.first
parent._onConnection.onNext(it.second)
{ channel, change ->
parent.clientChannels[channel.remoteAddress()] = channel
parent._onConnection.onNext(change)
},
{
parent.clientChannels.remove(it.first.remoteAddress())
parent._onConnection.onNext(it.second)
{ channel, change ->
parent.clientChannels.remove(channel.remoteAddress())
parent._onConnection.onNext(change)
},
{ rcv -> parent._onReceive.onNext(rcv) }))
}
private fun createSSLHandler(amqpConfig: AMQPConfiguration, ch: SocketChannel): Pair<ChannelHandler, Map<String, CertHoldingKeyManagerFactoryWrapper>> {
return if (amqpConfig.useOpenSsl && amqpConfig.enableSNI && amqpConfig.keyStore.aliases().size > 1) {
val keyManagerFactoriesMap = splitKeystore(amqpConfig)
// SNI matching needed only when multiple nodes exist behind the server.
Pair(createServerSNIOpenSslHandler(keyManagerFactoriesMap, trustManagerFactory), keyManagerFactoriesMap)
} else {
val keyManagerFactory = CertHoldingKeyManagerFactoryWrapper(keyManagerFactory, amqpConfig)
val handler = if (amqpConfig.useOpenSsl) {
createServerOpenSslHandler(keyManagerFactory, trustManagerFactory, ch.alloc())
} else {
// For javaSSL, SNI matching is handled at key manager level.
createServerSslHandler(amqpConfig.keyStore, keyManagerFactory, trustManagerFactory)
}
Pair(handler, mapOf(DEFAULT to keyManagerFactory))
}
}
}
fun start() {
@ -189,10 +179,7 @@ class AMQPServer(val hostName: String,
}
fun dropConnection(connectionRemoteHost: InetSocketAddress) {
val channel = clientChannels[connectionRemoteHost]
if (channel != null) {
channel.close()
}
clientChannels[connectionRemoteHost]?.close()
}
fun complete(delivery: Delivery, target: InetSocketAddress) {

View File

@ -26,15 +26,11 @@ class CertHoldingKeyManagerFactorySpiWrapper(private val factorySpi: KeyManagerF
return if (factorySpi is CertHoldingKeyManagerFactorySpiWrapper) keyManagers else keyManagers.map {
val aliasProvidingKeyManager = getDefaultKeyManager(it)
// Use the SNIKeyManager if keystore has several entries and only for clients and non-openSSL servers.
if (amqpConfig.keyStore.aliases().size > 1) {
// Clients
if (amqpConfig.sourceX500Name != null) {
SNIKeyManager(aliasProvidingKeyManager as X509ExtendedKeyManager, amqpConfig)
} else if (!amqpConfig.useOpenSsl) { // JDK SSL servers
SNIKeyManager(aliasProvidingKeyManager as X509ExtendedKeyManager, amqpConfig)
} else {
aliasProvidingKeyManager
}
// Condition of using SNIKeyManager: if its client, or JDKSsl server.
val isClient = amqpConfig.sourceX500Name != null
val enableSNI = amqpConfig.enableSNI && amqpConfig.keyStore.aliases().size > 1
if (enableSNI && (isClient || !amqpConfig.useOpenSsl)) {
SNIKeyManager(aliasProvidingKeyManager as X509ExtendedKeyManager, amqpConfig)
} else {
aliasProvidingKeyManager
}
@ -78,5 +74,4 @@ class CertHoldingKeyManagerFactoryWrapper(factory: KeyManagerFactory, amqpConfig
keyManager.getCertificateChain(alias)
} else null
}
}

View File

@ -149,9 +149,9 @@ internal fun createClientOpenSslHandler(target: NetworkHostAndPort,
return SslHandler(sslEngine)
}
internal fun createServerSslHelper(keyStore: CertificateStore,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
internal fun createServerSslHandler(keyStore: CertificateStore,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers
val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java).map { LoggingTrustManagerWrapper(it) }.toTypedArray()
@ -189,12 +189,10 @@ internal fun initialiseTrustStoreAndEnableCrlChecking(trustStore: CertificateSto
internal fun createServerOpenSslHandler(keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory,
alloc: ByteBufAllocator): SslHandler {
val sslContext = SslContextBuilder.forServer(keyManagerFactory).sslProvider(SslProvider.OPENSSL).trustManager(LoggingTrustManagerFactoryWrapper(trustManagerFactory)).build()
val sslContext = getServerSslContextBuilder(keyManagerFactory, trustManagerFactory).build()
val sslEngine = sslContext.newEngine(alloc)
sslEngine.useClientMode = false
sslEngine.needClientAuth = true
sslEngine.enabledProtocols = ArtemisTcpTransport.TLS_VERSIONS.toTypedArray()
sslEngine.enabledCipherSuites = ArtemisTcpTransport.CIPHER_SUITES.toTypedArray()
return SslHandler(sslEngine)
}
@ -205,20 +203,21 @@ internal fun createServerSNIOpenSslHandler(keyManagerFactoriesMap: Map<String, K
trustManagerFactory: TrustManagerFactory): SniHandler {
// Default value can be any in the map.
val sslCtxBuilder = SslContextBuilder.forServer(keyManagerFactoriesMap.values.first())
val sslCtxBuilder = getServerSslContextBuilder(keyManagerFactoriesMap.values.first(), trustManagerFactory)
val mapping = DomainNameMappingBuilder(sslCtxBuilder.build())
keyManagerFactoriesMap.forEach {
mapping.add(it.key, sslCtxBuilder.keyManager(it.value).build())
}
return SniHandler(mapping.build())
}
private fun getServerSslContextBuilder(keyManagerFactory: KeyManagerFactory, trustManagerFactory: TrustManagerFactory): SslContextBuilder {
return SslContextBuilder.forServer(keyManagerFactory)
.sslProvider(SslProvider.OPENSSL)
.trustManager(LoggingTrustManagerFactoryWrapper(trustManagerFactory))
.clientAuth(ClientAuth.REQUIRE)
.ciphers(ArtemisTcpTransport.CIPHER_SUITES)
.protocols(*ArtemisTcpTransport.TLS_VERSIONS.toTypedArray())
val mapping = DomainNameMappingBuilder(sslCtxBuilder.build())
keyManagerFactoriesMap.forEach {
mapping.add(it.key, sslCtxBuilder.keyManager(it.value).build())
}
return SniHandler(mapping.build())
}
internal fun splitKeystore(config: AMQPConfiguration): Map<String, CertHoldingKeyManagerFactoryWrapper> {

View File

@ -94,5 +94,5 @@ class TestKeyManagerFactoryWrapper {
assertNull(otherWrappedKeyManagerFactory.getCurrentCertChain())
}
private class AMQPConfigurationImpl(override val keyStore: CertificateStore, override val trustStore: CertificateStore, override val maxMessageSize: Int) : AMQPConfiguration
private class AMQPConfigurationImpl(override val keyStore: CertificateStore, override val trustStore: CertificateStore, override val maxMessageSize: Int, override val enableSNI: Boolean = true) : AMQPConfiguration
}

View File

@ -17,10 +17,7 @@ import net.corda.nodeapi.internal.bridging.AMQPBridgeManager
import net.corda.nodeapi.internal.bridging.BridgeManager
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.*
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.stubs.CertificateStoreStubs
@ -45,7 +42,7 @@ import kotlin.system.measureTimeMillis
import kotlin.test.assertEquals
@RunWith(Parameterized::class)
class AMQPBridgeTest(private val useOpenSsl: Boolean) {
class AMQPBridgeTest(private val useOpenSsl: Boolean, private val enableSNI: Boolean) {
companion object {
private val logger = contextLogger()
@ -53,8 +50,8 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) {
const val echoPhrase = "Hello!"
@JvmStatic
@Parameterized.Parameters(name = "useOpenSsl = {0}")
fun data(): Collection<Boolean> = listOf(false, true)
@Parameterized.Parameters(name = "useOpenSsl = {0}, enableSNI = {1}")
fun data() = listOf(false, true).product(listOf(false, true))
private fun String.assertEchoResponse(address: InetSocketAddress, drip: Boolean = false) {
SocketChannel.open(address).use {
@ -357,7 +354,7 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) {
artemisServer.start()
artemisClient.start()
val bridgeManager = AMQPBridgeManager(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE)
val bridgeManager = AMQPBridgeManager(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, enableSNI)
bridgeManager.start()
val artemis = artemisClient.started!!
if (sourceQueueName != null) {
@ -413,10 +410,11 @@ class AMQPBridgeTest(private val useOpenSsl: Boolean) {
val keyStore = serverConfig.p2pSslOptions.keyStore.get()
val amqpConfig = object : AMQPConfiguration {
override val keyStore = keyStore
override val trustStore = serverConfig.p2pSslOptions.trustStore.get()
override val trustStore = serverConfig.p2pSslOptions.trustStore.get()
//override val trace: Boolean = true
override val maxMessageSize: Int = maxMessageSize
override val useOpenSsl = serverConfig.p2pSslOptions.useOpenSsl
override val enableSNI: Boolean = this@AMQPBridgeTest.enableSNI
override val healthCheckPhrase = echoPhrase
}
return AMQPServer("0.0.0.0",

View File

@ -193,6 +193,7 @@ class LoopbackBridgeTest(private val useOpenSsl: Boolean) {
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
doReturn(true).whenever(it).crlCheckSoftFail
doReturn(artemisAddress).whenever(it).p2pAddress
doReturn(true).whenever(it).enableSNI
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
}
@ -204,7 +205,13 @@ class LoopbackBridgeTest(private val useOpenSsl: Boolean) {
artemisServer.start()
artemisClient.start()
val bridgeManager = LoopbackBridgeManager({ ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize) })
val bridgeManager = LoopbackBridgeManager(artemisConfig.p2pSslOptions,
null,
MAX_MESSAGE_SIZE,
artemisConfig.enableSNI,
{ ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize) },
null,
{ true })
bridgeManager.start()
val artemis = artemisClient.started!!

View File

@ -87,6 +87,7 @@ class ArtemisMessagingTest {
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
doReturn(false).whenever(it).messagingServerExternal
doReturn(true).whenever(it).enableSNI
doReturn(FlowTimeoutConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).flowTimeout
}
LogHelper.setLevel(PersistentUniquenessProvider::class)

View File

@ -265,7 +265,7 @@ open class Node(configuration: NodeConfiguration,
configuration.enterpriseConfiguration.externalBrokerConnectionConfiguration,
configuration.enterpriseConfiguration.externalBrokerBackupAddresses)
}
BridgeControlListener(configuration.p2pSslOptions, null, networkParameters.maxMessageSize, artemisClient)
BridgeControlListener(configuration.p2pSslOptions, null, networkParameters.maxMessageSize, configuration.enableSNI, artemisClient)
} else {
null
}

View File

@ -83,6 +83,7 @@ interface NodeConfiguration {
val p2pSslOptions: MutualSslConfiguration
val cordappDirectories: List<Path>
val enableSNI: Boolean
val flowOverrides: FlowOverrideConfig?
val cordappSignerKeyFingerprintBlacklist: List<String>
@ -236,6 +237,7 @@ data class NodeConfigurationImpl(
override val flowMonitorSuspensionLoggingThresholdMillis: Duration = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS,
override val cordappDirectories: List<Path> = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT),
override val jmxReporterType: JmxReporterType? = JmxReporterType.JOLOKIA,
override val enableSNI: Boolean = true,
private val useOpenSsl: Boolean = false,
override val flowOverrides: FlowOverrideConfig?,
override val cordappSignerKeyFingerprintBlacklist: List<String> = DEV_PUB_KEY_HASHES.map { it.toString() }

View File

@ -14,6 +14,7 @@ import net.corda.node.services.statemachine.FlowMessagingImpl
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException
import org.apache.activemq.artemis.api.core.Message.*
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
@ -44,7 +45,8 @@ class MessagingExecutor(
metricRegistry: MetricRegistry,
val ourSenderUUID: String,
queueBound: Int,
val myLegalName: String
val myLegalName: String,
private val enableSNI: Boolean
) {
private sealed class Job {
data class Acknowledge(val message: ClientMessage) : Job()
@ -56,7 +58,10 @@ class MessagingExecutor(
) : Job() {
override fun toString() = "Send(${message.uniqueMessageId}, target=$target)"
}
object Shutdown : Job() { override fun toString() = "Shutdown" }
object Shutdown : Job() {
override fun toString() = "Shutdown"
}
}
private val queue = ArrayBlockingQueue<Job>(queueBound)
@ -68,9 +73,7 @@ class MessagingExecutor(
private val sendQueueSizeOnInsert = metricRegistry.histogram("P2P.SendQueueSizeOnInsert")
init {
metricRegistry.register("P2P.SendQueueSize", Gauge<Int> {
queue.size
})
metricRegistry.register("P2P.SendQueueSize", Gauge<Int> { queue.size })
}
private val ourSenderSeqNo = AtomicLong()
@ -125,7 +128,7 @@ class MessagingExecutor(
}
}
Job.Shutdown -> {
if(session.stillOpen()) {
if (session.stillOpen()) {
session.commit()
}
break@eventLoop
@ -157,10 +160,10 @@ class MessagingExecutor(
"Send to: $mqAddress topic: ${job.message.topic} " +
"sessionID: ${job.message.topic} id: ${job.message.uniqueMessageId}"
}
producer.send(SimpleString(mqAddress), artemisMessage, {
producer.send(SimpleString(mqAddress), artemisMessage) {
job.timer.stop()
job.sentFuture.set(Unit)
})
}
}
fun cordaToArtemisMessage(message: Message, target: MessageRecipients? = null): ClientMessage? {
@ -172,15 +175,17 @@ class MessagingExecutor(
putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, SimpleString(myLegalName))
// Add a group ID to messages to be able to have multiple filtered consumers while preventing reordering.
// This header will be dropped off during transit through the bridge, which is fine as it's needed locally only.
if (target != null && target is ArtemisMessagingComponent.ServiceAddress) {
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, SimpleString(message.uniqueMessageId.toString))
} else {
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID, SimpleString(myLegalName))
if (enableSNI) {
if (target is ArtemisMessagingComponent.ServiceAddress) {
putStringProperty(HDR_GROUP_ID, SimpleString(message.uniqueMessageId.toString))
} else {
putStringProperty(HDR_GROUP_ID, SimpleString(myLegalName))
}
}
sendMessageSizeMetric.update(message.data.bytes.size)
writeBodyBufferBytes(message.data.bytes)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString))
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString))
// If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut.
if (ourSenderUUID == message.senderUUID) {
putStringProperty(P2PMessagingHeaders.senderUUID, SimpleString(ourSenderUUID))
@ -188,7 +193,7 @@ class MessagingExecutor(
}
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) {
putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis)
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis)
}
message.additionalHeaders.forEach { key, value -> putStringProperty(key, value) }
}
@ -196,7 +201,7 @@ class MessagingExecutor(
private fun acknowledgeJob(job: Job.Acknowledge) {
log.debug {
val id = job.message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)
val id = job.message.getStringProperty(HDR_DUPLICATE_DETECTION_ID)
"Acking $id"
}
job.message.individualAcknowledge()

View File

@ -248,7 +248,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
metricRegistry,
queueBound = config.enterpriseConfiguration.tuning.maximumMessagingBatchSize,
ourSenderUUID = ourSenderUUID,
myLegalName = legalName
myLegalName = legalName,
enableSNI = config.enableSNI
)
this@P2PMessagingClient.messagingExecutor = messagingExecutor
messagingExecutor.start()
@ -574,7 +575,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
val internalTargetQueue = (address as? ArtemisAddress)?.queueName
?: throw IllegalArgumentException("Not an Artemis address")
state.locked {
createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = false, isServiceAddress = address is ServiceAddress)
val isServiceAddress = address is ServiceAddress
val exclusive = if (config.enableSNI) false else !isServiceAddress
createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = exclusive, isServiceAddress = isServiceAddress)
}
internalTargetQueue
}

View File

@ -344,7 +344,8 @@ fun <A> driver(defaultParameters: DriverParameters = DriverParameters(), dsl: Dr
networkParameters = defaultParameters.networkParameters,
notaryCustomOverrides = defaultParameters.notaryCustomOverrides,
inMemoryDB = defaultParameters.inMemoryDB,
cordappsForAllNodes = defaultParameters.cordappsForAllNodes()
cordappsForAllNodes = defaultParameters.cordappsForAllNodes(),
enableSNI = defaultParameters.enableSNI
),
coerce = { it },
dsl = dsl,
@ -400,7 +401,8 @@ data class DriverParameters(
val notaryCustomOverrides: Map<String, Any?> = emptyMap(),
val initialiseSerialization: Boolean = true,
val inMemoryDB: Boolean = true,
val cordappsForAllNodes: Collection<TestCordapp>? = null
val cordappsForAllNodes: Collection<TestCordapp>? = null,
val enableSNI: Boolean = true
) {
constructor(
isDebug: Boolean = false,

View File

@ -89,7 +89,8 @@ class DriverDSLImpl(
val networkParameters: NetworkParameters,
val notaryCustomOverrides: Map<String, Any?>,
val inMemoryDB: Boolean,
val cordappsForAllNodes: Collection<TestCordapp>
val cordappsForAllNodes: Collection<TestCordapp>,
val enableSNI: Boolean
) : InternalDriverDSL {
private var _executorService: ScheduledExecutorService? = null
@ -308,7 +309,8 @@ class DriverDSLImpl(
NodeConfiguration::rpcUsers.name to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
NodeConfiguration::verifierType.name to verifierType.name,
"enterpriseConfiguration.tuning.flowThreadPoolSize" to "1",
NodeConfiguration::flowOverrides.name to flowOverrideConfig.toConfig().root().unwrapped()
NodeConfiguration::flowOverrides.name to flowOverrideConfig.toConfig().root().unwrapped(),
NodeConfiguration::enableSNI.name to enableSNI
) + czUrlConfig + customOverrides
val config = NodeConfig(ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
@ -1116,7 +1118,8 @@ fun <DI : DriverDSL, D : InternalDriverDSL, A> genericDriver(
networkParameters = defaultParameters.networkParameters,
notaryCustomOverrides = defaultParameters.notaryCustomOverrides,
inMemoryDB = defaultParameters.inMemoryDB,
cordappsForAllNodes = defaultParameters.cordappsForAllNodes()
cordappsForAllNodes = defaultParameters.cordappsForAllNodes(),
enableSNI = defaultParameters.enableSNI
)
)
val shutdownHook = addShutdownHook(driverDsl::shutdown)
@ -1210,6 +1213,7 @@ fun <A> internalDriver(
notaryCustomOverrides: Map<String, Any?> = DriverParameters().notaryCustomOverrides,
inMemoryDB: Boolean = DriverParameters().inMemoryDB,
cordappsForAllNodes: Collection<TestCordapp> = DriverParameters().cordappsForAllNodes(),
enableSNI: Boolean = DriverParameters().enableSNI,
dsl: DriverDSLImpl.() -> A
): A {
return genericDriver(
@ -1228,7 +1232,8 @@ fun <A> internalDriver(
networkParameters = networkParameters,
notaryCustomOverrides = notaryCustomOverrides,
inMemoryDB = inMemoryDB,
cordappsForAllNodes = cordappsForAllNodes
cordappsForAllNodes = cordappsForAllNodes,
enableSNI = enableSNI
),
coerce = { it },
dsl = dsl,

View File

@ -121,6 +121,7 @@ fun <A> rpcDriver(
notaryCustomOverrides: Map<String, Any?> = emptyMap(),
inMemoryDB: Boolean = true,
cordappsForAllNodes: Collection<TestCordapp> = cordappsInCurrentAndAdditionalPackages(),
enableSNI:Boolean = true,
dsl: RPCDriverDSL.() -> A
): A {
return genericDriver(
@ -140,7 +141,8 @@ fun <A> rpcDriver(
networkParameters = networkParameters,
notaryCustomOverrides = notaryCustomOverrides,
inMemoryDB = inMemoryDB,
cordappsForAllNodes = cordappsForAllNodes
cordappsForAllNodes = cordappsForAllNodes,
enableSNI = enableSNI
), externalTrace
),
coerce = { it },

View File

@ -5,7 +5,10 @@ package net.corda.testing.core
import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.*
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignatureScheme
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
@ -161,3 +164,7 @@ fun NodeInfo.singleIdentityAndCert(): PartyAndCertificate = legalIdentitiesAndCe
* Extract a single identity from the node info. Throws an error if the node has multiple identities.
*/
fun NodeInfo.singleIdentity(): Party = singleIdentityAndCert().party
fun Collection<Any>.product(p2: Collection<Any>): Collection<Array<Any>> = flatMap { param1 ->
p2.map { param2 -> arrayOf(param1, param2) }
}