FlowSession docs (#1660)

This commit is contained in:
Andras Slemmer 2017-09-27 15:33:09 +01:00 committed by josecoll
parent 512de2690d
commit 72cff032e6
5 changed files with 171 additions and 36 deletions

View File

@ -5,7 +5,18 @@ import net.corda.core.identity.Party
import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.UntrustworthyData
/** /**
* To port existing flows: *
* A [FlowSession] is a handle on a communication sequence between two flows. It is used to send and receive messages
* between flows.
*
* There are two ways of obtaining such a session:
*
* 1. Calling [FlowLogic.initiateFlow]. This will create a [FlowSession] object on which the first send/receive
* operation will attempt to kick off a corresponding [InitiatedBy] flow on the counterparty's node.
* 2. As constructor parameter to [InitiatedBy] flows. This session is the one corresponding to the initiating flow and
* may be used for replies.
*
* To port flows using the old Party-based API:
* *
* Look for [Deprecated] usages of send/receive/sendAndReceive/getFlowInfo. * Look for [Deprecated] usages of send/receive/sendAndReceive/getFlowInfo.
* *
@ -31,6 +42,10 @@ import net.corda.core.utilities.UntrustworthyData
* otherSideSession.send(something) * otherSideSession.send(something)
*/ */
abstract class FlowSession { abstract class FlowSession {
/**
* The [Party] on the other side of this session. In the case of a session created by [FlowLogic.initiateFlow]
* [counterparty] is the same Party as the one passed to that function.
*/
abstract val counterparty: Party abstract val counterparty: Party
/** /**

View File

@ -113,9 +113,8 @@ subclass's constructor can take any number of arguments of any type. The generic
FlowLogic annotations FlowLogic annotations
--------------------- ---------------------
Any flow that you wish to start either directly via RPC or as a subflow must be annotated with the Any flow from which you want to initiate other flows must be annotated with the ``@InitiatingFlow`` annotation.
``@InitiatingFlow`` annotation. Additionally, if you wish to start the flow via RPC, you must annotate it with the Additionally, if you wish to start the flow via RPC, you must annotate it with the ``@StartableByRPC`` annotation:
``@StartableByRPC`` annotation:
.. container:: codeset .. container:: codeset
@ -139,7 +138,7 @@ Meanwhile, any flow that responds to a message from another flow must be annotat
.. sourcecode:: kotlin .. sourcecode:: kotlin
@InitiatedBy(Initiator::class) @InitiatedBy(Initiator::class)
class Responder(val otherParty: Party) : FlowLogic<Unit>() { } class Responder(val otherSideSession: FlowSession) : FlowLogic<Unit>() { }
.. sourcecode:: java .. sourcecode:: java
@ -270,18 +269,50 @@ Finally, we can use the map to identify nodes providing a specific service (e.g.
Communication between parties Communication between parties
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
``FlowLogic`` instances communicate using three functions:
* ``send(otherParty: Party, payload: Any)`` In order to create a communication session between your initiator flow and the receiver flow you must call
* Sends the ``payload`` object to the ``otherParty`` ``initiateFlow(party: Party): FlowSession``
* ``receive(receiveType: Class<R>, otherParty: Party)``
* Receives an object of type ``receiveType`` from the ``otherParty`` ``FlowSession`` instances in turn provide three functions:
* ``sendAndReceive(receiveType: Class<R>, otherParty: Party, payload: Any)``
* Sends the ``payload`` object to the ``otherParty``, and receives an object of type ``receiveType`` back * ``send(payload: Any)``
* Sends the ``payload`` object
* ``receive(receiveType: Class<R>): R``
* Receives an object of type ``receiveType``
* ``sendAndReceive(receiveType: Class<R>, payload: Any): R``
* Sends the ``payload`` object and receives an object of type ``receiveType`` back
InitiateFlow
~~~~~~~~~~~~
``initiateFlow`` creates a communication session with the passed in ``Party``.
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt
:language: kotlin
:start-after: DOCSTART initiateFlow
:end-before: DOCEND initiateFlow
:dedent: 12
.. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java
:language: java
:start-after: DOCSTART initiateFlow
:end-before: DOCEND initiateFlow
:dedent: 12
Note that at the time of call to this function no actual communication is done, this is deferred to the first
send/receive, at which point the counterparty will either:
1. Ignore the message if they are not registered to respond to messages from this flow.
2. Start the flow they have registered to respond to this flow.
Send Send
~~~~ ~~~~
We can send arbitrary data to a counterparty:
Once we have a ``FlowSession`` object we can send arbitrary data to a counterparty:
.. container:: codeset .. container:: codeset
@ -297,12 +328,7 @@ We can send arbitrary data to a counterparty:
:end-before: DOCEND 4 :end-before: DOCEND 4
:dedent: 12 :dedent: 12
If this is the first ``send``, the counterparty will either: The flow on the other side must eventually reach a corresponding ``receive`` call to get this message.
1. Ignore the message if they are not registered to respond to messages from this flow.
2. Start the flow they have registered to respond to this flow, and run the flow until the first call to ``receive``,
at which point they process the message. In other words, we are assuming that the counterparty is registered to
respond to this flow, and has a corresponding ``receive`` call.
Receive Receive
~~~~~~~ ~~~~~~~
@ -351,6 +377,11 @@ as it likes, and each party can invoke a different response flow:
:end-before: DOCEND 6 :end-before: DOCEND 6
:dedent: 12 :dedent: 12
.. warning:: If you initiate several counter flows from the same ``@InitiatingFlow`` flow then on the receiving side you
must be prepared to be initiated by any of the corresponding ``initiateFlow()`` calls! A good way of handling this
ambiguity is to send as a first message a "role" message to the initiated flow, indicating which part of the
initiating flow the rest of the counter-flow should conform to.
SendAndReceive SendAndReceive
~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~
We can also use a single call to send data to a counterparty and wait to receive data of a specific type back. The We can also use a single call to send data to a counterparty and wait to receive data of a specific type back. The
@ -395,19 +426,91 @@ Our side of the flow must mirror these calls. We could do this as follows:
:end-before: DOCEND 8 :end-before: DOCEND 8
:dedent: 12 :dedent: 12
Porting from the old Party-based API
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Before ``FlowSession`` s were introduced the send/receive API looked a bit different. They were functions on
``FlowLogic`` and took the address ``Party`` as argument. The platform internally maintained a mapping from ``Party`` to
session, hiding sessions from the user completely.
However we realised that this could introduce subtle bugs and security issues where sends meant for different sessions
may end up in the same session if the target ``Party`` happens to be the same.
Therefore the session concept is now exposed through ``FlowSession`` which disambiguates which communication sequence a
message is intended for.
In the old API the first ``send`` or ``receive`` to a ``Party`` was the one kicking off the counterflow. This is now
explicit in the ``initiateFlow`` function call. To port existing code:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt
:language: kotlin
:start-after: DOCSTART FlowSession porting
:end-before: DOCEND FlowSession porting
:dedent: 12
.. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java
:language: java
:start-after: DOCSTART FlowSession porting
:end-before: DOCEND FlowSession porting
:dedent: 12
Subflows Subflows
-------- --------
Subflows are pieces of reusable flows that may be run by calling ``FlowLogic.subFlow``.
Inlined subflows
^^^^^^^^^^^^^^^^
Inlined subflows inherit their calling flow's type when initiating a new session with a counterparty. For example say
we have flow A calling an inlined subflow B, which in turn initiates a session with a party. The FlowLogic type used to
determine which counterflow should be kicked off will be A, not B. Note that this means that the other side of this
session must therefore be called explicitly in the kicked off flow as well.
An example of such a flow is ``CollectSignaturesFlow``. It has a counter-flow ``SignTransactionFlow`` that isn't
annotated with ``InitiatedBy``. This is because both of these flows are inlined, the kick-off relationship will be
defined by the parent flows calling ``CollectSignaturesFlow`` and ``SignTransactionFlow``.
In the code inlined subflows appear as regular ``FlowLogic`` instances, `without` either of the ``@InitiatingFlow`` or
``@InitiatedBy`` annotation.
.. note:: Inlined flows aren't versioned, they inherit their parent flow's version.
Initiating subflows
^^^^^^^^^^^^^^^^^^^
Initiating subflows are ones annotated with the ``@InitiatingFlow`` annotation. When such a flow initiates a session its
type will be used to determine which ``@InitiatedBy`` flow to kick off on the counterparty.
An example is the ``@InitiatingFlow InitiatorFlow``/``@InitiatedBy ResponderFlow`` flow pair in the ``FlowCookbook``.
.. note:: Initiating flows are versioned separately from their parents.
Core initiating subflows
^^^^^^^^^^^^^^^^^^^^^^^^
Corda-provided initiating subflows are a little different to standard ones as they are versioned together with the
platform, and their initiated counterflows are registered explicitly, so there is no need for the ``InitiatedBy``
annotation.
An example is the ``FinalityFlow``/``FinalityHandler`` flow pair.
Built-in subflows
^^^^^^^^^^^^^^^^^
Corda provides a number of built-in flows that should be used for handling common tasks. The most important are: Corda provides a number of built-in flows that should be used for handling common tasks. The most important are:
* ``CollectSignaturesFlow``, which should be used to collect a transaction's required signatures * ``CollectSignaturesFlow`` (inlined), which should be used to collect a transaction's required signatures
* ``FinalityFlow``, which should be used to notarise and record a transaction * ``FinalityFlow`` (initiating), which should be used to notarise and record a transaction
* ``SendTransactionFlow``, which should be used to send a signed transaction if it needed to be resolved on the other side. * ``SendTransactionFlow`` (inlined), which should be used to send a signed transaction if it needed to be resolved on the other side.
* ``ReceiveTransactionFlow``, which should be used receive a signed transaction * ``ReceiveTransactionFlow`` (inlined), which should be used receive a signed transaction
* ``ContractUpgradeFlow``, which should be used to change a state's contract * ``ContractUpgradeFlow`` (initiating), which should be used to change a state's contract
* ``NotaryChangeFlow``, which should be used to change a state's notary * ``NotaryChangeFlow`` (initiating), which should be used to change a state's notary
These flows are designed to be used as building blocks in your own flows. You invoke them by calling Let's look at three very common examples.
``FlowLogic.subFlow`` from within your flow's ``call`` method. Let's look at three very common examples.
FinalityFlow FinalityFlow
^^^^^^^^^^^^ ^^^^^^^^^^^^

View File

@ -567,6 +567,13 @@ public class FlowCookbookJava {
SignedTransaction notarisedTx2 = subFlow(new FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker())); SignedTransaction notarisedTx2 = subFlow(new FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker()));
// DOCEND 10 // DOCEND 10
// DOCSTART FlowSession porting
send(regulator, new Object()); // Old API
// becomes
FlowSession session = initiateFlow(regulator);
session.send(new Object());
// DOCEND FlowSession porting
return null; return null;
} }
} }

View File

@ -121,6 +121,10 @@ object FlowCookbook {
throw IllegalArgumentException("Couldn't find counterparty with key: $dummyPubKey in identity service") throw IllegalArgumentException("Couldn't find counterparty with key: $dummyPubKey in identity service")
// DOCEND 2 // DOCEND 2
// DOCSTART initiateFlow
val counterpartySession = initiateFlow(counterparty)
// DOCEND initiateFlow
/**----------------------------- /**-----------------------------
* SENDING AND RECEIVING DATA * * SENDING AND RECEIVING DATA *
-----------------------------**/ -----------------------------**/
@ -137,7 +141,6 @@ object FlowCookbook {
// registered to respond to this flow, and has a corresponding // registered to respond to this flow, and has a corresponding
// ``receive`` call. // ``receive`` call.
// DOCSTART 4 // DOCSTART 4
val counterpartySession = initiateFlow(counterparty)
counterpartySession.send(Any()) counterpartySession.send(Any())
// DOCEND 4 // DOCEND 4
@ -496,7 +499,7 @@ object FlowCookbook {
// other required signers using ``CollectSignaturesFlow``. // other required signers using ``CollectSignaturesFlow``.
// The responder flow will need to call ``SignTransactionFlow``. // The responder flow will need to call ``SignTransactionFlow``.
// DOCSTART 15 // DOCSTART 15
val fullySignedTx: SignedTransaction = subFlow(CollectSignaturesFlow(twiceSignedTx, emptySet(), SIGS_GATHERING.childProgressTracker())) val fullySignedTx: SignedTransaction = subFlow(CollectSignaturesFlow(twiceSignedTx, setOf(counterpartySession, regulatorSession), SIGS_GATHERING.childProgressTracker()))
// DOCEND 15 // DOCEND 15
/**----------------------- /**-----------------------
@ -540,6 +543,13 @@ object FlowCookbook {
val additionalParties: Set<Party> = setOf(regulator) val additionalParties: Set<Party> = setOf(regulator)
val notarisedTx2: SignedTransaction = subFlow(FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker())) val notarisedTx2: SignedTransaction = subFlow(FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker()))
// DOCEND 10 // DOCEND 10
// DOCSTART FlowSession porting
send(regulator, Any()) // Old API
// becomes
val session = initiateFlow(regulator)
session.send(Any())
// DOCEND FlowSession porting
} }
} }

View File

@ -297,13 +297,13 @@ time, and perhaps communicating with the same counterparty node but for differen
way to segregate communication channels so that concurrent conversations between flows on the same set of nodes do way to segregate communication channels so that concurrent conversations between flows on the same set of nodes do
not interfere with each other. not interfere with each other.
To achieve this the flow framework initiates a new flow session each time a flow starts communicating with a ``Party`` To achieve this in order to communicate with a counterparty one needs to first initiate such a session with a ``Party``
for the first time. A session is simply a pair of IDs, one for each side, to allow the node to route received messages to using ``initiateFlow``, which returns a ``FlowSession`` object, identifying this communication. Subsequently the first
the correct flow. If the other side accepts the session request then subsequent sends and receives to that same ``Party`` actual communication will kick off a counter-flow on the other side, receiving a "reply" session object. A session ends
will use the same session. A session ends when either flow ends, whether as expected or pre-maturely. If a flow ends when either flow ends, whether as expected or pre-maturely. If a flow ends pre-maturely then the other side will be
pre-maturely then the other side will be notified of that and they will also end, as the whole point of flows is a known notified of that and they will also end, as the whole point of flows is a known sequence of message transfers. Flows end
sequence of message transfers. Flows end pre-maturely due to exceptions, and as described above, if that exception is pre-maturely due to exceptions, and as described above, if that exception is ``FlowException`` or a sub-type then it
``FlowException`` or a sub-type then it will propagate to the other side. Any other exception will not propagate. will propagate to the other side. Any other exception will not propagate.
Taking a step back, we mentioned that the other side has to accept the session request for there to be a communication Taking a step back, we mentioned that the other side has to accept the session request for there to be a communication
channel. A node accepts a session request if it has registered the flow type (the fully-qualified class name) that is channel. A node accepts a session request if it has registered the flow type (the fully-qualified class name) that is