mirror of
https://github.com/corda/corda.git
synced 2025-06-18 07:08:15 +00:00
CORDA-891 Convert NodeSchedulerServiceTest into a unit test (#2273)
This commit is contained in:
@ -14,6 +14,7 @@ import net.corda.core.flows.FlowLogicRefFactory
|
|||||||
import net.corda.core.internal.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.internal.VisibleForTesting
|
import net.corda.core.internal.VisibleForTesting
|
||||||
import net.corda.core.internal.concurrent.flatMap
|
import net.corda.core.internal.concurrent.flatMap
|
||||||
|
import net.corda.core.internal.join
|
||||||
import net.corda.core.internal.until
|
import net.corda.core.internal.until
|
||||||
import net.corda.core.node.StateLoader
|
import net.corda.core.node.StateLoader
|
||||||
import net.corda.core.schemas.PersistentStateRef
|
import net.corda.core.schemas.PersistentStateRef
|
||||||
@ -24,12 +25,11 @@ import net.corda.node.internal.CordaClock
|
|||||||
import net.corda.node.internal.MutableClock
|
import net.corda.node.internal.MutableClock
|
||||||
import net.corda.node.services.api.FlowStarter
|
import net.corda.node.services.api.FlowStarter
|
||||||
import net.corda.node.services.api.SchedulerService
|
import net.corda.node.services.api.SchedulerService
|
||||||
import net.corda.node.utilities.AffinityExecutor
|
|
||||||
import net.corda.node.utilities.PersistentMap
|
import net.corda.node.utilities.PersistentMap
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||||
import java.time.Clock
|
import org.slf4j.Logger
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.*
|
import java.util.concurrent.*
|
||||||
@ -51,23 +51,21 @@ import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture
|
|||||||
* is the outcome of the activity in order to schedule another activity. Once we have implemented more persistence
|
* is the outcome of the activity in order to schedule another activity. Once we have implemented more persistence
|
||||||
* in the nodes, maybe we can consider multiple activities and whether the activities have been completed or not,
|
* in the nodes, maybe we can consider multiple activities and whether the activities have been completed or not,
|
||||||
* but that starts to sound a lot like off-ledger state.
|
* but that starts to sound a lot like off-ledger state.
|
||||||
*
|
|
||||||
* @param schedulerTimerExecutor The executor the scheduler blocks on waiting for the clock to advance to the next
|
|
||||||
* activity. Only replace this for unit testing purposes. This is not the executor the [FlowLogic] is launched on.
|
|
||||||
*/
|
*/
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
class NodeSchedulerService(private val clock: CordaClock,
|
class NodeSchedulerService(private val clock: CordaClock,
|
||||||
private val database: CordaPersistence,
|
private val database: CordaPersistence,
|
||||||
private val flowStarter: FlowStarter,
|
private val flowStarter: FlowStarter,
|
||||||
private val stateLoader: StateLoader,
|
private val stateLoader: StateLoader,
|
||||||
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(),
|
|
||||||
private val unfinishedSchedules: ReusableLatch = ReusableLatch(),
|
private val unfinishedSchedules: ReusableLatch = ReusableLatch(),
|
||||||
private val serverThread: AffinityExecutor,
|
private val serverThread: Executor,
|
||||||
private val flowLogicRefFactory: FlowLogicRefFactory)
|
private val flowLogicRefFactory: FlowLogicRefFactory,
|
||||||
|
private val log: Logger = staticLog,
|
||||||
|
scheduledStates: MutableMap<StateRef, ScheduledStateRef> = createMap())
|
||||||
: SchedulerService, SingletonSerializeAsToken() {
|
: SchedulerService, SingletonSerializeAsToken() {
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
private val log = contextLogger()
|
private val staticLog get() = contextLogger()
|
||||||
/**
|
/**
|
||||||
* Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations
|
* Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations
|
||||||
* used in demos or testing. This will substitute a Fiber compatible Future so the current
|
* used in demos or testing. This will substitute a Fiber compatible Future so the current
|
||||||
@ -131,7 +129,7 @@ class NodeSchedulerService(private val clock: CordaClock,
|
|||||||
* or [Throwable] is available in the original.
|
* or [Throwable] is available in the original.
|
||||||
*
|
*
|
||||||
* We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context
|
* We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context
|
||||||
* switch. There's no need to checkpoint our [Fiber]s as there's no external effect of waiting.
|
* switch. There's no need to checkpoint our [co.paralleluniverse.fibers.Fiber]s as there's no external effect of waiting.
|
||||||
*/
|
*/
|
||||||
private fun <T : Any> makeStrandFriendlySettableFuture(future: Future<T>) = QuasarSettableFuture<Boolean>().also { g ->
|
private fun <T : Any> makeStrandFriendlySettableFuture(future: Future<T>) = QuasarSettableFuture<Boolean>().also { g ->
|
||||||
when (future) {
|
when (future) {
|
||||||
@ -140,6 +138,9 @@ class NodeSchedulerService(private val clock: CordaClock,
|
|||||||
else -> throw IllegalArgumentException("Cannot make future $future Strand friendly.")
|
else -> throw IllegalArgumentException("Cannot make future $future Strand friendly.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
internal val schedulingAsNextFormat = "Scheduling as next {}"
|
||||||
}
|
}
|
||||||
|
|
||||||
@Entity
|
@Entity
|
||||||
@ -152,20 +153,17 @@ class NodeSchedulerService(private val clock: CordaClock,
|
|||||||
var scheduledAt: Instant = Instant.now()
|
var scheduledAt: Instant = Instant.now()
|
||||||
)
|
)
|
||||||
|
|
||||||
private class InnerState {
|
private class InnerState(var scheduledStates: MutableMap<StateRef, ScheduledStateRef>) {
|
||||||
var scheduledStates = createMap()
|
|
||||||
|
|
||||||
var scheduledStatesQueue: PriorityQueue<ScheduledStateRef> = PriorityQueue({ a, b -> a.scheduledAt.compareTo(b.scheduledAt) })
|
var scheduledStatesQueue: PriorityQueue<ScheduledStateRef> = PriorityQueue({ a, b -> a.scheduledAt.compareTo(b.scheduledAt) })
|
||||||
|
|
||||||
var rescheduled: GuavaSettableFuture<Boolean>? = null
|
var rescheduled: GuavaSettableFuture<Boolean>? = null
|
||||||
}
|
}
|
||||||
|
|
||||||
private val mutex = ThreadBox(InnerState())
|
private val mutex = ThreadBox(InnerState(scheduledStates))
|
||||||
|
|
||||||
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
|
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
|
||||||
fun start() {
|
fun start() {
|
||||||
mutex.locked {
|
mutex.locked {
|
||||||
scheduledStatesQueue.addAll(scheduledStates.all().map { it.second }.toMutableList())
|
scheduledStatesQueue.addAll(scheduledStates.values)
|
||||||
rescheduleWakeUp()
|
rescheduleWakeUp()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -206,6 +204,7 @@ class NodeSchedulerService(private val clock: CordaClock,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
|
||||||
/**
|
/**
|
||||||
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the
|
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the
|
||||||
* [awaitWithDeadline] used below drops through without running the action. We then create a new
|
* [awaitWithDeadline] used below drops through without running the action. We then create a new
|
||||||
@ -223,7 +222,7 @@ class NodeSchedulerService(private val clock: CordaClock,
|
|||||||
}
|
}
|
||||||
if (scheduledState != null) {
|
if (scheduledState != null) {
|
||||||
schedulerTimerExecutor.execute {
|
schedulerTimerExecutor.execute {
|
||||||
log.trace { "Scheduling as next $scheduledState" }
|
log.trace(schedulingAsNextFormat, scheduledState)
|
||||||
// This will block the scheduler single thread until the scheduled time (returns false) OR
|
// This will block the scheduler single thread until the scheduled time (returns false) OR
|
||||||
// the Future is cancelled due to rescheduling (returns true).
|
// the Future is cancelled due to rescheduling (returns true).
|
||||||
if (!awaitWithDeadline(clock, scheduledState.scheduledAt, ourRescheduledFuture)) {
|
if (!awaitWithDeadline(clock, scheduledState.scheduledAt, ourRescheduledFuture)) {
|
||||||
@ -236,6 +235,11 @@ class NodeSchedulerService(private val clock: CordaClock,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
internal fun join() {
|
||||||
|
schedulerTimerExecutor.join()
|
||||||
|
}
|
||||||
|
|
||||||
private fun onTimeReached(scheduledState: ScheduledStateRef) {
|
private fun onTimeReached(scheduledState: ScheduledStateRef) {
|
||||||
serverThread.execute {
|
serverThread.execute {
|
||||||
var flowName: String? = "(unknown)"
|
var flowName: String? = "(unknown)"
|
||||||
|
@ -1,11 +1,9 @@
|
|||||||
package net.corda.node.utilities
|
package net.corda.node.utilities
|
||||||
|
|
||||||
import com.google.common.util.concurrent.SettableFuture
|
import com.google.common.util.concurrent.SettableFuture
|
||||||
import com.google.common.util.concurrent.Uninterruptibles
|
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.CompletableFuture
|
import java.util.concurrent.CompletableFuture
|
||||||
import java.util.concurrent.Executor
|
import java.util.concurrent.Executor
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor
|
import java.util.concurrent.ScheduledThreadPoolExecutor
|
||||||
import java.util.function.Supplier
|
import java.util.function.Supplier
|
||||||
|
|
||||||
@ -83,31 +81,4 @@ interface AffinityExecutor : Executor {
|
|||||||
} while (!f.get())
|
} while (!f.get())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* An executor useful for unit tests: allows the current thread to block until a command arrives from another
|
|
||||||
* thread, which is then executed. Inbound closures/commands stack up until they are cleared by looping.
|
|
||||||
*
|
|
||||||
* @param alwaysQueue If true, executeASAP will never short-circuit and will always queue up.
|
|
||||||
*/
|
|
||||||
class Gate(private val alwaysQueue: Boolean = false) : AffinityExecutor {
|
|
||||||
private val thisThread = Thread.currentThread()
|
|
||||||
private val commandQ = LinkedBlockingQueue<Runnable>()
|
|
||||||
|
|
||||||
override val isOnThread: Boolean
|
|
||||||
get() = !alwaysQueue && Thread.currentThread() === thisThread
|
|
||||||
|
|
||||||
override fun execute(command: Runnable) {
|
|
||||||
Uninterruptibles.putUninterruptibly(commandQ, command)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun waitAndRun() {
|
|
||||||
val runnable = Uninterruptibles.takeUninterruptibly(commandQ)
|
|
||||||
runnable.run()
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun flush() {
|
|
||||||
throw UnsupportedOperationException()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,323 +1,176 @@
|
|||||||
package net.corda.node.services.events
|
package net.corda.node.services.events
|
||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
import com.google.common.util.concurrent.MoreExecutors
|
||||||
import com.codahale.metrics.MetricRegistry
|
|
||||||
import com.nhaarman.mockito_kotlin.*
|
import com.nhaarman.mockito_kotlin.*
|
||||||
import net.corda.core.contracts.*
|
import net.corda.core.contracts.*
|
||||||
import net.corda.core.crypto.generateKeyPair
|
|
||||||
import net.corda.core.flows.FlowLogic
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.flows.FlowLogicRef
|
import net.corda.core.flows.FlowLogicRef
|
||||||
import net.corda.core.flows.FlowLogicRefFactory
|
import net.corda.core.flows.FlowLogicRefFactory
|
||||||
import net.corda.core.identity.AbstractParty
|
|
||||||
import net.corda.core.identity.CordaX500Name
|
|
||||||
import net.corda.core.identity.Party
|
|
||||||
import net.corda.core.internal.concurrent.doneFuture
|
|
||||||
import net.corda.core.node.NodeInfo
|
|
||||||
import net.corda.core.node.ServiceHub
|
|
||||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
|
||||||
import net.corda.core.transactions.TransactionBuilder
|
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
|
||||||
import net.corda.core.utilities.days
|
import net.corda.core.utilities.days
|
||||||
import net.corda.node.internal.FlowStarterImpl
|
|
||||||
import net.corda.node.internal.cordapp.CordappLoader
|
|
||||||
import net.corda.node.internal.cordapp.CordappProviderImpl
|
|
||||||
import net.corda.node.services.api.MonitoringService
|
|
||||||
import net.corda.node.services.api.ServiceHubInternal
|
|
||||||
import net.corda.node.services.persistence.DBCheckpointStorage
|
|
||||||
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
|
|
||||||
import net.corda.node.services.statemachine.StateMachineManager
|
|
||||||
import net.corda.node.services.statemachine.StateMachineManagerImpl
|
|
||||||
import net.corda.node.services.vault.NodeVaultService
|
|
||||||
import net.corda.node.utilities.AffinityExecutor
|
|
||||||
import net.corda.node.internal.configureDatabase
|
|
||||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
|
||||||
import net.corda.testing.*
|
|
||||||
import net.corda.testing.contracts.DummyContract
|
|
||||||
import net.corda.testing.internal.rigorousMock
|
import net.corda.testing.internal.rigorousMock
|
||||||
import net.corda.testing.node.*
|
import net.corda.core.internal.FlowStateMachine
|
||||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
import net.corda.testing.services.MockAttachmentStorage
|
import net.corda.core.internal.uncheckedCast
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import net.corda.core.node.StateLoader
|
||||||
import org.junit.After
|
import net.corda.node.services.api.FlowStarter
|
||||||
import org.junit.Before
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
|
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||||
|
import net.corda.testing.internal.doLookup
|
||||||
|
import net.corda.testing.node.TestClock
|
||||||
import org.junit.Rule
|
import org.junit.Rule
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
|
import org.junit.rules.TestWatcher
|
||||||
|
import org.junit.runner.Description
|
||||||
|
import org.slf4j.Logger
|
||||||
import java.time.Clock
|
import java.time.Clock
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.concurrent.CountDownLatch
|
|
||||||
import java.util.concurrent.Executors
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
import kotlin.test.assertTrue
|
|
||||||
|
|
||||||
class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
class NodeSchedulerServiceTest {
|
||||||
private companion object {
|
private val mark = Instant.now()
|
||||||
val ALICE_KEY = TestIdentity(ALICE_NAME, 70).keyPair
|
private val testClock = TestClock(rigorousMock<Clock>().also {
|
||||||
val DUMMY_IDENTITY_1 = getTestPartyAndCertificate(Party(CordaX500Name("Dummy", "Madrid", "ES"), generateKeyPair().public))
|
doReturn(mark).whenever(it).instant()
|
||||||
val DUMMY_NOTARY = TestIdentity(DUMMY_NOTARY_NAME, 20).party
|
})
|
||||||
val myInfo = NodeInfo(listOf(NetworkHostAndPort("mockHost", 30000)), listOf(DUMMY_IDENTITY_1), 1, serial = 1L)
|
private val database = rigorousMock<CordaPersistence>().also {
|
||||||
|
doAnswer {
|
||||||
|
val block: DatabaseTransaction.() -> Any? = uncheckedCast(it.arguments[0])
|
||||||
|
rigorousMock<DatabaseTransaction>().block()
|
||||||
|
}.whenever(it).transaction(any())
|
||||||
}
|
}
|
||||||
|
private val flowStarter = rigorousMock<FlowStarter>().also {
|
||||||
|
doReturn(openFuture<FlowStateMachine<*>>()).whenever(it).startFlow(any<FlowLogic<*>>(), any())
|
||||||
|
}
|
||||||
|
private val transactionStates = mutableMapOf<StateRef, TransactionState<*>>()
|
||||||
|
private val stateLoader = rigorousMock<StateLoader>().also {
|
||||||
|
doLookup(transactionStates).whenever(it).loadState(any())
|
||||||
|
}
|
||||||
|
private val flows = mutableMapOf<FlowLogicRef, FlowLogic<*>>()
|
||||||
|
private val flowLogicRefFactory = rigorousMock<FlowLogicRefFactory>().also {
|
||||||
|
doLookup(flows).whenever(it).toFlowLogic(any())
|
||||||
|
}
|
||||||
|
private val log = rigorousMock<Logger>().also {
|
||||||
|
doReturn(false).whenever(it).isTraceEnabled
|
||||||
|
doNothing().whenever(it).trace(any(), any<Any>())
|
||||||
|
}
|
||||||
|
private val scheduler = NodeSchedulerService(
|
||||||
|
testClock,
|
||||||
|
database,
|
||||||
|
flowStarter,
|
||||||
|
stateLoader,
|
||||||
|
serverThread = MoreExecutors.directExecutor(),
|
||||||
|
flowLogicRefFactory = flowLogicRefFactory,
|
||||||
|
log = log,
|
||||||
|
scheduledStates = mutableMapOf()).apply { start() }
|
||||||
@Rule
|
@Rule
|
||||||
@JvmField
|
@JvmField
|
||||||
val testSerialization = SerializationEnvironmentRule(true)
|
val tearDown = object : TestWatcher() {
|
||||||
private val flowLogicRefFactory = FlowLogicRefFactoryImpl(FlowLogicRefFactoryImpl::class.java.classLoader)
|
override fun succeeded(description: Description) {
|
||||||
private val realClock: Clock = Clock.systemUTC()
|
scheduler.join()
|
||||||
private val stoppedClock: Clock = Clock.fixed(realClock.instant(), realClock.zone)
|
verifyNoMoreInteractions(flowStarter)
|
||||||
private val testClock = TestClock(stoppedClock)
|
|
||||||
|
|
||||||
private val schedulerGatedExecutor = AffinityExecutor.Gate(true)
|
|
||||||
|
|
||||||
abstract class Services : ServiceHubInternal, TestReference
|
|
||||||
|
|
||||||
private lateinit var services: Services
|
|
||||||
private lateinit var scheduler: NodeSchedulerService
|
|
||||||
private lateinit var smmExecutor: AffinityExecutor.ServiceAffinityExecutor
|
|
||||||
private lateinit var database: CordaPersistence
|
|
||||||
private lateinit var countDown: CountDownLatch
|
|
||||||
private lateinit var smmHasRemovedAllFlows: CountDownLatch
|
|
||||||
private lateinit var kms: MockKeyManagementService
|
|
||||||
private lateinit var mockSMM: StateMachineManager
|
|
||||||
var calls: Int = 0
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Have a reference to this test added to [ServiceHub] so that when the [FlowLogic] runs it can access the test instance.
|
|
||||||
* The [TestState] is serialized and deserialized so attempting to use a transient field won't work, as it just
|
|
||||||
* results in NPE.
|
|
||||||
*/
|
|
||||||
interface TestReference {
|
|
||||||
val testReference: NodeSchedulerServiceTest
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
fun setup() {
|
|
||||||
countDown = CountDownLatch(1)
|
|
||||||
smmHasRemovedAllFlows = CountDownLatch(1)
|
|
||||||
calls = 0
|
|
||||||
val dataSourceProps = makeTestDataSourceProperties()
|
|
||||||
database = configureDatabase(dataSourceProps, DatabaseConfig(), rigorousMock())
|
|
||||||
val identityService = makeTestIdentityService()
|
|
||||||
kms = MockKeyManagementService(identityService, ALICE_KEY)
|
|
||||||
val configuration = rigorousMock<NodeConfiguration>().also {
|
|
||||||
doReturn(true).whenever(it).devMode
|
|
||||||
doReturn(null).whenever(it).devModeOptions
|
|
||||||
}
|
|
||||||
val validatedTransactions = MockTransactionStorage()
|
|
||||||
database.transaction {
|
|
||||||
services = rigorousMock<Services>().also {
|
|
||||||
doReturn(configuration).whenever(it).configuration
|
|
||||||
doReturn(MonitoringService(MetricRegistry())).whenever(it).monitoringService
|
|
||||||
doReturn(validatedTransactions).whenever(it).validatedTransactions
|
|
||||||
doReturn(rigorousMock<NetworkMapCacheInternal>().also {
|
|
||||||
doReturn(doneFuture(null)).whenever(it).nodeReady
|
|
||||||
}).whenever(it).networkMapCache
|
|
||||||
doReturn(myInfo).whenever(it).myInfo
|
|
||||||
doReturn(kms).whenever(it).keyManagementService
|
|
||||||
doReturn(CordappProviderImpl(CordappLoader.createWithTestPackages(listOf("net.corda.testing.contracts")), MockAttachmentStorage())).whenever(it).cordappProvider
|
|
||||||
doReturn(NodeVaultService(testClock, kms, validatedTransactions, database.hibernateConfig)).whenever(it).vaultService
|
|
||||||
doReturn(this@NodeSchedulerServiceTest).whenever(it).testReference
|
|
||||||
|
|
||||||
}
|
|
||||||
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
|
|
||||||
mockSMM = StateMachineManagerImpl(services, DBCheckpointStorage(), smmExecutor, database)
|
|
||||||
scheduler = NodeSchedulerService(testClock, database, FlowStarterImpl(smmExecutor, mockSMM, flowLogicRefFactory), validatedTransactions, schedulerGatedExecutor, serverThread = smmExecutor, flowLogicRefFactory = flowLogicRefFactory)
|
|
||||||
mockSMM.changes.subscribe { change ->
|
|
||||||
if (change is StateMachineManager.Change.Removed && mockSMM.allStateMachines.isEmpty()) {
|
|
||||||
smmHasRemovedAllFlows.countDown()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
mockSMM.start(emptyList())
|
|
||||||
scheduler.start()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private var allowedUnsuspendedFiberCount = 0
|
private class Event(time: Instant) {
|
||||||
@After
|
val stateRef = rigorousMock<StateRef>()
|
||||||
fun tearDown() {
|
val flowLogic = rigorousMock<FlowLogic<*>>()
|
||||||
// We need to make sure the StateMachineManager is done before shutting down executors.
|
val ssr = ScheduledStateRef(stateRef, time)
|
||||||
if (mockSMM.allStateMachines.isNotEmpty()) {
|
|
||||||
smmHasRemovedAllFlows.await()
|
|
||||||
}
|
|
||||||
smmExecutor.shutdown()
|
|
||||||
smmExecutor.awaitTermination(60, TimeUnit.SECONDS)
|
|
||||||
database.close()
|
|
||||||
mockSMM.stop(allowedUnsuspendedFiberCount)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ignore IntelliJ when it says these properties can be private, if they are we cannot serialise them
|
private fun schedule(time: Instant) = Event(time).apply {
|
||||||
// in AMQP.
|
val logicRef = rigorousMock<FlowLogicRef>()
|
||||||
@Suppress("MemberVisibilityCanPrivate")
|
transactionStates[stateRef] = rigorousMock<TransactionState<SchedulableState>>().also {
|
||||||
class TestState(val flowLogicRef: FlowLogicRef, val instant: Instant, val myIdentity: Party) : LinearState, SchedulableState {
|
doReturn(rigorousMock<SchedulableState>().also {
|
||||||
override val participants: List<AbstractParty>
|
doReturn(ScheduledActivity(logicRef, time)).whenever(it).nextScheduledActivity(same(stateRef)!!, any())
|
||||||
get() = listOf(myIdentity)
|
}).whenever(it).data
|
||||||
|
|
||||||
override val linearId = UniqueIdentifier()
|
|
||||||
|
|
||||||
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
|
|
||||||
return ScheduledActivity(flowLogicRef, instant)
|
|
||||||
}
|
}
|
||||||
|
flows[logicRef] = flowLogic
|
||||||
|
scheduler.scheduleStateActivity(ssr)
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestFlowLogic(private val increment: Int = 1) : FlowLogic<Unit>() {
|
private fun assertWaitingFor(event: Event, total: Int = 1) {
|
||||||
@Suspendable
|
// The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace:
|
||||||
override fun call() {
|
verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, event.ssr)
|
||||||
(serviceHub as TestReference).testReference.calls += increment
|
|
||||||
(serviceHub as TestReference).testReference.countDown.countDown()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class Command : TypeOnlyCommandData()
|
private fun assertStarted(event: Event) {
|
||||||
|
// Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow:
|
||||||
|
verify(flowStarter, timeout(5000)).startFlow(same(event.flowLogic)!!, any())
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test activity due now`() {
|
fun `test activity due now`() {
|
||||||
val time = stoppedClock.instant()
|
assertStarted(schedule(mark))
|
||||||
scheduleTX(time)
|
|
||||||
|
|
||||||
assertThat(calls).isEqualTo(0)
|
|
||||||
schedulerGatedExecutor.waitAndRun()
|
|
||||||
countDown.await()
|
|
||||||
assertThat(calls).isEqualTo(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test activity due in the past`() {
|
fun `test activity due in the past`() {
|
||||||
val time = stoppedClock.instant() - 1.days
|
assertStarted(schedule(mark - 1.days))
|
||||||
scheduleTX(time)
|
|
||||||
|
|
||||||
assertThat(calls).isEqualTo(0)
|
|
||||||
schedulerGatedExecutor.waitAndRun()
|
|
||||||
countDown.await()
|
|
||||||
assertThat(calls).isEqualTo(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test activity due in the future`() {
|
fun `test activity due in the future`() {
|
||||||
val time = stoppedClock.instant() + 1.days
|
val event = schedule(mark + 1.days)
|
||||||
scheduleTX(time)
|
assertWaitingFor(event)
|
||||||
|
|
||||||
val backgroundExecutor = Executors.newSingleThreadExecutor()
|
|
||||||
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
|
|
||||||
assertThat(calls).isEqualTo(0)
|
|
||||||
testClock.advanceBy(1.days)
|
testClock.advanceBy(1.days)
|
||||||
backgroundExecutor.shutdown()
|
assertStarted(event)
|
||||||
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
|
|
||||||
countDown.await()
|
|
||||||
assertThat(calls).isEqualTo(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test activity due in the future and schedule another earlier`() {
|
fun `test activity due in the future and schedule another earlier`() {
|
||||||
val time = stoppedClock.instant() + 1.days
|
val event2 = schedule(mark + 2.days)
|
||||||
scheduleTX(time + 1.days)
|
val event1 = schedule(mark + 1.days)
|
||||||
|
assertWaitingFor(event1)
|
||||||
val backgroundExecutor = Executors.newSingleThreadExecutor()
|
|
||||||
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
|
|
||||||
assertThat(calls).isEqualTo(0)
|
|
||||||
scheduleTX(time, 3)
|
|
||||||
|
|
||||||
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
|
|
||||||
testClock.advanceBy(1.days)
|
testClock.advanceBy(1.days)
|
||||||
countDown.await()
|
assertStarted(event1)
|
||||||
assertThat(calls).isEqualTo(3)
|
assertWaitingFor(event2, 2)
|
||||||
backgroundExecutor.shutdown()
|
testClock.advanceBy(1.days)
|
||||||
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
|
assertStarted(event2)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test activity due in the future and schedule another later`() {
|
fun `test activity due in the future and schedule another later`() {
|
||||||
allowedUnsuspendedFiberCount = 1
|
val event1 = schedule(mark + 1.days)
|
||||||
val time = stoppedClock.instant() + 1.days
|
val event2 = schedule(mark + 2.days)
|
||||||
scheduleTX(time)
|
assertWaitingFor(event1)
|
||||||
|
|
||||||
val backgroundExecutor = Executors.newSingleThreadExecutor()
|
|
||||||
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
|
|
||||||
assertThat(calls).isEqualTo(0)
|
|
||||||
scheduleTX(time + 1.days, 3)
|
|
||||||
|
|
||||||
testClock.advanceBy(1.days)
|
testClock.advanceBy(1.days)
|
||||||
countDown.await()
|
assertStarted(event1)
|
||||||
assertThat(calls).isEqualTo(1)
|
assertWaitingFor(event2)
|
||||||
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
|
|
||||||
testClock.advanceBy(1.days)
|
testClock.advanceBy(1.days)
|
||||||
backgroundExecutor.shutdown()
|
assertStarted(event2)
|
||||||
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test activity due in the future and schedule another for same time`() {
|
fun `test activity due in the future and schedule another for same time`() {
|
||||||
val time = stoppedClock.instant() + 1.days
|
val eventA = schedule(mark + 1.days)
|
||||||
scheduleTX(time)
|
val eventB = schedule(mark + 1.days)
|
||||||
|
assertWaitingFor(eventA)
|
||||||
val backgroundExecutor = Executors.newSingleThreadExecutor()
|
|
||||||
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
|
|
||||||
assertThat(calls).isEqualTo(0)
|
|
||||||
scheduleTX(time, 3)
|
|
||||||
|
|
||||||
testClock.advanceBy(1.days)
|
testClock.advanceBy(1.days)
|
||||||
countDown.await()
|
assertStarted(eventA)
|
||||||
assertThat(calls).isEqualTo(1)
|
assertStarted(eventB)
|
||||||
backgroundExecutor.shutdown()
|
}
|
||||||
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
|
|
||||||
|
@Test
|
||||||
|
fun `test activity due in the future and schedule another for same time then unschedule second`() {
|
||||||
|
val eventA = schedule(mark + 1.days)
|
||||||
|
val eventB = schedule(mark + 1.days)
|
||||||
|
scheduler.unscheduleStateActivity(eventB.stateRef)
|
||||||
|
assertWaitingFor(eventA)
|
||||||
|
testClock.advanceBy(1.days)
|
||||||
|
assertStarted(eventA)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test activity due in the future and schedule another for same time then unschedule original`() {
|
fun `test activity due in the future and schedule another for same time then unschedule original`() {
|
||||||
val time = stoppedClock.instant() + 1.days
|
val eventA = schedule(mark + 1.days)
|
||||||
val scheduledRef1 = scheduleTX(time)
|
val eventB = schedule(mark + 1.days)
|
||||||
|
scheduler.unscheduleStateActivity(eventA.stateRef)
|
||||||
val backgroundExecutor = Executors.newSingleThreadExecutor()
|
assertWaitingFor(eventA) // XXX: Shouldn't it be waiting for eventB now?
|
||||||
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
|
|
||||||
assertThat(calls).isEqualTo(0)
|
|
||||||
scheduleTX(time, 3)
|
|
||||||
|
|
||||||
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
|
|
||||||
database.transaction {
|
|
||||||
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
|
|
||||||
}
|
|
||||||
testClock.advanceBy(1.days)
|
testClock.advanceBy(1.days)
|
||||||
countDown.await()
|
assertStarted(eventB)
|
||||||
assertThat(calls).isEqualTo(3)
|
|
||||||
backgroundExecutor.shutdown()
|
|
||||||
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test activity due in the future then unschedule`() {
|
fun `test activity due in the future then unschedule`() {
|
||||||
val scheduledRef1 = scheduleTX(stoppedClock.instant() + 1.days)
|
scheduler.unscheduleStateActivity(schedule(mark + 1.days).stateRef)
|
||||||
|
|
||||||
val backgroundExecutor = Executors.newSingleThreadExecutor()
|
|
||||||
backgroundExecutor.execute { schedulerGatedExecutor.waitAndRun() }
|
|
||||||
assertThat(calls).isEqualTo(0)
|
|
||||||
|
|
||||||
database.transaction {
|
|
||||||
scheduler.unscheduleStateActivity(scheduledRef1!!.ref)
|
|
||||||
}
|
|
||||||
testClock.advanceBy(1.days)
|
testClock.advanceBy(1.days)
|
||||||
assertThat(calls).isEqualTo(0)
|
|
||||||
backgroundExecutor.shutdown()
|
|
||||||
assertTrue(backgroundExecutor.awaitTermination(60, TimeUnit.SECONDS))
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun scheduleTX(instant: Instant, increment: Int = 1): ScheduledStateRef? {
|
|
||||||
var scheduledRef: ScheduledStateRef? = null
|
|
||||||
database.transaction {
|
|
||||||
apply {
|
|
||||||
val freshKey = kms.freshKey()
|
|
||||||
val state = TestState(flowLogicRefFactory.createForRPC(TestFlowLogic::class.java, increment), instant, DUMMY_IDENTITY_1.party)
|
|
||||||
val builder = TransactionBuilder(null).apply {
|
|
||||||
addOutputState(state, DummyContract.PROGRAM_ID, DUMMY_NOTARY)
|
|
||||||
addCommand(Command(), freshKey)
|
|
||||||
}
|
|
||||||
val usefulTX = services.signInitialTransaction(builder, freshKey)
|
|
||||||
val txHash = usefulTX.id
|
|
||||||
|
|
||||||
services.recordTransactions(usefulTX)
|
|
||||||
scheduledRef = ScheduledStateRef(StateRef(txHash, 0), state.instant)
|
|
||||||
scheduler.scheduleStateActivity(scheduledRef!!)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return scheduledRef
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package net.corda.testing.internal
|
package net.corda.testing.internal
|
||||||
|
|
||||||
|
import com.nhaarman.mockito_kotlin.doAnswer
|
||||||
import net.corda.core.crypto.Crypto
|
import net.corda.core.crypto.Crypto
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.utilities.loggerFor
|
import net.corda.core.utilities.loggerFor
|
||||||
@ -108,3 +109,6 @@ fun createDevNodeCaCertPath(
|
|||||||
val nodeCa = createDevNodeCa(intermediateCa, legalName)
|
val nodeCa = createDevNodeCa(intermediateCa, legalName)
|
||||||
return Triple(rootCa, intermediateCa, nodeCa)
|
return Triple(rootCa, intermediateCa, nodeCa)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Application of [doAnswer] that gets a value from the given [map] using the arg at [argIndex] as key. */
|
||||||
|
fun doLookup(map: Map<*, *>, argIndex: Int = 0) = doAnswer { map[it.arguments[argIndex]] }
|
||||||
|
Reference in New Issue
Block a user