From 9a778b009747be6090f99bf46e2a650e45bcffea Mon Sep 17 00:00:00 2001 From: bpaunescu Date: Tue, 6 Nov 2018 10:48:11 +0000 Subject: [PATCH] ENT-2655: added a lock on leader status to ensure no clients can be leader at the same time (#1517) * ENT-2655: added a lock on leader status to ensure no clients can be leader at the same time * ENT-2655: reworked tests to not use hacky timeouts, now check for max 1 leader at any given time, improved error handling in the latch * ENT-2655: address PR comments and use atomic int properly --- .../ha/ExternalMasterElectionService.kt | 19 +-- .../zookeeper/PrioritizedLeaderLatch.kt | 53 ++++-- .../internal/zookeeper/ZkClientTest.kt | 158 +++++++----------- 3 files changed, 102 insertions(+), 128 deletions(-) diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt index 49131f6786..f1babc3dc5 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt @@ -14,9 +14,6 @@ import net.corda.nodeapi.internal.zookeeper.ZkLeader import rx.Subscription import java.lang.management.ManagementFactory import java.net.InetAddress -import java.util.concurrent.Executors -import java.util.concurrent.ScheduledFuture -import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock @@ -30,9 +27,6 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration, private var haElector: ZkLeader? = null private var leaderListener: CordaLeaderListener? = null - private val scheduler = Executors.newSingleThreadScheduledExecutor() - private var becomeMasterFuture: ScheduledFuture<*>? = null - private var statusSubscriber: Subscription? = null private val statusFollower = ServiceStateCombiner(listOf(artemisService)) @@ -40,7 +34,6 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration, private companion object { private val log = contextLogger() - private const val DELAYED_LEADER_START = 5000L } init { @@ -51,15 +44,10 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration, private fun becomeMaster() { auditService.statusChangeEvent("Acquired leadership. Going active") stateHelper.active = true - becomeMasterFuture = null } private fun becomeSlave() { log.info("Cancelling leadership") - becomeMasterFuture?.apply { - cancel(false) - } - becomeMasterFuture = null stateHelper.active = false } @@ -102,11 +90,8 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration, } override fun isLeader() { - log.info("Zookeeper has signalled leadership acquired. Delay master claim for a short period to allow old master to close") - becomeMasterFuture?.apply { - cancel(false) - } - becomeMasterFuture = scheduler.schedule(::becomeMaster, DELAYED_LEADER_START, TimeUnit.MILLISECONDS) + log.info("Zookeeper has signalled leadership acquired.") + becomeMaster() } } leaderListener = listener diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatch.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatch.kt index a537703231..434e732cff 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatch.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatch.kt @@ -9,9 +9,7 @@ import org.apache.curator.framework.api.CuratorEvent import org.apache.curator.framework.listen.ListenerContainer import org.apache.curator.framework.recipes.AfterConnectionEstablished import org.apache.curator.framework.recipes.leader.LeaderLatchListener -import org.apache.curator.framework.recipes.locks.LockInternals -import org.apache.curator.framework.recipes.locks.LockInternalsSorter -import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver +import org.apache.curator.framework.recipes.locks.* import org.apache.curator.framework.state.ConnectionState import org.apache.curator.framework.state.ConnectionStateListener import org.apache.curator.utils.ZKPaths @@ -43,7 +41,7 @@ import java.util.concurrent.atomic.AtomicReference * @param nodeId the unique identifier used to link a client to a zNode * @param priority an [Int] value that determines this client's priority in the election. The lower the value, the higher the priority */ -internal class PrioritizedLeaderLatch(client: CuratorFramework, +internal class PrioritizedLeaderLatch(private val client: CuratorFramework, private val path: String, private val nodeId: String, private val priority: Int) : Closeable { @@ -51,18 +49,19 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework, val state = AtomicReference(State.CLOSED) private val watchedClient = client.newWatcherRemoveCuratorFramework() + private val leaderLock = InterProcessSemaphoreMutex(client, "$path$LOCK_PATH_NAME") private val hasLeadership = AtomicBoolean(false) private val ourPath = AtomicReference() private val startTask = AtomicReference>() private val listeners = ListenerContainer() - private val connectionStateListener = ConnectionStateListener { _, newState -> handleStateChange(newState) } private companion object { private val log = contextLogger() /** Used to split the zNode path and extract the priority value for sorting and comparison */ private const val LOCK_NAME = "-latch-" + private const val LOCK_PATH_NAME = "-lock" private val sorter = LockInternalsSorter { str, lockName -> StandardLockInternalsDriver.standardFixForSorting(str, lockName) } } @@ -91,15 +90,14 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework, Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started.") cancelStartTask() - try { watchedClient.removeWatchers() + setLeadership(false) setNode(null) } catch (e: Exception) { throw IOException(e) } finally { watchedClient.connectionStateListenable.removeListener(connectionStateListener) - setLeadership(false) } } @@ -129,7 +127,7 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework, try { reset() } catch (e: Exception) { - log.error("An error occurred while resetting leadership.", e) + log.error("An error occurred while resetting leadership for client $nodeId.", e) } } } @@ -152,11 +150,11 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework, try { if (watchedClient.connectionStateErrorPolicy.isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get()) { - log.info("Client reconnected. Resetting latch.") + log.info("Client $nodeId reconnected. Resetting latch.") reset() } } catch (e: Exception) { - log.error("Could not reset leader latch.", e) + log.error("Could not reset leader latch for client $nodeId.", e) setLeadership(false) } } @@ -193,18 +191,39 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework, } val latchName = "$nodeId$LOCK_NAME${"%05d".format(priority)}" // Fixed width priority to ensure numeric sorting - watchedClient.create() - .creatingParentContainersIfNeeded() - .withProtection().withMode(CreateMode.EPHEMERAL) - .inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, latchName), nodeId.toByteArray(Charsets.UTF_8)) + try { + watchedClient.create() + .creatingParentContainersIfNeeded() + .withProtection().withMode(CreateMode.EPHEMERAL) + .inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, latchName), nodeId.toByteArray(Charsets.UTF_8)) + } catch (e: IllegalStateException) { + log.warn("Trying to join election while client is being closed.") + } } private fun setLeadership(newValue: Boolean) { val oldValue = hasLeadership.getAndSet(newValue) - log.info("Setting leadership to $newValue. Old value was $oldValue.") + log.info("Client $nodeId: setting leadership to $newValue; old value was $oldValue.") if (oldValue && !newValue) { listeners.forEach { listener -> listener?.notLeader(); null } + // Release lock after listeners have been notified + try { + if (leaderLock.isAcquiredInThisProcess) { + leaderLock.release() + } + } catch (e: IllegalStateException) { + log.warn("Client $nodeId: tried to release leader lock without owning it.") + } } else if (!oldValue && newValue) { + // Make sure we're the only leader before invoking listeners. This call will block the current thread until + // the lock is acquired + try { + leaderLock.acquire() + } catch (e: IOException) { + log.warn("Client closed while trying to acquire leader lock.") + } catch (e: IllegalStateException) { + log.warn("Client tried to acquire leader lock while closing.") + } listeners.forEach { listener -> listener?.isLeader(); null } } } @@ -273,10 +292,10 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework, latch.watchedClient.children.usingWatcher(ElectionWatcher(latch)).inBackground(NoNodeCallback(latch)).forPath(latch.path) if (Watcher.Event.EventType.NodeChildrenChanged == event.type && latch.ourPath.get() != null) { try { - log.info("Change detected in children nodes of path ${latch.path}. Checking candidates.") + log.info("Client ${latch.nodeId}: change detected in children nodes of path ${latch.path}; checking candidates.") latch.processCandidates() } catch (e: Exception) { - log.error("An error occurred checking the leadership.", e) + log.error("Client ${latch.nodeId}: an error occurred checking the leadership.", e) } } } diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClientTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClientTest.kt index c8ac077768..0418b230eb 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClientTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClientTest.kt @@ -10,8 +10,8 @@ import org.junit.Test import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger import kotlin.concurrent.thread -import kotlin.test.assertEquals class ZkClientTests { @@ -51,15 +51,18 @@ class ZkClientTests { fun `single client becomes leader`() { val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test3"), "test", 0) val leaderGain = CountDownLatch(1) + var leaderCount = AtomicInteger() + val failures = mutableListOf() thread { client.start() - client.addLeadershipListener(SyncHelperListener("test", leaderGain)) + client.addLeadershipListener(SyncHelperListener("test", leaderCount, failures, leaderGain)) client.requestLeadership() } leaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) require(client.isLeader()) + validateElectionResults(leaderCount, failures) client.close() } @@ -68,10 +71,12 @@ class ZkClientTests { val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test4"), "test", 0) val leaderGain = CountDownLatch(1) val leaderLoss = CountDownLatch(1) + var leaderCount = AtomicInteger() + val failures = mutableListOf() thread { client.start() - client.addLeadershipListener(SyncHelperListener("test", leaderGain, leaderLoss)) + client.addLeadershipListener(SyncHelperListener("test", leaderCount, failures, leaderGain, leaderLoss)) client.requestLeadership() } @@ -80,6 +85,8 @@ class ZkClientTests { client.relinquishLeadership() leaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) assertFalse(client.isLeader()) + validateElectionResults(leaderCount, mutableListOf()) + client.close() } @@ -89,18 +96,16 @@ class ZkClientTests { val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), "BOB", 1) val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), "CHIP", 2) val aliceLeaderGain = CountDownLatch(1) - val bobLeaderGain = CountDownLatch(1) - val bobLeaderLoss = CountDownLatch(1) - val chipLeaderLoss = CountDownLatch(1) - val chipLeaderGain = CountDownLatch(1) + var leaderCount = AtomicInteger() + val failures = mutableListOf() listOf(alice, bob, chip).forEach { client -> thread{ client.start() when (client) { - alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain)) - bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, bobLeaderGain, bobLeaderLoss)) - chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss)) + alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, aliceLeaderGain)) + bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures)) + chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures)) } client.requestLeadership() } @@ -108,13 +113,9 @@ class ZkClientTests { aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) require(alice.isLeader()) - if (bobLeaderGain.count == 0L) //wait to lose leadership if leader at some point - bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) assertFalse(bob.isLeader()) - if (chipLeaderGain.count == 0L) //wait to lose leadership if leader at some point - chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) assertFalse(chip.isLeader()) - + validateElectionResults(leaderCount, failures) listOf(alice, bob, chip).forEach { client -> client.close() } } @@ -125,17 +126,17 @@ class ZkClientTests { val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test6"), "CHIP", 2) val aliceLeaderGain = CountDownLatch(1) val bobLeaderGain = CountDownLatch(1) - val bobLeaderLoss = CountDownLatch(1) - val chipLeaderLoss = CountDownLatch(1) val chipLeaderGain = CountDownLatch(1) + var leaderCount = AtomicInteger() + val failures = mutableListOf() listOf(alice, bob, chip).forEach { client -> thread{ client.start() when (client) { - alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain)) - bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, bobLeaderGain, bobLeaderLoss)) - chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss)) + alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, aliceLeaderGain)) + bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, bobLeaderGain)) + chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, chipLeaderGain)) } client.requestLeadership() } @@ -143,33 +144,23 @@ class ZkClientTests { aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) require(alice.isLeader()) - if (bobLeaderGain.count == 0L) //wait to lose leadership if leader at some point - bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) assertFalse(bob.isLeader()) - if (chipLeaderGain.count == 0L) - chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) assertFalse(chip.isLeader()) //wait to lose leadership if leader at some point - val bobLeaderGain2 = CountDownLatch(1) - bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain2)) - alice.relinquishLeadership() - bobLeaderGain2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) // wait for bob to become leader + bobLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) // wait for bob to become leader assertFalse(alice.isLeader()) require(bob.isLeader()) assertFalse(chip.isLeader()) - val chipLeaderGain2 = CountDownLatch(1) - chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain2)) - bob.relinquishLeadership() - chipLeaderGain2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + chipLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) assertFalse(alice.isLeader()) assertFalse(bob.isLeader()) require(chip.isLeader()) - + validateElectionResults(leaderCount, failures) listOf(alice, bob, chip).forEach { client -> client.close() } } @@ -180,37 +171,35 @@ class ZkClientTests { val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "CHIP", 2000) val aliceLeaderGain = CountDownLatch(1) val bobLeaderGain = CountDownLatch(1) - val bobLeaderLoss = CountDownLatch(1) - val chipLeaderLoss = CountDownLatch(1) val chipLeaderGain = CountDownLatch(1) + var leaderCount = AtomicInteger() + val failures = mutableListOf() chip.start() - chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain, chipLeaderLoss)) + chip.addLeadershipListener(SyncHelperListener(chip.nodeId, leaderCount, failures, chipLeaderGain)) chip.requestLeadership() chipLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) require(chip.isLeader()) bob.start() - bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain, bobLeaderLoss)) + bob.addLeadershipListener(SyncHelperListener(bob.nodeId, leaderCount, failures, bobLeaderGain)) bob.requestLeadership() - chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) bobLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) require(bob.isLeader()) assertFalse(chip.isLeader()) alice.start() - alice.addLeadershipListener(SyncHelperListener(alice.nodeId, aliceLeaderGain)) + alice.addLeadershipListener(SyncHelperListener(alice.nodeId, leaderCount, failures, aliceLeaderGain)) alice.requestLeadership() - bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) require(alice.isLeader()) assertFalse(bob.isLeader()) assertFalse(chip.isLeader()) - + validateElectionResults(leaderCount, failures) listOf(alice, bob, chip).forEach { client -> client.close() } } @@ -220,49 +209,41 @@ class ZkClientTests { val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test8"), "BOB", 1) val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test8"), "CHIP", 2) val aliceLeaderGain = CountDownLatch(1) - val aliceLeaderLoss = CountDownLatch(1) val bobLeaderGain = CountDownLatch(1) - val chipLeaderLoss = CountDownLatch(1) val chipLeaderGain = CountDownLatch(1) + var leaderCount = AtomicInteger() + val failures = mutableListOf() listOf(alice, chip).forEach { client -> thread{ client.start() when (client) { - alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain, aliceLeaderLoss)) - chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss)) + alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, aliceLeaderGain)) + chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures, chipLeaderGain)) } client.requestLeadership() } } aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) - if (chipLeaderGain.count == 0L) //wait to lose leadership if leader at some point - chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) require(alice.isLeader()) assertFalse(chip.isLeader()) bob.start() - bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain)) + bob.addLeadershipListener(SyncHelperListener(bob.nodeId, leaderCount, failures, bobLeaderGain)) bob.requestLeadership() require(alice.isLeader()) assertFalse(bob.isLeader()) assertFalse(chip.isLeader()) - val chipLeaderGain2 = CountDownLatch(1) - val chipLeaderLoss2 = CountDownLatch(1) - chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain2, chipLeaderLoss2)) alice.relinquishLeadership() - aliceLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) bobLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) - if (chipLeaderGain.count == 0L) //wait to lose leadership if gained - chipLeaderLoss2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) assertFalse(alice.isLeader()) require(bob.isLeader()) assertFalse(chip.isLeader()) - + validateElectionResults(leaderCount, failures) listOf(alice, bob, chip).forEach { client -> client.close() } } @@ -278,11 +259,11 @@ class ZkClientTests { } val countDownLatch = CountDownLatch(clientList.size) - val leaderBuffer = mutableListOf() - + var leaderCount = AtomicInteger() + val failures = mutableListOf() clientList.forEach { client -> thread{ - client.addLeadershipListener(HelperListener(client.nodeId, leaderBuffer)) + client.addLeadershipListener(SyncHelperListener(client.nodeId, leaderCount, failures)) client.start() val randomizer = Random() val actions = listOf(Action.RELINQUISH, Action.REQUEST) @@ -291,63 +272,52 @@ class ZkClientTests { when(action) { Action.REQUEST -> client.requestLeadership() Action.RELINQUISH -> client.relinquishLeadership() - else -> throw IllegalArgumentException("Invalid action choice") } - Thread.sleep(100) } - countDownLatch.countDown() } } - countDownLatch.await(CLIENT_TIMEOUT, TimeUnit.SECONDS) - //only one leader should exist - var leaderCount = 0 - var leaderId = "" - - clientList.forEach { client -> - if (client.isLeader()) { - leaderCount++ - leaderId = client.nodeId - } - } - - require(leaderCount <= 1) - require(leaderBuffer.size <= 1) - if (leaderBuffer.size == 1) { - println(leaderBuffer) - assertEquals(leaderBuffer.first(), leaderId) - } + Thread.sleep(1000) // Wait a bit for curator threads to finish their work + validateElectionResults(leaderCount, failures) clientList.forEach { client -> client.close() } } + private fun validateElectionResults(leaderCount: AtomicInteger, failures: MutableList) { + require(leaderCount.get() <= 1) + if (failures.size != 0) { + failures.forEach { + log.error(it) + } + assert(failures.isNotEmpty()) + } + } + private enum class Action { - START, STOP, REQUEST, RELINQUISH + REQUEST, RELINQUISH } - private class HelperListener(private val nodeId: String, - private val leaders: MutableList) : CordaLeaderListener { - @Synchronized - override fun notLeader() { - leaders.remove(nodeId) - } - - @Synchronized - override fun isLeader() { - leaders.add(nodeId) - } - } private class SyncHelperListener(private val nodeId: String, + private val leaderCount: AtomicInteger, + private val failures: MutableList, private val leaderGain: CountDownLatch = CountDownLatch(1), - private val leaderLoss: CountDownLatch = CountDownLatch(1)) : CordaLeaderListener { + private val leaderLoss: CountDownLatch = CountDownLatch(1) + ) : CordaLeaderListener { override fun notLeader() { log.info("$nodeId is no longer leader.") + val previousCount = leaderCount.getAndDecrement() + if (previousCount != 1) { + failures.add("LeaderCount expected was 1. Was $previousCount.") + } leaderLoss.countDown() - } override fun isLeader() { log.info("$nodeId is the new leader.") + val previousCount = leaderCount.getAndIncrement() + if (previousCount != 0) { + failures.add("LeaderCount expected was 0. Was $previousCount") + } leaderGain.countDown() } }