From 942da1b8e0461128b8037b78c51fa919f6a4c4d9 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Tue, 17 Apr 2018 12:03:43 +0100 Subject: [PATCH] First cut HA for bridge Simple implementation of bridge HA logic. Fix of shading magic in gradle. Removal of exposed curator classes from node-api interface. Simple implementation of bridge HA logic. Fix of shading magic in gradle. Removal of exposed curator classes from node-api interface. Modify leader priority test to catch lexical rather than numeric sorting. --- bridge/build.gradle | 1 + .../net/corda/bridge/BridgeIntegrationTest.kt | 124 ++++++++++++++++++ .../services/api/BridgeConfiguration.kt | 7 +- .../config/BridgeConfigurationImpl.kt | 4 +- .../ha/ExternalMasterElectionService.kt | 83 ++++++++++++ .../InProcessBridgeReceiverService.kt | 4 + .../BridgeSupervisorServiceImpl.kt | 5 +- .../supervisors/FloatSupervisorServiceImpl.kt | 1 + .../corda/bridge/hasingleprocess/bridge.conf | 20 +++ .../bridge/hawithfloat/bridge/bridge.conf | 21 +++ .../bridge/hawithfloat/float/bridge.conf | 18 +++ node-api/build.gradle | 5 +- .../bridging/BridgeControlListener.kt | 14 +- .../zookeeper/PrioritizedLeaderLatch.kt | 22 ++-- .../nodeapi/internal/zookeeper/ZkClient.kt | 50 +++++-- .../nodeapi/internal/zookeeper/ZkLeader.kt | 30 ++++- .../internal/zookeeper/ZkClientTest.kt | 10 +- .../net/corda/vega/SimmValuationTest.kt | 4 +- 18 files changed, 384 insertions(+), 39 deletions(-) create mode 100644 bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt create mode 100644 bridge/src/test/resources/net/corda/bridge/hasingleprocess/bridge.conf create mode 100644 bridge/src/test/resources/net/corda/bridge/hawithfloat/bridge/bridge.conf create mode 100644 bridge/src/test/resources/net/corda/bridge/hawithfloat/float/bridge.conf diff --git a/bridge/build.gradle b/bridge/build.gradle index 33a052b493..1f718264ee 100644 --- a/bridge/build.gradle +++ b/bridge/build.gradle @@ -48,6 +48,7 @@ dependencies { 
compile "com.jcabi:jcabi-manifests:1.1" integrationTestCompile project(':node-driver') + integrationTestCompile "org.apache.curator:curator-test:${curator_version}" testCompile "junit:junit:$junit_version" testCompile project(':test-utils') } diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt index f43f8bb7bf..f559e685b6 100644 --- a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt @@ -14,6 +14,7 @@ import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever import net.corda.bridge.internal.BridgeInstance import net.corda.bridge.services.api.BridgeMode +import net.corda.bridge.services.config.BridgeHAConfigImpl import net.corda.core.internal.div import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.serialize @@ -28,6 +29,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.bridging.BridgeControl +import net.corda.nodeapi.internal.zookeeper.ZkClient import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.DUMMY_BANK_A_NAME import net.corda.testing.core.MAX_MESSAGE_SIZE @@ -35,6 +37,7 @@ import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.internal.rigorousMock import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.curator.test.TestingServer import org.junit.Assert.assertEquals import org.junit.Assert.assertNull import org.junit.Rule @@ -134,6 +137,127 @@ class BridgeIntegrationTest { } + @Test + fun `Run HA all in one mode`() { 
+ val configResource = "/net/corda/bridge/hasingleprocess/bridge.conf" + createNetworkParams(tempFolder.root.toPath()) + val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource) + assertEquals(BridgeHAConfigImpl("zk//:localhost:11105", 10), config.haConfig) + config.createBridgeKeyStores(DUMMY_BANK_A_NAME) + val (artemisServer, artemisClient) = createArtemis() + val zkServer = TestingServer(11105, false) + try { + installBridgeControlResponder(artemisClient) + val bridge = BridgeInstance(config, BridgeVersionInfo(1, "1.1", "Dummy", "Test")) + val stateFollower = bridge.activeChange.toBlocking().iterator + assertEquals(false, stateFollower.next()) + assertEquals(false, bridge.active) + bridge.start() + assertEquals(false, bridge.active) // Starting the bridge insufficient to go active + zkServer.start() // Now start zookeeper and we should be able to become active + assertEquals(true, stateFollower.next()) + assertEquals(true, bridge.active) + assertEquals(true, serverListening("localhost", 10005)) + val higherPriorityClient = ZkClient("localhost:11105", "/bridge/ha", "Test", 5) + higherPriorityClient.start() + higherPriorityClient.requestLeadership() // should win leadership and kick out our bridge + assertEquals(false, stateFollower.next()) + assertEquals(false, bridge.active) + var socketState = true + for (i in 0 until 5) { // The event signalling bridge down is pretty immediate, but the cascade of events leading to socket close can take a while + socketState = serverListening("localhost", 10005) + if (!socketState) break + Thread.sleep(100) + } + assertEquals(false, socketState) + higherPriorityClient.relinquishLeadership() // let our bridge back as leader + higherPriorityClient.close() + assertEquals(true, stateFollower.next()) + assertEquals(true, bridge.active) + assertEquals(true, serverListening("localhost", 10005)) + bridge.stop() // Finally check shutdown + assertEquals(false, stateFollower.next()) + assertEquals(false, 
bridge.active) + assertEquals(false, serverListening("localhost", 10005)) + } finally { + artemisClient.stop() + artemisServer.stop() + zkServer.stop() + } + } + + @Test + fun `Run HA float and bridge mode`() { + val bridgeFolder = tempFolder.root.toPath() + val bridgeConfigResource = "/net/corda/bridge/hawithfloat/bridge/bridge.conf" + val bridgeConfig = createAndLoadConfigFromResource(bridgeFolder, bridgeConfigResource) + assertEquals(BridgeHAConfigImpl("zk//:localhost:11105", 10), bridgeConfig.haConfig) + bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME) + createNetworkParams(bridgeFolder) + val floatFolder = tempFolder.root.toPath() / "float" + val floatConfigResource = "/net/corda/bridge/hawithfloat/float/bridge.conf" + val floatConfig = createAndLoadConfigFromResource(floatFolder, floatConfigResource) + assertNull(floatConfig.haConfig) + floatConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME) + createNetworkParams(floatFolder) + val (artemisServer, artemisClient) = createArtemis() + val zkServer = TestingServer(11105, false) + try { + installBridgeControlResponder(artemisClient) + val bridge = BridgeInstance(bridgeConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test")) + val bridgeStateFollower = bridge.activeChange.toBlocking().iterator + val float = BridgeInstance(floatConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test")) + val floatStateFollower = float.activeChange.toBlocking().iterator + assertEquals(false, bridgeStateFollower.next()) + assertEquals(false, bridge.active) + assertEquals(false, floatStateFollower.next()) + assertEquals(false, float.active) + float.start() + assertEquals(true, floatStateFollower.next()) // float goes active, but not listening + assertEquals(true, float.active) + assertEquals(false, serverListening("localhost", 10005)) + bridge.start() + assertEquals(false, bridge.active) // Starting the bridge/float insufficient to go active + assertEquals(true, float.active) // still active, but not listening + assertEquals(false, 
serverListening("localhost", 10005)) + zkServer.start() // Now start zookeeper and we should be able to become active on float listener + assertEquals(true, bridgeStateFollower.next()) + assertEquals(true, bridge.active) + assertEquals(true, float.active) + assertEquals(true, serverListening("localhost", 10005)) + val higherPriorityClient = ZkClient("localhost:11105", "/bridge/ha", "Test", 5) + higherPriorityClient.start() + higherPriorityClient.requestLeadership() // should win leadership and kick out our bridge + assertEquals(false, bridgeStateFollower.next()) + assertEquals(false, bridge.active) + assertEquals(true, float.active) + var socketState = true + for (i in 0 until 5) { // The event signalling bridge down is pretty immediate, but the cascade of events leading to socket close can take a while + socketState = serverListening("localhost", 10005) + if (!socketState) break + Thread.sleep(100) + } + assertEquals(false, socketState) + higherPriorityClient.relinquishLeadership() // let our bridge back as leader + higherPriorityClient.close() + assertEquals(true, bridgeStateFollower.next()) + assertEquals(true, bridge.active) + assertEquals(true, float.active) + assertEquals(true, serverListening("localhost", 10005)) + bridge.stop() // Finally check shutdown + float.stop() + assertEquals(false, bridgeStateFollower.next()) + assertEquals(false, bridge.active) + assertEquals(false, floatStateFollower.next()) + assertEquals(false, float.active) + assertEquals(false, serverListening("localhost", 10005)) + } finally { + artemisClient.stop() + artemisServer.stop() + zkServer.stop() + } + } + private fun createArtemis(): Pair { val artemisConfig = rigorousMock().also { doReturn(tempFolder.root.toPath()).whenever(it).baseDirectory diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt b/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt index 306ffa2ec4..0f88a8f016 100644 --- 
a/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt @@ -83,6 +83,11 @@ interface FloatInnerConfiguration { val customFloatOuterSSLConfiguration: BridgeSSLConfiguration? } +interface BridgeHAConfig { + val haConnectionString: String + val haPriority: Int +} + /** * Details of the listening port for a [BridgeMode.FloatOuter] process and of the certificate that the [BridgeMode.FloatInner] should present. * Required for [BridgeMode.FloatOuter] mode. @@ -100,7 +105,7 @@ interface BridgeConfiguration : NodeSSLConfiguration { val inboundConfig: BridgeInboundConfiguration? val floatInnerConfig: FloatInnerConfiguration? val floatOuterConfig: FloatOuterConfiguration? - val haConfig: String? + val haConfig: BridgeHAConfig? val networkParametersPath: Path val enableAMQPPacketTrace: Boolean // Reconnect to artemis after [artemisReconnectionInterval] ms the default value is 5000 ms. diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt index 5ccacf678e..533198f523 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt @@ -49,6 +49,8 @@ data class FloatOuterConfigurationImpl(override val floatAddress: NetworkHostAnd override val expectedCertificateSubject: CordaX500Name, override val customSSLConfiguration: BridgeSSLConfigurationImpl?) 
: FloatOuterConfiguration +data class BridgeHAConfigImpl(override val haConnectionString: String, override val haPriority: Int = 10) : BridgeHAConfig + data class BridgeConfigurationImpl( override val baseDirectory: Path, override val certificatesDirectory: Path = baseDirectory / "certificates", @@ -62,7 +64,7 @@ data class BridgeConfigurationImpl( override val inboundConfig: BridgeInboundConfigurationImpl?, override val floatInnerConfig: FloatInnerConfigurationImpl?, override val floatOuterConfig: FloatOuterConfigurationImpl?, - override val haConfig: String?, + override val haConfig: BridgeHAConfigImpl?, override val enableAMQPPacketTrace: Boolean, override val artemisReconnectionInterval: Int = 5000, override val politeShutdownPeriod: Int = 1000, diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt new file mode 100644 index 0000000000..b7183d5224 --- /dev/null +++ b/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt @@ -0,0 +1,83 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. 
+ */ + +package net.corda.bridge.services.ha + +import net.corda.bridge.services.api.BridgeAuditService +import net.corda.bridge.services.api.BridgeConfiguration +import net.corda.bridge.services.api.BridgeMasterService +import net.corda.bridge.services.api.ServiceStateSupport +import net.corda.bridge.services.util.ServiceStateHelper +import net.corda.core.utilities.contextLogger +import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener +import net.corda.nodeapi.internal.zookeeper.ZkClient +import net.corda.nodeapi.internal.zookeeper.ZkLeader +import java.lang.management.ManagementFactory +import java.net.InetAddress + +class ExternalMasterElectionService(val conf: BridgeConfiguration, + val auditService: BridgeAuditService, + private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeMasterService, ServiceStateSupport by stateHelper { + + private var haElector: ZkLeader? = null + private var leaderListener: CordaLeaderListener? = null + + companion object { + val log = contextLogger() + } + + init { + require(conf.haConfig != null) { "Undefined HA Config" } + require(conf.haConfig!!.haConnectionString.split(',').all { it.startsWith("zk//:") }) { "Only Zookeeper HA mode 'zk//:IPADDR:PORT supported" } + } + + override fun start() { + val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk//:", "") }.joinToString(",") + val leaderPriority = conf.haConfig!!.haPriority + val hostName: String = InetAddress.getLocalHost().hostName + val info = ManagementFactory.getRuntimeMXBean() + val pid = info.name.split("@").firstOrNull() // TODO Java 9 has better support for this + val nodeId = "$hostName:$pid" + val leaderElector = ZkClient(zkConf, "/bridge/ha", nodeId, leaderPriority) + haElector = leaderElector + val listener = object : CordaLeaderListener { + override fun notLeader() { + auditService.statusChangeEvent("Loss of leadership signalled by Zookeeper") + stateHelper.active = false + } + + override fun isLeader() { + 
auditService.statusChangeEvent("Acquired leadership from Zookeeper. Going active") + stateHelper.active = true + } + + } + leaderListener = listener + leaderElector.addLeadershipListener(listener) + leaderElector.start() + auditService.statusChangeEvent("Requesting leadership from Zookeeper") + leaderElector.requestLeadership() + } + + override fun stop() { + auditService.statusChangeEvent("Stop requested") + stateHelper.active = false + haElector?.apply { + if (leaderListener != null) { + removeLeadershipListener(leaderListener!!) + } + relinquishLeadership() + close() + } + haElector = null + leaderListener = null + } + +} \ No newline at end of file diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/InProcessBridgeReceiverService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/InProcessBridgeReceiverService.kt index 622e102e2b..bc0cf69725 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/InProcessBridgeReceiverService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/InProcessBridgeReceiverService.kt @@ -49,6 +49,10 @@ class InProcessBridgeReceiverService(val conf: BridgeConfiguration, sslConfiguration.keyStorePassword.toCharArray(), trustStoreBytes, sslConfiguration.trustStorePassword.toCharArray()) + } else { + if (amqpListenerService.running) { + amqpListenerService.wipeKeysAndDeactivate() + } } stateHelper.active = it } diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt index 6962270e6a..5d1820dbac 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt @@ -13,6 +13,7 @@ package net.corda.bridge.services.supervisors import net.corda.bridge.services.api.* import 
net.corda.bridge.services.artemis.BridgeArtemisConnectionServiceImpl import net.corda.bridge.services.filter.SimpleMessageFilterService +import net.corda.bridge.services.ha.ExternalMasterElectionService import net.corda.bridge.services.ha.SingleInstanceMasterService import net.corda.bridge.services.receiver.InProcessBridgeReceiverService import net.corda.bridge.services.receiver.TunnelingBridgeReceiverService @@ -42,10 +43,10 @@ class BridgeSupervisorServiceImpl(val conf: BridgeConfiguration, private var statusSubscriber: Subscription? = null init { - if (conf.haConfig.isNullOrEmpty()) { + if (conf.haConfig == null) { haService = SingleInstanceMasterService(conf, auditService) } else { - TODO() + haService = ExternalMasterElectionService(conf, auditService) } artemisService = BridgeArtemisConnectionServiceImpl(conf, maxMessageSize, auditService) senderService = DirectBridgeSenderService(conf, auditService, haService, artemisService) diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/FloatSupervisorServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/FloatSupervisorServiceImpl.kt index eba2c16193..2a7e9b309b 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/FloatSupervisorServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/FloatSupervisorServiceImpl.kt @@ -36,6 +36,7 @@ class FloatSupervisorServiceImpl(val conf: BridgeConfiguration, init { amqpListenerService = BridgeAMQPListenerServiceImpl(conf, auditService) floatControlService = if (conf.bridgeMode == BridgeMode.FloatOuter) { + require(conf.haConfig == null) { "Float process should not have HA config, that is controlled via the bridge." 
} FloatControlListenerService(conf, auditService, amqpListenerService) } else { null diff --git a/bridge/src/test/resources/net/corda/bridge/hasingleprocess/bridge.conf b/bridge/src/test/resources/net/corda/bridge/hasingleprocess/bridge.conf new file mode 100644 index 0000000000..d19f06b46c --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/hasingleprocess/bridge.conf @@ -0,0 +1,20 @@ +// +// R3 Proprietary and Confidential +// +// Copyright (c) 2018 R3 Limited. All rights reserved. +// +// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. +// +// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + +bridgeMode = SenderReceiver +outboundConfig : { + artemisBrokerAddress = "localhost:11005" +} +inboundConfig : { + listeningAddress = "0.0.0.0:10005" +} +networkParametersPath = network-parameters +haConfig : { + haConnectionString = "zk//:localhost:11105" +} \ No newline at end of file diff --git a/bridge/src/test/resources/net/corda/bridge/hawithfloat/bridge/bridge.conf b/bridge/src/test/resources/net/corda/bridge/hawithfloat/bridge/bridge.conf new file mode 100644 index 0000000000..e48066f465 --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/hawithfloat/bridge/bridge.conf @@ -0,0 +1,21 @@ +// +// R3 Proprietary and Confidential +// +// Copyright (c) 2018 R3 Limited. All rights reserved. +// +// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. +// +// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. 
+ +bridgeMode = FloatInner +outboundConfig : { + artemisBrokerAddress = "localhost:11005" +} +floatInnerConfig : { + floatAddresses = [ "localhost:12005" ] + expectedCertificateSubject = "O=Bank A, L=London, C=GB" +} +networkParametersPath = network-parameters +haConfig : { + haConnectionString = "zk//:localhost:11105" +} \ No newline at end of file diff --git a/bridge/src/test/resources/net/corda/bridge/hawithfloat/float/bridge.conf b/bridge/src/test/resources/net/corda/bridge/hawithfloat/float/bridge.conf new file mode 100644 index 0000000000..661580c02b --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/hawithfloat/float/bridge.conf @@ -0,0 +1,18 @@ +// +// R3 Proprietary and Confidential +// +// Copyright (c) 2018 R3 Limited. All rights reserved. +// +// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. +// +// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. 
+ +bridgeMode = FloatOuter +inboundConfig : { + listeningAddress = "0.0.0.0:10005" +} +floatOuterConfig : { + floatAddress = "localhost:12005" + expectedCertificateSubject = "O=Bank A, L=London, C=GB" +} +networkParametersPath = network-parameters diff --git a/node-api/build.gradle b/node-api/build.gradle index 6651c19531..48a43a4a29 100644 --- a/node-api/build.gradle +++ b/node-api/build.gradle @@ -54,8 +54,8 @@ dependencies { runtime 'com.mattbertolini:liquibase-slf4j:2.0.0' // Apache Curator: a client library for Zookeeper + shadow "org.apache.curator:curator-client:${curator_version}" shadow "org.apache.curator:curator-recipes:${curator_version}" - testCompile configurations.shadow // need to look at rewritten classes testCompile "org.apache.curator:curator-test:${curator_version}" // FastClasspathScanner: classpath scanning - needed for the NetworkBootstraper @@ -76,6 +76,7 @@ dependencies { } configurations { + compile.extendsFrom shadow testArtifacts.extendsFrom testRuntime } @@ -90,7 +91,9 @@ shadowJar { configurations = [project.configurations.shadow] classifier "" dependencies { + include(dependency("org.apache.curator:curator-client:${curator_version}")) include(dependency("org.apache.curator:curator-recipes:${curator_version}")) + include(dependency("org.apache.curator:curator-framework:${curator_version}")) include(dependency('org.apache.zookeeper:zookeeper:3.5.3-beta')) include(dependency('commons-cli:commons-cli:1.2')) include(dependency('io.netty:netty:3.10.5.Final')) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt index d91654865b..f11ac07c0d 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt @@ -23,6 +23,7 @@ import 
net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREF import net.corda.nodeapi.internal.ArtemisSessionProvider import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ClientConsumer @@ -35,6 +36,7 @@ class BridgeControlListener(val config: NodeSSLConfiguration, socksProxyConfig: SocksProxyConfig? = null, val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable { private val bridgeId: String = UUID.randomUUID().toString() + private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId" private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, artemisMessageClientFactory) private val validInboundQueues = mutableSetOf() private var artemis: ArtemisSessionProvider? = null @@ -64,8 +66,11 @@ class BridgeControlListener(val config: NodeSSLConfiguration, artemis.start() val artemisClient = artemis.started!! 
val artemisSession = artemisClient.session - val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId" - artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue) + try { + artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue) + } catch (ex: ActiveMQQueueExistsException) { + // Ignore if there is a queue still not cleaned up + } val control = artemisSession.createConsumer(bridgeControlQueue) controlConsumer = control control.setMessageHandler { msg -> @@ -88,7 +93,10 @@ class BridgeControlListener(val config: NodeSSLConfiguration, validInboundQueues.clear() controlConsumer?.close() controlConsumer = null - artemis?.stop() + artemis?.apply { + started?.session?.deleteQueue(bridgeControlQueue) + stop() + } artemis = null bridgeManager.stop() } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatch.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatch.kt index f2d9d71c2b..a6fc0c4cdf 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatch.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatch.kt @@ -12,6 +12,7 @@ package net.corda.nodeapi.internal.zookeeper import com.google.common.base.Preconditions import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.api.BackgroundCallback import org.apache.curator.framework.api.CuratorEvent @@ -176,6 +177,8 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework, } ConnectionState.LOST -> setLeadership(false) + + else -> log.debug { "Ignoring state change $newState" } } } @@ -199,10 +202,11 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework, } } + val latchName = "$nodeId$LOCK_NAME${"%05d".format(priority)}" // Fixed width priority to ensure numeric sorting 
watchedClient.create() .creatingParentContainersIfNeeded() .withProtection().withMode(CreateMode.EPHEMERAL) - .inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, "$nodeId$LOCK_NAME$priority"), nodeId.toByteArray(Charsets.UTF_8)) + .inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, latchName), nodeId.toByteArray(Charsets.UTF_8)) } private fun setLeadership(newValue: Boolean) { @@ -275,13 +279,15 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework, private class ElectionWatcher(private val latch: PrioritizedLeaderLatch) : Watcher { override fun process(event: WatchedEvent) { log.info("Client ${latch.nodeId} detected event ${event.type}.") - latch.watchedClient.children.usingWatcher(ElectionWatcher(latch)).inBackground(NoNodeCallback(latch)).forPath(latch.path) - if (State.STARTED == latch.state.get() && Watcher.Event.EventType.NodeChildrenChanged == event.type && latch.ourPath.get() != null) { - try { - log.info("Change detected in children nodes of path ${latch.path}. Checking candidates.") - latch.processCandidates() - } catch (e: Exception) { - log.error("An error occurred checking the leadership.", e) + if (State.STARTED == latch.state.get()) { + latch.watchedClient.children.usingWatcher(ElectionWatcher(latch)).inBackground(NoNodeCallback(latch)).forPath(latch.path) + if (Watcher.Event.EventType.NodeChildrenChanged == event.type && latch.ourPath.get() != null) { + try { + log.info("Change detected in children nodes of path ${latch.path}. 
Checking candidates.") + latch.processCandidates() + } catch (e: Exception) { + log.error("An error occurred checking the leadership.", e) + } } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClient.kt index af7bae523c..526ffd420f 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClient.kt @@ -11,12 +11,14 @@ package net.corda.nodeapi.internal.zookeeper import net.corda.core.utilities.contextLogger -import org.apache.curator.RetryPolicy +import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.framework.imps.CuratorFrameworkState import org.apache.curator.framework.recipes.leader.LeaderLatchListener -import org.apache.curator.retry.RetryOneTime +import org.apache.curator.retry.RetryForever +import org.apache.curator.retry.RetryNTimes import org.apache.curator.utils.CloseableUtils +import java.util.concurrent.ConcurrentHashMap /** * Simple Zookeeper client that offers priority based leader election. @@ -26,21 +28,33 @@ import org.apache.curator.utils.CloseableUtils * @param nodeId unique client identifier used in the creation of child zNodes * @param priority indicates the priority of the client in the election process. Low value means high priority(a client * with [priority] set to 0 will have become leader before a client with [priority] 1 - * @param retryPolicy is an instance of [RetryPolicy] and indicates the process in case connection to Zookeeper server/cluster - * is lost. If no policy is supplied, [RetryOneTime] will be used with 500ms before attempting to reconnect + * @param retryInterval the interval in msec between retries of the Zookeeper connection. Default value is 500 msec. + * @param retryCount the number of retries before giving up default value is 1. 
Use -1 to indicate forever. */ class ZkClient(connectionString: String, electionPath: String, val nodeId: String, val priority: Int, - retryPolicy: RetryPolicy = RetryOneTime(500)) : ZkLeader { + retryInterval: Int = 500, + retryCount: Int = 1) : ZkLeader { private companion object { private val log = contextLogger() } - private val client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy) - private val leaderLatch = PrioritizedLeaderLatch(client, electionPath, nodeId, priority) + private val client: CuratorFramework + private val leaderLatch: PrioritizedLeaderLatch + private val listeners = ConcurrentHashMap() + + init { + val retryPolicy = if (retryCount == -1) { + RetryForever(retryInterval) + } else { + RetryNTimes(retryCount, retryInterval) + } + client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy) + leaderLatch = PrioritizedLeaderLatch(client, electionPath, nodeId, priority) + } override fun start() { if (client.state != CuratorFrameworkState.STARTED) { @@ -82,12 +96,26 @@ class ZkClient(connectionString: String, return client.state == CuratorFrameworkState.STARTED } - override fun addLeadershipListener(listener: LeaderLatchListener) { - leaderLatch.addListener(listener) + override fun addLeadershipListener(listener: CordaLeaderListener) { + val listenerStub = object : LeaderLatchListener { + override fun notLeader() { + listener.notLeader() + } + + override fun isLeader() { + listener.isLeader() + } + + } + listeners[listener] = listenerStub + leaderLatch.addListener(listenerStub) } - override fun removeLeaderShipListener(listener: LeaderLatchListener) { - leaderLatch.removeListener(listener) + override fun removeLeadershipListener(listener: CordaLeaderListener) { + val listenerStub = listeners.remove(listener) + if (listenerStub != null) { + leaderLatch.removeListener(listenerStub) + } } } \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkLeader.kt 
b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkLeader.kt index b6ea85052b..1a4203fada 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkLeader.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkLeader.kt @@ -10,7 +10,27 @@ package net.corda.nodeapi.internal.zookeeper -import org.apache.curator.framework.recipes.leader.LeaderLatchListener + +/** + * Listener interface for leader election results, to avoid public reference to shadowed curator classes. + */ +interface CordaLeaderListener { + /** + * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false. + * + * Note that it is possible that by the time this method call happens, hasLeadership has become true. If + * this occurs, you can expect {@link #isLeader()} to also be called. + */ + fun notLeader() + + /** + * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true. + * + * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If + * this occurs, you can expect {@link #notLeader()} to also be called. 
+ */ + fun isLeader() +} interface ZkLeader { /** @@ -38,14 +58,14 @@ interface ZkLeader { fun relinquishLeadership() /** - * @param listener an instance of [LeaderLatchListener] that will be used for election notifications + * @param listener an instance of [CordaLeaderListener] that will be used for election notifications */ - fun addLeadershipListener(listener: LeaderLatchListener) + fun addLeadershipListener(listener: CordaLeaderListener) /** - * @param listener the [LeaderLatchListener] instance to be removed + * @param listener the [CordaLeaderListener] instance to be removed */ - fun removeLeaderShipListener(listener: LeaderLatchListener) + fun removeLeadershipListener(listener: CordaLeaderListener) /** * @return [true] if client is the current leader, [false] otherwise diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClientTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClientTest.kt index 2e8bd8f029..48f774c1ba 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClientTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClientTest.kt @@ -11,7 +11,6 @@ package net.corda.nodeapi.internal.zookeeper import net.corda.core.utilities.contextLogger -import org.apache.curator.framework.recipes.leader.LeaderLatchListener import org.apache.curator.test.TestingServer import org.apache.curator.utils.ZKPaths import org.junit.After @@ -187,8 +186,8 @@ class ZkClientTests { @Test fun `clients with higher priority join and take leadership`() { val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "ALICE", 0) - val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "BOB", 1) - val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "CHIP", 2) + val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "BOB", 50) // Use numbers that check for numeric 
sorting + val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "CHIP", 2000) val aliceLeaderGain = CountDownLatch(1) val bobLeaderGain = CountDownLatch(1) val bobLeaderLoss = CountDownLatch(1) @@ -297,6 +296,7 @@ class ZkClientTests { when(action) { Action.REQUEST -> client.requestLeadership() Action.RELINQUISH -> client.relinquishLeadership() + else -> throw IllegalArgumentException("Invalid action choice") } Thread.sleep(100) } @@ -332,7 +332,7 @@ class ZkClientTests { } private class HelperListener(private val nodeId: String, - private val leaders: MutableList) : LeaderLatchListener{ + private val leaders: MutableList) : CordaLeaderListener { @Synchronized override fun notLeader() { leaders.remove(nodeId) @@ -345,7 +345,7 @@ class ZkClientTests { } private class SyncHelperListener(private val nodeId: String, private val leaderGain: CountDownLatch = CountDownLatch(1), - private val leaderLoss: CountDownLatch = CountDownLatch(1)) : LeaderLatchListener { + private val leaderLoss: CountDownLatch = CountDownLatch(1)) : CordaLeaderListener { override fun notLeader() { log.info("$nodeId is no longer leader.") leaderLoss.countDown() diff --git a/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt b/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt index 67177b37ed..02d98752d3 100644 --- a/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt +++ b/samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt @@ -52,8 +52,8 @@ class SimmValuationTest : IntegrationTest() { val nodeAFuture = startNode(providedName = nodeALegalName) val nodeBFuture = startNode(providedName = nodeBLegalName) val (nodeA, nodeB) = listOf(nodeAFuture, nodeBFuture).map { it.getOrThrow() } - val nodeAWebServerFuture = startWebserver(nodeA) - val nodeBWebServerFuture = startWebserver(nodeB) + val nodeAWebServerFuture 
= startWebserver(nodeA, maximumHeapSize = "300m") + val nodeBWebServerFuture = startWebserver(nodeB, maximumHeapSize = "300m") val nodeAApi = HttpApi.fromHostAndPort(nodeAWebServerFuture.getOrThrow().listenAddress, "api/simmvaluationdemo") val nodeBApi = HttpApi.fromHostAndPort(nodeBWebServerFuture.getOrThrow().listenAddress, "api/simmvaluationdemo") val nodeBParty = getPartyWithName(nodeAApi, nodeBLegalName)