From f63e6cd2a65353e185681a33586a7b11eae68b8b Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Mon, 5 Dec 2016 17:33:28 +0000 Subject: [PATCH] Add some hooks to StateMachineManager and NodeSchedulerService so that unit tests of flows with scheduled actions can safely test for completion of their test activities. Typically this is done using a while loop whilst there are active fibers, or schedules and then blocking on the ReusuableLatches until the status changes and can be re-evaluated. Add unit tests of ScheduledFlow running on simulated network. Just use existing DumyContract in test DummyContract requires value equality so that assertEquals over states works as expected. Remove blank line. Add TODO on waitQuiescent. Fix minor build error --- .../net/corda/core/contracts/DummyContract.kt | 5 +- .../kotlin/net/corda/core/node/ServiceHub.kt | 14 +- .../docs/WorkflowTransactionBuildTutorial.kt | 4 - .../corda/contracts/JavaCommercialPaper.java | 2 +- .../net/corda/node/internal/AbstractNode.kt | 2 +- .../services/events/NodeSchedulerService.kt | 14 +- .../statemachine/StateMachineManager.kt | 5 + .../corda/node/services/ScheduledFlowTests.kt | 152 ++++++++++++++++++ .../testing/node/InMemoryMessagingNetwork.kt | 5 + .../kotlin/net/corda/testing/node/MockNode.kt | 23 +++ 10 files changed, 212 insertions(+), 14 deletions(-) create mode 100644 node/src/test/kotlin/net/corda/node/services/ScheduledFlowTests.kt diff --git a/core/src/main/kotlin/net/corda/core/contracts/DummyContract.kt b/core/src/main/kotlin/net/corda/core/contracts/DummyContract.kt index 7c6bd45846..ac27958ec9 100644 --- a/core/src/main/kotlin/net/corda/core/contracts/DummyContract.kt +++ b/core/src/main/kotlin/net/corda/core/contracts/DummyContract.kt @@ -9,7 +9,7 @@ import net.corda.core.transactions.TransactionBuilder val DUMMY_PROGRAM_ID = DummyContract() -class DummyContract : Contract { +data class DummyContract(override val legalContractReference: SecureHash = SecureHash.sha256("")) : Contract { interface State : ContractState { val magicNumber: Int @@ -44,9 +44,6 @@ class DummyContract : Contract { // Always accepts. } - // The "empty contract" - override val legalContractReference: SecureHash = SecureHash.sha256("") - companion object { @JvmStatic fun generateInitial(owner: PartyAndReference, magicNumber: Int, notary: Party): TransactionBuilder { diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index 18664fdf82..9542136c72 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -1,8 +1,6 @@ package net.corda.core.node -import net.corda.core.contracts.StateRef -import net.corda.core.contracts.TransactionResolutionException -import net.corda.core.contracts.TransactionState +import net.corda.core.contracts.* import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowStateMachine import net.corda.core.messaging.MessagingService @@ -48,6 +46,16 @@ interface ServiceHub { return definingTx.tx.outputs[stateRef.index] } + /** + * Given a [StateRef] loads the referenced transaction and returns a [StateAndRef] + * + * @throws TransactionResolutionException if the [StateRef] points to a non-existent transaction. + */ + fun toStateAndRef(ref: StateRef): StateAndRef { + val definingTx = storageService.validatedTransactions.getTransaction(ref.txhash) ?: throw TransactionResolutionException(ref.txhash) + return definingTx.tx.outRef(ref.index) + } + /** * Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow. * diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt index 9328395df8..dae7c1efe8 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt @@ -20,10 +20,6 @@ object WorkflowTransactionBuildTutorial { } // DOCSTART 1 -// Helper method to access the StorageService and expand a StateRef into a StateAndRef -fun ServiceHub.toStateAndRef(ref: StateRef): StateAndRef { - return storageService.validatedTransactions.getTransaction(ref.txhash)!!.tx.outRef(ref.index) -} // Helper method to locate the latest Vault version of a LinearState from a possibly out of date StateRef inline fun ServiceHub.latest(ref: StateRef): StateAndRef { diff --git a/finance/src/main/java/net/corda/contracts/JavaCommercialPaper.java b/finance/src/main/java/net/corda/contracts/JavaCommercialPaper.java index 7159eadcd9..6fecd8d47d 100644 --- a/finance/src/main/java/net/corda/contracts/JavaCommercialPaper.java +++ b/finance/src/main/java/net/corda/contracts/JavaCommercialPaper.java @@ -115,7 +115,7 @@ public class JavaCommercialPaper implements Contract { } public State withoutOwner() { - return new State(issuance, CryptoUtilitiesKt.getNullCompositeKey(), faceValue, maturityDate); + return new State(issuance, CryptoUtilities.getNullCompositeKey(), faceValue, maturityDate); } @NotNull diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index ad4e7c7acb..8a3afb46f0 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -176,7 +176,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo get() = _networkMapRegistrationFuture /** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */ - val pluginRegistries: List by lazy { + open val pluginRegistries: List by lazy { ServiceLoader.load(CordaPluginRegistry::class.java).toList() } diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index 151d0c23d8..08e30b9e6f 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -1,6 +1,7 @@ package net.corda.node.services.events import co.paralleluniverse.fibers.Suspendable +import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.SettableFuture import kotlinx.support.jdk8.collections.compute import net.corda.core.ThreadBox @@ -17,6 +18,7 @@ import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace import net.corda.node.services.api.ServiceHubInternal import net.corda.node.utilities.* +import org.apache.activemq.artemis.utils.ReusableLatch import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.statements.InsertStatement @@ -87,6 +89,9 @@ class NodeSchedulerService(private val database: Database, private val mutex = ThreadBox(InnerState()) + @VisibleForTesting + val unfinishedSchedules = ReusableLatch() + // We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow. fun start() { mutex.locked { @@ -98,7 +103,9 @@ class NodeSchedulerService(private val database: Database, override fun scheduleStateActivity(action: ScheduledStateRef) { log.trace { "Schedule $action" } mutex.locked { - scheduledStates[action.ref] = action + if (scheduledStates.put(action.ref, action) == null) { + unfinishedSchedules.countUp() + } if (action.scheduledAt.isBefore(earliestState?.scheduledAt ?: Instant.MAX)) { // We are earliest earliestState = action @@ -115,6 +122,9 @@ class NodeSchedulerService(private val database: Database, log.trace { "Unschedule $ref" } mutex.locked { val removedAction = scheduledStates.remove(ref) + if (removedAction != null) { + unfinishedSchedules.countDown() + } if (removedAction == earliestState && removedAction != null) { recomputeEarliest() rescheduleWakeUp() @@ -196,6 +206,7 @@ class NodeSchedulerService(private val database: Database, if (value === scheduledState) { if (scheduledActivity == null) { logger.info("Scheduled state $scheduledState has rescheduled to never.") + scheduler.unfinishedSchedules.countDown() null } else if (scheduledActivity.scheduledAt.isAfter(serviceHub.clock.instant())) { logger.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.") @@ -207,6 +218,7 @@ class NodeSchedulerService(private val database: Database, // FlowLogic will be checkpointed by the time this returns. //scheduler.services.startFlowAndForget(logic) scheduledLogic = logic + scheduler.unfinishedSchedules.countDown() null } } else { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 79811febc3..2d947cd152 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -6,6 +6,7 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer import co.paralleluniverse.strands.Strand import com.codahale.metrics.Gauge import com.esotericsoftware.kryo.Kryo +import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ListenableFuture import kotlinx.support.jdk8.collections.removeIf import net.corda.core.ThreadBox @@ -100,6 +101,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, @Volatile private var stopping = false // How many Fibers are running and not suspended. If zero and stopping is true, then we are halted. private val liveFibers = ReusableLatch() + @VisibleForTesting + val unfinishedFibers = ReusableLatch() // Monitoring support. private val metrics = serviceHub.monitoringService.metrics @@ -335,6 +338,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, mutex.locked { stateMachines.remove(psm)?.let { checkpointStorage.removeCheckpoint(it) } totalFinishedFlows.inc() + unfinishedFibers.countDown() notifyChangeObservers(psm, AddOrRemove.REMOVE) } endAllFiberSessions(psm) @@ -344,6 +348,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } mutex.locked { totalStartedFlows.inc() + unfinishedFibers.countUp() notifyChangeObservers(psm, AddOrRemove.ADD) } } diff --git a/node/src/test/kotlin/net/corda/node/services/ScheduledFlowTests.kt b/node/src/test/kotlin/net/corda/node/services/ScheduledFlowTests.kt new file mode 100644 index 0000000000..9e60c223db --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/ScheduledFlowTests.kt @@ -0,0 +1,152 @@ +package net.corda.node.services + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.contracts.* +import net.corda.core.crypto.CompositeKey +import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowLogicRefFactory +import net.corda.core.node.CordaPluginRegistry +import net.corda.core.node.services.ServiceInfo +import net.corda.core.node.services.linearHeadsOfType +import net.corda.core.utilities.DUMMY_NOTARY +import net.corda.core.utilities.DUMMY_NOTARY_KEY +import net.corda.flows.FinalityFlow +import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.transactions.ValidatingNotaryService +import net.corda.node.utilities.databaseTransaction +import net.corda.testing.node.MockNetwork +import org.junit.After +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Ignore +import org.junit.Test +import java.security.PublicKey +import java.time.Instant +import kotlin.test.assertEquals + +class ScheduledFlowTests { + lateinit var net: MockNetwork + lateinit var notaryNode: MockNetwork.MockNode + lateinit var nodeA: MockNetwork.MockNode + lateinit var nodeB: MockNetwork.MockNode + + data class ScheduledState(val creationTime: Instant, + val source: Party, + val destination: Party, + val processed: Boolean = false, + override val linearId: UniqueIdentifier = UniqueIdentifier(), + override val contract: Contract = DummyContract()) : SchedulableState, LinearState { + override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? { + if (!processed) { + val logicRef = flowLogicRefFactory.create(ScheduledFlow::class.java, thisStateRef) + return ScheduledActivity(logicRef, creationTime) + } else { + return null + } + } + + override val participants: List = listOf(source.owningKey, destination.owningKey) + + override fun isRelevant(ourKeys: Set): Boolean { + return participants.any { it.containsAny(ourKeys) } + } + } + + class InsertInitialStateFlow(val destination: Party) : FlowLogic() { + @Suspendable + override fun call() { + val scheduledState = ScheduledState(serviceHub.clock.instant(), + serviceHub.myInfo.legalIdentity, destination) + + val notary = serviceHub.networkMapCache.getAnyNotary() + val builder = TransactionType.General.Builder(notary) + val tx = builder.withItems(scheduledState). + signWith(serviceHub.legalIdentityKey).toSignedTransaction(false) + subFlow(FinalityFlow(tx, setOf(serviceHub.myInfo.legalIdentity))) + } + } + + class ScheduledFlow(val stateRef: StateRef) : FlowLogic() { + @Suspendable + override fun call() { + val state = serviceHub.toStateAndRef(stateRef) + val scheduledState = state.state.data + // Only run flow over states originating on this node + if (scheduledState.source != serviceHub.myInfo.legalIdentity) { + return + } + require(!scheduledState.processed) { "State should not have been previously processed" } + val notary = state.state.notary + val newStateOutput = scheduledState.copy(processed = true) + val builder = TransactionType.General.Builder(notary) + val tx = builder.withItems(state, newStateOutput). + signWith(serviceHub.legalIdentityKey).toSignedTransaction(false) + subFlow(FinalityFlow(tx, setOf(scheduledState.source, scheduledState.destination))) + } + } + + class ScheduledFlowTestPlugin : CordaPluginRegistry() { + override val requiredFlows: Map> = mapOf( + InsertInitialStateFlow::class.java.name to setOf(Party::class.java.name), + ScheduledFlow::class.java.name to setOf(StateRef::class.java.name) + ) + } + + + @Before + fun setup() { + net = MockNetwork(threadPerNode = true) + notaryNode = net.createNode( + legalName = DUMMY_NOTARY.name, + keyPair = DUMMY_NOTARY_KEY, + advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type))) + nodeA = net.createNode(notaryNode.info.address, start = false) + nodeB = net.createNode(notaryNode.info.address, start = false) + nodeA.testPluginRegistries.add(ScheduledFlowTestPlugin()) + nodeB.testPluginRegistries.add(ScheduledFlowTestPlugin()) + net.startNodes() + } + + @After + fun cleanUp() { + net.stopNodes() + } + + @Test + fun `create and run scheduled flow then wait for result`() { + nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity)) + net.waitQuiescent() + val stateFromA = databaseTransaction(nodeA.database) { + nodeA.services.vaultService.linearHeadsOfType().values.first() + } + val stateFromB = databaseTransaction(nodeB.database) { + nodeB.services.vaultService.linearHeadsOfType().values.first() + } + assertEquals(stateFromA, stateFromB, "Must be same copy on both nodes") + assertTrue("Must be processed", stateFromB.state.data.processed) + } + + @Ignore + @Test + // TODO I need to investigate why we get very very occasional SessionInit failures + // during notarisation. + fun `Run a whole batch of scheduled flows`() { + val N = 100 + for (i in 0..N - 1) { + nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity)) + nodeB.services.startFlow(InsertInitialStateFlow(nodeA.info.legalIdentity)) + } + net.waitQuiescent() + val statesFromA = databaseTransaction(nodeA.database) { + nodeA.services.vaultService.linearHeadsOfType() + } + val statesFromB = databaseTransaction(nodeB.database) { + nodeB.services.vaultService.linearHeadsOfType() + } + assertEquals(2 * N, statesFromA.count(), "Expect all states to be present") + assertEquals(statesFromA, statesFromB, "Expect identical data on both nodes") + assertTrue("Expect all states have run the scheduled task", statesFromB.values.all { it.state.data.processed }) + } +} diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 62cc909c8e..c38c29fe2f 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -15,6 +15,7 @@ import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.JDBCHashSet import net.corda.node.utilities.databaseTransaction import net.corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging +import org.apache.activemq.artemis.utils.ReusableLatch import org.bouncycastle.asn1.x500.X500Name import org.jetbrains.exposed.sql.Database import org.slf4j.LoggerFactory @@ -66,6 +67,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria private val messageReceiveQueues = HashMap>() private val _receivedMessages = PublishSubject.create() + val messagesInFlight = ReusableLatch() + @Suppress("unused") // Used by the visualiser tool. /** A stream of (sender, message, recipients) triples */ val receivedMessages: Observable @@ -119,6 +122,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria @Synchronized private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) { + messagesInFlight.countUp() messageSendQueue += MessageTransfer(from.myAddress, message, recipients) } @@ -359,6 +363,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria } _receivedMessages.onNext(transfer) processedMessages += transfer.message.uniqueMessageId + messagesInFlight.countDown() } } } else { diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index 10712eb980..c90a0d75db 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -6,6 +6,7 @@ import com.google.common.util.concurrent.Futures import net.corda.core.* import net.corda.core.crypto.Party import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.PhysicalLocation import net.corda.core.node.services.* import net.corda.core.utilities.DUMMY_NOTARY_KEY @@ -150,6 +151,12 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, return this } + // Allow unit tests to modify the plugin list before the node start, + // so they don't have to ServiceLoad test plugins into all unit tests. + val testPluginRegistries = super.pluginRegistries.toMutableList() + override val pluginRegistries: List + get() = testPluginRegistries + // This does not indirect through the NodeInfo object so it can be called before the node is started. // It is used from the network visualiser tool. @Suppress("unused") val place: PhysicalLocation get() = findMyLocation()!! @@ -281,4 +288,20 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, require(nodes.isNotEmpty()) nodes.forEach { if (it.started) it.stop() } } + + // Test method to block until all scheduled activity, active flows + // and network activity has ceased. + // TODO This is not perfect in that certain orderings my skip over the scanning loop. + // However, in practice it works well for testing of scheduled flows. + fun waitQuiescent() { + while(nodes.any { it.smm.unfinishedFibers.count > 0 + || it.scheduler.unfinishedSchedules.count > 0} + || messagingNetwork.messagesInFlight.count > 0) { + for (node in nodes) { + node.smm.unfinishedFibers.await() + node.scheduler.unfinishedSchedules.await() + } + messagingNetwork.messagesInFlight.await() + } + } }