Merge pull request #34 from corda/mnesbit-add-wait-hooks-for-scheduled-flow-testing

Add some hooks to StateMachineManager and NodeSchedulerService
This commit is contained in:
Matthew Nesbit 2016-12-07 16:12:36 +00:00 committed by GitHub
commit 893b4b5393
10 changed files with 212 additions and 14 deletions

View File

@ -9,7 +9,7 @@ import net.corda.core.transactions.TransactionBuilder
val DUMMY_PROGRAM_ID = DummyContract() val DUMMY_PROGRAM_ID = DummyContract()
class DummyContract : Contract { data class DummyContract(override val legalContractReference: SecureHash = SecureHash.sha256("")) : Contract {
interface State : ContractState { interface State : ContractState {
val magicNumber: Int val magicNumber: Int
@ -44,9 +44,6 @@ class DummyContract : Contract {
// Always accepts. // Always accepts.
} }
// The "empty contract"
override val legalContractReference: SecureHash = SecureHash.sha256("")
companion object { companion object {
@JvmStatic @JvmStatic
fun generateInitial(owner: PartyAndReference, magicNumber: Int, notary: Party): TransactionBuilder { fun generateInitial(owner: PartyAndReference, magicNumber: Int, notary: Party): TransactionBuilder {

View File

@ -1,8 +1,6 @@
package net.corda.core.node package net.corda.core.node
import net.corda.core.contracts.StateRef import net.corda.core.contracts.*
import net.corda.core.contracts.TransactionResolutionException
import net.corda.core.contracts.TransactionState
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine import net.corda.core.flows.FlowStateMachine
import net.corda.core.messaging.MessagingService import net.corda.core.messaging.MessagingService
@ -48,6 +46,16 @@ interface ServiceHub {
return definingTx.tx.outputs[stateRef.index] return definingTx.tx.outputs[stateRef.index]
} }
/**
* Given a [StateRef] loads the referenced transaction and returns a [StateAndRef<T>]
*
* @throws TransactionResolutionException if the [StateRef] points to a non-existent transaction.
*/
fun <T : ContractState> toStateAndRef(ref: StateRef): StateAndRef<T> {
val definingTx = storageService.validatedTransactions.getTransaction(ref.txhash) ?: throw TransactionResolutionException(ref.txhash)
return definingTx.tx.outRef<T>(ref.index)
}
/** /**
* Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow. * Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow.
* *

View File

@ -20,10 +20,6 @@ object WorkflowTransactionBuildTutorial {
} }
// DOCSTART 1 // DOCSTART 1
// Helper method to access the StorageService and expand a StateRef into a StateAndRef
fun <T : ContractState> ServiceHub.toStateAndRef(ref: StateRef): StateAndRef<T> {
return storageService.validatedTransactions.getTransaction(ref.txhash)!!.tx.outRef<T>(ref.index)
}
// Helper method to locate the latest Vault version of a LinearState from a possibly out of date StateRef // Helper method to locate the latest Vault version of a LinearState from a possibly out of date StateRef
inline fun <reified T : LinearState> ServiceHub.latest(ref: StateRef): StateAndRef<T> { inline fun <reified T : LinearState> ServiceHub.latest(ref: StateRef): StateAndRef<T> {

View File

@ -115,7 +115,7 @@ public class JavaCommercialPaper implements Contract {
} }
public State withoutOwner() { public State withoutOwner() {
return new State(issuance, CryptoUtilitiesKt.getNullCompositeKey(), faceValue, maturityDate); return new State(issuance, CryptoUtilities.getNullCompositeKey(), faceValue, maturityDate);
} }
@NotNull @NotNull

View File

@ -176,7 +176,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
get() = _networkMapRegistrationFuture get() = _networkMapRegistrationFuture
/** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */ /** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */
val pluginRegistries: List<CordaPluginRegistry> by lazy { open val pluginRegistries: List<CordaPluginRegistry> by lazy {
ServiceLoader.load(CordaPluginRegistry::class.java).toList() ServiceLoader.load(CordaPluginRegistry::class.java).toList()
} }

View File

@ -1,6 +1,7 @@
package net.corda.node.services.events package net.corda.node.services.events
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import kotlinx.support.jdk8.collections.compute import kotlinx.support.jdk8.collections.compute
import net.corda.core.ThreadBox import net.corda.core.ThreadBox
@ -17,6 +18,7 @@ import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.utilities.* import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement import org.jetbrains.exposed.sql.statements.InsertStatement
@ -87,6 +89,9 @@ class NodeSchedulerService(private val database: Database,
private val mutex = ThreadBox(InnerState()) private val mutex = ThreadBox(InnerState())
@VisibleForTesting
val unfinishedSchedules = ReusableLatch()
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow. // We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
fun start() { fun start() {
mutex.locked { mutex.locked {
@ -98,7 +103,9 @@ class NodeSchedulerService(private val database: Database,
override fun scheduleStateActivity(action: ScheduledStateRef) { override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" } log.trace { "Schedule $action" }
mutex.locked { mutex.locked {
scheduledStates[action.ref] = action if (scheduledStates.put(action.ref, action) == null) {
unfinishedSchedules.countUp()
}
if (action.scheduledAt.isBefore(earliestState?.scheduledAt ?: Instant.MAX)) { if (action.scheduledAt.isBefore(earliestState?.scheduledAt ?: Instant.MAX)) {
// We are earliest // We are earliest
earliestState = action earliestState = action
@ -115,6 +122,9 @@ class NodeSchedulerService(private val database: Database,
log.trace { "Unschedule $ref" } log.trace { "Unschedule $ref" }
mutex.locked { mutex.locked {
val removedAction = scheduledStates.remove(ref) val removedAction = scheduledStates.remove(ref)
if (removedAction != null) {
unfinishedSchedules.countDown()
}
if (removedAction == earliestState && removedAction != null) { if (removedAction == earliestState && removedAction != null) {
recomputeEarliest() recomputeEarliest()
rescheduleWakeUp() rescheduleWakeUp()
@ -196,6 +206,7 @@ class NodeSchedulerService(private val database: Database,
if (value === scheduledState) { if (value === scheduledState) {
if (scheduledActivity == null) { if (scheduledActivity == null) {
logger.info("Scheduled state $scheduledState has rescheduled to never.") logger.info("Scheduled state $scheduledState has rescheduled to never.")
scheduler.unfinishedSchedules.countDown()
null null
} else if (scheduledActivity.scheduledAt.isAfter(serviceHub.clock.instant())) { } else if (scheduledActivity.scheduledAt.isAfter(serviceHub.clock.instant())) {
logger.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.") logger.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.")
@ -207,6 +218,7 @@ class NodeSchedulerService(private val database: Database,
// FlowLogic will be checkpointed by the time this returns. // FlowLogic will be checkpointed by the time this returns.
//scheduler.services.startFlowAndForget(logic) //scheduler.services.startFlowAndForget(logic)
scheduledLogic = logic scheduledLogic = logic
scheduler.unfinishedSchedules.countDown()
null null
} }
} else { } else {

View File

@ -6,6 +6,7 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.Strand
import com.codahale.metrics.Gauge import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import kotlinx.support.jdk8.collections.removeIf import kotlinx.support.jdk8.collections.removeIf
import net.corda.core.ThreadBox import net.corda.core.ThreadBox
@ -100,6 +101,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
@Volatile private var stopping = false @Volatile private var stopping = false
// How many Fibers are running and not suspended. If zero and stopping is true, then we are halted. // How many Fibers are running and not suspended. If zero and stopping is true, then we are halted.
private val liveFibers = ReusableLatch() private val liveFibers = ReusableLatch()
@VisibleForTesting
val unfinishedFibers = ReusableLatch()
// Monitoring support. // Monitoring support.
private val metrics = serviceHub.monitoringService.metrics private val metrics = serviceHub.monitoringService.metrics
@ -335,6 +338,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
mutex.locked { mutex.locked {
stateMachines.remove(psm)?.let { checkpointStorage.removeCheckpoint(it) } stateMachines.remove(psm)?.let { checkpointStorage.removeCheckpoint(it) }
totalFinishedFlows.inc() totalFinishedFlows.inc()
unfinishedFibers.countDown()
notifyChangeObservers(psm, AddOrRemove.REMOVE) notifyChangeObservers(psm, AddOrRemove.REMOVE)
} }
endAllFiberSessions(psm) endAllFiberSessions(psm)
@ -344,6 +348,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
} }
mutex.locked { mutex.locked {
totalStartedFlows.inc() totalStartedFlows.inc()
unfinishedFibers.countUp()
notifyChangeObservers(psm, AddOrRemove.ADD) notifyChangeObservers(psm, AddOrRemove.ADD)
} }
} }

View File

@ -0,0 +1,152 @@
package net.corda.node.services
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.*
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.linearHeadsOfType
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.flows.FinalityFlow
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.node.MockNetwork
import org.junit.After
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import java.security.PublicKey
import java.time.Instant
import kotlin.test.assertEquals
class ScheduledFlowTests {
lateinit var net: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
lateinit var nodeA: MockNetwork.MockNode
lateinit var nodeB: MockNetwork.MockNode
data class ScheduledState(val creationTime: Instant,
val source: Party,
val destination: Party,
val processed: Boolean = false,
override val linearId: UniqueIdentifier = UniqueIdentifier(),
override val contract: Contract = DummyContract()) : SchedulableState, LinearState {
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
if (!processed) {
val logicRef = flowLogicRefFactory.create(ScheduledFlow::class.java, thisStateRef)
return ScheduledActivity(logicRef, creationTime)
} else {
return null
}
}
override val participants: List<CompositeKey> = listOf(source.owningKey, destination.owningKey)
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean {
return participants.any { it.containsAny(ourKeys) }
}
}
class InsertInitialStateFlow(val destination: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val scheduledState = ScheduledState(serviceHub.clock.instant(),
serviceHub.myInfo.legalIdentity, destination)
val notary = serviceHub.networkMapCache.getAnyNotary()
val builder = TransactionType.General.Builder(notary)
val tx = builder.withItems(scheduledState).
signWith(serviceHub.legalIdentityKey).toSignedTransaction(false)
subFlow(FinalityFlow(tx, setOf(serviceHub.myInfo.legalIdentity)))
}
}
class ScheduledFlow(val stateRef: StateRef) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val state = serviceHub.toStateAndRef<ScheduledState>(stateRef)
val scheduledState = state.state.data
// Only run flow over states originating on this node
if (scheduledState.source != serviceHub.myInfo.legalIdentity) {
return
}
require(!scheduledState.processed) { "State should not have been previously processed" }
val notary = state.state.notary
val newStateOutput = scheduledState.copy(processed = true)
val builder = TransactionType.General.Builder(notary)
val tx = builder.withItems(state, newStateOutput).
signWith(serviceHub.legalIdentityKey).toSignedTransaction(false)
subFlow(FinalityFlow(tx, setOf(scheduledState.source, scheduledState.destination)))
}
}
class ScheduledFlowTestPlugin : CordaPluginRegistry() {
override val requiredFlows: Map<String, Set<String>> = mapOf(
InsertInitialStateFlow::class.java.name to setOf(Party::class.java.name),
ScheduledFlow::class.java.name to setOf(StateRef::class.java.name)
)
}
@Before
fun setup() {
net = MockNetwork(threadPerNode = true)
notaryNode = net.createNode(
legalName = DUMMY_NOTARY.name,
keyPair = DUMMY_NOTARY_KEY,
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type)))
nodeA = net.createNode(notaryNode.info.address, start = false)
nodeB = net.createNode(notaryNode.info.address, start = false)
nodeA.testPluginRegistries.add(ScheduledFlowTestPlugin())
nodeB.testPluginRegistries.add(ScheduledFlowTestPlugin())
net.startNodes()
}
@After
fun cleanUp() {
net.stopNodes()
}
@Test
fun `create and run scheduled flow then wait for result`() {
nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity))
net.waitQuiescent()
val stateFromA = databaseTransaction(nodeA.database) {
nodeA.services.vaultService.linearHeadsOfType<ScheduledState>().values.first()
}
val stateFromB = databaseTransaction(nodeB.database) {
nodeB.services.vaultService.linearHeadsOfType<ScheduledState>().values.first()
}
assertEquals(stateFromA, stateFromB, "Must be same copy on both nodes")
assertTrue("Must be processed", stateFromB.state.data.processed)
}
@Ignore
@Test
// TODO I need to investigate why we get very very occasional SessionInit failures
// during notarisation.
fun `Run a whole batch of scheduled flows`() {
val N = 100
for (i in 0..N - 1) {
nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity))
nodeB.services.startFlow(InsertInitialStateFlow(nodeA.info.legalIdentity))
}
net.waitQuiescent()
val statesFromA = databaseTransaction(nodeA.database) {
nodeA.services.vaultService.linearHeadsOfType<ScheduledState>()
}
val statesFromB = databaseTransaction(nodeB.database) {
nodeB.services.vaultService.linearHeadsOfType<ScheduledState>()
}
assertEquals(2 * N, statesFromA.count(), "Expect all states to be present")
assertEquals(statesFromA, statesFromB, "Expect identical data on both nodes")
assertTrue("Expect all states have run the scheduled task", statesFromB.values.all { it.state.data.processed })
}
}

View File

@ -15,6 +15,7 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.JDBCHashSet import net.corda.node.utilities.JDBCHashSet
import net.corda.node.utilities.databaseTransaction import net.corda.node.utilities.databaseTransaction
import net.corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging import net.corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging
import org.apache.activemq.artemis.utils.ReusableLatch
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Database
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
@ -66,6 +67,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
private val messageReceiveQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>() private val messageReceiveQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>()
private val _receivedMessages = PublishSubject.create<MessageTransfer>() private val _receivedMessages = PublishSubject.create<MessageTransfer>()
val messagesInFlight = ReusableLatch()
@Suppress("unused") // Used by the visualiser tool. @Suppress("unused") // Used by the visualiser tool.
/** A stream of (sender, message, recipients) triples */ /** A stream of (sender, message, recipients) triples */
val receivedMessages: Observable<MessageTransfer> val receivedMessages: Observable<MessageTransfer>
@ -119,6 +122,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
@Synchronized @Synchronized
private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) { private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
messagesInFlight.countUp()
messageSendQueue += MessageTransfer(from.myAddress, message, recipients) messageSendQueue += MessageTransfer(from.myAddress, message, recipients)
} }
@ -359,6 +363,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
} }
_receivedMessages.onNext(transfer) _receivedMessages.onNext(transfer)
processedMessages += transfer.message.uniqueMessageId processedMessages += transfer.message.uniqueMessageId
messagesInFlight.countDown()
} }
} }
} else { } else {

View File

@ -6,6 +6,7 @@ import com.google.common.util.concurrent.Futures
import net.corda.core.* import net.corda.core.*
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PhysicalLocation import net.corda.core.node.PhysicalLocation
import net.corda.core.node.services.* import net.corda.core.node.services.*
import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.core.utilities.DUMMY_NOTARY_KEY
@ -150,6 +151,12 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
return this return this
} }
// Allow unit tests to modify the plugin list before the node start,
// so they don't have to ServiceLoad test plugins into all unit tests.
val testPluginRegistries = super.pluginRegistries.toMutableList()
override val pluginRegistries: List<CordaPluginRegistry>
get() = testPluginRegistries
// This does not indirect through the NodeInfo object so it can be called before the node is started. // This does not indirect through the NodeInfo object so it can be called before the node is started.
// It is used from the network visualiser tool. // It is used from the network visualiser tool.
@Suppress("unused") val place: PhysicalLocation get() = findMyLocation()!! @Suppress("unused") val place: PhysicalLocation get() = findMyLocation()!!
@ -281,4 +288,20 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
require(nodes.isNotEmpty()) require(nodes.isNotEmpty())
nodes.forEach { if (it.started) it.stop() } nodes.forEach { if (it.started) it.stop() }
} }
// Test method to block until all scheduled activity, active flows
// and network activity has ceased.
// TODO This is not perfect in that certain orderings my skip over the scanning loop.
// However, in practice it works well for testing of scheduled flows.
fun waitQuiescent() {
while(nodes.any { it.smm.unfinishedFibers.count > 0
|| it.scheduler.unfinishedSchedules.count > 0}
|| messagingNetwork.messagesInFlight.count > 0) {
for (node in nodes) {
node.smm.unfinishedFibers.await()
node.scheduler.unfinishedSchedules.await()
}
messagingNetwork.messagesInFlight.await()
}
}
} }