mirror of
https://github.com/corda/corda.git
synced 2025-02-05 10:39:13 +00:00
Add Artemis round-robin support for node hot-cold in-process Artemis modes.
This commit is contained in:
parent
f64226364b
commit
6d270685aa
@ -15,6 +15,8 @@ import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.bridge.internal.BridgeInstance
|
||||
import net.corda.bridge.services.api.BridgeMode
|
||||
import net.corda.bridge.services.config.BridgeHAConfigImpl
|
||||
import net.corda.core.internal.copyToDirectory
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.serialize
|
||||
@ -134,7 +136,6 @@ class BridgeIntegrationTest {
|
||||
artemisClient.stop()
|
||||
artemisServer.stop()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -258,6 +259,113 @@ class BridgeIntegrationTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Test artemis failover logic`() {
|
||||
val configResource = "/net/corda/bridge/artemisfailover/bridge.conf"
|
||||
createNetworkParams(tempFolder.root.toPath())
|
||||
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
|
||||
assertEquals(BridgeMode.SenderReceiver, config.bridgeMode)
|
||||
assertEquals(NetworkHostAndPort("localhost", 11005), config.outboundConfig!!.artemisBrokerAddress)
|
||||
assertEquals(listOf(NetworkHostAndPort("localhost", 12005)), config.outboundConfig!!.alternateArtemisBrokerAddresses)
|
||||
assertEquals(NetworkHostAndPort("0.0.0.0", 10005), config.inboundConfig!!.listeningAddress)
|
||||
assertNull(config.floatInnerConfig)
|
||||
assertNull(config.floatOuterConfig)
|
||||
config.createBridgeKeyStores(DUMMY_BANK_A_NAME)
|
||||
val (artemisServer, artemisClient) = createArtemis()
|
||||
val (artemisServer2, artemisClient2) = createArtemis2()
|
||||
try {
|
||||
installBridgeControlResponder(artemisClient)
|
||||
installBridgeControlResponder(artemisClient2)
|
||||
val bridge = BridgeInstance(config, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
|
||||
val stateFollower = bridge.activeChange.toBlocking().iterator
|
||||
assertEquals(false, stateFollower.next())
|
||||
assertEquals(false, bridge.active)
|
||||
bridge.start()
|
||||
assertEquals(true, stateFollower.next())
|
||||
assertEquals(true, bridge.active)
|
||||
assertEquals(true, serverListening("localhost", 10005))
|
||||
artemisClient.stop() // Stop artemis to force failover to second choice
|
||||
artemisServer.stop()
|
||||
assertEquals(false, stateFollower.next())
|
||||
assertEquals(false, bridge.active)
|
||||
assertEquals(true, stateFollower.next())
|
||||
assertEquals(true, bridge.active)
|
||||
bridge.stop()
|
||||
assertEquals(false, stateFollower.next())
|
||||
assertEquals(false, bridge.active)
|
||||
assertEquals(false, serverListening("localhost", 10005))
|
||||
} finally {
|
||||
artemisClient.stop()
|
||||
artemisServer.stop()
|
||||
artemisClient2.stop()
|
||||
artemisServer2.stop()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Test artemis failover logic with float`() {
|
||||
val bridgeFolder = tempFolder.root.toPath()
|
||||
val bridgeConfigResource = "/net/corda/bridge/artemisfailoverandfloat/bridge/bridge.conf"
|
||||
val bridgeConfig = createAndLoadConfigFromResource(bridgeFolder, bridgeConfigResource)
|
||||
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
|
||||
createNetworkParams(bridgeFolder)
|
||||
assertEquals(BridgeMode.FloatInner, bridgeConfig.bridgeMode)
|
||||
assertEquals(NetworkHostAndPort("localhost", 11005), bridgeConfig.outboundConfig!!.artemisBrokerAddress)
|
||||
assertEquals(listOf(NetworkHostAndPort("localhost", 12005)), bridgeConfig.outboundConfig!!.alternateArtemisBrokerAddresses)
|
||||
val floatFolder = tempFolder.root.toPath() / "float"
|
||||
val floatConfigResource = "/net/corda/bridge/artemisfailoverandfloat/float/bridge.conf"
|
||||
val floatConfig = createAndLoadConfigFromResource(floatFolder, floatConfigResource)
|
||||
floatConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
|
||||
createNetworkParams(floatFolder)
|
||||
assertEquals(BridgeMode.FloatOuter, floatConfig.bridgeMode)
|
||||
assertEquals(NetworkHostAndPort("0.0.0.0", 10005), floatConfig.inboundConfig!!.listeningAddress)
|
||||
val (artemisServer, artemisClient) = createArtemis()
|
||||
val (artemisServer2, artemisClient2) = createArtemis2()
|
||||
try {
|
||||
installBridgeControlResponder(artemisClient)
|
||||
installBridgeControlResponder(artemisClient2)
|
||||
val bridge = BridgeInstance(bridgeConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
|
||||
val bridgeStateFollower = bridge.activeChange.toBlocking().iterator
|
||||
val float = BridgeInstance(floatConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
|
||||
val floatStateFollower = float.activeChange.toBlocking().iterator
|
||||
assertEquals(false, floatStateFollower.next())
|
||||
float.start()
|
||||
assertEquals(true, floatStateFollower.next())
|
||||
assertEquals(true, float.active) // float is running
|
||||
assertEquals(false, serverListening("localhost", 10005)) // but not activated
|
||||
assertEquals(false, bridgeStateFollower.next())
|
||||
bridge.start()
|
||||
assertEquals(true, bridgeStateFollower.next())
|
||||
assertEquals(true, bridge.active)
|
||||
assertEquals(true, float.active)
|
||||
assertEquals(true, serverListening("localhost", 10005)) // now activated
|
||||
artemisClient.stop() // Stop artemis to force failover to second choice
|
||||
artemisServer.stop()
|
||||
assertEquals(false, bridgeStateFollower.next())
|
||||
assertEquals(false, bridge.active)
|
||||
assertEquals(true, float.active)
|
||||
assertEquals(false, serverListening("localhost", 10005)) // now activated
|
||||
assertEquals(true, bridgeStateFollower.next())
|
||||
assertEquals(true, bridge.active)
|
||||
assertEquals(true, float.active)
|
||||
assertEquals(true, serverListening("localhost", 10005)) // now activated
|
||||
bridge.stop()
|
||||
assertEquals(false, bridgeStateFollower.next())
|
||||
assertEquals(false, bridge.active)
|
||||
assertEquals(true, float.active)
|
||||
assertEquals(false, serverListening("localhost", 10005)) // now de-activated
|
||||
float.stop()
|
||||
assertEquals(false, floatStateFollower.next())
|
||||
assertEquals(false, bridge.active)
|
||||
assertEquals(false, float.active)
|
||||
} finally {
|
||||
artemisClient.stop()
|
||||
artemisServer.stop()
|
||||
artemisServer2.stop()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private fun createArtemis(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(tempFolder.root.toPath()).whenever(it).baseDirectory
|
||||
@ -276,6 +384,30 @@ class BridgeIntegrationTest {
|
||||
return Pair(artemisServer, artemisClient)
|
||||
}
|
||||
|
||||
private fun createArtemis2(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
||||
val originalCertsFolderPath = tempFolder.root.toPath() / "certificates"
|
||||
val folderPath = tempFolder.root.toPath() / "artemis2"
|
||||
val newCertsFolderPath = folderPath / "certificates"
|
||||
newCertsFolderPath.createDirectories()
|
||||
(originalCertsFolderPath / "truststore.jks").copyToDirectory(newCertsFolderPath)
|
||||
(originalCertsFolderPath / "sslkeystore.jks").copyToDirectory(newCertsFolderPath)
|
||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(folderPath).whenever(it).baseDirectory
|
||||
doReturn(ALICE_NAME).whenever(it).myLegalName
|
||||
doReturn("trustpass").whenever(it).trustStorePassword
|
||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||
doReturn(NetworkHostAndPort("localhost", 12005)).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
|
||||
}
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", 12005), MAX_MESSAGE_SIZE)
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", 12005), MAX_MESSAGE_SIZE)
|
||||
artemisServer.start()
|
||||
artemisClient.start()
|
||||
return Pair(artemisServer, artemisClient)
|
||||
}
|
||||
|
||||
private fun installBridgeControlResponder(artemisClient: ArtemisMessagingClient) {
|
||||
val artemis = artemisClient.started!!
|
||||
val inboxAddress = SimpleString("${P2P_PREFIX}Test")
|
||||
|
@ -52,6 +52,7 @@ interface BridgeSSLConfiguration : SSLConfiguration {
|
||||
*/
|
||||
interface BridgeOutboundConfiguration {
|
||||
val artemisBrokerAddress: NetworkHostAndPort
|
||||
val alternateArtemisBrokerAddresses: List<NetworkHostAndPort>
|
||||
// Allows override of [KeyStore] details for the artemis connection, otherwise the general top level details are used.
|
||||
val customSSLConfiguration: BridgeSSLConfiguration?
|
||||
// Allows use of a SOCKS 4/5 proxy
|
||||
|
@ -24,6 +24,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.FailoverEventType
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
||||
import rx.Subscription
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
@ -66,10 +67,12 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
|
||||
state.locked {
|
||||
check(!running) { "start can't be called twice" }
|
||||
running = true
|
||||
log.info("Connecting to message broker: ${conf.outboundConfig!!.artemisBrokerAddress}")
|
||||
val outboundConf = conf.outboundConfig!!
|
||||
log.info("Connecting to message broker: ${outboundConf.artemisBrokerAddress}")
|
||||
val brokerAddresses = listOf(outboundConf.artemisBrokerAddress) + outboundConf.alternateArtemisBrokerAddresses
|
||||
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
|
||||
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), conf.outboundConfig!!.artemisBrokerAddress, sslConfiguration)
|
||||
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||
val tcpTransports = brokerAddresses.map { ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), it, sslConfiguration) }
|
||||
locator = ActiveMQClient.createServerLocatorWithoutHA(*tcpTransports.toTypedArray()).apply {
|
||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||
// would be the default and the two lines below can be deleted.
|
||||
connectionTTL = -1
|
||||
@ -117,14 +120,17 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
|
||||
get() = state.locked { started }
|
||||
|
||||
private fun artemisReconnectionLoop() {
|
||||
var tcpIndex = 0
|
||||
while (state.locked { running }) {
|
||||
val locator = state.locked { locator }
|
||||
if (locator == null) {
|
||||
break
|
||||
}
|
||||
try {
|
||||
log.info("Try create session factory")
|
||||
val newSessionFactory = locator.createSessionFactory()
|
||||
val transport = locator.staticTransportConfigurations[tcpIndex]
|
||||
tcpIndex = (tcpIndex + 1).rem(locator.staticTransportConfigurations.size)
|
||||
log.info("Try create session factory ${transport.params[TransportConstants.HOST_PROP_NAME]}:${transport.params[TransportConstants.PORT_PROP_NAME]}")
|
||||
val newSessionFactory = locator.createSessionFactory(transport)
|
||||
log.info("Got session factory")
|
||||
val latch = CountDownLatch(1)
|
||||
newSessionFactory.connection.addCloseListener {
|
||||
@ -152,7 +158,6 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
|
||||
}
|
||||
stateHelper.active = true
|
||||
latch.await()
|
||||
stateHelper.active = false
|
||||
state.locked {
|
||||
started?.apply {
|
||||
producer.close()
|
||||
@ -161,6 +166,7 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
|
||||
}
|
||||
started = null
|
||||
}
|
||||
stateHelper.active = false
|
||||
log.info("Session closed")
|
||||
} catch (ex: Exception) {
|
||||
log.trace("Caught exception", ex)
|
||||
|
@ -34,6 +34,7 @@ data class BridgeSSLConfigurationImpl(override val keyStorePassword: String,
|
||||
}
|
||||
|
||||
data class BridgeOutboundConfigurationImpl(override val artemisBrokerAddress: NetworkHostAndPort,
|
||||
override val alternateArtemisBrokerAddresses: List<NetworkHostAndPort>,
|
||||
override val customSSLConfiguration: BridgeSSLConfigurationImpl?,
|
||||
override val socksProxyConfig: SocksProxyConfig? = null) : BridgeOutboundConfiguration
|
||||
|
||||
|
@ -0,0 +1,18 @@
|
||||
//
|
||||
// R3 Proprietary and Confidential
|
||||
//
|
||||
// Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
//
|
||||
// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
//
|
||||
// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
|
||||
bridgeMode = SenderReceiver
|
||||
outboundConfig : {
|
||||
artemisBrokerAddress = "localhost:11005"
|
||||
alternateArtemisBrokerAddresses = ["localhost:12005"]
|
||||
}
|
||||
inboundConfig : {
|
||||
listeningAddress = "0.0.0.0:10005"
|
||||
}
|
||||
networkParametersPath = network-parameters
|
@ -0,0 +1,19 @@
|
||||
//
|
||||
// R3 Proprietary and Confidential
|
||||
//
|
||||
// Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
//
|
||||
// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
//
|
||||
// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
|
||||
bridgeMode = FloatInner
|
||||
outboundConfig : {
|
||||
artemisBrokerAddress = "localhost:11005"
|
||||
alternateArtemisBrokerAddresses = ["localhost:12005"]
|
||||
}
|
||||
floatInnerConfig : {
|
||||
floatAddresses = [ "localhost:13005" ]
|
||||
expectedCertificateSubject = "O=Bank A, L=London, C=GB"
|
||||
}
|
||||
networkParametersPath = network-parameters
|
@ -0,0 +1,18 @@
|
||||
//
|
||||
// R3 Proprietary and Confidential
|
||||
//
|
||||
// Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
//
|
||||
// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
//
|
||||
// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
|
||||
bridgeMode = FloatOuter
|
||||
inboundConfig : {
|
||||
listeningAddress = "0.0.0.0:10005"
|
||||
}
|
||||
floatOuterConfig : {
|
||||
floatAddress = "localhost:13005"
|
||||
expectedCertificateSubject = "O=Bank A, L=London, C=GB"
|
||||
}
|
||||
networkParametersPath = network-parameters
|
Loading…
x
Reference in New Issue
Block a user