From 9d66214f4a97710109df3baecb1244013c7d41ea Mon Sep 17 00:00:00 2001 From: Andrzej Cichocki Date: Fri, 5 Jan 2018 16:34:03 +0000 Subject: [PATCH] CORDA-891 Convert NodeSchedulerServiceTest into a unit test (#2273) --- .../services/events/NodeSchedulerService.kt | 38 +- .../corda/node/utilities/AffinityExecutor.kt | 29 -- .../events/NodeSchedulerServiceTest.kt | 365 ++++++------------ .../testing/internal/InternalTestUtils.kt | 4 + 4 files changed, 134 insertions(+), 302 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index 8c18043db7..372ef392b8 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -14,6 +14,7 @@ import net.corda.core.flows.FlowLogicRefFactory import net.corda.core.internal.ThreadBox import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.concurrent.flatMap +import net.corda.core.internal.join import net.corda.core.internal.until import net.corda.core.node.StateLoader import net.corda.core.schemas.PersistentStateRef @@ -24,12 +25,11 @@ import net.corda.node.internal.CordaClock import net.corda.node.internal.MutableClock import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.SchedulerService -import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import org.apache.activemq.artemis.utils.ReusableLatch -import java.time.Clock +import org.slf4j.Logger import java.time.Instant import java.util.* import java.util.concurrent.* @@ -51,23 +51,21 @@ import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture * is the outcome of the activity in order to schedule another activity. Once we have implemented more persistence * in the nodes, maybe we can consider multiple activities and whether the activities have been completed or not, * but that starts to sound a lot like off-ledger state. - * - * @param schedulerTimerExecutor The executor the scheduler blocks on waiting for the clock to advance to the next - * activity. Only replace this for unit testing purposes. This is not the executor the [FlowLogic] is launched on. */ @ThreadSafe class NodeSchedulerService(private val clock: CordaClock, private val database: CordaPersistence, private val flowStarter: FlowStarter, private val stateLoader: StateLoader, - private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(), private val unfinishedSchedules: ReusableLatch = ReusableLatch(), - private val serverThread: AffinityExecutor, - private val flowLogicRefFactory: FlowLogicRefFactory) + private val serverThread: Executor, + private val flowLogicRefFactory: FlowLogicRefFactory, + private val log: Logger = staticLog, + scheduledStates: MutableMap = createMap()) : SchedulerService, SingletonSerializeAsToken() { companion object { - private val log = contextLogger() + private val staticLog get() = contextLogger() /** * Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations * used in demos or testing. This will substitute a Fiber compatible Future so the current @@ -131,7 +129,7 @@ class NodeSchedulerService(private val clock: CordaClock, * or [Throwable] is available in the original. * * We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context - * switch. There's no need to checkpoint our [Fiber]s as there's no external effect of waiting. + * switch. There's no need to checkpoint our [co.paralleluniverse.fibers.Fiber]s as there's no external effect of waiting. */ private fun makeStrandFriendlySettableFuture(future: Future) = QuasarSettableFuture().also { g -> when (future) { @@ -140,6 +138,9 @@ class NodeSchedulerService(private val clock: CordaClock, else -> throw IllegalArgumentException("Cannot make future $future Strand friendly.") } } + + @VisibleForTesting + internal val schedulingAsNextFormat = "Scheduling as next {}" } @Entity @@ -152,20 +153,17 @@ class NodeSchedulerService(private val clock: CordaClock, var scheduledAt: Instant = Instant.now() ) - private class InnerState { - var scheduledStates = createMap() - + private class InnerState(var scheduledStates: MutableMap) { var scheduledStatesQueue: PriorityQueue = PriorityQueue({ a, b -> a.scheduledAt.compareTo(b.scheduledAt) }) var rescheduled: GuavaSettableFuture? = null } - private val mutex = ThreadBox(InnerState()) - + private val mutex = ThreadBox(InnerState(scheduledStates)) // We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow. fun start() { mutex.locked { - scheduledStatesQueue.addAll(scheduledStates.all().map { it.second }.toMutableList()) + scheduledStatesQueue.addAll(scheduledStates.values) rescheduleWakeUp() } } @@ -206,6 +204,7 @@ class NodeSchedulerService(private val clock: CordaClock, } } + private val schedulerTimerExecutor = Executors.newSingleThreadExecutor() /** * This method first cancels the [java.util.concurrent.Future] for any pending action so that the * [awaitWithDeadline] used below drops through without running the action. We then create a new @@ -223,7 +222,7 @@ class NodeSchedulerService(private val clock: CordaClock, } if (scheduledState != null) { schedulerTimerExecutor.execute { - log.trace { "Scheduling as next $scheduledState" } + log.trace(schedulingAsNextFormat, scheduledState) // This will block the scheduler single thread until the scheduled time (returns false) OR // the Future is cancelled due to rescheduling (returns true). if (!awaitWithDeadline(clock, scheduledState.scheduledAt, ourRescheduledFuture)) { @@ -236,6 +235,11 @@ class NodeSchedulerService(private val clock: CordaClock, } } + @VisibleForTesting + internal fun join() { + schedulerTimerExecutor.join() + } + private fun onTimeReached(scheduledState: ScheduledStateRef) { serverThread.execute { var flowName: String? = "(unknown)" diff --git a/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt b/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt index 89c7e68716..096a958a22 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt @@ -1,11 +1,9 @@ package net.corda.node.utilities import com.google.common.util.concurrent.SettableFuture -import com.google.common.util.concurrent.Uninterruptibles import java.util.* import java.util.concurrent.CompletableFuture import java.util.concurrent.Executor -import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.function.Supplier @@ -83,31 +81,4 @@ interface AffinityExecutor : Executor { } while (!f.get()) } } - - /** - * An executor useful for unit tests: allows the current thread to block until a command arrives from another - * thread, which is then executed. Inbound closures/commands stack up until they are cleared by looping. - * - * @param alwaysQueue If true, executeASAP will never short-circuit and will always queue up. - */ - class Gate(private val alwaysQueue: Boolean = false) : AffinityExecutor { - private val thisThread = Thread.currentThread() - private val commandQ = LinkedBlockingQueue() - - override val isOnThread: Boolean - get() = !alwaysQueue && Thread.currentThread() === thisThread - - override fun execute(command: Runnable) { - Uninterruptibles.putUninterruptibly(commandQ, command) - } - - fun waitAndRun() { - val runnable = Uninterruptibles.takeUninterruptibly(commandQ) - runnable.run() - } - - override fun flush() { - throw UnsupportedOperationException() - } - } } diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index f771b5a197..de82ec5bbf 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -1,323 +1,176 @@ package net.corda.node.services.events -import co.paralleluniverse.fibers.Suspendable -import com.codahale.metrics.MetricRegistry +import com.google.common.util.concurrent.MoreExecutors import com.nhaarman.mockito_kotlin.* import net.corda.core.contracts.* -import net.corda.core.crypto.generateKeyPair import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogicRef import net.corda.core.flows.FlowLogicRefFactory -import net.corda.core.identity.AbstractParty -import net.corda.core.identity.CordaX500Name -import net.corda.core.identity.Party -import net.corda.core.internal.concurrent.doneFuture -import net.corda.core.node.NodeInfo -import net.corda.core.node.ServiceHub -import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.transactions.TransactionBuilder -import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.days -import net.corda.node.internal.FlowStarterImpl -import net.corda.node.internal.cordapp.CordappLoader -import net.corda.node.internal.cordapp.CordappProviderImpl -import net.corda.node.services.api.MonitoringService -import net.corda.node.services.api.ServiceHubInternal -import net.corda.node.services.persistence.DBCheckpointStorage -import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl -import net.corda.node.services.statemachine.StateMachineManager -import net.corda.node.services.statemachine.StateMachineManagerImpl -import net.corda.node.services.vault.NodeVaultService -import net.corda.node.utilities.AffinityExecutor -import net.corda.node.internal.configureDatabase -import net.corda.node.services.api.NetworkMapCacheInternal -import net.corda.node.services.config.NodeConfiguration -import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.nodeapi.internal.persistence.DatabaseConfig -import net.corda.testing.* -import net.corda.testing.contracts.DummyContract import net.corda.testing.internal.rigorousMock -import net.corda.testing.node.* -import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties -import net.corda.testing.services.MockAttachmentStorage -import org.assertj.core.api.Assertions.assertThat -import org.junit.After -import org.junit.Before +import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.uncheckedCast +import net.corda.core.node.StateLoader +import net.corda.node.services.api.FlowStarter +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.DatabaseTransaction +import net.corda.testing.internal.doLookup +import net.corda.testing.node.TestClock import org.junit.Rule import org.junit.Test +import org.junit.rules.TestWatcher +import org.junit.runner.Description +import org.slf4j.Logger import java.time.Clock import java.time.Instant -import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit -import kotlin.test.assertTrue -class NodeSchedulerServiceTest : SingletonSerializeAsToken() { - private companion object { - val ALICE_KEY = TestIdentity(ALICE_NAME, 70).keyPair - val DUMMY_IDENTITY_1 = getTestPartyAndCertificate(Party(CordaX500Name("Dummy", "Madrid", "ES"), generateKeyPair().public)) - val DUMMY_NOTARY = TestIdentity(DUMMY_NOTARY_NAME, 20).party - val myInfo = NodeInfo(listOf(NetworkHostAndPort("mockHost", 30000)), listOf(DUMMY_IDENTITY_1), 1, serial = 1L) +class NodeSchedulerServiceTest { + private val mark = Instant.now() + private val testClock = TestClock(rigorousMock().also { + doReturn(mark).whenever(it).instant() + }) + private val database = rigorousMock().also { + doAnswer { + val block: DatabaseTransaction.() -> Any? = uncheckedCast(it.arguments[0]) + rigorousMock().block() + }.whenever(it).transaction(any()) } - + private val flowStarter = rigorousMock().also { + doReturn(openFuture>()).whenever(it).startFlow(any>(), any()) + } + private val transactionStates = mutableMapOf>() + private val stateLoader = rigorousMock().also { + doLookup(transactionStates).whenever(it).loadState(any()) + } + private val flows = mutableMapOf>() + private val flowLogicRefFactory = rigorousMock().also { + doLookup(flows).whenever(it).toFlowLogic(any()) + } + private val log = rigorousMock().also { + doReturn(false).whenever(it).isTraceEnabled + doNothing().whenever(it).trace(any(), any()) + } + private val scheduler = NodeSchedulerService( + testClock, + database, + flowStarter, + stateLoader, + serverThread = MoreExecutors.directExecutor(), + flowLogicRefFactory = flowLogicRefFactory, + log = log, + scheduledStates = mutableMapOf()).apply { start() } @Rule @JvmField - val testSerialization = SerializationEnvironmentRule(true) - private val flowLogicRefFactory = FlowLogicRefFactoryImpl(FlowLogicRefFactoryImpl::class.java.classLoader) - private val realClock: Clock = Clock.systemUTC() - private val stoppedClock: Clock = Clock.fixed(realClock.instant(), realClock.zone) - private val testClock = TestClock(stoppedClock) - - private val schedulerGatedExecutor = AffinityExecutor.Gate(true) - - abstract class Services : ServiceHubInternal, TestReference - - private lateinit var services: Services - private lateinit var scheduler: NodeSchedulerService - private lateinit var smmExecutor: AffinityExecutor.ServiceAffinityExecutor - private lateinit var database: CordaPersistence - private lateinit var countDown: CountDownLatch - private lateinit var smmHasRemovedAllFlows: CountDownLatch - private lateinit var kms: MockKeyManagementService - private lateinit var mockSMM: StateMachineManager - var calls: Int = 0 - - /** - * Have a reference to this test added to [ServiceHub] so that when the [FlowLogic] runs it can access the test instance. - * The [TestState] is serialized and deserialized so attempting to use a transient field won't work, as it just - * results in NPE. - */ - interface TestReference { - val testReference: NodeSchedulerServiceTest - } - - @Before - fun setup() { - countDown = CountDownLatch(1) - smmHasRemovedAllFlows = CountDownLatch(1) - calls = 0 - val dataSourceProps = makeTestDataSourceProperties() - database = configureDatabase(dataSourceProps, DatabaseConfig(), rigorousMock()) - val identityService = makeTestIdentityService() - kms = MockKeyManagementService(identityService, ALICE_KEY) - val configuration = rigorousMock().also { - doReturn(true).whenever(it).devMode - doReturn(null).whenever(it).devModeOptions - } - val validatedTransactions = MockTransactionStorage() - database.transaction { - services = rigorousMock().also { - doReturn(configuration).whenever(it).configuration - doReturn(MonitoringService(MetricRegistry())).whenever(it).monitoringService - doReturn(validatedTransactions).whenever(it).validatedTransactions - doReturn(rigorousMock().also { - doReturn(doneFuture(null)).whenever(it).nodeReady - }).whenever(it).networkMapCache - doReturn(myInfo).whenever(it).myInfo - doReturn(kms).whenever(it).keyManagementService - doReturn(CordappProviderImpl(CordappLoader.createWithTestPackages(listOf("net.corda.testing.contracts")), MockAttachmentStorage())).whenever(it).cordappProvider - doReturn(NodeVaultService(testClock, kms, validatedTransactions, database.hibernateConfig)).whenever(it).vaultService - doReturn(this@NodeSchedulerServiceTest).whenever(it).testReference - - } - smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1) - mockSMM = StateMachineManagerImpl(services, DBCheckpointStorage(), smmExecutor, database) - scheduler = NodeSchedulerService(testClock, database, FlowStarterImpl(smmExecutor, mockSMM, flowLogicRefFactory), validatedTransactions, schedulerGatedExecutor, serverThread = smmExecutor, flowLogicRefFactory = flowLogicRefFactory) - mockSMM.changes.subscribe { change -> - if (change is StateMachineManager.Change.Removed && mockSMM.allStateMachines.isEmpty()) { - smmHasRemovedAllFlows.countDown() - } - } - mockSMM.start(emptyList()) - scheduler.start() + val tearDown = object : TestWatcher() { + override fun succeeded(description: Description) { + scheduler.join() + verifyNoMoreInteractions(flowStarter) } } - private var allowedUnsuspendedFiberCount = 0 - @After - fun tearDown() { - // We need to make sure the StateMachineManager is done before shutting down executors. - if (mockSMM.allStateMachines.isNotEmpty()) { - smmHasRemovedAllFlows.await() - } - smmExecutor.shutdown() - smmExecutor.awaitTermination(60, TimeUnit.SECONDS) - database.close() - mockSMM.stop(allowedUnsuspendedFiberCount) + private class Event(time: Instant) { + val stateRef = rigorousMock() + val flowLogic = rigorousMock>() + val ssr = ScheduledStateRef(stateRef, time) } - // Ignore IntelliJ when it says these properties can be private, if they are we cannot serialise them - // in AMQP. - @Suppress("MemberVisibilityCanPrivate") - class TestState(val flowLogicRef: FlowLogicRef, val instant: Instant, val myIdentity: Party) : LinearState, SchedulableState { - override val participants: List - get() = listOf(myIdentity) - - override val linearId = UniqueIdentifier() - - override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? { - return ScheduledActivity(flowLogicRef, instant) + private fun schedule(time: Instant) = Event(time).apply { + val logicRef = rigorousMock() + transactionStates[stateRef] = rigorousMock>().also { + doReturn(rigorousMock().also { + doReturn(ScheduledActivity(logicRef, time)).whenever(it).nextScheduledActivity(same(stateRef)!!, any()) + }).whenever(it).data } + flows[logicRef] = flowLogic + scheduler.scheduleStateActivity(ssr) } - class TestFlowLogic(private val increment: Int = 1) : FlowLogic() { - @Suspendable - override fun call() { - (serviceHub as TestReference).testReference.calls += increment - (serviceHub as TestReference).testReference.countDown.countDown() - } + private fun assertWaitingFor(event: Event, total: Int = 1) { + // The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace: + verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, event.ssr) } - class Command : TypeOnlyCommandData() + private fun assertStarted(event: Event) { + // Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow: + verify(flowStarter, timeout(5000)).startFlow(same(event.flowLogic)!!, any()) + } @Test fun `test activity due now`() { - val time = stoppedClock.instant() - scheduleTX(time) - - assertThat(calls).isEqualTo(0) - schedulerGatedExecutor.waitAndRun() - countDown.await() - assertThat(calls).isEqualTo(1) + assertStarted(schedule(mark)) } @Test fun `test activity due in the past`() { - val time = stoppedClock.instant() - 1.days - scheduleTX(time) - - assertThat(calls).isEqualTo(0) - schedulerGatedExecutor.waitAndRun() - countDown.await() - assertThat(calls).isEqualTo(1) + assertStarted(schedule(mark - 1.days)) } @Test fun `test activity due in the future`() { - val time = stoppedClock.instant() + 1.days - scheduleTX(time) - - val backgroundExecutor = Executors.newSingleThreadExecutor() - backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } - assertThat(calls).isEqualTo(0) + val event = schedule(mark + 1.days) + assertWaitingFor(event) testClock.advanceBy(1.days) - backgroundExecutor.shutdown() - assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)) - countDown.await() - assertThat(calls).isEqualTo(1) + assertStarted(event) } @Test fun `test activity due in the future and schedule another earlier`() { - val time = stoppedClock.instant() + 1.days - scheduleTX(time + 1.days) - - val backgroundExecutor = Executors.newSingleThreadExecutor() - backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } - assertThat(calls).isEqualTo(0) - scheduleTX(time, 3) - - backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } + val event2 = schedule(mark + 2.days) + val event1 = schedule(mark + 1.days) + assertWaitingFor(event1) testClock.advanceBy(1.days) - countDown.await() - assertThat(calls).isEqualTo(3) - backgroundExecutor.shutdown() - assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)) + assertStarted(event1) + assertWaitingFor(event2, 2) + testClock.advanceBy(1.days) + assertStarted(event2) } @Test fun `test activity due in the future and schedule another later`() { - allowedUnsuspendedFiberCount = 1 - val time = stoppedClock.instant() + 1.days - scheduleTX(time) - - val backgroundExecutor = Executors.newSingleThreadExecutor() - backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } - assertThat(calls).isEqualTo(0) - scheduleTX(time + 1.days, 3) - + val event1 = schedule(mark + 1.days) + val event2 = schedule(mark + 2.days) + assertWaitingFor(event1) testClock.advanceBy(1.days) - countDown.await() - assertThat(calls).isEqualTo(1) - backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } + assertStarted(event1) + assertWaitingFor(event2) testClock.advanceBy(1.days) - backgroundExecutor.shutdown() - assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)) + assertStarted(event2) } @Test fun `test activity due in the future and schedule another for same time`() { - val time = stoppedClock.instant() + 1.days - scheduleTX(time) - - val backgroundExecutor = Executors.newSingleThreadExecutor() - backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } - assertThat(calls).isEqualTo(0) - scheduleTX(time, 3) - + val eventA = schedule(mark + 1.days) + val eventB = schedule(mark + 1.days) + assertWaitingFor(eventA) testClock.advanceBy(1.days) - countDown.await() - assertThat(calls).isEqualTo(1) - backgroundExecutor.shutdown() - assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)) + assertStarted(eventA) + assertStarted(eventB) + } + + @Test + fun `test activity due in the future and schedule another for same time then unschedule second`() { + val eventA = schedule(mark + 1.days) + val eventB = schedule(mark + 1.days) + scheduler.unscheduleStateActivity(eventB.stateRef) + assertWaitingFor(eventA) + testClock.advanceBy(1.days) + assertStarted(eventA) } @Test fun `test activity due in the future and schedule another for same time then unschedule original`() { - val time = stoppedClock.instant() + 1.days - val scheduledRef1 = scheduleTX(time) - - val backgroundExecutor = Executors.newSingleThreadExecutor() - backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } - assertThat(calls).isEqualTo(0) - scheduleTX(time, 3) - - backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } - database.transaction { - scheduler.unscheduleStateActivity(scheduledRef1!!.ref) - } + val eventA = schedule(mark + 1.days) + val eventB = schedule(mark + 1.days) + scheduler.unscheduleStateActivity(eventA.stateRef) + assertWaitingFor(eventA) // XXX: Shouldn't it be waiting for eventB now? testClock.advanceBy(1.days) - countDown.await() - assertThat(calls).isEqualTo(3) - backgroundExecutor.shutdown() - assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)) + assertStarted(eventB) } @Test fun `test activity due in the future then unschedule`() { - val scheduledRef1 = scheduleTX(stoppedClock.instant() + 1.days) - - val backgroundExecutor = Executors.newSingleThreadExecutor() - backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() } - assertThat(calls).isEqualTo(0) - - database.transaction { - scheduler.unscheduleStateActivity(scheduledRef1!!.ref) - } + scheduler.unscheduleStateActivity(schedule(mark + 1.days).stateRef) testClock.advanceBy(1.days) - assertThat(calls).isEqualTo(0) - backgroundExecutor.shutdown() - assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS)) - } - - private fun scheduleTX(instant: Instant, increment: Int = 1): ScheduledStateRef? { - var scheduledRef: ScheduledStateRef? = null - database.transaction { - apply { - val freshKey = kms.freshKey() - val state = TestState(flowLogicRefFactory.createForRPC(TestFlowLogic::class.java, increment), instant, DUMMY_IDENTITY_1.party) - val builder = TransactionBuilder(null).apply { - addOutputState(state, DummyContract.PROGRAM_ID, DUMMY_NOTARY) - addCommand(Command(), freshKey) - } - val usefulTX = services.signInitialTransaction(builder, freshKey) - val txHash = usefulTX.id - - services.recordTransactions(usefulTX) - scheduledRef = ScheduledStateRef(StateRef(txHash, 0), state.instant) - scheduler.scheduleStateActivity(scheduledRef!!) - } - } - return scheduledRef } } diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalTestUtils.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalTestUtils.kt index ccf34284e7..60a7c40b00 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalTestUtils.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalTestUtils.kt @@ -1,5 +1,6 @@ package net.corda.testing.internal +import com.nhaarman.mockito_kotlin.doAnswer import net.corda.core.crypto.Crypto import net.corda.core.identity.CordaX500Name import net.corda.core.utilities.loggerFor @@ -108,3 +109,6 @@ fun createDevNodeCaCertPath( val nodeCa = createDevNodeCa(intermediateCa, legalName) return Triple(rootCa, intermediateCa, nodeCa) } + +/** Application of [doAnswer] that gets a value from the given [map] using the arg at [argIndex] as key. */ +fun doLookup(map: Map<*, *>, argIndex: Int = 0) = doAnswer { map[it.arguments[argIndex]] }