mirror of
https://github.com/corda/corda.git
synced 2025-06-19 15:43:52 +00:00
Introducing InitiatingFlow annotation which has to be annotated by initiating flows.
This removes the need for the shareParentSessions parameter of FlowLogic.subFlow. It also has the flow's version number so FlowVersion is now no longer needed.
This commit is contained in:
@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.utilities.ALICE
|
||||
import net.corda.core.utilities.BOB
|
||||
@ -23,14 +24,15 @@ class FlowVersioningTest : NodeBasedTest() {
|
||||
assertThat(resultFuture.getOrThrow()).isEqualTo(2)
|
||||
}
|
||||
|
||||
private open class ClientFlow(val otherParty: Party) : FlowLogic<Any>() {
|
||||
@InitiatingFlow
|
||||
private class ClientFlow(val otherParty: Party) : FlowLogic<Any>() {
|
||||
@Suspendable
|
||||
override fun call(): Any {
|
||||
return sendAndReceive<Any>(otherParty, "This is ignored. We only send to kick off the flow on the other side").unwrap { it }
|
||||
}
|
||||
}
|
||||
|
||||
private open class SendBackPlatformVersionFlow(val otherParty: Party, val otherPartysPlatformVersion: Any) : FlowLogic<Unit>() {
|
||||
private class SendBackPlatformVersionFlow(val otherParty: Party, val otherPartysPlatformVersion: Int) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = send(otherParty, otherPartysPlatformVersion)
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.crypto.toBase58String
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.random63BitValue
|
||||
@ -222,6 +223,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
return bobParty
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class SendFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = send(otherParty, payload)
|
||||
|
@ -13,7 +13,7 @@ import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.X509Utilities
|
||||
import net.corda.core.flows.FlowInitiator
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowVersion
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
@ -140,12 +140,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
return serverThread.fetchFrom { smm.add(logic, flowInitiator) }
|
||||
}
|
||||
|
||||
override fun registerServiceFlow(clientFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) {
|
||||
require(clientFlowClass !in serviceFlowFactories) { "${clientFlowClass.name} has already been used to register a service flow" }
|
||||
val version = clientFlowClass.flowVersion
|
||||
val info = ServiceFlowInfo.CorDapp(version, serviceFlowFactory)
|
||||
log.info("Registering service flow for ${clientFlowClass.name}: $info")
|
||||
serviceFlowFactories[clientFlowClass] = info
|
||||
override fun registerServiceFlow(initiatingFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) {
|
||||
require(initiatingFlowClass !in serviceFlowFactories) {
|
||||
"${initiatingFlowClass.name} has already been used to register a service flow"
|
||||
}
|
||||
val info = ServiceFlowInfo.CorDapp(initiatingFlowClass.flowVersion, serviceFlowFactory)
|
||||
log.info("Registering service flow for ${initiatingFlowClass.name}: $info")
|
||||
serviceFlowFactories[initiatingFlowClass] = info
|
||||
}
|
||||
|
||||
override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ServiceFlowInfo? {
|
||||
@ -258,15 +259,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
/**
|
||||
* @suppress
|
||||
* Installs a flow that's core to the Corda platform. Unlike CorDapp flows which are versioned individually using
|
||||
* [FlowVersion], core flows have the same version as the node's platform version. To cater for backwards compatibility
|
||||
* [serviceFlowFactory] provides a second parameter which is the platform version of the initiating party.
|
||||
* [InitiatingFlow.version], core flows have the same version as the node's platform version. To cater for backwards
|
||||
* compatibility [serviceFlowFactory] provides a second parameter which is the platform version of the initiating party.
|
||||
* @suppress
|
||||
*/
|
||||
@VisibleForTesting
|
||||
fun installCoreFlow(clientFlowClass: KClass<out FlowLogic<*>>, serviceFlowFactory: (Party, Int) -> FlowLogic<*>) {
|
||||
require(!clientFlowClass.java.isAnnotationPresent(FlowVersion::class.java)) {
|
||||
"${FlowVersion::class.java.name} not applicable for core flows; their version is the node's platform version"
|
||||
require(clientFlowClass.java.flowVersion == 1) {
|
||||
"${InitiatingFlow::class.java.name}.version not applicable for core flows; their version is the node's platform version"
|
||||
}
|
||||
serviceFlowFactories[clientFlowClass.java] = ServiceFlowInfo.Core(serviceFlowFactory)
|
||||
log.debug { "Installed core flow ${clientFlowClass.java.name}" }
|
||||
@ -301,7 +302,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
// the identity key. But the infrastructure to make that easy isn't here yet.
|
||||
keyManagement = makeKeyManagementService()
|
||||
flowLogicFactory = initialiseFlowLogicFactory()
|
||||
scheduler = NodeSchedulerService(services, flowLogicFactory, unfinishedSchedules = busyNodeLatch)
|
||||
scheduler = NodeSchedulerService(services, database, flowLogicFactory, unfinishedSchedules = busyNodeLatch)
|
||||
|
||||
val tokenizableServices = mutableListOf(storage, net, vault, keyManagement, identity, platformClock, scheduler)
|
||||
makeAdvertisedServices(tokenizableServices)
|
||||
|
@ -63,7 +63,7 @@ class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val request = receive<BroadcastTransactionFlow.NotifyTxRequest>(otherParty).unwrap { it }
|
||||
subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true)
|
||||
subFlow(ResolveTransactionsFlow(request.tx, otherParty))
|
||||
serviceHub.recordTransactions(request.tx)
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,5 @@
|
||||
package net.corda.node.services.events
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.contracts.SchedulableState
|
||||
@ -10,7 +9,7 @@ import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.flows.FlowInitiator
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.then
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.api.FlowLogicRefFactoryInternal
|
||||
@ -18,6 +17,7 @@ import net.corda.node.services.api.SchedulerService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.utilities.*
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import java.time.Instant
|
||||
@ -44,12 +44,15 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeSchedulerService(private val services: ServiceHubInternal,
|
||||
private val database: Database,
|
||||
private val flowLogicRefFactory: FlowLogicRefFactoryInternal,
|
||||
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(),
|
||||
private val unfinishedSchedules: ReusableLatch = ReusableLatch())
|
||||
: SchedulerService, SingletonSerializeAsToken() {
|
||||
|
||||
private val log = loggerFor<NodeSchedulerService>()
|
||||
companion object {
|
||||
private val log = loggerFor<NodeSchedulerService>()
|
||||
}
|
||||
|
||||
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}scheduled_states") {
|
||||
val output = stateRef("transaction_id", "output_index")
|
||||
@ -158,71 +161,62 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
|
||||
}
|
||||
|
||||
private fun onTimeReached(scheduledState: ScheduledStateRef) {
|
||||
services.startFlow(RunScheduled(scheduledState, this@NodeSchedulerService), FlowInitiator.Scheduled(scheduledState))
|
||||
database.transaction {
|
||||
val scheduledFlow = getScheduledFlow(scheduledState)
|
||||
if (scheduledFlow != null) {
|
||||
// TODO Because the flow is executed asynchronously, there is a small window between this tx we're in
|
||||
// committing and the flow's first checkpoint when it starts in which we can lose the flow if the node
|
||||
// goes down.
|
||||
// See discussion in https://github.com/corda/corda/pull/639#discussion_r115257437
|
||||
val future = services.startFlow(scheduledFlow, FlowInitiator.Scheduled(scheduledState)).resultFuture
|
||||
future.then {
|
||||
unfinishedSchedules.countDown()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RunScheduled(val scheduledState: ScheduledStateRef, val scheduler: NodeSchedulerService) : FlowLogic<Unit>() {
|
||||
companion object {
|
||||
object RUNNING : ProgressTracker.Step("Running scheduled...")
|
||||
|
||||
fun tracker() = ProgressTracker(RUNNING)
|
||||
}
|
||||
override val progressTracker = tracker()
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Unit {
|
||||
progressTracker.currentStep = RUNNING
|
||||
|
||||
// Ensure we are still scheduled.
|
||||
val scheduledLogic: FlowLogic<*>? = getScheduledLogic()
|
||||
if (scheduledLogic != null) {
|
||||
subFlow(scheduledLogic)
|
||||
scheduler.unfinishedSchedules.countDown()
|
||||
}
|
||||
}
|
||||
|
||||
private fun getScheduledaActivity(): ScheduledActivity? {
|
||||
val txState = serviceHub.loadState(scheduledState.ref)
|
||||
val state = txState.data as SchedulableState
|
||||
return try {
|
||||
// This can throw as running contract code.
|
||||
state.nextScheduledActivity(scheduledState.ref, scheduler.flowLogicRefFactory)
|
||||
} catch(e: Exception) {
|
||||
logger.error("Attempt to run scheduled state $scheduledState resulted in error.", e)
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
private fun getScheduledLogic(): FlowLogic<*>? {
|
||||
val scheduledActivity = getScheduledaActivity()
|
||||
var scheduledLogic: FlowLogic<*>? = null
|
||||
scheduler.mutex.locked {
|
||||
// need to remove us from those scheduled, but only if we are still next
|
||||
scheduledStates.compute(scheduledState.ref) { _, value ->
|
||||
if (value === scheduledState) {
|
||||
if (scheduledActivity == null) {
|
||||
logger.info("Scheduled state $scheduledState has rescheduled to never.")
|
||||
scheduler.unfinishedSchedules.countDown()
|
||||
null
|
||||
} else if (scheduledActivity.scheduledAt.isAfter(serviceHub.clock.instant())) {
|
||||
logger.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.")
|
||||
ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt)
|
||||
} else {
|
||||
// TODO: FlowLogicRefFactory needs to sort out the class loader etc
|
||||
val logic = scheduler.flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
|
||||
logger.trace { "Scheduler starting FlowLogic $logic" }
|
||||
scheduledLogic = logic
|
||||
null
|
||||
}
|
||||
private fun getScheduledFlow(scheduledState: ScheduledStateRef): FlowLogic<*>? {
|
||||
val scheduledActivity = getScheduledActivity(scheduledState)
|
||||
var scheduledFlow: FlowLogic<*>? = null
|
||||
mutex.locked {
|
||||
// need to remove us from those scheduled, but only if we are still next
|
||||
scheduledStates.compute(scheduledState.ref) { _, value ->
|
||||
if (value === scheduledState) {
|
||||
if (scheduledActivity == null) {
|
||||
log.info("Scheduled state $scheduledState has rescheduled to never.")
|
||||
unfinishedSchedules.countDown()
|
||||
null
|
||||
} else if (scheduledActivity.scheduledAt.isAfter(services.clock.instant())) {
|
||||
log.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.")
|
||||
ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt)
|
||||
} else {
|
||||
value
|
||||
// TODO: FlowLogicRefFactory needs to sort out the class loader etc
|
||||
val flowLogic = flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
|
||||
log.trace { "Scheduler starting FlowLogic $flowLogic" }
|
||||
scheduledFlow = flowLogic
|
||||
null
|
||||
}
|
||||
} else {
|
||||
value
|
||||
}
|
||||
// and schedule the next one
|
||||
recomputeEarliest()
|
||||
scheduler.rescheduleWakeUp()
|
||||
}
|
||||
return scheduledLogic
|
||||
// and schedule the next one
|
||||
recomputeEarliest()
|
||||
rescheduleWakeUp()
|
||||
}
|
||||
return scheduledFlow
|
||||
}
|
||||
|
||||
private fun getScheduledActivity(scheduledState: ScheduledStateRef): ScheduledActivity? {
|
||||
val txState = services.loadState(scheduledState.ref)
|
||||
val state = txState.data as SchedulableState
|
||||
return try {
|
||||
// This can throw as running contract code.
|
||||
state.nextScheduledActivity(scheduledState.ref, flowLogicRefFactory)
|
||||
} catch (e: Exception) {
|
||||
log.error("Attempt to run scheduled state $scheduledState resulted in error.", e)
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -426,7 +426,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
}
|
||||
|
||||
val Class<out FlowLogic<*>>.flowVersion: Int get() {
|
||||
val flowVersion = getDeclaredAnnotation(FlowVersion::class.java) ?: return 1
|
||||
require(flowVersion.value > 0) { "Flow versions have to be greater or equal to 1" }
|
||||
return flowVersion.value
|
||||
val annotation = requireNotNull(getAnnotation(InitiatingFlow::class.java)) {
|
||||
"$name as the initiating flow must be annotated with ${InitiatingFlow::class.java.name}"
|
||||
}
|
||||
require(annotation.version > 0) { "Flow versions have to be greater or equal to 1" }
|
||||
return annotation.version
|
||||
}
|
||||
|
@ -368,10 +368,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
is ServiceFlowInfo.Core -> serviceFlowInfo.factory(sender, receivedMessage.platformVersion)
|
||||
}
|
||||
|
||||
if (flow.javaClass.isAnnotationPresent(FlowVersion::class.java)) {
|
||||
logger.warn("${FlowVersion::class.java.name} is not applicable for service flows: ${flow.javaClass.name}")
|
||||
}
|
||||
|
||||
val fiber = createFiber(flow, FlowInitiator.Peer(sender))
|
||||
val session = FlowSession(flow, random63BitValue(), sender, FlowSessionState.Initiated(sender, otherPartySessionId))
|
||||
if (sessionInit.firstPayload != null) {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.node.messaging
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.contracts.CommercialPaper
|
||||
import net.corda.contracts.asset.*
|
||||
import net.corda.contracts.testing.fillWithSomeTestCash
|
||||
@ -8,11 +9,14 @@ import net.corda.core.crypto.AnonymousParty
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.days
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowStateMachine
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.map
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.rootCause
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
@ -266,7 +270,10 @@ class TwoPartyTradeFlowTests {
|
||||
|
||||
// Creates a mock node with an overridden storage service that uses a RecordingMap, that lets us test the order
|
||||
// of gets and puts.
|
||||
private fun makeNodeWithTracking(networkMapAddr: SingleMessageRecipient?, name: X500Name, overrideServices: Map<ServiceInfo, KeyPair>? = null): MockNetwork.MockNode {
|
||||
private fun makeNodeWithTracking(
|
||||
networkMapAddr: SingleMessageRecipient?,
|
||||
name: X500Name,
|
||||
overrideServices: Map<ServiceInfo, KeyPair>? = null): MockNetwork.MockNode {
|
||||
// Create a node in the mock network ...
|
||||
return net.createNode(networkMapAddr, -1, object : MockNetwork.Factory {
|
||||
override fun create(config: NodeConfiguration,
|
||||
@ -391,7 +398,6 @@ class TwoPartyTradeFlowTests {
|
||||
|
||||
@Test
|
||||
fun `track works`() {
|
||||
|
||||
val notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name)
|
||||
val aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name)
|
||||
val bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name)
|
||||
@ -445,13 +451,13 @@ class TwoPartyTradeFlowTests {
|
||||
)
|
||||
aliceTxStream.expectEvents { aliceTxExpectations }
|
||||
val aliceMappingExpectations = sequence(
|
||||
expect { mapping: StateMachineTransactionMapping ->
|
||||
require(mapping.stateMachineRunId == aliceSmId)
|
||||
require(mapping.transactionId == bobsFakeCash[0].id)
|
||||
expect { (stateMachineRunId, transactionId) ->
|
||||
require(stateMachineRunId == aliceSmId)
|
||||
require(transactionId == bobsFakeCash[0].id)
|
||||
},
|
||||
expect { mapping: StateMachineTransactionMapping ->
|
||||
require(mapping.stateMachineRunId == aliceSmId)
|
||||
require(mapping.transactionId == bobsFakeCash[2].id)
|
||||
expect { (stateMachineRunId, transactionId) ->
|
||||
require(stateMachineRunId == aliceSmId)
|
||||
require(transactionId == bobsFakeCash[2].id)
|
||||
},
|
||||
expect { mapping: StateMachineTransactionMapping ->
|
||||
require(mapping.stateMachineRunId == aliceSmId)
|
||||
@ -487,10 +493,21 @@ class TwoPartyTradeFlowTests {
|
||||
sellerNode: MockNetwork.MockNode,
|
||||
buyerNode: MockNetwork.MockNode,
|
||||
assetToSell: StateAndRef<OwnableState>): RunResult {
|
||||
val buyerFuture = buyerNode.initiateSingleShotFlow(Seller::class) { otherParty ->
|
||||
@InitiatingFlow
|
||||
class SellerRunnerFlow(val buyer: Party, val notary: NodeInfo) : FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction = subFlow(Seller(
|
||||
buyer,
|
||||
notary,
|
||||
assetToSell,
|
||||
1000.DOLLARS,
|
||||
serviceHub.legalIdentityKey))
|
||||
}
|
||||
|
||||
val buyerFuture = buyerNode.initiateSingleShotFlow(SellerRunnerFlow::class) { otherParty ->
|
||||
Buyer(otherParty, notaryNode.info.notaryIdentity, 1000.DOLLARS, CommercialPaper.State::class.java)
|
||||
}.map { it.stateMachine }
|
||||
val seller = Seller(buyerNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, sellerNode.services.legalIdentityKey)
|
||||
val seller = SellerRunnerFlow(buyerNode.info.legalIdentity, notaryNode.info)
|
||||
val sellerResultFuture = sellerNode.services.startFlow(seller).resultFuture
|
||||
return RunResult(buyerFuture, sellerResultFuture, seller.stateMachine.id)
|
||||
}
|
||||
|
@ -73,7 +73,7 @@ open class MockServiceHubInternal(
|
||||
return smm.executor.fetchFrom { smm.add(logic, flowInitiator) }
|
||||
}
|
||||
|
||||
override fun registerServiceFlow(clientFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) = Unit
|
||||
override fun registerServiceFlow(initiatingFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) = Unit
|
||||
|
||||
override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ServiceFlowInfo? = null
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
|
||||
override val testReference = this@NodeSchedulerServiceTest
|
||||
}
|
||||
scheduler = NodeSchedulerService(services, factory, schedulerGatedExecutor)
|
||||
scheduler = NodeSchedulerService(services, database, factory, schedulerGatedExecutor)
|
||||
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
|
||||
val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database)
|
||||
mockSMM.changes.subscribe { change ->
|
||||
|
@ -8,6 +8,7 @@ import net.corda.core.contracts.TransactionType
|
||||
import net.corda.core.contracts.USD
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
@ -95,6 +96,7 @@ class DataVendingServiceTests {
|
||||
}
|
||||
|
||||
|
||||
@InitiatingFlow
|
||||
private class NotifyTxFlow(val otherParty: Party, val stx: SignedTransaction) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = send(otherParty, NotifyTxRequest(stx))
|
||||
|
@ -8,10 +8,11 @@ import net.corda.core.*
|
||||
import net.corda.core.contracts.DOLLARS
|
||||
import net.corda.core.contracts.DummyState
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowVersion
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
@ -56,7 +57,7 @@ import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class StateMachineManagerTests {
|
||||
class FlowFrameworkTests {
|
||||
companion object {
|
||||
init {
|
||||
LogHelper.setLevel("+net.corda.flow")
|
||||
@ -468,13 +469,6 @@ class StateMachineManagerTests {
|
||||
.withMessage("Chain")
|
||||
}
|
||||
|
||||
private class SendAndReceiveFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
sendAndReceive<Any>(otherParty, payload)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `FlowException thrown and there is a 3rd unrelated party flow`() {
|
||||
val node3 = net.createNode(node1.info.address)
|
||||
@ -521,6 +515,7 @@ class StateMachineManagerTests {
|
||||
|
||||
@Test
|
||||
fun `retry subFlow due to receiving FlowException`() {
|
||||
@InitiatingFlow
|
||||
class AskForExceptionFlow(val otherParty: Party, val throwException: Boolean) : FlowLogic<String>() {
|
||||
@Suspendable
|
||||
override fun call(): String = sendAndReceive<String>(otherParty, throwException).unwrap { it }
|
||||
@ -577,7 +572,7 @@ class StateMachineManagerTests {
|
||||
|
||||
@Test
|
||||
fun `lazy db iterator left on stack during checkpointing`() {
|
||||
val result = node2.services.startFlow(VaultAccessFlow(node1.info.legalIdentity)).resultFuture
|
||||
val result = node2.services.startFlow(VaultAccessFlow()).resultFuture
|
||||
net.runNetwork()
|
||||
assertThatThrownBy { result.getOrThrow() }.hasMessageContaining("Vault").hasMessageContaining("private method")
|
||||
}
|
||||
@ -609,10 +604,20 @@ class StateMachineManagerTests {
|
||||
}.withMessageContaining("Version")
|
||||
}
|
||||
|
||||
@FlowVersion(2)
|
||||
private class UpgradedFlow(val otherParty: Party) : FlowLogic<Any>() {
|
||||
@Suspendable
|
||||
override fun call(): Any = receive<Any>(otherParty).unwrap { it }
|
||||
@Test
|
||||
fun `single inlined sub-flow`() {
|
||||
node2.registerServiceFlow(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
|
||||
val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.legalIdentity, "Hello")).resultFuture
|
||||
net.runNetwork()
|
||||
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `double inlined sub-flow`() {
|
||||
node2.registerServiceFlow(SendAndReceiveFlow::class) { DoubleInlinedSubFlow(it) }
|
||||
val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.legalIdentity, "Hello")).resultFuture
|
||||
net.runNetwork()
|
||||
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
|
||||
}
|
||||
|
||||
|
||||
@ -664,15 +669,13 @@ class StateMachineManagerTests {
|
||||
}
|
||||
}
|
||||
|
||||
private fun sanitise(message: SessionMessage): SessionMessage {
|
||||
return when (message) {
|
||||
is SessionData -> message.copy(recipientSessionId = 0)
|
||||
is SessionInit -> message.copy(initiatorSessionId = 0)
|
||||
is SessionConfirm -> message.copy(initiatorSessionId = 0, initiatedSessionId = 0)
|
||||
is NormalSessionEnd -> message.copy(recipientSessionId = 0)
|
||||
is ErrorSessionEnd -> message.copy(recipientSessionId = 0)
|
||||
else -> message
|
||||
}
|
||||
private fun sanitise(message: SessionMessage) = when (message) {
|
||||
is SessionData -> message.copy(recipientSessionId = 0)
|
||||
is SessionInit -> message.copy(initiatorSessionId = 0)
|
||||
is SessionConfirm -> message.copy(initiatorSessionId = 0, initiatedSessionId = 0)
|
||||
is NormalSessionEnd -> message.copy(recipientSessionId = 0)
|
||||
is ErrorSessionEnd -> message.copy(recipientSessionId = 0)
|
||||
else -> message
|
||||
}
|
||||
|
||||
private infix fun MockNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(id, message)
|
||||
@ -700,6 +703,7 @@ class StateMachineManagerTests {
|
||||
}
|
||||
|
||||
|
||||
@InitiatingFlow
|
||||
private open class SendFlow(val payload: String, vararg val otherParties: Party) : FlowLogic<Unit>() {
|
||||
init {
|
||||
require(otherParties.isNotEmpty())
|
||||
@ -712,6 +716,7 @@ class StateMachineManagerTests {
|
||||
private interface CustomInterface
|
||||
private class CustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty)
|
||||
|
||||
@InitiatingFlow
|
||||
private class ReceiveFlow(vararg val otherParties: Party) : FlowLogic<Unit>() {
|
||||
object START_STEP : ProgressTracker.Step("Starting")
|
||||
object RECEIVED_STEP : ProgressTracker.Step("Received")
|
||||
@ -740,6 +745,18 @@ class StateMachineManagerTests {
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class SendAndReceiveFlow(val otherParty: Party, val payload: Any) : FlowLogic<Any>() {
|
||||
@Suspendable
|
||||
override fun call(): Any = sendAndReceive<Any>(otherParty, payload).unwrap { it }
|
||||
}
|
||||
|
||||
private class InlinedSendFlow(val payload: String, val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = send(otherParty, payload)
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class PingPongFlow(val otherParty: Party, val payload: Long) : FlowLogic<Unit>() {
|
||||
@Transient var receivedPayload: Long? = null
|
||||
@Transient var receivedPayload2: Long? = null
|
||||
@ -770,6 +787,7 @@ class StateMachineManagerTests {
|
||||
}
|
||||
|
||||
private object WaitingFlows {
|
||||
@InitiatingFlow
|
||||
class Waiter(val stx: SignedTransaction, val otherParty: Party) : FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
@ -788,11 +806,32 @@ class StateMachineManagerTests {
|
||||
}
|
||||
}
|
||||
|
||||
private class VaultAccessFlow(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
private class VaultAccessFlow : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.vaultService.unconsumedStates<Cash.State>().filter { true }
|
||||
send(otherParty, "Hello")
|
||||
waitForLedgerCommit(SecureHash.zeroHash)
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow(version = 2)
|
||||
private class UpgradedFlow(val otherParty: Party) : FlowLogic<Any>() {
|
||||
@Suspendable
|
||||
override fun call(): Any = receive<Any>(otherParty).unwrap { it }
|
||||
}
|
||||
|
||||
private class SingleInlinedSubFlow(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val payload = receive<String>(otherParty).unwrap { it }
|
||||
subFlow(InlinedSendFlow(payload + payload, otherParty))
|
||||
}
|
||||
}
|
||||
|
||||
private class DoubleInlinedSubFlow(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
subFlow(SingleInlinedSubFlow(otherParty))
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user