mirror of
https://github.com/corda/corda.git
synced 2025-01-27 06:39:38 +00:00
Merge remote-tracking branch 'origin/master' into aslemmer-merge-19-Feb
This commit is contained in:
commit
a548942a0a
@ -77,6 +77,7 @@ buildscript {
|
||||
ext.selenium_version = '3.8.1'
|
||||
ext.ghostdriver_version = '2.1.0'
|
||||
ext.eaagentloader_version = '1.0.3'
|
||||
ext.curator_version = '4.0.0'
|
||||
|
||||
// Update 121 is required for ObjectInputFilter and at time of writing 131 was latest:
|
||||
ext.java8_minUpdateVersion = '131'
|
||||
|
@ -41,6 +41,10 @@ dependencies {
|
||||
compile "org.liquibase:liquibase-core:$liquibase_version"
|
||||
runtime 'com.mattbertolini:liquibase-slf4j:2.0.0'
|
||||
|
||||
// Apache Curator: a client library for Zookeeper
|
||||
compile "org.apache.curator:curator-recipes:${curator_version}"
|
||||
testCompile "org.apache.curator:curator-test:${curator_version}"
|
||||
|
||||
// Unit testing helpers.
|
||||
testCompile "junit:junit:$junit_version"
|
||||
testCompile "org.assertj:assertj-core:$assertj_version"
|
||||
|
@ -0,0 +1,280 @@
|
||||
package net.corda.nodeapi.internal.zookeeper
|
||||
|
||||
import com.google.common.base.Preconditions
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
import org.apache.curator.framework.api.BackgroundCallback
|
||||
import org.apache.curator.framework.api.CuratorEvent
|
||||
import org.apache.curator.framework.listen.ListenerContainer
|
||||
import org.apache.curator.framework.recipes.AfterConnectionEstablished
|
||||
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
|
||||
import org.apache.curator.framework.recipes.locks.LockInternals
|
||||
import org.apache.curator.framework.recipes.locks.LockInternalsSorter
|
||||
import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver
|
||||
import org.apache.curator.framework.state.ConnectionState
|
||||
import org.apache.curator.framework.state.ConnectionStateListener
|
||||
import org.apache.curator.utils.ZKPaths
|
||||
import org.apache.zookeeper.CreateMode
|
||||
import org.apache.zookeeper.KeeperException
|
||||
import org.apache.zookeeper.WatchedEvent
|
||||
import org.apache.zookeeper.Watcher
|
||||
import java.io.Closeable
|
||||
import java.io.IOException
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
/**
|
||||
* A modified version of the Apache Curator [LeaderLatch] recipe. It allows prioritized leader election.
|
||||
* Upon start, a zNode is created using the [nodeId] and [priority] arguments of the constructor.
|
||||
* If the creation is successful, a [Watcher] is created for the [path]. I will be triggered by
|
||||
* [Watcher.Event.EventType.NodeChildrenChanged] events which indicate that a candidate has left or joined the
|
||||
* election process. After receiving this event, the latch will check the candidates and their priorities to determine
|
||||
* if it is leader or not.
|
||||
*
|
||||
* Clients with the same priority are treated on a first-come first-served basis.
|
||||
*
|
||||
* Because [Zookeeper] cannot guarantee that path changes are reliably seen, a new watcher is immediately set when the
|
||||
* existing one is triggered.
|
||||
*
|
||||
* @param client the [CuratorFramework] instance used to manage the Zookeeper connection
|
||||
* @param path the path used to create zNodes for the election candidates
|
||||
* @param nodeId the unique identifier used to link a client to a zNode
|
||||
* @param priority an [Int] value that determines this client's priority in the election. The lower the value, the higher the priority
|
||||
*/
|
||||
internal class PrioritizedLeaderLatch(client: CuratorFramework,
|
||||
private val path: String,
|
||||
private val nodeId: String,
|
||||
private val priority: Int) : Closeable {
|
||||
|
||||
val state = AtomicReference<State>(State.CLOSED)
|
||||
|
||||
private val watchedClient = client.newWatcherRemoveCuratorFramework()
|
||||
private val hasLeadership = AtomicBoolean(false)
|
||||
private val ourPath = AtomicReference<String>()
|
||||
private val startTask = AtomicReference<Future<*>>()
|
||||
|
||||
private val listeners = ListenerContainer<LeaderLatchListener>()
|
||||
|
||||
private val connectionStateListener = ConnectionStateListener { _, newState -> handleStateChange(newState) }
|
||||
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
/** Used to split the zNode path and extract the priority value for sorting and comparison */
|
||||
private const val LOCK_NAME = "-latch-"
|
||||
private val sorter = LockInternalsSorter { str, lockName -> StandardLockInternalsDriver.standardFixForSorting(str, lockName) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Joins the election process
|
||||
*/
|
||||
@Throws(Exception::class)
|
||||
fun start() {
|
||||
Preconditions.checkState(state.compareAndSet(State.CLOSED, State.STARTED),
|
||||
"Cannot be started more than once.")
|
||||
startTask.set(AfterConnectionEstablished.execute(watchedClient, {
|
||||
try {
|
||||
internalStart()
|
||||
} finally {
|
||||
startTask.set(null)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
/**
|
||||
* Leaves the election process, relinquishing leadership if acquired.
|
||||
* Cleans up all watchers and connection listener
|
||||
*/
|
||||
@Throws(IOException::class, IllegalStateException::class)
|
||||
override fun close() {
|
||||
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED),
|
||||
"Already closed or has not been started.")
|
||||
cancelStartTask()
|
||||
|
||||
try {
|
||||
watchedClient.removeWatchers()
|
||||
setNode(null)
|
||||
} catch (e: Exception) {
|
||||
throw IOException(e)
|
||||
} finally {
|
||||
watchedClient.connectionStateListenable.removeListener(connectionStateListener)
|
||||
setLeadership(false)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an election listener that will remain until explicitly removed
|
||||
* @param listener a [LeaderLatchListener] instance
|
||||
*/
|
||||
fun addListener(listener: LeaderLatchListener) { listeners.addListener(listener) }
|
||||
|
||||
/**
|
||||
* Removes the listener passed as argument
|
||||
* @param listener a [LeaderLatchListener] instance
|
||||
*/
|
||||
fun removeListener(listener: LeaderLatchListener) { listeners.removeListener(listener) }
|
||||
|
||||
/**
|
||||
* @return [true] if leader, [false] otherwise
|
||||
*/
|
||||
fun hasLeadership(): Boolean {
|
||||
return State.STARTED == state.get() && hasLeadership.get()
|
||||
}
|
||||
|
||||
private fun internalStart() {
|
||||
if (State.STARTED == state.get()) {
|
||||
log.info("$nodeId latch started for path $path.")
|
||||
watchedClient.connectionStateListenable.addListener(connectionStateListener)
|
||||
try {
|
||||
reset()
|
||||
} catch (e: Exception) {
|
||||
log.error("An error occurred while resetting leadership.", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun cancelStartTask(): Boolean {
|
||||
val localStartTask = startTask.getAndSet(null)
|
||||
|
||||
if (localStartTask != null) {
|
||||
localStartTask.cancel(true)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
private fun handleStateChange(newState: ConnectionState?) {
|
||||
log.info("State change. New state: $newState")
|
||||
when (newState) {
|
||||
ConnectionState.RECONNECTED -> {
|
||||
try {
|
||||
if (watchedClient.connectionStateErrorPolicy.isErrorState(ConnectionState.SUSPENDED) ||
|
||||
!hasLeadership.get()) {
|
||||
log.info("Client reconnected. Resetting latch.")
|
||||
reset()
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
log.error("Could not reset leader latch.", e)
|
||||
setLeadership(false)
|
||||
}
|
||||
}
|
||||
|
||||
ConnectionState.SUSPENDED -> {
|
||||
if (watchedClient.connectionStateErrorPolicy.isErrorState(ConnectionState.SUSPENDED))
|
||||
setLeadership(false)
|
||||
}
|
||||
|
||||
ConnectionState.LOST -> setLeadership(false)
|
||||
}
|
||||
}
|
||||
|
||||
@Throws(Exception::class)
|
||||
private fun reset() {
|
||||
setLeadership(false)
|
||||
setNode(null)
|
||||
|
||||
val joinElectionCallback = BackgroundCallback { _, event ->
|
||||
if (event.resultCode == KeeperException.Code.OK.intValue()) {
|
||||
setNode(event.name)
|
||||
if (State.CLOSED == state.get())
|
||||
setNode(null)
|
||||
else {
|
||||
log.info("$nodeId is joining election with node ${ourPath.get()}")
|
||||
watchedClient.children.usingWatcher(ElectionWatcher(this)).inBackground(NoNodeCallback(this)).forPath(path)
|
||||
processCandidates()
|
||||
}
|
||||
} else {
|
||||
log.error("processCandidates() failed: " + event.resultCode)
|
||||
}
|
||||
}
|
||||
|
||||
watchedClient.create()
|
||||
.creatingParentContainersIfNeeded()
|
||||
.withProtection().withMode(CreateMode.EPHEMERAL)
|
||||
.inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, "$nodeId$LOCK_NAME$priority"), nodeId.toByteArray(Charsets.UTF_8))
|
||||
}
|
||||
|
||||
private fun setLeadership(newValue: Boolean) {
|
||||
val oldValue = hasLeadership.getAndSet(newValue)
|
||||
log.info("Setting leadership to $newValue. Old value was $oldValue.")
|
||||
if (oldValue && !newValue) {
|
||||
listeners.forEach { listener -> listener?.notLeader(); null }
|
||||
} else if (!oldValue && newValue) {
|
||||
listeners.forEach { listener -> listener?.isLeader(); null }
|
||||
}
|
||||
}
|
||||
|
||||
@Throws(Exception::class)
|
||||
private fun processCandidates() {
|
||||
val callback = BackgroundCallback { _, event ->
|
||||
if (event.resultCode == KeeperException.Code.OK.intValue())
|
||||
checkLeadership(event.children)
|
||||
}
|
||||
|
||||
watchedClient.children.inBackground(callback).forPath(ZKPaths.makePath(path, null))
|
||||
}
|
||||
|
||||
@Throws(Exception::class)
|
||||
private fun checkLeadership(children: List<String>) {
|
||||
val localOurPath = ourPath.get()
|
||||
val sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children)
|
||||
val ownIndex = if (localOurPath != null) sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) else -1
|
||||
log.debug("Election candidates are: $sortedChildren")
|
||||
when {
|
||||
ownIndex < 0 -> {
|
||||
log.error("Can't find our zNode[$nodeId]. Resetting. Index: $ownIndex. My path is ${ourPath.get()}")
|
||||
reset()
|
||||
}
|
||||
|
||||
ownIndex == 0 -> {
|
||||
if (!hasLeadership.get())
|
||||
setLeadership(true)
|
||||
}
|
||||
|
||||
else -> {
|
||||
if (hasLeadership.get()) {
|
||||
setLeadership(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Throws(Exception::class)
|
||||
private fun setNode(newValue: String?) {
|
||||
val oldPath: String? = ourPath.getAndSet(newValue)
|
||||
if (oldPath != null) {
|
||||
log.info("Deleting node $oldPath.")
|
||||
watchedClient.delete().guaranteed().inBackground().forPath(oldPath)
|
||||
}
|
||||
}
|
||||
|
||||
enum class State {
|
||||
STARTED,
|
||||
CLOSED
|
||||
}
|
||||
|
||||
private class NoNodeCallback(private val latch: PrioritizedLeaderLatch) : BackgroundCallback {
|
||||
override fun processResult(client: CuratorFramework, event: CuratorEvent) {
|
||||
if (event.resultCode == KeeperException.Code.NONODE.intValue())
|
||||
if (event.resultCode == KeeperException.Code.NONODE.intValue())
|
||||
latch.reset()
|
||||
}
|
||||
}
|
||||
|
||||
private class ElectionWatcher(private val latch: PrioritizedLeaderLatch) : Watcher {
|
||||
override fun process(event: WatchedEvent) {
|
||||
log.info("Client ${latch.nodeId} detected event ${event.type}.")
|
||||
latch.watchedClient.children.usingWatcher(ElectionWatcher(latch)).inBackground(NoNodeCallback(latch)).forPath(latch.path)
|
||||
if (State.STARTED == latch.state.get() && Watcher.Event.EventType.NodeChildrenChanged == event.type && latch.ourPath.get() != null) {
|
||||
try {
|
||||
log.info("Change detected in children nodes of path ${latch.path}. Checking candidates.")
|
||||
latch.processCandidates()
|
||||
} catch (e: Exception) {
|
||||
log.error("An error occurred checking the leadership.", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
package net.corda.nodeapi.internal.zookeeper
|
||||
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.apache.curator.RetryPolicy
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory
|
||||
import org.apache.curator.framework.imps.CuratorFrameworkState
|
||||
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
|
||||
import org.apache.curator.retry.RetryOneTime
|
||||
import org.apache.curator.utils.CloseableUtils
|
||||
|
||||
/**
|
||||
* Simple Zookeeper client that offers priority based leader election.
|
||||
*
|
||||
* @param connectionString the connection string(i.e. localhost:1234) used to connect to the Zookeeper server/cluster
|
||||
* @param electionPath the zNode path used for the election process. Clients that compete for leader must use the same path
|
||||
* @param nodeId unique client identifier used in the creation of child zNodes
|
||||
* @param priority indicates the priority of the client in the election process. Low value means high priority(a client
|
||||
* with [priority] set to 0 will have become leader before a client with [priority] 1
|
||||
* @param retryPolicy is an instance of [RetryPolicy] and indicates the process in case connection to Zookeeper server/cluster
|
||||
* is lost. If no policy is supplied, [RetryOneTime] will be used with 500ms before attempting to reconnect
|
||||
*/
|
||||
class ZkClient(connectionString: String,
|
||||
electionPath: String,
|
||||
val nodeId: String,
|
||||
val priority: Int,
|
||||
retryPolicy: RetryPolicy = RetryOneTime(500)) : ZkLeader {
|
||||
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private val client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy)
|
||||
private val leaderLatch = PrioritizedLeaderLatch(client, electionPath, nodeId, priority)
|
||||
|
||||
override fun start() {
|
||||
if (client.state != CuratorFrameworkState.STARTED) {
|
||||
log.info("Client $nodeId is starting.")
|
||||
client.start()
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
log.info("Client $nodeId is stopping.")
|
||||
if (leaderLatch.state.get() != PrioritizedLeaderLatch.State.CLOSED)
|
||||
CloseableUtils.closeQuietly(leaderLatch)
|
||||
CloseableUtils.closeQuietly(client)
|
||||
}
|
||||
|
||||
@Throws(IllegalStateException::class)
|
||||
override fun requestLeadership() {
|
||||
if (client.state != CuratorFrameworkState.STARTED)
|
||||
throw(IllegalStateException("Client $nodeId must be started before attempting to be leader."))
|
||||
|
||||
if (leaderLatch.state.get() != PrioritizedLeaderLatch.State.STARTED) {
|
||||
log.info("Client $nodeId is attempting to become leader.")
|
||||
leaderLatch.start()
|
||||
}
|
||||
}
|
||||
|
||||
override fun relinquishLeadership() {
|
||||
if (leaderLatch.hasLeadership()) {
|
||||
log.info("Client $nodeId is relinquishing leadership.")
|
||||
CloseableUtils.closeQuietly(leaderLatch)
|
||||
}
|
||||
}
|
||||
|
||||
override fun isLeader(): Boolean{
|
||||
return leaderLatch.hasLeadership()
|
||||
}
|
||||
|
||||
override fun isStarted(): Boolean {
|
||||
return client.state == CuratorFrameworkState.STARTED
|
||||
}
|
||||
|
||||
override fun addLeadershipListener(listener: LeaderLatchListener) {
|
||||
leaderLatch.addListener(listener)
|
||||
}
|
||||
|
||||
|
||||
override fun removeLeaderShipListener(listener: LeaderLatchListener) {
|
||||
leaderLatch.removeListener(listener)
|
||||
}
|
||||
}
|
@ -0,0 +1,49 @@
|
||||
package net.corda.nodeapi.internal.zookeeper
|
||||
|
||||
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
|
||||
|
||||
interface ZkLeader {
|
||||
/**
|
||||
* Starts the client and connects to the Zookeeper server.
|
||||
*/
|
||||
fun start()
|
||||
|
||||
/**
|
||||
* Closes the connection to the Zookeeper server. If the client is involved in the election process, it will drop out
|
||||
* and relinquish leadership.
|
||||
*/
|
||||
fun close()
|
||||
|
||||
/**
|
||||
* Joins election process. Subsequent calls will have no effect if client is leader or a candidate.
|
||||
* Throws [IllegalStateException] if the client isn't started.
|
||||
*/
|
||||
@Throws(IllegalStateException::class)
|
||||
fun requestLeadership()
|
||||
|
||||
/**
|
||||
* Withdraws client from the election process if it is the leader. A new election will be triggered for remaining
|
||||
* candidates. If the client isn't the leader, nothing will happen.
|
||||
*/
|
||||
fun relinquishLeadership()
|
||||
|
||||
/**
|
||||
* @param listener an instance of [LeaderLatchListener] that will be used for election notifications
|
||||
*/
|
||||
fun addLeadershipListener(listener: LeaderLatchListener)
|
||||
|
||||
/**
|
||||
* @param listener the [LeaderLatchListener] instance to be removed
|
||||
*/
|
||||
fun removeLeaderShipListener(listener: LeaderLatchListener)
|
||||
|
||||
/**
|
||||
* @return [true] if client is the current leader, [false] otherwise
|
||||
*/
|
||||
fun isLeader(): Boolean
|
||||
|
||||
/**
|
||||
* @return [true] if client is started, [false] otherwise
|
||||
*/
|
||||
fun isStarted(): Boolean
|
||||
}
|
@ -0,0 +1,70 @@
|
||||
package net.corda.nodeapi.internal.zookeeper
|
||||
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory
|
||||
import org.apache.curator.retry.RetryOneTime
|
||||
import org.apache.curator.test.TestingServer
|
||||
import org.junit.After
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.fail
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.IOException
|
||||
|
||||
class PrioritizedLeaderLatchTest {
|
||||
|
||||
private lateinit var zkServer: TestingServer
|
||||
private companion object {
|
||||
private val ELECTION_PATH = "/example/leader"
|
||||
}
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
zkServer = TestingServer(true)
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
zkServer.stop()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `start stop`() {
|
||||
val curatorClient = CuratorFrameworkFactory.newClient(zkServer.connectString, RetryOneTime(100))
|
||||
curatorClient.start()
|
||||
val latch = PrioritizedLeaderLatch(curatorClient, ELECTION_PATH, "test", 0)
|
||||
assertEquals(PrioritizedLeaderLatch.State.CLOSED, latch.state.get())
|
||||
try {
|
||||
latch.start()
|
||||
} catch (e: Exception) {
|
||||
fail(e.message)
|
||||
}
|
||||
|
||||
assertEquals(PrioritizedLeaderLatch.State.STARTED, latch.state.get())
|
||||
|
||||
try {
|
||||
latch.close()
|
||||
} catch (e:IOException) {
|
||||
fail(e.message)
|
||||
}
|
||||
|
||||
assertEquals(PrioritizedLeaderLatch.State.CLOSED, latch.state.get())
|
||||
curatorClient.close()
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException::class)
|
||||
fun `double start`() {
|
||||
val curatorClient = CuratorFrameworkFactory.newClient(zkServer.connectString, RetryOneTime(100))
|
||||
curatorClient.start()
|
||||
val latch = PrioritizedLeaderLatch(curatorClient, ELECTION_PATH, "test", 0)
|
||||
latch.start()
|
||||
latch.start()
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException::class)
|
||||
fun `close while state is closed`() {
|
||||
val curatorClient = CuratorFrameworkFactory.newClient(zkServer.connectString, RetryOneTime(100))
|
||||
curatorClient.start()
|
||||
val latch = PrioritizedLeaderLatch(curatorClient, ELECTION_PATH, "test", 0)
|
||||
latch.close()
|
||||
}
|
||||
}
|
@ -0,0 +1,349 @@
|
||||
package net.corda.nodeapi.internal.zookeeper
|
||||
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
|
||||
import org.apache.curator.test.TestingServer
|
||||
import org.apache.curator.utils.ZKPaths
|
||||
import org.junit.After
|
||||
import org.junit.Assert.assertFalse
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class ZkClientTests {
|
||||
|
||||
private lateinit var zkServer: TestingServer
|
||||
private companion object {
|
||||
private val ELECTION_PATH = "/example/leader"
|
||||
private val ELECTION_TIMEOUT = 2000L
|
||||
private val log = contextLogger()
|
||||
|
||||
}
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
zkServer = TestingServer(true)
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
zkServer.stop()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `start and stop client`() {
|
||||
val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test1"), "test", 0)
|
||||
client.start()
|
||||
assertFalse(client.isLeader())
|
||||
client.close()
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException::class)
|
||||
fun `client requests leader before start`() {
|
||||
val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test2"), "test", 0)
|
||||
client.requestLeadership()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `single client becomes leader`() {
|
||||
val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test3"), "test", 0)
|
||||
val leaderGain = CountDownLatch(1)
|
||||
|
||||
thread {
|
||||
client.start()
|
||||
client.addLeadershipListener(SyncHelperListener("test", leaderGain))
|
||||
client.requestLeadership()
|
||||
}
|
||||
|
||||
leaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assert(client.isLeader())
|
||||
client.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `single client relinquishes leadership`() {
|
||||
val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test4"), "test", 0)
|
||||
val leaderGain = CountDownLatch(1)
|
||||
val leaderLoss = CountDownLatch(1)
|
||||
|
||||
thread {
|
||||
client.start()
|
||||
client.addLeadershipListener(SyncHelperListener("test", leaderGain, leaderLoss))
|
||||
client.requestLeadership()
|
||||
}
|
||||
|
||||
leaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assert(client.isLeader())
|
||||
client.relinquishLeadership()
|
||||
leaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assertFalse(client.isLeader())
|
||||
client.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `client with highest priority becomes leader`() {
|
||||
val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), "ALICE", 0)
|
||||
val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), "BOB", 1)
|
||||
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), "CHIP", 2)
|
||||
val aliceLeaderGain = CountDownLatch(1)
|
||||
val bobLeaderGain = CountDownLatch(1)
|
||||
val bobLeaderLoss = CountDownLatch(1)
|
||||
val chipLeaderLoss = CountDownLatch(1)
|
||||
val chipLeaderGain = CountDownLatch(1)
|
||||
|
||||
listOf(alice, bob, chip).forEach { client ->
|
||||
thread{
|
||||
client.start()
|
||||
when (client) {
|
||||
alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain))
|
||||
bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, bobLeaderGain, bobLeaderLoss))
|
||||
chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss))
|
||||
}
|
||||
client.requestLeadership()
|
||||
}
|
||||
}
|
||||
|
||||
aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assert(alice.isLeader())
|
||||
if (bobLeaderGain.count == 0L) //wait to lose leadership if leader at some point
|
||||
bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assertFalse(bob.isLeader())
|
||||
if (chipLeaderGain.count == 0L) //wait to lose leadership if leader at some point
|
||||
chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assertFalse(chip.isLeader())
|
||||
|
||||
listOf(alice, bob, chip).forEach { client -> client.close() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `leader relinquishes, next highest priority takes over`() {
|
||||
val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test6"), "ALICE", 0)
|
||||
val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test6"), "BOB", 1)
|
||||
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test6"), "CHIP", 2)
|
||||
val aliceLeaderGain = CountDownLatch(1)
|
||||
val bobLeaderGain = CountDownLatch(1)
|
||||
val bobLeaderLoss = CountDownLatch(1)
|
||||
val chipLeaderLoss = CountDownLatch(1)
|
||||
val chipLeaderGain = CountDownLatch(1)
|
||||
|
||||
listOf(alice, bob, chip).forEach { client ->
|
||||
thread{
|
||||
client.start()
|
||||
when (client) {
|
||||
alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain))
|
||||
bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, bobLeaderGain, bobLeaderLoss))
|
||||
chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss))
|
||||
}
|
||||
client.requestLeadership()
|
||||
}
|
||||
}
|
||||
|
||||
aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assert(alice.isLeader())
|
||||
if (bobLeaderGain.count == 0L) //wait to lose leadership if leader at some point
|
||||
bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assertFalse(bob.isLeader())
|
||||
if (chipLeaderGain.count == 0L)
|
||||
chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assertFalse(chip.isLeader()) //wait to lose leadership if leader at some point
|
||||
|
||||
val bobLeaderGain2 = CountDownLatch(1)
|
||||
bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain2))
|
||||
|
||||
alice.relinquishLeadership()
|
||||
bobLeaderGain2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) // wait for bob to become leader
|
||||
|
||||
assertFalse(alice.isLeader())
|
||||
assert(bob.isLeader())
|
||||
assertFalse(chip.isLeader())
|
||||
|
||||
val chipLeaderGain2 = CountDownLatch(1)
|
||||
chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain2))
|
||||
|
||||
bob.relinquishLeadership()
|
||||
chipLeaderGain2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
|
||||
assertFalse(alice.isLeader())
|
||||
assertFalse(bob.isLeader())
|
||||
assert(chip.isLeader())
|
||||
|
||||
listOf(alice, bob, chip).forEach { client -> client.close() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `clients with higher priority join and take leadership`() {
|
||||
val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "ALICE", 0)
|
||||
val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "BOB", 1)
|
||||
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "CHIP", 2)
|
||||
val aliceLeaderGain = CountDownLatch(1)
|
||||
val bobLeaderGain = CountDownLatch(1)
|
||||
val bobLeaderLoss = CountDownLatch(1)
|
||||
val chipLeaderLoss = CountDownLatch(1)
|
||||
val chipLeaderGain = CountDownLatch(1)
|
||||
|
||||
chip.start()
|
||||
chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain, chipLeaderLoss))
|
||||
chip.requestLeadership()
|
||||
|
||||
chipLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assert(chip.isLeader())
|
||||
|
||||
bob.start()
|
||||
bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain, bobLeaderLoss))
|
||||
bob.requestLeadership()
|
||||
|
||||
chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
bobLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assert(bob.isLeader())
|
||||
assertFalse(chip.isLeader())
|
||||
|
||||
alice.start()
|
||||
alice.addLeadershipListener(SyncHelperListener(alice.nodeId, aliceLeaderGain))
|
||||
alice.requestLeadership()
|
||||
|
||||
bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
|
||||
assert(alice.isLeader())
|
||||
assertFalse(bob.isLeader())
|
||||
assertFalse(chip.isLeader())
|
||||
|
||||
listOf(alice, bob, chip).forEach { client -> client.close() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `client with mid-level priority joins and becomes leader after current leader relinquishes`() {
|
||||
val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test8"), "ALICE", 0)
|
||||
val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test8"), "BOB", 1)
|
||||
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test8"), "CHIP", 2)
|
||||
val aliceLeaderGain = CountDownLatch(1)
|
||||
val aliceLeaderLoss = CountDownLatch(1)
|
||||
val bobLeaderGain = CountDownLatch(1)
|
||||
val chipLeaderLoss = CountDownLatch(1)
|
||||
val chipLeaderGain = CountDownLatch(1)
|
||||
|
||||
listOf(alice, chip).forEach { client ->
|
||||
thread{
|
||||
client.start()
|
||||
when (client) {
|
||||
alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain, aliceLeaderLoss))
|
||||
chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss))
|
||||
}
|
||||
client.requestLeadership()
|
||||
}
|
||||
}
|
||||
|
||||
aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
if (chipLeaderGain.count == 0L) //wait to lose leadership if leader at some point
|
||||
chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
assert(alice.isLeader())
|
||||
assertFalse(chip.isLeader())
|
||||
|
||||
bob.start()
|
||||
bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain))
|
||||
bob.requestLeadership()
|
||||
|
||||
assert(alice.isLeader())
|
||||
assertFalse(bob.isLeader())
|
||||
assertFalse(chip.isLeader())
|
||||
|
||||
val chipLeaderGain2 = CountDownLatch(1)
|
||||
val chipLeaderLoss2 = CountDownLatch(1)
|
||||
chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain2, chipLeaderLoss2))
|
||||
alice.relinquishLeadership()
|
||||
aliceLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
bobLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
if (chipLeaderGain.count == 0L) //wait to lose leadership if gained
|
||||
chipLeaderLoss2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||
|
||||
assertFalse(alice.isLeader())
|
||||
assert(bob.isLeader())
|
||||
assertFalse(chip.isLeader())
|
||||
|
||||
listOf(alice, bob, chip).forEach { client -> client.close() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `clients randomly do things`() {
|
||||
val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "ALICE", 0)
|
||||
val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "BOB", 1)
|
||||
val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "CHIP", 2)
|
||||
val dave = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "DAVE", 3)
|
||||
val countDownLatch = CountDownLatch(3)
|
||||
val leaderBuffer = mutableListOf<String>()
|
||||
|
||||
listOf(alice, bob, chip, dave).forEach { client ->
|
||||
thread{
|
||||
client.addLeadershipListener(HelperListener(client.nodeId, leaderBuffer))
|
||||
client.start()
|
||||
val randomizer = Random()
|
||||
val actions = listOf(Action.RELINQUISH, Action.REQUEST)
|
||||
for (i in 1..100) {
|
||||
val action = actions[randomizer.nextInt(actions.size)]
|
||||
when(action) {
|
||||
Action.REQUEST -> client.requestLeadership()
|
||||
Action.RELINQUISH -> client.relinquishLeadership()
|
||||
}
|
||||
Thread.sleep(100)
|
||||
}
|
||||
|
||||
|
||||
countDownLatch.countDown()
|
||||
}
|
||||
}
|
||||
|
||||
countDownLatch.await(120, TimeUnit.SECONDS)
|
||||
//only one leader should exist
|
||||
var leaderCount = 0
|
||||
var leaderId = ""
|
||||
|
||||
listOf(alice, bob, chip, dave).forEach { client ->
|
||||
if (client.isLeader()) {
|
||||
leaderCount++
|
||||
leaderId = client.nodeId
|
||||
}
|
||||
}
|
||||
|
||||
assert(leaderCount <= 1)
|
||||
assert(leaderBuffer.size <= 1)
|
||||
if (leaderBuffer.size == 1) {
|
||||
println(leaderBuffer)
|
||||
assertEquals(leaderBuffer.first(), leaderId)
|
||||
}
|
||||
listOf(alice, bob, chip, dave).forEach { client -> client.close() }
|
||||
}
|
||||
|
||||
private enum class Action {
|
||||
START, STOP, REQUEST, RELINQUISH
|
||||
}
|
||||
|
||||
private class HelperListener(private val nodeId: String,
|
||||
private val leaders: MutableList<String>) : LeaderLatchListener{
|
||||
@Synchronized
|
||||
override fun notLeader() {
|
||||
leaders.remove(nodeId)
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun isLeader() {
|
||||
leaders.add(nodeId)
|
||||
}
|
||||
}
|
||||
private class SyncHelperListener(private val nodeId: String,
|
||||
private val leaderGain: CountDownLatch = CountDownLatch(1),
|
||||
private val leaderLoss: CountDownLatch = CountDownLatch(1)) : LeaderLatchListener {
|
||||
override fun notLeader() {
|
||||
log.info("$nodeId is no longer leader.")
|
||||
leaderLoss.countDown()
|
||||
|
||||
}
|
||||
override fun isLeader() {
|
||||
log.info("$nodeId is the new leader.")
|
||||
leaderGain.countDown()
|
||||
}
|
||||
}
|
||||
}
|
@ -14,16 +14,15 @@ import net.corda.core.internal.concurrent.thenMatch
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.services.config.GraphiteOptions
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.RelayConfiguration
|
||||
import net.corda.node.services.statemachine.MultiThreadedStateMachineManager
|
||||
import net.corda.node.services.statemachine.SingleThreadedStateMachineManager
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import org.fusesource.jansi.Ansi
|
||||
import org.fusesource.jansi.AnsiConsole
|
||||
import java.io.IOException
|
||||
import java.net.Inet6Address
|
||||
import java.net.InetAddress
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
@ -38,11 +37,18 @@ open class EnterpriseNode(configuration: NodeConfiguration,
|
||||
private val logger by lazy { loggerFor<EnterpriseNode>() }
|
||||
|
||||
private fun defaultGraphitePrefix(legalName: CordaX500Name): String {
|
||||
return legalName.organisation + "_" + InetAddress.getLocalHost().hostAddress.trim().replace(".", "_")
|
||||
return (legalName.organisation + "_" + legalName.locality + "_" + legalName.country + "_" + Inet6Address.getLocalHost().hostAddress)
|
||||
}
|
||||
|
||||
private fun getGraphitePrefix(configuration: NodeConfiguration): String {
|
||||
return configuration.graphiteOptions!!.prefix ?: defaultGraphitePrefix(configuration.myLegalName)
|
||||
fun getGraphitePrefix(configuration: NodeConfiguration): String {
|
||||
val customPrefix = configuration.graphiteOptions!!.prefix
|
||||
// Create a graphite prefix stripping all non-allowed characteres
|
||||
val graphiteName = (customPrefix ?: defaultGraphitePrefix(configuration.myLegalName))
|
||||
.trim().replace(Regex("[^0-9a-zA-Z_]"), "_")
|
||||
if (customPrefix != null && graphiteName != customPrefix) {
|
||||
logger.warn("Invalid graphite prefix ${customPrefix} specified in config - got mangled to ${graphiteName}. Only letters, numbers and underscores are allowed")
|
||||
}
|
||||
return graphiteName
|
||||
}
|
||||
}
|
||||
|
||||
@ -140,7 +146,7 @@ D""".trimStart()
|
||||
GraphiteReporter.forRegistry(metrics)
|
||||
.prefixedWith(getGraphitePrefix(configuration))
|
||||
.convertDurationsTo(TimeUnit.MILLISECONDS)
|
||||
.convertRatesTo(TimeUnit.MINUTES)
|
||||
.convertRatesTo(TimeUnit.SECONDS)
|
||||
.filter(MetricFilter.ALL)
|
||||
.build(PickledGraphite(configuration.graphiteOptions!!.server, configuration.graphiteOptions!!.port))
|
||||
.start(configuration.graphiteOptions!!.sampleInvervallSeconds, TimeUnit.SECONDS)
|
||||
@ -181,4 +187,4 @@ D""".trimStart()
|
||||
return super.makeStateMachineManager(database)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,29 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.node.services.config.GraphiteOptions
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import org.junit.Test
|
||||
|
||||
class EnterpriseNodeTest {
|
||||
@Test
|
||||
fun `Check sanitizing of graphite names`() {
|
||||
checkReplacement("abc", "abc")
|
||||
checkReplacement("abc.1.2", "abc_1_2")
|
||||
checkReplacement("abc", "foo__bar_", "foo (bar)")
|
||||
|
||||
}
|
||||
|
||||
fun checkReplacement(orgname: String, expectedName: String, custom: String? = null) {
|
||||
val nodeConfig = mock<NodeConfiguration>() {
|
||||
whenever(it.myLegalName).thenReturn(CordaX500Name(orgname, "London", "GB"))
|
||||
whenever(it.graphiteOptions).thenReturn(GraphiteOptions("server", 12345, custom))
|
||||
}
|
||||
|
||||
val expectedPattern = if (custom == null) "${expectedName}_London_GB_\\d+_\\d+_\\d+_\\d+" else expectedName
|
||||
val createdName = EnterpriseNode.getGraphitePrefix(nodeConfig)
|
||||
assert(Regex(expectedPattern).matches(createdName), { "${createdName} did not match ${expectedPattern}" })
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user