From fe3c2b39839387f3a655024f26d96bc600b74d50 Mon Sep 17 00:00:00 2001 From: Andrzej Cichocki Date: Tue, 2 Jan 2018 13:11:43 +0000 Subject: [PATCH] CORDA-891 Refactoring for #2273 (#2306) * Make FlowLogicRefFactoryImpl a class. * Replace instanceof with polymorphism. * Fix out-of-scope spelling error. --- .../net/corda/core/flows/FlowLogicRef.kt | 2 ++ .../net/corda/node/internal/AbstractNode.kt | 27 ++++++++++++------- .../net/corda/node/internal/CordaClock.kt | 10 ++++--- .../kotlin/net/corda/node/internal/Node.kt | 3 +-- .../node/services/api/ServiceHubInternal.kt | 8 +----- .../services/events/NodeSchedulerService.kt | 24 ++++++++--------- .../events/ScheduledActivityObserver.kt | 10 +++---- .../statemachine/FlowLogicRefFactoryImpl.kt | 10 +++---- .../events/FlowLogicRefFromJavaTest.java | 6 +++-- .../node/services/events/FlowLogicRefTest.kt | 17 ++++++------ .../events/NodeSchedulerServiceTest.kt | 5 ++-- .../corda/node/utilities/ClockUtilsTest.kt | 6 +++-- 12 files changed, 69 insertions(+), 59 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt index a3043709d0..4768ff9259 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt @@ -11,6 +11,8 @@ import net.corda.core.serialization.CordaSerializable @DoNotImplement interface FlowLogicRefFactory { fun create(flowClass: Class>, vararg args: Any?): FlowLogicRef + fun createForRPC(flowClass: Class>, vararg args: Any?): FlowLogicRef + fun toFlowLogic(ref: FlowLogicRef): FlowLogic<*> } @CordaSerializable diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index c2ef31ae33..064b450dea 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -104,7 +104,7 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair // In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the // AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in. abstract class AbstractNode(val configuration: NodeConfiguration, - val platformClock: Clock, + val platformClock: CordaClock, protected val versionInfo: VersionInfo, protected val cordappLoader: CordappLoader, private val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() { @@ -217,14 +217,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, database, info, identityService, networkMapCache) val notaryService = makeNotaryService(nodeServices, database) val smm = makeStateMachineManager(database) - val flowStarter = FlowStarterImpl(serverThread, smm) + val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader) + val flowStarter = FlowStarterImpl(serverThread, smm, flowLogicRefFactory) val schedulerService = NodeSchedulerService( platformClock, database, flowStarter, transactionStorage, unfinishedSchedules = busyNodeLatch, - serverThread = serverThread) + serverThread = serverThread, + flowLogicRefFactory = flowLogicRefFactory) if (serverThread is ExecutorService) { runOnStop += { // We wait here, even though any in-flight messages should have been drained away because the @@ -233,7 +235,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS) } } - makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService) + makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory) val rpcOps = makeRPCOps(flowStarter, database, smm) startMessagingService(rpcOps) installCoreFlows() @@ -241,7 +243,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, tokenizableServices = nodeServices + cordaServices + schedulerService registerCordappFlows(smm) _services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows } - FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader startShell(rpcOps) Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) } @@ -558,10 +559,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } protected open fun makeTransactionStorage(database: CordaPersistence): WritableTransactionStorage = DBTransactionStorage() - - private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, smm: StateMachineManager, schemaService: SchemaService) { + private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, smm: StateMachineManager, schemaService: SchemaService, flowLogicRefFactory: FlowLogicRefFactory) { VaultSoftLockManager.install(services.vaultService, smm) - ScheduledActivityObserver.install(services.vaultService, schedulerService) + ScheduledActivityObserver.install(services.vaultService, schedulerService, flowLogicRefFactory) HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig, schemaService) } @@ -820,10 +820,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } -internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager) : FlowStarter { +internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter { override fun startFlow(logic: FlowLogic, context: InvocationContext): CordaFuture> { return serverThread.fetchFrom { smm.startFlow(logic, context) } } + + override fun invokeFlowAsync( + logicType: Class>, + context: InvocationContext, + vararg args: Any?): CordaFuture> { + val logicRef = flowLogicRefFactory.createForRPC(logicType, *args) + val logic: FlowLogic = uncheckedCast(flowLogicRefFactory.toFlowLogic(logicRef)) + return startFlow(logic, context) + } } class ConfigurationException(message: String) : CordaException(message) diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaClock.kt b/node/src/main/kotlin/net/corda/node/internal/CordaClock.kt index cd86529f67..c08961ee77 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaClock.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaClock.kt @@ -22,10 +22,15 @@ abstract class CordaClock : Clock(), SerializeAsToken { override fun getZone(): ZoneId = delegateClock.zone @Deprecated("Do not use this. Instead seek to use ZonedDateTime methods.", level = DeprecationLevel.ERROR) override fun withZone(zone: ZoneId) = throw UnsupportedOperationException("Tokenized clock does not support withZone()") + + /** This is an observer on the mutation count of this [Clock], which reflects the occurrence of mutations. */ + abstract val mutations: Observable } @ThreadSafe -class SimpleClock(override val delegateClock: Clock) : CordaClock() +class SimpleClock(override val delegateClock: Clock) : CordaClock() { + override val mutations: Observable = Observable.never() +} /** * An abstract class with helper methods for a type of Clock that might have it's concept of "now" adjusted externally. @@ -38,8 +43,7 @@ abstract class MutableClock(private var _delegateClock: Clock) : CordaClock() { _delegateClock = clock } private val _version = AtomicLong(0L) - /** This is an observer on the mutation count of this [Clock], which reflects the occurence of mutations. */ - val mutations: Observable by lazy { + override val mutations: Observable by lazy { Observable.create { subscriber: Subscriber -> if (!subscriber.isUnsubscribed) { mutationObservers.add(subscriber) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 201f31a20b..69faba369f 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -2,7 +2,6 @@ package net.corda.node.internal import com.codahale.metrics.JmxReporter import net.corda.core.concurrent.CordaFuture -import net.corda.core.context.AuthServiceId import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.thenMatch import net.corda.core.internal.uncheckedCast @@ -67,7 +66,7 @@ open class Node(configuration: NodeConfiguration, exitProcess(1) } - private fun createClock(configuration: NodeConfiguration): Clock { + private fun createClock(configuration: NodeConfiguration): CordaClock { return (if (configuration.useTestClock) ::DemoClock else ::SimpleClock)(Clock.systemUTC()) } diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index e9794842d6..7a4f2f669d 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -6,7 +6,6 @@ import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.FlowStateMachine -import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.DataFeed import net.corda.core.messaging.StateMachineTransactionMapping import net.corda.core.node.NodeInfo @@ -21,7 +20,6 @@ import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.internal.cordapp.CordappProviderInternal import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.MessagingService -import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -137,11 +135,7 @@ interface FlowStarter { fun invokeFlowAsync( logicType: Class>, context: InvocationContext, - vararg args: Any?): CordaFuture> { - val logicRef = FlowLogicRefFactoryImpl.createForRPC(logicType, *args) - val logic: FlowLogic = uncheckedCast(FlowLogicRefFactoryImpl.toFlowLogic(logicRef)) - return startFlow(logic, context) - } + vararg args: Any?): CordaFuture> } interface StartedNodeServices : ServiceHubInternal, FlowStarter diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index 31badf6ec0..8c18043db7 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -10,6 +10,7 @@ import net.corda.core.contracts.ScheduledStateRef import net.corda.core.contracts.StateRef import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowLogicRefFactory import net.corda.core.internal.ThreadBox import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.concurrent.flatMap @@ -19,10 +20,10 @@ import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.contextLogger import net.corda.core.utilities.trace +import net.corda.node.internal.CordaClock import net.corda.node.internal.MutableClock import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.SchedulerService -import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -55,13 +56,14 @@ import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture * activity. Only replace this for unit testing purposes. This is not the executor the [FlowLogic] is launched on. */ @ThreadSafe -class NodeSchedulerService(private val clock: Clock, +class NodeSchedulerService(private val clock: CordaClock, private val database: CordaPersistence, private val flowStarter: FlowStarter, private val stateLoader: StateLoader, private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(), private val unfinishedSchedules: ReusableLatch = ReusableLatch(), - private val serverThread: AffinityExecutor) + private val serverThread: AffinityExecutor, + private val flowLogicRefFactory: FlowLogicRefFactory) : SchedulerService, SingletonSerializeAsToken() { companion object { @@ -78,16 +80,12 @@ class NodeSchedulerService(private val clock: Clock, @Suspendable @VisibleForTesting // We specify full classpath on SettableFuture to differentiate it from the Quasar class of the same name - fun awaitWithDeadline(clock: Clock, deadline: Instant, future: Future<*> = GuavaSettableFuture.create()): Boolean { + fun awaitWithDeadline(clock: CordaClock, deadline: Instant, future: Future<*> = GuavaSettableFuture.create()): Boolean { var nanos: Long do { val originalFutureCompleted = makeStrandFriendlySettableFuture(future) - val subscription = if (clock is MutableClock) { - clock.mutations.first().subscribe { - originalFutureCompleted.set(false) - } - } else { - null + val subscription = clock.mutations.first().subscribe { + originalFutureCompleted.set(false) } nanos = (clock.instant() until deadline).toNanos() if (nanos > 0) { @@ -102,7 +100,7 @@ class NodeSchedulerService(private val clock: Clock, // No need to take action as will fall out of the loop due to future.isDone } } - subscription?.unsubscribe() + subscription.unsubscribe() originalFutureCompleted.cancel(false) } while (nanos > 0 && !future.isDone) return future.isDone @@ -279,7 +277,7 @@ class NodeSchedulerService(private val clock: Clock, scheduledStatesQueue.remove(scheduledState) scheduledStatesQueue.add(newState) } else { - val flowLogic = FlowLogicRefFactoryImpl.toFlowLogic(scheduledActivity.logicRef) + val flowLogic = flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef) log.trace { "Scheduler starting FlowLogic $flowLogic" } scheduledFlow = flowLogic scheduledStates.remove(scheduledState.ref) @@ -297,7 +295,7 @@ class NodeSchedulerService(private val clock: Clock, val state = txState.data as SchedulableState return try { // This can throw as running contract code. - state.nextScheduledActivity(scheduledState.ref, FlowLogicRefFactoryImpl) + state.nextScheduledActivity(scheduledState.ref, flowLogicRefFactory) } catch (e: Exception) { log.error("Attempt to run scheduled state $scheduledState resulted in error.", e) null diff --git a/node/src/main/kotlin/net/corda/node/services/events/ScheduledActivityObserver.kt b/node/src/main/kotlin/net/corda/node/services/events/ScheduledActivityObserver.kt index 3020a9e528..2c3e939c98 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/ScheduledActivityObserver.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/ScheduledActivityObserver.kt @@ -4,19 +4,19 @@ import net.corda.core.contracts.ContractState import net.corda.core.contracts.SchedulableState import net.corda.core.contracts.ScheduledStateRef import net.corda.core.contracts.StateAndRef +import net.corda.core.flows.FlowLogicRefFactory import net.corda.core.node.services.VaultService import net.corda.node.services.api.SchedulerService -import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl /** * This observes the vault and schedules and unschedules activities appropriately based on state production and * consumption. */ -class ScheduledActivityObserver private constructor(private val schedulerService: SchedulerService) { +class ScheduledActivityObserver private constructor(private val schedulerService: SchedulerService, private val FlowLogicRefFactory: FlowLogicRefFactory) { companion object { @JvmStatic - fun install(vaultService: VaultService, schedulerService: SchedulerService) { - val observer = ScheduledActivityObserver(schedulerService) + fun install(vaultService: VaultService, schedulerService: SchedulerService, flowLogicRefFactory: FlowLogicRefFactory) { + val observer = ScheduledActivityObserver(schedulerService, flowLogicRefFactory) vaultService.rawUpdates.subscribe { (consumed, produced) -> consumed.forEach { schedulerService.unscheduleStateActivity(it.ref) } produced.forEach { observer.scheduleStateActivity(it) } @@ -32,7 +32,7 @@ class ScheduledActivityObserver private constructor(private val schedulerService private fun scheduleStateActivity(produced: StateAndRef) { val producedState = produced.state.data if (producedState is SchedulableState) { - val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, FlowLogicRefFactoryImpl)?.scheduledAt } ?: return + val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, FlowLogicRefFactory)?.scheduledAt } ?: return schedulerService.scheduleStateActivity(ScheduledStateRef(produced.ref, scheduledAt)) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt index f0f10ebed4..1b20e75c5b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt @@ -31,10 +31,8 @@ data class FlowLogicRefImpl internal constructor(val flowLogicClassName: String, * measure we might want the ability for the node admin to blacklist a flow such that it moves immediately to the "Flow Hospital" * in response to a potential malicious use or buggy update to an app etc. */ -object FlowLogicRefFactoryImpl : SingletonSerializeAsToken(), FlowLogicRefFactory { - // TODO: Replace with a per app classloader/cordapp provider/cordapp loader - this will do for now - var classloader: ClassLoader = javaClass.classLoader - +// TODO: Replace with a per app classloader/cordapp provider/cordapp loader - this will do for now +class FlowLogicRefFactoryImpl(private val classloader: ClassLoader) : SingletonSerializeAsToken(), FlowLogicRefFactory { override fun create(flowClass: Class>, vararg args: Any?): FlowLogicRef { if (!flowClass.isAnnotationPresent(SchedulableFlow::class.java)) { throw IllegalFlowLogicException(flowClass, "because it's not a schedulable flow") @@ -42,7 +40,7 @@ object FlowLogicRefFactoryImpl : SingletonSerializeAsToken(), FlowLogicRefFactor return createForRPC(flowClass, *args) } - fun createForRPC(flowClass: Class>, vararg args: Any?): FlowLogicRef { + override fun createForRPC(flowClass: Class>, vararg args: Any?): FlowLogicRef { // TODO: This is used via RPC but it's probably better if we pass in argument names and values explicitly // to avoid requiring only a single constructor. val argTypes = args.map { it?.javaClass } @@ -81,7 +79,7 @@ object FlowLogicRefFactoryImpl : SingletonSerializeAsToken(), FlowLogicRefFactor return FlowLogicRefImpl(type.name, args) } - fun toFlowLogic(ref: FlowLogicRef): FlowLogic<*> { + override fun toFlowLogic(ref: FlowLogicRef): FlowLogic<*> { if (ref !is FlowLogicRefImpl) throw IllegalFlowLogicException(ref.javaClass, "FlowLogicRef was not created via correct FlowLogicRefFactory interface") val klass = Class.forName(ref.flowLogicClassName, true, classloader).asSubclass(FlowLogic::class.java) return createConstructor(klass, ref.args)() diff --git a/node/src/test/java/net/corda/node/services/events/FlowLogicRefFromJavaTest.java b/node/src/test/java/net/corda/node/services/events/FlowLogicRefFromJavaTest.java index 5e0a69cbad..0f83de90cf 100644 --- a/node/src/test/java/net/corda/node/services/events/FlowLogicRefFromJavaTest.java +++ b/node/src/test/java/net/corda/node/services/events/FlowLogicRefFromJavaTest.java @@ -48,13 +48,15 @@ public class FlowLogicRefFromJavaTest { } } + private final FlowLogicRefFactoryImpl flowLogicRefFactory = new FlowLogicRefFactoryImpl(FlowLogicRefFactoryImpl.class.getClassLoader()); + @Test public void test() { - FlowLogicRefFactoryImpl.INSTANCE.createForRPC(JavaFlowLogic.class, new ParamType1(1), new ParamType2("Hello Jack")); + flowLogicRefFactory.createForRPC(JavaFlowLogic.class, new ParamType1(1), new ParamType2("Hello Jack")); } @Test public void testNoArg() { - FlowLogicRefFactoryImpl.INSTANCE.createForRPC(JavaNoArgFlowLogic.class); + flowLogicRefFactory.createForRPC(JavaNoArgFlowLogic.class); } } diff --git a/node/src/test/kotlin/net/corda/node/services/events/FlowLogicRefTest.kt b/node/src/test/kotlin/net/corda/node/services/events/FlowLogicRefTest.kt index ae6a5b7680..3794a39ebf 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/FlowLogicRefTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/FlowLogicRefTest.kt @@ -34,47 +34,48 @@ class FlowLogicRefTest { override fun call() = Unit } + private val flowLogicRefFactory = FlowLogicRefFactoryImpl(FlowLogicRefFactoryImpl::class.java.classLoader) @Test fun `create kotlin no arg`() { - FlowLogicRefFactoryImpl.createForRPC(KotlinNoArgFlowLogic::class.java) + flowLogicRefFactory.createForRPC(KotlinNoArgFlowLogic::class.java) } @Test fun `create kotlin`() { val args = mapOf(Pair("A", ParamType1(1)), Pair("b", ParamType2("Hello Jack"))) - FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args) + flowLogicRefFactory.createKotlin(KotlinFlowLogic::class.java, args) } @Test fun `create primary`() { - FlowLogicRefFactoryImpl.createForRPC(KotlinFlowLogic::class.java, ParamType1(1), ParamType2("Hello Jack")) + flowLogicRefFactory.createForRPC(KotlinFlowLogic::class.java, ParamType1(1), ParamType2("Hello Jack")) } @Test fun `create kotlin void`() { - FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, emptyMap()) + flowLogicRefFactory.createKotlin(KotlinFlowLogic::class.java, emptyMap()) } @Test fun `create kotlin non primary`() { val args = mapOf(Pair("C", ParamType2("Hello Jack"))) - FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args) + flowLogicRefFactory.createKotlin(KotlinFlowLogic::class.java, args) } @Test fun `create java primitive no registration required`() { val args = mapOf(Pair("primitive", "A string")) - FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args) + flowLogicRefFactory.createKotlin(KotlinFlowLogic::class.java, args) } @Test fun `create kotlin primitive no registration required`() { val args = mapOf(Pair("kotlinType", 3)) - FlowLogicRefFactoryImpl.createKotlin(KotlinFlowLogic::class.java, args) + flowLogicRefFactory.createKotlin(KotlinFlowLogic::class.java, args) } @Test(expected = IllegalFlowLogicException::class) fun `create for non-schedulable flow logic`() { - FlowLogicRefFactoryImpl.create(NonSchedulableFlow::class.java) + flowLogicRefFactory.create(NonSchedulableFlow::class.java) } } diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index d2998cf641..f771b5a197 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -63,6 +63,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { @Rule @JvmField val testSerialization = SerializationEnvironmentRule(true) + private val flowLogicRefFactory = FlowLogicRefFactoryImpl(FlowLogicRefFactoryImpl::class.java.classLoader) private val realClock: Clock = Clock.systemUTC() private val stoppedClock: Clock = Clock.fixed(realClock.instant(), realClock.zone) private val testClock = TestClock(stoppedClock) @@ -121,7 +122,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { } smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1) mockSMM = StateMachineManagerImpl(services, DBCheckpointStorage(), smmExecutor, database) - scheduler = NodeSchedulerService(testClock, database, FlowStarterImpl(smmExecutor, mockSMM), validatedTransactions, schedulerGatedExecutor, serverThread = smmExecutor) + scheduler = NodeSchedulerService(testClock, database, FlowStarterImpl(smmExecutor, mockSMM, flowLogicRefFactory), validatedTransactions, schedulerGatedExecutor, serverThread = smmExecutor, flowLogicRefFactory = flowLogicRefFactory) mockSMM.changes.subscribe { change -> if (change is StateMachineManager.Change.Removed && mockSMM.allStateMachines.isEmpty()) { smmHasRemovedAllFlows.countDown() @@ -304,7 +305,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { database.transaction { apply { val freshKey = kms.freshKey() - val state = TestState(FlowLogicRefFactoryImpl.createForRPC(TestFlowLogic::class.java, increment), instant, DUMMY_IDENTITY_1.party) + val state = TestState(flowLogicRefFactory.createForRPC(TestFlowLogic::class.java, increment), instant, DUMMY_IDENTITY_1.party) val builder = TransactionBuilder(null).apply { addOutputState(state, DummyContract.PROGRAM_ID, DUMMY_NOTARY) addCommand(Command(), freshKey) diff --git a/node/src/test/kotlin/net/corda/node/utilities/ClockUtilsTest.kt b/node/src/test/kotlin/net/corda/node/utilities/ClockUtilsTest.kt index 783564a664..d8dbfac65f 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/ClockUtilsTest.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/ClockUtilsTest.kt @@ -8,6 +8,8 @@ import com.google.common.util.concurrent.SettableFuture import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.hours import net.corda.core.utilities.minutes +import net.corda.node.internal.CordaClock +import net.corda.node.internal.SimpleClock import net.corda.node.services.events.NodeSchedulerService import net.corda.testing.node.TestClock import org.junit.After @@ -25,13 +27,13 @@ import kotlin.test.fail class ClockUtilsTest { lateinit var realClock: Clock - lateinit var stoppedClock: Clock + lateinit var stoppedClock: CordaClock lateinit var executor: ExecutorService @Before fun setup() { realClock = Clock.systemUTC() - stoppedClock = Clock.fixed(realClock.instant(), realClock.zone) + stoppedClock = SimpleClock(Clock.fixed(realClock.instant(), realClock.zone)) executor = Executors.newSingleThreadExecutor() }