mirror of
https://github.com/corda/corda.git
synced 2025-06-17 22:58:19 +00:00
Tweak scheduled flow tests to improve error diagnosis
* Add logging of the specific differences rather than dumping a long list of states to stdout * Verify all flows complete successfully
This commit is contained in:
@ -1,6 +1,7 @@
|
|||||||
package net.corda.node.services.events
|
package net.corda.node.services.events
|
||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import net.corda.core.concurrent.CordaFuture
|
||||||
import net.corda.core.contracts.*
|
import net.corda.core.contracts.*
|
||||||
import net.corda.core.crypto.containsAny
|
import net.corda.core.crypto.containsAny
|
||||||
import net.corda.core.flows.*
|
import net.corda.core.flows.*
|
||||||
@ -15,10 +16,11 @@ import net.corda.core.node.services.vault.QueryCriteria.VaultQueryCriteria
|
|||||||
import net.corda.core.node.services.vault.Sort
|
import net.corda.core.node.services.vault.Sort
|
||||||
import net.corda.core.node.services.vault.SortAttribute
|
import net.corda.core.node.services.vault.SortAttribute
|
||||||
import net.corda.core.transactions.TransactionBuilder
|
import net.corda.core.transactions.TransactionBuilder
|
||||||
import net.corda.testing.DUMMY_NOTARY
|
import net.corda.core.utilities.getOrThrow
|
||||||
import net.corda.node.services.network.NetworkMapService
|
import net.corda.node.services.network.NetworkMapService
|
||||||
import net.corda.node.services.statemachine.StateMachineManager
|
import net.corda.node.services.statemachine.StateMachineManager
|
||||||
import net.corda.node.services.transactions.ValidatingNotaryService
|
import net.corda.node.services.transactions.ValidatingNotaryService
|
||||||
|
import net.corda.testing.DUMMY_NOTARY
|
||||||
import net.corda.testing.contracts.DummyContract
|
import net.corda.testing.contracts.DummyContract
|
||||||
import net.corda.testing.node.MockNetwork
|
import net.corda.testing.node.MockNetwork
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
@ -30,6 +32,10 @@ import java.time.Instant
|
|||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
class ScheduledFlowTests {
|
class ScheduledFlowTests {
|
||||||
|
companion object {
|
||||||
|
val PAGE_SIZE = 20
|
||||||
|
val SORTING = Sort(listOf(Sort.SortColumn(SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID), Sort.Direction.DESC)))
|
||||||
|
}
|
||||||
lateinit var mockNet: MockNetwork
|
lateinit var mockNet: MockNetwork
|
||||||
lateinit var notaryNode: MockNetwork.MockNode
|
lateinit var notaryNode: MockNetwork.MockNode
|
||||||
lateinit var nodeA: MockNetwork.MockNode
|
lateinit var nodeA: MockNetwork.MockNode
|
||||||
@ -133,33 +139,50 @@ class ScheduledFlowTests {
|
|||||||
@Test
|
@Test
|
||||||
fun `run a whole batch of scheduled flows`() {
|
fun `run a whole batch of scheduled flows`() {
|
||||||
val N = 100
|
val N = 100
|
||||||
|
val futures = mutableListOf<CordaFuture<*>>()
|
||||||
for (i in 0..N - 1) {
|
for (i in 0..N - 1) {
|
||||||
nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity))
|
futures.add(nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity)).resultFuture)
|
||||||
nodeB.services.startFlow(InsertInitialStateFlow(nodeA.info.legalIdentity))
|
futures.add(nodeB.services.startFlow(InsertInitialStateFlow(nodeA.info.legalIdentity)).resultFuture)
|
||||||
}
|
}
|
||||||
mockNet.waitQuiescent()
|
mockNet.waitQuiescent()
|
||||||
|
|
||||||
val statesFromA = nodeA.database.transaction {
|
// Check all of the flows completed successfully
|
||||||
|
futures.forEach { it.getOrThrow() }
|
||||||
|
|
||||||
|
// Convert the states into maps to make error reporting easier
|
||||||
|
val statesFromA: List<StateAndRef<ScheduledState>> = nodeA.database.transaction {
|
||||||
queryStatesWithPaging(nodeA.services.vaultQueryService)
|
queryStatesWithPaging(nodeA.services.vaultQueryService)
|
||||||
}
|
}
|
||||||
val statesFromB = nodeB.database.transaction {
|
val statesFromB: List<StateAndRef<ScheduledState>> = nodeB.database.transaction {
|
||||||
queryStatesWithPaging(nodeB.services.vaultQueryService)
|
queryStatesWithPaging(nodeB.services.vaultQueryService)
|
||||||
}
|
}
|
||||||
assertEquals(2 * N, statesFromA.count(), "Expect all states to be present")
|
assertEquals(2 * N, statesFromA.count(), "Expect all states to be present")
|
||||||
|
statesFromA.forEach { ref ->
|
||||||
|
if (ref !in statesFromB) {
|
||||||
|
throw IllegalStateException("State $ref is only present on node A.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
statesFromB.forEach { ref ->
|
||||||
|
if (ref !in statesFromA) {
|
||||||
|
throw IllegalStateException("State $ref is only present on node B.")
|
||||||
|
}
|
||||||
|
}
|
||||||
assertEquals(statesFromA, statesFromB, "Expect identical data on both nodes")
|
assertEquals(statesFromA, statesFromB, "Expect identical data on both nodes")
|
||||||
assertTrue("Expect all states have run the scheduled task", statesFromB.all { it.state.data.processed })
|
assertTrue("Expect all states have run the scheduled task", statesFromB.all { it.state.data.processed })
|
||||||
}
|
}
|
||||||
|
|
||||||
// Demonstrate Vault Query paging and sorting
|
/**
|
||||||
val PAGE_SIZE = 20
|
* Query all states from the Vault, fetching results as a series of pages with ordered states in order to perform
|
||||||
val sorting = Sort(listOf(Sort.SortColumn(SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID), Sort.Direction.DESC)))
|
* integration testing of that functionality.
|
||||||
|
*
|
||||||
|
* @return states ordered by the transaction ID.
|
||||||
|
*/
|
||||||
private fun queryStatesWithPaging(vaultQueryService: VaultQueryService): List<StateAndRef<ScheduledState>> {
|
private fun queryStatesWithPaging(vaultQueryService: VaultQueryService): List<StateAndRef<ScheduledState>> {
|
||||||
var pageNumber = DEFAULT_PAGE_NUM
|
var pageNumber = DEFAULT_PAGE_NUM
|
||||||
val states = mutableListOf<StateAndRef<ScheduledState>>()
|
val states = mutableListOf<StateAndRef<ScheduledState>>()
|
||||||
do {
|
do {
|
||||||
val pageSpec = PageSpecification(pageSize = PAGE_SIZE, pageNumber = pageNumber)
|
val pageSpec = PageSpecification(pageSize = PAGE_SIZE, pageNumber = pageNumber)
|
||||||
val results = vaultQueryService.queryBy<ScheduledState>(VaultQueryCriteria(), pageSpec, sorting)
|
val results = vaultQueryService.queryBy<ScheduledState>(VaultQueryCriteria(), pageSpec, SORTING)
|
||||||
states.addAll(results.states)
|
states.addAll(results.states)
|
||||||
pageNumber++
|
pageNumber++
|
||||||
} while ((pageSpec.pageSize * (pageNumber)) <= results.totalStatesAvailable)
|
} while ((pageSpec.pageSize * (pageNumber)) <= results.totalStatesAvailable)
|
||||||
|
Reference in New Issue
Block a user