mirror of
https://github.com/corda/corda.git
synced 2025-01-30 16:14:39 +00:00
CORDA-1002 Add node scheduler persistence tests (#2860)
* Add node scheduler persistence tests * Kill the scheduler threads after node scheduler tests * Address review comments
This commit is contained in:
parent
47c7be62c0
commit
329fa94a09
@ -247,6 +247,12 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
schedulerTimerExecutor.join()
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
internal fun cancelAndWait() {
|
||||
schedulerTimerExecutor.shutdownNow()
|
||||
schedulerTimerExecutor.join()
|
||||
}
|
||||
|
||||
private fun onTimeReached(scheduledState: ScheduledStateRef) {
|
||||
serverThread.execute {
|
||||
var flowName: String? = "(unknown)"
|
||||
|
@ -3,6 +3,7 @@ package net.corda.node.services.events
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.nhaarman.mockito_kotlin.*
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowLogicRef
|
||||
import net.corda.core.flows.FlowLogicRefFactory
|
||||
@ -11,12 +12,15 @@ import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.node.ServicesForResolution
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.api.FlowStarter
|
||||
import net.corda.node.services.api.NodePropertiesStore
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||
import net.corda.testing.internal.doLookup
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.TestClock
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
@ -27,38 +31,65 @@ import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
|
||||
class NodeSchedulerServiceTest {
|
||||
private val mark = Instant.now()
|
||||
private val testClock = TestClock(rigorousMock<Clock>().also {
|
||||
open class NodeSchedulerServiceTestBase {
|
||||
protected class Event(time: Instant) {
|
||||
val stateRef = rigorousMock<StateRef>()
|
||||
val flowLogic = rigorousMock<FlowLogic<*>>()
|
||||
val ssr = ScheduledStateRef(stateRef, time)
|
||||
}
|
||||
|
||||
protected val mark = Instant.now()!!
|
||||
protected val testClock = TestClock(rigorousMock<Clock>().also {
|
||||
doReturn(mark).whenever(it).instant()
|
||||
})
|
||||
protected val flowStarter = rigorousMock<FlowStarter>().also {
|
||||
doReturn(openFuture<FlowStateMachine<*>>()).whenever(it).startFlow(any<FlowLogic<*>>(), any())
|
||||
}
|
||||
private val flowsDraingMode = rigorousMock<NodePropertiesStore.FlowsDrainingModeOperations>().also {
|
||||
doReturn(false).whenever(it).isEnabled()
|
||||
}
|
||||
protected val nodeProperties = rigorousMock<NodePropertiesStore>().also {
|
||||
doReturn(flowsDraingMode).whenever(it).flowsDrainingMode
|
||||
}
|
||||
protected val flows = mutableMapOf<FlowLogicRef, FlowLogic<*>>()
|
||||
protected val flowLogicRefFactory = rigorousMock<FlowLogicRefFactory>().also {
|
||||
doLookup(flows).whenever(it).toFlowLogic(any())
|
||||
}
|
||||
|
||||
protected val transactionStates = mutableMapOf<StateRef, TransactionState<*>>()
|
||||
protected val servicesForResolution = rigorousMock<ServicesForResolution>().also {
|
||||
doLookup(transactionStates).whenever(it).loadState(any())
|
||||
}
|
||||
protected val log = rigorousMock<Logger>().also {
|
||||
doReturn(false).whenever(it).isTraceEnabled
|
||||
doNothing().whenever(it).trace(any(), any<Any>())
|
||||
doNothing().whenever(it).info(any())
|
||||
doNothing().whenever(it).error(any(), any<Throwable>())
|
||||
}
|
||||
|
||||
protected fun assertWaitingFor(ssr: ScheduledStateRef, total: Int = 1) {
|
||||
// The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace:
|
||||
verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, ssr)
|
||||
}
|
||||
|
||||
protected fun assertWaitingFor(event: Event, total: Int = 1) = assertWaitingFor(event.ssr, total)
|
||||
|
||||
protected fun assertStarted(flowLogic: FlowLogic<*>) {
|
||||
// Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow:
|
||||
verify(flowStarter, timeout(5000)).startFlow(same(flowLogic)!!, any())
|
||||
}
|
||||
|
||||
protected fun assertStarted(event: Event) = assertStarted(event.flowLogic)
|
||||
}
|
||||
|
||||
class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() {
|
||||
private val database = rigorousMock<CordaPersistence>().also {
|
||||
doAnswer {
|
||||
val block: DatabaseTransaction.() -> Any? = uncheckedCast(it.arguments[0])
|
||||
rigorousMock<DatabaseTransaction>().block()
|
||||
}.whenever(it).transaction(any())
|
||||
}
|
||||
private val flowStarter = rigorousMock<FlowStarter>().also {
|
||||
doReturn(openFuture<FlowStateMachine<*>>()).whenever(it).startFlow(any<FlowLogic<*>>(), any())
|
||||
}
|
||||
private val flowsDraingMode = rigorousMock<NodePropertiesStore.FlowsDrainingModeOperations>().also {
|
||||
doReturn(false).whenever(it).isEnabled()
|
||||
}
|
||||
private val nodeProperties = rigorousMock<NodePropertiesStore>().also {
|
||||
doReturn(flowsDraingMode).whenever(it).flowsDrainingMode
|
||||
}
|
||||
private val transactionStates = mutableMapOf<StateRef, TransactionState<*>>()
|
||||
private val servicesForResolution = rigorousMock<ServicesForResolution>().also {
|
||||
doLookup(transactionStates).whenever(it).loadState(any())
|
||||
}
|
||||
private val flows = mutableMapOf<FlowLogicRef, FlowLogic<*>>()
|
||||
private val flowLogicRefFactory = rigorousMock<FlowLogicRefFactory>().also {
|
||||
doLookup(flows).whenever(it).toFlowLogic(any())
|
||||
}
|
||||
private val log = rigorousMock<Logger>().also {
|
||||
doReturn(false).whenever(it).isTraceEnabled
|
||||
doNothing().whenever(it).trace(any(), any<Any>())
|
||||
}
|
||||
|
||||
private val scheduler = NodeSchedulerService(
|
||||
testClock,
|
||||
database,
|
||||
@ -79,12 +110,6 @@ class NodeSchedulerServiceTest {
|
||||
}
|
||||
}
|
||||
|
||||
private class Event(time: Instant) {
|
||||
val stateRef = rigorousMock<StateRef>()
|
||||
val flowLogic = rigorousMock<FlowLogic<*>>()
|
||||
val ssr = ScheduledStateRef(stateRef, time)
|
||||
}
|
||||
|
||||
private fun schedule(time: Instant) = Event(time).apply {
|
||||
val logicRef = rigorousMock<FlowLogicRef>()
|
||||
transactionStates[stateRef] = rigorousMock<TransactionState<SchedulableState>>().also {
|
||||
@ -96,16 +121,6 @@ class NodeSchedulerServiceTest {
|
||||
scheduler.scheduleStateActivity(ssr)
|
||||
}
|
||||
|
||||
private fun assertWaitingFor(event: Event, total: Int = 1) {
|
||||
// The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace:
|
||||
verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, event.ssr)
|
||||
}
|
||||
|
||||
private fun assertStarted(event: Event) {
|
||||
// Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow:
|
||||
verify(flowStarter, timeout(5000)).startFlow(same(event.flowLogic)!!, any())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test activity due now`() {
|
||||
assertStarted(schedule(mark))
|
||||
@ -184,3 +199,98 @@ class NodeSchedulerServiceTest {
|
||||
testClock.advanceBy(1.days)
|
||||
}
|
||||
}
|
||||
|
||||
class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
|
||||
private val databaseConfig: DatabaseConfig = DatabaseConfig()
|
||||
|
||||
fun createScheduler(db: CordaPersistence): NodeSchedulerService {
|
||||
return NodeSchedulerService(
|
||||
testClock,
|
||||
db,
|
||||
flowStarter,
|
||||
servicesForResolution,
|
||||
serverThread = MoreExecutors.directExecutor(),
|
||||
flowLogicRefFactory = flowLogicRefFactory,
|
||||
nodeProperties = nodeProperties,
|
||||
drainingModePollPeriod = Duration.ofSeconds(5),
|
||||
log = log).apply { start() }
|
||||
}
|
||||
|
||||
fun transactionStateMock(logicRef: FlowLogicRef, time: Instant): TransactionState<*> {
|
||||
return rigorousMock<TransactionState<SchedulableState>>().also {
|
||||
doReturn(rigorousMock<SchedulableState>().also {
|
||||
doReturn(ScheduledActivity(logicRef, time)).whenever(it).nextScheduledActivity(any(), any())
|
||||
}).whenever(it).data
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test that schedule is persisted`() {
|
||||
val dataSourceProps = MockServices.makeTestDataSourceProperties()
|
||||
val timeInTheFuture = mark + 1.days
|
||||
val stateRef = StateRef(SecureHash.zeroHash, 0)
|
||||
|
||||
val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock())
|
||||
val scheduler = database.transaction {
|
||||
createScheduler(database)
|
||||
}
|
||||
|
||||
val ssr1 = ScheduledStateRef(stateRef, timeInTheFuture)
|
||||
database.transaction {
|
||||
scheduler.scheduleStateActivity(ssr1)
|
||||
}
|
||||
// XXX: For some reason without the commit the db closes without writing the transactions
|
||||
database.dataSource.connection.commit()
|
||||
// Force the thread to shut down with operations waiting
|
||||
scheduler.cancelAndWait()
|
||||
database.close()
|
||||
|
||||
val flowLogic = rigorousMock<FlowLogic<*>>()
|
||||
val logicRef = rigorousMock<FlowLogicRef>()
|
||||
|
||||
transactionStates[stateRef] = transactionStateMock(logicRef, timeInTheFuture)
|
||||
flows[logicRef] = flowLogic
|
||||
|
||||
val newDatabase = configureDatabase(dataSourceProps, DatabaseConfig(), rigorousMock())
|
||||
val newScheduler = newDatabase.transaction {
|
||||
createScheduler(newDatabase)
|
||||
}
|
||||
testClock.advanceBy(1.days)
|
||||
assertStarted(flowLogic)
|
||||
|
||||
newScheduler.join()
|
||||
newDatabase.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test that if schedule is updated then the flow is invoked on the correct schedule`() {
|
||||
val dataSourceProps = MockServices.makeTestDataSourceProperties()
|
||||
val timeInTheFuture = mark + 1.days
|
||||
val stateRef = StateRef(SecureHash.allOnesHash, 0)
|
||||
|
||||
val ssr1 = ScheduledStateRef(stateRef, mark)
|
||||
val ssr2 = ScheduledStateRef(stateRef, timeInTheFuture)
|
||||
val logicRef = rigorousMock<FlowLogicRef>()
|
||||
val flowLogic = rigorousMock<FlowLogic<*>>()
|
||||
val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock())
|
||||
|
||||
val scheduler = database.transaction {
|
||||
createScheduler(database)
|
||||
}
|
||||
|
||||
transactionStates[stateRef] = transactionStateMock(logicRef, timeInTheFuture)
|
||||
flows[logicRef] = flowLogic
|
||||
|
||||
database.transaction {
|
||||
scheduler.scheduleStateActivity(ssr1)
|
||||
session.flush()
|
||||
scheduler.scheduleStateActivity(ssr2)
|
||||
}
|
||||
assertWaitingFor(ssr1)
|
||||
testClock.advanceBy(1.days)
|
||||
assertStarted(flowLogic)
|
||||
|
||||
scheduler.join()
|
||||
database.close()
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user