diff --git a/core/src/main/kotlin/net/corda/core/contracts/DummyContract.kt b/core/src/main/kotlin/net/corda/core/contracts/DummyContract.kt index 7c6bd45846..ac27958ec9 100644 --- a/core/src/main/kotlin/net/corda/core/contracts/DummyContract.kt +++ b/core/src/main/kotlin/net/corda/core/contracts/DummyContract.kt @@ -9,7 +9,7 @@ import net.corda.core.transactions.TransactionBuilder val DUMMY_PROGRAM_ID = DummyContract() -class DummyContract : Contract { +data class DummyContract(override val legalContractReference: SecureHash = SecureHash.sha256("")) : Contract { interface State : ContractState { val magicNumber: Int @@ -44,9 +44,6 @@ class DummyContract : Contract { // Always accepts. } - // The "empty contract" - override val legalContractReference: SecureHash = SecureHash.sha256("") - companion object { @JvmStatic fun generateInitial(owner: PartyAndReference, magicNumber: Int, notary: Party): TransactionBuilder { diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index 18664fdf82..9542136c72 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -1,8 +1,6 @@ package net.corda.core.node -import net.corda.core.contracts.StateRef -import net.corda.core.contracts.TransactionResolutionException -import net.corda.core.contracts.TransactionState +import net.corda.core.contracts.* import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowStateMachine import net.corda.core.messaging.MessagingService @@ -48,6 +46,16 @@ interface ServiceHub { return definingTx.tx.outputs[stateRef.index] } + /** + * Given a [StateRef] loads the referenced transaction and returns a [StateAndRef] + * + * @throws TransactionResolutionException if the [StateRef] points to a non-existent transaction. + */ + fun toStateAndRef(ref: StateRef): StateAndRef { + val definingTx = storageService.validatedTransactions.getTransaction(ref.txhash) ?: throw TransactionResolutionException(ref.txhash) + return definingTx.tx.outRef(ref.index) + } + /** * Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow. * diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt index 9328395df8..dae7c1efe8 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt @@ -20,10 +20,6 @@ object WorkflowTransactionBuildTutorial { } // DOCSTART 1 -// Helper method to access the StorageService and expand a StateRef into a StateAndRef -fun ServiceHub.toStateAndRef(ref: StateRef): StateAndRef { - return storageService.validatedTransactions.getTransaction(ref.txhash)!!.tx.outRef(ref.index) -} // Helper method to locate the latest Vault version of a LinearState from a possibly out of date StateRef inline fun ServiceHub.latest(ref: StateRef): StateAndRef { diff --git a/finance/src/main/java/net/corda/contracts/JavaCommercialPaper.java b/finance/src/main/java/net/corda/contracts/JavaCommercialPaper.java index 7159eadcd9..6fecd8d47d 100644 --- a/finance/src/main/java/net/corda/contracts/JavaCommercialPaper.java +++ b/finance/src/main/java/net/corda/contracts/JavaCommercialPaper.java @@ -115,7 +115,7 @@ public class JavaCommercialPaper implements Contract { } public State withoutOwner() { - return new State(issuance, CryptoUtilitiesKt.getNullCompositeKey(), faceValue, maturityDate); + return new State(issuance, CryptoUtilities.getNullCompositeKey(), faceValue, maturityDate); } @NotNull diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index ad4e7c7acb..8a3afb46f0 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -176,7 +176,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo get() = _networkMapRegistrationFuture /** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */ - val pluginRegistries: List by lazy { + open val pluginRegistries: List by lazy { ServiceLoader.load(CordaPluginRegistry::class.java).toList() } diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index 151d0c23d8..08e30b9e6f 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -1,6 +1,7 @@ package net.corda.node.services.events import co.paralleluniverse.fibers.Suspendable +import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.SettableFuture import kotlinx.support.jdk8.collections.compute import net.corda.core.ThreadBox @@ -17,6 +18,7 @@ import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace import net.corda.node.services.api.ServiceHubInternal import net.corda.node.utilities.* +import org.apache.activemq.artemis.utils.ReusableLatch import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.statements.InsertStatement @@ -87,6 +89,9 @@ class NodeSchedulerService(private val database: Database, private val mutex = ThreadBox(InnerState()) + @VisibleForTesting + val unfinishedSchedules = ReusableLatch() + // We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow. fun start() { mutex.locked { @@ -98,7 +103,9 @@ class NodeSchedulerService(private val database: Database, override fun scheduleStateActivity(action: ScheduledStateRef) { log.trace { "Schedule $action" } mutex.locked { - scheduledStates[action.ref] = action + if (scheduledStates.put(action.ref, action) == null) { + unfinishedSchedules.countUp() + } if (action.scheduledAt.isBefore(earliestState?.scheduledAt ?: Instant.MAX)) { // We are earliest earliestState = action @@ -115,6 +122,9 @@ class NodeSchedulerService(private val database: Database, log.trace { "Unschedule $ref" } mutex.locked { val removedAction = scheduledStates.remove(ref) + if (removedAction != null) { + unfinishedSchedules.countDown() + } if (removedAction == earliestState && removedAction != null) { recomputeEarliest() rescheduleWakeUp() @@ -196,6 +206,7 @@ class NodeSchedulerService(private val database: Database, if (value === scheduledState) { if (scheduledActivity == null) { logger.info("Scheduled state $scheduledState has rescheduled to never.") + scheduler.unfinishedSchedules.countDown() null } else if (scheduledActivity.scheduledAt.isAfter(serviceHub.clock.instant())) { logger.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.") @@ -207,6 +218,7 @@ class NodeSchedulerService(private val database: Database, // FlowLogic will be checkpointed by the time this returns. //scheduler.services.startFlowAndForget(logic) scheduledLogic = logic + scheduler.unfinishedSchedules.countDown() null } } else { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 79811febc3..2d947cd152 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -6,6 +6,7 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer import co.paralleluniverse.strands.Strand import com.codahale.metrics.Gauge import com.esotericsoftware.kryo.Kryo +import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ListenableFuture import kotlinx.support.jdk8.collections.removeIf import net.corda.core.ThreadBox @@ -100,6 +101,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, @Volatile private var stopping = false // How many Fibers are running and not suspended. If zero and stopping is true, then we are halted. private val liveFibers = ReusableLatch() + @VisibleForTesting + val unfinishedFibers = ReusableLatch() // Monitoring support. private val metrics = serviceHub.monitoringService.metrics @@ -335,6 +338,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, mutex.locked { stateMachines.remove(psm)?.let { checkpointStorage.removeCheckpoint(it) } totalFinishedFlows.inc() + unfinishedFibers.countDown() notifyChangeObservers(psm, AddOrRemove.REMOVE) } endAllFiberSessions(psm) @@ -344,6 +348,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } mutex.locked { totalStartedFlows.inc() + unfinishedFibers.countUp() notifyChangeObservers(psm, AddOrRemove.ADD) } } diff --git a/node/src/test/kotlin/net/corda/node/services/ScheduledFlowTests.kt b/node/src/test/kotlin/net/corda/node/services/ScheduledFlowTests.kt new file mode 100644 index 0000000000..9e60c223db --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/ScheduledFlowTests.kt @@ -0,0 +1,152 @@ +package net.corda.node.services + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.contracts.* +import net.corda.core.crypto.CompositeKey +import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowLogicRefFactory +import net.corda.core.node.CordaPluginRegistry +import net.corda.core.node.services.ServiceInfo +import net.corda.core.node.services.linearHeadsOfType +import net.corda.core.utilities.DUMMY_NOTARY +import net.corda.core.utilities.DUMMY_NOTARY_KEY +import net.corda.flows.FinalityFlow +import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.transactions.ValidatingNotaryService +import net.corda.node.utilities.databaseTransaction +import net.corda.testing.node.MockNetwork +import org.junit.After +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Ignore +import org.junit.Test +import java.security.PublicKey +import java.time.Instant +import kotlin.test.assertEquals + +class ScheduledFlowTests { + lateinit var net: MockNetwork + lateinit var notaryNode: MockNetwork.MockNode + lateinit var nodeA: MockNetwork.MockNode + lateinit var nodeB: MockNetwork.MockNode + + data class ScheduledState(val creationTime: Instant, + val source: Party, + val destination: Party, + val processed: Boolean = false, + override val linearId: UniqueIdentifier = UniqueIdentifier(), + override val contract: Contract = DummyContract()) : SchedulableState, LinearState { + override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? { + if (!processed) { + val logicRef = flowLogicRefFactory.create(ScheduledFlow::class.java, thisStateRef) + return ScheduledActivity(logicRef, creationTime) + } else { + return null + } + } + + override val participants: List = listOf(source.owningKey, destination.owningKey) + + override fun isRelevant(ourKeys: Set): Boolean { + return participants.any { it.containsAny(ourKeys) } + } + } + + class InsertInitialStateFlow(val destination: Party) : FlowLogic() { + @Suspendable + override fun call() { + val scheduledState = ScheduledState(serviceHub.clock.instant(), + serviceHub.myInfo.legalIdentity, destination) + + val notary = serviceHub.networkMapCache.getAnyNotary() + val builder = TransactionType.General.Builder(notary) + val tx = builder.withItems(scheduledState). + signWith(serviceHub.legalIdentityKey).toSignedTransaction(false) + subFlow(FinalityFlow(tx, setOf(serviceHub.myInfo.legalIdentity))) + } + } + + class ScheduledFlow(val stateRef: StateRef) : FlowLogic() { + @Suspendable + override fun call() { + val state = serviceHub.toStateAndRef(stateRef) + val scheduledState = state.state.data + // Only run flow over states originating on this node + if (scheduledState.source != serviceHub.myInfo.legalIdentity) { + return + } + require(!scheduledState.processed) { "State should not have been previously processed" } + val notary = state.state.notary + val newStateOutput = scheduledState.copy(processed = true) + val builder = TransactionType.General.Builder(notary) + val tx = builder.withItems(state, newStateOutput). + signWith(serviceHub.legalIdentityKey).toSignedTransaction(false) + subFlow(FinalityFlow(tx, setOf(scheduledState.source, scheduledState.destination))) + } + } + + class ScheduledFlowTestPlugin : CordaPluginRegistry() { + override val requiredFlows: Map> = mapOf( + InsertInitialStateFlow::class.java.name to setOf(Party::class.java.name), + ScheduledFlow::class.java.name to setOf(StateRef::class.java.name) + ) + } + + + @Before + fun setup() { + net = MockNetwork(threadPerNode = true) + notaryNode = net.createNode( + legalName = DUMMY_NOTARY.name, + keyPair = DUMMY_NOTARY_KEY, + advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), ServiceInfo(ValidatingNotaryService.type))) + nodeA = net.createNode(notaryNode.info.address, start = false) + nodeB = net.createNode(notaryNode.info.address, start = false) + nodeA.testPluginRegistries.add(ScheduledFlowTestPlugin()) + nodeB.testPluginRegistries.add(ScheduledFlowTestPlugin()) + net.startNodes() + } + + @After + fun cleanUp() { + net.stopNodes() + } + + @Test + fun `create and run scheduled flow then wait for result`() { + nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity)) + net.waitQuiescent() + val stateFromA = databaseTransaction(nodeA.database) { + nodeA.services.vaultService.linearHeadsOfType().values.first() + } + val stateFromB = databaseTransaction(nodeB.database) { + nodeB.services.vaultService.linearHeadsOfType().values.first() + } + assertEquals(stateFromA, stateFromB, "Must be same copy on both nodes") + assertTrue("Must be processed", stateFromB.state.data.processed) + } + + @Ignore + @Test + // TODO I need to investigate why we get very very occasional SessionInit failures + // during notarisation. + fun `Run a whole batch of scheduled flows`() { + val N = 100 + for (i in 0..N - 1) { + nodeA.services.startFlow(InsertInitialStateFlow(nodeB.info.legalIdentity)) + nodeB.services.startFlow(InsertInitialStateFlow(nodeA.info.legalIdentity)) + } + net.waitQuiescent() + val statesFromA = databaseTransaction(nodeA.database) { + nodeA.services.vaultService.linearHeadsOfType() + } + val statesFromB = databaseTransaction(nodeB.database) { + nodeB.services.vaultService.linearHeadsOfType() + } + assertEquals(2 * N, statesFromA.count(), "Expect all states to be present") + assertEquals(statesFromA, statesFromB, "Expect identical data on both nodes") + assertTrue("Expect all states have run the scheduled task", statesFromB.values.all { it.state.data.processed }) + } +} diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 62cc909c8e..c38c29fe2f 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -15,6 +15,7 @@ import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.JDBCHashSet import net.corda.node.utilities.databaseTransaction import net.corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging +import org.apache.activemq.artemis.utils.ReusableLatch import org.bouncycastle.asn1.x500.X500Name import org.jetbrains.exposed.sql.Database import org.slf4j.LoggerFactory @@ -66,6 +67,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria private val messageReceiveQueues = HashMap>() private val _receivedMessages = PublishSubject.create() + val messagesInFlight = ReusableLatch() + @Suppress("unused") // Used by the visualiser tool. /** A stream of (sender, message, recipients) triples */ val receivedMessages: Observable @@ -119,6 +122,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria @Synchronized private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) { + messagesInFlight.countUp() messageSendQueue += MessageTransfer(from.myAddress, message, recipients) } @@ -359,6 +363,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria } _receivedMessages.onNext(transfer) processedMessages += transfer.message.uniqueMessageId + messagesInFlight.countDown() } } } else { diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index 10712eb980..c90a0d75db 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -6,6 +6,7 @@ import com.google.common.util.concurrent.Futures import net.corda.core.* import net.corda.core.crypto.Party import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.PhysicalLocation import net.corda.core.node.services.* import net.corda.core.utilities.DUMMY_NOTARY_KEY @@ -150,6 +151,12 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, return this } + // Allow unit tests to modify the plugin list before the node start, + // so they don't have to ServiceLoad test plugins into all unit tests. + val testPluginRegistries = super.pluginRegistries.toMutableList() + override val pluginRegistries: List + get() = testPluginRegistries + // This does not indirect through the NodeInfo object so it can be called before the node is started. // It is used from the network visualiser tool. @Suppress("unused") val place: PhysicalLocation get() = findMyLocation()!! @@ -281,4 +288,20 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, require(nodes.isNotEmpty()) nodes.forEach { if (it.started) it.stop() } } + + // Test method to block until all scheduled activity, active flows + // and network activity has ceased. + // TODO This is not perfect in that certain orderings my skip over the scanning loop. + // However, in practice it works well for testing of scheduled flows. + fun waitQuiescent() { + while(nodes.any { it.smm.unfinishedFibers.count > 0 + || it.scheduler.unfinishedSchedules.count > 0} + || messagingNetwork.messagesInFlight.count > 0) { + for (node in nodes) { + node.smm.unfinishedFibers.await() + node.scheduler.unfinishedSchedules.await() + } + messagingNetwork.messagesInFlight.await() + } + } }