diff --git a/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt b/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt index a9bb8beb13..97ff1b7182 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt @@ -1,6 +1,7 @@ package net.corda.node.services.events import co.paralleluniverse.fibers.Suspendable +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.* import net.corda.core.crypto.containsAny import net.corda.core.flows.* @@ -15,10 +16,11 @@ import net.corda.core.node.services.vault.QueryCriteria.VaultQueryCriteria import net.corda.core.node.services.vault.Sort import net.corda.core.node.services.vault.SortAttribute import net.corda.core.transactions.TransactionBuilder -import net.corda.testing.DUMMY_NOTARY +import net.corda.core.utilities.getOrThrow import net.corda.node.services.network.NetworkMapService import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.transactions.ValidatingNotaryService +import net.corda.testing.DUMMY_NOTARY import net.corda.testing.contracts.DummyContract import net.corda.testing.node.MockNetwork import org.junit.After @@ -30,6 +32,10 @@ import java.time.Instant import kotlin.test.assertEquals class ScheduledFlowTests { + companion object { + val PAGE_SIZE = 20 + val SORTING = Sort(listOf(Sort.SortColumn(SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID), Sort.Direction.DESC))) + } lateinit var mockNet: MockNetwork lateinit var notaryNode: MockNetwork.MockNode lateinit var nodeA: MockNetwork.MockNode @@ -133,33 +139,50 @@ class ScheduledFlowTests { @Test fun `run a whole batch of scheduled flows`() { val N = 100 + val futures = mutableListOf>() for (i in 0..N - 1) { - nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity)) - nodeB.services.startFlow(InsertInitialStateFlow(nodeA.info.legalIdentity)) + futures.add(nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity)).resultFuture) + futures.add(nodeB.services.startFlow(InsertInitialStateFlow(nodeA.info.legalIdentity)).resultFuture) } mockNet.waitQuiescent() - val statesFromA = nodeA.database.transaction { + // Check all of the flows completed successfully + futures.forEach { it.getOrThrow() } + + // Convert the states into maps to make error reporting easier + val statesFromA: List> = nodeA.database.transaction { queryStatesWithPaging(nodeA.services.vaultQueryService) } - val statesFromB = nodeB.database.transaction { + val statesFromB: List> = nodeB.database.transaction { queryStatesWithPaging(nodeB.services.vaultQueryService) } assertEquals(2 * N, statesFromA.count(), "Expect all states to be present") + statesFromA.forEach { ref -> + if (ref !in statesFromB) { + throw IllegalStateException("State $ref is only present on node A.") + } + } + statesFromB.forEach { ref -> + if (ref !in statesFromA) { + throw IllegalStateException("State $ref is only present on node B.") + } + } assertEquals(statesFromA, statesFromB, "Expect identical data on both nodes") assertTrue("Expect all states have run the scheduled task", statesFromB.all { it.state.data.processed }) } - // Demonstrate Vault Query paging and sorting - val PAGE_SIZE = 20 - val sorting = Sort(listOf(Sort.SortColumn(SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID), Sort.Direction.DESC))) - + /** + * Query all states from the Vault, fetching results as a series of pages with ordered states in order to perform + * integration testing of that functionality. + * + * @return states ordered by the transaction ID. + */ private fun queryStatesWithPaging(vaultQueryService: VaultQueryService): List> { var pageNumber = DEFAULT_PAGE_NUM val states = mutableListOf>() do { val pageSpec = PageSpecification(pageSize = PAGE_SIZE, pageNumber = pageNumber) - val results = vaultQueryService.queryBy(VaultQueryCriteria(), pageSpec, sorting) + val results = vaultQueryService.queryBy(VaultQueryCriteria(), pageSpec, SORTING) states.addAll(results.states) pageNumber++ } while ((pageSpec.pageSize * (pageNumber)) <= results.totalStatesAvailable)