diff --git a/build.gradle b/build.gradle index 2bd54c41d8..57211a69d0 100644 --- a/build.gradle +++ b/build.gradle @@ -77,6 +77,7 @@ buildscript { ext.selenium_version = '3.8.1' ext.ghostdriver_version = '2.1.0' ext.eaagentloader_version = '1.0.3' + ext.curator_version = '4.0.0' // Update 121 is required for ObjectInputFilter and at time of writing 131 was latest: ext.java8_minUpdateVersion = '131' diff --git a/node-api/build.gradle b/node-api/build.gradle index 2099ebadd3..f59ddc8cf2 100644 --- a/node-api/build.gradle +++ b/node-api/build.gradle @@ -41,6 +41,10 @@ dependencies { compile "org.liquibase:liquibase-core:$liquibase_version" runtime 'com.mattbertolini:liquibase-slf4j:2.0.0' + // Apache Curator: a client library for Zookeeper + compile "org.apache.curator:curator-recipes:${curator_version}" + testCompile "org.apache.curator:curator-test:${curator_version}" + // Unit testing helpers. testCompile "junit:junit:$junit_version" testCompile "org.assertj:assertj-core:$assertj_version" diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatch.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatch.kt new file mode 100644 index 0000000000..fce338a382 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatch.kt @@ -0,0 +1,280 @@ +package net.corda.nodeapi.internal.zookeeper + +import com.google.common.base.Preconditions +import net.corda.core.utilities.contextLogger +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.api.BackgroundCallback +import org.apache.curator.framework.api.CuratorEvent +import org.apache.curator.framework.listen.ListenerContainer +import org.apache.curator.framework.recipes.AfterConnectionEstablished +import org.apache.curator.framework.recipes.leader.LeaderLatchListener +import org.apache.curator.framework.recipes.locks.LockInternals +import org.apache.curator.framework.recipes.locks.LockInternalsSorter +import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver +import org.apache.curator.framework.state.ConnectionState +import org.apache.curator.framework.state.ConnectionStateListener +import org.apache.curator.utils.ZKPaths +import org.apache.zookeeper.CreateMode +import org.apache.zookeeper.KeeperException +import org.apache.zookeeper.WatchedEvent +import org.apache.zookeeper.Watcher +import java.io.Closeable +import java.io.IOException +import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference + +/** + * A modified version of the Apache Curator [LeaderLatch] recipe. It allows prioritized leader election. + * Upon start, a zNode is created using the [nodeId] and [priority] arguments of the constructor. + * If the creation is successful, a [Watcher] is created for the [path]. I will be triggered by + * [Watcher.Event.EventType.NodeChildrenChanged] events which indicate that a candidate has left or joined the + * election process. After receiving this event, the latch will check the candidates and their priorities to determine + * if it is leader or not. + * + * Clients with the same priority are treated on a first-come first-served basis. + * + * Because [Zookeeper] cannot guarantee that path changes are reliably seen, a new watcher is immediately set when the + * existing one is triggered. + * + * @param client the [CuratorFramework] instance used to manage the Zookeeper connection + * @param path the path used to create zNodes for the election candidates + * @param nodeId the unique identifier used to link a client to a zNode + * @param priority an [Int] value that determines this client's priority in the election. The lower the value, the higher the priority + */ +internal class PrioritizedLeaderLatch(client: CuratorFramework, + private val path: String, + private val nodeId: String, + private val priority: Int) : Closeable { + + val state = AtomicReference(State.CLOSED) + + private val watchedClient = client.newWatcherRemoveCuratorFramework() + private val hasLeadership = AtomicBoolean(false) + private val ourPath = AtomicReference() + private val startTask = AtomicReference>() + + private val listeners = ListenerContainer() + + private val connectionStateListener = ConnectionStateListener { _, newState -> handleStateChange(newState) } + + private companion object { + private val log = contextLogger() + /** Used to split the zNode path and extract the priority value for sorting and comparison */ + private const val LOCK_NAME = "-latch-" + private val sorter = LockInternalsSorter { str, lockName -> StandardLockInternalsDriver.standardFixForSorting(str, lockName) } + } + + /** + * Joins the election process + */ + @Throws(Exception::class) + fun start() { + Preconditions.checkState(state.compareAndSet(State.CLOSED, State.STARTED), + "Cannot be started more than once.") + startTask.set(AfterConnectionEstablished.execute(watchedClient, { + try { + internalStart() + } finally { + startTask.set(null) + } + })) + } + + /** + * Leaves the election process, relinquishing leadership if acquired. + * Cleans up all watchers and connection listener + */ + @Throws(IOException::class, IllegalStateException::class) + override fun close() { + Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), + "Already closed or has not been started.") + cancelStartTask() + + try { + watchedClient.removeWatchers() + setNode(null) + } catch (e: Exception) { + throw IOException(e) + } finally { + watchedClient.connectionStateListenable.removeListener(connectionStateListener) + setLeadership(false) + } + } + + /** + * Adds an election listener that will remain until explicitly removed + * @param listener a [LeaderLatchListener] instance + */ + fun addListener(listener: LeaderLatchListener) { listeners.addListener(listener) } + + /** + * Removes the listener passed as argument + * @param listener a [LeaderLatchListener] instance + */ + fun removeListener(listener: LeaderLatchListener) { listeners.removeListener(listener) } + + /** + * @return [true] if leader, [false] otherwise + */ + fun hasLeadership(): Boolean { + return State.STARTED == state.get() && hasLeadership.get() + } + + private fun internalStart() { + if (State.STARTED == state.get()) { + log.info("$nodeId latch started for path $path.") + watchedClient.connectionStateListenable.addListener(connectionStateListener) + try { + reset() + } catch (e: Exception) { + log.error("An error occurred while resetting leadership.", e) + } + } + } + + private fun cancelStartTask(): Boolean { + val localStartTask = startTask.getAndSet(null) + + if (localStartTask != null) { + localStartTask.cancel(true) + return true + } + + return false + } + + private fun handleStateChange(newState: ConnectionState?) { + log.info("State change. New state: $newState") + when (newState) { + ConnectionState.RECONNECTED -> { + try { + if (watchedClient.connectionStateErrorPolicy.isErrorState(ConnectionState.SUSPENDED) || + !hasLeadership.get()) { + log.info("Client reconnected. Resetting latch.") + reset() + } + } catch (e: Exception) { + log.error("Could not reset leader latch.", e) + setLeadership(false) + } + } + + ConnectionState.SUSPENDED -> { + if (watchedClient.connectionStateErrorPolicy.isErrorState(ConnectionState.SUSPENDED)) + setLeadership(false) + } + + ConnectionState.LOST -> setLeadership(false) + } + } + + @Throws(Exception::class) + private fun reset() { + setLeadership(false) + setNode(null) + + val joinElectionCallback = BackgroundCallback { _, event -> + if (event.resultCode == KeeperException.Code.OK.intValue()) { + setNode(event.name) + if (State.CLOSED == state.get()) + setNode(null) + else { + log.info("$nodeId is joining election with node ${ourPath.get()}") + watchedClient.children.usingWatcher(ElectionWatcher(this)).inBackground(NoNodeCallback(this)).forPath(path) + processCandidates() + } + } else { + log.error("processCandidates() failed: " + event.resultCode) + } + } + + watchedClient.create() + .creatingParentContainersIfNeeded() + .withProtection().withMode(CreateMode.EPHEMERAL) + .inBackground(joinElectionCallback).forPath(ZKPaths.makePath(path, "$nodeId$LOCK_NAME$priority"), nodeId.toByteArray(Charsets.UTF_8)) + } + + private fun setLeadership(newValue: Boolean) { + val oldValue = hasLeadership.getAndSet(newValue) + log.info("Setting leadership to $newValue. Old value was $oldValue.") + if (oldValue && !newValue) { + listeners.forEach { listener -> listener?.notLeader(); null } + } else if (!oldValue && newValue) { + listeners.forEach { listener -> listener?.isLeader(); null } + } + } + + @Throws(Exception::class) + private fun processCandidates() { + val callback = BackgroundCallback { _, event -> + if (event.resultCode == KeeperException.Code.OK.intValue()) + checkLeadership(event.children) + } + + watchedClient.children.inBackground(callback).forPath(ZKPaths.makePath(path, null)) + } + + @Throws(Exception::class) + private fun checkLeadership(children: List) { + val localOurPath = ourPath.get() + val sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children) + val ownIndex = if (localOurPath != null) sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) else -1 + log.debug("Election candidates are: $sortedChildren") + when { + ownIndex < 0 -> { + log.error("Can't find our zNode[$nodeId]. Resetting. Index: $ownIndex. My path is ${ourPath.get()}") + reset() + } + + ownIndex == 0 -> { + if (!hasLeadership.get()) + setLeadership(true) + } + + else -> { + if (hasLeadership.get()) { + setLeadership(false) + } + } + } + } + + @Throws(Exception::class) + private fun setNode(newValue: String?) { + val oldPath: String? = ourPath.getAndSet(newValue) + if (oldPath != null) { + log.info("Deleting node $oldPath.") + watchedClient.delete().guaranteed().inBackground().forPath(oldPath) + } + } + + enum class State { + STARTED, + CLOSED + } + + private class NoNodeCallback(private val latch: PrioritizedLeaderLatch) : BackgroundCallback { + override fun processResult(client: CuratorFramework, event: CuratorEvent) { + if (event.resultCode == KeeperException.Code.NONODE.intValue()) + if (event.resultCode == KeeperException.Code.NONODE.intValue()) + latch.reset() + } + } + + private class ElectionWatcher(private val latch: PrioritizedLeaderLatch) : Watcher { + override fun process(event: WatchedEvent) { + log.info("Client ${latch.nodeId} detected event ${event.type}.") + latch.watchedClient.children.usingWatcher(ElectionWatcher(latch)).inBackground(NoNodeCallback(latch)).forPath(latch.path) + if (State.STARTED == latch.state.get() && Watcher.Event.EventType.NodeChildrenChanged == event.type && latch.ourPath.get() != null) { + try { + log.info("Change detected in children nodes of path ${latch.path}. Checking candidates.") + latch.processCandidates() + } catch (e: Exception) { + log.error("An error occurred checking the leadership.", e) + } + } + } + + } +} \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClient.kt new file mode 100644 index 0000000000..d8ec066d8d --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClient.kt @@ -0,0 +1,83 @@ +package net.corda.nodeapi.internal.zookeeper + +import net.corda.core.utilities.contextLogger +import org.apache.curator.RetryPolicy +import org.apache.curator.framework.CuratorFrameworkFactory +import org.apache.curator.framework.imps.CuratorFrameworkState +import org.apache.curator.framework.recipes.leader.LeaderLatchListener +import org.apache.curator.retry.RetryOneTime +import org.apache.curator.utils.CloseableUtils + +/** + * Simple Zookeeper client that offers priority based leader election. + * + * @param connectionString the connection string(i.e. localhost:1234) used to connect to the Zookeeper server/cluster + * @param electionPath the zNode path used for the election process. Clients that compete for leader must use the same path + * @param nodeId unique client identifier used in the creation of child zNodes + * @param priority indicates the priority of the client in the election process. Low value means high priority(a client + * with [priority] set to 0 will have become leader before a client with [priority] 1 + * @param retryPolicy is an instance of [RetryPolicy] and indicates the process in case connection to Zookeeper server/cluster + * is lost. If no policy is supplied, [RetryOneTime] will be used with 500ms before attempting to reconnect + */ +class ZkClient(connectionString: String, + electionPath: String, + val nodeId: String, + val priority: Int, + retryPolicy: RetryPolicy = RetryOneTime(500)) : ZkLeader { + + private companion object { + private val log = contextLogger() + } + + private val client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy) + private val leaderLatch = PrioritizedLeaderLatch(client, electionPath, nodeId, priority) + + override fun start() { + if (client.state != CuratorFrameworkState.STARTED) { + log.info("Client $nodeId is starting.") + client.start() + } + } + + override fun close() { + log.info("Client $nodeId is stopping.") + if (leaderLatch.state.get() != PrioritizedLeaderLatch.State.CLOSED) + CloseableUtils.closeQuietly(leaderLatch) + CloseableUtils.closeQuietly(client) + } + + @Throws(IllegalStateException::class) + override fun requestLeadership() { + if (client.state != CuratorFrameworkState.STARTED) + throw(IllegalStateException("Client $nodeId must be started before attempting to be leader.")) + + if (leaderLatch.state.get() != PrioritizedLeaderLatch.State.STARTED) { + log.info("Client $nodeId is attempting to become leader.") + leaderLatch.start() + } + } + + override fun relinquishLeadership() { + if (leaderLatch.hasLeadership()) { + log.info("Client $nodeId is relinquishing leadership.") + CloseableUtils.closeQuietly(leaderLatch) + } + } + + override fun isLeader(): Boolean{ + return leaderLatch.hasLeadership() + } + + override fun isStarted(): Boolean { + return client.state == CuratorFrameworkState.STARTED + } + + override fun addLeadershipListener(listener: LeaderLatchListener) { + leaderLatch.addListener(listener) + } + + + override fun removeLeaderShipListener(listener: LeaderLatchListener) { + leaderLatch.removeListener(listener) + } +} \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkLeader.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkLeader.kt new file mode 100644 index 0000000000..215bc8d412 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkLeader.kt @@ -0,0 +1,49 @@ +package net.corda.nodeapi.internal.zookeeper + +import org.apache.curator.framework.recipes.leader.LeaderLatchListener + +interface ZkLeader { + /** + * Starts the client and connects to the Zookeeper server. + */ + fun start() + + /** + * Closes the connection to the Zookeeper server. If the client is involved in the election process, it will drop out + * and relinquish leadership. + */ + fun close() + + /** + * Joins election process. Subsequent calls will have no effect if client is leader or a candidate. + * Throws [IllegalStateException] if the client isn't started. + */ + @Throws(IllegalStateException::class) + fun requestLeadership() + + /** + * Withdraws client from the election process if it is the leader. A new election will be triggered for remaining + * candidates. If the client isn't the leader, nothing will happen. + */ + fun relinquishLeadership() + + /** + * @param listener an instance of [LeaderLatchListener] that will be used for election notifications + */ + fun addLeadershipListener(listener: LeaderLatchListener) + + /** + * @param listener the [LeaderLatchListener] instance to be removed + */ + fun removeLeaderShipListener(listener: LeaderLatchListener) + + /** + * @return [true] if client is the current leader, [false] otherwise + */ + fun isLeader(): Boolean + + /** + * @return [true] if client is started, [false] otherwise + */ + fun isStarted(): Boolean +} \ No newline at end of file diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatchTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatchTest.kt new file mode 100644 index 0000000000..73dd77b2d2 --- /dev/null +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/PrioritizedLeaderLatchTest.kt @@ -0,0 +1,70 @@ +package net.corda.nodeapi.internal.zookeeper + +import org.apache.curator.framework.CuratorFrameworkFactory +import org.apache.curator.retry.RetryOneTime +import org.apache.curator.test.TestingServer +import org.junit.After +import org.junit.Assert.assertEquals +import org.junit.Assert.fail +import org.junit.Before +import org.junit.Test +import java.io.IOException + +class PrioritizedLeaderLatchTest { + + private lateinit var zkServer: TestingServer + private companion object { + private val ELECTION_PATH = "/example/leader" + } + + @Before + fun setup() { + zkServer = TestingServer(true) + } + + @After + fun cleanUp() { + zkServer.stop() + } + + @Test + fun `start stop`() { + val curatorClient = CuratorFrameworkFactory.newClient(zkServer.connectString, RetryOneTime(100)) + curatorClient.start() + val latch = PrioritizedLeaderLatch(curatorClient, ELECTION_PATH, "test", 0) + assertEquals(PrioritizedLeaderLatch.State.CLOSED, latch.state.get()) + try { + latch.start() + } catch (e: Exception) { + fail(e.message) + } + + assertEquals(PrioritizedLeaderLatch.State.STARTED, latch.state.get()) + + try { + latch.close() + } catch (e:IOException) { + fail(e.message) + } + + assertEquals(PrioritizedLeaderLatch.State.CLOSED, latch.state.get()) + curatorClient.close() + } + + @Test(expected = IllegalStateException::class) + fun `double start`() { + val curatorClient = CuratorFrameworkFactory.newClient(zkServer.connectString, RetryOneTime(100)) + curatorClient.start() + val latch = PrioritizedLeaderLatch(curatorClient, ELECTION_PATH, "test", 0) + latch.start() + latch.start() + } + + @Test(expected = IllegalStateException::class) + fun `close while state is closed`() { + val curatorClient = CuratorFrameworkFactory.newClient(zkServer.connectString, RetryOneTime(100)) + curatorClient.start() + val latch = PrioritizedLeaderLatch(curatorClient, ELECTION_PATH, "test", 0) + latch.close() + } +} \ No newline at end of file diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClientTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClientTest.kt new file mode 100644 index 0000000000..74b02c45ae --- /dev/null +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/zookeeper/ZkClientTest.kt @@ -0,0 +1,349 @@ +package net.corda.nodeapi.internal.zookeeper + +import net.corda.core.utilities.contextLogger +import org.apache.curator.framework.recipes.leader.LeaderLatchListener +import org.apache.curator.test.TestingServer +import org.apache.curator.utils.ZKPaths +import org.junit.After +import org.junit.Assert.assertFalse +import org.junit.Before +import org.junit.Test +import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import kotlin.concurrent.thread +import kotlin.test.assertEquals + +class ZkClientTests { + + private lateinit var zkServer: TestingServer + private companion object { + private val ELECTION_PATH = "/example/leader" + private val ELECTION_TIMEOUT = 2000L + private val log = contextLogger() + + } + + @Before + fun setup() { + zkServer = TestingServer(true) + } + + @After + fun cleanUp() { + zkServer.stop() + } + + @Test + fun `start and stop client`() { + val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test1"), "test", 0) + client.start() + assertFalse(client.isLeader()) + client.close() + } + + @Test(expected = IllegalStateException::class) + fun `client requests leader before start`() { + val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test2"), "test", 0) + client.requestLeadership() + } + + @Test + fun `single client becomes leader`() { + val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test3"), "test", 0) + val leaderGain = CountDownLatch(1) + + thread { + client.start() + client.addLeadershipListener(SyncHelperListener("test", leaderGain)) + client.requestLeadership() + } + + leaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assert(client.isLeader()) + client.close() + } + + @Test + fun `single client relinquishes leadership`() { + val client = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test4"), "test", 0) + val leaderGain = CountDownLatch(1) + val leaderLoss = CountDownLatch(1) + + thread { + client.start() + client.addLeadershipListener(SyncHelperListener("test", leaderGain, leaderLoss)) + client.requestLeadership() + } + + leaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assert(client.isLeader()) + client.relinquishLeadership() + leaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assertFalse(client.isLeader()) + client.close() + } + + @Test + fun `client with highest priority becomes leader`() { + val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), "ALICE", 0) + val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), "BOB", 1) + val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), "CHIP", 2) + val aliceLeaderGain = CountDownLatch(1) + val bobLeaderGain = CountDownLatch(1) + val bobLeaderLoss = CountDownLatch(1) + val chipLeaderLoss = CountDownLatch(1) + val chipLeaderGain = CountDownLatch(1) + + listOf(alice, bob, chip).forEach { client -> + thread{ + client.start() + when (client) { + alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain)) + bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, bobLeaderGain, bobLeaderLoss)) + chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss)) + } + client.requestLeadership() + } + } + + aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assert(alice.isLeader()) + if (bobLeaderGain.count == 0L) //wait to lose leadership if leader at some point + bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assertFalse(bob.isLeader()) + if (chipLeaderGain.count == 0L) //wait to lose leadership if leader at some point + chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assertFalse(chip.isLeader()) + + listOf(alice, bob, chip).forEach { client -> client.close() } + } + + @Test + fun `leader relinquishes, next highest priority takes over`() { + val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test6"), "ALICE", 0) + val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test6"), "BOB", 1) + val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test6"), "CHIP", 2) + val aliceLeaderGain = CountDownLatch(1) + val bobLeaderGain = CountDownLatch(1) + val bobLeaderLoss = CountDownLatch(1) + val chipLeaderLoss = CountDownLatch(1) + val chipLeaderGain = CountDownLatch(1) + + listOf(alice, bob, chip).forEach { client -> + thread{ + client.start() + when (client) { + alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain)) + bob -> client.addLeadershipListener(SyncHelperListener(client.nodeId, bobLeaderGain, bobLeaderLoss)) + chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss)) + } + client.requestLeadership() + } + } + + aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assert(alice.isLeader()) + if (bobLeaderGain.count == 0L) //wait to lose leadership if leader at some point + bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assertFalse(bob.isLeader()) + if (chipLeaderGain.count == 0L) + chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assertFalse(chip.isLeader()) //wait to lose leadership if leader at some point + + val bobLeaderGain2 = CountDownLatch(1) + bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain2)) + + alice.relinquishLeadership() + bobLeaderGain2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) // wait for bob to become leader + + assertFalse(alice.isLeader()) + assert(bob.isLeader()) + assertFalse(chip.isLeader()) + + val chipLeaderGain2 = CountDownLatch(1) + chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain2)) + + bob.relinquishLeadership() + chipLeaderGain2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + + assertFalse(alice.isLeader()) + assertFalse(bob.isLeader()) + assert(chip.isLeader()) + + listOf(alice, bob, chip).forEach { client -> client.close() } + } + + @Test + fun `clients with higher priority join and take leadership`() { + val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "ALICE", 0) + val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "BOB", 1) + val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test7"), "CHIP", 2) + val aliceLeaderGain = CountDownLatch(1) + val bobLeaderGain = CountDownLatch(1) + val bobLeaderLoss = CountDownLatch(1) + val chipLeaderLoss = CountDownLatch(1) + val chipLeaderGain = CountDownLatch(1) + + chip.start() + chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain, chipLeaderLoss)) + chip.requestLeadership() + + chipLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assert(chip.isLeader()) + + bob.start() + bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain, bobLeaderLoss)) + bob.requestLeadership() + + chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + bobLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assert(bob.isLeader()) + assertFalse(chip.isLeader()) + + alice.start() + alice.addLeadershipListener(SyncHelperListener(alice.nodeId, aliceLeaderGain)) + alice.requestLeadership() + + bobLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + + assert(alice.isLeader()) + assertFalse(bob.isLeader()) + assertFalse(chip.isLeader()) + + listOf(alice, bob, chip).forEach { client -> client.close() } + } + + @Test + fun `client with mid-level priority joins and becomes leader after current leader relinquishes`() { + val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test8"), "ALICE", 0) + val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test8"), "BOB", 1) + val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test8"), "CHIP", 2) + val aliceLeaderGain = CountDownLatch(1) + val aliceLeaderLoss = CountDownLatch(1) + val bobLeaderGain = CountDownLatch(1) + val chipLeaderLoss = CountDownLatch(1) + val chipLeaderGain = CountDownLatch(1) + + listOf(alice, chip).forEach { client -> + thread{ + client.start() + when (client) { + alice -> client.addLeadershipListener(SyncHelperListener(client.nodeId, aliceLeaderGain, aliceLeaderLoss)) + chip -> client.addLeadershipListener(SyncHelperListener(client.nodeId, chipLeaderGain, chipLeaderLoss)) + } + client.requestLeadership() + } + } + + aliceLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + if (chipLeaderGain.count == 0L) //wait to lose leadership if leader at some point + chipLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + assert(alice.isLeader()) + assertFalse(chip.isLeader()) + + bob.start() + bob.addLeadershipListener(SyncHelperListener(bob.nodeId, bobLeaderGain)) + bob.requestLeadership() + + assert(alice.isLeader()) + assertFalse(bob.isLeader()) + assertFalse(chip.isLeader()) + + val chipLeaderGain2 = CountDownLatch(1) + val chipLeaderLoss2 = CountDownLatch(1) + chip.addLeadershipListener(SyncHelperListener(chip.nodeId, chipLeaderGain2, chipLeaderLoss2)) + alice.relinquishLeadership() + aliceLeaderLoss.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + bobLeaderGain.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + if (chipLeaderGain.count == 0L) //wait to lose leadership if gained + chipLeaderLoss2.await(ELECTION_TIMEOUT, TimeUnit.MILLISECONDS) + + assertFalse(alice.isLeader()) + assert(bob.isLeader()) + assertFalse(chip.isLeader()) + + listOf(alice, bob, chip).forEach { client -> client.close() } + } + + @Test + fun `clients randomly do things`() { + val alice = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "ALICE", 0) + val bob = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "BOB", 1) + val chip = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "CHIP", 2) + val dave = ZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test9"), "DAVE", 3) + val countDownLatch = CountDownLatch(3) + val leaderBuffer = mutableListOf() + + listOf(alice, bob, chip, dave).forEach { client -> + thread{ + client.addLeadershipListener(HelperListener(client.nodeId, leaderBuffer)) + client.start() + val randomizer = Random() + val actions = listOf(Action.RELINQUISH, Action.REQUEST) + for (i in 1..100) { + val action = actions[randomizer.nextInt(actions.size)] + when(action) { + Action.REQUEST -> client.requestLeadership() + Action.RELINQUISH -> client.relinquishLeadership() + } + Thread.sleep(100) + } + + + countDownLatch.countDown() + } + } + + countDownLatch.await(120, TimeUnit.SECONDS) + //only one leader should exist + var leaderCount = 0 + var leaderId = "" + + listOf(alice, bob, chip, dave).forEach { client -> + if (client.isLeader()) { + leaderCount++ + leaderId = client.nodeId + } + } + + assert(leaderCount <= 1) + assert(leaderBuffer.size <= 1) + if (leaderBuffer.size == 1) { + println(leaderBuffer) + assertEquals(leaderBuffer.first(), leaderId) + } + listOf(alice, bob, chip, dave).forEach { client -> client.close() } + } + + private enum class Action { + START, STOP, REQUEST, RELINQUISH + } + + private class HelperListener(private val nodeId: String, + private val leaders: MutableList) : LeaderLatchListener{ + @Synchronized + override fun notLeader() { + leaders.remove(nodeId) + } + + @Synchronized + override fun isLeader() { + leaders.add(nodeId) + } + } + private class SyncHelperListener(private val nodeId: String, + private val leaderGain: CountDownLatch = CountDownLatch(1), + private val leaderLoss: CountDownLatch = CountDownLatch(1)) : LeaderLatchListener { + override fun notLeader() { + log.info("$nodeId is no longer leader.") + leaderLoss.countDown() + + } + override fun isLeader() { + log.info("$nodeId is the new leader.") + leaderGain.countDown() + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt b/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt index d4bb045015..1195c3ae3a 100644 --- a/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt @@ -14,16 +14,15 @@ import net.corda.core.internal.concurrent.thenMatch import net.corda.core.utilities.loggerFor import net.corda.node.VersionInfo import net.corda.node.internal.cordapp.CordappLoader -import net.corda.node.services.config.GraphiteOptions import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.RelayConfiguration import net.corda.node.services.statemachine.MultiThreadedStateMachineManager -import net.corda.node.services.statemachine.SingleThreadedStateMachineManager import net.corda.node.services.statemachine.StateMachineManager import net.corda.nodeapi.internal.persistence.CordaPersistence import org.fusesource.jansi.Ansi import org.fusesource.jansi.AnsiConsole import java.io.IOException +import java.net.Inet6Address import java.net.InetAddress import java.util.concurrent.ExecutorService import java.util.concurrent.Executors @@ -38,11 +37,18 @@ open class EnterpriseNode(configuration: NodeConfiguration, private val logger by lazy { loggerFor() } private fun defaultGraphitePrefix(legalName: CordaX500Name): String { - return legalName.organisation + "_" + InetAddress.getLocalHost().hostAddress.trim().replace(".", "_") + return (legalName.organisation + "_" + legalName.locality + "_" + legalName.country + "_" + Inet6Address.getLocalHost().hostAddress) } - private fun getGraphitePrefix(configuration: NodeConfiguration): String { - return configuration.graphiteOptions!!.prefix ?: defaultGraphitePrefix(configuration.myLegalName) + fun getGraphitePrefix(configuration: NodeConfiguration): String { + val customPrefix = configuration.graphiteOptions!!.prefix + // Create a graphite prefix stripping all non-allowed characteres + val graphiteName = (customPrefix ?: defaultGraphitePrefix(configuration.myLegalName)) + .trim().replace(Regex("[^0-9a-zA-Z_]"), "_") + if (customPrefix != null && graphiteName != customPrefix) { + logger.warn("Invalid graphite prefix ${customPrefix} specified in config - got mangled to ${graphiteName}. Only letters, numbers and underscores are allowed") + } + return graphiteName } } @@ -140,7 +146,7 @@ D""".trimStart() GraphiteReporter.forRegistry(metrics) .prefixedWith(getGraphitePrefix(configuration)) .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.MINUTES) + .convertRatesTo(TimeUnit.SECONDS) .filter(MetricFilter.ALL) .build(PickledGraphite(configuration.graphiteOptions!!.server, configuration.graphiteOptions!!.port)) .start(configuration.graphiteOptions!!.sampleInvervallSeconds, TimeUnit.SECONDS) @@ -181,4 +187,4 @@ D""".trimStart() return super.makeStateMachineManager(database) } } -} \ No newline at end of file +} diff --git a/node/src/test/kotlin/net/corda/node/internal/EnterpriseNodeTest.kt b/node/src/test/kotlin/net/corda/node/internal/EnterpriseNodeTest.kt new file mode 100644 index 0000000000..6c5ec1d73a --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/EnterpriseNodeTest.kt @@ -0,0 +1,29 @@ +package net.corda.node.internal + +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.identity.CordaX500Name +import net.corda.node.services.config.GraphiteOptions +import net.corda.node.services.config.NodeConfiguration +import org.junit.Test + +class EnterpriseNodeTest { + @Test + fun `Check sanitizing of graphite names`() { + checkReplacement("abc", "abc") + checkReplacement("abc.1.2", "abc_1_2") + checkReplacement("abc", "foo__bar_", "foo (bar)") + + } + + fun checkReplacement(orgname: String, expectedName: String, custom: String? = null) { + val nodeConfig = mock() { + whenever(it.myLegalName).thenReturn(CordaX500Name(orgname, "London", "GB")) + whenever(it.graphiteOptions).thenReturn(GraphiteOptions("server", 12345, custom)) + } + + val expectedPattern = if (custom == null) "${expectedName}_London_GB_\\d+_\\d+_\\d+_\\d+" else expectedName + val createdName = EnterpriseNode.getGraphitePrefix(nodeConfig) + assert(Regex(expectedPattern).matches(createdName), { "${createdName} did not match ${expectedPattern}" }) + } +} \ No newline at end of file