From 668748b054316e2dfe950d0c614e0a61c8009cf5 Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Fri, 20 Mar 2020 19:02:34 +0000 Subject: [PATCH] CORDA-3669 Do not execute `ExecuteAsyncOperation` multiple times (#6087) * CORDA-3669 Do not execute `ExecuteAsyncOperation` multiple times When a `FlowExternalOperation` or `FlowExternalAsyncOperation` executes and completes, a flag (`isFlowResumed`) is switched to true. This flag was used inside of `DoRemainingWorkTransition` to decide whether to skip over the execution of an event. Since this flag was being switched to true when the external operation's future completed, it was possible for _unexpected_ events to be placed in the fiber's queue that would retrigger the `FlowIORequest.ExecuteAsyncOperation` that is held as the checkpoint's next `FlowIORequest` to process. By using the existing `StateMachineState.isTransactionTracked` (and renaming it to `isWaitingForFuture`) we can decide not to process the `FlowIORequest.ExecuteAsyncOperation` if it has already been called before. This moves this code path in line with `FlowIORequest.WaitForLedgerCommit`. Random `DoRemainingWork` events can now be pushed to the fiber's queue without causing the `FlowIORequest.ExecuteAsyncOperation` to execute again. 
--- .../AbstractFlowExternalOperationTest.kt | 32 ++++++++++++++--- .../SingleThreadedStateMachineManager.kt | 6 ++-- .../statemachine/StateMachineState.kt | 20 +++++------ .../transitions/StartedFlowTransition.kt | 35 ++++++++++++------- .../transitions/TopLevelTransition.kt | 19 +++++++--- .../transitions/TransitionBuilder.kt | 4 +-- 6 files changed, 78 insertions(+), 38 deletions(-) diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt index 52d963678d..9965c874e6 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/AbstractFlowExternalOperationTest.kt @@ -20,6 +20,7 @@ import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap import net.corda.node.services.statemachine.StaffedFlowHospital import java.sql.SQLTransientConnectionException import java.util.concurrent.CompletableFuture @@ -44,11 +45,12 @@ abstract class AbstractFlowExternalOperationTest { @Suspendable override fun call(): Any { log.info("Started my flow") + subFlow(PingPongFlow(party)) val result = testCode() val session = initiateFlow(party) - session.send("hi there") - log.info("ServiceHub value = $serviceHub") - session.receive() + session.sendAndReceive("hi there").unwrap { it } + session.sendAndReceive("hi there").unwrap { it } + subFlow(PingPongFlow(party)) log.info("Finished my flow") return result } @@ -64,8 +66,28 @@ abstract class AbstractFlowExternalOperationTest { class FlowWithExternalOperationResponder(val session: FlowSession) : FlowLogic() { @Suspendable override fun call() { - session.receive() + session.receive().unwrap { it } session.send("go away") + 
session.receive().unwrap { it } + session.send("go away") + } + } + + @InitiatingFlow + class PingPongFlow(val party: Party): FlowLogic() { + @Suspendable + override fun call() { + val session = initiateFlow(party) + session.sendAndReceive("ping pong").unwrap { it } + } + } + + @InitiatedBy(PingPongFlow::class) + class PingPongResponder(val session: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + session.receive().unwrap { it } + session.send("I got you bro") } } @@ -83,7 +105,7 @@ abstract class AbstractFlowExternalOperationTest { fun createFuture(): CompletableFuture { return CompletableFuture.supplyAsync(Supplier { log.info("Starting sleep inside of future") - Thread.sleep(2000) + Thread.sleep(1000) log.info("Finished sleep inside of future") "Here is your return value" }, executorService) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 34c9b77582..cf6b707bd6 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -619,7 +619,7 @@ class SingleThreadedStateMachineManager( checkpoint = checkpoint, pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(), isFlowResumed = false, - isTransactionTracked = false, + isWaitingForFuture = false, isAnyCheckpointPersisted = existingCheckpoint != null, isStartIdempotent = isStartIdempotent, isRemoved = false, @@ -775,7 +775,7 @@ class SingleThreadedStateMachineManager( checkpoint = checkpoint, pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), isFlowResumed = false, - isTransactionTracked = false, + isWaitingForFuture = false, isAnyCheckpointPersisted = isAnyCheckpointPersisted, isStartIdempotent = isStartIdempotent, 
isRemoved = false, @@ -799,7 +799,7 @@ class SingleThreadedStateMachineManager( checkpoint = checkpoint.copy(), pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), isFlowResumed = false, - isTransactionTracked = false, + isWaitingForFuture = false, isAnyCheckpointPersisted = isAnyCheckpointPersisted, isStartIdempotent = isStartIdempotent, isRemoved = false, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 77e1153181..89cecb11c9 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -21,7 +21,7 @@ import java.time.Instant * @param pendingDeduplicationHandlers the list of incomplete deduplication handlers. * @param isFlowResumed true if the control is returned (or being returned) to "user-space" flow code. This is used * to make [Event.DoRemainingWork] idempotent. - * @param isTransactionTracked true if a ledger transaction has been tracked as part of a + * @param isWaitingForFuture true if the flow is waiting for the completion of a future triggered by one of the statemachine's actions * [FlowIORequest.WaitForLedgerCommit]. This used is to make tracking idempotent. * @param isAnyCheckpointPersisted true if at least a single checkpoint has been persisted. This is used to determine * whether we should DELETE the checkpoint at the end of the flow. @@ -34,15 +34,15 @@ import java.time.Instant // TODO perhaps add a read-only environment to the state machine for things that don't change over time? // TODO evaluate persistent datastructure libraries to replace the inefficient copying we currently do. 
data class StateMachineState( - val checkpoint: Checkpoint, - val flowLogic: FlowLogic<*>, - val pendingDeduplicationHandlers: List, - val isFlowResumed: Boolean, - val isTransactionTracked: Boolean, - val isAnyCheckpointPersisted: Boolean, - val isStartIdempotent: Boolean, - val isRemoved: Boolean, - val senderUUID: String? + val checkpoint: Checkpoint, + val flowLogic: FlowLogic<*>, + val pendingDeduplicationHandlers: List, + val isFlowResumed: Boolean, + val isWaitingForFuture: Boolean, + val isAnyCheckpointPersisted: Boolean, + val isStartIdempotent: Boolean, + val isRemoved: Boolean, + val senderUUID: String? ) /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 3269a87a2f..79cc0696da 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -94,14 +94,16 @@ class StartedFlowTransition( } private fun waitForLedgerCommitTransition(flowIORequest: FlowIORequest.WaitForLedgerCommit): TransitionResult { - return if (!startingState.isTransactionTracked) { + // This ensures that the [WaitForLedgerCommit] request is not executed multiple times if extra + // [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up + return if (!startingState.isWaitingForFuture) { TransitionResult( - newState = startingState.copy(isTransactionTracked = true), - actions = listOf( - Action.CreateTransaction, - Action.TrackTransaction(flowIORequest.hash), - Action.CommitTransaction - ) + newState = startingState.copy(isWaitingForFuture = true), + actions = listOf( + Action.CreateTransaction, + Action.TrackTransaction(flowIORequest.hash), + Action.CommitTransaction + ) ) } else { TransitionResult(startingState) @@ -410,12 +412,19 @@ 
class StartedFlowTransition( } private fun executeAsyncOperation(flowIORequest: FlowIORequest.ExecuteAsyncOperation<*>): TransitionResult { - return builder { - // The `numberOfSuspends` is added to the deduplication ID in case an async - // operation is executed multiple times within the same flow. - val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.numberOfSuspends.toString() - actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation)) - FlowContinuation.ProcessEvents + // This ensures that the [ExecuteAsyncOperation] request is not executed multiple times if extra + // [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up + return if (!startingState.isWaitingForFuture) { + builder { + // The `numberOfSuspends` is added to the deduplication ID in case an async + // operation is executed multiple times within the same flow. + val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.numberOfSuspends.toString() + actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation)) + currentState = currentState.copy(isWaitingForFuture = true) + FlowContinuation.ProcessEvents + } + } else { + TransitionResult(startingState) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 23b4126a95..5b7b289286 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -1,5 +1,6 @@ package net.corda.node.services.statemachine.transitions +import net.corda.core.crypto.SecureHash import net.corda.core.flows.InitiatingFlow import net.corda.core.internal.FlowIORequest import net.corda.core.utilities.Try @@ -60,11 +61,8 @@ class TopLevelTransition( private 
fun transactionCommittedTransition(event: Event.TransactionCommitted): TransitionResult { return builder { val checkpoint = currentState.checkpoint - if (currentState.isTransactionTracked && - checkpoint.flowState is FlowState.Started && - checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit && - checkpoint.flowState.flowIORequest.hash == event.transaction.id) { - currentState = currentState.copy(isTransactionTracked = false) + if (isWaitingForLedgerCommit(currentState, checkpoint, event.transaction.id)) { + currentState = currentState.copy(isWaitingForFuture = false) if (isErrored()) { return@builder FlowContinuation.ProcessEvents } @@ -76,6 +74,17 @@ class TopLevelTransition( } } + private fun isWaitingForLedgerCommit( + currentState: StateMachineState, + checkpoint: Checkpoint, + transactionId: SecureHash + ): Boolean { + return currentState.isWaitingForFuture && + checkpoint.flowState is FlowState.Started && + checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit && + checkpoint.flowState.flowIORequest.hash == transactionId + } + private fun softShutdownTransition(): TransitionResult { val lastState = startingState.copy(isRemoved = true) return TransitionResult( diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt index bd5b317e32..09336fa96d 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionBuilder.kt @@ -68,13 +68,13 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi fun resumeFlowLogic(result: Any?): FlowContinuation { actions.add(Action.CreateTransaction) - currentState = currentState.copy(isFlowResumed = true) + currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false) return 
FlowContinuation.Resume(result) } fun resumeFlowLogic(result: Throwable): FlowContinuation { actions.add(Action.CreateTransaction) - currentState = currentState.copy(isFlowResumed = true) + currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false) return FlowContinuation.Throw(result) } }