Merge pull request #755 from corda/mnesbit-bridge-ha

Introduce a simple Zookeeper managed HA Leader election into the Bridge/Float
This commit is contained in:
Matthew Nesbit 2018-04-20 08:59:58 +01:00 committed by GitHub
commit 3931f13852
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 384 additions and 39 deletions

View File

@ -48,6 +48,7 @@ dependencies {
compile "com.jcabi:jcabi-manifests:1.1" compile "com.jcabi:jcabi-manifests:1.1"
integrationTestCompile project(':node-driver') integrationTestCompile project(':node-driver')
integrationTestCompile "org.apache.curator:curator-test:${curator_version}"
testCompile "junit:junit:$junit_version" testCompile "junit:junit:$junit_version"
testCompile project(':test-utils') testCompile project(':test-utils')
} }

View File

@ -14,6 +14,7 @@ import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever import com.nhaarman.mockito_kotlin.whenever
import net.corda.bridge.internal.BridgeInstance import net.corda.bridge.internal.BridgeInstance
import net.corda.bridge.services.api.BridgeMode import net.corda.bridge.services.api.BridgeMode
import net.corda.bridge.services.config.BridgeHAConfigImpl
import net.corda.core.internal.div import net.corda.core.internal.div
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
@ -28,6 +29,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.zookeeper.ZkClient
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE import net.corda.testing.core.MAX_MESSAGE_SIZE
@ -35,6 +37,7 @@ import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.rigorousMock import net.corda.testing.internal.rigorousMock
import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.curator.test.TestingServer
import org.junit.Assert.assertEquals import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull import org.junit.Assert.assertNull
import org.junit.Rule import org.junit.Rule
@ -134,6 +137,127 @@ class BridgeIntegrationTest {
} }
@Test
fun `Run HA all in one mode`() {
val configResource = "/net/corda/bridge/hasingleprocess/bridge.conf"
createNetworkParams(tempFolder.root.toPath())
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
assertEquals(BridgeHAConfigImpl("zk//:localhost:11105", 10), config.haConfig)
config.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val (artemisServer, artemisClient) = createArtemis()
val zkServer = TestingServer(11105, false)
try {
installBridgeControlResponder(artemisClient)
val bridge = BridgeInstance(config, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
val stateFollower = bridge.activeChange.toBlocking().iterator
assertEquals(false, stateFollower.next())
assertEquals(false, bridge.active)
bridge.start()
assertEquals(false, bridge.active) // Starting the bridge insufficient to go active
zkServer.start() // Now start zookeeper and we should be able to become active
assertEquals(true, stateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, serverListening("localhost", 10005))
val higherPriorityClient = ZkClient("localhost:11105", "/bridge/ha", "Test", 5)
higherPriorityClient.start()
higherPriorityClient.requestLeadership() // should win leadership and kick out our bridge
assertEquals(false, stateFollower.next())
assertEquals(false, bridge.active)
var socketState = true
for (i in 0 until 5) { // The event signalling bridge down is pretty immediate, but the cascade of events leading to socket close can take a while
socketState = serverListening("localhost", 10005)
if (!socketState) break
Thread.sleep(100)
}
assertEquals(false, socketState)
higherPriorityClient.relinquishLeadership() // let our bridge back as leader
higherPriorityClient.close()
assertEquals(true, stateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, serverListening("localhost", 10005))
bridge.stop() // Finally check shutdown
assertEquals(false, stateFollower.next())
assertEquals(false, bridge.active)
assertEquals(false, serverListening("localhost", 10005))
} finally {
artemisClient.stop()
artemisServer.stop()
zkServer.stop()
}
}
@Test
fun `Run HA float and bridge mode`() {
val bridgeFolder = tempFolder.root.toPath()
val bridgeConfigResource = "/net/corda/bridge/hawithfloat/bridge/bridge.conf"
val bridgeConfig = createAndLoadConfigFromResource(bridgeFolder, bridgeConfigResource)
assertEquals(BridgeHAConfigImpl("zk//:localhost:11105", 10), bridgeConfig.haConfig)
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
createNetworkParams(bridgeFolder)
val floatFolder = tempFolder.root.toPath() / "float"
val floatConfigResource = "/net/corda/bridge/hawithfloat/float/bridge.conf"
val floatConfig = createAndLoadConfigFromResource(floatFolder, floatConfigResource)
assertNull(floatConfig.haConfig)
floatConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
createNetworkParams(floatFolder)
val (artemisServer, artemisClient) = createArtemis()
val zkServer = TestingServer(11105, false)
try {
installBridgeControlResponder(artemisClient)
val bridge = BridgeInstance(bridgeConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
val bridgeStateFollower = bridge.activeChange.toBlocking().iterator
val float = BridgeInstance(floatConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
val floatStateFollower = float.activeChange.toBlocking().iterator
assertEquals(false, bridgeStateFollower.next())
assertEquals(false, bridge.active)
assertEquals(false, floatStateFollower.next())
assertEquals(false, float.active)
float.start()
assertEquals(true, floatStateFollower.next()) // float goes active, but not listening
assertEquals(true, float.active)
assertEquals(false, serverListening("localhost", 10005))
bridge.start()
assertEquals(false, bridge.active) // Starting the bridge/float insufficient to go active
assertEquals(true, float.active) // still active, but not listening
assertEquals(false, serverListening("localhost", 10005))
zkServer.start() // Now start zookeeper and we should be able to become active on float listener
assertEquals(true, bridgeStateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, float.active)
assertEquals(true, serverListening("localhost", 10005))
val higherPriorityClient = ZkClient("localhost:11105", "/bridge/ha", "Test", 5)
higherPriorityClient.start()
higherPriorityClient.requestLeadership() // should win leadership and kick out our bridge
assertEquals(false, bridgeStateFollower.next())
assertEquals(false, bridge.active)
assertEquals(true, float.active)
var socketState = true
for (i in 0 until 5) { // The event signalling bridge down is pretty immediate, but the cascade of events leading to socket close can take a while
socketState = serverListening("localhost", 10005)
if (!socketState) break
Thread.sleep(100)
}
assertEquals(false, socketState)
higherPriorityClient.relinquishLeadership() // let our bridge back as leader
higherPriorityClient.close()
assertEquals(true, bridgeStateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, float.active)
assertEquals(true, serverListening("localhost", 10005))
bridge.stop() // Finally check shutdown
float.stop()
assertEquals(false, bridgeStateFollower.next())
assertEquals(false, bridge.active)
assertEquals(false, floatStateFollower.next())
assertEquals(false, float.active)
assertEquals(false, serverListening("localhost", 10005))
} finally {
artemisClient.stop()
artemisServer.stop()
zkServer.stop()
}
}
private fun createArtemis(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> { private fun createArtemis(): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also { val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(tempFolder.root.toPath()).whenever(it).baseDirectory doReturn(tempFolder.root.toPath()).whenever(it).baseDirectory

View File

@ -83,6 +83,11 @@ interface FloatInnerConfiguration {
val customFloatOuterSSLConfiguration: BridgeSSLConfiguration? val customFloatOuterSSLConfiguration: BridgeSSLConfiguration?
} }
interface BridgeHAConfig {
val haConnectionString: String
val haPriority: Int
}
/** /**
* Details of the listening port for a [BridgeMode.FloatOuter] process and of the certificate that the [BridgeMode.FloatInner] should present. * Details of the listening port for a [BridgeMode.FloatOuter] process and of the certificate that the [BridgeMode.FloatInner] should present.
* Required for [BridgeMode.FloatOuter] mode. * Required for [BridgeMode.FloatOuter] mode.
@ -100,7 +105,7 @@ interface BridgeConfiguration : NodeSSLConfiguration {
val inboundConfig: BridgeInboundConfiguration? val inboundConfig: BridgeInboundConfiguration?
val floatInnerConfig: FloatInnerConfiguration? val floatInnerConfig: FloatInnerConfiguration?
val floatOuterConfig: FloatOuterConfiguration? val floatOuterConfig: FloatOuterConfiguration?
val haConfig: String? val haConfig: BridgeHAConfig?
val networkParametersPath: Path val networkParametersPath: Path
val enableAMQPPacketTrace: Boolean val enableAMQPPacketTrace: Boolean
// Reconnect to artemis after [artemisReconnectionInterval] ms the default value is 5000 ms. // Reconnect to artemis after [artemisReconnectionInterval] ms the default value is 5000 ms.

View File

@ -49,6 +49,8 @@ data class FloatOuterConfigurationImpl(override val floatAddress: NetworkHostAnd
override val expectedCertificateSubject: CordaX500Name, override val expectedCertificateSubject: CordaX500Name,
override val customSSLConfiguration: BridgeSSLConfigurationImpl?) : FloatOuterConfiguration override val customSSLConfiguration: BridgeSSLConfigurationImpl?) : FloatOuterConfiguration
data class BridgeHAConfigImpl(override val haConnectionString: String, override val haPriority: Int = 10) : BridgeHAConfig
data class BridgeConfigurationImpl( data class BridgeConfigurationImpl(
override val baseDirectory: Path, override val baseDirectory: Path,
override val certificatesDirectory: Path = baseDirectory / "certificates", override val certificatesDirectory: Path = baseDirectory / "certificates",
@ -62,7 +64,7 @@ data class BridgeConfigurationImpl(
override val inboundConfig: BridgeInboundConfigurationImpl?, override val inboundConfig: BridgeInboundConfigurationImpl?,
override val floatInnerConfig: FloatInnerConfigurationImpl?, override val floatInnerConfig: FloatInnerConfigurationImpl?,
override val floatOuterConfig: FloatOuterConfigurationImpl?, override val floatOuterConfig: FloatOuterConfigurationImpl?,
override val haConfig: String?, override val haConfig: BridgeHAConfigImpl?,
override val enableAMQPPacketTrace: Boolean, override val enableAMQPPacketTrace: Boolean,
override val artemisReconnectionInterval: Int = 5000, override val artemisReconnectionInterval: Int = 5000,
override val politeShutdownPeriod: Int = 1000, override val politeShutdownPeriod: Int = 1000,

View File

@ -0,0 +1,83 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.bridge.services.ha
import net.corda.bridge.services.api.BridgeAuditService
import net.corda.bridge.services.api.BridgeConfiguration
import net.corda.bridge.services.api.BridgeMasterService
import net.corda.bridge.services.api.ServiceStateSupport
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener
import net.corda.nodeapi.internal.zookeeper.ZkClient
import net.corda.nodeapi.internal.zookeeper.ZkLeader
import java.lang.management.ManagementFactory
import java.net.InetAddress
class ExternalMasterElectionService(val conf: BridgeConfiguration,
val auditService: BridgeAuditService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeMasterService, ServiceStateSupport by stateHelper {
private var haElector: ZkLeader? = null
private var leaderListener: CordaLeaderListener? = null
companion object {
val log = contextLogger()
}
init {
require(conf.haConfig != null) { "Undefined HA Config" }
require(conf.haConfig!!.haConnectionString.split(',').all { it.startsWith("zk//:") }) { "Only Zookeeper HA mode 'zk//:IPADDR:PORT supported" }
}
override fun start() {
val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk//:", "") }.joinToString(",")
val leaderPriority = conf.haConfig!!.haPriority
val hostName: String = InetAddress.getLocalHost().hostName
val info = ManagementFactory.getRuntimeMXBean()
val pid = info.name.split("@").firstOrNull() // TODO Java 9 has better support for this
val nodeId = "$hostName:$pid"
val leaderElector = ZkClient(zkConf, "/bridge/ha", nodeId, leaderPriority)
haElector = leaderElector
val listener = object : CordaLeaderListener {
override fun notLeader() {
auditService.statusChangeEvent("Loss of leadership signalled by Zookeeper")
stateHelper.active = false
}
override fun isLeader() {
auditService.statusChangeEvent("Acquired leadership from Zookeeper. Going active")
stateHelper.active = true
}
}
leaderListener = listener
leaderElector.addLeadershipListener(listener)
leaderElector.start()
auditService.statusChangeEvent("Requesting leadership from Zookeeper")
leaderElector.requestLeadership()
}
override fun stop() {
auditService.statusChangeEvent("Stop requested")
stateHelper.active = false
haElector?.apply {
if (leaderListener != null) {
removeLeadershipListener(leaderListener!!)
}
relinquishLeadership()
close()
}
haElector = null
leaderListener = null
}
}

View File

@ -49,6 +49,10 @@ class InProcessBridgeReceiverService(val conf: BridgeConfiguration,
sslConfiguration.keyStorePassword.toCharArray(), sslConfiguration.keyStorePassword.toCharArray(),
trustStoreBytes, trustStoreBytes,
sslConfiguration.trustStorePassword.toCharArray()) sslConfiguration.trustStorePassword.toCharArray())
} else {
if (amqpListenerService.running) {
amqpListenerService.wipeKeysAndDeactivate()
}
} }
stateHelper.active = it stateHelper.active = it
} }

View File

@ -13,6 +13,7 @@ package net.corda.bridge.services.supervisors
import net.corda.bridge.services.api.* import net.corda.bridge.services.api.*
import net.corda.bridge.services.artemis.BridgeArtemisConnectionServiceImpl import net.corda.bridge.services.artemis.BridgeArtemisConnectionServiceImpl
import net.corda.bridge.services.filter.SimpleMessageFilterService import net.corda.bridge.services.filter.SimpleMessageFilterService
import net.corda.bridge.services.ha.ExternalMasterElectionService
import net.corda.bridge.services.ha.SingleInstanceMasterService import net.corda.bridge.services.ha.SingleInstanceMasterService
import net.corda.bridge.services.receiver.InProcessBridgeReceiverService import net.corda.bridge.services.receiver.InProcessBridgeReceiverService
import net.corda.bridge.services.receiver.TunnelingBridgeReceiverService import net.corda.bridge.services.receiver.TunnelingBridgeReceiverService
@ -42,10 +43,10 @@ class BridgeSupervisorServiceImpl(val conf: BridgeConfiguration,
private var statusSubscriber: Subscription? = null private var statusSubscriber: Subscription? = null
init { init {
if (conf.haConfig.isNullOrEmpty()) { if (conf.haConfig == null) {
haService = SingleInstanceMasterService(conf, auditService) haService = SingleInstanceMasterService(conf, auditService)
} else { } else {
TODO() haService = ExternalMasterElectionService(conf, auditService)
} }
artemisService = BridgeArtemisConnectionServiceImpl(conf, maxMessageSize, auditService) artemisService = BridgeArtemisConnectionServiceImpl(conf, maxMessageSize, auditService)
senderService = DirectBridgeSenderService(conf, auditService, haService, artemisService) senderService = DirectBridgeSenderService(conf, auditService, haService, artemisService)

View File

@ -36,6 +36,7 @@ class FloatSupervisorServiceImpl(val conf: BridgeConfiguration,
init { init {
amqpListenerService = BridgeAMQPListenerServiceImpl(conf, auditService) amqpListenerService = BridgeAMQPListenerServiceImpl(conf, auditService)
floatControlService = if (conf.bridgeMode == BridgeMode.FloatOuter) { floatControlService = if (conf.bridgeMode == BridgeMode.FloatOuter) {
require(conf.haConfig == null) { "Float process should not have HA config, that is controlled via the bridge." }
FloatControlListenerService(conf, auditService, amqpListenerService) FloatControlListenerService(conf, auditService, amqpListenerService)
} else { } else {
null null

View File

@ -0,0 +1,20 @@
//
// R3 Proprietary and Confidential
//
// Copyright (c) 2018 R3 Limited. All rights reserved.
//
// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
//
// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
bridgeMode = SenderReceiver
outboundConfig : {
artemisBrokerAddress = "localhost:11005"
}
inboundConfig : {
listeningAddress = "0.0.0.0:10005"
}
networkParametersPath = network-parameters
haConfig : {
haConnectionString = "zk//:localhost:11105"
}

View File

@ -0,0 +1,21 @@
//
// R3 Proprietary and Confidential
//
// Copyright (c) 2018 R3 Limited. All rights reserved.
//
// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
//
// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
bridgeMode = FloatInner
outboundConfig : {
artemisBrokerAddress = "localhost:11005"
}
floatInnerConfig : {
floatAddresses = [ "localhost:12005" ]
expectedCertificateSubject = "O=Bank A, L=London, C=GB"
}
networkParametersPath = network-parameters
haConfig : {
haConnectionString = "zk//:localhost:11105"
}

View File

@ -0,0 +1,18 @@
//
// R3 Proprietary and Confidential
//
// Copyright (c) 2018 R3 Limited. All rights reserved.
//
// The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
//
// Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
bridgeMode = FloatOuter
inboundConfig : {
listeningAddress = "0.0.0.0:10005"
}
floatOuterConfig : {
floatAddress = "localhost:12005"
expectedCertificateSubject = "O=Bank A, L=London, C=GB"
}
networkParametersPath = network-parameters

View File

@ -54,8 +54,8 @@ dependencies {
runtime 'com.mattbertolini:liquibase-slf4j:2.0.0' runtime 'com.mattbertolini:liquibase-slf4j:2.0.0'
// Apache Curator: a client library for Zookeeper // Apache Curator: a client library for Zookeeper
shadow "org.apache.curator:curator-client:${curator_version}"
shadow "org.apache.curator:curator-recipes:${curator_version}" shadow "org.apache.curator:curator-recipes:${curator_version}"
testCompile configurations.shadow // need to look at rewritten classes
testCompile "org.apache.curator:curator-test:${curator_version}" testCompile "org.apache.curator:curator-test:${curator_version}"
// FastClasspathScanner: classpath scanning - needed for the NetworkBootstraper // FastClasspathScanner: classpath scanning - needed for the NetworkBootstraper
@ -76,6 +76,7 @@ dependencies {
} }
configurations { configurations {
compile.extendsFrom shadow
testArtifacts.extendsFrom testRuntime testArtifacts.extendsFrom testRuntime
} }
@ -90,7 +91,9 @@ shadowJar {
configurations = [project.configurations.shadow] configurations = [project.configurations.shadow]
classifier "" classifier ""
dependencies { dependencies {
include(dependency("org.apache.curator:curator-client:${curator_version}"))
include(dependency("org.apache.curator:curator-recipes:${curator_version}")) include(dependency("org.apache.curator:curator-recipes:${curator_version}"))
include(dependency("org.apache.curator:curator-framework:${curator_version}"))
include(dependency('org.apache.zookeeper:zookeeper:3.5.3-beta')) include(dependency('org.apache.zookeeper:zookeeper:3.5.3-beta'))
include(dependency('commons-cli:commons-cli:1.2')) include(dependency('commons-cli:commons-cli:1.2'))
include(dependency('io.netty:netty:3.10.5.Final')) include(dependency('io.netty:netty:3.10.5.Final'))

View File

@ -23,6 +23,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREF
import net.corda.nodeapi.internal.ArtemisSessionProvider import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException
import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientConsumer
@ -35,6 +36,7 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
socksProxyConfig: SocksProxyConfig? = null, socksProxyConfig: SocksProxyConfig? = null,
val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable { val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString() private val bridgeId: String = UUID.randomUUID().toString()
private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, artemisMessageClientFactory) private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, artemisMessageClientFactory)
private val validInboundQueues = mutableSetOf<String>() private val validInboundQueues = mutableSetOf<String>()
private var artemis: ArtemisSessionProvider? = null private var artemis: ArtemisSessionProvider? = null
@ -64,8 +66,11 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
artemis.start() artemis.start()
val artemisClient = artemis.started!! val artemisClient = artemis.started!!
val artemisSession = artemisClient.session val artemisSession = artemisClient.session
val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId" try {
artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue) artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue)
} catch (ex: ActiveMQQueueExistsException) {
// Ignore if there is a queue still not cleaned up
}
val control = artemisSession.createConsumer(bridgeControlQueue) val control = artemisSession.createConsumer(bridgeControlQueue)
controlConsumer = control controlConsumer = control
control.setMessageHandler { msg -> control.setMessageHandler { msg ->
@ -88,7 +93,10 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
validInboundQueues.clear() validInboundQueues.clear()
controlConsumer?.close() controlConsumer?.close()
controlConsumer = null controlConsumer = null
artemis?.stop() artemis?.apply {
started?.session?.deleteQueue(bridgeControlQueue)
stop()
}
artemis = null artemis = null
bridgeManager.stop() bridgeManager.stop()
} }

View File

@ -12,6 +12,7 @@ package net.corda.nodeapi.internal.zookeeper
import com.google.common.base.Preconditions import com.google.common.base.Preconditions
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.api.BackgroundCallback import org.apache.curator.framework.api.BackgroundCallback
import org.apache.curator.framework.api.CuratorEvent import org.apache.curator.framework.api.CuratorEvent
@ -176,6 +177,8 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
} }
ConnectionState.LOST -> setLeadership(false) ConnectionState.LOST -> setLeadership(false)
else -> log.debug { "Ignoring state change $newState" }
} }
} }
@ -199,10 +202,11 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
} }
} }
val latchName = "$nodeId$LOCK_NAME${"%05d".format(priority)}" // Fixed width priority to ensure numeric sorting
watchedClient.create() watchedClient.create()
.creatingParentContainersIfNeeded() .creatingParentContainersIfNeeded()
.withProtection().withMode(CreateMode.EPHEMERAL) .withProtection().withMode(CreateMode.EPHEMERAL)
.inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, "$nodeId$LOCK_NAME$priority"), nodeId.toByteArray(Charsets.UTF_8)) .inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, latchName), nodeId.toByteArray(Charsets.UTF_8))
} }
private fun setLeadership(newValue: Boolean) { private fun setLeadership(newValue: Boolean) {
@ -275,8 +279,9 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
private class ElectionWatcher(private val latch: PrioritizedLeaderLatch) : Watcher { private class ElectionWatcher(private val latch: PrioritizedLeaderLatch) : Watcher {
override fun process(event: WatchedEvent) { override fun process(event: WatchedEvent) {
log.info("Client ${latch.nodeId} detected event ${event.type}.") log.info("Client ${latch.nodeId} detected event ${event.type}.")
if (State.STARTED == latch.state.get()) {
latch.watchedClient.children.usingWatcher(ElectionWatcher(latch)).inBackground(NoNodeCallback(latch)).forPath(latch.path) latch.watchedClient.children.usingWatcher(ElectionWatcher(latch)).inBackground(NoNodeCallback(latch)).forPath(latch.path)
if (State.STARTED == latch.state.get() && Watcher.Event.EventType.NodeChildrenChanged == event.type && latch.ourPath.get() != null) { if (Watcher.Event.EventType.NodeChildrenChanged == event.type && latch.ourPath.get() != null) {
try { try {
log.info("Change detected in children nodes of path ${latch.path}. Checking candidates.") log.info("Change detected in children nodes of path ${latch.path}. Checking candidates.")
latch.processCandidates() latch.processCandidates()
@ -285,6 +290,7 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
} }
} }
} }
}
} }
} }

View File

@ -11,12 +11,14 @@
package net.corda.nodeapi.internal.zookeeper package net.corda.nodeapi.internal.zookeeper
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import org.apache.curator.RetryPolicy import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.imps.CuratorFrameworkState import org.apache.curator.framework.imps.CuratorFrameworkState
import org.apache.curator.framework.recipes.leader.LeaderLatchListener import org.apache.curator.framework.recipes.leader.LeaderLatchListener
import org.apache.curator.retry.RetryOneTime import org.apache.curator.retry.RetryForever
import org.apache.curator.retry.RetryNTimes
import org.apache.curator.utils.CloseableUtils import org.apache.curator.utils.CloseableUtils
import java.util.concurrent.ConcurrentHashMap
/** /**
* Simple Zookeeper client that offers priority based leader election. * Simple Zookeeper client that offers priority based leader election.
@ -26,21 +28,33 @@ import org.apache.curator.utils.CloseableUtils
* @param nodeId unique client identifier used in the creation of child zNodes * @param nodeId unique client identifier used in the creation of child zNodes
* @param priority indicates the priority of the client in the election process. Low value means high priority(a client * @param priority indicates the priority of the client in the election process. Low value means high priority(a client
* with [priority] set to 0 will have become leader before a client with [priority] 1 * with [priority] set to 0 will have become leader before a client with [priority] 1
* @param retryPolicy is an instance of [RetryPolicy] and indicates the process in case connection to Zookeeper server/cluster * @param retryInterval the interval in msec between retries of the Zookeeper connection. Default value is 500 msec.
* is lost. If no policy is supplied, [RetryOneTime] will be used with 500ms before attempting to reconnect * @param retryCount the number of retries before giving up default value is 1. Use -1 to indicate forever.
*/ */
class ZkClient(connectionString: String, class ZkClient(connectionString: String,
electionPath: String, electionPath: String,
val nodeId: String, val nodeId: String,
val priority: Int, val priority: Int,
retryPolicy: RetryPolicy = RetryOneTime(500)) : ZkLeader { retryInterval: Int = 500,
retryCount: Int = 1) : ZkLeader {
private companion object { private companion object {
private val log = contextLogger() private val log = contextLogger()
} }
private val client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy) private val client: CuratorFramework
private val leaderLatch = PrioritizedLeaderLatch(client, electionPath, nodeId, priority) private val leaderLatch: PrioritizedLeaderLatch
private val listeners = ConcurrentHashMap<CordaLeaderListener, LeaderLatchListener>()
init {
val retryPolicy = if (retryCount == -1) {
RetryForever(retryInterval)
} else {
RetryNTimes(retryCount, retryInterval)
}
client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy)
leaderLatch = PrioritizedLeaderLatch(client, electionPath, nodeId, priority)
}
override fun start() { override fun start() {
if (client.state != CuratorFrameworkState.STARTED) { if (client.state != CuratorFrameworkState.STARTED) {
@ -82,12 +96,26 @@ class ZkClient(connectionString: String,
return client.state == CuratorFrameworkState.STARTED return client.state == CuratorFrameworkState.STARTED
} }
override fun addLeadershipListener(listener: LeaderLatchListener) { override fun addLeadershipListener(listener: CordaLeaderListener) {
leaderLatch.addListener(listener) val listenerStub = object : LeaderLatchListener {
override fun notLeader() {
listener.notLeader()
}
override fun isLeader() {
listener.isLeader()
}
}
listeners[listener] = listenerStub
leaderLatch.addListener(listenerStub)
} }
override fun removeLeaderShipListener(listener: LeaderLatchListener) { override fun removeLeadershipListener(listener: CordaLeaderListener) {
leaderLatch.removeListener(listener) val listenerStub = listeners.remove(listener)
if (listenerStub != null) {
leaderLatch.removeListener(listenerStub)
}
} }
} }

View File

@ -10,7 +10,27 @@
package net.corda.nodeapi.internal.zookeeper package net.corda.nodeapi.internal.zookeeper
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
/**
* Listener interface for leader election results, to avoid public reference to shadowed curator classes.
*/
interface CordaLeaderListener {
/**
* This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true.
*
* Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If
* this occurs, you can expect {@link #notLeader()} to also be called.
*/
fun notLeader()
/**
* This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false.
*
* Note that it is possible that by the time this method call happens, hasLeadership has become true. If
* this occurs, you can expect {@link #isLeader()} to also be called.
*/
fun isLeader()
}
interface ZkLeader { interface ZkLeader {
/** /**
@ -38,14 +58,14 @@ interface ZkLeader {
fun relinquishLeadership() fun relinquishLeadership()
/** /**
* @param listener an instance of [LeaderLatchListener] that will be used for election notifications * @param listener an instance of [CordaLeaderListener] that will be used for election notifications
*/ */
fun addLeadershipListener(listener: LeaderLatchListener) fun addLeadershipListener(listener: CordaLeaderListener)
/** /**
* @param listener the [LeaderLatchListener] instance to be removed * @param listener the [CordaLeaderListener] instance to be removed
*/ */
fun removeLeaderShipListener(listener: LeaderLatchListener) fun removeLeadershipListener(listener: CordaLeaderListener)
/** /**
* @return [true] if client is the current leader, [false] otherwise * @return [true] if client is the current leader, [false] otherwise

View File

@ -11,7 +11,6 @@
package net.corda.nodeapi.internal.zookeeper package net.corda.nodeapi.internal.zookeeper
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
import org.apache.curator.test.TestingServer import org.apache.curator.test.TestingServer
import org.apache.curator.utils.ZKPaths import org.apache.curator.utils.ZKPaths
import org.junit.After import org.junit.After
@ -187,8 +186,8 @@ class ZkClientTests {
@Test @Test
fun `clients with higher priority join and take leadership`() { fun `clients with higher priority join and take leadership`() {
val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "ALICE", 0) val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "ALICE", 0)
val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "BOB", 1) val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "BOB", 50) // Use numbers that check for numeric sorting
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "CHIP", 2) val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "CHIP", 2000)
val aliceLeaderGain = CountDownLatch(1) val aliceLeaderGain = CountDownLatch(1)
val bobLeaderGain = CountDownLatch(1) val bobLeaderGain = CountDownLatch(1)
val bobLeaderLoss = CountDownLatch(1) val bobLeaderLoss = CountDownLatch(1)
@ -297,6 +296,7 @@ class ZkClientTests {
when(action) { when(action) {
Action.REQUEST -> client.requestLeadership() Action.REQUEST -> client.requestLeadership()
Action.RELINQUISH -> client.relinquishLeadership() Action.RELINQUISH -> client.relinquishLeadership()
else -> throw IllegalArgumentException("Invalid action choice")
} }
Thread.sleep(100) Thread.sleep(100)
} }
@ -332,7 +332,7 @@ class ZkClientTests {
} }
private class HelperListener(private val nodeId: String, private class HelperListener(private val nodeId: String,
private val leaders: MutableList<String>) : LeaderLatchListener{ private val leaders: MutableList<String>) : CordaLeaderListener {
@Synchronized @Synchronized
override fun notLeader() { override fun notLeader() {
leaders.remove(nodeId) leaders.remove(nodeId)
@ -345,7 +345,7 @@ class ZkClientTests {
} }
private class SyncHelperListener(private val nodeId: String, private class SyncHelperListener(private val nodeId: String,
private val leaderGain: CountDownLatch = CountDownLatch(1), private val leaderGain: CountDownLatch = CountDownLatch(1),
private val leaderLoss: CountDownLatch = CountDownLatch(1)) : LeaderLatchListener { private val leaderLoss: CountDownLatch = CountDownLatch(1)) : CordaLeaderListener {
override fun notLeader() { override fun notLeader() {
log.info("$nodeId is no longer leader.") log.info("$nodeId is no longer leader.")
leaderLoss.countDown() leaderLoss.countDown()

View File

@ -52,8 +52,8 @@ class SimmValuationTest : IntegrationTest() {
val nodeAFuture = startNode(providedName = nodeALegalName) val nodeAFuture = startNode(providedName = nodeALegalName)
val nodeBFuture = startNode(providedName = nodeBLegalName) val nodeBFuture = startNode(providedName = nodeBLegalName)
val (nodeA, nodeB) = listOf(nodeAFuture, nodeBFuture).map { it.getOrThrow() } val (nodeA, nodeB) = listOf(nodeAFuture, nodeBFuture).map { it.getOrThrow() }
val nodeAWebServerFuture = startWebserver(nodeA) val nodeAWebServerFuture = startWebserver(nodeA, maximumHeapSize = "300m")
val nodeBWebServerFuture = startWebserver(nodeB) val nodeBWebServerFuture = startWebserver(nodeB, maximumHeapSize = "300m")
val nodeAApi = HttpApi.fromHostAndPort(nodeAWebServerFuture.getOrThrow().listenAddress, "api/simmvaluationdemo") val nodeAApi = HttpApi.fromHostAndPort(nodeAWebServerFuture.getOrThrow().listenAddress, "api/simmvaluationdemo")
val nodeBApi = HttpApi.fromHostAndPort(nodeBWebServerFuture.getOrThrow().listenAddress, "api/simmvaluationdemo") val nodeBApi = HttpApi.fromHostAndPort(nodeBWebServerFuture.getOrThrow().listenAddress, "api/simmvaluationdemo")
val nodeBParty = getPartyWithName(nodeAApi, nodeBLegalName) val nodeBParty = getPartyWithName(nodeAApi, nodeBLegalName)