First cut HA for bridge

Simple implementation of bridge HA logic. Fix of shading magic in gradle. Removal of exposed curator classes from node-api interface.

Simple implementation of bridge HA logic. Fix of shading magic in gradle. Removal of exposed curator classes from node-api interface.

Modify the leader priority test so that it catches priorities being sorted lexically rather than numerically.
This commit is contained in:
Matthew Nesbit
2018-04-17 12:03:43 +01:00
parent b3a4e3907f
commit 942da1b8e0
18 changed files with 384 additions and 39 deletions

View File

@ -23,6 +23,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREF
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer
@ -35,6 +36,7 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
socksProxyConfig: SocksProxyConfig? = null,
val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString()
private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, artemisMessageClientFactory)
private val validInboundQueues = mutableSetOf<String>()
private var artemis: ArtemisSessionProvider? = null
@ -64,8 +66,11 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
artemis.start()
val artemisClient = artemis.started!!
val artemisSession = artemisClient.session
val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue)
try {
artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue)
} catch (ex: ActiveMQQueueExistsException) {
// Ignore if there is a queue still not cleaned up
}
val control = artemisSession.createConsumer(bridgeControlQueue)
controlConsumer = control
control.setMessageHandler { msg ->
@ -88,7 +93,10 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
validInboundQueues.clear()
controlConsumer?.close()
controlConsumer = null
artemis?.stop()
artemis?.apply {
started?.session?.deleteQueue(bridgeControlQueue)
stop()
}
artemis = null
bridgeManager.stop()
}

View File

@ -12,6 +12,7 @@ package net.corda.nodeapi.internal.zookeeper
import com.google.common.base.Preconditions
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.api.BackgroundCallback
import org.apache.curator.framework.api.CuratorEvent
@ -176,6 +177,8 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
}
ConnectionState.LOST -> setLeadership(false)
else -> log.debug { "Ignoring state change $newState" }
}
}
@ -199,10 +202,11 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
}
}
val latchName = "$nodeId$LOCK_NAME${"%05d".format(priority)}" // Fixed width priority to ensure numeric sorting
watchedClient.create()
.creatingParentContainersIfNeeded()
.withProtection().withMode(CreateMode.EPHEMERAL)
.inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, "$nodeId$LOCK_NAME$priority"), nodeId.toByteArray(Charsets.UTF_8))
.inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, latchName), nodeId.toByteArray(Charsets.UTF_8))
}
private fun setLeadership(newValue: Boolean) {
@ -275,13 +279,15 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
private class ElectionWatcher(private val latch: PrioritizedLeaderLatch) : Watcher {
override fun process(event: WatchedEvent) {
log.info("Client ${latch.nodeId} detected event ${event.type}.")
latch.watchedClient.children.usingWatcher(ElectionWatcher(latch)).inBackground(NoNodeCallback(latch)).forPath(latch.path)
if (State.STARTED == latch.state.get() && Watcher.Event.EventType.NodeChildrenChanged == event.type && latch.ourPath.get() != null) {
try {
log.info("Change detected in children nodes of path ${latch.path}. Checking candidates.")
latch.processCandidates()
} catch (e: Exception) {
log.error("An error occurred checking the leadership.", e)
if (State.STARTED == latch.state.get()) {
latch.watchedClient.children.usingWatcher(ElectionWatcher(latch)).inBackground(NoNodeCallback(latch)).forPath(latch.path)
if (Watcher.Event.EventType.NodeChildrenChanged == event.type && latch.ourPath.get() != null) {
try {
log.info("Change detected in children nodes of path ${latch.path}. Checking candidates.")
latch.processCandidates()
} catch (e: Exception) {
log.error("An error occurred checking the leadership.", e)
}
}
}
}

View File

@ -11,12 +11,14 @@
package net.corda.nodeapi.internal.zookeeper
import net.corda.core.utilities.contextLogger
import org.apache.curator.RetryPolicy
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.imps.CuratorFrameworkState
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
import org.apache.curator.retry.RetryOneTime
import org.apache.curator.retry.RetryForever
import org.apache.curator.retry.RetryNTimes
import org.apache.curator.utils.CloseableUtils
import java.util.concurrent.ConcurrentHashMap
/**
* Simple Zookeeper client that offers priority based leader election.
@ -26,21 +28,33 @@ import org.apache.curator.utils.CloseableUtils
* @param nodeId unique client identifier used in the creation of child zNodes
* @param priority indicates the priority of the client in the election process. A low value means high priority (a client
* with [priority] set to 0 will become leader before a client with [priority] set to 1).
* @param retryPolicy is an instance of [RetryPolicy] and indicates the process in case connection to Zookeeper server/cluster
* is lost. If no policy is supplied, [RetryOneTime] will be used with 500ms before attempting to reconnect
* @param retryInterval the interval in msec between retries of the Zookeeper connection. Default value is 500 msec.
* @param retryCount the number of retries before giving up. Default value is 1. Use -1 to retry forever.
*/
class ZkClient(connectionString: String,
electionPath: String,
val nodeId: String,
val priority: Int,
retryPolicy: RetryPolicy = RetryOneTime(500)) : ZkLeader {
retryInterval: Int = 500,
retryCount: Int = 1) : ZkLeader {
private companion object {
private val log = contextLogger()
}
private val client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy)
private val leaderLatch = PrioritizedLeaderLatch(client, electionPath, nodeId, priority)
private val client: CuratorFramework
private val leaderLatch: PrioritizedLeaderLatch
private val listeners = ConcurrentHashMap<CordaLeaderListener, LeaderLatchListener>()
init {
val retryPolicy = if (retryCount == -1) {
RetryForever(retryInterval)
} else {
RetryNTimes(retryCount, retryInterval)
}
client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy)
leaderLatch = PrioritizedLeaderLatch(client, electionPath, nodeId, priority)
}
override fun start() {
if (client.state != CuratorFrameworkState.STARTED) {
@ -82,12 +96,26 @@ class ZkClient(connectionString: String,
return client.state == CuratorFrameworkState.STARTED
}
override fun addLeadershipListener(listener: LeaderLatchListener) {
leaderLatch.addListener(listener)
override fun addLeadershipListener(listener: CordaLeaderListener) {
val listenerStub = object : LeaderLatchListener {
override fun notLeader() {
listener.notLeader()
}
override fun isLeader() {
listener.isLeader()
}
}
listeners[listener] = listenerStub
leaderLatch.addListener(listenerStub)
}
override fun removeLeaderShipListener(listener: LeaderLatchListener) {
leaderLatch.removeListener(listener)
override fun removeLeadershipListener(listener: CordaLeaderListener) {
val listenerStub = listeners.remove(listener)
if (listenerStub != null) {
leaderLatch.removeListener(listenerStub)
}
}
}

View File

@ -10,7 +10,27 @@
package net.corda.nodeapi.internal.zookeeper
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
/**
* Listener interface for leader election results, to avoid public reference to shaded Curator classes.
*
* Implementations receive a callback each time leadership is gained ([isLeader]) or lost ([notLeader]).
*/
interface CordaLeaderListener {
/**
* This is called when the leader latch's state goes from hasLeadership = true to hasLeadership = false,
* i.e. when leadership has been lost.
*
* Note that it is possible that by the time this method call happens, hasLeadership has become true again. If
* this occurs, you can expect [isLeader] to also be called.
*/
fun notLeader()
/**
* This is called when the leader latch's state goes from hasLeadership = false to hasLeadership = true,
* i.e. when leadership has been gained.
*
* Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If
* this occurs, you can expect [notLeader] to also be called.
*/
fun isLeader()
}
interface ZkLeader {
/**
@ -38,14 +58,14 @@ interface ZkLeader {
fun relinquishLeadership()
/**
* @param listener an instance of [LeaderLatchListener] that will be used for election notifications
* @param listener an instance of [CordaLeaderListener] that will be used for election notifications
*/
fun addLeadershipListener(listener: LeaderLatchListener)
fun addLeadershipListener(listener: CordaLeaderListener)
/**
* @param listener the [LeaderLatchListener] instance to be removed
* @param listener the [CordaLeaderListener] instance to be removed
*/
fun removeLeaderShipListener(listener: LeaderLatchListener)
fun removeLeadershipListener(listener: CordaLeaderListener)
/**
* @return [true] if client is the current leader, [false] otherwise

View File

@ -11,7 +11,6 @@
package net.corda.nodeapi.internal.zookeeper
import net.corda.core.utilities.contextLogger
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
import org.apache.curator.test.TestingServer
import org.apache.curator.utils.ZKPaths
import org.junit.After
@ -187,8 +186,8 @@ class ZkClientTests {
@Test
fun `clients with higher priority join and take leadership`() {
val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "ALICE", 0)
val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "BOB", 1)
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "CHIP", 2)
val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "BOB", 50) // Use numbers that check for numeric sorting
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "CHIP", 2000)
val aliceLeaderGain = CountDownLatch(1)
val bobLeaderGain = CountDownLatch(1)
val bobLeaderLoss = CountDownLatch(1)
@ -297,6 +296,7 @@ class ZkClientTests {
when(action) {
Action.REQUEST -> client.requestLeadership()
Action.RELINQUISH -> client.relinquishLeadership()
else -> throw IllegalArgumentException("Invalid action choice")
}
Thread.sleep(100)
}
@ -332,7 +332,7 @@ class ZkClientTests {
}
private class HelperListener(private val nodeId: String,
private val leaders: MutableList<String>) : LeaderLatchListener{
private val leaders: MutableList<String>) : CordaLeaderListener {
@Synchronized
override fun notLeader() {
leaders.remove(nodeId)
@ -345,7 +345,7 @@ class ZkClientTests {
}
private class SyncHelperListener(private val nodeId: String,
private val leaderGain: CountDownLatch = CountDownLatch(1),
private val leaderLoss: CountDownLatch = CountDownLatch(1)) : LeaderLatchListener {
private val leaderLoss: CountDownLatch = CountDownLatch(1)) : CordaLeaderListener {
override fun notLeader() {
log.info("$nodeId is no longer leader.")
leaderLoss.countDown()