CORDA-3721 Finishing + retrying a flow cancels its future (#6454)

Cancel the future being run by a flow when finishing or retrying it. The
cancellation of the future no longer cares about what type of future it
is.

`StateMachineState` has the `future` field, which holds one of the 3
(currently) possible types of future:
- sleep
- wait for ledger commit
- async operation / external operation

Move the starting of all futures triggered by actions into
`ActionFutureExecutor`.
This commit is contained in:
Dan Newton 2020-07-14 08:04:52 +01:00 committed by GitHub
parent 79b75ff1ec
commit ac4907a429
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 134 additions and 132 deletions

View File

@ -17,7 +17,7 @@ sealed class Action {
/**
* Track a transaction hash and notify the state machine once the corresponding transaction has committed.
*/
data class TrackTransaction(val hash: SecureHash) : Action()
data class TrackTransaction(val hash: SecureHash, val currentState: StateMachineState) : Action()
/**
* Send an initial session message to [destination].
@ -140,7 +140,11 @@ sealed class Action {
/**
* Execute the specified [operation].
*/
data class ExecuteAsyncOperation(val deduplicationId: String, val operation: FlowAsyncOperation<*>) : Action()
data class ExecuteAsyncOperation(
val deduplicationId: String,
val operation: FlowAsyncOperation<*>,
val currentState: StateMachineState
) : Action()
/**
* Release soft locks associated with given ID (currently the flow ID).

View File

@ -3,7 +3,6 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.Gauge
import com.codahale.metrics.Reservoir
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.core.serialization.internal.checkpointSerialize
@ -15,17 +14,17 @@ import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import java.sql.SQLException
import java.time.Duration
/**
* This is the bottom execution engine of flow side-effects.
*/
internal class ActionExecutorImpl(
private val services: ServiceHubInternal,
private val checkpointStorage: CheckpointStorage,
private val flowMessaging: FlowMessaging,
private val stateMachineManager: StateMachineManagerInternal,
private val checkpointSerializationContext: CheckpointSerializationContext
private val services: ServiceHubInternal,
private val checkpointStorage: CheckpointStorage,
private val flowMessaging: FlowMessaging,
private val stateMachineManager: StateMachineManagerInternal,
private val actionFutureExecutor: ActionFutureExecutor,
private val checkpointSerializationContext: CheckpointSerializationContext
) : ActionExecutor {
private companion object {
@ -74,16 +73,8 @@ internal class ActionExecutorImpl(
if (action.uuid != null) services.vaultService.softLockRelease(action.uuid)
}
@Suspendable
private fun executeTrackTransaction(fiber: FlowFiber, action: Action.TrackTransaction) {
services.validatedTransactions.trackTransactionWithNoWarning(action.hash).thenMatch(
success = { transaction ->
fiber.scheduleEvent(Event.TransactionCommitted(transaction))
},
failure = { exception ->
fiber.scheduleEvent(Event.Error(exception))
}
)
actionFutureExecutor.awaitTransaction(fiber, action)
}
@Suspendable
@ -157,13 +148,8 @@ internal class ActionExecutorImpl(
fiber.scheduleEvent(action.event)
}
@Suspendable
private fun executeSleepUntil(fiber: FlowFiber, action: Action.SleepUntil) {
stateMachineManager.scheduleFlowSleep(
fiber,
action.currentState,
Duration.between(services.clock.instant(), action.time)
)
actionFutureExecutor.sleep(fiber, action)
}
@Suspendable
@ -236,19 +222,10 @@ internal class ActionExecutorImpl(
}
}
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, see comment in the catch clause
@Suspendable
@Suppress("TooGenericExceptionCaught")
private fun executeAsyncOperation(fiber: FlowFiber, action: Action.ExecuteAsyncOperation) {
try {
val operationFuture = action.operation.execute(action.deduplicationId)
operationFuture.thenMatch(
success = { result ->
fiber.scheduleEvent(Event.AsyncOperationCompletion(result))
},
failure = { exception ->
fiber.scheduleEvent(Event.AsyncOperationThrows(exception))
}
)
actionFutureExecutor.awaitAsyncOperation(fiber, action)
} catch (e: Exception) {
// Catch and wrap any unexpected exceptions from the async operation
// Wrapping the exception allows it to be better handled by the flow hospital

View File

@ -0,0 +1,96 @@
package net.corda.node.services.statemachine
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.services.api.ServiceHubInternal
import java.time.Duration
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
/**
 * Starts the futures that a flow can wait on (a sleep, an async operation or a transaction commit)
 * and records the started future on the [StateMachineState] carried by the triggering [Action].
 *
 * Before a new future is started, any future already held by that state is cancelled if it has not
 * completed yet.
 */
internal class ActionFutureExecutor(
    private val innerState: StateMachineInnerState,
    private val services: ServiceHubInternal,
    private val scheduledExecutor: ScheduledExecutorService
) {

    /**
     * Schedule a wake up for a sleeping flow.
     *
     * The delay is the time between the current instant of the node's clock and the time held in
     * [action]. Once it has elapsed an [Event.WakeUpFromSleep] is offered to the flow.
     *
     * @param fiber The [FlowFiber] to wake up once the delay has elapsed
     * @param action The [Action.SleepUntil] holding the wake up time and the state that receives the future
     */
    fun sleep(fiber: FlowFiber, action: Action.SleepUntil) {
        cancelFutureIfRunning(fiber, action.currentState)
        val instance = fiber.instanceId
        val duration = Duration.between(services.clock.instant(), action.time)
        log.debug { "Putting flow ${instance.runId} to sleep for $duration" }
        action.currentState.future = scheduledExecutor.schedule<Unit>(
            {
                log.debug { "Scheduling flow wake up event for flow ${instance.runId}" }
                scheduleWakeUpEvent(instance, Event.WakeUpFromSleep)
            },
            duration.toMillis(),
            TimeUnit.MILLISECONDS
        )
    }

    /**
     * Start the async operation held in [action] and resume the flow when it finishes.
     *
     * A successful result is delivered as [Event.AsyncOperationCompletion], a failure as
     * [Event.AsyncOperationThrows].
     *
     * @param fiber The [FlowFiber] to resume once the async operation has finished
     * @param action The [Action.ExecuteAsyncOperation] holding the operation, its deduplication id and
     * the state that receives the future
     */
    fun awaitAsyncOperation(fiber: FlowFiber, action: Action.ExecuteAsyncOperation) {
        cancelFutureIfRunning(fiber, action.currentState)
        val instance = fiber.instanceId
        log.debug { "Suspending flow ${instance.runId} until its async operation has completed" }
        val operationFuture = action.operation.execute(action.deduplicationId)
        operationFuture.thenMatch(
            success = { outcome ->
                scheduleWakeUpEvent(instance, Event.AsyncOperationCompletion(outcome))
            },
            failure = { cause ->
                scheduleWakeUpEvent(instance, Event.AsyncOperationThrows(cause))
            }
        )
        action.currentState.future = operationFuture
    }

    /**
     * Track the transaction hash held in [action] and resume the flow when tracking completes.
     *
     * A successful result is delivered as [Event.TransactionCommitted], a failure as [Event.Error].
     *
     * @param fiber The [FlowFiber] to resume after the specified transaction has been committed
     * @param action The [Action.TrackTransaction] holding the transaction hash to wait for and the state
     * that receives the future
     */
    fun awaitTransaction(fiber: FlowFiber, action: Action.TrackTransaction) {
        cancelFutureIfRunning(fiber, action.currentState)
        val instance = fiber.instanceId
        log.debug { "Suspending flow ${instance.runId} until transaction ${action.hash} is committed" }
        val commitFuture = services.validatedTransactions.trackTransactionWithNoWarning(action.hash)
        commitFuture.thenMatch(
            success = { committed ->
                scheduleWakeUpEvent(instance, Event.TransactionCommitted(committed))
            },
            failure = { cause ->
                scheduleWakeUpEvent(instance, Event.Error(cause))
            }
        )
        action.currentState.future = commitFuture
    }

    /**
     * Cancel the future currently held by [currentState], if there is one and it has not completed.
     *
     * The state's `future` field is left untouched here; each caller overwrites it with the future it
     * starts next.
     */
    private fun cancelFutureIfRunning(fiber: FlowFiber, currentState: StateMachineState) {
        // A flow is only expected to have one future at a time, so a leftover one is stale
        val existing = currentState.future ?: return
        log.debug { "Cancelling existing future for flow ${fiber.id}" }
        if (!existing.isDone) {
            existing.cancel(true)
        }
    }

    /**
     * Offer [event] to the flow identified by [instance], while holding the [innerState] lock.
     *
     * Nothing is scheduled when the flow is no longer registered, or when the fiber currently
     * registered for the flow is a different instance from the one that started the future.
     */
    private fun scheduleWakeUpEvent(instance: StateMachineInstanceId, event: Event) {
        innerState.withLock {
            flows[instance.runId]
                ?.fiber
                ?.takeIf { registered -> registered.instanceId == instance }
                ?.scheduleEvent(event)
        }
    }

    private companion object {
        val log = contextLogger()
    }
}

View File

@ -1,78 +0,0 @@
package net.corda.node.services.statemachine
import net.corda.core.internal.FlowIORequest
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import java.time.Duration
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
/**
 * Schedules and cancels the wake up futures of sleeping flows, using [scheduledExecutor] to run the
 * delayed wake up and [innerState] to look up the flow to wake.
 */
internal class FlowSleepScheduler(
    private val innerState: StateMachineInnerState,
    private val scheduledExecutor: ScheduledExecutorService
) {

    /**
     * Schedule a wake up for a flow after [duration] and store the resulting future on [currentState].
     *
     * A future already held by [currentState] is cancelled first if it has not completed.
     *
     * @param fiber The [FlowFiber] to wake up once [duration] has elapsed
     * @param currentState The current [StateMachineState], which receives the new future
     * @param duration How long the flow should sleep for
     */
    fun sleep(fiber: FlowFiber, currentState: StateMachineState, duration: Duration) {
        // A flow is only expected to have one future at a time, so a leftover one is stale
        currentState.future?.let { existing ->
            log.debug { "Cancelling the existing future for flow ${fiber.id}" }
            existing.cancelIfRunning()
        }
        currentState.future = setAlarmClock(fiber, duration)
    }

    /**
     * Cancel the future of a sleeping flow and clear it from [currentState]. The flow is not woken up.
     *
     * Nothing happens unless the flow has started, is waiting for a future on a [FlowIORequest.Sleep]
     * request, and the held future is a [ScheduledFuture].
     *
     * @param currentState The current [StateMachineState]
     */
    fun cancel(currentState: StateMachineState) {
        val flowState = currentState.checkpoint.flowState as? FlowState.Started ?: return
        if (!currentState.isWaitingForFuture || flowState.flowIORequest !is FlowIORequest.Sleep) return
        val sleepFuture = (currentState.future as? ScheduledFuture) ?: return
        log.debug { "Cancelling the sleep scheduled future for flow ${currentState.flowLogic.runId}" }
        sleepFuture.cancelIfRunning()
        currentState.future = null
    }

    /** Cancel the receiver, interrupting it if needed, unless it has already completed. */
    private fun Future<*>.cancelIfRunning() {
        if (isDone) return
        cancel(true)
    }

    /** Schedule the delayed task that wakes [fiber]'s flow after [duration] and return its future. */
    private fun setAlarmClock(fiber: FlowFiber, duration: Duration): ScheduledFuture<Unit> {
        val instance = fiber.instanceId
        log.debug { "Putting flow ${instance.runId} to sleep for $duration" }
        val delayMillis = duration.toMillis()
        return scheduledExecutor.schedule<Unit>(
            {
                log.debug { "Scheduling flow wake up event for flow ${instance.runId}" }
                scheduleWakeUp(instance)
            },
            delayMillis,
            TimeUnit.MILLISECONDS
        )
    }

    /**
     * Offer [Event.WakeUpFromSleep] to the flow identified by [instance], while holding the
     * [innerState] lock.
     *
     * Nothing is scheduled when the flow is no longer registered, or when the fiber currently
     * registered for the flow is a different instance from the one that went to sleep.
     */
    private fun scheduleWakeUp(instance: StateMachineInstanceId) {
        innerState.withLock {
            flows[instance.runId]
                ?.fiber
                ?.takeIf { registered -> registered.instanceId == instance }
                ?.scheduleEvent(Event.WakeUpFromSleep)
        }
    }

    private companion object {
        val log = contextLogger()
    }
}

View File

@ -47,9 +47,8 @@ import net.corda.serialization.internal.withTokenContext
import org.apache.activemq.artemis.utils.ReusableLatch
import rx.Observable
import java.security.SecureRandom
import java.time.Duration
import java.util.ArrayList
import java.util.HashSet
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
@ -88,7 +87,7 @@ internal class SingleThreadedStateMachineManager(
private val metrics = serviceHub.monitoringService.metrics
private val sessionToFlow = ConcurrentHashMap<SessionId, StateMachineRunId>()
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
private val flowSleepScheduler = FlowSleepScheduler(innerState, scheduledFutureExecutor)
private val actionFutureExecutor = ActionFutureExecutor(innerState, serviceHub, scheduledFutureExecutor)
private val flowTimeoutScheduler = FlowTimeoutScheduler(innerState, scheduledFutureExecutor, serviceHub)
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
@ -316,7 +315,7 @@ internal class SingleThreadedStateMachineManager(
override fun removeFlow(flowId: StateMachineRunId, removalReason: FlowRemovalReason, lastState: StateMachineState) {
innerState.withLock {
flowTimeoutScheduler.cancel(flowId)
flowSleepScheduler.cancel(lastState)
lastState.cancelFutureIfRunning()
val flow = flows.remove(flowId)
if (flow != null) {
decrementLiveFibers()
@ -384,7 +383,7 @@ internal class SingleThreadedStateMachineManager(
@Suppress("TooGenericExceptionCaught", "ComplexMethod", "MaxLineLength") // this is fully intentional here, see comment in the catch clause
override fun retryFlowFromSafePoint(currentState: StateMachineState) {
flowSleepScheduler.cancel(currentState)
currentState.cancelFutureIfRunning()
// Get set of external events
val flowId = currentState.flowLogic.runId
val oldFlowLeftOver = innerState.withLock { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
@ -634,10 +633,6 @@ internal class SingleThreadedStateMachineManager(
flowTimeoutScheduler.cancel(flowId)
}
override fun scheduleFlowSleep(fiber: FlowFiber, currentState: StateMachineState, duration: Duration) {
flowSleepScheduler.sleep(fiber, currentState, duration)
}
private fun tryDeserializeCheckpoint(serializedCheckpoint: Checkpoint.Serialized, flowId: StateMachineRunId): Checkpoint? {
return try {
serializedCheckpoint.deserialize(checkpointSerializationContext!!)
@ -695,11 +690,12 @@ internal class SingleThreadedStateMachineManager(
private fun makeActionExecutor(checkpointSerializationContext: CheckpointSerializationContext): ActionExecutor {
return ActionExecutorImpl(
serviceHub,
checkpointStorage,
flowMessaging,
this,
checkpointSerializationContext
serviceHub,
checkpointStorage,
flowMessaging,
this,
actionFutureExecutor,
checkpointSerializationContext
)
}
@ -781,4 +777,12 @@ internal class SingleThreadedStateMachineManager(
}
}
}
/**
 * Cancel the future held by this state, if there is one, and clear the `future` field.
 *
 * A future that has already completed is not cancelled, but the field is still cleared.
 */
private fun StateMachineState.cancelFutureIfRunning() {
    val held = future ?: return
    logger.debug { "Cancelling future for flow ${flowLogic.runId}" }
    if (!held.isDone) {
        held.cancel(true)
    }
    future = null
}
}

View File

@ -10,7 +10,6 @@ import net.corda.core.utilities.Try
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.ReceivedMessage
import rx.Observable
import java.time.Duration
/**
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects.
@ -110,7 +109,6 @@ internal interface StateMachineManagerInternal {
fun retryFlowFromSafePoint(currentState: StateMachineState)
fun scheduleFlowTimeout(flowId: StateMachineRunId)
fun cancelFlowTimeout(flowId: StateMachineRunId)
fun scheduleFlowSleep(fiber: FlowFiber, currentState: StateMachineState, duration: Duration)
}
/**

View File

@ -105,11 +105,12 @@ class StartedFlowTransition(
// This ensures that the [WaitForLedgerCommit] request is not executed multiple times if extra
// [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up
return if (!startingState.isWaitingForFuture) {
val state = startingState.copy(isWaitingForFuture = true)
TransitionResult(
newState = startingState.copy(isWaitingForFuture = true),
newState = state,
actions = listOf(
Action.CreateTransaction,
Action.TrackTransaction(flowIORequest.hash),
Action.TrackTransaction(flowIORequest.hash, state),
Action.CommitTransaction
)
)
@ -432,8 +433,8 @@ class StartedFlowTransition(
// The `numberOfSuspends` is added to the deduplication ID in case an async
// operation is executed multiple times within the same flow.
val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.checkpointState.numberOfSuspends.toString()
actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation))
currentState = currentState.copy(isWaitingForFuture = true)
actions += Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation, currentState)
FlowContinuation.ProcessEvents
}
} else {