diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 5d0f410e30..f1dc7ffd0f 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -36,6 +36,9 @@ Unreleased * java.security.cert.X509CRL serialization support added. +* Replaced the ``PersistentMap`` in ``NodeSchedulerService`` with an implementation that only loads the next scheduled + state from the database into memory, rather than them all. + * Upgraded H2 to v1.4.197. * Shell (embedded available only in dev mode or via SSH) connects to the node via RPC instead of using the ``CordaRPCOps`` object directly. diff --git a/node/src/integration-test/kotlin/net/corda/node/services/events/ScheduledFlowIntegrationTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/events/ScheduledFlowIntegrationTests.kt new file mode 100644 index 0000000000..417da3155a --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/events/ScheduledFlowIntegrationTests.kt @@ -0,0 +1,126 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + +package net.corda.node.services.events + +import co.paralleluniverse.fibers.Suspendable +import com.google.common.collect.ImmutableList +import net.corda.client.rpc.CordaRPCClient +import net.corda.core.concurrent.CordaFuture +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.transpose +import net.corda.core.messaging.startFlow +import net.corda.core.node.services.queryBy +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.NonEmptySet +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.testMessage.ScheduledState +import net.corda.testMessage.SpentState +import net.corda.testing.contracts.DummyContract +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.dummyCommand +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.node.User +import org.junit.Test +import java.time.Instant +import java.util.* +import kotlin.test.assertEquals + +class ScheduledFlowIntegrationTests { + @StartableByRPC + class InsertInitialStateFlow(private val destination: Party, private val notary: Party, private val identity: Int = 1, private val scheduledFor: Instant? = null) : FlowLogic() { + @Suspendable + override fun call() { + val scheduledState = ScheduledState(scheduledFor + ?: serviceHub.clock.instant(), ourIdentity, destination, identity.toString()) + val builder = TransactionBuilder(notary) + .addOutputState(scheduledState, DummyContract.PROGRAM_ID) + .addCommand(dummyCommand(ourIdentity.owningKey)) + val tx = serviceHub.signInitialTransaction(builder) + subFlow(FinalityFlow(tx)) + } + } + + @StartableByRPC + class AnotherFlow(private val identity: String) : FlowLogic() { + @Suspendable + override fun call() { + val results = serviceHub.vaultService.queryBy(QueryCriteria.LinearStateQueryCriteria(externalId = ImmutableList.of(identity))) + val state = results.states.firstOrNull() ?: return + require(!state.state.data.processed) { "Cannot spend an already processed state" } + val lock = UUID.randomUUID() + serviceHub.vaultService.softLockReserve(lock, NonEmptySet.of(state.ref)) + val notary = state.state.notary + val outputState = SpentState(identity, ourIdentity, state.state.data.destination) + val builder = TransactionBuilder(notary) + .addInputState(state) + .addOutputState(outputState, DummyContract.PROGRAM_ID) + .addCommand(dummyCommand(ourIdentity.owningKey)) + val tx = serviceHub.signInitialTransaction(builder) + subFlow(FinalityFlow(tx, outputState.participants.toSet())) + } + } + + private fun MutableList>.getOrThrowAll() { + forEach { + try { + it.getOrThrow() + } catch (ex: Exception) { + } + } + } + + @Test + fun `test that when states are being spent at the same time that schedules trigger everything is processed`() { + driver(DriverParameters( + startNodesInProcess = true, + extraCordappPackagesToScan = listOf("net.corda.testing.contracts", "net.corda.testMessage") + )) { + val N = 23 + val rpcUser = User("admin", "admin", setOf("ALL")) + val (alice, bob) = listOf(ALICE_NAME, BOB_NAME).map { startNode(providedName = it, rpcUsers = listOf(rpcUser)) }.transpose().getOrThrow() + + val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password) + val bobClient = CordaRPCClient(bob.rpcAddress).start(rpcUser.username, rpcUser.password) + + val scheduledFor = Instant.now().plusSeconds(20) + val initialiseFutures = mutableListOf>() + for (i in 0 until N) { + initialiseFutures.add(aliceClient.proxy.startFlow(::InsertInitialStateFlow, bob.nodeInfo.legalIdentities.first(), defaultNotaryIdentity, i, scheduledFor).returnValue) + initialiseFutures.add(bobClient.proxy.startFlow(::InsertInitialStateFlow, alice.nodeInfo.legalIdentities.first(), defaultNotaryIdentity, i + 100, scheduledFor).returnValue) + } + initialiseFutures.getOrThrowAll() + + val spendAttemptFutures = mutableListOf>() + for (i in (0 until N).reversed()) { + spendAttemptFutures.add(aliceClient.proxy.startFlow(::AnotherFlow, (i).toString()).returnValue) + spendAttemptFutures.add(bobClient.proxy.startFlow(::AnotherFlow, (i + 100).toString()).returnValue) + } + spendAttemptFutures.getOrThrowAll() + + val aliceStates = aliceClient.proxy.vaultQuery(ScheduledState::class.java).states.filter { it.state.data.processed } + val aliceSpentStates = aliceClient.proxy.vaultQuery(SpentState::class.java).states + + val bobStates = bobClient.proxy.vaultQuery(ScheduledState::class.java).states.filter { it.state.data.processed } + val bobSpentStates = bobClient.proxy.vaultQuery(SpentState::class.java).states + + assertEquals(aliceStates.count() + aliceSpentStates.count(), N * 2) + assertEquals(bobStates.count() + bobSpentStates.count(), N * 2) + assertEquals(aliceSpentStates.count(), bobSpentStates.count()) + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/testMessage/ScheduledState.kt b/node/src/integration-test/kotlin/net/corda/testMessage/ScheduledState.kt new file mode 100644 index 0000000000..cc5f14e8ee --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/testMessage/ScheduledState.kt @@ -0,0 +1,75 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + +package net.corda.testMessage + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.contracts.* +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowLogicRefFactory +import net.corda.core.flows.SchedulableFlow +import net.corda.core.identity.Party +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.NonEmptySet +import net.corda.testing.contracts.DummyContract +import net.corda.testing.core.dummyCommand +import java.time.Instant +import java.util.* +import kotlin.reflect.jvm.jvmName + +@SchedulableFlow +class ScheduledFlow(private val stateRef: StateRef) : FlowLogic() { + @Suspendable + override fun call() { + val state = serviceHub.toStateAndRef(stateRef) + val scheduledState = state.state.data + // Only run flow over states originating on this node + if (!serviceHub.myInfo.isLegalIdentity(scheduledState.source)) { + return + } + require(!scheduledState.processed) { "State should not have been previously processed" } + val lock = UUID.randomUUID() + serviceHub.vaultService.softLockReserve(lock, NonEmptySet.of(state.ref)) + val notary = state.state.notary + val newStateOutput = scheduledState.copy(processed = true) + val builder = TransactionBuilder(notary) + .addInputState(state) + .addOutputState(newStateOutput, DummyContract.PROGRAM_ID) + .addCommand(dummyCommand(ourIdentity.owningKey)) + val tx = serviceHub.signInitialTransaction(builder) + subFlow(FinalityFlow(tx, setOf(scheduledState.destination))) + } +} + +data class ScheduledState(val creationTime: Instant, + val source: Party, + val destination: Party, + val identity: String, + val processed: Boolean = false, + val scheduledFor: Instant = creationTime, + override val linearId: UniqueIdentifier = UniqueIdentifier(externalId = identity)) : SchedulableState, LinearState { + override val participants get() = listOf(source, destination) + override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? { + return if (!processed) { + val logicRef = flowLogicRefFactory.create(ScheduledFlow::class.jvmName, thisStateRef) + ScheduledActivity(logicRef, scheduledFor) + } else { + null + } + } +} + +data class SpentState(val identity: String, + val source: Party, + val destination: Party, + override val linearId: UniqueIdentifier = UniqueIdentifier(externalId = identity)) : LinearState { + override val participants: List get() = listOf(source, destination) +} diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index bfada7354d..22de33c70e 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -8,7 +8,6 @@ import net.corda.core.contracts.SchedulableState import net.corda.core.contracts.ScheduledActivity import net.corda.core.contracts.ScheduledStateRef import net.corda.core.contracts.StateRef -import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogicRefFactory import net.corda.core.internal.ThreadBox @@ -27,7 +26,6 @@ import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.NodePropertiesStore import net.corda.node.services.api.SchedulerService import net.corda.node.services.messaging.DeduplicationHandler -import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import org.apache.activemq.artemis.utils.ReusableLatch @@ -36,7 +34,6 @@ import org.slf4j.Logger import java.io.Serializable import java.time.Duration import java.time.Instant -import java.util.* import java.util.concurrent.* import javax.annotation.concurrent.ThreadSafe import javax.persistence.Column @@ -67,11 +64,12 @@ class NodeSchedulerService(private val clock: CordaClock, private val nodeProperties: NodePropertiesStore, private val drainingModePollPeriod: Duration, private val log: Logger = staticLog, - private val scheduledStates: MutableMap = createMap()) + private val schedulerRepo: ScheduledFlowRepository = PersistentScheduledFlowRepository(database)) : SchedulerService, SingletonSerializeAsToken() { companion object { private val staticLog get() = contextLogger() + /** * Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations * used in demos or testing. This will substitute a Fiber compatible Future so the current @@ -110,26 +108,6 @@ class NodeSchedulerService(private val clock: CordaClock, return future.isDone } - fun createMap(): PersistentMap { - return PersistentMap( - toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, - fromPersistentEntity = { - //TODO null check will become obsolete after making DB/JPA columns not nullable - val txId = it.output.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId") - val index = it.output.index ?: throw IllegalStateException("DB returned null SecureHash index") - Pair(StateRef(SecureHash.parse(txId), index), - ScheduledStateRef(StateRef(SecureHash.parse(txId), index), it.scheduledAt)) - }, - toPersistentEntity = { key: StateRef, value: ScheduledStateRef -> - PersistentScheduledState().apply { - output = PersistentStateRef(key.txhash.toString(), key.index) - scheduledAt = value.scheduledAt - } - }, - persistentEntityClass = PersistentScheduledState::class.java - ) - } - /** * Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation and set to true when a result * or [Throwable] is available in the original. @@ -160,9 +138,8 @@ class NodeSchedulerService(private val clock: CordaClock, ) : Serializable private class InnerState { - var scheduledStatesQueue: PriorityQueue = PriorityQueue({ a, b -> a.scheduledAt.compareTo(b.scheduledAt) }) - var rescheduled: GuavaSettableFuture? = null + var nextScheduledAction: ScheduledStateRef? = null } // Used to de-duplicate flow starts in case a flow is starting but the corresponding entry hasn't been removed yet @@ -173,27 +150,21 @@ class NodeSchedulerService(private val clock: CordaClock, // We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow. fun start() { mutex.locked { - scheduledStatesQueue.addAll(scheduledStates.values) rescheduleWakeUp() } } override fun scheduleStateActivity(action: ScheduledStateRef) { log.trace { "Schedule $action" } - val previousState = scheduledStates[action.ref] - scheduledStates[action.ref] = action + if (!schedulerRepo.merge(action)) { + // Only increase the number of unfinished schedules if the state didn't already exist on the queue + unfinishedSchedules.countUp() + } mutex.locked { - val previousEarliest = scheduledStatesQueue.peek() - scheduledStatesQueue.remove(previousState) - scheduledStatesQueue.add(action) - if (previousState == null && action !in startingStateRefs) { - unfinishedSchedules.countUp() - } - - if (action.scheduledAt.isBefore(previousEarliest?.scheduledAt ?: Instant.MAX)) { + if (action.scheduledAt < nextScheduledAction?.scheduledAt ?: Instant.MAX) { // We are earliest rescheduleWakeUp() - } else if (previousEarliest?.ref == action.ref && previousEarliest.scheduledAt != action.scheduledAt) { + } else if (action.ref == nextScheduledAction?.ref && action.scheduledAt != nextScheduledAction?.scheduledAt) { // We were earliest but might not be any more rescheduleWakeUp() } @@ -202,17 +173,12 @@ class NodeSchedulerService(private val clock: CordaClock, override fun unscheduleStateActivity(ref: StateRef) { log.trace { "Unschedule $ref" } - val removedAction = scheduledStates.remove(ref) + if (startingStateRefs.all { it.ref != ref } && schedulerRepo.delete(ref)) { + unfinishedSchedules.countDown() + } mutex.locked { - if (removedAction != null) { - val wasNext = (removedAction == scheduledStatesQueue.peek()) - val wasRemoved = scheduledStatesQueue.remove(removedAction) - if (wasRemoved) { - unfinishedSchedules.countDown() - } - if (wasNext) { - rescheduleWakeUp() - } + if (nextScheduledAction?.ref == ref) { + rescheduleWakeUp() } } } @@ -231,7 +197,9 @@ class NodeSchedulerService(private val clock: CordaClock, val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked { rescheduled?.cancel(false) rescheduled = GuavaSettableFuture.create() - Pair(scheduledStatesQueue.peek(), rescheduled!!) + //get the next scheduled action that isn't currently running + nextScheduledAction = schedulerRepo.getLatest(startingStateRefs.size + 1).firstOrNull { !startingStateRefs.contains(it.second) }?.second + Pair(nextScheduledAction, rescheduled!!) } if (scheduledState != null) { schedulerTimerExecutor.execute { @@ -261,7 +229,7 @@ class NodeSchedulerService(private val clock: CordaClock, private inner class FlowStartDeduplicationHandler(val scheduledState: ScheduledStateRef) : DeduplicationHandler { override fun insideDatabaseTransaction() { - scheduledStates.remove(scheduledState.ref) + schedulerRepo.delete(scheduledState.ref) } override fun afterDatabaseTransaction() { @@ -276,12 +244,9 @@ class NodeSchedulerService(private val clock: CordaClock, private fun onTimeReached(scheduledState: ScheduledStateRef) { var flowName: String? = "(unknown)" try { - // We need to check this before the database transaction, otherwise there is a subtle race between a - // doubly-reached deadline and the removal from [startingStateRefs]. - if (scheduledState !in startingStateRefs) { - val scheduledFlow = database.transaction { getScheduledFlow(scheduledState) } + database.transaction { + val scheduledFlow = getFlow(scheduledState) if (scheduledFlow != null) { - startingStateRefs.add(scheduledState) flowName = scheduledFlow.javaClass.name // TODO refactor the scheduler to store and propagate the original invocation context val context = InvocationContext.newInstance(InvocationOrigin.Scheduled(scheduledState)) @@ -297,24 +262,20 @@ class NodeSchedulerService(private val clock: CordaClock, } } - private fun getScheduledFlow(scheduledState: ScheduledStateRef): FlowLogic<*>? { + private fun getFlow(scheduledState: ScheduledStateRef): FlowLogic<*>? { val scheduledActivity = getScheduledActivity(scheduledState) var scheduledFlow: FlowLogic<*>? = null mutex.locked { // need to remove us from those scheduled, but only if we are still next - val previousState = scheduledStates[scheduledState.ref] - if (previousState != null && previousState === scheduledState) { + if (nextScheduledAction != null && nextScheduledAction === scheduledState) { if (scheduledActivity == null) { log.info("Scheduled state $scheduledState has rescheduled to never.") unfinishedSchedules.countDown() - scheduledStates.remove(scheduledState.ref) - scheduledStatesQueue.remove(scheduledState) + schedulerRepo.delete(scheduledState.ref) } else if (scheduledActivity.scheduledAt.isAfter(clock.instant())) { log.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.") val newState = ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt) - scheduledStates[scheduledState.ref] = newState - scheduledStatesQueue.remove(scheduledState) - scheduledStatesQueue.add(newState) + schedulerRepo.merge(newState) } else { val flowLogic = flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef) scheduledFlow = when { @@ -325,7 +286,8 @@ class NodeSchedulerService(private val clock: CordaClock, } else -> { log.trace { "Scheduler starting FlowLogic $flowLogic" } - scheduledStatesQueue.remove(scheduledState) + //Add this to the in memory list of starting refs so it is not picked up on the next rescheduleWakeUp() + startingStateRefs.add(scheduledState) flowLogic } } diff --git a/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt b/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt new file mode 100644 index 0000000000..21a796d841 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepository.kt @@ -0,0 +1,68 @@ +package net.corda.node.services.events + +import net.corda.core.contracts.ScheduledStateRef +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash +import net.corda.core.schemas.PersistentStateRef +import net.corda.nodeapi.internal.persistence.CordaPersistence + +interface ScheduledFlowRepository { + fun delete(key: StateRef): Boolean + fun merge(value: ScheduledStateRef): Boolean + fun getLatest(lookahead: Int) : List> +} + +class PersistentScheduledFlowRepository(val database: CordaPersistence): ScheduledFlowRepository { + private fun toPersistentEntityKey(stateRef: StateRef): PersistentStateRef { + return PersistentStateRef(stateRef.txhash.toString(), stateRef.index) + } + + private fun toPersistentEntity(key: StateRef, value: ScheduledStateRef): NodeSchedulerService.PersistentScheduledState { + return NodeSchedulerService.PersistentScheduledState().apply { + output = PersistentStateRef(key.txhash.toString(), key.index) + scheduledAt = value.scheduledAt + } + } + + private fun fromPersistentEntity(scheduledStateRecord: NodeSchedulerService.PersistentScheduledState): Pair { + val txId = scheduledStateRecord.output.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId") + val index = scheduledStateRecord.output.index ?: throw IllegalStateException("DB returned null integer index") + return Pair(StateRef(SecureHash.parse(txId), index), ScheduledStateRef(StateRef(SecureHash.parse(txId), index), scheduledStateRecord.scheduledAt)) + } + + override fun delete(key: StateRef): Boolean { + return database.transaction { + val elem = session.find(NodeSchedulerService.PersistentScheduledState::class.java, toPersistentEntityKey(key!!)) + if (elem != null) { + session.remove(elem) + true + } else { + false + } + } + } + + override fun merge(value: ScheduledStateRef): Boolean { + return database.transaction { + val existingEntry = session.find(NodeSchedulerService.PersistentScheduledState::class.java, toPersistentEntityKey(value.ref)) + if (existingEntry != null) { + session.merge(toPersistentEntity(value.ref, value)) + true + } else { + session.save(toPersistentEntity(value.ref, value)) + false + } + } + } + + override fun getLatest(lookahead: Int) : List> { + return database.transaction { + val criteriaQuery = session.criteriaBuilder.createQuery(NodeSchedulerService.PersistentScheduledState::class.java) + val shed = criteriaQuery.from(NodeSchedulerService.PersistentScheduledState::class.java) + criteriaQuery.select(shed) + criteriaQuery.orderBy(session.criteriaBuilder.asc(shed.get("scheduledAt"))) + session.createQuery(criteriaQuery).setFirstResult(0).setMaxResults(lookahead) + .resultList.map { e -> fromPersistentEntity(e as NodeSchedulerService.PersistentScheduledState) } + } + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index fa1271405a..39ca9a3727 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -14,6 +14,7 @@ import net.corda.core.utilities.days import net.corda.node.internal.configureDatabase import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.NodePropertiesStore +import net.corda.node.services.messaging.DeduplicationHandler import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseTransaction @@ -30,6 +31,7 @@ import org.slf4j.Logger import java.time.Clock import java.time.Duration import java.time.Instant +import kotlin.test.assertEquals open class NodeSchedulerServiceTestBase { protected class Event(time: Instant) { @@ -48,8 +50,14 @@ open class NodeSchedulerServiceTestBase { rigorousMock().block() }.whenever(it).transaction(any()) } + protected val flowStarter = rigorousMock().also { - doReturn(openFuture>()).whenever(it).startFlow(any>(), any(), any()) + doAnswer { + val dedupe = it.arguments[2] as DeduplicationHandler + dedupe.insideDatabaseTransaction() + dedupe.afterDatabaseTransaction() + openFuture>() + }.whenever(it).startFlow(any>(), any(), any()) } private val flowsDraingMode = rigorousMock().also { doReturn(false).whenever(it).isEnabled() @@ -88,6 +96,31 @@ open class NodeSchedulerServiceTestBase { protected fun assertStarted(event: Event) = assertStarted(event.flowLogic) } +class MockScheduledFlowRepository : ScheduledFlowRepository { + private val map = HashMap() + + override fun getLatest(lookahead: Int): List> { + return map.values.sortedBy { it.scheduledAt }.map { Pair(it.ref, it) } + } + + override fun merge(value: ScheduledStateRef): Boolean { + var result = false + if (map.containsKey(value.ref)) { + result = true + } + map.put(value.ref, value) + return result + } + + override fun delete(key: StateRef): Boolean { + if (map.containsKey(key)) { + map.remove(key) + return true + } + return false + } +} + class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() { private val database = rigorousMock().also { doAnswer { @@ -105,7 +138,9 @@ class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() { nodeProperties = nodeProperties, drainingModePollPeriod = Duration.ofSeconds(5), log = log, - scheduledStates = mutableMapOf()).apply { start() } + schedulerRepo = MockScheduledFlowRepository() + ).apply { start() } + @Rule @JvmField val tearDown = object : TestWatcher() { @@ -228,6 +263,22 @@ class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() { } } + @Test + fun `test that correct item is returned`() { + val dataSourceProps = MockServices.makeTestDataSourceProperties() + val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock()) + database.transaction { + val repo = PersistentScheduledFlowRepository(database) + val stateRef = StateRef(SecureHash.randomSHA256(), 0) + val ssr = ScheduledStateRef(stateRef, mark) + repo.merge(ssr) + + val output = repo.getLatest(5).firstOrNull() + assertEquals(output?.first, stateRef) + assertEquals(output?.second, ssr) + } + } + @Test fun `test that schedule is persisted`() { val dataSourceProps = MockServices.makeTestDataSourceProperties() diff --git a/node/src/test/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepositoryTest.kt b/node/src/test/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepositoryTest.kt new file mode 100644 index 0000000000..d6421757fd --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/events/PersistentScheduledFlowRepositoryTest.kt @@ -0,0 +1,69 @@ +package net.corda.node.services.events + +import net.corda.core.contracts.ScheduledStateRef +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash +import net.corda.core.utilities.days +import net.corda.node.internal.configureDatabase +import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.testing.internal.rigorousMock +import net.corda.testing.node.MockServices +import org.junit.Test +import java.time.Instant +import kotlin.test.assertEquals +import kotlin.test.assertNull + +class PersistentScheduledFlowRepositoryTest { + private val databaseConfig: DatabaseConfig = DatabaseConfig() + private val mark = Instant.now() + + @Test + fun `test that earliest item is returned`() { + val laterTime = mark + 1.days + val dataSourceProps = MockServices.makeTestDataSourceProperties() + val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock()) + + database.transaction { + val repo = PersistentScheduledFlowRepository(database) + val laterStateRef = StateRef(SecureHash.randomSHA256(), 0) + val laterSsr = ScheduledStateRef(laterStateRef, laterTime) + repo.merge(laterSsr) + + val earlierStateRef = StateRef(SecureHash.randomSHA256(), 0) + val earlierSsr = ScheduledStateRef(earlierStateRef, mark) + repo.merge(earlierSsr) + + val output = repo.getLatest(5).firstOrNull() + assertEquals(output?.first, earlierStateRef) + assertEquals(output?.second, earlierSsr) + } + } + + @Test + fun `test that item is rescheduled`() { + val laterTime = mark + 1.days + val dataSourceProps = MockServices.makeTestDataSourceProperties() + val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock()) + database.transaction { + val repo = PersistentScheduledFlowRepository(database) + val stateRef = StateRef(SecureHash.randomSHA256(), 0) + val laterSsr = ScheduledStateRef(stateRef, laterTime) + + repo.merge(laterSsr) + + //Update the existing scheduled flow to an earlier time + val updatedEarlierSsr = ScheduledStateRef(stateRef, mark) + repo.merge(updatedEarlierSsr) + + val output = repo.getLatest(5).firstOrNull() + assertEquals(output?.first, stateRef) + assertEquals(output?.second, updatedEarlierSsr) + + repo.delete(output?.first!!) + + //There should be no more outputs + val nextOutput = repo.getLatest(5).firstOrNull() + assertNull(nextOutput) + } + } +} \ No newline at end of file