CORDA-3669 Do not execute ExecuteAsyncOperation multiple times (#6087)

* CORDA-3669 Do not execute `ExecuteAsyncOperation` multiple times

When a `FlowExternalOperation` or `FlowExternalAsyncOperation` executes
and completes a flag (`isFlowResumed`) is switched to true.

This flag was used inside of `DoRemainingWorkTransition` to decide
whether to skip over the execution of an event.

Since this flag was being switched to true when the external operation's
 future completed, it was possible for _unexpected_ events to be placed
in the fiber's queue that would retrigger the
`FlowIORequest.ExecuteAsyncOperation`, that is held as the checkpoint's
next `FlowIORequest`to process.

By using the existing `StateMachineState.isTransactionTracked` (and
renaming it to `isWaitingForFuture`) we can decide to not process the
`FlowIORequest.ExecuteAsyncOperation` if it has already been called
before. This moves this code path in line with
`FlowIORequest.WaitForLedgerCommit`.

Random `DoRemainingWork` events can now be pushed to the fiber's queue
without causing the `FlowIORequest.ExecuteAsyncOperation` to execute
again.
This commit is contained in:
Dan Newton 2020-03-20 19:02:34 +00:00 committed by GitHub
parent 861b769499
commit 668748b054
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 78 additions and 38 deletions

View File

@ -20,6 +20,7 @@ import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.statemachine.StaffedFlowHospital
import java.sql.SQLTransientConnectionException
import java.util.concurrent.CompletableFuture
@ -44,11 +45,12 @@ abstract class AbstractFlowExternalOperationTest {
@Suspendable
override fun call(): Any {
log.info("Started my flow")
subFlow(PingPongFlow(party))
val result = testCode()
val session = initiateFlow(party)
session.send("hi there")
log.info("ServiceHub value = $serviceHub")
session.receive<String>()
session.sendAndReceive<String>("hi there").unwrap { it }
session.sendAndReceive<String>("hi there").unwrap { it }
subFlow(PingPongFlow(party))
log.info("Finished my flow")
return result
}
@ -64,8 +66,28 @@ abstract class AbstractFlowExternalOperationTest {
class FlowWithExternalOperationResponder(val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>()
session.receive<String>().unwrap { it }
session.send("go away")
session.receive<String>().unwrap { it }
session.send("go away")
}
}
@InitiatingFlow
class PingPongFlow(val party: Party): FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(party)
session.sendAndReceive<String>("ping pong").unwrap { it }
}
}
@InitiatedBy(PingPongFlow::class)
class PingPongResponder(val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
session.send("I got you bro")
}
}
@ -83,7 +105,7 @@ abstract class AbstractFlowExternalOperationTest {
fun createFuture(): CompletableFuture<Any> {
return CompletableFuture.supplyAsync(Supplier<Any> {
log.info("Starting sleep inside of future")
Thread.sleep(2000)
Thread.sleep(1000)
log.info("Finished sleep inside of future")
"Here is your return value"
}, executorService)

View File

@ -619,7 +619,7 @@ class SingleThreadedStateMachineManager(
checkpoint = checkpoint,
pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isTransactionTracked = false,
isWaitingForFuture = false,
isAnyCheckpointPersisted = existingCheckpoint != null,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
@ -775,7 +775,7 @@ class SingleThreadedStateMachineManager(
checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isTransactionTracked = false,
isWaitingForFuture = false,
isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
@ -799,7 +799,7 @@ class SingleThreadedStateMachineManager(
checkpoint = checkpoint.copy(),
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isTransactionTracked = false,
isWaitingForFuture = false,
isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent,
isRemoved = false,

View File

@ -21,7 +21,7 @@ import java.time.Instant
* @param pendingDeduplicationHandlers the list of incomplete deduplication handlers.
* @param isFlowResumed true if the control is returned (or being returned) to "user-space" flow code. This is used
* to make [Event.DoRemainingWork] idempotent.
* @param isTransactionTracked true if a ledger transaction has been tracked as part of a
* @param isWaitingForFuture true if the flow is waiting for the completion of a future triggered by one of the statemachine's actions
* [FlowIORequest.WaitForLedgerCommit]. This used is to make tracking idempotent.
* @param isAnyCheckpointPersisted true if at least a single checkpoint has been persisted. This is used to determine
* whether we should DELETE the checkpoint at the end of the flow.
@ -34,15 +34,15 @@ import java.time.Instant
// TODO perhaps add a read-only environment to the state machine for things that don't change over time?
// TODO evaluate persistent datastructure libraries to replace the inefficient copying we currently do.
data class StateMachineState(
val checkpoint: Checkpoint,
val flowLogic: FlowLogic<*>,
val pendingDeduplicationHandlers: List<DeduplicationHandler>,
val isFlowResumed: Boolean,
val isTransactionTracked: Boolean,
val isAnyCheckpointPersisted: Boolean,
val isStartIdempotent: Boolean,
val isRemoved: Boolean,
val senderUUID: String?
val checkpoint: Checkpoint,
val flowLogic: FlowLogic<*>,
val pendingDeduplicationHandlers: List<DeduplicationHandler>,
val isFlowResumed: Boolean,
val isWaitingForFuture: Boolean,
val isAnyCheckpointPersisted: Boolean,
val isStartIdempotent: Boolean,
val isRemoved: Boolean,
val senderUUID: String?
)
/**

View File

@ -94,14 +94,16 @@ class StartedFlowTransition(
}
private fun waitForLedgerCommitTransition(flowIORequest: FlowIORequest.WaitForLedgerCommit): TransitionResult {
return if (!startingState.isTransactionTracked) {
// This ensures that the [WaitForLedgerCommit] request is not executed multiple times if extra
// [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up
return if (!startingState.isWaitingForFuture) {
TransitionResult(
newState = startingState.copy(isTransactionTracked = true),
actions = listOf(
Action.CreateTransaction,
Action.TrackTransaction(flowIORequest.hash),
Action.CommitTransaction
)
newState = startingState.copy(isWaitingForFuture = true),
actions = listOf(
Action.CreateTransaction,
Action.TrackTransaction(flowIORequest.hash),
Action.CommitTransaction
)
)
} else {
TransitionResult(startingState)
@ -410,12 +412,19 @@ class StartedFlowTransition(
}
private fun executeAsyncOperation(flowIORequest: FlowIORequest.ExecuteAsyncOperation<*>): TransitionResult {
return builder {
// The `numberOfSuspends` is added to the deduplication ID in case an async
// operation is executed multiple times within the same flow.
val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.numberOfSuspends.toString()
actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation))
FlowContinuation.ProcessEvents
// This ensures that the [ExecuteAsyncOperation] request is not executed multiple times if extra
// [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up
return if (!startingState.isWaitingForFuture) {
builder {
// The `numberOfSuspends` is added to the deduplication ID in case an async
// operation is executed multiple times within the same flow.
val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.numberOfSuspends.toString()
actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation))
currentState = currentState.copy(isWaitingForFuture = true)
FlowContinuation.ProcessEvents
}
} else {
TransitionResult(startingState)
}
}

View File

@ -1,5 +1,6 @@
package net.corda.node.services.statemachine.transitions
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.InitiatingFlow
import net.corda.core.internal.FlowIORequest
import net.corda.core.utilities.Try
@ -60,11 +61,8 @@ class TopLevelTransition(
private fun transactionCommittedTransition(event: Event.TransactionCommitted): TransitionResult {
return builder {
val checkpoint = currentState.checkpoint
if (currentState.isTransactionTracked &&
checkpoint.flowState is FlowState.Started &&
checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit &&
checkpoint.flowState.flowIORequest.hash == event.transaction.id) {
currentState = currentState.copy(isTransactionTracked = false)
if (isWaitingForLedgerCommit(currentState, checkpoint, event.transaction.id)) {
currentState = currentState.copy(isWaitingForFuture = false)
if (isErrored()) {
return@builder FlowContinuation.ProcessEvents
}
@ -76,6 +74,17 @@ class TopLevelTransition(
}
}
private fun isWaitingForLedgerCommit(
currentState: StateMachineState,
checkpoint: Checkpoint,
transactionId: SecureHash
): Boolean {
return currentState.isWaitingForFuture &&
checkpoint.flowState is FlowState.Started &&
checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit &&
checkpoint.flowState.flowIORequest.hash == transactionId
}
private fun softShutdownTransition(): TransitionResult {
val lastState = startingState.copy(isRemoved = true)
return TransitionResult(

View File

@ -68,13 +68,13 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
fun resumeFlowLogic(result: Any?): FlowContinuation {
actions.add(Action.CreateTransaction)
currentState = currentState.copy(isFlowResumed = true)
currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false)
return FlowContinuation.Resume(result)
}
fun resumeFlowLogic(result: Throwable): FlowContinuation {
actions.add(Action.CreateTransaction)
currentState = currentState.copy(isFlowResumed = true)
currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false)
return FlowContinuation.Throw(result)
}
}