From ac4907a4297f2862143386105d82a8340f4daf5a Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Tue, 14 Jul 2020 08:04:52 +0100 Subject: [PATCH] CORDA-3721 Finishing + retrying a flow cancels its future (#6454) Cancel the future being run by a flow when finishing or retrying it. The cancellation of the future no longer cares about what type of future it is. `StateMachineState` has the `future` field, which holds the 3 (currently) possible types of futures: - sleep - wait for ledger commit - async operation / external operation Move the starting of all futures triggered by actions into `ActionFutureExecutor`. --- .../node/services/statemachine/Action.kt | 8 +- .../statemachine/ActionExecutorImpl.kt | 43 ++------- .../statemachine/ActionFutureExecutor.kt | 96 +++++++++++++++++++ .../statemachine/FlowSleepScheduler.kt | 78 --------------- .../SingleThreadedStateMachineManager.kt | 32 ++++--- .../statemachine/StateMachineManager.kt | 2 - .../transitions/StartedFlowTransition.kt | 7 +- 7 files changed, 134 insertions(+), 132 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/services/statemachine/ActionFutureExecutor.kt delete mode 100644 node/src/main/kotlin/net/corda/node/services/statemachine/FlowSleepScheduler.kt diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt index 51aadc69cf..6b17fe0870 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt @@ -17,7 +17,7 @@ sealed class Action { /** * Track a transaction hash and notify the state machine once the corresponding transaction has committed. */ - data class TrackTransaction(val hash: SecureHash) : Action() + data class TrackTransaction(val hash: SecureHash, val currentState: StateMachineState) : Action() /** * Send an initial session message to [destination]. @@ -140,7 +140,11 @@ sealed class Action { /** * Execute the specified [operation]. */ - data class ExecuteAsyncOperation(val deduplicationId: String, val operation: FlowAsyncOperation<*>) : Action() + data class ExecuteAsyncOperation( + val deduplicationId: String, + val operation: FlowAsyncOperation<*>, + val currentState: StateMachineState + ) : Action() /** * Release soft locks associated with given ID (currently the flow ID). diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 260ea86cac..2849dc03a1 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -3,7 +3,6 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable import com.codahale.metrics.Gauge import com.codahale.metrics.Reservoir -import net.corda.core.internal.concurrent.thenMatch import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.internal.CheckpointSerializationContext import net.corda.core.serialization.internal.checkpointSerialize @@ -15,17 +14,17 @@ import net.corda.nodeapi.internal.persistence.contextDatabase import net.corda.nodeapi.internal.persistence.contextTransaction import net.corda.nodeapi.internal.persistence.contextTransactionOrNull import java.sql.SQLException -import java.time.Duration /** * This is the bottom execution engine of flow side-effects. */ internal class ActionExecutorImpl( - private val services: ServiceHubInternal, - private val checkpointStorage: CheckpointStorage, - private val flowMessaging: FlowMessaging, - private val stateMachineManager: StateMachineManagerInternal, - private val checkpointSerializationContext: CheckpointSerializationContext + private val services: ServiceHubInternal, + private val checkpointStorage: CheckpointStorage, + private val flowMessaging: FlowMessaging, + private val stateMachineManager: StateMachineManagerInternal, + private val actionFutureExecutor: ActionFutureExecutor, + private val checkpointSerializationContext: CheckpointSerializationContext ) : ActionExecutor { private companion object { @@ -74,16 +73,8 @@ internal class ActionExecutorImpl( if (action.uuid != null) services.vaultService.softLockRelease(action.uuid) } - @Suspendable private fun executeTrackTransaction(fiber: FlowFiber, action: Action.TrackTransaction) { - services.validatedTransactions.trackTransactionWithNoWarning(action.hash).thenMatch( - success = { transaction -> - fiber.scheduleEvent(Event.TransactionCommitted(transaction)) - }, - failure = { exception -> - fiber.scheduleEvent(Event.Error(exception)) - } - ) + actionFutureExecutor.awaitTransaction(fiber, action) } @Suspendable @@ -157,13 +148,8 @@ internal class ActionExecutorImpl( fiber.scheduleEvent(action.event) } - @Suspendable private fun executeSleepUntil(fiber: FlowFiber, action: Action.SleepUntil) { - stateMachineManager.scheduleFlowSleep( - fiber, - action.currentState, - Duration.between(services.clock.instant(), action.time) - ) + actionFutureExecutor.sleep(fiber, action) } @Suspendable @@ -236,19 +222,10 @@ internal class ActionExecutorImpl( } } - @Suppress("TooGenericExceptionCaught") // this is fully intentional here, see comment in the catch clause - @Suspendable + @Suppress("TooGenericExceptionCaught") private fun executeAsyncOperation(fiber: FlowFiber, action: Action.ExecuteAsyncOperation) { try { - val operationFuture = action.operation.execute(action.deduplicationId) - operationFuture.thenMatch( - success = { result -> - fiber.scheduleEvent(Event.AsyncOperationCompletion(result)) - }, - failure = { exception -> - fiber.scheduleEvent(Event.AsyncOperationThrows(exception)) - } - ) + actionFutureExecutor.awaitAsyncOperation(fiber, action) } catch (e: Exception) { // Catch and wrap any unexpected exceptions from the async operation // Wrapping the exception allows it to be better handled by the flow hospital diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionFutureExecutor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionFutureExecutor.kt new file mode 100644 index 0000000000..dc5d2fc0b9 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionFutureExecutor.kt @@ -0,0 +1,96 @@ +package net.corda.node.services.statemachine + +import net.corda.core.internal.concurrent.thenMatch +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug +import net.corda.node.services.api.ServiceHubInternal +import java.time.Duration +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit + +internal class ActionFutureExecutor( + private val innerState: StateMachineInnerState, + private val services: ServiceHubInternal, + private val scheduledExecutor: ScheduledExecutorService +) { + + private companion object { + val log = contextLogger() + } + + /** + * Put a flow to sleep for the duration specified in [action]. + * + * @param fiber The [FlowFiber] that will be woken up after sleeping + * @param action The [Action.SleepUntil] to create a future from + */ + fun sleep(fiber: FlowFiber, action: Action.SleepUntil) { + cancelFutureIfRunning(fiber, action.currentState) + val instance = fiber.instanceId + val duration = Duration.between(services.clock.instant(), action.time) + log.debug { "Putting flow ${instance.runId} to sleep for $duration" } + val future = scheduledExecutor.schedule( + { + log.debug { "Scheduling flow wake up event for flow ${instance.runId}" } + scheduleWakeUpEvent(instance, Event.WakeUpFromSleep) + }, + duration.toMillis(), TimeUnit.MILLISECONDS + ) + action.currentState.future = future + } + + /** + * Suspend a flow until its async operation specified in [action] is completed. + * + * @param fiber The [FlowFiber] to resume after completing the async operation + * @param action The [Action.ExecuteAsyncOperation] to create a future from + */ + fun awaitAsyncOperation(fiber: FlowFiber, action: Action.ExecuteAsyncOperation) { + cancelFutureIfRunning(fiber, action.currentState) + val instance = fiber.instanceId + log.debug { "Suspending flow ${instance.runId} until its async operation has completed" } + val future = action.operation.execute(action.deduplicationId) + future.thenMatch( + success = { result -> scheduleWakeUpEvent(instance, Event.AsyncOperationCompletion(result)) }, + failure = { exception -> scheduleWakeUpEvent(instance, Event.AsyncOperationThrows(exception)) } + ) + action.currentState.future = future + } + + /** + * Suspend a flow until the transaction specified in [action] is committed. + * + * @param fiber The [FlowFiber] to resume after the committing the specified transaction + * @param action [Action.TrackTransaction] contains the transaction hash to wait for + */ + fun awaitTransaction(fiber: FlowFiber, action: Action.TrackTransaction) { + cancelFutureIfRunning(fiber, action.currentState) + val instance = fiber.instanceId + log.debug { "Suspending flow ${instance.runId} until transaction ${action.hash} is committed" } + val future = services.validatedTransactions.trackTransactionWithNoWarning(action.hash) + future.thenMatch( + success = { transaction -> scheduleWakeUpEvent(instance, Event.TransactionCommitted(transaction)) }, + failure = { exception -> scheduleWakeUpEvent(instance, Event.Error(exception)) } + ) + action.currentState.future = future + } + + private fun cancelFutureIfRunning(fiber: FlowFiber, currentState: StateMachineState) { + // No other future should be running, cancel it if there is + currentState.future?.run { + log.debug { "Cancelling existing future for flow ${fiber.id}" } + if (!isDone) cancel(true) + } + } + + private fun scheduleWakeUpEvent(instance: StateMachineInstanceId, event: Event) { + innerState.withLock { + flows[instance.runId]?.let { flow -> + // Only schedule a wake up event if the fiber the flow is executing on has not changed + if (flow.fiber.instanceId == instance) { + flow.fiber.scheduleEvent(event) + } + } + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSleepScheduler.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSleepScheduler.kt deleted file mode 100644 index 63fcd5c6e8..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSleepScheduler.kt +++ /dev/null @@ -1,78 +0,0 @@ -package net.corda.node.services.statemachine - -import net.corda.core.internal.FlowIORequest -import net.corda.core.utilities.contextLogger -import net.corda.core.utilities.debug -import java.time.Duration -import java.util.concurrent.Future -import java.util.concurrent.ScheduledExecutorService -import java.util.concurrent.ScheduledFuture -import java.util.concurrent.TimeUnit - -internal class FlowSleepScheduler(private val innerState: StateMachineInnerState, private val scheduledExecutor: ScheduledExecutorService) { - - private companion object { - val log = contextLogger() - } - - /** - * Put a flow to sleep for a specified duration. - * - * @param fiber The [FlowFiber] that will be woken up after sleeping - * @param currentState The current [StateMachineState] - * @param duration How long to sleep for - */ - fun sleep(fiber: FlowFiber, currentState: StateMachineState, duration: Duration) { - // No other future should be running, cancel it if there is - currentState.future?.run { - log.debug { "Cancelling the existing future for flow ${fiber.id}" } - cancelIfRunning() - } - currentState.future = setAlarmClock(fiber, duration) - } - - /** - * Cancel a sleeping flow's future. Note, this does not cause the flow to wake up. - * - * @param currentState The current [StateMachineState] - */ - fun cancel(currentState: StateMachineState) { - (currentState.checkpoint.flowState as? FlowState.Started)?.let { flowState -> - if (currentState.isWaitingForFuture && flowState.flowIORequest is FlowIORequest.Sleep) { - (currentState.future as? ScheduledFuture)?.run { - log.debug { "Cancelling the sleep scheduled future for flow ${currentState.flowLogic.runId}" } - cancelIfRunning() - currentState.future = null - } - } - - } - } - - private fun Future<*>.cancelIfRunning() { - if (!isDone) cancel(true) - } - - private fun setAlarmClock(fiber: FlowFiber, duration: Duration): ScheduledFuture { - val instance = fiber.instanceId - log.debug { "Putting flow ${instance.runId} to sleep for $duration" } - return scheduledExecutor.schedule( - { - log.debug { "Scheduling flow wake up event for flow ${instance.runId}" } - scheduleWakeUp(instance) - }, - duration.toMillis(), TimeUnit.MILLISECONDS - ) - } - - private fun scheduleWakeUp(instance: StateMachineInstanceId) { - innerState.withLock { - flows[instance.runId]?.let { flow -> - // Only schedule a wake up event if the fiber the flow is executing on has not changed - if (flow.fiber.instanceId == instance) { - flow.fiber.scheduleEvent(Event.WakeUpFromSleep) - } - } - } - } -} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 2914aecd5d..1d07a75d02 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -47,9 +47,8 @@ import net.corda.serialization.internal.withTokenContext import org.apache.activemq.artemis.utils.ReusableLatch import rx.Observable import java.security.SecureRandom -import java.time.Duration +import java.util.ArrayList import java.util.HashSet -import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService import java.util.concurrent.Executors @@ -88,7 +87,7 @@ internal class SingleThreadedStateMachineManager( private val metrics = serviceHub.monitoringService.metrics private val sessionToFlow = ConcurrentHashMap() private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub) - private val flowSleepScheduler = FlowSleepScheduler(innerState, scheduledFutureExecutor) + private val actionFutureExecutor = ActionFutureExecutor(innerState, serviceHub, scheduledFutureExecutor) private val flowTimeoutScheduler = FlowTimeoutScheduler(innerState, scheduledFutureExecutor, serviceHub) private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null private val ourSenderUUID = serviceHub.networkService.ourSenderUUID @@ -316,7 +315,7 @@ internal class SingleThreadedStateMachineManager( override fun removeFlow(flowId: StateMachineRunId, removalReason: FlowRemovalReason, lastState: StateMachineState) { innerState.withLock { flowTimeoutScheduler.cancel(flowId) - flowSleepScheduler.cancel(lastState) + lastState.cancelFutureIfRunning() val flow = flows.remove(flowId) if (flow != null) { decrementLiveFibers() @@ -384,7 +383,7 @@ internal class SingleThreadedStateMachineManager( @Suppress("TooGenericExceptionCaught", "ComplexMethod", "MaxLineLength") // this is fully intentional here, see comment in the catch clause override fun retryFlowFromSafePoint(currentState: StateMachineState) { - flowSleepScheduler.cancel(currentState) + currentState.cancelFutureIfRunning() // Get set of external events val flowId = currentState.flowLogic.runId val oldFlowLeftOver = innerState.withLock { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue @@ -634,10 +633,6 @@ internal class SingleThreadedStateMachineManager( flowTimeoutScheduler.cancel(flowId) } - override fun scheduleFlowSleep(fiber: FlowFiber, currentState: StateMachineState, duration: Duration) { - flowSleepScheduler.sleep(fiber, currentState, duration) - } - private fun tryDeserializeCheckpoint(serializedCheckpoint: Checkpoint.Serialized, flowId: StateMachineRunId): Checkpoint? { return try { serializedCheckpoint.deserialize(checkpointSerializationContext!!) @@ -695,11 +690,12 @@ internal class SingleThreadedStateMachineManager( private fun makeActionExecutor(checkpointSerializationContext: CheckpointSerializationContext): ActionExecutor { return ActionExecutorImpl( - serviceHub, - checkpointStorage, - flowMessaging, - this, - checkpointSerializationContext + serviceHub, + checkpointStorage, + flowMessaging, + this, + actionFutureExecutor, + checkpointSerializationContext ) } @@ -781,4 +777,12 @@ internal class SingleThreadedStateMachineManager( } } } + + private fun StateMachineState.cancelFutureIfRunning() { + future?.run { + logger.debug { "Cancelling future for flow ${flowLogic.runId}" } + if (!isDone) cancel(true) + future = null + } + } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 66a5a60797..6c5050962b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -10,7 +10,6 @@ import net.corda.core.utilities.Try import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.ReceivedMessage import rx.Observable -import java.time.Duration /** * A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects. @@ -110,7 +109,6 @@ internal interface StateMachineManagerInternal { fun retryFlowFromSafePoint(currentState: StateMachineState) fun scheduleFlowTimeout(flowId: StateMachineRunId) fun cancelFlowTimeout(flowId: StateMachineRunId) - fun scheduleFlowSleep(fiber: FlowFiber, currentState: StateMachineState, duration: Duration) } /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 904ab3f06a..96b6557829 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -105,11 +105,12 @@ class StartedFlowTransition( // This ensures that the [WaitForLedgerCommit] request is not executed multiple times if extra // [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up return if (!startingState.isWaitingForFuture) { + val state = startingState.copy(isWaitingForFuture = true) TransitionResult( - newState = startingState.copy(isWaitingForFuture = true), + newState = state, actions = listOf( Action.CreateTransaction, - Action.TrackTransaction(flowIORequest.hash), + Action.TrackTransaction(flowIORequest.hash, state), Action.CommitTransaction ) ) @@ -432,8 +433,8 @@ class StartedFlowTransition( // The `numberOfSuspends` is added to the deduplication ID in case an async // operation is executed multiple times within the same flow. val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.checkpointState.numberOfSuspends.toString() - actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation)) currentState = currentState.copy(isWaitingForFuture = true) + actions += Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation, currentState) FlowContinuation.ProcessEvents } } else {