FlowSession docs (#1693)

* FlowSession docs (#1660)

* FlowSession docs

* PR comments

* Milder example flow name
This commit is contained in:
Andras Slemmer 2017-09-28 16:29:24 +01:00 committed by josecoll
parent 1723838a59
commit 73e3690596
7 changed files with 327 additions and 36 deletions

View File

@ -55,6 +55,10 @@ abstract class FlowLogic<out T> {
*/
val serviceHub: ServiceHub get() = stateMachine.serviceHub
/**
* Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note
* that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
*/
@Suspendable
fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party, flowUsedForSessions)

View File

@ -5,7 +5,18 @@ import net.corda.core.identity.Party
import net.corda.core.utilities.UntrustworthyData
/**
* To port existing flows:
*
* A [FlowSession] is a handle on a communication sequence between two paired flows, possibly running on separate nodes.
* It is used to send and receive messages between the flows as well as to query information about the counter-flow.
*
* There are two ways of obtaining such a session:
*
* 1. Calling [FlowLogic.initiateFlow]. This will create a [FlowSession] object on which the first send/receive
* operation will attempt to kick off a corresponding [InitiatedBy] flow on the counterparty's node.
* 2. As constructor parameter to [InitiatedBy] flows. This session is the one corresponding to the initiating flow and
* may be used for replies.
*
* To port flows using the old Party-based API:
*
* Look for [Deprecated] usages of send/receive/sendAndReceive/getFlowInfo.
*
@ -31,6 +42,10 @@ import net.corda.core.utilities.UntrustworthyData
* otherSideSession.send(something)
*/
abstract class FlowSession {
/**
* The [Party] on the other side of this session. In the case of a session created by [FlowLogic.initiateFlow]
* [counterparty] is the same Party as the one passed to that function.
*/
abstract val counterparty: Party
/**

View File

@ -113,9 +113,8 @@ subclass's constructor can take any number of arguments of any type. The generic
FlowLogic annotations
---------------------
Any flow that you wish to start either directly via RPC or as a subflow must be annotated with the
``@InitiatingFlow`` annotation. Additionally, if you wish to start the flow via RPC, you must annotate it with the
``@StartableByRPC`` annotation:
Any flow from which you want to initiate other flows must be annotated with the ``@InitiatingFlow`` annotation.
Additionally, if you wish to start the flow via RPC, you must annotate it with the ``@StartableByRPC`` annotation:
.. container:: codeset
@ -139,7 +138,7 @@ Meanwhile, any flow that responds to a message from another flow must be annotat
.. sourcecode:: kotlin
@InitiatedBy(Initiator::class)
class Responder(val otherParty: Party) : FlowLogic<Unit>() { }
class Responder(val otherSideSession: FlowSession) : FlowLogic<Unit>() { }
.. sourcecode:: java
@ -270,18 +269,50 @@ Finally, we can use the map to identify nodes providing a specific service (e.g.
Communication between parties
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
``FlowLogic`` instances communicate using three functions:
* ``send(otherParty: Party, payload: Any)``
* Sends the ``payload`` object to the ``otherParty``
* ``receive(receiveType: Class<R>, otherParty: Party)``
* Receives an object of type ``receiveType`` from the ``otherParty``
* ``sendAndReceive(receiveType: Class<R>, otherParty: Party, payload: Any)``
* Sends the ``payload`` object to the ``otherParty``, and receives an object of type ``receiveType`` back
In order to create a communication session between your initiator flow and the receiver flow you must call
``initiateFlow(party: Party): FlowSession``
``FlowSession`` instances in turn provide three functions:
* ``send(payload: Any)``
* Sends the ``payload`` object
* ``receive(receiveType: Class<R>): R``
* Receives an object of type ``receiveType``
* ``sendAndReceive(receiveType: Class<R>, payload: Any): R``
* Sends the ``payload`` object and receives an object of type ``receiveType`` back
InitiateFlow
~~~~~~~~~~~~
``initiateFlow`` creates a communication session with the passed in ``Party``.
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt
:language: kotlin
:start-after: DOCSTART initiateFlow
:end-before: DOCEND initiateFlow
:dedent: 12
.. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java
:language: java
:start-after: DOCSTART initiateFlow
:end-before: DOCEND initiateFlow
:dedent: 12
Note that at the time of call to this function no actual communication is done, this is deferred to the first
send/receive, at which point the counterparty will either:
1. Ignore the message if they are not registered to respond to messages from this flow.
2. Start the flow they have registered to respond to this flow.
Send
~~~~
We can send arbitrary data to a counterparty:
Once we have a ``FlowSession`` object we can send arbitrary data to a counterparty:
.. container:: codeset
@ -297,12 +328,7 @@ We can send arbitrary data to a counterparty:
:end-before: DOCEND 4
:dedent: 12
If this is the first ``send``, the counterparty will either:
1. Ignore the message if they are not registered to respond to messages from this flow.
2. Start the flow they have registered to respond to this flow, and run the flow until the first call to ``receive``,
at which point they process the message. In other words, we are assuming that the counterparty is registered to
respond to this flow, and has a corresponding ``receive`` call.
The flow on the other side must eventually reach a corresponding ``receive`` call to get this message.
Receive
~~~~~~~
@ -351,6 +377,12 @@ as it likes, and each party can invoke a different response flow:
:end-before: DOCEND 6
:dedent: 12
.. warning:: If you initiate several flows from the same ``@InitiatingFlow`` flow then on the receiving side you must be
prepared to be initiated by any of the corresponding ``initiateFlow()`` calls! A good way of handling this ambiguity
is to send as a first message a "role" message to the initiated flow, indicating which part of the initiating flow
the rest of the counter-flow should conform to. For example send an enum, and on the other side start with a switch
statement.
SendAndReceive
~~~~~~~~~~~~~~
We can also use a single call to send data to a counterparty and wait to receive data of a specific type back. The
@ -395,19 +427,129 @@ Our side of the flow must mirror these calls. We could do this as follows:
:end-before: DOCEND 8
:dedent: 12
Why sessions?
^^^^^^^^^^^^^
Before ``FlowSession`` s were introduced the send/receive API looked a bit different. They were functions on
``FlowLogic`` and took the address ``Party`` as argument. The platform internally maintained a mapping from ``Party`` to
session, hiding sessions from the user completely.
Although this is a convenient API it introduces subtle issues where a message that was originally meant for a specific
session may end up in another.
Consider the following contrived example using the old ``Party`` based API:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt
:language: kotlin
:start-after: DOCSTART LaunchSpaceshipFlow
:end-before: DOCEND LaunchSpaceshipFlow
The intention of the flows is very clear: LaunchSpaceshipFlow asks the president whether a spaceship should be launched.
It is expecting a boolean reply. The president in return first tells the secretary that they need coffee, which is also
communicated with a boolean. Afterwards the president replies to the launcher that they don't want to launch.
However the above can go horribly wrong when the ``launcher`` happens to be the same party ``getSecretary`` returns. In
this case the boolean meant for the secretary will be received by the launcher!
This indicates that ``Party`` is not a good identifier for the communication sequence, and indeed the ``Party`` based
API may introduce ways for an attacker to fish for information and even trigger unintended control flow like in the
above case.
Hence we introduced ``FlowSession``, which identifies the communication sequence. With ``FlowSession`` s the above set
of flows would look like this:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt
:language: kotlin
:start-after: DOCSTART LaunchSpaceshipFlowCorrect
:end-before: DOCEND LaunchSpaceshipFlowCorrect
Note how the president is now explicit about which session it wants to send to.
Porting from the old Party-based API
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
In the old API the first ``send`` or ``receive`` to a ``Party`` was the one kicking off the counter-flow. This is now
explicit in the ``initiateFlow`` function call. To port existing code:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt
:language: kotlin
:start-after: DOCSTART FlowSession porting
:end-before: DOCEND FlowSession porting
:dedent: 12
.. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java
:language: java
:start-after: DOCSTART FlowSession porting
:end-before: DOCEND FlowSession porting
:dedent: 12
Subflows
--------
Subflows are pieces of reusable flows that may be run by calling ``FlowLogic.subFlow``. There are two broad categories
of subflows, inlined and initiating ones. The main difference lies in the counter-flow's starting method, initiating
ones initiate counter-flows automatically, while inlined ones expect some parent counter-flow to run the inlined
counter-part.
Inlined subflows
^^^^^^^^^^^^^^^^
Inlined subflows inherit their calling flow's type when initiating a new session with a counterparty. For example, say
we have flow A calling an inlined subflow B, which in turn initiates a session with a party. The FlowLogic type used to
determine which counter-flow should be kicked off will be A, not B. Note that this means that the other side of this
inlined flow must therefore be implemented explicitly in the kicked off flow as well. This may be done by calling a
matching inlined counter-flow, or by implementing the other side explicitly in the kicked off parent flow.
An example of such a flow is ``CollectSignaturesFlow``. It has a counter-flow ``SignTransactionFlow`` that isn't
annotated with ``InitiatedBy``. This is because both of these flows are inlined; the kick-off relationship will be
defined by the parent flows calling ``CollectSignaturesFlow`` and ``SignTransactionFlow``.
In the code inlined subflows appear as regular ``FlowLogic`` instances, `without` either of the ``@InitiatingFlow`` or
``@InitiatedBy`` annotation.
.. note:: Inlined flows aren't versioned; they inherit their parent flow's version.
Initiating subflows
^^^^^^^^^^^^^^^^^^^
Initiating subflows are ones annotated with the ``@InitiatingFlow`` annotation. When such a flow initiates a session its
type will be used to determine which ``@InitiatedBy`` flow to kick off on the counterparty.
An example is the ``@InitiatingFlow InitiatorFlow``/``@InitiatedBy ResponderFlow`` flow pair in the ``FlowCookbook``.
.. note:: Initiating flows are versioned separately from their parents.
Core initiating subflows
^^^^^^^^^^^^^^^^^^^^^^^^
Corda-provided initiating subflows are a little different to standard ones as they are versioned together with the
platform, and their initiated counter-flows are registered explicitly, so there is no need for the ``InitiatedBy``
annotation.
An example is the ``FinalityFlow``/``FinalityHandler`` flow pair.
Built-in subflows
^^^^^^^^^^^^^^^^^
Corda provides a number of built-in flows that should be used for handling common tasks. The most important are:
* ``CollectSignaturesFlow``, which should be used to collect a transaction's required signatures
* ``FinalityFlow``, which should be used to notarise and record a transaction
* ``SendTransactionFlow``, which should be used to send a signed transaction if it needed to be resolved on the other side.
* ``ReceiveTransactionFlow``, which should be used receive a signed transaction
* ``ContractUpgradeFlow``, which should be used to change a state's contract
* ``NotaryChangeFlow``, which should be used to change a state's notary
* ``CollectSignaturesFlow`` (inlined), which should be used to collect a transaction's required signatures
* ``FinalityFlow`` (initiating), which should be used to notarise and record a transaction as well as to broadcast it to
all relevant parties
* ``SendTransactionFlow`` (inlined), which should be used to send a signed transaction if it needed to be resolved on
the other side.
* ``ReceiveTransactionFlow`` (inlined), which should be used receive a signed transaction
* ``ContractUpgradeFlow`` (initiating), which should be used to change a state's contract
* ``NotaryChangeFlow`` (initiating), which should be used to change a state's notary
These flows are designed to be used as building blocks in your own flows. You invoke them by calling
``FlowLogic.subFlow`` from within your flow's ``call`` method. Let's look at three very common examples.
Let's look at three very common examples.
FinalityFlow
^^^^^^^^^^^^
@ -538,6 +680,20 @@ We can also send and receive a ``StateAndRef`` dependency chain and automaticall
:end-before: DOCEND 14
:dedent: 12
Why inlined subflows?
^^^^^^^^^^^^^^^^^^^^^
Inlined subflows provide a way to share commonly used flow code `while forcing users to create a parent flow`. Take for
example ``CollectSignaturesFlow``. Say we made it an initiating flow that automatically kicks off
``SignTransactionFlow`` that signs the transaction. This would mean malicious nodes can just send any old transaction to
us using ``CollectSignaturesFlow`` and we would automatically sign it!
By making this pair of flows inlined we provide control to the user over whether to sign the transaction or not by
forcing them to nest it in their own parent flows.
In general if you're writing a subflow the decision of whether you should make it initiating should depend on whether
the counter-flow needs broader context to achieve its goal.
FlowException
-------------
Suppose a node throws an exception while running a flow. Any counterparty flows waiting for a message from the node

View File

@ -570,6 +570,13 @@ public class FlowCookbookJava {
SignedTransaction notarisedTx2 = subFlow(new FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker()));
// DOCEND 10
// DOCSTART FlowSession porting
send(regulator, new Object()); // Old API
// becomes
FlowSession session = initiateFlow(regulator);
session.send(new Object());
// DOCEND FlowSession porting
return null;
}
}

View File

@ -122,6 +122,10 @@ object FlowCookbook {
throw IllegalArgumentException("Couldn't find counterparty with key: $dummyPubKey in identity service")
// DOCEND 2
// DOCSTART initiateFlow
val counterpartySession = initiateFlow(counterparty)
// DOCEND initiateFlow
/**-----------------------------
* SENDING AND RECEIVING DATA *
-----------------------------**/
@ -138,7 +142,6 @@ object FlowCookbook {
// registered to respond to this flow, and has a corresponding
// ``receive`` call.
// DOCSTART 4
val counterpartySession = initiateFlow(counterparty)
counterpartySession.send(Any())
// DOCEND 4
@ -497,7 +500,7 @@ object FlowCookbook {
// other required signers using ``CollectSignaturesFlow``.
// The responder flow will need to call ``SignTransactionFlow``.
// DOCSTART 15
val fullySignedTx: SignedTransaction = subFlow(CollectSignaturesFlow(twiceSignedTx, emptySet(), SIGS_GATHERING.childProgressTracker()))
val fullySignedTx: SignedTransaction = subFlow(CollectSignaturesFlow(twiceSignedTx, setOf(counterpartySession, regulatorSession), SIGS_GATHERING.childProgressTracker()))
// DOCEND 15
/**-----------------------
@ -541,6 +544,13 @@ object FlowCookbook {
val additionalParties: Set<Party> = setOf(regulator)
val notarisedTx2: SignedTransaction = subFlow(FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker()))
// DOCEND 10
// DOCSTART FlowSession porting
send(regulator, Any()) // Old API
// becomes
val session = initiateFlow(regulator)
session.send(Any())
// DOCEND FlowSession porting
}
}

View File

@ -0,0 +1,99 @@
package net.corda.docs
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.utilities.unwrap
// DOCSTART LaunchSpaceshipFlow
@InitiatingFlow
class LaunchSpaceshipFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val shouldLaunchSpaceship = receive<Boolean>(getPresident()).unwrap { it }
if (shouldLaunchSpaceship) {
launchSpaceship()
}
}
fun launchSpaceship() {
}
fun getPresident(): Party {
TODO()
}
}
@InitiatedBy(LaunchSpaceshipFlow::class)
@InitiatingFlow
class PresidentSpaceshipFlow(val launcher: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val needCoffee = true
send(getSecretary(), needCoffee)
val shouldLaunchSpaceship = false
send(launcher, shouldLaunchSpaceship)
}
fun getSecretary(): Party {
TODO()
}
}
@InitiatedBy(PresidentSpaceshipFlow::class)
class SecretaryFlow(val president: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
// ignore
}
}
// DOCEND LaunchSpaceshipFlow
// DOCSTART LaunchSpaceshipFlowCorrect
@InitiatingFlow
class LaunchSpaceshipFlowCorrect : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val presidentSession = initiateFlow(getPresident())
val shouldLaunchSpaceship = presidentSession.receive<Boolean>().unwrap { it }
if (shouldLaunchSpaceship) {
launchSpaceship()
}
}
fun launchSpaceship() {
}
fun getPresident(): Party {
TODO()
}
}
@InitiatedBy(LaunchSpaceshipFlowCorrect::class)
@InitiatingFlow
class PresidentSpaceshipFlowCorrect(val launcherSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val needCoffee = true
val secretarySession = initiateFlow(getSecretary())
secretarySession.send(needCoffee)
val shouldLaunchSpaceship = false
launcherSession.send(shouldLaunchSpaceship)
}
fun getSecretary(): Party {
TODO()
}
}
@InitiatedBy(PresidentSpaceshipFlowCorrect::class)
class SecretaryFlowCorrect(val presidentSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
// ignore
}
}
// DOCEND LaunchSpaceshipFlowCorrect

View File

@ -297,13 +297,13 @@ time, and perhaps communicating with the same counterparty node but for differen
way to segregate communication channels so that concurrent conversations between flows on the same set of nodes do
not interfere with each other.
To achieve this the flow framework initiates a new flow session each time a flow starts communicating with a ``Party``
for the first time. A session is simply a pair of IDs, one for each side, to allow the node to route received messages to
the correct flow. If the other side accepts the session request then subsequent sends and receives to that same ``Party``
will use the same session. A session ends when either flow ends, whether as expected or pre-maturely. If a flow ends
pre-maturely then the other side will be notified of that and they will also end, as the whole point of flows is a known
sequence of message transfers. Flows end pre-maturely due to exceptions, and as described above, if that exception is
``FlowException`` or a sub-type then it will propagate to the other side. Any other exception will not propagate.
To achieve this in order to communicate with a counterparty one needs to first initiate such a session with a ``Party``
using ``initiateFlow``, which returns a ``FlowSession`` object, identifying this communication. Subsequently the first
actual communication will kick off a counter-flow on the other side, receiving a "reply" session object. A session ends
when either flow ends, whether as expected or pre-maturely. If a flow ends pre-maturely then the other side will be
notified of that and they will also end, as the whole point of flows is a known sequence of message transfers. Flows end
pre-maturely due to exceptions, and as described above, if that exception is ``FlowException`` or a sub-type then it
will propagate to the other side. Any other exception will not propagate.
Taking a step back, we mentioned that the other side has to accept the session request for there to be a communication
channel. A node accepts a session request if it has registered the flow type (the fully-qualified class name) that is