Merge branch 'master' into os-merge-244167d

This commit is contained in:
Shams Asari
2018-07-05 15:47:09 +01:00
16 changed files with 227 additions and 67 deletions

View File

@ -83,13 +83,13 @@ internal class PrioritizedLeaderLatch(client: CuratorFramework,
fun start() {
Preconditions.checkState(state.compareAndSet(State.CLOSED, State.STARTED),
"Cannot be started more than once.")
startTask.set(AfterConnectionEstablished.execute(watchedClient, {
startTask.set(AfterConnectionEstablished.execute(watchedClient) {
try {
internalStart()
} finally {
startTask.set(null)
}
}))
})
}
/**

View File

@ -278,20 +278,25 @@ class ZkClientTests {
@Test
fun `clients randomly do things`() {
val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "ALICE", 0)
val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "BOB", 1)
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "CHIP", 2)
val dave = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "DAVE", 3)
val countDownLatch = CountDownLatch(3)
val CLIENTS_NUMBER = 4
val ACTIONS_NUMBER = 100
val CLIENT_TIMEOUT = 60L
val clientList = mutableListOf<ZkClient>()
(1..CLIENTS_NUMBER).forEach {
clientList.add(ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "CLI_${it}", it))
}
val countDownLatch = CountDownLatch(clientList.size)
val leaderBuffer = mutableListOf<String>()
listOf(alice, bob, chip, dave).forEach { client ->
clientList.forEach { client ->
thread{
client.addLeadershipListener(HelperListener(client.nodeId, leaderBuffer))
client.start()
val randomizer = Random()
val actions = listOf(Action.RELINQUISH, Action.REQUEST)
for (i in 1..100) {
for (i in 1..ACTIONS_NUMBER) {
val action = actions[randomizer.nextInt(actions.size)]
when(action) {
Action.REQUEST -> client.requestLeadership()
@ -301,17 +306,16 @@ class ZkClientTests {
Thread.sleep(100)
}
countDownLatch.countDown()
}
}
countDownLatch.await(120, TimeUnit.SECONDS)
countDownLatch.await(CLIENT_TIMEOUT, TimeUnit.SECONDS)
//only one leader should exist
var leaderCount = 0
var leaderId = ""
listOf(alice, bob, chip, dave).forEach { client ->
clientList.forEach { client ->
if (client.isLeader()) {
leaderCount++
leaderId = client.nodeId
@ -324,7 +328,8 @@ class ZkClientTests {
println(leaderBuffer)
assertEquals(leaderBuffer.first(), leaderId)
}
listOf(alice, bob, chip, dave).forEach { client -> client.close() }
clientList.forEach { client -> client.close() }
}
private enum class Action {