From 221bb81f8401a5cf144867a4e19892f33d73e2ae Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Mon, 8 May 2017 10:38:18 +0100 Subject: [PATCH] Introducing InitiatingFlow annotation which has to be annotated by initiating flows. This removes the need for the shareParentSessions parameter of FlowLogic.subFlow. It also has the flow's version number so FlowVersion is now no longer needed. --- .../kotlin/net/corda/core/flows/FlowLogic.kt | 59 +++++---- .../net/corda/core/flows/FlowVersion.kt | 18 --- .../net/corda/core/flows/InitiatingFlow.kt | 27 ++++ .../net/corda/core/node/PluginServiceHub.kt | 7 +- .../corda/flows/BroadcastTransactionFlow.kt | 2 + .../net/corda/flows/ContractUpgradeFlow.kt | 2 + .../net/corda/flows/FetchAttachmentsFlow.kt | 2 + .../net/corda/flows/FetchTransactionsFlow.kt | 6 +- .../net/corda/flows/NotaryChangeFlow.kt | 2 + .../main/kotlin/net/corda/flows/NotaryFlow.kt | 2 + .../net/corda/core/flows/FlowsInJavaTest.java | 1 + .../kotlin/net/corda/core/flows/TxKeyFlow.kt | 1 + .../AttachmentSerializationTest.kt | 4 +- docs/source/changelog.rst | 54 +++++--- .../corda/docs/FxTransactionBuildTutorial.kt | 2 + .../docs/WorkflowTransactionBuildTutorial.kt | 2 + docs/source/flow-state-machines.rst | 61 ++++++--- docs/source/release-notes.rst | 15 +-- docs/source/versioning.rst | 8 +- .../main/kotlin/net/corda/flows/IssuerFlow.kt | 2 + .../statemachine/FlowVersioningTest.kt | 6 +- .../services/messaging/MQSecurityTest.kt | 2 + .../net/corda/node/internal/AbstractNode.kt | 27 ++-- .../corda/node/services/CoreFlowHandlers.kt | 2 +- .../services/events/NodeSchedulerService.kt | 118 +++++++++--------- .../statemachine/FlowStateMachineImpl.kt | 8 +- .../statemachine/StateMachineManager.kt | 4 - .../node/messaging/TwoPartyTradeFlowTests.kt | 37 ++++-- .../node/services/MockServiceHubInternal.kt | 2 +- .../events/NodeSchedulerServiceTest.kt | 2 +- .../persistence/DataVendingServiceTests.kt | 2 + ...eManagerTests.kt => FlowFrameworkTests.kt} | 89 +++++++++---- .../net/corda/irs/flows/AutoOfferFlow.kt | 13 +- .../kotlin/net/corda/irs/flows/FixingFlow.kt | 16 +-- .../net/corda/irs/flows/RatesFixFlow.kt | 3 + .../corda/irs/flows/UpdateBusinessDayFlow.kt | 2 + .../net/corda/simulation/IRSSimulation.kt | 22 +++- .../net/corda/simulation/TradeSimulation.kt | 7 +- .../net/corda/vega/flows/IRSTradeFlow.kt | 19 +-- .../kotlin/net/corda/vega/flows/SimmFlow.kt | 28 ++--- .../net/corda/traderdemo/flow/BuyerFlow.kt | 2 +- .../net/corda/traderdemo/flow/SellerFlow.kt | 5 +- 42 files changed, 429 insertions(+), 264 deletions(-) delete mode 100644 core/src/main/kotlin/net/corda/core/flows/FlowVersion.kt create mode 100644 core/src/main/kotlin/net/corda/core/flows/InitiatingFlow.kt rename node/src/test/kotlin/net/corda/node/services/statemachine/{StateMachineManagerTests.kt => FlowFrameworkTests.kt} (93%) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index ae79387908..dec292a99c 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -27,12 +27,20 @@ import rx.Observable * * If you'd like to use another FlowLogic class as a component of your own, construct it on the fly and then pass * it to the [subFlow] method. It will return the result of that flow when it completes. + * + * If your flow (whether it's a top-level flow or a subflow) is supposed to initiate a session with the counterparty + * and request they start their counterpart flow, then make sure it's annotated with [InitiatingFlow]. This annotation + * also has a version property to allow you to version your flow and enables a node to restrict support for the flow to + * that particular version. */ abstract class FlowLogic { /** This is where you should log things to. */ val logger: Logger get() = stateMachine.logger - /** Returns a wrapped [UUID] object that identifies this state machine run (i.e. subflows have the same identifier as their parents). */ + /** + * Returns a wrapped [java.util.UUID] object that identifies this state machine run (i.e. subflows have the same + * identifier as their parents). + */ val runId: StateMachineRunId get() = stateMachine.id /** @@ -60,7 +68,9 @@ abstract class FlowLogic { * * @returns an [UntrustworthyData] wrapper around the received object. */ - inline fun sendAndReceive(otherParty: Party, payload: Any) = sendAndReceive(R::class.java, otherParty, payload) + inline fun sendAndReceive(otherParty: Party, payload: Any): UntrustworthyData { + return sendAndReceive(R::class.java, otherParty, payload) + } /** * Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response @@ -75,11 +85,13 @@ abstract class FlowLogic { */ @Suspendable open fun sendAndReceive(receiveType: Class, otherParty: Party, payload: Any): UntrustworthyData { - return stateMachine.sendAndReceive(receiveType, otherParty, payload, sessionFlow) + return stateMachine.sendAndReceive(receiveType, otherParty, payload, flowUsedForSessions) } /** @see sendAndReceiveWithRetry */ - internal inline fun sendAndReceiveWithRetry(otherParty: Party, payload: Any) = sendAndReceiveWithRetry(R::class.java, otherParty, payload) + internal inline fun sendAndReceiveWithRetry(otherParty: Party, payload: Any): UntrustworthyData { + return sendAndReceiveWithRetry(R::class.java, otherParty, payload) + } /** * Similar to [sendAndReceive] but also instructs the `payload` to be redelivered until the expected message is received. @@ -92,7 +104,7 @@ abstract class FlowLogic { */ @Suspendable internal open fun sendAndReceiveWithRetry(receiveType: Class, otherParty: Party, payload: Any): UntrustworthyData { - return stateMachine.sendAndReceive(receiveType, otherParty, payload, sessionFlow, true) + return stateMachine.sendAndReceive(receiveType, otherParty, payload, flowUsedForSessions, true) } /** @@ -115,7 +127,7 @@ abstract class FlowLogic { */ @Suspendable open fun receive(receiveType: Class, otherParty: Party): UntrustworthyData { - return stateMachine.receive(receiveType, otherParty, sessionFlow) + return stateMachine.receive(receiveType, otherParty, flowUsedForSessions) } /** @@ -126,30 +138,27 @@ abstract class FlowLogic { * network's event horizon time. */ @Suspendable - open fun send(otherParty: Party, payload: Any) = stateMachine.send(otherParty, payload, sessionFlow) + open fun send(otherParty: Party, payload: Any) = stateMachine.send(otherParty, payload, flowUsedForSessions) /** * Invokes the given subflow. This function returns once the subflow completes successfully with the result - * returned by that subflows [call] method. If the subflow has a progress tracker, it is attached to the + * returned by that subflow's [call] method. If the subflow has a progress tracker, it is attached to the * current step in this flow's progress tracker. * - * @param shareParentSessions In certain situations the need arises to use the same sessions the parent flow has - * already established. However this also prevents the subflow from creating new sessions with those parties. - * For this reason the default value is false. + * If the subflow is not an initiating flow (i.e. not annotated with [InitiatingFlow]) then it will continue to use + * the existing sessions this flow has created with its counterparties. This allows for subflows which can act as + * building blocks for other flows, for example removing the boilerplate of common sequences of sends and receives. * * @throws FlowException This is either thrown by [subLogic] itself or propagated from any of the remote - * [FlowLogic]s it communicated with. A subflow retry can be done by catching this exception. + * [FlowLogic]s it communicated with. The subflow can be retried by catching this exception. */ - // TODO Rethink the default value for shareParentSessions - // TODO shareParentSessions is a bit too low-level and perhaps can be expresed in a better way @Suspendable - @JvmOverloads @Throws(FlowException::class) - open fun subFlow(subLogic: FlowLogic, shareParentSessions: Boolean = false): R { + open fun subFlow(subLogic: FlowLogic): R { subLogic.stateMachine = stateMachine maybeWireUpProgressTracking(subLogic) - if (shareParentSessions) { - subLogic.sessionFlow = this + if (!subLogic.javaClass.isAnnotationPresent(InitiatingFlow::class.java)) { + subLogic.flowUsedForSessions = flowUsedForSessions } logger.debug { "Calling subflow: $subLogic" } val result = subLogic.call() @@ -171,8 +180,7 @@ abstract class FlowLogic { open val progressTracker: ProgressTracker? = null /** - * This is where you fill out your business logic. The returned object will usually be ignored, but can be - * helpful if this flow is meant to be used as a subflow. + * This is where you fill out your business logic. */ @Suspendable @Throws(FlowException::class) @@ -187,7 +195,7 @@ abstract class FlowLogic { fun track(): Pair>? { // TODO this is not threadsafe, needs an atomic get-step-and-subscribe return progressTracker?.let { - Pair(it.currentStep.toString(), it.changes.map { it.toString() }) + it.currentStep.toString() to it.changes.map { it.toString() } } } @@ -197,9 +205,7 @@ abstract class FlowLogic { * valid by the local node, but that doesn't imply the vault will consider it relevant. */ @Suspendable - fun waitForLedgerCommit(hash: SecureHash): SignedTransaction { - return stateMachine.waitForLedgerCommit(hash, this) - } + fun waitForLedgerCommit(hash: SecureHash): SignedTransaction = stateMachine.waitForLedgerCommit(hash, this) //////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -216,8 +222,9 @@ abstract class FlowLogic { _stateMachine = value } - // This points to the outermost flow and is changed when a subflow is invoked. - private var sessionFlow: FlowLogic<*> = this + // This is the flow used for managing sessions. It defaults to the current flow but if this is an inlined sub-flow + // then it will point to the flow it's been inlined to. + private var flowUsedForSessions: FlowLogic<*> = this private fun maybeWireUpProgressTracking(subLogic: FlowLogic<*>) { val ours = progressTracker diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowVersion.kt b/core/src/main/kotlin/net/corda/core/flows/FlowVersion.kt deleted file mode 100644 index a953aa01eb..0000000000 --- a/core/src/main/kotlin/net/corda/core/flows/FlowVersion.kt +++ /dev/null @@ -1,18 +0,0 @@ -package net.corda.core.flows - -/** - * Annotation for initiating [FlowLogic]s to specify the version of their flow protocol. The version is a single integer - * [value] which increments by one whenever a release is made where the flow protocol changes in any manner which is - * backwards incompatible. This may be a change in the sequence of sends and receives between the client and service flows, - * or it could be a change in the meaning. The version is used when a flow first initiates communication with a party to - * inform them what version they are using. For this reason the annotation is not applicable for the initiated flow. - * - * This flow version integer is not the same as Corda's platform version, though it follows a similar semantic. - * - * Note: Only one version of the same flow can currently be loaded at the same time. Any session request by a client flow for - * a different version will be rejected. - * - * Defaults to a flow version of 1 if not specified. - */ -// TODO Add support for multiple versions once CorDapps are loaded in separate class loaders -annotation class FlowVersion(val value: Int) diff --git a/core/src/main/kotlin/net/corda/core/flows/InitiatingFlow.kt b/core/src/main/kotlin/net/corda/core/flows/InitiatingFlow.kt new file mode 100644 index 0000000000..75ead4b8e1 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/flows/InitiatingFlow.kt @@ -0,0 +1,27 @@ +package net.corda.core.flows + +import java.lang.annotation.Inherited +import kotlin.annotation.AnnotationTarget.CLASS + +/** + * This annotation is required by any [FlowLogic] which has been designated to initiate communication with a counterparty + * and request they start their side of the flow communication. To ensure that this is correctly applied + * [net.corda.core.node.PluginServiceHub.registerServiceFlow] checks the initiating flow class has this annotation. + * + * There is also an optional [version] property, which defaults to 1, to specify the version of the flow protocol. This + * integer value should be incremented whenever there is a release of this flow which has changes that are not backwards + * compatible with previous releases. This may be a change in the sends and receives that occur, or it could be a change + * in what a send or receive means, etc. + * + * The version is used when a flow first initiates communication with a party to inform them of the version they are using. + * If the other side does not have this flow registered with the same version then the initiation request will be rejected. + * Currently only one version of the same flow can be registered by a node. + * + * The flow version number is similar in concept to Corda's platform version but they are not the same. A flow's version + * number can change independently of the platform version. + */ +// TODO Add support for multiple versions once CorDapps are loaded in separate class loaders +@Target(CLASS) +@Inherited +@MustBeDocumented +annotation class InitiatingFlow(val version: Int = 1) diff --git a/core/src/main/kotlin/net/corda/core/node/PluginServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/PluginServiceHub.kt index f8b6ede9e9..6f40e356fa 100644 --- a/core/src/main/kotlin/net/corda/core/node/PluginServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/PluginServiceHub.kt @@ -10,14 +10,15 @@ interface PluginServiceHub : ServiceHub { /** * Register the service flow factory to use when an initiating party attempts to communicate with us. The registration * is done against the [Class] object of the client flow to the service flow. What this means is if a counterparty - * starts a [FlowLogic] represented by [clientFlowClass] and starts communication with us, we will execute the service + * starts a [FlowLogic] represented by [initiatingFlowClass] and starts communication with us, we will execute the service * flow produced by [serviceFlowFactory]. This service flow has respond correctly to the sends and receives the client * does. - * @param clientFlowClass [Class] of the client flow involved in this client-server communication. + * @param initiatingFlowClass [Class] of the client flow involved in this client-server communication. * @param serviceFlowFactory Lambda which produces a new service flow for each new client flow communication. The * [Party] parameter of the factory is the client's identity. + * @throws IllegalArgumentException If [initiatingFlowClass] is not annotated with [net.corda.core.flows.InitiatingFlow]. */ - fun registerServiceFlow(clientFlowClass: Class>, serviceFlowFactory: (Party) -> FlowLogic<*>) + fun registerServiceFlow(initiatingFlowClass: Class>, serviceFlowFactory: (Party) -> FlowLogic<*>) @Suppress("UNCHECKED_CAST") @Deprecated("This is scheduled to be removed in a future release", ReplaceWith("registerServiceFlow")) diff --git a/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt b/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt index c5b3db232b..1bfd7a2b79 100644 --- a/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt @@ -3,6 +3,7 @@ package net.corda.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction @@ -15,6 +16,7 @@ import net.corda.core.transactions.SignedTransaction * @param participants a list of participants involved in the transaction. * @return a list of participants who were successfully notified of the transaction. */ +@InitiatingFlow class BroadcastTransactionFlow(val notarisedTransaction: SignedTransaction, val participants: Set) : FlowLogic() { @CordaSerializable diff --git a/core/src/main/kotlin/net/corda/flows/ContractUpgradeFlow.kt b/core/src/main/kotlin/net/corda/flows/ContractUpgradeFlow.kt index 8f4b0faa8c..fd73271e45 100644 --- a/core/src/main/kotlin/net/corda/flows/ContractUpgradeFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/ContractUpgradeFlow.kt @@ -1,6 +1,7 @@ package net.corda.flows import net.corda.core.contracts.* +import net.corda.core.flows.InitiatingFlow import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import java.security.PublicKey @@ -13,6 +14,7 @@ import java.security.PublicKey * Finally, the transaction containing all signatures is sent back to each participant so they can record it and * use the new updated state for future transactions. */ +@InitiatingFlow class ContractUpgradeFlow( originalState: StateAndRef, newContractClass: Class> diff --git a/core/src/main/kotlin/net/corda/flows/FetchAttachmentsFlow.kt b/core/src/main/kotlin/net/corda/flows/FetchAttachmentsFlow.kt index d0c394d4ca..096d5bc726 100644 --- a/core/src/main/kotlin/net/corda/flows/FetchAttachmentsFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/FetchAttachmentsFlow.kt @@ -5,6 +5,7 @@ import net.corda.core.contracts.Attachment import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 +import net.corda.core.flows.InitiatingFlow import net.corda.core.serialization.SerializationToken import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializeAsTokenContext @@ -13,6 +14,7 @@ import net.corda.core.serialization.SerializeAsTokenContext * Given a set of hashes either loads from from local storage or requests them from the other peer. Downloaded * attachments are saved to local storage automatically. */ +@InitiatingFlow class FetchAttachmentsFlow(requests: Set, otherSide: Party) : FetchDataFlow(requests, otherSide) { diff --git a/core/src/main/kotlin/net/corda/flows/FetchTransactionsFlow.kt b/core/src/main/kotlin/net/corda/flows/FetchTransactionsFlow.kt index a8a1913ea0..96f04477e3 100644 --- a/core/src/main/kotlin/net/corda/flows/FetchTransactionsFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/FetchTransactionsFlow.kt @@ -2,6 +2,7 @@ package net.corda.flows import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash +import net.corda.core.flows.InitiatingFlow import net.corda.core.transactions.SignedTransaction /** @@ -12,8 +13,11 @@ import net.corda.core.transactions.SignedTransaction * results in a [FetchDataFlow.HashNotFound] exception. Note that returned transactions are not inserted into * the database, because it's up to the caller to actually verify the transactions are valid. */ +@InitiatingFlow class FetchTransactionsFlow(requests: Set, otherSide: Party) : FetchDataFlow(requests, otherSide) { - override fun load(txid: SecureHash): SignedTransaction? = serviceHub.storageService.validatedTransactions.getTransaction(txid) + override fun load(txid: SecureHash): SignedTransaction? { + return serviceHub.storageService.validatedTransactions.getTransaction(txid) + } } diff --git a/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt index 44ac9942c5..ce2985d128 100644 --- a/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt @@ -2,6 +2,7 @@ package net.corda.flows import net.corda.core.contracts.* import net.corda.core.crypto.Party +import net.corda.core.flows.InitiatingFlow import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.ProgressTracker @@ -16,6 +17,7 @@ import java.security.PublicKey * Finally, the transaction containing all signatures is sent back to each participant so they can record it and * use the new updated state for future transactions. */ +@InitiatingFlow class NotaryChangeFlow( originalState: StateAndRef, newNotary: Party, diff --git a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt index 97a393e00e..ca2c4e4024 100644 --- a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt @@ -6,6 +6,7 @@ import net.corda.core.contracts.Timestamp import net.corda.core.crypto.* import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.node.services.TimestampChecker import net.corda.core.node.services.UniquenessException import net.corda.core.node.services.UniquenessProvider @@ -26,6 +27,7 @@ object NotaryFlow { * @throws NotaryException in case the any of the inputs to the transaction have been consumed * by another transaction or the timestamp is invalid. */ + @InitiatingFlow open class Client(private val stx: SignedTransaction, override val progressTracker: ProgressTracker) : FlowLogic>() { constructor(stx: SignedTransaction) : this(stx, Client.tracker()) diff --git a/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java b/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java index 197c3bb79d..0bb4c9d770 100644 --- a/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java +++ b/core/src/test/java/net/corda/core/flows/FlowsInJavaTest.java @@ -37,6 +37,7 @@ public class FlowsInJavaTest { } @SuppressWarnings("unused") + @InitiatingFlow private static class SendInUnwrapFlow extends FlowLogic { private final Party otherParty; diff --git a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt index fe45b2d27e..f7bf170841 100644 --- a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt +++ b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt @@ -14,6 +14,7 @@ import java.security.cert.Certificate */ object TxKeyFlow { + @InitiatingFlow class Requester(val otherSide: Party, override val progressTracker: ProgressTracker) : FlowLogic>() { constructor(otherSide: Party) : this(otherSide, tracker()) diff --git a/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt b/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt index f2e0b7c566..135f9a0704 100644 --- a/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt +++ b/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt @@ -5,6 +5,7 @@ import net.corda.core.contracts.Attachment import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.getOrThrow import net.corda.core.messaging.RPCOps import net.corda.core.messaging.SingleMessageRecipient @@ -86,6 +87,7 @@ class AttachmentSerializationTest { private class ClientResult(internal val attachmentContent: String) + @InitiatingFlow private abstract class ClientLogic(server: MockNetwork.MockNode) : FlowLogic() { internal val server = server.info.legalIdentity @@ -134,7 +136,7 @@ class AttachmentSerializationTest { } private fun launchFlow(clientLogic: ClientLogic, rounds: Int) { - server.services.registerFlowInitiator(clientLogic.javaClass, ::ServerLogic) + server.services.registerServiceFlow(clientLogic.javaClass, ::ServerLogic) client.services.startFlow(clientLogic) network.runNetwork(rounds) } diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 1410335844..f3a98c58f0 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -1,22 +1,40 @@ Changelog ========= -Here are brief summaries of what's changed between each snapshot release. This includes guidance on how to upgrade code from the previous milestone release. +Here are brief summaries of what's changed between each snapshot release. This includes guidance on how to upgrade code +from the previous milestone release. UNRELEASED ---------- * API changes: - * ``PluginServiceHub.registerServiceFlow`` has been deprecated and replaced by ``registerServiceFlow`` with the - marker Class restricted to ``FlowLogic``. + * Initiating flows (i.e. those which initiate flows in a counterparty) are now required to be annotated with + ``InitiatingFlow``. - * ``FlowLogic.getCounterpartyMarker`` is no longer used and been deprecated for removal. If you were using this to - manage multiple independent message streams with the same party in the same flow then use sub-flows instead. + * ``PluginServiceHub.registerFlowInitiator`` has been deprecated and replaced by ``registerServiceFlow`` with the + marker Class restricted to ``FlowLogic``. In line with the introduction of ``InitiatingFlow``, it throws an + ``IllegalArgumentException`` if the initiating flow class is not annotated with it. + + * Also related to ``InitiatingFlow``, the ``shareParentSessions`` boolean parameter of ``FlowLogic.subFlow`` has been + removed. Its purpose was to allow subflows to be inlined with the parent flow - i.e. the subflow does not initiate + new sessions with parties the parent flow has already started. This allowed flows to be used as building blocks. To + achieve the same effect now simply requires the subflow to be *not* annotated wth ``InitiatingFlow`` (i.e. we've made + this the default behaviour). If the subflow is not meant to be inlined, and is supposed to initiate flows on the + other side, the annotation is required. * ``ContractUpgradeFlow.Instigator`` has been renamed to just ``ContractUpgradeFlow``. * ``NotaryChangeFlow.Instigator`` has been renamed to just ``NotaryChangeFlow``. + * ``FlowLogic.getCounterpartyMarker`` is no longer used and been deprecated for removal. If you were using this to + manage multiple independent message streams with the same party in the same flow then use sub-flows instead. + +* The ``InitiatingFlow`` annotation also has an integer ``version`` property which assigns the initiating flow a version + number, defaulting to 1 if it's specified. The flow version is included in the flow session request and the counterparty + will only respond and start their own flow if the version number matches to the one they've registered with. At some + point we will support the ability for a node to have multiple versions of the same flow registered, enabling backwards + compatibility of CorDapp flows. + Milestone 11.0 -------------- @@ -31,20 +49,27 @@ Milestone 11.0 * Moved ``generateSpend`` and ``generateExit`` functions into ``OnLedgerAsset`` from the vault and ``AbstractConserveAmount`` clauses respectively. - * Added ``CompositeSignature`` and ``CompositeSignatureData`` as part of enabling ``java.security`` classes to work with - composite keys and signatures. + * Added ``CompositeSignature`` and ``CompositeSignatureData`` as part of enabling ``java.security`` classes to work + with composite keys and signatures. - * ``CompositeKey`` now implements ``java.security.PublicKey`` interface, so that keys can be used on standard classes such as ``Certificate``. + * ``CompositeKey`` now implements ``java.security.PublicKey`` interface, so that keys can be used on standard classes + such as ``Certificate``. - * There is no longer a need to transform single keys into composite - ``composite`` extension was removed, it is imposible to create ``CompositeKey`` with only one leaf. + * There is no longer a need to transform single keys into composite - ``composite`` extension was removed, it is + imposible to create ``CompositeKey`` with only one leaf. - * Constructor of ``CompositeKey`` class is now private. Use ``CompositeKey.Builder`` to create a composite key. Keys emitted by the builder are normalised so that it's impossible to create a composite key with only one node. (Long chains of single nodes are shortened.) + * Constructor of ``CompositeKey`` class is now private. Use ``CompositeKey.Builder`` to create a composite key. + Keys emitted by the builder are normalised so that it's impossible to create a composite key with only one node. + (Long chains of single nodes are shortened.) - * Use extension function ``PublicKeys.keys`` to access all keys belonging to an instance of ``PublicKey``. For a ``CompositeKey``, this is equivalent to ``CompositeKey.leafKeys``. + * Use extension function ``PublicKeys.keys`` to access all keys belonging to an instance of ``PublicKey``. For a + ``CompositeKey``, this is equivalent to ``CompositeKey.leafKeys``. - * Introduced ``containsAny``, ``isFulfilledBy``, ``keys`` extension functions on ``PublicKey`` - ``CompositeKey`` type checking is done there. + * Introduced ``containsAny``, ``isFulfilledBy``, ``keys`` extension functions on ``PublicKey`` - ``CompositeKey`` + type checking is done there. -* Corda now requires JDK 8u131 or above in order to run. Our Kotlin now also compiles to JDK8 bytecode, and so you'll need to update your CorDapp projects to do the same. E.g. by adding this to ``build.gradle``: +* Corda now requires JDK 8u131 or above in order to run. Our Kotlin now also compiles to JDK8 bytecode, and so you'll need + to update your CorDapp projects to do the same. E.g. by adding this to ``build.gradle``: .. parsed-literal:: @@ -89,7 +114,8 @@ to Corda in M10. processor. * Corda DemoBench: - * DemoBench is a new tool to make it easy to configure and launch local Corda nodes. A very useful tool to demonstrate to your colleagues the fundamentals of Corda in real-time. It has the following features: + * DemoBench is a new tool to make it easy to configure and launch local Corda nodes. A very useful tool to demonstrate + to your colleagues the fundamentals of Corda in real-time. It has the following features: * Clicking "Add node" creates a new tab that lets you edit the most important configuration properties of the node before launch, such as its legal name and which CorDapps will be loaded. diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt index 8e9ab5129e..59b8d07a47 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt @@ -11,6 +11,7 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sign import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.node.PluginServiceHub import net.corda.core.node.ServiceHub import net.corda.core.node.services.unconsumedStates @@ -104,6 +105,7 @@ private fun prepareOurInputsAndOutputs(serviceHub: ServiceHub, request: FxReques // A flow representing creating a transaction that // carries out exchange of cash assets. +@InitiatingFlow class ForeignExchangeFlow(val tradeId: String, val baseCurrencyAmount: Amount>, val quoteCurrencyAmount: Amount>, diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt index c3e50d0bec..158cf0b786 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.* import net.corda.core.crypto.* import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.node.PluginServiceHub import net.corda.core.node.ServiceHub import net.corda.core.node.services.linearHeadsOfType @@ -144,6 +145,7 @@ class SubmitTradeApprovalFlow(val tradeId: String, * Simple flow to complete a proposal submitted by another party and ensure both nodes * end up with a fully signed copy of the state either as APPROVED, or REJECTED */ +@InitiatingFlow class SubmitCompletionFlow(val ref: StateRef, val verdict: WorkflowState) : FlowLogic>() { init { require(verdict in setOf(WorkflowState.APPROVED, WorkflowState.REJECTED)) { diff --git a/docs/source/flow-state-machines.rst b/docs/source/flow-state-machines.rst index 196d053227..0e099fc292 100644 --- a/docs/source/flow-state-machines.rst +++ b/docs/source/flow-state-machines.rst @@ -228,26 +228,6 @@ These will return a ``FlowProgressHandle``, which is just like a ``FlowHandle`` .. note:: The developer `must` then either subscribe to this ``progress`` observable or invoke the ``notUsed()`` extension function for it. Otherwise the unused observable will waste resources back in the node. -In a two party flow only one side is to be manually started using ``CordaRPCOps.startFlow``. The other side has to be -registered by its node to respond to the initiating flow via ``PluginServiceHub.registerServiceFlow``. In our example it -doesn't matter which flow is the initiator (i.e. client) and which is the initiated (i.e. service). For example, if we -are to take the seller as the initiator then we would register the buyer as such: - -.. container:: codeset - - .. sourcecode:: kotlin - - val services: PluginServiceHub = TODO() - services.registerServiceFlow(Seller::class.java) { otherParty -> - val notary = services.networkMapCache.notaryNodes[0] - val acceptablePrice = TODO() - val typeToBuy = TODO() - Buyer(otherParty, notary, acceptablePrice, typeToBuy) - } - -This is telling the buyer node to fire up an instance of ``Buyer`` (the code in the lambda) when the initiating flow -is a seller (``Seller::class``). - Implementing the seller ----------------------- @@ -419,6 +399,47 @@ This code is longer but no more complicated. Here are some things to pay attenti As you can see, the flow logic is straightforward and does not contain any callbacks or network glue code, despite the fact that it takes minimal resources and can survive node restarts. +Initiating communication +------------------------ + +Now that we have both sides of the deal negotation implemented as flows we need a way to start things off. We do this by +having one side initiate communication and the other respond to it and start their flow. Initiation is typically done using +RPC with the ``startFlowDynamic`` method. The initiating flow has be to annotated with ``InitiatingFlow``. In our example +it doesn't matter which flow is the initiator and which is the initiated, which is why neither ``Buyer`` nor ``Seller`` +are annotated with it. For example, if we choose the seller side as the initiator then we need a seller starter flow that +might look something like this: + +.. container:: codeset + + .. sourcecode:: kotlin + + @InitiatingFlow + class SellerStarter(val otherParty: Party, val assetToSell: StateAndRef, val price: Amount) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val notary: NodeInfo = serviceHub.networkMapCache.notaryNodes[0] + val cpOwnerKey: KeyPair = serviceHub.legalIdentityKey + return subFlow(TwoPartyTradeFlow.Seller(otherParty, notary, assetToSell, price, cpOwnerKey)) + } + } + +The buyer side would then need to register their flow, perhaps with something like: + +.. container:: codeset + + .. sourcecode:: kotlin + + val services: PluginServiceHub = TODO() + services.registerServiceFlow(SellerStarter::class.java) { otherParty -> + val notary = services.networkMapCache.notaryNodes[0] + val acceptablePrice = TODO() + val typeToBuy = TODO() + Buyer(otherParty, notary, acceptablePrice, typeToBuy) + } + +This is telling the buyer node to fire up an instance of ``Buyer`` (the code in the lambda) when the initiating flow +is a seller (``SellerStarter::class.java``). + .. _progress-tracking: Progress tracking diff --git a/docs/source/release-notes.rst b/docs/source/release-notes.rst index 28135c1985..44cbdb6345 100644 --- a/docs/source/release-notes.rst +++ b/docs/source/release-notes.rst @@ -6,10 +6,9 @@ Here are release notes for each snapshot release from M9 onwards. Unreleased ---------- -Flows can now be versioned using the ``FlowVersion`` annotation, which assigns an integer version number to it. For now -this enables a node to restrict usage of a flow to a specific version. Support for multiple verisons of the same flow, -hence achieving backwards compatibility, will be possible once we start loading CorDapps in separate class loaders. Watch -this space... +We've added the ability for flows to be versioned by their CorDapp developers. This enables a node to support a particular +version of a flow and allows it to reject flow communication with a node which isn't using the same fact. In a future +release we allow a node to have multiple versions of the same flow running to enable backwards compatibility. Milestone 11 ------------ @@ -36,7 +35,8 @@ We anticipate enforcing the use of distinguished names in node configurations fr We have increased the maximum message size that we can send to Corda over RPC from 100 KB to 10 MB. -The Corda node now disables any use of ObjectInputStream to prevent Java deserialisation within flows. This is a security fix, and prevents the node from deserialising arbitrary objects. +The Corda node now disables any use of ObjectInputStream to prevent Java deserialisation within flows. This is a security fix, +and prevents the node from deserialising arbitrary objects. We've introduced the concept of platform version which is a single integer value which increments by 1 if a release changes any of the public APIs of the entire Corda platform. This includes the node's public APIs, the messaging protocol, @@ -44,8 +44,9 @@ serialisation, etc. The node exposes the platform version it's on and we envisio run on older versions of the platform to the one they were compiled against. Platform version borrows heavily from Android's API Level. -We have revamped the DemoBench user interface. DemoBench will now also be installed as "Corda DemoBench" for both Windows and MacOSX. The original version -was installed as just "DemoBench", and so will not be overwritten automatically by the new version. +We have revamped the DemoBench user interface. DemoBench will now also be installed as "Corda DemoBench" for both Windows +and MacOSX. The original version was installed as just "DemoBench", and so will not be overwritten automatically by the +new version. Milestone 10 ------------ diff --git a/docs/source/versioning.rst b/docs/source/versioning.rst index 80562490ab..61ce417263 100644 --- a/docs/source/versioning.rst +++ b/docs/source/versioning.rst @@ -35,10 +35,10 @@ A platform which can be extended with CorDapps also requires the ability to vers release to release. This allows users of these apps, whether they're other nodes or RPC users, to select which version they wish to use and enables nodes to control which app versions they support. Flows have their own version numbers, independent of other versioning, for example of the platform. In particular it is the initiating flow that can be versioned -using the ``FlowVersion`` annotation. This assigns an integer version number, similar in concept to the platform version, -which is used in the session handshake process when a flow communicates with another party for the first time. The other -party will only accept the session request if it, firstly, has that flow loaded, and secondly, for the same version (see -:doc:`flow-state-machine`). +using the ``version`` property of the ``InitiatingFlow`` annotation. This assigns an integer version number, similar in +concept to the platform version, which is used in the session handshake process when a flow communicates with another party +for the first time. The other party will only accept the session request if it, firstly, has that flow loaded, and secondly, +for the same version (see also :doc:`flow-state-machine`). .. note:: Currently we don't support multiple versions of the same flow loaded in the same node. This will be possible once we start loading CorDapps in separate class loaders. diff --git a/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt b/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt index 4e74498b7d..4d728eb6c7 100644 --- a/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt +++ b/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt @@ -5,6 +5,7 @@ import net.corda.core.contracts.* import net.corda.core.crypto.Party import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.node.PluginServiceHub import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.OpaqueBytes @@ -28,6 +29,7 @@ object IssuerFlow { * IssuanceRequester should be used by a client to ask a remote node to issue some [FungibleAsset] with the given details. * Returns the transaction created by the Issuer to move the cash to the Requester. */ + @InitiatingFlow class IssuanceRequester(val amount: Amount, val issueToParty: Party, val issueToPartyRef: OpaqueBytes, val issuerBankParty: Party) : FlowLogic() { @Suspendable diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt index dd5c47f5dc..4b81f86089 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.Futures import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.getOrThrow import net.corda.core.utilities.ALICE import net.corda.core.utilities.BOB @@ -23,14 +24,15 @@ class FlowVersioningTest : NodeBasedTest() { assertThat(resultFuture.getOrThrow()).isEqualTo(2) } - private open class ClientFlow(val otherParty: Party) : FlowLogic() { + @InitiatingFlow + private class ClientFlow(val otherParty: Party) : FlowLogic() { @Suspendable override fun call(): Any { return sendAndReceive(otherParty, "This is ignored. We only send to kick off the flow on the other side").unwrap { it } } } - private open class SendBackPlatformVersionFlow(val otherParty: Party, val otherPartysPlatformVersion: Any) : FlowLogic() { + private class SendBackPlatformVersionFlow(val otherParty: Party, val otherPartysPlatformVersion: Int) : FlowLogic() { @Suspendable override fun call() = send(otherParty, otherPartysPlatformVersion) } diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index e9d4d1ea02..a7492a1035 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -7,6 +7,7 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.generateKeyPair import net.corda.core.crypto.toBase58String import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.random63BitValue @@ -222,6 +223,7 @@ abstract class MQSecurityTest : NodeBasedTest() { return bobParty } + @InitiatingFlow private class SendFlow(val otherParty: Party, val payload: Any) : FlowLogic() { @Suspendable override fun call() = send(otherParty, payload) diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 58c01cab68..c05befff2f 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -13,7 +13,7 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.X509Utilities import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowVersion +import net.corda.core.flows.InitiatingFlow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.RPCOps import net.corda.core.messaging.SingleMessageRecipient @@ -140,12 +140,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, return serverThread.fetchFrom { smm.add(logic, flowInitiator) } } - override fun registerServiceFlow(clientFlowClass: Class>, serviceFlowFactory: (Party) -> FlowLogic<*>) { - require(clientFlowClass !in serviceFlowFactories) { "${clientFlowClass.name} has already been used to register a service flow" } - val version = clientFlowClass.flowVersion - val info = ServiceFlowInfo.CorDapp(version, serviceFlowFactory) - log.info("Registering service flow for ${clientFlowClass.name}: $info") - serviceFlowFactories[clientFlowClass] = info + override fun registerServiceFlow(initiatingFlowClass: Class>, serviceFlowFactory: (Party) -> FlowLogic<*>) { + require(initiatingFlowClass !in serviceFlowFactories) { + "${initiatingFlowClass.name} has already been used to register a service flow" + } + val info = ServiceFlowInfo.CorDapp(initiatingFlowClass.flowVersion, serviceFlowFactory) + log.info("Registering service flow for ${initiatingFlowClass.name}: $info") + serviceFlowFactories[initiatingFlowClass] = info } override fun getServiceFlowFactory(clientFlowClass: Class>): ServiceFlowInfo? { @@ -258,15 +259,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, } /** - * @suppress * Installs a flow that's core to the Corda platform. Unlike CorDapp flows which are versioned individually using - * [FlowVersion], core flows have the same version as the node's platform version. To cater for backwards compatibility - * [serviceFlowFactory] provides a second parameter which is the platform version of the initiating party. + * [InitiatingFlow.version], core flows have the same version as the node's platform version. To cater for backwards + * compatibility [serviceFlowFactory] provides a second parameter which is the platform version of the initiating party. + * @suppress */ @VisibleForTesting fun installCoreFlow(clientFlowClass: KClass>, serviceFlowFactory: (Party, Int) -> FlowLogic<*>) { - require(!clientFlowClass.java.isAnnotationPresent(FlowVersion::class.java)) { - "${FlowVersion::class.java.name} not applicable for core flows; their version is the node's platform version" + require(clientFlowClass.java.flowVersion == 1) { + "${InitiatingFlow::class.java.name}.version not applicable for core flows; their version is the node's platform version" } serviceFlowFactories[clientFlowClass.java] = ServiceFlowInfo.Core(serviceFlowFactory) log.debug { "Installed core flow ${clientFlowClass.java.name}" } @@ -301,7 +302,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, // the identity key. But the infrastructure to make that easy isn't here yet. keyManagement = makeKeyManagementService() flowLogicFactory = initialiseFlowLogicFactory() - scheduler = NodeSchedulerService(services, flowLogicFactory, unfinishedSchedules = busyNodeLatch) + scheduler = NodeSchedulerService(services, database, flowLogicFactory, unfinishedSchedules = busyNodeLatch) val tokenizableServices = mutableListOf(storage, net, vault, keyManagement, identity, platformClock, scheduler) makeAdvertisedServices(tokenizableServices) diff --git a/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt b/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt index ee0f303b58..8a8edec5af 100644 --- a/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt +++ b/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt @@ -63,7 +63,7 @@ class NotifyTransactionHandler(val otherParty: Party) : FlowLogic() { @Suspendable override fun call() { val request = receive(otherParty).unwrap { it } - subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true) + subFlow(ResolveTransactionsFlow(request.tx, otherParty)) serviceHub.recordTransactions(request.tx) } } diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index fb9425f268..88d8c3032d 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -1,6 +1,5 @@ package net.corda.node.services.events -import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.SettableFuture import net.corda.core.ThreadBox import net.corda.core.contracts.SchedulableState @@ -10,7 +9,7 @@ import net.corda.core.contracts.StateRef import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowLogic import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.utilities.ProgressTracker +import net.corda.core.then import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace import net.corda.node.services.api.FlowLogicRefFactoryInternal @@ -18,6 +17,7 @@ import net.corda.node.services.api.SchedulerService import net.corda.node.services.api.ServiceHubInternal import net.corda.node.utilities.* import org.apache.activemq.artemis.utils.ReusableLatch +import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.statements.InsertStatement import java.time.Instant @@ -44,12 +44,15 @@ import javax.annotation.concurrent.ThreadSafe */ @ThreadSafe class NodeSchedulerService(private val services: ServiceHubInternal, + private val database: Database, private val flowLogicRefFactory: FlowLogicRefFactoryInternal, private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(), private val unfinishedSchedules: ReusableLatch = ReusableLatch()) : SchedulerService, SingletonSerializeAsToken() { - private val log = loggerFor() + companion object { + private val log = loggerFor() + } private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}scheduled_states") { val output = stateRef("transaction_id", "output_index") @@ -158,71 +161,62 @@ class NodeSchedulerService(private val services: ServiceHubInternal, } private fun onTimeReached(scheduledState: ScheduledStateRef) { - services.startFlow(RunScheduled(scheduledState, this@NodeSchedulerService), FlowInitiator.Scheduled(scheduledState)) + database.transaction { + val scheduledFlow = getScheduledFlow(scheduledState) + if (scheduledFlow != null) { + // TODO Because the flow is executed asynchronously, there is a small window between this tx we're in + // committing and the flow's first checkpoint when it starts in which we can lose the flow if the node + // goes down. + // See discussion in https://github.com/corda/corda/pull/639#discussion_r115257437 + val future = services.startFlow(scheduledFlow, FlowInitiator.Scheduled(scheduledState)).resultFuture + future.then { + unfinishedSchedules.countDown() + } + } + } } - class RunScheduled(val scheduledState: ScheduledStateRef, val scheduler: NodeSchedulerService) : FlowLogic() { - companion object { - object RUNNING : ProgressTracker.Step("Running scheduled...") - - fun tracker() = ProgressTracker(RUNNING) - } - override val progressTracker = tracker() - - @Suspendable - override fun call(): Unit { - progressTracker.currentStep = RUNNING - - // Ensure we are still scheduled. - val scheduledLogic: FlowLogic<*>? = getScheduledLogic() - if (scheduledLogic != null) { - subFlow(scheduledLogic) - scheduler.unfinishedSchedules.countDown() - } - } - - private fun getScheduledaActivity(): ScheduledActivity? { - val txState = serviceHub.loadState(scheduledState.ref) - val state = txState.data as SchedulableState - return try { - // This can throw as running contract code. - state.nextScheduledActivity(scheduledState.ref, scheduler.flowLogicRefFactory) - } catch(e: Exception) { - logger.error("Attempt to run scheduled state $scheduledState resulted in error.", e) - null - } - } - - private fun getScheduledLogic(): FlowLogic<*>? { - val scheduledActivity = getScheduledaActivity() - var scheduledLogic: FlowLogic<*>? = null - scheduler.mutex.locked { - // need to remove us from those scheduled, but only if we are still next - scheduledStates.compute(scheduledState.ref) { _, value -> - if (value === scheduledState) { - if (scheduledActivity == null) { - logger.info("Scheduled state $scheduledState has rescheduled to never.") - scheduler.unfinishedSchedules.countDown() - null - } else if (scheduledActivity.scheduledAt.isAfter(serviceHub.clock.instant())) { - logger.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.") - ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt) - } else { - // TODO: FlowLogicRefFactory needs to sort out the class loader etc - val logic = scheduler.flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef) - logger.trace { "Scheduler starting FlowLogic $logic" } - scheduledLogic = logic - null - } + private fun getScheduledFlow(scheduledState: ScheduledStateRef): FlowLogic<*>? { + val scheduledActivity = getScheduledActivity(scheduledState) + var scheduledFlow: FlowLogic<*>? = null + mutex.locked { + // need to remove us from those scheduled, but only if we are still next + scheduledStates.compute(scheduledState.ref) { _, value -> + if (value === scheduledState) { + if (scheduledActivity == null) { + log.info("Scheduled state $scheduledState has rescheduled to never.") + unfinishedSchedules.countDown() + null + } else if (scheduledActivity.scheduledAt.isAfter(services.clock.instant())) { + log.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.") + ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt) } else { - value + // TODO: FlowLogicRefFactory needs to sort out the class loader etc + val flowLogic = flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef) + log.trace { "Scheduler starting FlowLogic $flowLogic" } + scheduledFlow = flowLogic + null } + } else { + value } - // and schedule the next one - recomputeEarliest() - scheduler.rescheduleWakeUp() } - return scheduledLogic + // and schedule the next one + recomputeEarliest() + rescheduleWakeUp() + } + return scheduledFlow + } + + private fun getScheduledActivity(scheduledState: ScheduledStateRef): ScheduledActivity? { + val txState = services.loadState(scheduledState.ref) + val state = txState.data as SchedulableState + return try { + // This can throw as running contract code. + state.nextScheduledActivity(scheduledState.ref, flowLogicRefFactory) + } catch (e: Exception) { + log.error("Attempt to run scheduled state $scheduledState resulted in error.", e) + null } } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 94729a66b5..7f98089b2d 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -426,7 +426,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } val Class>.flowVersion: Int get() { - val flowVersion = getDeclaredAnnotation(FlowVersion::class.java) ?: return 1 - require(flowVersion.value > 0) { "Flow versions have to be greater or equal to 1" } - return flowVersion.value + val annotation = requireNotNull(getAnnotation(InitiatingFlow::class.java)) { + "$name as the initiating flow must be annotated with ${InitiatingFlow::class.java.name}" + } + require(annotation.version > 0) { "Flow versions have to be greater or equal to 1" } + return annotation.version } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index e5c342b480..56345260f5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -368,10 +368,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, is ServiceFlowInfo.Core -> serviceFlowInfo.factory(sender, receivedMessage.platformVersion) } - if (flow.javaClass.isAnnotationPresent(FlowVersion::class.java)) { - logger.warn("${FlowVersion::class.java.name} is not applicable for service flows: ${flow.javaClass.name}") - } - val fiber = createFiber(flow, FlowInitiator.Peer(sender)) val session = FlowSession(flow, random63BitValue(), sender, FlowSessionState.Initiated(sender, otherPartySessionId)) if (sessionInit.firstPayload != null) { diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index 1b3645ad20..9c8d5e0a0f 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -1,5 +1,6 @@ package net.corda.node.messaging +import co.paralleluniverse.fibers.Suspendable import net.corda.contracts.CommercialPaper import net.corda.contracts.asset.* import net.corda.contracts.testing.fillWithSomeTestCash @@ -8,11 +9,14 @@ import net.corda.core.crypto.AnonymousParty import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.days +import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowStateMachine +import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.StateMachineRunId import net.corda.core.getOrThrow import net.corda.core.map import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.node.NodeInfo import net.corda.core.node.services.* import net.corda.core.rootCause import net.corda.core.transactions.SignedTransaction @@ -266,7 +270,10 @@ class TwoPartyTradeFlowTests { // Creates a mock node with an overridden storage service that uses a RecordingMap, that lets us test the order // of gets and puts. - private fun makeNodeWithTracking(networkMapAddr: SingleMessageRecipient?, name: X500Name, overrideServices: Map? = null): MockNetwork.MockNode { + private fun makeNodeWithTracking( + networkMapAddr: SingleMessageRecipient?, + name: X500Name, + overrideServices: Map? = null): MockNetwork.MockNode { // Create a node in the mock network ... return net.createNode(networkMapAddr, -1, object : MockNetwork.Factory { override fun create(config: NodeConfiguration, @@ -391,7 +398,6 @@ class TwoPartyTradeFlowTests { @Test fun `track works`() { - val notaryNode = net.createNotaryNode(null, DUMMY_NOTARY.name) val aliceNode = makeNodeWithTracking(notaryNode.info.address, ALICE.name) val bobNode = makeNodeWithTracking(notaryNode.info.address, BOB.name) @@ -445,13 +451,13 @@ class TwoPartyTradeFlowTests { ) aliceTxStream.expectEvents { aliceTxExpectations } val aliceMappingExpectations = sequence( - expect { mapping: StateMachineTransactionMapping -> - require(mapping.stateMachineRunId == aliceSmId) - require(mapping.transactionId == bobsFakeCash[0].id) + expect { (stateMachineRunId, transactionId) -> + require(stateMachineRunId == aliceSmId) + require(transactionId == bobsFakeCash[0].id) }, - expect { mapping: StateMachineTransactionMapping -> - require(mapping.stateMachineRunId == aliceSmId) - require(mapping.transactionId == bobsFakeCash[2].id) + expect { (stateMachineRunId, transactionId) -> + require(stateMachineRunId == aliceSmId) + require(transactionId == bobsFakeCash[2].id) }, expect { mapping: StateMachineTransactionMapping -> require(mapping.stateMachineRunId == aliceSmId) @@ -487,10 +493,21 @@ class TwoPartyTradeFlowTests { sellerNode: MockNetwork.MockNode, buyerNode: MockNetwork.MockNode, assetToSell: StateAndRef): RunResult { - val buyerFuture = buyerNode.initiateSingleShotFlow(Seller::class) { otherParty -> + @InitiatingFlow + class SellerRunnerFlow(val buyer: Party, val notary: NodeInfo) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction = subFlow(Seller( + buyer, + notary, + assetToSell, + 1000.DOLLARS, + serviceHub.legalIdentityKey)) + } + + val buyerFuture = buyerNode.initiateSingleShotFlow(SellerRunnerFlow::class) { otherParty -> Buyer(otherParty, notaryNode.info.notaryIdentity, 1000.DOLLARS, CommercialPaper.State::class.java) }.map { it.stateMachine } - val seller = Seller(buyerNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, sellerNode.services.legalIdentityKey) + val seller = SellerRunnerFlow(buyerNode.info.legalIdentity, notaryNode.info) val sellerResultFuture = sellerNode.services.startFlow(seller).resultFuture return RunResult(buyerFuture, sellerResultFuture, seller.stateMachine.id) } diff --git a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt index aa1cf3bc54..341a68662a 100644 --- a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt @@ -73,7 +73,7 @@ open class MockServiceHubInternal( return smm.executor.fetchFrom { smm.add(logic, flowInitiator) } } - override fun registerServiceFlow(clientFlowClass: Class>, serviceFlowFactory: (Party) -> FlowLogic<*>) = Unit + override fun registerServiceFlow(initiatingFlowClass: Class>, serviceFlowFactory: (Party) -> FlowLogic<*>) = Unit override fun getServiceFlowFactory(clientFlowClass: Class>): ServiceFlowInfo? = null } diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index 6870ccdefa..eda373e2db 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -87,7 +87,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { override val vaultService: VaultService = NodeVaultService(this, dataSourceProps) override val testReference = this@NodeSchedulerServiceTest } - scheduler = NodeSchedulerService(services, factory, schedulerGatedExecutor) + scheduler = NodeSchedulerService(services, database, factory, schedulerGatedExecutor) smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1) val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database) mockSMM.changes.subscribe { change -> diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt index f6b4463431..8a6f4fd6b8 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt @@ -8,6 +8,7 @@ import net.corda.core.contracts.TransactionType import net.corda.core.contracts.USD import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.node.services.unconsumedStates import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY @@ -95,6 +96,7 @@ class DataVendingServiceTests { } + @InitiatingFlow private class NotifyTxFlow(val otherParty: Party, val stx: SignedTransaction) : FlowLogic() { @Suspendable override fun call() = send(otherParty, NotifyTxRequest(stx)) diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt similarity index 93% rename from node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt rename to node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index f0b9632fa8..a2b50307cc 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -8,10 +8,11 @@ import net.corda.core.* import net.corda.core.contracts.DOLLARS import net.corda.core.contracts.DummyState import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash import net.corda.core.crypto.generateKeyPair import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowVersion +import net.corda.core.flows.InitiatingFlow import net.corda.core.messaging.MessageRecipients import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.ServiceInfo @@ -56,7 +57,7 @@ import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertTrue -class StateMachineManagerTests { +class FlowFrameworkTests { companion object { init { LogHelper.setLevel("+net.corda.flow") @@ -468,13 +469,6 @@ class StateMachineManagerTests { .withMessage("Chain") } - private class SendAndReceiveFlow(val otherParty: Party, val payload: Any) : FlowLogic() { - @Suspendable - override fun call() { - sendAndReceive(otherParty, payload) - } - } - @Test fun `FlowException thrown and there is a 3rd unrelated party flow`() { val node3 = net.createNode(node1.info.address) @@ -521,6 +515,7 @@ class StateMachineManagerTests { @Test fun `retry subFlow due to receiving FlowException`() { + @InitiatingFlow class AskForExceptionFlow(val otherParty: Party, val throwException: Boolean) : FlowLogic() { @Suspendable override fun call(): String = sendAndReceive(otherParty, throwException).unwrap { it } @@ -577,7 +572,7 @@ class StateMachineManagerTests { @Test fun `lazy db iterator left on stack during checkpointing`() { - val result = node2.services.startFlow(VaultAccessFlow(node1.info.legalIdentity)).resultFuture + val result = node2.services.startFlow(VaultAccessFlow()).resultFuture net.runNetwork() assertThatThrownBy { result.getOrThrow() }.hasMessageContaining("Vault").hasMessageContaining("private method") } @@ -609,10 +604,20 @@ class StateMachineManagerTests { }.withMessageContaining("Version") } - @FlowVersion(2) - private class UpgradedFlow(val otherParty: Party) : FlowLogic() { - @Suspendable - override fun call(): Any = receive(otherParty).unwrap { it } + @Test + fun `single inlined sub-flow`() { + node2.registerServiceFlow(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) } + val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.legalIdentity, "Hello")).resultFuture + net.runNetwork() + assertThat(result.getOrThrow()).isEqualTo("HelloHello") + } + + @Test + fun `double inlined sub-flow`() { + node2.registerServiceFlow(SendAndReceiveFlow::class) { DoubleInlinedSubFlow(it) } + val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.legalIdentity, "Hello")).resultFuture + net.runNetwork() + assertThat(result.getOrThrow()).isEqualTo("HelloHello") } @@ -664,15 +669,13 @@ class StateMachineManagerTests { } } - private fun sanitise(message: SessionMessage): SessionMessage { - return when (message) { - is SessionData -> message.copy(recipientSessionId = 0) - is SessionInit -> message.copy(initiatorSessionId = 0) - is SessionConfirm -> message.copy(initiatorSessionId = 0, initiatedSessionId = 0) - is NormalSessionEnd -> message.copy(recipientSessionId = 0) - is ErrorSessionEnd -> message.copy(recipientSessionId = 0) - else -> message - } + private fun sanitise(message: SessionMessage) = when (message) { + is SessionData -> message.copy(recipientSessionId = 0) + is SessionInit -> message.copy(initiatorSessionId = 0) + is SessionConfirm -> message.copy(initiatorSessionId = 0, initiatedSessionId = 0) + is NormalSessionEnd -> message.copy(recipientSessionId = 0) + is ErrorSessionEnd -> message.copy(recipientSessionId = 0) + else -> message } private infix fun MockNode.sent(message: SessionMessage): Pair = Pair(id, message) @@ -700,6 +703,7 @@ class StateMachineManagerTests { } + @InitiatingFlow private open class SendFlow(val payload: String, vararg val otherParties: Party) : FlowLogic() { init { require(otherParties.isNotEmpty()) @@ -712,6 +716,7 @@ class StateMachineManagerTests { private interface CustomInterface private class CustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty) + @InitiatingFlow private class ReceiveFlow(vararg val otherParties: Party) : FlowLogic() { object START_STEP : ProgressTracker.Step("Starting") object RECEIVED_STEP : ProgressTracker.Step("Received") @@ -740,6 +745,18 @@ class StateMachineManagerTests { } } + @InitiatingFlow + private class SendAndReceiveFlow(val otherParty: Party, val payload: Any) : FlowLogic() { + @Suspendable + override fun call(): Any = sendAndReceive(otherParty, payload).unwrap { it } + } + + private class InlinedSendFlow(val payload: String, val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call() = send(otherParty, payload) + } + + @InitiatingFlow private class PingPongFlow(val otherParty: Party, val payload: Long) : FlowLogic() { @Transient var receivedPayload: Long? = null @Transient var receivedPayload2: Long? = null @@ -770,6 +787,7 @@ class StateMachineManagerTests { } private object WaitingFlows { + @InitiatingFlow class Waiter(val stx: SignedTransaction, val otherParty: Party) : FlowLogic() { @Suspendable override fun call(): SignedTransaction { @@ -788,11 +806,32 @@ class StateMachineManagerTests { } } - private class VaultAccessFlow(val otherParty: Party) : FlowLogic() { + private class VaultAccessFlow : FlowLogic() { @Suspendable override fun call() { serviceHub.vaultService.unconsumedStates().filter { true } - send(otherParty, "Hello") + waitForLedgerCommit(SecureHash.zeroHash) + } + } + + @InitiatingFlow(version = 2) + private class UpgradedFlow(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call(): Any = receive(otherParty).unwrap { it } + } + + private class SingleInlinedSubFlow(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call() { + val payload = receive(otherParty).unwrap { it } + subFlow(InlinedSendFlow(payload + payload, otherParty)) + } + } + + private class DoubleInlinedSubFlow(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call() { + subFlow(SingleInlinedSubFlow(otherParty)) } } diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt index b68a23f39e..d258aa951f 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/AutoOfferFlow.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.DealState import net.corda.core.crypto.AbstractParty import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.PluginServiceHub import net.corda.core.serialization.SingletonSerializeAsToken @@ -31,10 +32,11 @@ object AutoOfferFlow { class Service(services: PluginServiceHub) : SingletonSerializeAsToken() { init { - services.registerServiceFlow(Instigator::class.java) { Acceptor(it) } + services.registerServiceFlow(Requester::class.java) { Acceptor(it) } } } + @InitiatingFlow class Requester(val dealToBeOffered: DealState) : FlowLogic() { companion object { @@ -74,14 +76,7 @@ object AutoOfferFlow { } private fun notUs(parties: List): List { - val notUsParties: MutableList = arrayListOf() - for (party in parties) { - if (serviceHub.myInfo.legalIdentity != party) { - notUsParties.add(party) - } - } - return notUsParties + return parties.filter { serviceHub.myInfo.legalIdentity != it } } - } } diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/FixingFlow.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/FixingFlow.kt index 918c45eb1e..fb5270e682 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/FixingFlow.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/FixingFlow.kt @@ -7,6 +7,7 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.keys import net.corda.core.crypto.toBase58String import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.node.NodeInfo import net.corda.core.node.PluginServiceHub import net.corda.core.node.services.ServiceType @@ -24,7 +25,7 @@ object FixingFlow { class Service(services: PluginServiceHub) { init { - services.registerServiceFlow(Floater::class.java) { Fixer(it) } + services.registerServiceFlow(FixingRoleDecider::class.java) { Fixer(it) } } } @@ -114,12 +115,13 @@ object FixingFlow { override val myKeyPair: KeyPair get() { val myPublicKey = serviceHub.myInfo.legalIdentity.owningKey - val myKeys = dealToFix.state.data.parties.filter { it.owningKey == myPublicKey }.single().owningKey.keys + val myKeys = dealToFix.state.data.parties.single { it.owningKey == myPublicKey }.owningKey.keys return serviceHub.keyManagementService.toKeyPair(myKeys) } - override val notaryNode: NodeInfo get() = - serviceHub.networkMapCache.notaryNodes.filter { it.notaryIdentity == dealToFix.state.notary }.single() + override val notaryNode: NodeInfo get() { + return serviceHub.networkMapCache.notaryNodes.single { it.notaryIdentity == dealToFix.state.notary } + } } @@ -131,11 +133,9 @@ object FixingFlow { * This flow looks at the deal and decides whether to be the Fixer or Floater role in agreeing a fixing. * * It is kicked off as an activity on both participant nodes by the scheduler when it's time for a fixing. If the - * Fixer role is chosen, then that will be initiated by the [FixingSession] message sent from the other party and - * handled by the [FixingSessionInitiationHandler]. - * - * TODO: Replace [FixingSession] and [FixingSessionInitiationHandler] with generic session initiation logic once it exists. + * Fixer role is chosen, then that will be initiated by the [FixingSession] message sent from the other party. */ + @InitiatingFlow class FixingRoleDecider(val ref: StateRef, override val progressTracker: ProgressTracker) : FlowLogic() { @Suppress("unused") // Used via reflection. constructor(ref: StateRef) : this(ref, tracker()) diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/RatesFixFlow.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/RatesFixFlow.kt index 50eff9825f..df7b3bb6bb 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/RatesFixFlow.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/RatesFixFlow.kt @@ -6,6 +6,7 @@ import net.corda.core.contracts.FixOf import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.TransactionBuilder @@ -93,6 +94,7 @@ open class RatesFixFlow(protected val tx: TransactionBuilder, } // DOCSTART 1 + @InitiatingFlow class FixQueryFlow(val fixOf: FixOf, val oracle: Party) : FlowLogic() { @Suspendable override fun call(): Fix { @@ -109,6 +111,7 @@ open class RatesFixFlow(protected val tx: TransactionBuilder, } } + @InitiatingFlow class FixSignFlow(val tx: TransactionBuilder, val oracle: Party, val partialMerkleTx: FilteredTransaction) : FlowLogic() { @Suspendable diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt index 5238251311..e5aeae94db 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/flows/UpdateBusinessDayFlow.kt @@ -3,6 +3,7 @@ package net.corda.irs.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.NodeInfo import net.corda.core.node.PluginServiceHub @@ -42,6 +43,7 @@ object UpdateBusinessDayFlow { } + @InitiatingFlow class Broadcast(val date: LocalDate, override val progressTracker: ProgressTracker) : FlowLogic() { constructor(date: LocalDate) : this(date, tracker()) diff --git a/samples/irs-demo/src/main/kotlin/net/corda/simulation/IRSSimulation.kt b/samples/irs-demo/src/main/kotlin/net/corda/simulation/IRSSimulation.kt index 86096d1bd4..edc4cb51d0 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/simulation/IRSSimulation.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/simulation/IRSSimulation.kt @@ -1,5 +1,6 @@ package net.corda.simulation +import co.paralleluniverse.fibers.Suspendable import com.fasterxml.jackson.module.kotlin.readValue import com.google.common.util.concurrent.FutureCallback import com.google.common.util.concurrent.Futures @@ -9,8 +10,11 @@ import net.corda.core.RunOnCallerThread import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.UniqueIdentifier import net.corda.core.crypto.AnonymousParty +import net.corda.core.crypto.Party import net.corda.core.flatMap +import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowStateMachine +import net.corda.core.flows.InitiatingFlow import net.corda.core.map import net.corda.core.node.services.linearHeadsOfType import net.corda.core.success @@ -24,6 +28,7 @@ import net.corda.node.utilities.transaction import net.corda.testing.initiateSingleShotFlow import net.corda.testing.node.InMemoryMessagingNetwork import net.corda.testing.node.MockIdentityService +import java.security.KeyPair import java.time.LocalDate import java.util.* @@ -120,16 +125,27 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten irs.fixedLeg.fixedRatePayer = node1.info.legalIdentity.toAnonymous() irs.floatingLeg.floatingRatePayer = node2.info.legalIdentity.toAnonymous() + @InitiatingFlow + class StartDealFlow(val otherParty: Party, + val payload: AutoOffer, + val myKeyPair: KeyPair) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction = subFlow(Instigator(otherParty, payload, myKeyPair)) + } + @Suppress("UNCHECKED_CAST") - val acceptorTx = node2.initiateSingleShotFlow(Instigator::class) { Acceptor(it) }.flatMap { + val acceptorTx = node2.initiateSingleShotFlow(StartDealFlow::class) { Acceptor(it) }.flatMap { (it.stateMachine as FlowStateMachine).resultFuture } showProgressFor(listOf(node1, node2)) showConsensusFor(listOf(node1, node2, regulators[0])) - val instigator = Instigator(node2.info.legalIdentity, AutoOffer(notary.info.notaryIdentity, irs), node1.services.legalIdentityKey) - val instigatorTx: ListenableFuture = node1.services.startFlow(instigator).resultFuture + val instigator = StartDealFlow( + node2.info.legalIdentity, + AutoOffer(notary.info.notaryIdentity, irs), + node1.services.legalIdentityKey) + val instigatorTx = node1.services.startFlow(instigator).resultFuture return Futures.allAsList(instigatorTx, acceptorTx).flatMap { instigatorTx } } diff --git a/samples/irs-demo/src/main/kotlin/net/corda/simulation/TradeSimulation.kt b/samples/irs-demo/src/main/kotlin/net/corda/simulation/TradeSimulation.kt index 3caec27ccc..477ce1eb94 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/simulation/TradeSimulation.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/simulation/TradeSimulation.kt @@ -36,8 +36,11 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo buyer.services.fillWithSomeTestCash(1500.DOLLARS, notary.info.notaryIdentity) val issuance = run { - val tx = CommercialPaper().generateIssue(seller.info.legalIdentity.ref(1, 2, 3), 1100.DOLLARS `issued by` DUMMY_CASH_ISSUER, - Instant.now() + 10.days, notary.info.notaryIdentity) + val tx = CommercialPaper().generateIssue( + seller.info.legalIdentity.ref(1, 2, 3), + 1100.DOLLARS `issued by` DUMMY_CASH_ISSUER, + Instant.now() + 10.days, + notary.info.notaryIdentity) tx.setTime(Instant.now(), 30.seconds) val notaryKey = notary.services.notaryIdentityKey val sellerKey = seller.services.legalIdentityKey diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/IRSTradeFlow.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/IRSTradeFlow.kt index b8cf1a3ec6..807912cede 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/IRSTradeFlow.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/IRSTradeFlow.kt @@ -3,6 +3,7 @@ package net.corda.vega.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.node.PluginServiceHub import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction @@ -13,11 +14,17 @@ import net.corda.vega.contracts.OGTrade import net.corda.vega.contracts.SwapData object IRSTradeFlow { + class Service(services: PluginServiceHub) { + init { + services.registerServiceFlow(Requester::class.java, ::Receiver) + } + } + @CordaSerializable data class OfferMessage(val notary: Party, val dealBeingOffered: IRSState) + @InitiatingFlow class Requester(val swap: SwapData, val otherParty: Party) : FlowLogic() { - @Suspendable override fun call(): SignedTransaction { require(serviceHub.networkMapCache.notaryNodes.isNotEmpty()) { "No notary nodes registered" } @@ -38,18 +45,12 @@ object IRSTradeFlow { return subFlow(TwoPartyDealFlow.Instigator( otherParty, TwoPartyDealFlow.AutoOffer(notary, offer), - serviceHub.legalIdentityKey), shareParentSessions = true) + serviceHub.legalIdentityKey)) } - } - class Service(services: PluginServiceHub) { - init { - services.registerServiceFlow(Requester::class.java, ::Receiver) - } } class Receiver(private val replyToParty: Party) : FlowLogic() { - @Suspendable override fun call() { logger.info("IRSTradeFlow receiver started") @@ -59,7 +60,7 @@ object IRSTradeFlow { // Automatically agree - in reality we'd vet the offer message require(serviceHub.networkMapCache.notaryNodes.map { it.notaryIdentity }.contains(offer.notary)) send(replyToParty, true) - subFlow(TwoPartyDealFlow.Acceptor(replyToParty), shareParentSessions = true) + subFlow(TwoPartyDealFlow.Acceptor(replyToParty)) } } } diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt index f226b0bdef..611bcbd21a 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt @@ -3,9 +3,7 @@ package net.corda.vega.flows import co.paralleluniverse.fibers.Suspendable import com.opengamma.strata.basics.ReferenceData import com.opengamma.strata.basics.currency.Currency -import com.opengamma.strata.basics.currency.MultiCurrencyAmount import com.opengamma.strata.data.MarketDataFxRateProvider -import com.opengamma.strata.market.param.CurrencyParameterSensitivities import com.opengamma.strata.pricer.curve.CalibrationMeasures import com.opengamma.strata.pricer.curve.CurveCalibrator import com.opengamma.strata.pricer.rate.ImmutableRatesProvider @@ -15,6 +13,7 @@ import net.corda.core.contracts.StateRef import net.corda.core.crypto.AnonymousParty import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.node.PluginServiceHub import net.corda.core.node.services.dealsWith import net.corda.core.serialization.CordaSerializable @@ -51,6 +50,7 @@ object SimmFlow { * Initiates with the other party by sending a portfolio to agree on and then comes to consensus over initial * margin using SIMM. If there is an existing state it will update and revalue the portfolio agreement. */ + @InitiatingFlow class Requester(val otherParty: Party, val valuationDate: LocalDate, val existing: StateAndRef?) @@ -62,7 +62,7 @@ object SimmFlow { @Suspendable override fun call(): RevisionedState { - logger.debug("Calling from: ${serviceHub.myInfo.legalIdentity}. Sending to: ${otherParty}") + logger.debug("Calling from: ${serviceHub.myInfo.legalIdentity}. Sending to: $otherParty") require(serviceHub.networkMapCache.notaryNodes.isNotEmpty()) { "No notary nodes registered" } notary = serviceHub.networkMapCache.notaryNodes.first().notaryIdentity myIdentity = serviceHub.myInfo.legalIdentity @@ -88,7 +88,7 @@ object SimmFlow { send(otherParty, OfferMessage(notary, portfolioState, existing?.ref, valuationDate)) logger.info("Awaiting two party deal acceptor") - subFlow(TwoPartyDealFlow.Acceptor(otherParty), shareParentSessions = true) + subFlow(TwoPartyDealFlow.Acceptor(otherParty)) } @Suspendable @@ -97,7 +97,7 @@ object SimmFlow { sendAndReceive(otherParty, OfferMessage(notary, stateAndRef.state.data, existing?.ref, valuationDate)) logger.info("Updating portfolio") val update = PortfolioState.Update(portfolio = portfolio.refs) - subFlow(StateRevisionFlow.Requester(stateAndRef, update), shareParentSessions = true) + subFlow(StateRevisionFlow.Requester(stateAndRef, update)) } @Suspendable @@ -109,7 +109,7 @@ object SimmFlow { require(valuer != null) { "Valuer party must be known to this node" } val valuation = agreeValuation(portfolio, valuationDate, valuer!!) val update = PortfolioState.Update(valuation = valuation) - return subFlow(StateRevisionFlow.Requester(stateRef, update), shareParentSessions = true).state.data + return subFlow(StateRevisionFlow.Requester(stateRef, update)).state.data } @Suspendable @@ -143,10 +143,10 @@ object SimmFlow { val imBatch = analyticsEngine.calculateMarginBatch(sensBatch, combinedRatesProvider, fxRateProvider, cordaMargin) val cordaIMMap = imBatch.map { it.key.info.id.get().value to it.value.toCordaCompatible() }.toMap() - require(agree(cordaMarketData)) - require(agree(sensitivities.first.toCordaCompatible())) - require(agree(sensitivities.second.toCordaCompatible())) - require(agree(cordaMargin)) + require(agree(cordaMarketData)) + require(agree(sensitivities.first.toCordaCompatible())) + require(agree(sensitivities.second.toCordaCompatible())) + require(agree(cordaMargin)) return PortfolioValuation( portfolio.trades.size, @@ -214,7 +214,7 @@ object SimmFlow { @Suspendable private fun agree(data: Any): Boolean { send(replyToParty, data) - return receive(replyToParty).unwrap { it == true } + return receive(replyToParty).unwrap { it } } @Suspendable @@ -306,7 +306,7 @@ object SimmFlow { TwoPartyDealFlow.AutoOffer(offer.notary, offer.dealBeingOffered), serviceHub.legalIdentityKey) logger.info("Starting two party deal initiator with: ${replyToParty.name}") - return subFlow(seller, shareParentSessions = true) + return subFlow(seller) } @Suspendable @@ -318,7 +318,7 @@ object SimmFlow { super.verifyProposal(proposal) if (proposal.modification.portfolio != portfolio.refs) throw StateReplacementException() } - }, shareParentSessions = true) + }) } @Suspendable @@ -331,7 +331,7 @@ object SimmFlow { super.verifyProposal(proposal) if (proposal.modification.valuation != valuation) throw StateReplacementException() } - }, shareParentSessions = true) + }) } } } diff --git a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt index d7824dfec5..23af219e8e 100644 --- a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt +++ b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/BuyerFlow.kt @@ -49,7 +49,7 @@ class BuyerFlow(val otherParty: Party, CommercialPaper.State::class.java) // This invokes the trading flow and out pops our finished transaction. - val tradeTX: SignedTransaction = subFlow(buyer, shareParentSessions = true) + val tradeTX: SignedTransaction = subFlow(buyer) // TODO: This should be moved into the flow itself. serviceHub.recordTransactions(listOf(tradeTX)) diff --git a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt index bf4bf6ed0e..7a7a850460 100644 --- a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt +++ b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt @@ -9,10 +9,10 @@ import net.corda.core.crypto.SecureHash import net.corda.core.crypto.generateKeyPair import net.corda.core.days import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow import net.corda.core.node.NodeInfo import net.corda.core.seconds import net.corda.core.transactions.SignedTransaction -import net.corda.core.utilities.DUMMY_BANK_C import net.corda.core.utilities.ProgressTracker import net.corda.flows.NotaryFlow import net.corda.flows.TwoPartyTradeFlow @@ -21,6 +21,7 @@ import java.security.PublicKey import java.time.Instant import java.util.* +@InitiatingFlow class SellerFlow(val otherParty: Party, val amount: Amount, override val progressTracker: ProgressTracker) : FlowLogic() { @@ -60,7 +61,7 @@ class SellerFlow(val otherParty: Party, amount, cpOwnerKey, progressTracker.getChildProgressTracker(TRADING)!!) - return subFlow(seller, shareParentSessions = true) + return subFlow(seller) } @Suspendable