ENT-933 Prioritise flow completion. Executor prioritises flows created first. (#1255)

* ENT-933 Prioritise flow completion.  Executor prioritises flows created first.

* Unit tests

* More unit tests

* Formatting
This commit is contained in:
Rick Parker 2018-07-12 08:30:02 +01:00 committed by GitHub
parent 5f8954c789
commit a23b3f4b29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 215 additions and 9 deletions

View File

@ -15,7 +15,10 @@ import net.corda.core.DeleteForDJVM
import net.corda.core.DoNotImplement
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.flows.*
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import org.slf4j.Logger
@ -51,4 +54,5 @@ interface FlowStateMachine<FLOWRETURN> {
val context: InvocationContext
val ourIdentity: Party
val ourSenderUUID: String?
val creationTime: Long
}

View File

@ -14,10 +14,8 @@ import com.codahale.metrics.MetricFilter
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.graphite.GraphiteReporter
import com.codahale.metrics.graphite.PickledGraphite
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.jcraft.jsch.JSch
import com.jcraft.jsch.JSchException
import io.netty.util.concurrent.FastThreadLocalThread
import net.corda.core.crypto.newSecureRandom
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.Emoji
@ -27,6 +25,7 @@ import net.corda.node.VersionInfo
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.RelayConfiguration
import net.corda.node.services.statemachine.MultiThreadedStateMachineExecutor
import net.corda.node.services.statemachine.MultiThreadedStateMachineManager
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -35,7 +34,6 @@ import org.fusesource.jansi.AnsiConsole
import java.io.IOException
import java.net.Inet6Address
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
open class EnterpriseNode(configuration: NodeConfiguration,
@ -185,10 +183,7 @@ D""".trimStart()
private fun makeStateMachineExecutorService(): ExecutorService {
log.info("Multi-threaded state machine manager with ${configuration.enterpriseConfiguration.tuning.flowThreadPoolSize} threads.")
return Executors.newFixedThreadPool(
configuration.enterpriseConfiguration.tuning.flowThreadPoolSize,
ThreadFactoryBuilder().setNameFormat("flow-worker").setThreadFactory(::FastThreadLocalThread).build()
)
return MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize)
}
override fun makeStateMachineManager(database: CordaPersistence): StateMachineManager {

View File

@ -0,0 +1,23 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.FiberSchedulerTask
import net.corda.core.internal.FlowStateMachine
import kotlin.math.sign
class FlowStateMachineComparator : Comparator<Runnable> {
override fun compare(o1: Runnable, o2: Runnable): Int {
return if (o1 is FiberSchedulerTask) {
if (o2 is FiberSchedulerTask) {
(((o1.fiber as? FlowStateMachine<*>)?.creationTime
?: Long.MAX_VALUE) - ((o2.fiber as? FlowStateMachine<*>)?.creationTime
?: Long.MAX_VALUE)).sign
} else {
-1
}
} else if (o2 is FiberSchedulerTask) {
1
} else {
0
}
}
}

View File

@ -50,7 +50,8 @@ class TransientReference<out A>(@Transient val value: A)
class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override val logic: FlowLogic<R>,
scheduler: FiberScheduler
scheduler: FiberScheduler,
override val creationTime: Long = System.currentTimeMillis()
) : Fiber<Unit>(id.toString(), scheduler), FlowStateMachine<R>, FlowFiber {
companion object {
/**

View File

@ -0,0 +1,12 @@
package net.corda.node.services.statemachine
import com.google.common.util.concurrent.ThreadFactoryBuilder
import io.netty.util.concurrent.FastThreadLocalThread
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
class MultiThreadedStateMachineExecutor(poolSize: Int) : ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
PriorityBlockingQueue<Runnable>(poolSize * 4, FlowStateMachineComparator()),
ThreadFactoryBuilder().setNameFormat("flow-worker").setThreadFactory(::FastThreadLocalThread).build())

View File

@ -0,0 +1,171 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.fibers.FiberScheduler
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.node.TestClock
import org.junit.Assert
import org.junit.Rule
import org.junit.Test
import org.slf4j.Logger
import java.time.Clock
import java.time.Duration
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
class FlowStateMachineComparatorTest {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()
private object EmptyFlow : FlowLogic<Unit>() {
override fun call() {}
}
private object DummyExecutor : Executor {
override fun execute(command: Runnable?) {}
}
@Test
fun `sort order of state machines is as expected`() {
val scheduler = FiberExecutorScheduler("TestScheduler", DummyExecutor)
val clock = TestClock(Clock.systemUTC())
val sm1 = FlowStateMachineImpl<Unit>(StateMachineRunId(UUID.randomUUID()),
scheduler = scheduler,
logic = EmptyFlow, creationTime = clock.millis())
clock.advanceBy(Duration.ofSeconds(1))
val sm2 = FlowStateMachineImpl<Unit>(StateMachineRunId(UUID.randomUUID()),
scheduler = scheduler,
logic = EmptyFlow, creationTime = clock.millis())
val comparator = FlowStateMachineComparator()
Assert.assertEquals(-1, comparator.compare(sm1.task as Runnable, sm2.task as Runnable))
Assert.assertEquals(0, comparator.compare(sm1.task as Runnable, sm1.task as Runnable))
Assert.assertEquals(1, comparator.compare(sm2.task as Runnable, sm1.task as Runnable))
}
@Test
fun `serialized flow maintains creation time`() {
val scheduler = FiberExecutorScheduler("TestScheduler", DummyExecutor)
val clock = TestClock(Clock.systemUTC())
clock.advanceBy(Duration.ofDays(1)) // Move this away from "now" to check that it's not a coincidence.
val sm1 = FlowStateMachineImpl<Unit>(StateMachineRunId(UUID.randomUUID()),
scheduler = scheduler,
logic = EmptyFlow, creationTime = clock.millis())
val sm2 = sm1.serialize(context = SerializationDefaults.CHECKPOINT_CONTEXT).deserialize(context = SerializationDefaults.CHECKPOINT_CONTEXT)
Fiber.unparkDeserialized(sm2, scheduler)
val comparator = FlowStateMachineComparator()
Assert.assertEquals(0, comparator.compare(sm1.task as Runnable, sm2.task as Runnable))
}
private class BlockerFlow : FlowLogic<Unit>() {
val barrier = CountDownLatch(1)
override fun call() {
barrier.await()
}
}
private class AddToListFlow(val list: MutableList<Long>) : FlowLogic<Unit>() {
override fun call() {
list += stateMachine.creationTime
}
}
private class TestFlowStateMachine(override val creationTime: Long, override val logic: FlowLogic<Unit>, scheduler: FiberScheduler) : FlowStateMachine<Unit>, Fiber<Unit>(scheduler) {
@Suspendable
@Throws(InterruptedException::class)
override fun run() {
logic.stateMachine = this
return logic.call()
}
override fun <SUSPENDRETURN : Any> suspend(ioRequest: FlowIORequest<SUSPENDRETURN>, maySkipCheckpoint: Boolean): SUSPENDRETURN {
throw NotImplementedError()
}
override fun initiateFlow(party: Party): FlowSession {
throw NotImplementedError()
}
override fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>) {
throw NotImplementedError()
}
override fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>) {
throw NotImplementedError()
}
override fun <SUBFLOWRETURN> subFlow(subFlow: FlowLogic<SUBFLOWRETURN>): SUBFLOWRETURN {
throw NotImplementedError()
}
override fun flowStackSnapshot(flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot? {
throw NotImplementedError()
}
override fun persistFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>) {
throw NotImplementedError()
}
override val serviceHub: ServiceHub
get() = throw NotImplementedError()
override val logger: Logger
get() = throw NotImplementedError()
override val id: StateMachineRunId
get() = throw NotImplementedError()
override val resultFuture: CordaFuture<Unit>
get() = throw NotImplementedError()
override val context: InvocationContext
get() = throw NotImplementedError()
override val ourIdentity: Party
get() = throw NotImplementedError()
override val ourSenderUUID: String?
get() = throw NotImplementedError()
}
@Test
fun `test executor`() {
val executor = MultiThreadedStateMachineExecutor(1)
val scheduler = FiberExecutorScheduler("TestScheduler", executor)
val clock = TestClock(Clock.systemUTC())
val blockerLogic = BlockerFlow()
val list: MutableList<Long> = Collections.synchronizedList(arrayListOf())
val blocker = TestFlowStateMachine(scheduler = scheduler,
logic = blockerLogic, creationTime = clock.millis())
val sm1 = TestFlowStateMachine(scheduler = scheduler,
logic = AddToListFlow(list), creationTime = clock.millis())
clock.advanceBy(Duration.ofSeconds(1))
val sm2 = TestFlowStateMachine(scheduler = scheduler,
logic = AddToListFlow(list), creationTime = clock.millis())
blocker.start()
sm2.start()
sm1.start()
blockerLogic.barrier.countDown()
Thread.sleep(1000)
executor.shutdown()
executor.awaitTermination(1, TimeUnit.HOURS)
assertEquals(2, list.size)
assertEquals(sm1.creationTime, list[0])
assertEquals(sm2.creationTime, list[1])
}
}