mirror of
https://github.com/corda/corda.git
synced 2025-01-31 16:35:43 +00:00
* CORDA-1001 - Remove peristent map in NodeSchedulerService (#763) * Add scheduled flow test that uses multithreaded node * Replace use of PersistentMap in NodeSchedulerService * Correct class name and remove duplicate test * Address initial PR comments * Remove debugging code * Remove acidentally added line * Move Scheduled State contracts to internal module * Put things in the right places * Add changelog message * Fix countdown issue * Addressing PR comments * Remove unused class
This commit is contained in:
parent
e564303869
commit
100f680042
@ -36,6 +36,9 @@ Unreleased
|
||||
|
||||
* java.security.cert.X509CRL serialization support added.
|
||||
|
||||
* Replaced the ``PersistentMap`` in ``NodeSchedulerService`` with an implementation that only loads the next scheduled
|
||||
state from the database into memory, rather than them all.
|
||||
|
||||
* Upgraded H2 to v1.4.197.
|
||||
|
||||
* Shell (embedded available only in dev mode or via SSH) connects to the node via RPC instead of using the ``CordaRPCOps`` object directly.
|
||||
|
@ -0,0 +1,126 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.node.services.events
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.collect.ImmutableList
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.node.services.queryBy
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.testMessage.ScheduledState
|
||||
import net.corda.testMessage.SpentState
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.User
|
||||
import org.junit.Test
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class ScheduledFlowIntegrationTests {
|
||||
@StartableByRPC
|
||||
class InsertInitialStateFlow(private val destination: Party, private val notary: Party, private val identity: Int = 1, private val scheduledFor: Instant? = null) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val scheduledState = ScheduledState(scheduledFor
|
||||
?: serviceHub.clock.instant(), ourIdentity, destination, identity.toString())
|
||||
val builder = TransactionBuilder(notary)
|
||||
.addOutputState(scheduledState, DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand(ourIdentity.owningKey))
|
||||
val tx = serviceHub.signInitialTransaction(builder)
|
||||
subFlow(FinalityFlow(tx))
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class AnotherFlow(private val identity: String) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val results = serviceHub.vaultService.queryBy<ScheduledState>(QueryCriteria.LinearStateQueryCriteria(externalId = ImmutableList.of(identity)))
|
||||
val state = results.states.firstOrNull() ?: return
|
||||
require(!state.state.data.processed) { "Cannot spend an already processed state" }
|
||||
val lock = UUID.randomUUID()
|
||||
serviceHub.vaultService.softLockReserve(lock, NonEmptySet.of(state.ref))
|
||||
val notary = state.state.notary
|
||||
val outputState = SpentState(identity, ourIdentity, state.state.data.destination)
|
||||
val builder = TransactionBuilder(notary)
|
||||
.addInputState(state)
|
||||
.addOutputState(outputState, DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand(ourIdentity.owningKey))
|
||||
val tx = serviceHub.signInitialTransaction(builder)
|
||||
subFlow(FinalityFlow(tx, outputState.participants.toSet()))
|
||||
}
|
||||
}
|
||||
|
||||
private fun MutableList<CordaFuture<*>>.getOrThrowAll() {
|
||||
forEach {
|
||||
try {
|
||||
it.getOrThrow()
|
||||
} catch (ex: Exception) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test that when states are being spent at the same time that schedules trigger everything is processed`() {
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
extraCordappPackagesToScan = listOf("net.corda.testing.contracts", "net.corda.testMessage")
|
||||
)) {
|
||||
val N = 23
|
||||
val rpcUser = User("admin", "admin", setOf("ALL"))
|
||||
val (alice, bob) = listOf(ALICE_NAME, BOB_NAME).map { startNode(providedName = it, rpcUsers = listOf(rpcUser)) }.transpose().getOrThrow()
|
||||
|
||||
val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password)
|
||||
val bobClient = CordaRPCClient(bob.rpcAddress).start(rpcUser.username, rpcUser.password)
|
||||
|
||||
val scheduledFor = Instant.now().plusSeconds(20)
|
||||
val initialiseFutures = mutableListOf<CordaFuture<*>>()
|
||||
for (i in 0 until N) {
|
||||
initialiseFutures.add(aliceClient.proxy.startFlow(::InsertInitialStateFlow, bob.nodeInfo.legalIdentities.first(), defaultNotaryIdentity, i, scheduledFor).returnValue)
|
||||
initialiseFutures.add(bobClient.proxy.startFlow(::InsertInitialStateFlow, alice.nodeInfo.legalIdentities.first(), defaultNotaryIdentity, i + 100, scheduledFor).returnValue)
|
||||
}
|
||||
initialiseFutures.getOrThrowAll()
|
||||
|
||||
val spendAttemptFutures = mutableListOf<CordaFuture<*>>()
|
||||
for (i in (0 until N).reversed()) {
|
||||
spendAttemptFutures.add(aliceClient.proxy.startFlow(::AnotherFlow, (i).toString()).returnValue)
|
||||
spendAttemptFutures.add(bobClient.proxy.startFlow(::AnotherFlow, (i + 100).toString()).returnValue)
|
||||
}
|
||||
spendAttemptFutures.getOrThrowAll()
|
||||
|
||||
val aliceStates = aliceClient.proxy.vaultQuery(ScheduledState::class.java).states.filter { it.state.data.processed }
|
||||
val aliceSpentStates = aliceClient.proxy.vaultQuery(SpentState::class.java).states
|
||||
|
||||
val bobStates = bobClient.proxy.vaultQuery(ScheduledState::class.java).states.filter { it.state.data.processed }
|
||||
val bobSpentStates = bobClient.proxy.vaultQuery(SpentState::class.java).states
|
||||
|
||||
assertEquals(aliceStates.count() + aliceSpentStates.count(), N * 2)
|
||||
assertEquals(bobStates.count() + bobSpentStates.count(), N * 2)
|
||||
assertEquals(aliceSpentStates.count(), bobSpentStates.count())
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,75 @@
|
||||
/*
|
||||
* R3 Proprietary and Confidential
|
||||
*
|
||||
* Copyright (c) 2018 R3 Limited. All rights reserved.
|
||||
*
|
||||
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
|
||||
*
|
||||
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
|
||||
*/
|
||||
|
||||
package net.corda.testMessage
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowLogicRefFactory
|
||||
import net.corda.core.flows.SchedulableFlow
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.core.dummyCommand
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import kotlin.reflect.jvm.jvmName
|
||||
|
||||
@SchedulableFlow
|
||||
class ScheduledFlow(private val stateRef: StateRef) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val state = serviceHub.toStateAndRef<ScheduledState>(stateRef)
|
||||
val scheduledState = state.state.data
|
||||
// Only run flow over states originating on this node
|
||||
if (!serviceHub.myInfo.isLegalIdentity(scheduledState.source)) {
|
||||
return
|
||||
}
|
||||
require(!scheduledState.processed) { "State should not have been previously processed" }
|
||||
val lock = UUID.randomUUID()
|
||||
serviceHub.vaultService.softLockReserve(lock, NonEmptySet.of(state.ref))
|
||||
val notary = state.state.notary
|
||||
val newStateOutput = scheduledState.copy(processed = true)
|
||||
val builder = TransactionBuilder(notary)
|
||||
.addInputState(state)
|
||||
.addOutputState(newStateOutput, DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand(ourIdentity.owningKey))
|
||||
val tx = serviceHub.signInitialTransaction(builder)
|
||||
subFlow(FinalityFlow(tx, setOf(scheduledState.destination)))
|
||||
}
|
||||
}
|
||||
|
||||
data class ScheduledState(val creationTime: Instant,
|
||||
val source: Party,
|
||||
val destination: Party,
|
||||
val identity: String,
|
||||
val processed: Boolean = false,
|
||||
val scheduledFor: Instant = creationTime,
|
||||
override val linearId: UniqueIdentifier = UniqueIdentifier(externalId = identity)) : SchedulableState, LinearState {
|
||||
override val participants get() = listOf(source, destination)
|
||||
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
|
||||
return if (!processed) {
|
||||
val logicRef = flowLogicRefFactory.create(ScheduledFlow::class.jvmName, thisStateRef)
|
||||
ScheduledActivity(logicRef, scheduledFor)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data class SpentState(val identity: String,
|
||||
val source: Party,
|
||||
val destination: Party,
|
||||
override val linearId: UniqueIdentifier = UniqueIdentifier(externalId = identity)) : LinearState {
|
||||
override val participants: List<Party> get() = listOf(source, destination)
|
||||
}
|
@ -8,7 +8,6 @@ import net.corda.core.contracts.SchedulableState
|
||||
import net.corda.core.contracts.ScheduledActivity
|
||||
import net.corda.core.contracts.ScheduledStateRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowLogicRefFactory
|
||||
import net.corda.core.internal.ThreadBox
|
||||
@ -27,7 +26,6 @@ import net.corda.node.services.api.FlowStarter
|
||||
import net.corda.node.services.api.NodePropertiesStore
|
||||
import net.corda.node.services.api.SchedulerService
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.utilities.PersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
@ -36,7 +34,6 @@ import org.slf4j.Logger
|
||||
import java.io.Serializable
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import javax.persistence.Column
|
||||
@ -67,11 +64,12 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
private val nodeProperties: NodePropertiesStore,
|
||||
private val drainingModePollPeriod: Duration,
|
||||
private val log: Logger = staticLog,
|
||||
private val scheduledStates: MutableMap<StateRef, ScheduledStateRef> = createMap())
|
||||
private val schedulerRepo: ScheduledFlowRepository = PersistentScheduledFlowRepository(database))
|
||||
: SchedulerService, SingletonSerializeAsToken() {
|
||||
|
||||
companion object {
|
||||
private val staticLog get() = contextLogger()
|
||||
|
||||
/**
|
||||
* Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations
|
||||
* used in demos or testing. This will substitute a Fiber compatible Future so the current
|
||||
@ -110,26 +108,6 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
return future.isDone
|
||||
}
|
||||
|
||||
fun createMap(): PersistentMap<StateRef, ScheduledStateRef, PersistentScheduledState, PersistentStateRef> {
|
||||
return PersistentMap(
|
||||
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
|
||||
fromPersistentEntity = {
|
||||
//TODO null check will become obsolete after making DB/JPA columns not nullable
|
||||
val txId = it.output.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId")
|
||||
val index = it.output.index ?: throw IllegalStateException("DB returned null SecureHash index")
|
||||
Pair(StateRef(SecureHash.parse(txId), index),
|
||||
ScheduledStateRef(StateRef(SecureHash.parse(txId), index), it.scheduledAt))
|
||||
},
|
||||
toPersistentEntity = { key: StateRef, value: ScheduledStateRef ->
|
||||
PersistentScheduledState().apply {
|
||||
output = PersistentStateRef(key.txhash.toString(), key.index)
|
||||
scheduledAt = value.scheduledAt
|
||||
}
|
||||
},
|
||||
persistentEntityClass = PersistentScheduledState::class.java
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation and set to true when a result
|
||||
* or [Throwable] is available in the original.
|
||||
@ -160,9 +138,8 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
) : Serializable
|
||||
|
||||
private class InnerState {
|
||||
var scheduledStatesQueue: PriorityQueue<ScheduledStateRef> = PriorityQueue({ a, b -> a.scheduledAt.compareTo(b.scheduledAt) })
|
||||
|
||||
var rescheduled: GuavaSettableFuture<Boolean>? = null
|
||||
var nextScheduledAction: ScheduledStateRef? = null
|
||||
}
|
||||
|
||||
// Used to de-duplicate flow starts in case a flow is starting but the corresponding entry hasn't been removed yet
|
||||
@ -173,27 +150,21 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
|
||||
fun start() {
|
||||
mutex.locked {
|
||||
scheduledStatesQueue.addAll(scheduledStates.values)
|
||||
rescheduleWakeUp()
|
||||
}
|
||||
}
|
||||
|
||||
override fun scheduleStateActivity(action: ScheduledStateRef) {
|
||||
log.trace { "Schedule $action" }
|
||||
val previousState = scheduledStates[action.ref]
|
||||
scheduledStates[action.ref] = action
|
||||
if (!schedulerRepo.merge(action)) {
|
||||
// Only increase the number of unfinished schedules if the state didn't already exist on the queue
|
||||
unfinishedSchedules.countUp()
|
||||
}
|
||||
mutex.locked {
|
||||
val previousEarliest = scheduledStatesQueue.peek()
|
||||
scheduledStatesQueue.remove(previousState)
|
||||
scheduledStatesQueue.add(action)
|
||||
if (previousState == null && action !in startingStateRefs) {
|
||||
unfinishedSchedules.countUp()
|
||||
}
|
||||
|
||||
if (action.scheduledAt.isBefore(previousEarliest?.scheduledAt ?: Instant.MAX)) {
|
||||
if (action.scheduledAt < nextScheduledAction?.scheduledAt ?: Instant.MAX) {
|
||||
// We are earliest
|
||||
rescheduleWakeUp()
|
||||
} else if (previousEarliest?.ref == action.ref && previousEarliest.scheduledAt != action.scheduledAt) {
|
||||
} else if (action.ref == nextScheduledAction?.ref && action.scheduledAt != nextScheduledAction?.scheduledAt) {
|
||||
// We were earliest but might not be any more
|
||||
rescheduleWakeUp()
|
||||
}
|
||||
@ -202,17 +173,12 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
|
||||
override fun unscheduleStateActivity(ref: StateRef) {
|
||||
log.trace { "Unschedule $ref" }
|
||||
val removedAction = scheduledStates.remove(ref)
|
||||
if (startingStateRefs.all { it.ref != ref } && schedulerRepo.delete(ref)) {
|
||||
unfinishedSchedules.countDown()
|
||||
}
|
||||
mutex.locked {
|
||||
if (removedAction != null) {
|
||||
val wasNext = (removedAction == scheduledStatesQueue.peek())
|
||||
val wasRemoved = scheduledStatesQueue.remove(removedAction)
|
||||
if (wasRemoved) {
|
||||
unfinishedSchedules.countDown()
|
||||
}
|
||||
if (wasNext) {
|
||||
rescheduleWakeUp()
|
||||
}
|
||||
if (nextScheduledAction?.ref == ref) {
|
||||
rescheduleWakeUp()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -231,7 +197,9 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked {
|
||||
rescheduled?.cancel(false)
|
||||
rescheduled = GuavaSettableFuture.create()
|
||||
Pair(scheduledStatesQueue.peek(), rescheduled!!)
|
||||
//get the next scheduled action that isn't currently running
|
||||
nextScheduledAction = schedulerRepo.getLatest(startingStateRefs.size + 1).firstOrNull { !startingStateRefs.contains(it.second) }?.second
|
||||
Pair(nextScheduledAction, rescheduled!!)
|
||||
}
|
||||
if (scheduledState != null) {
|
||||
schedulerTimerExecutor.execute {
|
||||
@ -261,7 +229,7 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
|
||||
private inner class FlowStartDeduplicationHandler(val scheduledState: ScheduledStateRef) : DeduplicationHandler {
|
||||
override fun insideDatabaseTransaction() {
|
||||
scheduledStates.remove(scheduledState.ref)
|
||||
schedulerRepo.delete(scheduledState.ref)
|
||||
}
|
||||
|
||||
override fun afterDatabaseTransaction() {
|
||||
@ -276,12 +244,9 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
private fun onTimeReached(scheduledState: ScheduledStateRef) {
|
||||
var flowName: String? = "(unknown)"
|
||||
try {
|
||||
// We need to check this before the database transaction, otherwise there is a subtle race between a
|
||||
// doubly-reached deadline and the removal from [startingStateRefs].
|
||||
if (scheduledState !in startingStateRefs) {
|
||||
val scheduledFlow = database.transaction { getScheduledFlow(scheduledState) }
|
||||
database.transaction {
|
||||
val scheduledFlow = getFlow(scheduledState)
|
||||
if (scheduledFlow != null) {
|
||||
startingStateRefs.add(scheduledState)
|
||||
flowName = scheduledFlow.javaClass.name
|
||||
// TODO refactor the scheduler to store and propagate the original invocation context
|
||||
val context = InvocationContext.newInstance(InvocationOrigin.Scheduled(scheduledState))
|
||||
@ -297,24 +262,20 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
}
|
||||
}
|
||||
|
||||
private fun getScheduledFlow(scheduledState: ScheduledStateRef): FlowLogic<*>? {
|
||||
private fun getFlow(scheduledState: ScheduledStateRef): FlowLogic<*>? {
|
||||
val scheduledActivity = getScheduledActivity(scheduledState)
|
||||
var scheduledFlow: FlowLogic<*>? = null
|
||||
mutex.locked {
|
||||
// need to remove us from those scheduled, but only if we are still next
|
||||
val previousState = scheduledStates[scheduledState.ref]
|
||||
if (previousState != null && previousState === scheduledState) {
|
||||
if (nextScheduledAction != null && nextScheduledAction === scheduledState) {
|
||||
if (scheduledActivity == null) {
|
||||
log.info("Scheduled state $scheduledState has rescheduled to never.")
|
||||
unfinishedSchedules.countDown()
|
||||
scheduledStates.remove(scheduledState.ref)
|
||||
scheduledStatesQueue.remove(scheduledState)
|
||||
schedulerRepo.delete(scheduledState.ref)
|
||||
} else if (scheduledActivity.scheduledAt.isAfter(clock.instant())) {
|
||||
log.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.")
|
||||
val newState = ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt)
|
||||
scheduledStates[scheduledState.ref] = newState
|
||||
scheduledStatesQueue.remove(scheduledState)
|
||||
scheduledStatesQueue.add(newState)
|
||||
schedulerRepo.merge(newState)
|
||||
} else {
|
||||
val flowLogic = flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
|
||||
scheduledFlow = when {
|
||||
@ -325,7 +286,8 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
}
|
||||
else -> {
|
||||
log.trace { "Scheduler starting FlowLogic $flowLogic" }
|
||||
scheduledStatesQueue.remove(scheduledState)
|
||||
//Add this to the in memory list of starting refs so it is not picked up on the next rescheduleWakeUp()
|
||||
startingStateRefs.add(scheduledState)
|
||||
flowLogic
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,68 @@
|
||||
package net.corda.node.services.events
|
||||
|
||||
import net.corda.core.contracts.ScheduledStateRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
|
||||
interface ScheduledFlowRepository {
|
||||
fun delete(key: StateRef): Boolean
|
||||
fun merge(value: ScheduledStateRef): Boolean
|
||||
fun getLatest(lookahead: Int) : List<Pair<StateRef, ScheduledStateRef>>
|
||||
}
|
||||
|
||||
class PersistentScheduledFlowRepository(val database: CordaPersistence): ScheduledFlowRepository {
|
||||
private fun toPersistentEntityKey(stateRef: StateRef): PersistentStateRef {
|
||||
return PersistentStateRef(stateRef.txhash.toString(), stateRef.index)
|
||||
}
|
||||
|
||||
private fun toPersistentEntity(key: StateRef, value: ScheduledStateRef): NodeSchedulerService.PersistentScheduledState {
|
||||
return NodeSchedulerService.PersistentScheduledState().apply {
|
||||
output = PersistentStateRef(key.txhash.toString(), key.index)
|
||||
scheduledAt = value.scheduledAt
|
||||
}
|
||||
}
|
||||
|
||||
private fun fromPersistentEntity(scheduledStateRecord: NodeSchedulerService.PersistentScheduledState): Pair<StateRef, ScheduledStateRef> {
|
||||
val txId = scheduledStateRecord.output.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId")
|
||||
val index = scheduledStateRecord.output.index ?: throw IllegalStateException("DB returned null integer index")
|
||||
return Pair(StateRef(SecureHash.parse(txId), index), ScheduledStateRef(StateRef(SecureHash.parse(txId), index), scheduledStateRecord.scheduledAt))
|
||||
}
|
||||
|
||||
override fun delete(key: StateRef): Boolean {
|
||||
return database.transaction {
|
||||
val elem = session.find(NodeSchedulerService.PersistentScheduledState::class.java, toPersistentEntityKey(key!!))
|
||||
if (elem != null) {
|
||||
session.remove(elem)
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun merge(value: ScheduledStateRef): Boolean {
|
||||
return database.transaction {
|
||||
val existingEntry = session.find(NodeSchedulerService.PersistentScheduledState::class.java, toPersistentEntityKey(value.ref))
|
||||
if (existingEntry != null) {
|
||||
session.merge(toPersistentEntity(value.ref, value))
|
||||
true
|
||||
} else {
|
||||
session.save(toPersistentEntity(value.ref, value))
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun getLatest(lookahead: Int) : List<Pair<StateRef, ScheduledStateRef>> {
|
||||
return database.transaction {
|
||||
val criteriaQuery = session.criteriaBuilder.createQuery(NodeSchedulerService.PersistentScheduledState::class.java)
|
||||
val shed = criteriaQuery.from(NodeSchedulerService.PersistentScheduledState::class.java)
|
||||
criteriaQuery.select(shed)
|
||||
criteriaQuery.orderBy(session.criteriaBuilder.asc(shed.get<NodeSchedulerService.PersistentScheduledState>("scheduledAt")))
|
||||
session.createQuery(criteriaQuery).setFirstResult(0).setMaxResults(lookahead)
|
||||
.resultList.map { e -> fromPersistentEntity(e as NodeSchedulerService.PersistentScheduledState) }
|
||||
}
|
||||
}
|
||||
}
|
@ -14,6 +14,7 @@ import net.corda.core.utilities.days
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.api.FlowStarter
|
||||
import net.corda.node.services.api.NodePropertiesStore
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||
@ -30,6 +31,7 @@ import org.slf4j.Logger
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
open class NodeSchedulerServiceTestBase {
|
||||
protected class Event(time: Instant) {
|
||||
@ -48,8 +50,14 @@ open class NodeSchedulerServiceTestBase {
|
||||
rigorousMock<DatabaseTransaction>().block()
|
||||
}.whenever(it).transaction(any())
|
||||
}
|
||||
|
||||
protected val flowStarter = rigorousMock<FlowStarter>().also {
|
||||
doReturn(openFuture<FlowStateMachine<*>>()).whenever(it).startFlow(any<FlowLogic<*>>(), any(), any())
|
||||
doAnswer {
|
||||
val dedupe = it.arguments[2] as DeduplicationHandler
|
||||
dedupe.insideDatabaseTransaction()
|
||||
dedupe.afterDatabaseTransaction()
|
||||
openFuture<FlowStateMachine<*>>()
|
||||
}.whenever(it).startFlow(any<FlowLogic<*>>(), any(), any())
|
||||
}
|
||||
private val flowsDraingMode = rigorousMock<NodePropertiesStore.FlowsDrainingModeOperations>().also {
|
||||
doReturn(false).whenever(it).isEnabled()
|
||||
@ -88,6 +96,31 @@ open class NodeSchedulerServiceTestBase {
|
||||
protected fun assertStarted(event: Event) = assertStarted(event.flowLogic)
|
||||
}
|
||||
|
||||
class MockScheduledFlowRepository : ScheduledFlowRepository {
|
||||
private val map = HashMap<StateRef, ScheduledStateRef>()
|
||||
|
||||
override fun getLatest(lookahead: Int): List<Pair<StateRef, ScheduledStateRef>> {
|
||||
return map.values.sortedBy { it.scheduledAt }.map { Pair(it.ref, it) }
|
||||
}
|
||||
|
||||
override fun merge(value: ScheduledStateRef): Boolean {
|
||||
var result = false
|
||||
if (map.containsKey(value.ref)) {
|
||||
result = true
|
||||
}
|
||||
map.put(value.ref, value)
|
||||
return result
|
||||
}
|
||||
|
||||
override fun delete(key: StateRef): Boolean {
|
||||
if (map.containsKey(key)) {
|
||||
map.remove(key)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() {
|
||||
private val database = rigorousMock<CordaPersistence>().also {
|
||||
doAnswer {
|
||||
@ -105,7 +138,9 @@ class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() {
|
||||
nodeProperties = nodeProperties,
|
||||
drainingModePollPeriod = Duration.ofSeconds(5),
|
||||
log = log,
|
||||
scheduledStates = mutableMapOf()).apply { start() }
|
||||
schedulerRepo = MockScheduledFlowRepository()
|
||||
).apply { start() }
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val tearDown = object : TestWatcher() {
|
||||
@ -228,6 +263,22 @@ class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test that correct item is returned`() {
|
||||
val dataSourceProps = MockServices.makeTestDataSourceProperties()
|
||||
val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock())
|
||||
database.transaction {
|
||||
val repo = PersistentScheduledFlowRepository(database)
|
||||
val stateRef = StateRef(SecureHash.randomSHA256(), 0)
|
||||
val ssr = ScheduledStateRef(stateRef, mark)
|
||||
repo.merge(ssr)
|
||||
|
||||
val output = repo.getLatest(5).firstOrNull()
|
||||
assertEquals(output?.first, stateRef)
|
||||
assertEquals(output?.second, ssr)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test that schedule is persisted`() {
|
||||
val dataSourceProps = MockServices.makeTestDataSourceProperties()
|
||||
|
@ -0,0 +1,69 @@
|
||||
package net.corda.node.services.events
|
||||
|
||||
import net.corda.core.contracts.ScheduledStateRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.node.MockServices
|
||||
import org.junit.Test
|
||||
import java.time.Instant
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNull
|
||||
|
||||
class PersistentScheduledFlowRepositoryTest {
|
||||
private val databaseConfig: DatabaseConfig = DatabaseConfig()
|
||||
private val mark = Instant.now()
|
||||
|
||||
@Test
|
||||
fun `test that earliest item is returned`() {
|
||||
val laterTime = mark + 1.days
|
||||
val dataSourceProps = MockServices.makeTestDataSourceProperties()
|
||||
val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock())
|
||||
|
||||
database.transaction {
|
||||
val repo = PersistentScheduledFlowRepository(database)
|
||||
val laterStateRef = StateRef(SecureHash.randomSHA256(), 0)
|
||||
val laterSsr = ScheduledStateRef(laterStateRef, laterTime)
|
||||
repo.merge(laterSsr)
|
||||
|
||||
val earlierStateRef = StateRef(SecureHash.randomSHA256(), 0)
|
||||
val earlierSsr = ScheduledStateRef(earlierStateRef, mark)
|
||||
repo.merge(earlierSsr)
|
||||
|
||||
val output = repo.getLatest(5).firstOrNull()
|
||||
assertEquals(output?.first, earlierStateRef)
|
||||
assertEquals(output?.second, earlierSsr)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test that item is rescheduled`() {
|
||||
val laterTime = mark + 1.days
|
||||
val dataSourceProps = MockServices.makeTestDataSourceProperties()
|
||||
val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock())
|
||||
database.transaction {
|
||||
val repo = PersistentScheduledFlowRepository(database)
|
||||
val stateRef = StateRef(SecureHash.randomSHA256(), 0)
|
||||
val laterSsr = ScheduledStateRef(stateRef, laterTime)
|
||||
|
||||
repo.merge(laterSsr)
|
||||
|
||||
//Update the existing scheduled flow to an earlier time
|
||||
val updatedEarlierSsr = ScheduledStateRef(stateRef, mark)
|
||||
repo.merge(updatedEarlierSsr)
|
||||
|
||||
val output = repo.getLatest(5).firstOrNull()
|
||||
assertEquals(output?.first, stateRef)
|
||||
assertEquals(output?.second, updatedEarlierSsr)
|
||||
|
||||
repo.delete(output?.first!!)
|
||||
|
||||
//There should be no more outputs
|
||||
val nextOutput = repo.getLatest(5).firstOrNull()
|
||||
assertNull(nextOutput)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user