Flow worker POC zookeeper flow partitioning (#1369)

* Flow worker zookeeper WIP

* remove copyright decaration

* remove shaded curator and depends on node-api shadow instead
This commit is contained in:
Patrick Kuo 2018-09-06 10:09:31 +01:00 committed by PokeyBot
parent 4ee63c77d7
commit 767580c298
7 changed files with 487 additions and 5 deletions

View File

@ -25,12 +25,14 @@ dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
compile(project(':core'))
compile(project(':node'))
compile project(':core')
compile project(':node')
compile project(path: ':node-api', configuration: 'shadow')
testCompile "org.apache.curator:curator-test:${curator_version}"
// Apache Curator: a client library for Zookeeper
testCompile "junit:junit:$junit_version"
testCompile(project(':node-driver'))
integrationTestCompile(project(':bridge'))
testCompile project(':node-driver')
integrationTestCompile project(':bridge')
}
task integrationTest(type: Test) {

View File

@ -0,0 +1,94 @@
package net.corda.flowworker.zookeeper
import net.corda.testing.core.SerializationEnvironmentRule
import org.apache.curator.test.TestingServer
import org.apache.curator.utils.ZKPaths
import org.junit.After
import org.junit.Assert.assertFalse
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.test.assertEquals
class FlowWorkerZkClientTest {
@Rule
@JvmField
val serializationEnvironment = SerializationEnvironmentRule(true)
private lateinit var zkServer: TestingServer
private companion object {
private val ELECTION_PATH = "/example/leader"
private val FLOW_BUCKETS_PATH = "/example/flowBuckets"
}
@Before
fun setup() {
zkServer = TestingServer(59980, true)
}
@After
fun cleanUp() {
zkServer.stop()
}
@Test
fun `client with highest priority becomes leader`() {
println(zkServer.connectString)
val alice = FlowWorkerZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), ZKPaths.makePath(FLOW_BUCKETS_PATH, "test5"), "ALICE", 0)
val bob = FlowWorkerZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), ZKPaths.makePath(FLOW_BUCKETS_PATH, "test5"), "BOB", 1)
val chip = FlowWorkerZkClient(zkServer.connectString, ZKPaths.makePath(ELECTION_PATH, "test5"), ZKPaths.makePath(FLOW_BUCKETS_PATH, "test5"), "CHIP", 2)
val aliceLeaderGain = CountDownLatch(1)
val bobLeaderGain = CountDownLatch(1)
val bobLeaderLoss = CountDownLatch(1)
val chipLeaderLoss = CountDownLatch(1)
val chipLeaderGain = CountDownLatch(1)
listOf(alice, bob, chip).forEach { client ->
thread {
when (client) {
alice -> client.registration.subscribe { if (it.isLeader) aliceLeaderGain.countDown() }
bob -> client.registration.subscribe { if (it.isLeader) bobLeaderGain.countDown() else bobLeaderLoss.countDown() }
chip -> client.registration.subscribe { if (it.isLeader) chipLeaderGain.countDown() else chipLeaderLoss.countDown() }
}
client.start()
}
}
aliceLeaderGain.await()
require(alice.isLeader())
assertFalse(bob.isLeader())
assertFalse(chip.isLeader())
while (alice.partition.value?.first != 0L) {
Thread.sleep(1000)
}
assertEquals(0L to Long.MAX_VALUE / 3, alice.partition.value)
assertEquals(Long.MAX_VALUE / 3 + 1 to Long.MAX_VALUE / 3 * 2, bob.partition.value)
assertEquals(Long.MAX_VALUE / 3 * 2 + 1 to Long.MAX_VALUE, chip.partition.value)
alice.close()
bobLeaderGain.await()
require(bob.isLeader())
while (bob.partition.value?.first != 0L) {
Thread.sleep(1000)
}
assertEquals(0L to Long.MAX_VALUE / 2, bob.partition.value)
assertEquals(Long.MAX_VALUE / 2 + 1 to Long.MAX_VALUE, chip.partition.value)
bob.close()
chipLeaderGain.await()
require(chip.isLeader())
while (chip.partition.value?.first != 0L) {
Thread.sleep(1000)
}
assertEquals(0L to Long.MAX_VALUE, chip.partition.value)
chip.close()
}
}

View File

@ -0,0 +1,42 @@
package net.corda.flowworker.zookeeper
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.imps.CuratorFrameworkState
import org.apache.curator.retry.RetryForever
import org.apache.curator.retry.RetryNTimes
import org.apache.curator.utils.CloseableUtils
import java.io.Closeable
abstract class AbstractZkClient(connectionString: String,
retryInterval: Int = 500,
retryCount: Int = 1) : Closeable {
protected val client: CuratorFramework
init {
val retryPolicy = if (retryCount == -1) {
RetryForever(retryInterval)
} else {
RetryNTimes(retryCount, retryInterval)
}
client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy)
}
fun start() {
if (client.state != CuratorFrameworkState.STARTED) {
client.start()
startInternal()
}
}
protected abstract fun startInternal()
fun isStarted(): Boolean {
return client.state == CuratorFrameworkState.STARTED
}
override fun close() {
CloseableUtils.closeQuietly(client)
}
}

View File

@ -0,0 +1,87 @@
package net.corda.flowworker.zookeeper
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.AfterConnectionEstablished
import org.apache.curator.framework.state.ConnectionState
import org.apache.curator.framework.state.ConnectionStateListener
import java.io.Closeable
import java.io.IOException
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
abstract class AbstractZkLatch(client: CuratorFramework) : Closeable, ConnectionStateListener {
companion object {
private val logger = contextLogger()
}
private val watchedClient = client.newWatcherRemoveCuratorFramework()
private val _started = AtomicBoolean()
val started get() = _started.get()
private val startTask = AtomicReference<Future<*>>()
fun start() {
require(_started.compareAndSet(false, true)) { "Cannot be started more than once." }
startTask.set(AfterConnectionEstablished.execute(watchedClient) {
try {
watchedClient.connectionStateListenable.addListener(this)
try {
initiateLatch(watchedClient)
} catch (e: Exception) {
logger.error("An error occurred while resetting leadership.", e)
}
} finally {
startTask.set(null)
}
})
}
override fun stateChanged(client: CuratorFramework, newState: ConnectionState) {
logger.info("State change. New state: $newState")
when (newState) {
ConnectionState.RECONNECTED -> {
try {
if (watchedClient.connectionStateErrorPolicy.isErrorState(ConnectionState.SUSPENDED)) {
logger.info("Client reconnected. Resetting latch.")
initiateLatch(watchedClient)
}
} catch (e: Exception) {
logger.error("Could not reset leader latch.", e)
reset(watchedClient)
}
}
ConnectionState.SUSPENDED -> {
if (watchedClient.connectionStateErrorPolicy.isErrorState(ConnectionState.SUSPENDED))
reset(watchedClient)
}
ConnectionState.LOST -> reset(watchedClient)
else -> logger.debug { "Ignoring state change $newState" }
}
}
override fun close() {
require(_started.compareAndSet(true, false)) { "Already closed or has not been started." }
startTask.getAndSet(null)?.cancel(true)
try {
watchedClient.removeWatchers()
reset(watchedClient)
} catch (e: Exception) {
throw IOException(e)
} finally {
watchedClient.connectionStateListenable.removeListener(this)
}
}
protected abstract fun initiateLatch(startedClient: CuratorFramework)
protected abstract fun reset(startedClient: CuratorFramework)
}

View File

@ -0,0 +1,85 @@
package net.corda.flowworker.zookeeper
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.utils.ZKPaths
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.Watcher
import org.apache.zookeeper.Watcher.Event.EventType.NodeChildrenChanged
import org.apache.zookeeper.Watcher.Event.EventType.NodeDeleted
import rx.subjects.BehaviorSubject
class FlowWorkerPartitioner(client: CuratorFramework,
private val partitionPath: String,
private val leaderLatch: FlowWorkerRegister) : AbstractZkLatch(client) {
companion object {
private val logger = contextLogger()
}
val partition: BehaviorSubject<Pair<Long, Long>?> = BehaviorSubject.create()
override fun initiateLatch(startedClient: CuratorFramework) {
// Make sure the partition path exists before watching it.
startedClient.createContainers(ZKPaths.makePath(partitionPath, null))
// Listen to partition definition from leader
watchFlowPartition(startedClient)
// Leader will create the partitions after elected.
leaderLatch.registration.subscribe { registration ->
if (registration.isLeader && started) {
val bucketSize = Long.MAX_VALUE / registration.workers.size
val upperBound = registration.workers.mapIndexed { index, _ ->
bucketSize * (index + 1)
}.dropLast(1) + Long.MAX_VALUE // Make sure upper bound end with the MAX value.
val lowerBound = listOf(0L) + upperBound.dropLast(1).map { it + 1 } // Make sure lower bound start from 0
val partitions = registration.workers.zip(lowerBound.zip(upperBound)).toMap()
startedClient.create()
.creatingParentContainersIfNeeded()
.withProtection()
.withMode(CreateMode.EPHEMERAL)
.inBackground { _, e ->
logger.info("Partition info created in path ${e.path}")
}.forPath(ZKPaths.makePath(partitionPath, "partition"), partitions.serialize().bytes)
}
}
}
override fun reset(startedClient: CuratorFramework) {
partition.onNext(null)
}
private fun watchFlowPartition(watchedClient: CuratorFramework) {
watchedClient.children.usingWatcher(Watcher {
if (started) {
watchFlowPartition(watchedClient)
when (it.type) {
NodeChildrenChanged -> {
watchedClient.children.inBackground { _, event ->
// TODO: Determine which is the latest, could happen if partition from last leader didn't get deleted.
if (event.children.isNotEmpty()) {
watchedClient.data.inBackground { _, e ->
val partitions = e.data.deserialize<Map<String, Pair<Long, Long>>>()
partition.onNext(partitions[leaderLatch.registration.value.myPath?.split("/")?.last()])
logger.info("My partition is ${partition.value}")
}.forPath(ZKPaths.makePath(partitionPath, event.children.last()))
} else {
// No partition data i.e leader is down, set partition to null
reset(watchedClient)
}
}.forPath(it.path)
}
NodeDeleted -> reset(watchedClient)
else -> throw IllegalArgumentException("Unsupported event type ${it.type}.")
}
}
}).forPath(ZKPaths.makePath(partitionPath, null))
}
override fun close() {
super.close()
partition.onCompleted()
}
}

View File

@ -0,0 +1,131 @@
package net.corda.flowworker.zookeeper
import net.corda.core.utilities.contextLogger
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.locks.LockInternals
import org.apache.curator.framework.recipes.locks.LockInternalsSorter
import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver
import org.apache.curator.utils.ZKPaths
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.KeeperException.Code.NONODE
import org.apache.zookeeper.KeeperException.Code.OK
import org.apache.zookeeper.Watcher
import org.apache.zookeeper.Watcher.Event.EventType.NodeChildrenChanged
import rx.subjects.BehaviorSubject
/**
* Use for registration with the zookeeper worker group and worker leader election.
*/
class FlowWorkerRegister(client: CuratorFramework,
private val path: String,
private val nodeId: String,
private val priority: Int) : AbstractZkLatch(client) {
private companion object {
private val logger = contextLogger()
/** Used to split the zNode path and extract the priority value for sorting and comparison */
private const val LOCK_NAME = "-latch-"
private val workerSorter = LockInternalsSorter { str, lockName -> StandardLockInternalsDriver.standardFixForSorting(str, lockName) }
}
val registration: BehaviorSubject<FlowWorkerRegistration> = BehaviorSubject.create(FlowWorkerRegistration())
/**
* Leaves the election process, relinquishing leadership if acquired.
* Cleans up all watchers and connection listener
*/
@Throws(Exception::class)
override fun initiateLatch(startedClient: CuratorFramework) {
logger.info("$nodeId latch started for path $path.")
reset(startedClient)
val latchName = "$nodeId$LOCK_NAME${"%05d".format(priority)}" // Fixed width priority to ensure numeric sorting
startedClient.create()
.creatingParentContainersIfNeeded()
.withProtection().withMode(CreateMode.EPHEMERAL)
.inBackground { _, event ->
if (event.resultCode == OK.intValue()) {
startedClient.setNode(event.name)
if (!started) {
startedClient.setNode(null)
} else {
logger.info("$nodeId is joining election with node ${registration.value.myPath}")
startedClient.watchLeader()
startedClient.processCandidates()
}
} else {
logger.error("processCandidates() failed: " + event.resultCode)
}
}.forPath(ZKPaths.makePath(path, latchName), nodeId.toByteArray(Charsets.UTF_8))
}
private fun CuratorFramework.watchLeader() {
children.usingWatcher(Watcher {
logger.info("Client $nodeId detected event ${it.type}.")
if (started) {
watchLeader()
if (NodeChildrenChanged == it.type && registration.value.myPath != null) {
try {
logger.info("Change detected in children nodes of path $path. Checking candidates.")
processCandidates()
} catch (e: Exception) {
logger.error("An error occurred checking the leadership.", e)
}
}
}
}).inBackground { _, event ->
if (event.resultCode == NONODE.intValue()) {
reset(this)
}
}.forPath(path)
}
private fun setLeadership(newValue: Boolean, workers: List<String> = emptyList()) {
logger.info("Setting leadership to $newValue. Old value was ${registration.value.isLeader}.")
registration.onNext(registration.value.copy(isLeader = newValue, workers = workers))
}
@Throws(Exception::class)
private fun CuratorFramework.processCandidates() {
children.inBackground { _, event ->
if (event.resultCode == OK.intValue())
checkLeadership(event.children)
}.forPath(ZKPaths.makePath(path, null))
}
@Throws(Exception::class)
private fun CuratorFramework.checkLeadership(children: List<String>) {
val localOurPath = registration.value.myPath
val sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, workerSorter, children)
val ownIndex = if (localOurPath != null) sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) else -1
logger.debug("Election candidates are: $sortedChildren")
when {
ownIndex < 0 -> {
logger.error("Can't find our zNode[$nodeId]. Resetting. Index: $ownIndex. My path is ${registration.value.myPath}")
reset(this)
}
ownIndex == 0 -> setLeadership(true, sortedChildren)
else -> setLeadership(false)
}
}
@Throws(Exception::class)
private fun CuratorFramework.setNode(newValue: String?) {
val oldPath: String? = registration.value.myPath
registration.onNext(registration.value.copy(myPath = newValue))
if (oldPath != null) {
logger.info("Deleting node $oldPath.")
delete().guaranteed().inBackground().forPath(oldPath)
}
}
override fun reset(startedClient: CuratorFramework) {
setLeadership(false)
startedClient.setNode(null)
}
override fun close() {
super.close()
registration.onCompleted()
}
}
data class FlowWorkerRegistration(val myPath: String? = null, val isLeader: Boolean = false, val workers: List<String> = emptyList())

View File

@ -0,0 +1,41 @@
package net.corda.flowworker.zookeeper
import net.corda.core.utilities.contextLogger
import org.apache.curator.utils.CloseableUtils
class FlowWorkerZkClient(connectionString: String,
electionPath: String,
partitionPath: String,
private val nodeId: String,
priority: Int,
retryInterval: Int = 500,
retryCount: Int = 1) : AbstractZkClient(connectionString, retryInterval, retryCount) {
private companion object {
private val logger = contextLogger()
}
private val leaderLatch = FlowWorkerRegister(client, electionPath, nodeId, priority)
private val partitioner = FlowWorkerPartitioner(client, partitionPath, leaderLatch)
val registration = leaderLatch.registration
val partition = partitioner.partition
override fun startInternal() {
logger.info("Client $nodeId is starting.")
partitioner.start()
logger.info("Client $nodeId is attempting to become leader.")
leaderLatch.start()
}
override fun close() {
logger.info("Client $nodeId is stopping.")
CloseableUtils.closeQuietly(leaderLatch)
CloseableUtils.closeQuietly(partitioner)
super.close()
}
fun isLeader(): Boolean {
return registration.value.isLeader
}
}