mirror of
https://github.com/corda/corda.git
synced 2025-06-19 15:43:52 +00:00
Merge pull request #639 from corda/shams-initiating-flow-annotation
Introducing InitiatingFlow annotation which has to be annotated by in…
This commit is contained in:
@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.utilities.ALICE
|
||||
import net.corda.core.utilities.BOB
|
||||
@ -23,14 +24,15 @@ class FlowVersioningTest : NodeBasedTest() {
|
||||
assertThat(resultFuture.getOrThrow()).isEqualTo(2)
|
||||
}
|
||||
|
||||
private open class ClientFlow(val otherParty: Party) : FlowLogic<Any>() {
|
||||
@InitiatingFlow
|
||||
private class ClientFlow(val otherParty: Party) : FlowLogic<Any>() {
|
||||
@Suspendable
|
||||
override fun call(): Any {
|
||||
return sendAndReceive<Any>(otherParty, "This is ignored. We only send to kick off the flow on the other side").unwrap { it }
|
||||
}
|
||||
}
|
||||
|
||||
private open class SendBackPlatformVersionFlow(val otherParty: Party, val otherPartysPlatformVersion: Any) : FlowLogic<Unit>() {
|
||||
private class SendBackPlatformVersionFlow(val otherParty: Party, val otherPartysPlatformVersion: Int) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = send(otherParty, otherPartysPlatformVersion)
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.crypto.toBase58String
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.random63BitValue
|
||||
@ -222,6 +223,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
return bobParty
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class SendFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = send(otherParty, payload)
|
||||
|
@ -14,7 +14,7 @@ import net.corda.core.crypto.X509Utilities
|
||||
import net.corda.core.crypto.replaceCommonName
|
||||
import net.corda.core.flows.FlowInitiator
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowVersion
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
@ -141,12 +141,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
return serverThread.fetchFrom { smm.add(logic, flowInitiator) }
|
||||
}
|
||||
|
||||
override fun registerServiceFlow(clientFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) {
|
||||
require(clientFlowClass !in serviceFlowFactories) { "${clientFlowClass.name} has already been used to register a service flow" }
|
||||
val version = clientFlowClass.flowVersion
|
||||
val info = ServiceFlowInfo.CorDapp(version, serviceFlowFactory)
|
||||
log.info("Registering service flow for ${clientFlowClass.name}: $info")
|
||||
serviceFlowFactories[clientFlowClass] = info
|
||||
override fun registerServiceFlow(initiatingFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) {
|
||||
require(initiatingFlowClass !in serviceFlowFactories) {
|
||||
"${initiatingFlowClass.name} has already been used to register a service flow"
|
||||
}
|
||||
val info = ServiceFlowInfo.CorDapp(initiatingFlowClass.flowVersion, serviceFlowFactory)
|
||||
log.info("Registering service flow for ${initiatingFlowClass.name}: $info")
|
||||
serviceFlowFactories[initiatingFlowClass] = info
|
||||
}
|
||||
|
||||
override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ServiceFlowInfo? {
|
||||
@ -259,15 +260,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
/**
|
||||
* @suppress
|
||||
* Installs a flow that's core to the Corda platform. Unlike CorDapp flows which are versioned individually using
|
||||
* [FlowVersion], core flows have the same version as the node's platform version. To cater for backwards compatibility
|
||||
* [serviceFlowFactory] provides a second parameter which is the platform version of the initiating party.
|
||||
* [InitiatingFlow.version], core flows have the same version as the node's platform version. To cater for backwards
|
||||
* compatibility [serviceFlowFactory] provides a second parameter which is the platform version of the initiating party.
|
||||
* @suppress
|
||||
*/
|
||||
@VisibleForTesting
|
||||
fun installCoreFlow(clientFlowClass: KClass<out FlowLogic<*>>, serviceFlowFactory: (Party, Int) -> FlowLogic<*>) {
|
||||
require(!clientFlowClass.java.isAnnotationPresent(FlowVersion::class.java)) {
|
||||
"${FlowVersion::class.java.name} not applicable for core flows; their version is the node's platform version"
|
||||
require(clientFlowClass.java.flowVersion == 1) {
|
||||
"${InitiatingFlow::class.java.name}.version not applicable for core flows; their version is the node's platform version"
|
||||
}
|
||||
serviceFlowFactories[clientFlowClass.java] = ServiceFlowInfo.Core(serviceFlowFactory)
|
||||
log.debug { "Installed core flow ${clientFlowClass.java.name}" }
|
||||
@ -302,7 +303,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
// the identity key. But the infrastructure to make that easy isn't here yet.
|
||||
keyManagement = makeKeyManagementService()
|
||||
flowLogicFactory = initialiseFlowLogicFactory()
|
||||
scheduler = NodeSchedulerService(services, flowLogicFactory, unfinishedSchedules = busyNodeLatch)
|
||||
scheduler = NodeSchedulerService(services, database, flowLogicFactory, unfinishedSchedules = busyNodeLatch)
|
||||
|
||||
val tokenizableServices = mutableListOf(storage, net, vault, keyManagement, identity, platformClock, scheduler)
|
||||
makeAdvertisedServices(tokenizableServices)
|
||||
|
@ -63,7 +63,7 @@ class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val request = receive<BroadcastTransactionFlow.NotifyTxRequest>(otherParty).unwrap { it }
|
||||
subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true)
|
||||
subFlow(ResolveTransactionsFlow(request.tx, otherParty))
|
||||
serviceHub.recordTransactions(request.tx)
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,5 @@
|
||||
package net.corda.node.services.events
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.contracts.SchedulableState
|
||||
@ -10,7 +9,7 @@ import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.flows.FlowInitiator
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.then
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.api.FlowLogicRefFactoryInternal
|
||||
@ -18,6 +17,7 @@ import net.corda.node.services.api.SchedulerService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.utilities.*
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import java.time.Instant
|
||||
@ -44,12 +44,15 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeSchedulerService(private val services: ServiceHubInternal,
|
||||
private val database: Database,
|
||||
private val flowLogicRefFactory: FlowLogicRefFactoryInternal,
|
||||
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(),
|
||||
private val unfinishedSchedules: ReusableLatch = ReusableLatch())
|
||||
: SchedulerService, SingletonSerializeAsToken() {
|
||||
|
||||
private val log = loggerFor<NodeSchedulerService>()
|
||||
companion object {
|
||||
private val log = loggerFor<NodeSchedulerService>()
|
||||
}
|
||||
|
||||
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}scheduled_states") {
|
||||
val output = stateRef("transaction_id", "output_index")
|
||||
@ -158,71 +161,62 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
|
||||
}
|
||||
|
||||
private fun onTimeReached(scheduledState: ScheduledStateRef) {
|
||||
services.startFlow(RunScheduled(scheduledState, this@NodeSchedulerService), FlowInitiator.Scheduled(scheduledState))
|
||||
database.transaction {
|
||||
val scheduledFlow = getScheduledFlow(scheduledState)
|
||||
if (scheduledFlow != null) {
|
||||
// TODO Because the flow is executed asynchronously, there is a small window between this tx we're in
|
||||
// committing and the flow's first checkpoint when it starts in which we can lose the flow if the node
|
||||
// goes down.
|
||||
// See discussion in https://github.com/corda/corda/pull/639#discussion_r115257437
|
||||
val future = services.startFlow(scheduledFlow, FlowInitiator.Scheduled(scheduledState)).resultFuture
|
||||
future.then {
|
||||
unfinishedSchedules.countDown()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RunScheduled(val scheduledState: ScheduledStateRef, val scheduler: NodeSchedulerService) : FlowLogic<Unit>() {
|
||||
companion object {
|
||||
object RUNNING : ProgressTracker.Step("Running scheduled...")
|
||||
|
||||
fun tracker() = ProgressTracker(RUNNING)
|
||||
}
|
||||
override val progressTracker = tracker()
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Unit {
|
||||
progressTracker.currentStep = RUNNING
|
||||
|
||||
// Ensure we are still scheduled.
|
||||
val scheduledLogic: FlowLogic<*>? = getScheduledLogic()
|
||||
if (scheduledLogic != null) {
|
||||
subFlow(scheduledLogic)
|
||||
scheduler.unfinishedSchedules.countDown()
|
||||
}
|
||||
}
|
||||
|
||||
private fun getScheduledaActivity(): ScheduledActivity? {
|
||||
val txState = serviceHub.loadState(scheduledState.ref)
|
||||
val state = txState.data as SchedulableState
|
||||
return try {
|
||||
// This can throw as running contract code.
|
||||
state.nextScheduledActivity(scheduledState.ref, scheduler.flowLogicRefFactory)
|
||||
} catch(e: Exception) {
|
||||
logger.error("Attempt to run scheduled state $scheduledState resulted in error.", e)
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
private fun getScheduledLogic(): FlowLogic<*>? {
|
||||
val scheduledActivity = getScheduledaActivity()
|
||||
var scheduledLogic: FlowLogic<*>? = null
|
||||
scheduler.mutex.locked {
|
||||
// need to remove us from those scheduled, but only if we are still next
|
||||
scheduledStates.compute(scheduledState.ref) { _, value ->
|
||||
if (value === scheduledState) {
|
||||
if (scheduledActivity == null) {
|
||||
logger.info("Scheduled state $scheduledState has rescheduled to never.")
|
||||
scheduler.unfinishedSchedules.countDown()
|
||||
null
|
||||
} else if (scheduledActivity.scheduledAt.isAfter(serviceHub.clock.instant())) {
|
||||
logger.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.")
|
||||
ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt)
|
||||
} else {
|
||||
// TODO: FlowLogicRefFactory needs to sort out the class loader etc
|
||||
val logic = scheduler.flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
|
||||
logger.trace { "Scheduler starting FlowLogic $logic" }
|
||||
scheduledLogic = logic
|
||||
null
|
||||
}
|
||||
private fun getScheduledFlow(scheduledState: ScheduledStateRef): FlowLogic<*>? {
|
||||
val scheduledActivity = getScheduledActivity(scheduledState)
|
||||
var scheduledFlow: FlowLogic<*>? = null
|
||||
mutex.locked {
|
||||
// need to remove us from those scheduled, but only if we are still next
|
||||
scheduledStates.compute(scheduledState.ref) { _, value ->
|
||||
if (value === scheduledState) {
|
||||
if (scheduledActivity == null) {
|
||||
log.info("Scheduled state $scheduledState has rescheduled to never.")
|
||||
unfinishedSchedules.countDown()
|
||||
null
|
||||
} else if (scheduledActivity.scheduledAt.isAfter(services.clock.instant())) {
|
||||
log.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.")
|
||||
ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt)
|
||||
} else {
|
||||
value
|
||||
// TODO: FlowLogicRefFactory needs to sort out the class loader etc
|
||||
val flowLogic = flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
|
||||
log.trace { "Scheduler starting FlowLogic $flowLogic" }
|
||||
scheduledFlow = flowLogic
|
||||
null
|
||||
}
|
||||
} else {
|
||||
value
|
||||
}
|
||||
// and schedule the next one
|
||||
recomputeEarliest()
|
||||
scheduler.rescheduleWakeUp()
|
||||
}
|
||||
return scheduledLogic
|
||||
// and schedule the next one
|
||||
recomputeEarliest()
|
||||
rescheduleWakeUp()
|
||||
}
|
||||
return scheduledFlow
|
||||
}
|
||||
|
||||
private fun getScheduledActivity(scheduledState: ScheduledStateRef): ScheduledActivity? {
|
||||
val txState = services.loadState(scheduledState.ref)
|
||||
val state = txState.data as SchedulableState
|
||||
return try {
|
||||
// This can throw as running contract code.
|
||||
state.nextScheduledActivity(scheduledState.ref, flowLogicRefFactory)
|
||||
} catch (e: Exception) {
|
||||
log.error("Attempt to run scheduled state $scheduledState resulted in error.", e)
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -426,7 +426,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
}
|
||||
|
||||
val Class<out FlowLogic<*>>.flowVersion: Int get() {
|
||||
val flowVersion = getDeclaredAnnotation(FlowVersion::class.java) ?: return 1
|
||||
require(flowVersion.value > 0) { "Flow versions have to be greater or equal to 1" }
|
||||
return flowVersion.value
|
||||
val annotation = requireNotNull(getAnnotation(InitiatingFlow::class.java)) {
|
||||
"$name as the initiating flow must be annotated with ${InitiatingFlow::class.java.name}"
|
||||
}
|
||||
require(annotation.version > 0) { "Flow versions have to be greater or equal to 1" }
|
||||
return annotation.version
|
||||
}
|
||||
|
@ -368,10 +368,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
is ServiceFlowInfo.Core -> serviceFlowInfo.factory(sender, receivedMessage.platformVersion)
|
||||
}
|
||||
|
||||
if (flow.javaClass.isAnnotationPresent(FlowVersion::class.java)) {
|
||||
logger.warn("${FlowVersion::class.java.name} is not applicable for service flows: ${flow.javaClass.name}")
|
||||
}
|
||||
|
||||
val fiber = createFiber(flow, FlowInitiator.Peer(sender))
|
||||
val session = FlowSession(flow, random63BitValue(), sender, FlowSessionState.Initiated(sender, otherPartySessionId))
|
||||
if (sessionInit.firstPayload != null) {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.node.messaging
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.contracts.CommercialPaper
|
||||
import net.corda.contracts.asset.*
|
||||
import net.corda.contracts.testing.fillWithSomeTestCash
|
||||
@ -8,11 +9,14 @@ import net.corda.core.crypto.AnonymousParty
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.days
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowStateMachine
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.map
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.rootCause
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
@ -266,7 +270,10 @@ class TwoPartyTradeFlowTests {
|
||||
|
||||
// Creates a mock node with an overridden storage service that uses a RecordingMap, that lets us test the order
|
||||
// of gets and puts.
|
||||
private fun makeNodeWithTracking(networkMapAddr: SingleMessageRecipient?, name: X500Name, overrideServices: Map<ServiceInfo, KeyPair>? = null): MockNetwork.MockNode {
|
||||
private fun makeNodeWithTracking(
|
||||
networkMapAddr: SingleMessageRecipient?,
|
||||
name: X500Name,
|
||||
overrideServices: Map<ServiceInfo, KeyPair>? = null): MockNetwork.MockNode {
|
||||
// Create a node in the mock network ...
|
||||
return net.createNode(networkMapAddr, -1, object : MockNetwork.Factory {
|
||||
override fun create(config: NodeConfiguration,
|
||||
@ -391,7 +398,6 @@ class TwoPartyTradeFlowTests {
|
||||
|
||||
@Test
|
||||
fun `track works`() {
|
||||
|
||||
val notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name)
|
||||
val aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name)
|
||||
val bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name)
|
||||
@ -445,13 +451,13 @@ class TwoPartyTradeFlowTests {
|
||||
)
|
||||
aliceTxStream.expectEvents { aliceTxExpectations }
|
||||
val aliceMappingExpectations = sequence(
|
||||
expect { mapping: StateMachineTransactionMapping ->
|
||||
require(mapping.stateMachineRunId == aliceSmId)
|
||||
require(mapping.transactionId == bobsFakeCash[0].id)
|
||||
expect { (stateMachineRunId, transactionId) ->
|
||||
require(stateMachineRunId == aliceSmId)
|
||||
require(transactionId == bobsFakeCash[0].id)
|
||||
},
|
||||
expect { mapping: StateMachineTransactionMapping ->
|
||||
require(mapping.stateMachineRunId == aliceSmId)
|
||||
require(mapping.transactionId == bobsFakeCash[2].id)
|
||||
expect { (stateMachineRunId, transactionId) ->
|
||||
require(stateMachineRunId == aliceSmId)
|
||||
require(transactionId == bobsFakeCash[2].id)
|
||||
},
|
||||
expect { mapping: StateMachineTransactionMapping ->
|
||||
require(mapping.stateMachineRunId == aliceSmId)
|
||||
@ -487,10 +493,21 @@ class TwoPartyTradeFlowTests {
|
||||
sellerNode: MockNetwork.MockNode,
|
||||
buyerNode: MockNetwork.MockNode,
|
||||
assetToSell: StateAndRef<OwnableState>): RunResult {
|
||||
val buyerFuture = buyerNode.initiateSingleShotFlow(Seller::class) { otherParty ->
|
||||
@InitiatingFlow
|
||||
class SellerRunnerFlow(val buyer: Party, val notary: NodeInfo) : FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction = subFlow(Seller(
|
||||
buyer,
|
||||
notary,
|
||||
assetToSell,
|
||||
1000.DOLLARS,
|
||||
serviceHub.legalIdentityKey))
|
||||
}
|
||||
|
||||
val buyerFuture = buyerNode.initiateSingleShotFlow(SellerRunnerFlow::class) { otherParty ->
|
||||
Buyer(otherParty, notaryNode.info.notaryIdentity, 1000.DOLLARS, CommercialPaper.State::class.java)
|
||||
}.map { it.stateMachine }
|
||||
val seller = Seller(buyerNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, sellerNode.services.legalIdentityKey)
|
||||
val seller = SellerRunnerFlow(buyerNode.info.legalIdentity, notaryNode.info)
|
||||
val sellerResultFuture = sellerNode.services.startFlow(seller).resultFuture
|
||||
return RunResult(buyerFuture, sellerResultFuture, seller.stateMachine.id)
|
||||
}
|
||||
|
@ -73,7 +73,7 @@ open class MockServiceHubInternal(
|
||||
return smm.executor.fetchFrom { smm.add(logic, flowInitiator) }
|
||||
}
|
||||
|
||||
override fun registerServiceFlow(clientFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) = Unit
|
||||
override fun registerServiceFlow(initiatingFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) = Unit
|
||||
|
||||
override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ServiceFlowInfo? = null
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
|
||||
override val testReference = this@NodeSchedulerServiceTest
|
||||
}
|
||||
scheduler = NodeSchedulerService(services, factory, schedulerGatedExecutor)
|
||||
scheduler = NodeSchedulerService(services, database, factory, schedulerGatedExecutor)
|
||||
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
|
||||
val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database)
|
||||
mockSMM.changes.subscribe { change ->
|
||||
|
@ -8,6 +8,7 @@ import net.corda.core.contracts.TransactionType
|
||||
import net.corda.core.contracts.USD
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.DUMMY_NOTARY
|
||||
@ -95,6 +96,7 @@ class DataVendingServiceTests {
|
||||
}
|
||||
|
||||
|
||||
@InitiatingFlow
|
||||
private class NotifyTxFlow(val otherParty: Party, val stx: SignedTransaction) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = send(otherParty, NotifyTxRequest(stx))
|
||||
|
@ -8,10 +8,11 @@ import net.corda.core.*
|
||||
import net.corda.core.contracts.DOLLARS
|
||||
import net.corda.core.contracts.DummyState
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowVersion
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
@ -56,7 +57,7 @@ import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class StateMachineManagerTests {
|
||||
class FlowFrameworkTests {
|
||||
companion object {
|
||||
init {
|
||||
LogHelper.setLevel("+net.corda.flow")
|
||||
@ -468,13 +469,6 @@ class StateMachineManagerTests {
|
||||
.withMessage("Chain")
|
||||
}
|
||||
|
||||
private class SendAndReceiveFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
sendAndReceive<Any>(otherParty, payload)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `FlowException thrown and there is a 3rd unrelated party flow`() {
|
||||
val node3 = net.createNode(node1.info.address)
|
||||
@ -521,6 +515,7 @@ class StateMachineManagerTests {
|
||||
|
||||
@Test
|
||||
fun `retry subFlow due to receiving FlowException`() {
|
||||
@InitiatingFlow
|
||||
class AskForExceptionFlow(val otherParty: Party, val throwException: Boolean) : FlowLogic<String>() {
|
||||
@Suspendable
|
||||
override fun call(): String = sendAndReceive<String>(otherParty, throwException).unwrap { it }
|
||||
@ -577,7 +572,7 @@ class StateMachineManagerTests {
|
||||
|
||||
@Test
|
||||
fun `lazy db iterator left on stack during checkpointing`() {
|
||||
val result = node2.services.startFlow(VaultAccessFlow(node1.info.legalIdentity)).resultFuture
|
||||
val result = node2.services.startFlow(VaultAccessFlow()).resultFuture
|
||||
net.runNetwork()
|
||||
assertThatThrownBy { result.getOrThrow() }.hasMessageContaining("Vault").hasMessageContaining("private method")
|
||||
}
|
||||
@ -609,10 +604,20 @@ class StateMachineManagerTests {
|
||||
}.withMessageContaining("Version")
|
||||
}
|
||||
|
||||
@FlowVersion(2)
|
||||
private class UpgradedFlow(val otherParty: Party) : FlowLogic<Any>() {
|
||||
@Suspendable
|
||||
override fun call(): Any = receive<Any>(otherParty).unwrap { it }
|
||||
@Test
|
||||
fun `single inlined sub-flow`() {
|
||||
node2.registerServiceFlow(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
|
||||
val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.legalIdentity, "Hello")).resultFuture
|
||||
net.runNetwork()
|
||||
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `double inlined sub-flow`() {
|
||||
node2.registerServiceFlow(SendAndReceiveFlow::class) { DoubleInlinedSubFlow(it) }
|
||||
val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.legalIdentity, "Hello")).resultFuture
|
||||
net.runNetwork()
|
||||
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
|
||||
}
|
||||
|
||||
|
||||
@ -664,15 +669,13 @@ class StateMachineManagerTests {
|
||||
}
|
||||
}
|
||||
|
||||
private fun sanitise(message: SessionMessage): SessionMessage {
|
||||
return when (message) {
|
||||
is SessionData -> message.copy(recipientSessionId = 0)
|
||||
is SessionInit -> message.copy(initiatorSessionId = 0)
|
||||
is SessionConfirm -> message.copy(initiatorSessionId = 0, initiatedSessionId = 0)
|
||||
is NormalSessionEnd -> message.copy(recipientSessionId = 0)
|
||||
is ErrorSessionEnd -> message.copy(recipientSessionId = 0)
|
||||
else -> message
|
||||
}
|
||||
private fun sanitise(message: SessionMessage) = when (message) {
|
||||
is SessionData -> message.copy(recipientSessionId = 0)
|
||||
is SessionInit -> message.copy(initiatorSessionId = 0)
|
||||
is SessionConfirm -> message.copy(initiatorSessionId = 0, initiatedSessionId = 0)
|
||||
is NormalSessionEnd -> message.copy(recipientSessionId = 0)
|
||||
is ErrorSessionEnd -> message.copy(recipientSessionId = 0)
|
||||
else -> message
|
||||
}
|
||||
|
||||
private infix fun MockNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(id, message)
|
||||
@ -700,6 +703,7 @@ class StateMachineManagerTests {
|
||||
}
|
||||
|
||||
|
||||
@InitiatingFlow
|
||||
private open class SendFlow(val payload: String, vararg val otherParties: Party) : FlowLogic<Unit>() {
|
||||
init {
|
||||
require(otherParties.isNotEmpty())
|
||||
@ -712,6 +716,7 @@ class StateMachineManagerTests {
|
||||
private interface CustomInterface
|
||||
private class CustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty)
|
||||
|
||||
@InitiatingFlow
|
||||
private class ReceiveFlow(vararg val otherParties: Party) : FlowLogic<Unit>() {
|
||||
object START_STEP : ProgressTracker.Step("Starting")
|
||||
object RECEIVED_STEP : ProgressTracker.Step("Received")
|
||||
@ -740,6 +745,18 @@ class StateMachineManagerTests {
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class SendAndReceiveFlow(val otherParty: Party, val payload: Any) : FlowLogic<Any>() {
|
||||
@Suspendable
|
||||
override fun call(): Any = sendAndReceive<Any>(otherParty, payload).unwrap { it }
|
||||
}
|
||||
|
||||
private class InlinedSendFlow(val payload: String, val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = send(otherParty, payload)
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class PingPongFlow(val otherParty: Party, val payload: Long) : FlowLogic<Unit>() {
|
||||
@Transient var receivedPayload: Long? = null
|
||||
@Transient var receivedPayload2: Long? = null
|
||||
@ -770,6 +787,7 @@ class StateMachineManagerTests {
|
||||
}
|
||||
|
||||
private object WaitingFlows {
|
||||
@InitiatingFlow
|
||||
class Waiter(val stx: SignedTransaction, val otherParty: Party) : FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
@ -788,11 +806,32 @@ class StateMachineManagerTests {
|
||||
}
|
||||
}
|
||||
|
||||
private class VaultAccessFlow(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
private class VaultAccessFlow : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
serviceHub.vaultService.unconsumedStates<Cash.State>().filter { true }
|
||||
send(otherParty, "Hello")
|
||||
waitForLedgerCommit(SecureHash.zeroHash)
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow(version = 2)
|
||||
private class UpgradedFlow(val otherParty: Party) : FlowLogic<Any>() {
|
||||
@Suspendable
|
||||
override fun call(): Any = receive<Any>(otherParty).unwrap { it }
|
||||
}
|
||||
|
||||
private class SingleInlinedSubFlow(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val payload = receive<String>(otherParty).unwrap { it }
|
||||
subFlow(InlinedSendFlow(payload + payload, otherParty))
|
||||
}
|
||||
}
|
||||
|
||||
private class DoubleInlinedSubFlow(val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
subFlow(SingleInlinedSubFlow(otherParty))
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user