ENT-2655: added a lock on leader status to ensure no two clients can be leader at the same time (#1517)

* ENT-2655: added a lock on leader status to ensure no two clients can be leader at the same time

* ENT-2655: reworked tests to avoid hacky timeouts; they now check for at most one leader at any given time. Also improved error handling in the latch

* ENT-2655: address PR comments and use atomic int properly
bpaunescu 2018-11-06 10:48:11 +00:00 committed by GitHub
parent b07cd38186
commit 9a778b0097
3 changed files with 102 additions and 128 deletions
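
The core of the change is in PrioritizedLeaderLatch below: a latch only reports leadership after acquiring a Curator InterProcessSemaphoreMutex, and releases it only once its listeners have been told leadership is lost. As a minimal, self-contained sketch of the mutex semantics the fix relies on (illustrative, not code from this commit; it assumes the curator-test TestingServer the tests already use, and the lock path is made up):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
import org.apache.curator.retry.RetryOneTime
import org.apache.curator.test.TestingServer
import java.util.concurrent.TimeUnit

fun main() {
    TestingServer().use { server ->
        val client = CuratorFrameworkFactory.newClient(server.connectString, RetryOneTime(100))
        client.start()
        client.blockUntilConnected()
        // Two handles on the same lock path stand in for two competing latches.
        val first = InterProcessSemaphoreMutex(client, "/election/leader-lock")
        val second = InterProcessSemaphoreMutex(client, "/election/leader-lock")
        first.acquire()
        // The second claimant cannot acquire the lock while the first holds it.
        check(!second.acquire(1, TimeUnit.SECONDS))
        first.release()
        // Once the old leader releases, the next claimant succeeds.
        check(second.acquire(5, TimeUnit.SECONDS))
        second.release()
        client.close()
    }
}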

ExternalMasterElectionService.kt

@@ -14,9 +14,6 @@ import net.corda.nodeapi.internal.zookeeper.ZkLeader
import rx.Subscription
import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
@@ -30,9 +27,6 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration,
private var haElector: ZkLeader? = null
private var leaderListener: CordaLeaderListener? = null
private val scheduler = Executors.newSingleThreadScheduledExecutor()
private var becomeMasterFuture: ScheduledFuture<*>? = null
private var statusSubscriber: Subscription? = null
private val statusFollower = ServiceStateCombiner(listOf(artemisService))
@@ -40,7 +34,6 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration,
private companion object {
private val log = contextLogger()
private const val DELAYED_LEADER_START = 5000L
}
init {
@@ -51,15 +44,10 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration,
private fun becomeMaster() {
auditService.statusChangeEvent("Acquired leadership. Going active")
stateHelper.active = true
becomeMasterFuture = null
}
private fun becomeSlave() {
log.info("Cancelling leadership")
becomeMasterFuture?.apply {
cancel(false)
}
becomeMasterFuture = null
stateHelper.active = false
}
@@ -102,11 +90,8 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration,
}
override fun isLeader() {
log.info("Zookeeper has signalled leadership acquired. Delay master claim for a short period to allow old master to close")
becomeMasterFuture?.apply {
cancel(false)
}
becomeMasterFuture = scheduler.schedule(::becomeMaster, DELAYED_LEADER_START, TimeUnit.MILLISECONDS)
log.info("Zookeeper has signalled leadership acquired.")
becomeMaster()
}
}
leaderListener = listener

PrioritizedLeaderLatch.kt

@@ -9,9 +9,7 @@ import org.apache.curator.framework.api.CuratorEvent
import org.apache.curator.framework.listen.ListenerContainer
import org.apache.curator.framework.recipes.AfterConnectionEstablished
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
import org.apache.curator.framework.recipes.locks.LockInternals
import org.apache.curator.framework.recipes.locks.LockInternalsSorter
import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver
import org.apache.curator.framework.recipes.locks.*
import org.apache.curator.framework.state.ConnectionState
import org.apache.curator.framework.state.ConnectionStateListener
import org.apache.curator.utils.ZKPaths
@@ -43,7 +41,7 @@ import java.util.concurrent.atomic.AtomicReference
* @param nodeId the unique identifier used to link a client to a zNode
* @param priority an [Int] value that determines this client's priority in the election. The lower the value, the higher the priority
*/
internal class PrioritizedLeaderLatch(client: CuratorFramework,
internal class PrioritizedLeaderLatch(private val client: CuratorFramework,
private val path: String,
private val nodeId: String,
private val priority: Int) : Closeable {
@@ -51,18 +49,19 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
val state = AtomicReference<State>(State.CLOSED)
private val watchedClient = client.newWatcherRemoveCuratorFramework()
private val leaderLock = InterProcessSemaphoreMutex(client, "$path$LOCK_PATH_NAME")
private val hasLeadership = AtomicBoolean(false)
private val ourPath = AtomicReference<String>()
private val startTask = AtomicReference<Future<*>>()
private val listeners = ListenerContainer<LeaderLatchListener>()
private val connectionStateListener = ConnectionStateListener { _, newState -> handleStateChange(newState) }
private companion object {
private val log = contextLogger()
/** Used to split the zNode path and extract the priority value for sorting and comparison */
private const val LOCK_NAME = "-latch-"
private const val LOCK_PATH_NAME = "-lock"
private val sorter = LockInternalsSorter { str, lockName -> StandardLockInternalsDriver.standardFixForSorting(str, lockName) }
}
@@ -91,15 +90,14 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED),
"Already closed or has not been started.")
cancelStartTask()
try {
watchedClient.removeWatchers()
setLeadership(false)
setNode(null)
} catch (e: Exception) {
throw IOException(e)
} finally {
watchedClient.connectionStateListenable.removeListener(connectionStateListener)
setLeadership(false)
}
}
@@ -129,7 +127,7 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
try {
reset()
} catch (e: Exception) {
log.error("An error occurred while resetting leadership.", e)
log.error("An error occurred while resetting leadership for client $nodeId.", e)
}
}
}
@@ -152,11 +150,11 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
try {
if (watchedClient.connectionStateErrorPolicy.isErrorState(ConnectionState.SUSPENDED) ||
!hasLeadership.get()) {
log.info("Client reconnected. Resetting latch.")
log.info("Client $nodeId reconnected. Resetting latch.")
reset()
}
} catch (e: Exception) {
log.error("Could not reset leader latch.", e)
log.error("Could not reset leader latch for client $nodeId.", e)
setLeadership(false)
}
}
@@ -193,18 +191,39 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
}
val latchName = "$nodeId$LOCK_NAME${"%05d".format(priority)}" // Fixed width priority to ensure numeric sorting
watchedClient.create()
.creatingParentContainersIfNeeded()
.withProtection().withMode(CreateMode.EPHEMERAL)
.inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, latchName), nodeId.toByteArray(Charsets.UTF_8))
try {
watchedClient.create()
.creatingParentContainersIfNeeded()
.withProtection().withMode(CreateMode.EPHEMERAL)
.inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, latchName), nodeId.toByteArray(Charsets.UTF_8))
} catch (e: IllegalStateException) {
log.warn("Trying to join election while client is being closed.")
}
}
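
The latchName built above zero-pads the priority to a fixed width, so a plain string sort of the election zNodes is also a numeric sort by priority. A small illustration of that property (the names are invented, and substringAfter approximates what the sorter's StandardLockInternalsDriver.standardFixForSorting does):

fun main() {
    // zNode names as produced by "$nodeId$LOCK_NAME${"%05d".format(priority)}"
    val children = listOf("BOB-latch-00010", "ALICE-latch-00002", "CHIP-latch-00100")
    val ordered = children.sortedBy { it.substringAfter("-latch-") }
    println(ordered) // [ALICE-latch-00002, BOB-latch-00010, CHIP-latch-00100]
    // Without the fixed width, "2" would sort after "100" and the candidate order would be wrong.
}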
private fun setLeadership(newValue: Boolean) {
val oldValue = hasLeadership.getAndSet(newValue)
log.info("Setting leadership to $newValue. Old value was $oldValue.")
log.info("Client $nodeId: setting leadership to $newValue; old value was $oldValue.")
if (oldValue && !newValue) {
listeners.forEach { listener -> listener?.notLeader(); null }
// Release lock after listeners have been notified
try {
if (leaderLock.isAcquiredInThisProcess) {
leaderLock.release()
}
} catch (e: IllegalStateException) {
log.warn("Client $nodeId: tried to release leader lock without owning it.")
}
} else if (!oldValue && newValue) {
// Make sure we're the only leader before invoking listeners. This call will block the current thread until
// the lock is acquired
try {
leaderLock.acquire()
} catch (e: IOException) {
log.warn("Client closed while trying to acquire leader lock.")
} catch (e: IllegalStateException) {
log.warn("Client tried to acquire leader lock while closing.")
}
listeners.forEach { listener -> listener?.isLeader(); null }
}
}
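
Note the asymmetry in setLeadership: the lock is acquired before the isLeader() listeners run, but released only after the notLeader() listeners have run, so observers can never see two leaders at once. A JVM-local sketch of the same invariant, using ReentrantLock in place of the ZooKeeper mutex (becomeLeader and loseLeadership are made-up helpers):

import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread

val lock = ReentrantLock()
var activeLeaders = 0 // only touched while holding the lock

fun becomeLeader() { // mirrors: acquire, then notify isLeader()
    lock.lock()
    activeLeaders++
    check(activeLeaders == 1) { "Overlapping leadership detected." }
}

fun loseLeadership() { // mirrors: notify notLeader(), then release
    activeLeaders--
    lock.unlock()
}

fun main() {
    val rival = thread { becomeLeader(); Thread.sleep(50); loseLeadership() }
    becomeLeader() // blocks until the rival has stepped down, unless it ran first
    loseLeadership()
    rival.join()
}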
@@ -273,10 +292,10 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
latch.watchedClient.children.usingWatcher(ElectionWatcher(latch)).inBackground(NoNodeCallback(latch)).forPath(latch.path)
if (Watcher.Event.EventType.NodeChildrenChanged == event.type && latch.ourPath.get() != null) {
try {
log.info("Change detected in children nodes of path ${latch.path}. Checking candidates.")
log.info("Client ${latch.nodeId}: change detected in children nodes of path ${latch.path}; checking candidates.")
latch.processCandidates()
} catch (e: Exception) {
log.error("An error occurred checking the leadership.", e)
log.error("Client ${latch.nodeId}: an error occurred checking the leadership.", e)
}
}
}

ZkClientTests.kt

@@ -10,8 +10,8 @@ import org.junit.Test
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertEquals
class ZkClientTests {
@@ -51,15 +51,18 @@
fun `single client becomes leader`() {
val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test3"), "test", 0)
val leaderGain = CountDownLatch(1)
val leaderCount = AtomicInteger()
val failures = mutableListOf<String>()
thread {
client.start()
client.addLeadershipListener(SyncHelperListener("test", leaderGain))
client.addLeadershipListener(SyncHelperListener("test", leaderCount, failures, leaderGain))
client.requestLeadership()
}
leaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
require(client.isLeader())
validateElectionResults(leaderCount, failures)
client.close()
}
@@ -68,10 +71,12 @@
val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test4"), "test", 0)
val leaderGain = CountDownLatch(1)
val leaderLoss = CountDownLatch(1)
val leaderCount = AtomicInteger()
val failures = mutableListOf<String>()
thread {
client.start()
client.addLeadershipListener(SyncHelperListener("test", leaderGain, leaderLoss))
client.addLeadershipListener(SyncHelperListener("test", leaderCount, failures, leaderGain, leaderLoss))
client.requestLeadership()
}
@@ -80,6 +85,8 @@
client.relinquishLeadership()
leaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
assertFalse(client.isLeader())
validateElectionResults(leaderCount, failures)
client.close()
}
@@ -89,18 +96,16 @@
val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), "BOB", 1)
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), "CHIP", 2)
val aliceLeaderGain = CountDownLatch(1)
val bobLeaderGain = CountDownLatch(1)
val bobLeaderLoss = CountDownLatch(1)
val chipLeaderLoss = CountDownLatch(1)
val chipLeaderGain = CountDownLatch(1)
val leaderCount = AtomicInteger()
val failures = mutableListOf<String>()
listOf(alice, bob, chip).forEach { client ->
thread {
client.start()
when (client) {
alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain))
bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, bobLeaderGain, bobLeaderLoss))
chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss))
alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, aliceLeaderGain))
bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures))
chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures))
}
client.requestLeadership()
}
@@ -108,13 +113,9 @@
aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
require(alice.isLeader())
if (bobLeaderGain.count == 0L) //wait to lose leadership if leader at some point
bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
assertFalse(bob.isLeader())
if (chipLeaderGain.count == 0L) //wait to lose leadership if leader at some point
chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
assertFalse(chip.isLeader())
validateElectionResults(leaderCount, failures)
listOf(alice, bob, chip).forEach { client -> client.close() }
}
@@ -125,17 +126,17 @@
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test6"), "CHIP", 2)
val aliceLeaderGain = CountDownLatch(1)
val bobLeaderGain = CountDownLatch(1)
val bobLeaderLoss = CountDownLatch(1)
val chipLeaderLoss = CountDownLatch(1)
val chipLeaderGain = CountDownLatch(1)
val leaderCount = AtomicInteger()
val failures = mutableListOf<String>()
listOf(alice, bob, chip).forEach { client ->
thread {
client.start()
when (client) {
alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain))
bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, bobLeaderGain, bobLeaderLoss))
chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss))
alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, aliceLeaderGain))
bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, bobLeaderGain))
chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, chipLeaderGain))
}
client.requestLeadership()
}
@@ -143,33 +144,23 @@
aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
require(alice.isLeader())
if (bobLeaderGain.count == 0L) //wait to lose leadership if leader at some point
bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
assertFalse(bob.isLeader())
if (chipLeaderGain.count == 0L)
chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
assertFalse(chip.isLeader()) //wait to lose leadership if leader at some point
val bobLeaderGain2 = CountDownLatch(1)
bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain2))
alice.relinquishLeadership()
bobLeaderGain2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) // wait for bob to become leader
bobLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) // wait for bob to become leader
assertFalse(alice.isLeader())
require(bob.isLeader())
assertFalse(chip.isLeader())
val chipLeaderGain2 = CountDownLatch(1)
chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain2))
bob.relinquishLeadership()
chipLeaderGain2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
chipLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
assertFalse(alice.isLeader())
assertFalse(bob.isLeader())
require(chip.isLeader())
validateElectionResults(leaderCount, failures)
listOf(alice, bob, chip).forEach { client -> client.close() }
}
@@ -180,37 +171,35 @@
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "CHIP", 2000)
val aliceLeaderGain = CountDownLatch(1)
val bobLeaderGain = CountDownLatch(1)
val bobLeaderLoss = CountDownLatch(1)
val chipLeaderLoss = CountDownLatch(1)
val chipLeaderGain = CountDownLatch(1)
val leaderCount = AtomicInteger()
val failures = mutableListOf<String>()
chip.start()
chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain, chipLeaderLoss))
chip.addLeadershipListener(SyncHelperListener(chip.nodeId, leaderCount, failures, chipLeaderGain))
chip.requestLeadership()
chipLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
require(chip.isLeader())
bob.start()
bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain, bobLeaderLoss))
bob.addLeadershipListener(SyncHelperListener(bob.nodeId, leaderCount, failures, bobLeaderGain))
bob.requestLeadership()
chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
bobLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
require(bob.isLeader())
assertFalse(chip.isLeader())
alice.start()
alice.addLeadershipListener(SyncHelperListener(alice.nodeId, aliceLeaderGain))
alice.addLeadershipListener(SyncHelperListener(alice.nodeId, leaderCount, failures, aliceLeaderGain))
alice.requestLeadership()
bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
require(alice.isLeader())
assertFalse(bob.isLeader())
assertFalse(chip.isLeader())
validateElectionResults(leaderCount, failures)
listOf(alice, bob, chip).forEach { client -> client.close() }
}
@@ -220,49 +209,41 @@
val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test8"), "BOB", 1)
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test8"), "CHIP", 2)
val aliceLeaderGain = CountDownLatch(1)
val aliceLeaderLoss = CountDownLatch(1)
val bobLeaderGain = CountDownLatch(1)
val chipLeaderLoss = CountDownLatch(1)
val chipLeaderGain = CountDownLatch(1)
val leaderCount = AtomicInteger()
val failures = mutableListOf<String>()
listOf(alice, chip).forEach { client ->
thread {
client.start()
when (client) {
alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain, aliceLeaderLoss))
chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss))
alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, aliceLeaderGain))
chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, chipLeaderGain))
}
client.requestLeadership()
}
}
aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
if (chipLeaderGain.count == 0L) //wait to lose leadership if leader at some point
chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
require(alice.isLeader())
assertFalse(chip.isLeader())
bob.start()
bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain))
bob.addLeadershipListener(SyncHelperListener(bob.nodeId, leaderCount, failures, bobLeaderGain))
bob.requestLeadership()
require(alice.isLeader())
assertFalse(bob.isLeader())
assertFalse(chip.isLeader())
val chipLeaderGain2 = CountDownLatch(1)
val chipLeaderLoss2 = CountDownLatch(1)
chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain2, chipLeaderLoss2))
alice.relinquishLeadership()
aliceLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
bobLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
if (chipLeaderGain.count == 0L) //wait to lose leadership if gained
chipLeaderLoss2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
assertFalse(alice.isLeader())
require(bob.isLeader())
assertFalse(chip.isLeader())
validateElectionResults(leaderCount, failures)
listOf(alice, bob, chip).forEach { client -> client.close() }
}
@@ -278,11 +259,11 @@
}
val countDownLatch = CountDownLatch(clientList.size)
val leaderBuffer = mutableListOf<String>()
val leaderCount = AtomicInteger()
val failures = mutableListOf<String>()
clientList.forEach { client ->
thread {
client.addLeadershipListener(HelperListener(client.nodeId, leaderBuffer))
client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures))
client.start()
val randomizer = Random()
val actions = listOf(Action.RELINQUISH, Action.REQUEST)
@@ -291,63 +272,52 @@
when (action) {
Action.REQUEST -> client.requestLeadership()
Action.RELINQUISH -> client.relinquishLeadership()
else -> throw IllegalArgumentException("Invalid action choice")
}
Thread.sleep(100)
}
countDownLatch.countDown()
}
}
countDownLatch.await(CLIENT_TIMEOUT, TimeUnit.SECONDS)
//only one leader should exist
var leaderCount = 0
var leaderId = ""
clientList.forEach { client ->
if (client.isLeader()) {
leaderCount++
leaderId = client.nodeId
}
}
require(leaderCount <= 1)
require(leaderBuffer.size <= 1)
if (leaderBuffer.size == 1) {
println(leaderBuffer)
assertEquals(leaderBuffer.first(), leaderId)
}
Thread.sleep(1000) // Wait a bit for curator threads to finish their work
validateElectionResults(leaderCount, failures)
clientList.forEach { client -> client.close() }
}
private fun validateElectionResults(leaderCount: AtomicInteger, failures: MutableList<String>) {
require(leaderCount.get() <= 1)
if (failures.isNotEmpty()) {
failures.forEach {
log.error(it)
}
assert(failures.isEmpty()) { "Election failures detected; see the logs above." }
}
}
private enum class Action {
START, STOP, REQUEST, RELINQUISH
REQUEST, RELINQUISH
}
private class HelperListener(private val nodeId: String,
private val leaders: MutableList<String>) : CordaLeaderListener {
@Synchronized
override fun notLeader() {
leaders.remove(nodeId)
}
@Synchronized
override fun isLeader() {
leaders.add(nodeId)
}
}
private class SyncHelperListener(private val nodeId: String,
private val leaderCount: AtomicInteger,
private val failures: MutableList<String>,
private val leaderGain: CountDownLatch = CountDownLatch(1),
private val leaderLoss: CountDownLatch = CountDownLatch(1)) : CordaLeaderListener {
private val leaderLoss: CountDownLatch = CountDownLatch(1)
) : CordaLeaderListener {
override fun notLeader() {
log.info("$nodeId is no longer leader.")
val previousCount = leaderCount.getAndDecrement()
if (previousCount != 1) {
failures.add("LeaderCount expected was 1. Was $previousCount.")
}
leaderLoss.countDown()
}
override fun isLeader() {
log.info("$nodeId is the new leader.")
val previousCount = leaderCount.getAndIncrement()
if (previousCount != 0) {
failures.add("LeaderCount expected was 0. Was $previousCount")
}
leaderGain.countDown()
}
}
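
The counting in SyncHelperListener is what turns any overlap into a test failure: a second isLeader() without an intervening notLeader() observes a non-zero previous count. A standalone illustration of that detection logic (the calls are contrived):

import java.util.concurrent.atomic.AtomicInteger

fun main() {
    val leaderCount = AtomicInteger()
    val failures = mutableListOf<String>()
    fun onIsLeader() {
        val previous = leaderCount.getAndIncrement()
        if (previous != 0) failures.add("Expected leaderCount to be 0 before gaining leadership, but was $previous.")
    }
    onIsLeader() // first leader: previous count is 0, no failure
    onIsLeader() // overlapping leader: previous count is 1, failure recorded
    println(failures)
}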