Add some hooks to StateMachineManager and NodeSchedulerService so that unit tests of flows with scheduled actions can safely test for completion of their test activities. Typically this is done using a while loop whilst there are active fibers, or schedules and then blocking on the ReusuableLatches until the status changes and can be re-evaluated.

Add unit tests of ScheduledFlow running on simulated network.

Just use existing DumyContract in test

DummyContract requires value equality so that assertEquals over states works as expected.

Remove blank line.

Add TODO on waitQuiescent.

Fix minor build error
This commit is contained in:
Matthew Nesbit 2016-12-05 17:33:28 +00:00
parent 41fca0c1eb
commit f63e6cd2a6
10 changed files with 212 additions and 14 deletions

View File

@ -9,7 +9,7 @@ import net.corda.core.transactions.TransactionBuilder
val DUMMY_PROGRAM_ID = DummyContract()
class DummyContract : Contract {
data class DummyContract(override val legalContractReference: SecureHash = SecureHash.sha256("")) : Contract {
interface State : ContractState {
val magicNumber: Int
@ -44,9 +44,6 @@ class DummyContract : Contract {
// Always accepts.
}
// The "empty contract"
override val legalContractReference: SecureHash = SecureHash.sha256("")
companion object {
@JvmStatic
fun generateInitial(owner: PartyAndReference, magicNumber: Int, notary: Party): TransactionBuilder {

View File

@ -1,8 +1,6 @@
package net.corda.core.node
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionResolutionException
import net.corda.core.contracts.TransactionState
import net.corda.core.contracts.*
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.messaging.MessagingService
@ -48,6 +46,16 @@ interface ServiceHub {
return definingTx.tx.outputs[stateRef.index]
}
/**
* Given a [StateRef] loads the referenced transaction and returns a [StateAndRef<T>]
*
* @throws TransactionResolutionException if the [StateRef] points to a non-existent transaction.
*/
fun <T : ContractState> toStateAndRef(ref: StateRef): StateAndRef<T> {
val definingTx = storageService.validatedTransactions.getTransaction(ref.txhash) ?: throw TransactionResolutionException(ref.txhash)
return definingTx.tx.outRef<T>(ref.index)
}
/**
* Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow.
*

View File

@ -20,10 +20,6 @@ object WorkflowTransactionBuildTutorial {
}
// DOCSTART 1
// Helper method to access the StorageService and expand a StateRef into a StateAndRef
fun <T : ContractState> ServiceHub.toStateAndRef(ref: StateRef): StateAndRef<T> {
return storageService.validatedTransactions.getTransaction(ref.txhash)!!.tx.outRef<T>(ref.index)
}
// Helper method to locate the latest Vault version of a LinearState from a possibly out of date StateRef
inline fun <reified T : LinearState> ServiceHub.latest(ref: StateRef): StateAndRef<T> {

View File

@ -115,7 +115,7 @@ public class JavaCommercialPaper implements Contract {
}
public State withoutOwner() {
return new State(issuance, CryptoUtilitiesKt.getNullCompositeKey(), faceValue, maturityDate);
return new State(issuance, CryptoUtilities.getNullCompositeKey(), faceValue, maturityDate);
}
@NotNull

View File

@ -176,7 +176,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
get() = _networkMapRegistrationFuture
/** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */
val pluginRegistries: List<CordaPluginRegistry> by lazy {
open val pluginRegistries: List<CordaPluginRegistry> by lazy {
ServiceLoader.load(CordaPluginRegistry::class.java).toList()
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services.events
import co.paralleluniverse.fibers.Suspendable
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.SettableFuture
import kotlinx.support.jdk8.collections.compute
import net.corda.core.ThreadBox
@ -17,6 +18,7 @@ import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
@ -87,6 +89,9 @@ class NodeSchedulerService(private val database: Database,
private val mutex = ThreadBox(InnerState())
@VisibleForTesting
val unfinishedSchedules = ReusableLatch()
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
fun start() {
mutex.locked {
@ -98,7 +103,9 @@ class NodeSchedulerService(private val database: Database,
override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" }
mutex.locked {
scheduledStates[action.ref] = action
if (scheduledStates.put(action.ref, action) == null) {
unfinishedSchedules.countUp()
}
if (action.scheduledAt.isBefore(earliestState?.scheduledAt ?: Instant.MAX)) {
// We are earliest
earliestState = action
@ -115,6 +122,9 @@ class NodeSchedulerService(private val database: Database,
log.trace { "Unschedule $ref" }
mutex.locked {
val removedAction = scheduledStates.remove(ref)
if (removedAction != null) {
unfinishedSchedules.countDown()
}
if (removedAction == earliestState && removedAction != null) {
recomputeEarliest()
rescheduleWakeUp()
@ -196,6 +206,7 @@ class NodeSchedulerService(private val database: Database,
if (value === scheduledState) {
if (scheduledActivity == null) {
logger.info("Scheduled state $scheduledState has rescheduled to never.")
scheduler.unfinishedSchedules.countDown()
null
} else if (scheduledActivity.scheduledAt.isAfter(serviceHub.clock.instant())) {
logger.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.")
@ -207,6 +218,7 @@ class NodeSchedulerService(private val database: Database,
// FlowLogic will be checkpointed by the time this returns.
//scheduler.services.startFlowAndForget(logic)
scheduledLogic = logic
scheduler.unfinishedSchedules.countDown()
null
}
} else {

View File

@ -6,6 +6,7 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import co.paralleluniverse.strands.Strand
import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.Kryo
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import kotlinx.support.jdk8.collections.removeIf
import net.corda.core.ThreadBox
@ -100,6 +101,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
@Volatile private var stopping = false
// How many Fibers are running and not suspended. If zero and stopping is true, then we are halted.
private val liveFibers = ReusableLatch()
@VisibleForTesting
val unfinishedFibers = ReusableLatch()
// Monitoring support.
private val metrics = serviceHub.monitoringService.metrics
@ -335,6 +338,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
mutex.locked {
stateMachines.remove(psm)?.let { checkpointStorage.removeCheckpoint(it) }
totalFinishedFlows.inc()
unfinishedFibers.countDown()
notifyChangeObservers(psm, AddOrRemove.REMOVE)
}
endAllFiberSessions(psm)
@ -344,6 +348,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
mutex.locked {
totalStartedFlows.inc()
unfinishedFibers.countUp()
notifyChangeObservers(psm, AddOrRemove.ADD)
}
}

View File

@ -0,0 +1,152 @@
package net.corda.node.services
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.*
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.linearHeadsOfType
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.flows.FinalityFlow
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.node.MockNetwork
import org.junit.After
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import java.security.PublicKey
import java.time.Instant
import kotlin.test.assertEquals
class ScheduledFlowTests {
lateinit var net: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
lateinit var nodeA: MockNetwork.MockNode
lateinit var nodeB: MockNetwork.MockNode
data class ScheduledState(val creationTime: Instant,
val source: Party,
val destination: Party,
val processed: Boolean = false,
override val linearId: UniqueIdentifier = UniqueIdentifier(),
override val contract: Contract = DummyContract()) : SchedulableState, LinearState {
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
if (!processed) {
val logicRef = flowLogicRefFactory.create(ScheduledFlow::class.java, thisStateRef)
return ScheduledActivity(logicRef, creationTime)
} else {
return null
}
}
override val participants: List<CompositeKey> = listOf(source.owningKey, destination.owningKey)
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean {
return participants.any { it.containsAny(ourKeys) }
}
}
class InsertInitialStateFlow(val destination: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val scheduledState = ScheduledState(serviceHub.clock.instant(),
serviceHub.myInfo.legalIdentity, destination)
val notary = serviceHub.networkMapCache.getAnyNotary()
val builder = TransactionType.General.Builder(notary)
val tx = builder.withItems(scheduledState).
signWith(serviceHub.legalIdentityKey).toSignedTransaction(false)
subFlow(FinalityFlow(tx, setOf(serviceHub.myInfo.legalIdentity)))
}
}
class ScheduledFlow(val stateRef: StateRef) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val state = serviceHub.toStateAndRef<ScheduledState>(stateRef)
val scheduledState = state.state.data
// Only run flow over states originating on this node
if (scheduledState.source != serviceHub.myInfo.legalIdentity) {
return
}
require(!scheduledState.processed) { "State should not have been previously processed" }
val notary = state.state.notary
val newStateOutput = scheduledState.copy(processed = true)
val builder = TransactionType.General.Builder(notary)
val tx = builder.withItems(state, newStateOutput).
signWith(serviceHub.legalIdentityKey).toSignedTransaction(false)
subFlow(FinalityFlow(tx, setOf(scheduledState.source, scheduledState.destination)))
}
}
class ScheduledFlowTestPlugin : CordaPluginRegistry() {
override val requiredFlows: Map<String, Set<String>> = mapOf(
InsertInitialStateFlow::class.java.name to setOf(Party::class.java.name),
ScheduledFlow::class.java.name to setOf(StateRef::class.java.name)
)
}
@Before
fun setup() {
net = MockNetwork(threadPerNode = true)
notaryNode = net.createNode(
legalName = DUMMY_NOTARY.name,
keyPair = DUMMY_NOTARY_KEY,
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type)))
nodeA = net.createNode(notaryNode.info.address, start = false)
nodeB = net.createNode(notaryNode.info.address, start = false)
nodeA.testPluginRegistries.add(ScheduledFlowTestPlugin())
nodeB.testPluginRegistries.add(ScheduledFlowTestPlugin())
net.startNodes()
}
@After
fun cleanUp() {
net.stopNodes()
}
@Test
fun `create and run scheduled flow then wait for result`() {
nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity))
net.waitQuiescent()
val stateFromA = databaseTransaction(nodeA.database) {
nodeA.services.vaultService.linearHeadsOfType<ScheduledState>().values.first()
}
val stateFromB = databaseTransaction(nodeB.database) {
nodeB.services.vaultService.linearHeadsOfType<ScheduledState>().values.first()
}
assertEquals(stateFromA, stateFromB, "Must be same copy on both nodes")
assertTrue("Must be processed", stateFromB.state.data.processed)
}
@Ignore
@Test
// TODO I need to investigate why we get very very occasional SessionInit failures
// during notarisation.
fun `Run a whole batch of scheduled flows`() {
val N = 100
for (i in 0..N - 1) {
nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity))
nodeB.services.startFlow(InsertInitialStateFlow(nodeA.info.legalIdentity))
}
net.waitQuiescent()
val statesFromA = databaseTransaction(nodeA.database) {
nodeA.services.vaultService.linearHeadsOfType<ScheduledState>()
}
val statesFromB = databaseTransaction(nodeB.database) {
nodeB.services.vaultService.linearHeadsOfType<ScheduledState>()
}
assertEquals(2 * N, statesFromA.count(), "Expect all states to be present")
assertEquals(statesFromA, statesFromB, "Expect identical data on both nodes")
assertTrue("Expect all states have run the scheduled task", statesFromB.values.all { it.state.data.processed })
}
}

View File

@ -15,6 +15,7 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.JDBCHashSet
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging
import org.apache.activemq.artemis.utils.ReusableLatch
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database
import org.slf4j.LoggerFactory
@ -66,6 +67,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
private val messageReceiveQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>()
private val _receivedMessages = PublishSubject.create<MessageTransfer>()
val messagesInFlight = ReusableLatch()
@Suppress("unused") // Used by the visualiser tool.
/** A stream of (sender, message, recipients) triples */
val receivedMessages: Observable<MessageTransfer>
@ -119,6 +122,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
@Synchronized
private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
messagesInFlight.countUp()
messageSendQueue += MessageTransfer(from.myAddress, message, recipients)
}
@ -359,6 +363,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
}
_receivedMessages.onNext(transfer)
processedMessages += transfer.message.uniqueMessageId
messagesInFlight.countDown()
}
}
} else {

View File

@ -6,6 +6,7 @@ import com.google.common.util.concurrent.Futures
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PhysicalLocation
import net.corda.core.node.services.*
import net.corda.core.utilities.DUMMY_NOTARY_KEY
@ -150,6 +151,12 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
return this
}
// Allow unit tests to modify the plugin list before the node start,
// so they don't have to ServiceLoad test plugins into all unit tests.
val testPluginRegistries = super.pluginRegistries.toMutableList()
override val pluginRegistries: List<CordaPluginRegistry>
get() = testPluginRegistries
// This does not indirect through the NodeInfo object so it can be called before the node is started.
// It is used from the network visualiser tool.
@Suppress("unused") val place: PhysicalLocation get() = findMyLocation()!!
@ -281,4 +288,20 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
require(nodes.isNotEmpty())
nodes.forEach { if (it.started) it.stop() }
}
// Test method to block until all scheduled activity, active flows
// and network activity has ceased.
// TODO This is not perfect in that certain orderings my skip over the scanning loop.
// However, in practice it works well for testing of scheduled flows.
fun waitQuiescent() {
while(nodes.any { it.smm.unfinishedFibers.count > 0
|| it.scheduler.unfinishedSchedules.count > 0}
|| messagingNetwork.messagesInFlight.count > 0) {
for (node in nodes) {
node.smm.unfinishedFibers.await()
node.scheduler.unfinishedSchedules.await()
}
messagingNetwork.messagesInFlight.await()
}
}
}