Add flow state machine extension tutorial

This commit is contained in:
Andras Slemmer 2018-06-01 19:15:35 +01:00
parent ab08ce21f4
commit 80a5e7ab81
6 changed files with 313 additions and 0 deletions

View File

@ -5,6 +5,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowLogic
import net.corda.core.serialization.CordaSerializable
// DOCSTART FlowAsyncOperation
/**
* Interface for arbitrary operations that can be invoked in a flow asynchronously - the flow will suspend until the
* operation completes. Operation parameters are expected to be injected via constructor.
@ -14,10 +15,13 @@ interface FlowAsyncOperation<R : Any> {
/** Performs the operation in a non-blocking fashion. */
fun execute(): CordaFuture<R>
}
// DOCEND FlowAsyncOperation
// DOCSTART executeAsync
/** Executes the specified [operation] and suspends until operation completion. */
@Suspendable
fun <T, R : Any> FlowLogic<T>.executeAsync(operation: FlowAsyncOperation<R>, maySkipCheckpoint: Boolean = false): R {
val request = FlowIORequest.ExecuteAsyncOperation(operation)
return stateMachine.suspend(request, maySkipCheckpoint)
}
// DOCEND executeAsync

View File

@ -9,6 +9,7 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NonEmptySet
import java.time.Instant
// DOCSTART FlowIORequest
/**
* A [FlowIORequest] represents an IO request of a flow when it suspends. It is persisted in checkpoints.
*/
@ -93,3 +94,4 @@ sealed class FlowIORequest<out R : Any> {
// TODO: consider using an empty FlowAsyncOperation instead
object ForceCheckpoint : FlowIORequest<Unit>()
}
// DOCSEND FlowIORequest

View File

@ -0,0 +1,237 @@
.. highlight:: kotlin
.. raw:: html
<script type="text/javascript" src="_static/jquery.js"></script>
<script type="text/javascript" src="_static/codesets.js"></script>
How to extend the state machine
===============================
This article explains how to extend the state machine code that underlies flow execution. It is intended for Corda
contributors.
How to add suspending operations
--------------------------------
To add a suspending operation that's a simple request-response type function that perhaps involves some external IO you
can use the internal ``FlowAsyncOperation`` interface.
.. container:: codeset
.. literalinclude:: ../../core/src/main/kotlin/net/corda/core/internal/FlowAsyncOperation.kt
:language: kotlin
:start-after: DOCSTART FlowAsyncOperation
:end-before: DOCEND FlowAsyncOperation
Let's imagine we want to add a suspending operation that takes two integers and returns their sum. To do this we
implement ``FlowAsyncOperation``:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/tutorial/flowstatemachines/TutorialFlowAsyncOperation.kt
:language: kotlin
:start-after: DOCSTART SummingOperation
:end-before: DOCEND SummingOperation
As we can see the constructor of ``SummingOperation`` takes the two numbers, and the ``execute`` function simply returns
a future that is immediately completed by the result of summing the numbers. Note how we don't use ``@Suspendable`` on
``execute``, this is because we'll never suspend inside this function, the suspension will happen before we're calling
it.
Note also how the input numbers are stored in the class as fields. This is important, because in the flow's checkpoint
we'll store an instance of this class whenever we're suspending on such an operation. If the node fails or restarts
while the operation is underway this class will be deserialized from the checkpoint and ``execute`` will be called
again.
Now we can use the internal function ``executeAsync`` to execute this operation from a flow.
.. container:: codeset
.. literalinclude:: ../../core/src/main/kotlin/net/corda/core/internal/FlowAsyncOperation.kt
:language: kotlin
:start-after: DOCSTART executeAsync
:end-before: DOCEND executeAsync
It simply takes a ``FlowAsyncOperation`` and an optional flag we don't care about for now. We can use this function in a
flow:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/tutorial/flowstatemachines/TutorialFlowAsyncOperation.kt
:language: kotlin
:start-after: DOCSTART ExampleSummingFlow
:end-before: DOCEND ExampleSummingFlow
That's it! Obviously this is a mostly useless example, but this is the basic code structure one could extend for heavier
computations/other IO. For example the function could call into a ``CordaService`` or something similar. One thing to
note is that the operation executed in ``execute`` must be redoable(= "idempotent") in case the node fails while the
next checkpoint is reached.
How to test
-----------
The recommended way to test flows and the state machine is using the Driver DSL. This ensures that you will test your
flow with a full node.
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/integration-test/kotlin/net/corda/docs/TutorialFlowAsyncOperationTest.kt
:language: kotlin
:start-after: DOCSTART summingWorks
:end-before: DOCEND summingWorks
The above will spin up a node and run our example flow.
How to debug issues
-------------------
Let's assume we made a mistake in our summing operation:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/tutorial/flowstatemachines/TutorialFlowAsyncOperation.kt
:language: kotlin
:start-after: DOCSTART SummingOperationThrowing
:end-before: DOCEND SummingOperationThrowing
The operation now throws a rude exception. If we modify the example flow to use this and run the same test we will get
a lot of logs about the error condition (as we are in dev mode). The interesting bit looks like this:
.. parsed-literal::
[WARN ] 18:38:52,613 [Node thread-1] (DumpHistoryOnErrorInterceptor.kt:39) interceptors.DumpHistoryOnErrorInterceptor.executeTransition - Flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] errored, dumping all transitions:
--- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] ---
Timestamp: 2018-06-01T17:38:52.426Z
Event: DoRemainingWork
Actions:
CreateTransaction
PersistCheckpoint(id=[03ab886e-3fd3-4667-b944-ab6a3b1f90a7], checkpoint=Checkpoint(invocationContext=InvocationContext(origin=RPC(actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES)), trace=Trace(invocationId=26bcf0c3-f1d8-4098-a52d-3780f4095b7a, timestamp: 2018-06-01T17:38:52.234Z, entityType: Invocation, sessionId=393d1175-3bb1-4eb1-bff0-6ba317851260, timestamp: 2018-06-01T17:38:52.169Z, entityType: Session), actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES), externalTrace=null, impersonatedActor=null), ourIdentity=O=Alice Corp, L=Madrid, C=ES, sessions={}, subFlowStack=[Inlined(flowClass=class net.corda.docs.tutorial.flowstatemachines.ExampleSummingFlow, subFlowVersion=CorDappFlow(platformVersion=1, corDappName=net.corda.docs-c6816652-f975-4fb2-aa09-ef1dddea19b3, corDappHash=F4012397D8CF97926B5998E046DBCE16D497318BB87DCED66313912D4B303BB7))], flowState=Unstarted(flowStart=Explicit, frozenFlowLogic=74BA62EC5821EBD4FC4CBE129843F9ED6509DB37E6E3C8F85E3F7A8D84083500), errorState=Clean, numberOfSuspends=0, deduplicationSeed=03ab886e-3fd3-4667-b944-ab6a3b1f90a7))
PersistDeduplicationFacts(deduplicationHandlers=[net.corda.node.internal.FlowStarterImpl$startFlow$startFlowEvent$1@69326343])
CommitTransaction
AcknowledgeMessages(deduplicationHandlers=[net.corda.node.internal.FlowStarterImpl$startFlow$startFlowEvent$1@69326343])
SignalFlowHasStarted(flowId=[03ab886e-3fd3-4667-b944-ab6a3b1f90a7])
CreateTransaction
Continuation: Resume(result=null)
Diff between previous and next state:
isAnyCheckpointPersisted:
false
true
pendingDeduplicationHandlers:
[net.corda.node.internal.FlowStarterImpl$startFlow$startFlowEvent$1@69326343]
[]
isFlowResumed:
false
true
--- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] ---
Timestamp: 2018-06-01T17:38:52.487Z
Event: Suspend(ioRequest=ExecuteAsyncOperation(operation=net.corda.docs.tutorial.flowstatemachines.SummingOperationThrowing@40f4c23d), maySkipCheckpoint=false, fiber=15EC69204562BB396846768169AD4A339569D97AE841D805C230C513A8BA5BDE, )
Actions:
PersistCheckpoint(id=[03ab886e-3fd3-4667-b944-ab6a3b1f90a7], checkpoint=Checkpoint(invocationContext=InvocationContext(origin=RPC(actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES)), trace=Trace(invocationId=26bcf0c3-f1d8-4098-a52d-3780f4095b7a, timestamp: 2018-06-01T17:38:52.234Z, entityType: Invocation, sessionId=393d1175-3bb1-4eb1-bff0-6ba317851260, timestamp: 2018-06-01T17:38:52.169Z, entityType: Session), actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES), externalTrace=null, impersonatedActor=null), ourIdentity=O=Alice Corp, L=Madrid, C=ES, sessions={}, subFlowStack=[Inlined(flowClass=class net.corda.docs.tutorial.flowstatemachines.ExampleSummingFlow, subFlowVersion=CorDappFlow(platformVersion=1, corDappName=net.corda.docs-c6816652-f975-4fb2-aa09-ef1dddea19b3, corDappHash=F4012397D8CF97926B5998E046DBCE16D497318BB87DCED66313912D4B303BB7))], flowState=Started(flowIORequest=ExecuteAsyncOperation(operation=net.corda.docs.tutorial.flowstatemachines.SummingOperationThrowing@40f4c23d), frozenFiber=15EC69204562BB396846768169AD4A339569D97AE841D805C230C513A8BA5BDE), errorState=Clean, numberOfSuspends=1, deduplicationSeed=03ab886e-3fd3-4667-b944-ab6a3b1f90a7))
PersistDeduplicationFacts(deduplicationHandlers=[])
CommitTransaction
AcknowledgeMessages(deduplicationHandlers=[])
ScheduleEvent(event=DoRemainingWork)
Continuation: ProcessEvents
Diff between previous and next state:
checkpoint.numberOfSuspends:
0
1
checkpoint.flowState:
Unstarted(flowStart=Explicit, frozenFlowLogic=74BA62EC5821EBD4FC4CBE129843F9ED6509DB37E6E3C8F85E3F7A8D84083500)
Started(flowIORequest=ExecuteAsyncOperation(operation=net.corda.docs.tutorial.flowstatemachines.SummingOperationThrowing@40f4c23d), frozenFiber=15EC69204562BB396846768169AD4A339569D97AE841D805C230C513A8BA5BDE)
isFlowResumed:
true
false
--- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] ---
Timestamp: 2018-06-01T17:38:52.549Z
Event: DoRemainingWork
Actions:
ExecuteAsyncOperation(operation=net.corda.docs.tutorial.flowstatemachines.SummingOperationThrowing@40f4c23d)
Continuation: ProcessEvents
Diff between previous and intended state:
null
Diff between previous and next state:
checkpoint.errorState:
Clean
Errored(errors=[FlowError(errorId=-8704604242619505379, exception=java.lang.IllegalStateException: You shouldn't be calling me)], propagatedIndex=0, propagating=false)
--- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] ---
Timestamp: 2018-06-01T17:38:52.555Z
Event: DoRemainingWork
Actions:
Continuation: ProcessEvents
Diff between previous and next state:
null
--- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] ---
Timestamp: 2018-06-01T17:38:52.556Z
Event: StartErrorPropagation
Actions:
ScheduleEvent(event=DoRemainingWork)
Continuation: ProcessEvents
Diff between previous and next state:
checkpoint.errorState.propagating:
false
true
--- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] ---
Timestamp: 2018-06-01T17:38:52.606Z
Event: DoRemainingWork
Actions:
PropagateErrors(errorMessages=[ErrorSessionMessage(flowException=null, errorId=-8704604242619505379)], sessions=[], senderUUID=861f07d6-4b8f-42bd-9b52-5152812db2ba)
CreateTransaction
RemoveCheckpoint(id=[03ab886e-3fd3-4667-b944-ab6a3b1f90a7])
PersistDeduplicationFacts(deduplicationHandlers=[])
ReleaseSoftLocks(uuid=03ab886e-3fd3-4667-b944-ab6a3b1f90a7)
CommitTransaction
AcknowledgeMessages(deduplicationHandlers=[])
RemoveSessionBindings(sessionIds=[])
RemoveFlow(flowId=[03ab886e-3fd3-4667-b944-ab6a3b1f90a7], removalReason=ErrorFinish(flowErrors=[FlowError(errorId=-8704604242619505379, exception=java.lang.IllegalStateException: You shouldn't be calling me)]), lastState=StateMachineState(checkpoint=Checkpoint(invocationContext=InvocationContext(origin=RPC(actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES)), trace=Trace(invocationId=26bcf0c3-f1d8-4098-a52d-3780f4095b7a, timestamp: 2018-06-01T17:38:52.234Z, entityType: Invocation, sessionId=393d1175-3bb1-4eb1-bff0-6ba317851260, timestamp: 2018-06-01T17:38:52.169Z, entityType: Session), actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES), externalTrace=null, impersonatedActor=null), ourIdentity=O=Alice Corp, L=Madrid, C=ES, sessions={}, subFlowStack=[Inlined(flowClass=class net.corda.docs.tutorial.flowstatemachines.ExampleSummingFlow, subFlowVersion=CorDappFlow(platformVersion=1, corDappName=net.corda.docs-c6816652-f975-4fb2-aa09-ef1dddea19b3, corDappHash=F4012397D8CF97926B5998E046DBCE16D497318BB87DCED66313912D4B303BB7))], flowState=Started(flowIORequest=ExecuteAsyncOperation(operation=net.corda.docs.tutorial.flowstatemachines.SummingOperationThrowing@40f4c23d), frozenFiber=15EC69204562BB396846768169AD4A339569D97AE841D805C230C513A8BA5BDE), errorState=Errored(errors=[FlowError(errorId=-8704604242619505379, exception=java.lang.IllegalStateException: You shouldn't be calling me)], propagatedIndex=1, propagating=true), numberOfSuspends=1, deduplicationSeed=03ab886e-3fd3-4667-b944-ab6a3b1f90a7), flowLogic=net.corda.docs.tutorial.flowstatemachines.ExampleSummingFlow@600b0c6c, pendingDeduplicationHandlers=[], isFlowResumed=false, isTransactionTracked=false, isAnyCheckpointPersisted=true, isStartIdempotent=false, isRemoved=true, senderUUID=861f07d6-4b8f-42bd-9b52-5152812db2ba))
Continuation: Abort
Diff between previous and next state:
checkpoint.errorState.propagatedIndex:
0
1
isRemoved:
false
true
Whoa that's a lot of stuff. Now we get a glimpse into the bowels of the flow state machine. As we can see the flow did
quite a few things, even though the flow code looks simple.
What we can see here is the different transitions the flow's state machine went through that led up to the error
condition. For each transition we see what *Event* triggered the transition, what *Action* s were taken as a consequence,
and how the internal *State* of the state machine was modified in the process. It also prints the transition's
*Continuation*, which indicates how the flow should proceed after the transition.
For example in the first transition we can see that the triggering event was a ``DoRemainingWork``, this is a generic
event that instructs the state machine to check its own state to see whether there's any work left to do, and does it if
there's any.
In this case the work involves persisting a checkpoint together with some deduplication data in a database transaction,
then acknowledging any triggering messages, signalling that the flow has started, and creating a fresh database
transaction, to be used by user code.
The continuation is a ``Resume``, which instructs the state machine to hand control to user code. The state change is
a simple update of bookkeeping data.
On other words the first transition concerns initializing of the flow, which includes the creation of the
checkpoint.
The next transition is the suspension on our summing operation, triggered by the ``Suspend`` event. As we can see in
this transition we aren't doing any work related to the summation yet, we're merely persisting the checkpoint that
indicates that we want to do the summation. Had we added a ``toString`` method to our ``SummingOperationThrowing`` we
would see a nicer message.
The next transition is the faulty one, as we can see it was also triggered by a ``DoRemainingWork``, and executed our
operation. We can see that there are two state "diff"s printed, one that would've happened had the transition succeeded,
and one that actually happened, this is the marking of the flow's state as errored. The rest of the transitions involve
error propagation (triggered by the ``FlowHospital``) and notification of failure, which ultimately raises the exception
on the RPC ``resultFuture``.

View File

@ -107,6 +107,9 @@ Running the API scanner
Your changes must also not break compatibility with existing public API. We have an API scanning tool which runs as part of the build
process which can be used to flag up any accidental changes, which is detailed :doc:`here </api-scanner>`.
Extending the flow state machine
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can find instructions on how to extend the flow state machine :doc:`here </contributing-flow-state-machines>`
Updating the docs
-----------------

View File

@ -0,0 +1,29 @@
package net.corda.docs
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.docs.tutorial.flowstatemachines.ExampleSummingFlow
import net.corda.node.services.Permissions
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.junit.Test
import kotlin.test.assertEquals
class TutorialFlowAsyncOperationTest {
// DOCSTART summingWorks
@Test
fun summingWorks() {
driver(DriverParameters(startNodesInProcess = true)) {
val aliceUser = User("aliceUser", "testPassword1", permissions = setOf(Permissions.all()))
val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
val aliceClient = CordaRPCClient(alice.rpcAddress)
val aliceProxy = aliceClient.start("aliceUser", "testPassword1").proxy
val answer = aliceProxy.startFlow(::ExampleSummingFlow).returnValue.getOrThrow()
assertEquals(3, answer)
}
}
// DOCEND summingWorks
}

View File

@ -0,0 +1,38 @@
package net.corda.docs.tutorial.flowstatemachines
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.executeAsync
// DOCSTART SummingOperation
class SummingOperation(val a: Int, val b: Int) : FlowAsyncOperation<Int> {
override fun execute(): CordaFuture<Int> {
return doneFuture(a + b)
}
}
// DOCEND SummingOperation
// DOCSTART SummingOperationThrowing
class SummingOperationThrowing(val a: Int, val b: Int) : FlowAsyncOperation<Int> {
override fun execute(): CordaFuture<Int> {
throw IllegalStateException("You shouldn't be calling me")
}
}
// DOCEND SummingOperationThrowing
// DOCSTART ExampleSummingFlow
@StartableByRPC
class ExampleSummingFlow : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
val answer = executeAsync(SummingOperationThrowing(1, 2))
return answer // hopefully 3
}
}
// DOCEND ExampleSummingFlow