From 6d270685aa5c75a0dc1ff457bf382e7cf0199388 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Mon, 23 Apr 2018 13:31:49 +0100 Subject: [PATCH] Add Artemis round-robin support for node hot-cold in-process Artemis modes. --- .../net/corda/bridge/BridgeIntegrationTest.kt | 134 +++++++++++++++++- .../services/api/BridgeConfiguration.kt | 1 + .../BridgeArtemisConnectionServiceImpl.kt | 18 ++- .../config/BridgeConfigurationImpl.kt | 1 + .../corda/bridge/artemisfailover/bridge.conf | 18 +++ .../bridge/bridge.conf | 19 +++ .../artemisfailoverandfloat/float/bridge.conf | 18 +++ 7 files changed, 202 insertions(+), 7 deletions(-) create mode 100644 bridge/src/test/resources/net/corda/bridge/artemisfailover/bridge.conf create mode 100644 bridge/src/test/resources/net/corda/bridge/artemisfailoverandfloat/bridge/bridge.conf create mode 100644 bridge/src/test/resources/net/corda/bridge/artemisfailoverandfloat/float/bridge.conf diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt index f559e685b6..5437f1ac16 100644 --- a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt @@ -15,6 +15,8 @@ import com.nhaarman.mockito_kotlin.whenever import net.corda.bridge.internal.BridgeInstance import net.corda.bridge.services.api.BridgeMode import net.corda.bridge.services.config.BridgeHAConfigImpl +import net.corda.core.internal.copyToDirectory +import net.corda.core.internal.createDirectories import net.corda.core.internal.div import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.serialize @@ -134,7 +136,6 @@ class BridgeIntegrationTest { artemisClient.stop() artemisServer.stop() } - } @Test @@ -258,6 +259,113 @@ class BridgeIntegrationTest { } } + @Test + fun `Test artemis failover logic`() { + val configResource = "/net/corda/bridge/artemisfailover/bridge.conf" + createNetworkParams(tempFolder.root.toPath()) + val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource) + assertEquals(BridgeMode.SenderReceiver, config.bridgeMode) + assertEquals(NetworkHostAndPort("localhost", 11005), config.outboundConfig!!.artemisBrokerAddress) + assertEquals(listOf(NetworkHostAndPort("localhost", 12005)), config.outboundConfig!!.alternateArtemisBrokerAddresses) + assertEquals(NetworkHostAndPort("0.0.0.0", 10005), config.inboundConfig!!.listeningAddress) + assertNull(config.floatInnerConfig) + assertNull(config.floatOuterConfig) + config.createBridgeKeyStores(DUMMY_BANK_A_NAME) + val (artemisServer, artemisClient) = createArtemis() + val (artemisServer2, artemisClient2) = createArtemis2() + try { + installBridgeControlResponder(artemisClient) + installBridgeControlResponder(artemisClient2) + val bridge = BridgeInstance(config, BridgeVersionInfo(1, "1.1", "Dummy", "Test")) + val stateFollower = bridge.activeChange.toBlocking().iterator + assertEquals(false, stateFollower.next()) + assertEquals(false, bridge.active) + bridge.start() + assertEquals(true, stateFollower.next()) + assertEquals(true, bridge.active) + assertEquals(true, serverListening("localhost", 10005)) + artemisClient.stop() // Stop artemis to force failover to second choice + artemisServer.stop() + assertEquals(false, stateFollower.next()) + assertEquals(false, bridge.active) + assertEquals(true, stateFollower.next()) + assertEquals(true, bridge.active) + bridge.stop() + assertEquals(false, stateFollower.next()) + assertEquals(false, bridge.active) + assertEquals(false, serverListening("localhost", 10005)) + } finally { + artemisClient.stop() + artemisServer.stop() + artemisClient2.stop() + artemisServer2.stop() + } + } + + @Test + fun `Test artemis failover logic with float`() { + val bridgeFolder = tempFolder.root.toPath() + val bridgeConfigResource = "/net/corda/bridge/artemisfailoverandfloat/bridge/bridge.conf" + val bridgeConfig = createAndLoadConfigFromResource(bridgeFolder, bridgeConfigResource) + bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME) + createNetworkParams(bridgeFolder) + assertEquals(BridgeMode.FloatInner, bridgeConfig.bridgeMode) + assertEquals(NetworkHostAndPort("localhost", 11005), bridgeConfig.outboundConfig!!.artemisBrokerAddress) + assertEquals(listOf(NetworkHostAndPort("localhost", 12005)), bridgeConfig.outboundConfig!!.alternateArtemisBrokerAddresses) + val floatFolder = tempFolder.root.toPath() / "float" + val floatConfigResource = "/net/corda/bridge/artemisfailoverandfloat/float/bridge.conf" + val floatConfig = createAndLoadConfigFromResource(floatFolder, floatConfigResource) + floatConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME) + createNetworkParams(floatFolder) + assertEquals(BridgeMode.FloatOuter, floatConfig.bridgeMode) + assertEquals(NetworkHostAndPort("0.0.0.0", 10005), floatConfig.inboundConfig!!.listeningAddress) + val (artemisServer, artemisClient) = createArtemis() + val (artemisServer2, artemisClient2) = createArtemis2() + try { + installBridgeControlResponder(artemisClient) + installBridgeControlResponder(artemisClient2) + val bridge = BridgeInstance(bridgeConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test")) + val bridgeStateFollower = bridge.activeChange.toBlocking().iterator + val float = BridgeInstance(floatConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test")) + val floatStateFollower = float.activeChange.toBlocking().iterator + assertEquals(false, floatStateFollower.next()) + float.start() + assertEquals(true, floatStateFollower.next()) + assertEquals(true, float.active) // float is running + assertEquals(false, serverListening("localhost", 10005)) // but not activated + assertEquals(false, bridgeStateFollower.next()) + bridge.start() + assertEquals(true, bridgeStateFollower.next()) + assertEquals(true, bridge.active) + assertEquals(true, float.active) + assertEquals(true, serverListening("localhost", 10005)) // now activated + artemisClient.stop() // Stop artemis to force failover to second choice + artemisServer.stop() + assertEquals(false, bridgeStateFollower.next()) + assertEquals(false, bridge.active) + assertEquals(true, float.active) + assertEquals(false, serverListening("localhost", 10005)) // now activated + assertEquals(true, bridgeStateFollower.next()) + assertEquals(true, bridge.active) + assertEquals(true, float.active) + assertEquals(true, serverListening("localhost", 10005)) // now activated + bridge.stop() + assertEquals(false, bridgeStateFollower.next()) + assertEquals(false, bridge.active) + assertEquals(true, float.active) + assertEquals(false, serverListening("localhost", 10005)) // now de-activated + float.stop() + assertEquals(false, floatStateFollower.next()) + assertEquals(false, bridge.active) + assertEquals(false, float.active) + } finally { + artemisClient.stop() + artemisServer.stop() + artemisServer2.stop() + } + } + + private fun createArtemis(): Pair { val artemisConfig = rigorousMock().also { doReturn(tempFolder.root.toPath()).whenever(it).baseDirectory @@ -276,6 +384,30 @@ class BridgeIntegrationTest { return Pair(artemisServer, artemisClient) } + private fun createArtemis2(): Pair { + val originalCertsFolderPath = tempFolder.root.toPath() / "certificates" + val folderPath = tempFolder.root.toPath() / "artemis2" + val newCertsFolderPath = folderPath / "certificates" + newCertsFolderPath.createDirectories() + (originalCertsFolderPath / "truststore.jks").copyToDirectory(newCertsFolderPath) + (originalCertsFolderPath / "sslkeystore.jks").copyToDirectory(newCertsFolderPath) + val artemisConfig = rigorousMock().also { + doReturn(folderPath).whenever(it).baseDirectory + doReturn(ALICE_NAME).whenever(it).myLegalName + doReturn("trustpass").whenever(it).trustStorePassword + doReturn("cordacadevpass").whenever(it).keyStorePassword + doReturn(NetworkHostAndPort("localhost", 12005)).whenever(it).p2pAddress + doReturn(null).whenever(it).jmxMonitoringHttpPort + doReturn(emptyList()).whenever(it).certificateChainCheckPolicies + doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration + } + val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 12005), MAX_MESSAGE_SIZE) + val artemisClient = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", 12005), MAX_MESSAGE_SIZE) + artemisServer.start() + artemisClient.start() + return Pair(artemisServer, artemisClient) + } + private fun installBridgeControlResponder(artemisClient: ArtemisMessagingClient) { val artemis = artemisClient.started!! val inboxAddress = SimpleString("${P2P_PREFIX}Test") diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt b/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt index 0f88a8f016..7668eeb2ff 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt @@ -52,6 +52,7 @@ interface BridgeSSLConfiguration : SSLConfiguration { */ interface BridgeOutboundConfiguration { val artemisBrokerAddress: NetworkHostAndPort + val alternateArtemisBrokerAddresses: List // Allows override of [KeyStore] details for the artemis connection, otherwise the general top level details are used. val customSSLConfiguration: BridgeSSLConfiguration? // Allows use of a SOCKS 4/5 proxy diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt index a33bb67ee8..7a29192f7f 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt @@ -24,6 +24,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.FailoverEventType import org.apache.activemq.artemis.api.core.client.ServerLocator +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants import rx.Subscription import java.util.concurrent.CountDownLatch @@ -66,10 +67,12 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration, state.locked { check(!running) { "start can't be called twice" } running = true - log.info("Connecting to message broker: ${conf.outboundConfig!!.artemisBrokerAddress}") + val outboundConf = conf.outboundConfig!! + log.info("Connecting to message broker: ${outboundConf.artemisBrokerAddress}") + val brokerAddresses = listOf(outboundConf.artemisBrokerAddress) + outboundConf.alternateArtemisBrokerAddresses // TODO Add broker CN to config for host verification in case the embedded broker isn't used - val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), conf.outboundConfig!!.artemisBrokerAddress, sslConfiguration) - locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { + val tcpTransports = brokerAddresses.map { ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), it, sslConfiguration) } + locator = ActiveMQClient.createServerLocatorWithoutHA(*tcpTransports.toTypedArray()).apply { // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this // would be the default and the two lines below can be deleted. connectionTTL = -1 @@ -117,14 +120,17 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration, get() = state.locked { started } private fun artemisReconnectionLoop() { + var tcpIndex = 0 while (state.locked { running }) { val locator = state.locked { locator } if (locator == null) { break } try { - log.info("Try create session factory") - val newSessionFactory = locator.createSessionFactory() + val transport = locator.staticTransportConfigurations[tcpIndex] + tcpIndex = (tcpIndex + 1).rem(locator.staticTransportConfigurations.size) + log.info("Try create session factory ${transport.params[TransportConstants.HOST_PROP_NAME]}:${transport.params[TransportConstants.PORT_PROP_NAME]}") + val newSessionFactory = locator.createSessionFactory(transport) log.info("Got session factory") val latch = CountDownLatch(1) newSessionFactory.connection.addCloseListener { @@ -152,7 +158,6 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration, } stateHelper.active = true latch.await() - stateHelper.active = false state.locked { started?.apply { producer.close() @@ -161,6 +166,7 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration, } started = null } + stateHelper.active = false log.info("Session closed") } catch (ex: Exception) { log.trace("Caught exception", ex) diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt index 533198f523..76074ccd03 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt @@ -34,6 +34,7 @@ data class BridgeSSLConfigurationImpl(override val keyStorePassword: String, } data class BridgeOutboundConfigurationImpl(override val artemisBrokerAddress: NetworkHostAndPort, + override val alternateArtemisBrokerAddresses: List, override val customSSLConfiguration: BridgeSSLConfigurationImpl?, override val socksProxyConfig: SocksProxyConfig? = null) : BridgeOutboundConfiguration diff --git a/bridge/src/test/resources/net/corda/bridge/artemisfailover/bridge.conf b/bridge/src/test/resources/net/corda/bridge/artemisfailover/bridge.conf new file mode 100644 index 0000000000..e23e7c406c --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/artemisfailover/bridge.conf @@ -0,0 +1,18 @@ +// +// R3 Proprietary and Confidential +// +// Copyright (c) 2018 R3 Limited. All rights reserved. +// +// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. +// +// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + +bridgeMode = SenderReceiver +outboundConfig : { + artemisBrokerAddress = "localhost:11005" + alternateArtemisBrokerAddresses = ["localhost:12005"] +} +inboundConfig : { + listeningAddress = "0.0.0.0:10005" +} +networkParametersPath = network-parameters diff --git a/bridge/src/test/resources/net/corda/bridge/artemisfailoverandfloat/bridge/bridge.conf b/bridge/src/test/resources/net/corda/bridge/artemisfailoverandfloat/bridge/bridge.conf new file mode 100644 index 0000000000..375c09fa7f --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/artemisfailoverandfloat/bridge/bridge.conf @@ -0,0 +1,19 @@ +// +// R3 Proprietary and Confidential +// +// Copyright (c) 2018 R3 Limited. All rights reserved. +// +// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. +// +// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + +bridgeMode = FloatInner +outboundConfig : { + artemisBrokerAddress = "localhost:11005" + alternateArtemisBrokerAddresses = ["localhost:12005"] +} +floatInnerConfig : { + floatAddresses = [ "localhost:13005" ] + expectedCertificateSubject = "O=Bank A, L=London, C=GB" +} +networkParametersPath = network-parameters diff --git a/bridge/src/test/resources/net/corda/bridge/artemisfailoverandfloat/float/bridge.conf b/bridge/src/test/resources/net/corda/bridge/artemisfailoverandfloat/float/bridge.conf new file mode 100644 index 0000000000..0f97389b76 --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/artemisfailoverandfloat/float/bridge.conf @@ -0,0 +1,18 @@ +// +// R3 Proprietary and Confidential +// +// Copyright (c) 2018 R3 Limited. All rights reserved. +// +// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. +// +// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + +bridgeMode = FloatOuter +inboundConfig : { + listeningAddress = "0.0.0.0:10005" +} +floatOuterConfig : { + floatAddress = "localhost:13005" + expectedCertificateSubject = "O=Bank A, L=London, C=GB" +} +networkParametersPath = network-parameters