diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt index 735db12c64..559369c442 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt @@ -20,6 +20,7 @@ import net.corda.core.schemas.MappedSchema import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap import net.corda.node.services.statemachine.StaffedFlowHospital import org.junit.Before import java.sql.SQLTransientConnectionException @@ -72,11 +73,12 @@ abstract class AbstractFlowExternalOperationTest { @Suspendable override fun call(): Any { log.info("Started my flow") + subFlow(PingPongFlow(party)) val result = testCode() val session = initiateFlow(party) - session.send("hi there") - log.info("ServiceHub value = $serviceHub") - session.receive() + session.sendAndReceive("hi there").unwrap { it } + session.sendAndReceive("hi there").unwrap { it } + subFlow(PingPongFlow(party)) log.info("Finished my flow") return result } @@ -92,8 +94,28 @@ abstract class AbstractFlowExternalOperationTest { class FlowWithExternalOperationResponder(val session: FlowSession) : FlowLogic() { @Suspendable override fun call() { - session.receive() + session.receive().unwrap { it } session.send("go away") + session.receive().unwrap { it } + session.send("go away") + } + } + + @InitiatingFlow + class PingPongFlow(val party: Party): FlowLogic() { + @Suspendable + override fun call() { + val session = initiateFlow(party) + session.sendAndReceive("ping pong").unwrap { it } + } + } + + @InitiatedBy(PingPongFlow::class) + class PingPongResponder(val session: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + session.receive().unwrap { it } + session.send("I got you bro") } } @@ -111,7 +133,7 @@ abstract class AbstractFlowExternalOperationTest { fun createFuture(): CompletableFuture { return CompletableFuture.supplyAsync(Supplier { log.info("Starting sleep inside of future") - Thread.sleep(2000) + Thread.sleep(1000) log.info("Finished sleep inside of future") "Here is your return value" }, executorService) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 637400b128..13e4bf8ac6 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -624,7 +624,7 @@ class SingleThreadedStateMachineManager( checkpoint = checkpoint, pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(), isFlowResumed = false, - isTransactionTracked = false, + isWaitingForFuture = false, isAnyCheckpointPersisted = existingCheckpoint != null, isStartIdempotent = isStartIdempotent, isRemoved = false, @@ -780,7 +780,7 @@ class SingleThreadedStateMachineManager( checkpoint = checkpoint, pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), isFlowResumed = false, - isTransactionTracked = false, + isWaitingForFuture = false, isAnyCheckpointPersisted = isAnyCheckpointPersisted, isStartIdempotent = isStartIdempotent, isRemoved = false, @@ -804,7 +804,7 @@ class SingleThreadedStateMachineManager( checkpoint = checkpoint.copy(), pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), isFlowResumed = false, - isTransactionTracked = false, + isWaitingForFuture = false, isAnyCheckpointPersisted = isAnyCheckpointPersisted, isStartIdempotent = isStartIdempotent, isRemoved = false, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 77e1153181..89cecb11c9 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -21,7 +21,7 @@ import java.time.Instant * @param pendingDeduplicationHandlers the list of incomplete deduplication handlers. * @param isFlowResumed true if the control is returned (or being returned) to "user-space" flow code. This is used * to make [Event.DoRemainingWork] idempotent. - * @param isTransactionTracked true if a ledger transaction has been tracked as part of a + * @param isWaitingForFuture true if the flow is waiting for the completion of a future triggered by one of the statemachine's actions * [FlowIORequest.WaitForLedgerCommit]. This used is to make tracking idempotent. * @param isAnyCheckpointPersisted true if at least a single checkpoint has been persisted. This is used to determine * whether we should DELETE the checkpoint at the end of the flow. @@ -34,15 +34,15 @@ import java.time.Instant // TODO perhaps add a read-only environment to the state machine for things that don't change over time? // TODO evaluate persistent datastructure libraries to replace the inefficient copying we currently do. data class StateMachineState( - val checkpoint: Checkpoint, - val flowLogic: FlowLogic<*>, - val pendingDeduplicationHandlers: List, - val isFlowResumed: Boolean, - val isTransactionTracked: Boolean, - val isAnyCheckpointPersisted: Boolean, - val isStartIdempotent: Boolean, - val isRemoved: Boolean, - val senderUUID: String? + val checkpoint: Checkpoint, + val flowLogic: FlowLogic<*>, + val pendingDeduplicationHandlers: List, + val isFlowResumed: Boolean, + val isWaitingForFuture: Boolean, + val isAnyCheckpointPersisted: Boolean, + val isStartIdempotent: Boolean, + val isRemoved: Boolean, + val senderUUID: String? ) /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 4a10c335bd..3e90040ef0 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -95,14 +95,16 @@ class StartedFlowTransition( } private fun waitForLedgerCommitTransition(flowIORequest: FlowIORequest.WaitForLedgerCommit): TransitionResult { - return if (!startingState.isTransactionTracked) { + // This ensures that the [WaitForLedgerCommit] request is not executed multiple times if extra + // [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up + return if (!startingState.isWaitingForFuture) { TransitionResult( - newState = startingState.copy(isTransactionTracked = true), - actions = listOf( - Action.CreateTransaction, - Action.TrackTransaction(flowIORequest.hash), - Action.CommitTransaction - ) + newState = startingState.copy(isWaitingForFuture = true), + actions = listOf( + Action.CreateTransaction, + Action.TrackTransaction(flowIORequest.hash), + Action.CommitTransaction + ) ) } else { TransitionResult(startingState) @@ -416,12 +418,19 @@ class StartedFlowTransition( } private fun executeAsyncOperation(flowIORequest: FlowIORequest.ExecuteAsyncOperation<*>): TransitionResult { - return builder { - // The `numberOfSuspends` is added to the deduplication ID in case an async - // operation is executed multiple times within the same flow. - val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.numberOfSuspends.toString() - actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation)) - FlowContinuation.ProcessEvents + // This ensures that the [ExecuteAsyncOperation] request is not executed multiple times if extra + // [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up + return if (!startingState.isWaitingForFuture) { + builder { + // The `numberOfSuspends` is added to the deduplication ID in case an async + // operation is executed multiple times within the same flow. + val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.numberOfSuspends.toString() + actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation)) + currentState = currentState.copy(isWaitingForFuture = true) + FlowContinuation.ProcessEvents + } + } else { + TransitionResult(startingState) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 23b4126a95..5b7b289286 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -1,5 +1,6 @@ package net.corda.node.services.statemachine.transitions +import net.corda.core.crypto.SecureHash import net.corda.core.flows.InitiatingFlow import net.corda.core.internal.FlowIORequest import net.corda.core.utilities.Try @@ -60,11 +61,8 @@ class TopLevelTransition( private fun transactionCommittedTransition(event: Event.TransactionCommitted): TransitionResult { return builder { val checkpoint = currentState.checkpoint - if (currentState.isTransactionTracked && - checkpoint.flowState is FlowState.Started && - checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit && - checkpoint.flowState.flowIORequest.hash == event.transaction.id) { - currentState = currentState.copy(isTransactionTracked = false) + if (isWaitingForLedgerCommit(currentState, checkpoint, event.transaction.id)) { + currentState = currentState.copy(isWaitingForFuture = false) if (isErrored()) { return@builder FlowContinuation.ProcessEvents } @@ -76,6 +74,17 @@ class TopLevelTransition( } } + private fun isWaitingForLedgerCommit( + currentState: StateMachineState, + checkpoint: Checkpoint, + transactionId: SecureHash + ): Boolean { + return currentState.isWaitingForFuture && + checkpoint.flowState is FlowState.Started && + checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit && + checkpoint.flowState.flowIORequest.hash == transactionId + } + private fun softShutdownTransition(): TransitionResult { val lastState = startingState.copy(isRemoved = true) return TransitionResult( diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt index bd5b317e32..09336fa96d 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt @@ -68,13 +68,13 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi fun resumeFlowLogic(result: Any?): FlowContinuation { actions.add(Action.CreateTransaction) - currentState = currentState.copy(isFlowResumed = true) + currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false) return FlowContinuation.Resume(result) } fun resumeFlowLogic(result: Throwable): FlowContinuation { actions.add(Action.CreateTransaction) - currentState = currentState.copy(isFlowResumed = true) + currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false) return FlowContinuation.Throw(result) } }