diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowAsyncOperation.kt b/core/src/main/kotlin/net/corda/core/internal/FlowAsyncOperation.kt index dad14b4b69..61fc4ee5d5 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowAsyncOperation.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowAsyncOperation.kt @@ -5,6 +5,7 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.flows.FlowLogic import net.corda.core.serialization.CordaSerializable +// DOCSTART FlowAsyncOperation /** * Interface for arbitrary operations that can be invoked in a flow asynchronously - the flow will suspend until the * operation completes. Operation parameters are expected to be injected via constructor. @@ -14,10 +15,13 @@ interface FlowAsyncOperation { /** Performs the operation in a non-blocking fashion. */ fun execute(): CordaFuture } +// DOCEND FlowAsyncOperation +// DOCSTART executeAsync /** Executes the specified [operation] and suspends until operation completion. */ @Suspendable fun FlowLogic.executeAsync(operation: FlowAsyncOperation, maySkipCheckpoint: Boolean = false): R { val request = FlowIORequest.ExecuteAsyncOperation(operation) return stateMachine.suspend(request, maySkipCheckpoint) } +// DOCEND executeAsync diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt b/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt index 257e7706c6..494c5099aa 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt @@ -9,6 +9,7 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.NonEmptySet import java.time.Instant +// DOCSTART FlowIORequest /** * A [FlowIORequest] represents an IO request of a flow when it suspends. It is persisted in checkpoints. */ @@ -93,3 +94,4 @@ sealed class FlowIORequest { // TODO: consider using an empty FlowAsyncOperation instead object ForceCheckpoint : FlowIORequest() } +// DOCSEND FlowIORequest diff --git a/docs/source/contributing-flow-state-machines.rst b/docs/source/contributing-flow-state-machines.rst new file mode 100644 index 0000000000..7f5a9a37c8 --- /dev/null +++ b/docs/source/contributing-flow-state-machines.rst @@ -0,0 +1,257 @@ +.. highlight:: kotlin +.. raw:: html + + + + +How to extend the state machine +=============================== + +This article explains how to extend the state machine code that underlies flow execution. It is intended for Corda +contributors. + +How to add suspending operations +-------------------------------- + +To add a suspending operation for a simple request-response type function that perhaps involves some external IO we can +use the internal ``FlowAsyncOperation`` interface. + +.. container:: codeset + + .. literalinclude:: ../../core/src/main/kotlin/net/corda/core/internal/FlowAsyncOperation.kt + :language: kotlin + :start-after: DOCSTART FlowAsyncOperation + :end-before: DOCEND FlowAsyncOperation + +Let's imagine we want to add a suspending operation that takes two integers and returns their sum. To do this we +implement ``FlowAsyncOperation``: + +.. container:: codeset + + .. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/tutorial/flowstatemachines/TutorialFlowAsyncOperation.kt + :language: kotlin + :start-after: DOCSTART SummingOperation + :end-before: DOCEND SummingOperation + + .. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/SummingOperation.java + :language: java + :start-after: DOCSTART SummingOperation + :end-before: DOCEND SummingOperation + +As we can see the constructor of ``SummingOperation`` takes the two numbers, and the ``execute`` function simply returns +a future that is immediately completed by the result of summing the numbers. Note how we don't use ``@Suspendable`` on +``execute``, this is because we'll never suspend inside this function, the suspension will happen before we're calling +it. + +Note also how the input numbers are stored in the class as fields. This is important, because in the flow's checkpoint +we'll store an instance of this class whenever we're suspending on such an operation. If the node fails or restarts +while the operation is underway this class will be deserialized from the checkpoint and ``execute`` will be called +again. + +Now we can use the internal function ``executeAsync`` to execute this operation from a flow. + +.. container:: codeset + + .. literalinclude:: ../../core/src/main/kotlin/net/corda/core/internal/FlowAsyncOperation.kt + :language: kotlin + :start-after: DOCSTART executeAsync + :end-before: DOCEND executeAsync + +It simply takes a ``FlowAsyncOperation`` and an optional flag we don't care about for now. We can use this function in a +flow: + +.. container:: codeset + + .. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/tutorial/flowstatemachines/TutorialFlowAsyncOperation.kt + :language: kotlin + :start-after: DOCSTART ExampleSummingFlow + :end-before: DOCEND ExampleSummingFlow + + .. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/ExampleSummingFlow.java + :language: java + :start-after: DOCSTART ExampleSummingFlow + :end-before: DOCEND ExampleSummingFlow + +That's it! Obviously this is a mostly useless example, but this is the basic code structure one could extend for heavier +computations/other IO. For example the function could call into a ``CordaService`` or something similar. One thing to +note is that the operation executed in ``execute`` must be redoable(= "idempotent") in case the node fails before the +next checkpoint is committed. + +How to test +----------- + +The recommended way to test flows and the state machine is using the Driver DSL. This ensures that you will test your +flow with a full node. + +.. container:: codeset + + .. literalinclude:: ../../docs/source/example-code/src/integration-test/kotlin/net/corda/docs/TutorialFlowAsyncOperationTest.kt + :language: kotlin + :start-after: DOCSTART summingWorks + :end-before: DOCEND summingWorks + + .. literalinclude:: ../../docs/source/example-code/src/integration-test/java/net/corda/docs/java/TutorialFlowAsyncOperationTest.java + :language: java + :start-after: DOCSTART summingWorks + :end-before: DOCEND summingWorks + +The above will spin up a node and run our example flow. + +How to debug issues +------------------- + +Let's assume we made a mistake in our summing operation: + +.. container:: codeset + + .. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/tutorial/flowstatemachines/TutorialFlowAsyncOperation.kt + :language: kotlin + :start-after: DOCSTART SummingOperationThrowing + :end-before: DOCEND SummingOperationThrowing + + .. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/SummingOperationThrowing.java + :language: java + :start-after: DOCSTART SummingOperationThrowing + :end-before: DOCEND SummingOperationThrowing + +The operation now throws a rude exception. If we modify the example flow to use this and run the same test we will get +a lot of logs about the error condition (as we are in dev mode). The interesting bit looks like this: + +.. parsed-literal:: + [WARN ] 18:38:52,613 [Node thread-1] (DumpHistoryOnErrorInterceptor.kt:39) interceptors.DumpHistoryOnErrorInterceptor.executeTransition - Flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] errored, dumping all transitions: + + --- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] --- + Timestamp: 2018-06-01T17:38:52.426Z + Event: DoRemainingWork + Actions: + CreateTransaction + PersistCheckpoint(id=[03ab886e-3fd3-4667-b944-ab6a3b1f90a7], checkpoint=Checkpoint(invocationContext=InvocationContext(origin=RPC(actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES)), trace=Trace(invocationId=26bcf0c3-f1d8-4098-a52d-3780f4095b7a, timestamp: 2018-06-01T17:38:52.234Z, entityType: Invocation, sessionId=393d1175-3bb1-4eb1-bff0-6ba317851260, timestamp: 2018-06-01T17:38:52.169Z, entityType: Session), actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES), externalTrace=null, impersonatedActor=null), ourIdentity=O=Alice Corp, L=Madrid, C=ES, sessions={}, subFlowStack=[Inlined(flowClass=class net.corda.docs.tutorial.flowstatemachines.ExampleSummingFlow, subFlowVersion=CorDappFlow(platformVersion=1, corDappName=net.corda.docs-c6816652-f975-4fb2-aa09-ef1dddea19b3, corDappHash=F4012397D8CF97926B5998E046DBCE16D497318BB87DCED66313912D4B303BB7))], flowState=Unstarted(flowStart=Explicit, frozenFlowLogic=74BA62EC5821EBD4FC4CBE129843F9ED6509DB37E6E3C8F85E3F7A8D84083500), errorState=Clean, numberOfSuspends=0, deduplicationSeed=03ab886e-3fd3-4667-b944-ab6a3b1f90a7)) + PersistDeduplicationFacts(deduplicationHandlers=[net.corda.node.internal.FlowStarterImpl$startFlow$startFlowEvent$1@69326343]) + CommitTransaction + AcknowledgeMessages(deduplicationHandlers=[net.corda.node.internal.FlowStarterImpl$startFlow$startFlowEvent$1@69326343]) + SignalFlowHasStarted(flowId=[03ab886e-3fd3-4667-b944-ab6a3b1f90a7]) + CreateTransaction + Continuation: Resume(result=null) + Diff between previous and next state: + isAnyCheckpointPersisted: + false + true + pendingDeduplicationHandlers: + [net.corda.node.internal.FlowStarterImpl$startFlow$startFlowEvent$1@69326343] + [] + isFlowResumed: + false + true + + + --- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] --- + Timestamp: 2018-06-01T17:38:52.487Z + Event: Suspend(ioRequest=ExecuteAsyncOperation(operation=net.corda.docs.tutorial.flowstatemachines.SummingOperationThrowing@40f4c23d), maySkipCheckpoint=false, fiber=15EC69204562BB396846768169AD4A339569D97AE841D805C230C513A8BA5BDE, ) + Actions: + PersistCheckpoint(id=[03ab886e-3fd3-4667-b944-ab6a3b1f90a7], checkpoint=Checkpoint(invocationContext=InvocationContext(origin=RPC(actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES)), trace=Trace(invocationId=26bcf0c3-f1d8-4098-a52d-3780f4095b7a, timestamp: 2018-06-01T17:38:52.234Z, entityType: Invocation, sessionId=393d1175-3bb1-4eb1-bff0-6ba317851260, timestamp: 2018-06-01T17:38:52.169Z, entityType: Session), actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES), externalTrace=null, impersonatedActor=null), ourIdentity=O=Alice Corp, L=Madrid, C=ES, sessions={}, subFlowStack=[Inlined(flowClass=class net.corda.docs.tutorial.flowstatemachines.ExampleSummingFlow, subFlowVersion=CorDappFlow(platformVersion=1, corDappName=net.corda.docs-c6816652-f975-4fb2-aa09-ef1dddea19b3, corDappHash=F4012397D8CF97926B5998E046DBCE16D497318BB87DCED66313912D4B303BB7))], flowState=Started(flowIORequest=ExecuteAsyncOperation(operation=net.corda.docs.tutorial.flowstatemachines.SummingOperationThrowing@40f4c23d), frozenFiber=15EC69204562BB396846768169AD4A339569D97AE841D805C230C513A8BA5BDE), errorState=Clean, numberOfSuspends=1, deduplicationSeed=03ab886e-3fd3-4667-b944-ab6a3b1f90a7)) + PersistDeduplicationFacts(deduplicationHandlers=[]) + CommitTransaction + AcknowledgeMessages(deduplicationHandlers=[]) + ScheduleEvent(event=DoRemainingWork) + Continuation: ProcessEvents + Diff between previous and next state: + checkpoint.numberOfSuspends: + 0 + 1 + checkpoint.flowState: + Unstarted(flowStart=Explicit, frozenFlowLogic=74BA62EC5821EBD4FC4CBE129843F9ED6509DB37E6E3C8F85E3F7A8D84083500) + Started(flowIORequest=ExecuteAsyncOperation(operation=net.corda.docs.tutorial.flowstatemachines.SummingOperationThrowing@40f4c23d), frozenFiber=15EC69204562BB396846768169AD4A339569D97AE841D805C230C513A8BA5BDE) + isFlowResumed: + true + false + + + --- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] --- + Timestamp: 2018-06-01T17:38:52.549Z + Event: DoRemainingWork + Actions: + ExecuteAsyncOperation(operation=net.corda.docs.tutorial.flowstatemachines.SummingOperationThrowing@40f4c23d) + Continuation: ProcessEvents + Diff between previous and intended state: + null + Diff between previous and next state: + checkpoint.errorState: + Clean + Errored(errors=[FlowError(errorId=-8704604242619505379, exception=java.lang.IllegalStateException: You shouldn't be calling me)], propagatedIndex=0, propagating=false) + + + --- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] --- + Timestamp: 2018-06-01T17:38:52.555Z + Event: DoRemainingWork + Actions: + + Continuation: ProcessEvents + Diff between previous and next state: + null + + --- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] --- + Timestamp: 2018-06-01T17:38:52.556Z + Event: StartErrorPropagation + Actions: + ScheduleEvent(event=DoRemainingWork) + Continuation: ProcessEvents + Diff between previous and next state: + checkpoint.errorState.propagating: + false + true + + + --- Transition of flow [03ab886e-3fd3-4667-b944-ab6a3b1f90a7] --- + Timestamp: 2018-06-01T17:38:52.606Z + Event: DoRemainingWork + Actions: + PropagateErrors(errorMessages=[ErrorSessionMessage(flowException=null, errorId=-8704604242619505379)], sessions=[], senderUUID=861f07d6-4b8f-42bd-9b52-5152812db2ba) + CreateTransaction + RemoveCheckpoint(id=[03ab886e-3fd3-4667-b944-ab6a3b1f90a7]) + PersistDeduplicationFacts(deduplicationHandlers=[]) + ReleaseSoftLocks(uuid=03ab886e-3fd3-4667-b944-ab6a3b1f90a7) + CommitTransaction + AcknowledgeMessages(deduplicationHandlers=[]) + RemoveSessionBindings(sessionIds=[]) + RemoveFlow(flowId=[03ab886e-3fd3-4667-b944-ab6a3b1f90a7], removalReason=ErrorFinish(flowErrors=[FlowError(errorId=-8704604242619505379, exception=java.lang.IllegalStateException: You shouldn't be calling me)]), lastState=StateMachineState(checkpoint=Checkpoint(invocationContext=InvocationContext(origin=RPC(actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES)), trace=Trace(invocationId=26bcf0c3-f1d8-4098-a52d-3780f4095b7a, timestamp: 2018-06-01T17:38:52.234Z, entityType: Invocation, sessionId=393d1175-3bb1-4eb1-bff0-6ba317851260, timestamp: 2018-06-01T17:38:52.169Z, entityType: Session), actor=Actor(id=Id(value=aliceUser), serviceId=AuthServiceId(value=NODE_CONFIG), owningLegalIdentity=O=Alice Corp, L=Madrid, C=ES), externalTrace=null, impersonatedActor=null), ourIdentity=O=Alice Corp, L=Madrid, C=ES, sessions={}, subFlowStack=[Inlined(flowClass=class net.corda.docs.tutorial.flowstatemachines.ExampleSummingFlow, subFlowVersion=CorDappFlow(platformVersion=1, corDappName=net.corda.docs-c6816652-f975-4fb2-aa09-ef1dddea19b3, corDappHash=F4012397D8CF97926B5998E046DBCE16D497318BB87DCED66313912D4B303BB7))], flowState=Started(flowIORequest=ExecuteAsyncOperation(operation=net.corda.docs.tutorial.flowstatemachines.SummingOperationThrowing@40f4c23d), frozenFiber=15EC69204562BB396846768169AD4A339569D97AE841D805C230C513A8BA5BDE), errorState=Errored(errors=[FlowError(errorId=-8704604242619505379, exception=java.lang.IllegalStateException: You shouldn't be calling me)], propagatedIndex=1, propagating=true), numberOfSuspends=1, deduplicationSeed=03ab886e-3fd3-4667-b944-ab6a3b1f90a7), flowLogic=net.corda.docs.tutorial.flowstatemachines.ExampleSummingFlow@600b0c6c, pendingDeduplicationHandlers=[], isFlowResumed=false, isTransactionTracked=false, isAnyCheckpointPersisted=true, isStartIdempotent=false, isRemoved=true, senderUUID=861f07d6-4b8f-42bd-9b52-5152812db2ba)) + Continuation: Abort + Diff between previous and next state: + checkpoint.errorState.propagatedIndex: + 0 + 1 + isRemoved: + false + true + +Whoa that's a lot of stuff. Now we get a glimpse into the bowels of the flow state machine. As we can see the flow did +quite a few things, even though the flow code looks simple. + +What we can see here is the different transitions the flow's state machine went through that led up to the error +condition. For each transition we see what *Event* triggered the transition, what *Action* s were taken as a consequence, +and how the internal *State* of the state machine was modified in the process. It also prints the transition's +*Continuation*, which indicates how the flow should proceed after the transition. + +For example in the first transition we can see that the triggering event was a ``DoRemainingWork``, this is a generic +event that instructs the state machine to check its own state to see whether there's any work left to do, and does it if +there's any. + +In this case the work involves persisting a checkpoint together with some deduplication data in a database transaction, +then acknowledging any triggering messages, signalling that the flow has started, and creating a fresh database +transaction, to be used by user code. + +The continuation is a ``Resume``, which instructs the state machine to hand control to user code. The state change is +a simple update of bookkeeping data. + +In other words the first transition concerns the initialization of the flow, which includes the creation of the +checkpoint. + +The next transition is the suspension of our summing operation, triggered by the ``Suspend`` event. As we can see in +this transition we aren't doing any work related to the summation yet, we're merely persisting the checkpoint that +indicates that we want to do the summation. Had we added a ``toString`` method to our ``SummingOperationThrowing`` we +would see a nicer message. + +The next transition is the faulty one, as we can see it was also triggered by a ``DoRemainingWork``, and executed our +operation. We can see that there are two state "diff"s printed, one that would've happened had the transition succeeded, +and one that actually happened, which marked the flow's state as errored. The rest of the transitions involve error +propagation (triggered by the ``FlowHospital``) and notification of failure, which ultimately raises the exception on +the RPC ``resultFuture``. diff --git a/docs/source/contributing.rst b/docs/source/contributing.rst index 04becd16b5..9b8e885de9 100644 --- a/docs/source/contributing.rst +++ b/docs/source/contributing.rst @@ -28,6 +28,10 @@ Making the required changes 2. Clone the fork to your local machine 3. Make the changes, in accordance with the :doc:`code style guide ` +Extending the flow state machine +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +You can find instructions on how to extend the flow state machine :doc:`here ` + Things to check ^^^^^^^^^^^^^^^ @@ -107,7 +111,6 @@ Running the API scanner Your changes must also not break compatibility with existing public API. We have an API scanning tool which runs as part of the build process which can be used to flag up any accidental changes, which is detailed :doc:`here `. - Updating the docs ----------------- diff --git a/docs/source/example-code/build.gradle b/docs/source/example-code/build.gradle index e45425686a..54c5ad53de 100644 --- a/docs/source/example-code/build.gradle +++ b/docs/source/example-code/build.gradle @@ -24,6 +24,11 @@ sourceSets { runtimeClasspath += main.output + test.output srcDir file('src/integration-test/kotlin') } + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integration-test/java') + } } } diff --git a/docs/source/example-code/src/integration-test/java/net/corda/docs/java/TutorialFlowAsyncOperationTest.java b/docs/source/example-code/src/integration-test/java/net/corda/docs/java/TutorialFlowAsyncOperationTest.java new file mode 100644 index 0000000000..410f2fcf73 --- /dev/null +++ b/docs/source/example-code/src/integration-test/java/net/corda/docs/java/TutorialFlowAsyncOperationTest.java @@ -0,0 +1,42 @@ +package net.corda.docs.java; + +import kotlin.Unit; +import net.corda.client.rpc.CordaRPCClient; +import net.corda.core.messaging.CordaRPCOps; +import net.corda.core.utilities.KotlinUtilsKt; +import net.corda.docs.java.tutorial.flowstatemachines.ExampleSummingFlow; +import net.corda.node.services.Permissions; +import net.corda.testing.driver.*; +import net.corda.testing.node.User; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.Future; + +import static net.corda.testing.core.TestConstants.ALICE_NAME; +import static org.junit.Assert.assertEquals; + +public final class TutorialFlowAsyncOperationTest { + // DOCSTART summingWorks + @Test + public final void summingWorks() { + Driver.driver(new DriverParameters(), (DriverDSL dsl) -> { + User aliceUser = new User("aliceUser", "testPassword1", + new HashSet<>(Collections.singletonList(Permissions.all())) + ); + Future aliceFuture = dsl.startNode(new NodeParameters() + .withProvidedName(ALICE_NAME) + .withRpcUsers(Collections.singletonList(aliceUser)) + ); + NodeHandle alice = KotlinUtilsKt.getOrThrow(aliceFuture, null); + CordaRPCClient aliceClient = new CordaRPCClient(alice.getRpcAddress()); + CordaRPCOps aliceProxy = aliceClient.start("aliceUser", "testPassword1").getProxy(); + Future answerFuture = aliceProxy.startFlowDynamic(ExampleSummingFlow.class).getReturnValue(); + int answer = KotlinUtilsKt.getOrThrow(answerFuture, null); + assertEquals(3, answer); + return Unit.INSTANCE; + }); + } + // DOCEND summingWorks +} diff --git a/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/TutorialFlowAsyncOperationTest.kt b/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/TutorialFlowAsyncOperationTest.kt new file mode 100644 index 0000000000..fb3717f109 --- /dev/null +++ b/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/TutorialFlowAsyncOperationTest.kt @@ -0,0 +1,29 @@ +package net.corda.docs + +import net.corda.client.rpc.CordaRPCClient +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import net.corda.docs.tutorial.flowstatemachines.ExampleSummingFlow +import net.corda.node.services.Permissions +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.node.User +import org.junit.Test +import kotlin.test.assertEquals + +class TutorialFlowAsyncOperationTest { + // DOCSTART summingWorks + @Test + fun summingWorks() { + driver(DriverParameters(startNodesInProcess = true)) { + val aliceUser = User("aliceUser", "testPassword1", permissions = setOf(Permissions.all())) + val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + val aliceClient = CordaRPCClient(alice.rpcAddress) + val aliceProxy = aliceClient.start("aliceUser", "testPassword1").proxy + val answer = aliceProxy.startFlow(::ExampleSummingFlow).returnValue.getOrThrow() + assertEquals(3, answer) + } + } + // DOCEND summingWorks +} diff --git a/docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/ExampleSummingFlow.java b/docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/ExampleSummingFlow.java new file mode 100644 index 0000000000..c8e93d7025 --- /dev/null +++ b/docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/ExampleSummingFlow.java @@ -0,0 +1,19 @@ +package net.corda.docs.java.tutorial.flowstatemachines; + +import co.paralleluniverse.fibers.Suspendable; +import net.corda.core.flows.FlowLogic; +import net.corda.core.flows.StartableByRPC; +import net.corda.core.internal.FlowAsyncOperationKt; +import org.jetbrains.annotations.NotNull; + +// DOCSTART ExampleSummingFlow +@StartableByRPC +public final class ExampleSummingFlow extends FlowLogic { + @Suspendable + @NotNull + @Override + public Integer call() { + return FlowAsyncOperationKt.executeAsync(this, new SummingOperation(1, 2), false); + } +} +// DOCEND ExampleSummingFlow diff --git a/docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/SummingOperation.java b/docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/SummingOperation.java new file mode 100644 index 0000000000..d313fdb8ce --- /dev/null +++ b/docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/SummingOperation.java @@ -0,0 +1,32 @@ +package net.corda.docs.java.tutorial.flowstatemachines; + +import net.corda.core.concurrent.CordaFuture; +import net.corda.core.internal.FlowAsyncOperation; +import net.corda.core.internal.concurrent.CordaFutureImplKt; +import org.jetbrains.annotations.NotNull; + +// DOCSTART SummingOperation +public final class SummingOperation implements FlowAsyncOperation { + private final int a; + private final int b; + + @NotNull + @Override + public CordaFuture execute() { + return CordaFutureImplKt.doneFuture(this.a + this.b); + } + + public final int getA() { + return this.a; + } + + public final int getB() { + return this.b; + } + + public SummingOperation(int a, int b) { + this.a = a; + this.b = b; + } +} +// DOCEND SummingOperation diff --git a/docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/SummingOperationThrowing.java b/docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/SummingOperationThrowing.java new file mode 100644 index 0000000000..1a759074b0 --- /dev/null +++ b/docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/SummingOperationThrowing.java @@ -0,0 +1,31 @@ +package net.corda.docs.java.tutorial.flowstatemachines; + +import net.corda.core.concurrent.CordaFuture; +import net.corda.core.internal.FlowAsyncOperation; +import org.jetbrains.annotations.NotNull; + +// DOCSTART SummingOperationThrowing +public final class SummingOperationThrowing implements FlowAsyncOperation { + private final int a; + private final int b; + + @NotNull + @Override + public CordaFuture execute() { + throw new IllegalStateException("You shouldn't be calling me"); + } + + public final int getA() { + return this.a; + } + + public final int getB() { + return this.b; + } + + public SummingOperationThrowing(int a, int b) { + this.a = a; + this.b = b; + } +} +// DOCEND SummingOperationThrowing diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/tutorial/flowstatemachines/TutorialFlowAsyncOperation.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/tutorial/flowstatemachines/TutorialFlowAsyncOperation.kt new file mode 100644 index 0000000000..8619361485 --- /dev/null +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/tutorial/flowstatemachines/TutorialFlowAsyncOperation.kt @@ -0,0 +1,38 @@ +package net.corda.docs.tutorial.flowstatemachines + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.concurrent.CordaFuture +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.internal.FlowAsyncOperation +import net.corda.core.internal.concurrent.doneFuture +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.executeAsync + +// DOCSTART SummingOperation +class SummingOperation(val a: Int, val b: Int) : FlowAsyncOperation { + override fun execute(): CordaFuture { + return doneFuture(a + b) + } +} +// DOCEND SummingOperation + +// DOCSTART SummingOperationThrowing +class SummingOperationThrowing(val a: Int, val b: Int) : FlowAsyncOperation { + override fun execute(): CordaFuture { + throw IllegalStateException("You shouldn't be calling me") + } +} +// DOCEND SummingOperationThrowing + +// DOCSTART ExampleSummingFlow +@StartableByRPC +class ExampleSummingFlow : FlowLogic() { + @Suspendable + override fun call(): Int { + val answer = executeAsync(SummingOperation(1, 2)) + return answer // hopefully 3 + } +} +// DOCEND ExampleSummingFlow +