From a23b3f4b29d011c0c60057b3b3db742ff54e810a Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Thu, 12 Jul 2018 08:30:02 +0100 Subject: [PATCH] ENT-933 Prioritise flow completion. Executor prioritises flows created first. (#1255) * ENT-933 Prioritise flow completion. Executor prioritises flows created first. * Unit tests * More unit tests * Formatting --- .../corda/core/internal/FlowStateMachine.kt | 6 +- .../net/corda/node/internal/EnterpriseNode.kt | 9 +- .../FlowStateMachineComparator.kt | 23 +++ .../statemachine/FlowStateMachineImpl.kt | 3 +- .../MultiThreadedStateMachineExecutor.kt | 12 ++ .../FlowStateMachineComparatorTest.kt | 171 ++++++++++++++++++ 6 files changed, 215 insertions(+), 9 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparator.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineExecutor.kt create mode 100644 node/src/test/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparatorTest.kt diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt index f83db058e5..3abe03f707 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt @@ -15,7 +15,10 @@ import net.corda.core.DeleteForDJVM import net.corda.core.DoNotImplement import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext -import net.corda.core.flows.* +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.FlowStackSnapshot +import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party import net.corda.core.node.ServiceHub import org.slf4j.Logger @@ -51,4 +54,5 @@ interface FlowStateMachine { val context: InvocationContext val ourIdentity: Party val ourSenderUUID: String? + val creationTime: Long } diff --git a/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt b/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt index 51b40d35c8..7044e40dc6 100644 --- a/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt @@ -14,10 +14,8 @@ import com.codahale.metrics.MetricFilter import com.codahale.metrics.MetricRegistry import com.codahale.metrics.graphite.GraphiteReporter import com.codahale.metrics.graphite.PickledGraphite -import com.google.common.util.concurrent.ThreadFactoryBuilder import com.jcraft.jsch.JSch import com.jcraft.jsch.JSchException -import io.netty.util.concurrent.FastThreadLocalThread import net.corda.core.crypto.newSecureRandom import net.corda.core.identity.CordaX500Name import net.corda.core.internal.Emoji @@ -27,6 +25,7 @@ import net.corda.node.VersionInfo import net.corda.node.internal.cordapp.CordappLoader import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.RelayConfiguration +import net.corda.node.services.statemachine.MultiThreadedStateMachineExecutor import net.corda.node.services.statemachine.MultiThreadedStateMachineManager import net.corda.node.services.statemachine.StateMachineManager import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -35,7 +34,6 @@ import org.fusesource.jansi.AnsiConsole import java.io.IOException import java.net.Inet6Address import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit open class EnterpriseNode(configuration: NodeConfiguration, @@ -185,10 +183,7 @@ D""".trimStart() private fun makeStateMachineExecutorService(): ExecutorService { log.info("Multi-threaded state machine manager with ${configuration.enterpriseConfiguration.tuning.flowThreadPoolSize} threads.") - return Executors.newFixedThreadPool( - configuration.enterpriseConfiguration.tuning.flowThreadPoolSize, - ThreadFactoryBuilder().setNameFormat("flow-worker").setThreadFactory(::FastThreadLocalThread).build() - ) + return MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize) } override fun makeStateMachineManager(database: CordaPersistence): StateMachineManager { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparator.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparator.kt new file mode 100644 index 0000000000..fdfe1a9ee5 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparator.kt @@ -0,0 +1,23 @@ +package net.corda.node.services.statemachine + +import co.paralleluniverse.fibers.FiberSchedulerTask +import net.corda.core.internal.FlowStateMachine +import kotlin.math.sign + +class FlowStateMachineComparator : Comparator { + override fun compare(o1: Runnable, o2: Runnable): Int { + return if (o1 is FiberSchedulerTask) { + if (o2 is FiberSchedulerTask) { + (((o1.fiber as? FlowStateMachine<*>)?.creationTime + ?: Long.MAX_VALUE) - ((o2.fiber as? FlowStateMachine<*>)?.creationTime + ?: Long.MAX_VALUE)).sign + } else { + -1 + } + } else if (o2 is FiberSchedulerTask) { + 1 + } else { + 0 + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 7ca967bed8..4197630cac 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -50,7 +50,8 @@ class TransientReference(@Transient val value: A) class FlowStateMachineImpl(override val id: StateMachineRunId, override val logic: FlowLogic, - scheduler: FiberScheduler + scheduler: FiberScheduler, + override val creationTime: Long = System.currentTimeMillis() ) : Fiber(id.toString(), scheduler), FlowStateMachine, FlowFiber { companion object { /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineExecutor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineExecutor.kt new file mode 100644 index 0000000000..90f4b30d22 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineExecutor.kt @@ -0,0 +1,12 @@ +package net.corda.node.services.statemachine + +import com.google.common.util.concurrent.ThreadFactoryBuilder +import io.netty.util.concurrent.FastThreadLocalThread +import java.util.concurrent.PriorityBlockingQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit + +class MultiThreadedStateMachineExecutor(poolSize: Int) : ThreadPoolExecutor(poolSize, poolSize, + 0L, TimeUnit.MILLISECONDS, + PriorityBlockingQueue(poolSize * 4, FlowStateMachineComparator()), + ThreadFactoryBuilder().setNameFormat("flow-worker").setThreadFactory(::FastThreadLocalThread).build()) \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparatorTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparatorTest.kt new file mode 100644 index 0000000000..1cf1a01c05 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowStateMachineComparatorTest.kt @@ -0,0 +1,171 @@ +package net.corda.node.services.statemachine + +import co.paralleluniverse.fibers.Fiber +import co.paralleluniverse.fibers.FiberExecutorScheduler +import co.paralleluniverse.fibers.FiberScheduler +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.concurrent.CordaFuture +import net.corda.core.context.InvocationContext +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.FlowStackSnapshot +import net.corda.core.flows.StateMachineRunId +import net.corda.core.identity.Party +import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.FlowStateMachine +import net.corda.core.node.ServiceHub +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.node.TestClock +import org.junit.Assert +import org.junit.Rule +import org.junit.Test +import org.slf4j.Logger +import java.time.Clock +import java.time.Duration +import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executor +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals + +class FlowStateMachineComparatorTest { + @Rule + @JvmField + val testSerialization = SerializationEnvironmentRule() + + private object EmptyFlow : FlowLogic() { + override fun call() {} + } + + private object DummyExecutor : Executor { + override fun execute(command: Runnable?) {} + } + + @Test + fun `sort order of state machines is as expected`() { + val scheduler = FiberExecutorScheduler("TestScheduler", DummyExecutor) + val clock = TestClock(Clock.systemUTC()) + val sm1 = FlowStateMachineImpl(StateMachineRunId(UUID.randomUUID()), + scheduler = scheduler, + logic = EmptyFlow, creationTime = clock.millis()) + clock.advanceBy(Duration.ofSeconds(1)) + val sm2 = FlowStateMachineImpl(StateMachineRunId(UUID.randomUUID()), + scheduler = scheduler, + logic = EmptyFlow, creationTime = clock.millis()) + + val comparator = FlowStateMachineComparator() + Assert.assertEquals(-1, comparator.compare(sm1.task as Runnable, sm2.task as Runnable)) + Assert.assertEquals(0, comparator.compare(sm1.task as Runnable, sm1.task as Runnable)) + Assert.assertEquals(1, comparator.compare(sm2.task as Runnable, sm1.task as Runnable)) + } + + @Test + fun `serialized flow maintains creation time`() { + val scheduler = FiberExecutorScheduler("TestScheduler", DummyExecutor) + val clock = TestClock(Clock.systemUTC()) + clock.advanceBy(Duration.ofDays(1)) // Move this away from "now" to check that it's not a coincidence. + val sm1 = FlowStateMachineImpl(StateMachineRunId(UUID.randomUUID()), + scheduler = scheduler, + logic = EmptyFlow, creationTime = clock.millis()) + val sm2 = sm1.serialize(context = SerializationDefaults.CHECKPOINT_CONTEXT).deserialize(context = SerializationDefaults.CHECKPOINT_CONTEXT) + Fiber.unparkDeserialized(sm2, scheduler) + + val comparator = FlowStateMachineComparator() + Assert.assertEquals(0, comparator.compare(sm1.task as Runnable, sm2.task as Runnable)) + } + + private class BlockerFlow : FlowLogic() { + val barrier = CountDownLatch(1) + + override fun call() { + barrier.await() + } + } + + private class AddToListFlow(val list: MutableList) : FlowLogic() { + override fun call() { + list += stateMachine.creationTime + } + } + + private class TestFlowStateMachine(override val creationTime: Long, override val logic: FlowLogic, scheduler: FiberScheduler) : FlowStateMachine, Fiber(scheduler) { + @Suspendable + @Throws(InterruptedException::class) + override fun run() { + logic.stateMachine = this + return logic.call() + } + + override fun suspend(ioRequest: FlowIORequest, maySkipCheckpoint: Boolean): SUSPENDRETURN { + throw NotImplementedError() + } + + override fun initiateFlow(party: Party): FlowSession { + throw NotImplementedError() + } + + override fun checkFlowPermission(permissionName: String, extraAuditData: Map) { + throw NotImplementedError() + } + + override fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map) { + throw NotImplementedError() + } + + override fun subFlow(subFlow: FlowLogic): SUBFLOWRETURN { + throw NotImplementedError() + } + + override fun flowStackSnapshot(flowClass: Class>): FlowStackSnapshot? { + throw NotImplementedError() + } + + override fun persistFlowStackSnapshot(flowClass: Class>) { + throw NotImplementedError() + } + + override val serviceHub: ServiceHub + get() = throw NotImplementedError() + override val logger: Logger + get() = throw NotImplementedError() + override val id: StateMachineRunId + get() = throw NotImplementedError() + override val resultFuture: CordaFuture + get() = throw NotImplementedError() + override val context: InvocationContext + get() = throw NotImplementedError() + override val ourIdentity: Party + get() = throw NotImplementedError() + override val ourSenderUUID: String? + get() = throw NotImplementedError() + } + + @Test + fun `test executor`() { + val executor = MultiThreadedStateMachineExecutor(1) + val scheduler = FiberExecutorScheduler("TestScheduler", executor) + val clock = TestClock(Clock.systemUTC()) + val blockerLogic = BlockerFlow() + val list: MutableList = Collections.synchronizedList(arrayListOf()) + val blocker = TestFlowStateMachine(scheduler = scheduler, + logic = blockerLogic, creationTime = clock.millis()) + val sm1 = TestFlowStateMachine(scheduler = scheduler, + logic = AddToListFlow(list), creationTime = clock.millis()) + clock.advanceBy(Duration.ofSeconds(1)) + val sm2 = TestFlowStateMachine(scheduler = scheduler, + logic = AddToListFlow(list), creationTime = clock.millis()) + blocker.start() + sm2.start() + sm1.start() + blockerLogic.barrier.countDown() + Thread.sleep(1000) + executor.shutdown() + executor.awaitTermination(1, TimeUnit.HOURS) + assertEquals(2, list.size) + assertEquals(sm1.creationTime, list[0]) + assertEquals(sm2.creationTime, list[1]) + } +} \ No newline at end of file