From be6b76ff89bb4512667c519d6996b114d7bb4247 Mon Sep 17 00:00:00 2001 From: William Vigor <58432369+williamvigorr3@users.noreply.github.com> Date: Fri, 14 Aug 2020 21:11:48 +0100 Subject: [PATCH] ENT-5684 Reconnect flow's progress tracker when unpausing (#6640) Previously we were just throwing this away when pausing, meaning updates would not be passed back to the user. The progress tracker is now maintained in the `NonResidentFlow` allowing it to be reused in the flow when it is retried. --- .../net/corda/node/flows/FlowRetryTest.kt | 2 +- .../node/services/statemachine/FlowCreator.kt | 76 ++++++++++++++++++- .../SingleThreadedStateMachineManager.kt | 11 ++- .../utilities/StateMachineManagerUtils.kt | 66 ---------------- 4 files changed, 81 insertions(+), 74 deletions(-) delete mode 100644 node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt index 499dcfd232..eff577eba5 100644 --- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt @@ -468,4 +468,4 @@ class GetCheckpointNumberOfStatusFlow(private val flowStatus: Checkpoint.FlowSta } } } -} \ No newline at end of file +} diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt index bdc750b7e3..83d8795c31 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt @@ -15,6 +15,7 @@ import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.internal.CheckpointSerializationContext import net.corda.core.serialization.internal.checkpointDeserialize import net.corda.core.serialization.internal.checkpointSerialize +import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.contextLogger import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal @@ -23,6 +24,8 @@ import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.isEnabledTimedFlow import net.corda.nodeapi.internal.persistence.CordaPersistence import org.apache.activemq.artemis.utils.ReusableLatch +import org.apache.commons.lang3.reflect.FieldUtils +import java.lang.reflect.Field import java.security.SecureRandom import java.util.concurrent.Semaphore @@ -33,7 +36,8 @@ data class NonResidentFlow( var checkpoint: Checkpoint, val resultFuture: OpenFuture = openFuture(), val resumable: Boolean = true, - val hospitalized: Boolean = false + val hospitalized: Boolean = false, + val progressTracker: ProgressTracker? = null ) { val events = mutableListOf() @@ -42,6 +46,7 @@ data class NonResidentFlow( } } +@Suppress("TooManyFunctions") class FlowCreator( private val checkpointSerializationContext: CheckpointSerializationContext, private val checkpointStorage: CheckpointStorage, @@ -71,7 +76,12 @@ class FlowCreator( } else -> nonResidentFlow.checkpoint } - return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint, resultFuture = nonResidentFlow.resultFuture) + return createFlowFromCheckpoint( + nonResidentFlow.runId, + checkpoint, + resultFuture = nonResidentFlow.resultFuture, + progressTracker = nonResidentFlow.progressTracker + ) } @Suppress("LongParameterList") @@ -81,7 +91,8 @@ class FlowCreator( reloadCheckpointAfterSuspendCount: Int? = null, lock: Semaphore = Semaphore(1), resultFuture: OpenFuture = openFuture(), - firstRestore: Boolean = true + firstRestore: Boolean = true, + progressTracker: ProgressTracker? = null ): Flow<*>? { val fiber = oldCheckpoint.getFiberFromCheckpoint(runId, firstRestore) var checkpoint = oldCheckpoint @@ -105,6 +116,7 @@ class FlowCreator( ?: if (reloadCheckpointAfterSuspend) checkpoint.checkpointState.numberOfSuspends else null, lock = lock ) + injectOldProgressTracker(progressTracker, fiber.logic) return Flow(fiber, resultFuture) } @@ -250,4 +262,62 @@ class FlowCreator( lock = lock ) } + + /** + * The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that + * any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress + * updates are correctly sent out after the flow is retried. + * + * If the new tracker contains any child trackers from sub-flows, we need to attach those to the old tracker as well. + */ + private fun injectOldProgressTracker(oldTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) { + if (oldTracker != null) { + val newTracker = newFlowLogic.progressTracker + if (newTracker != null) { + attachNewChildren(oldTracker, newTracker) + replaceTracker(newFlowLogic, oldTracker) + } + } + } + + private fun attachNewChildren(oldTracker: ProgressTracker, newTracker: ProgressTracker) { + oldTracker.currentStep = newTracker.currentStep + oldTracker.steps.forEachIndexed { index, step -> + val newStep = newTracker.steps[index] + val newChildTracker = newTracker.getChildProgressTracker(newStep) + newChildTracker?.let { child -> + oldTracker.setChildProgressTracker(step, child) + } + } + resubscribeToChildren(oldTracker) + } + + /** + * Re-subscribes to child tracker observables. When a nested progress tracker is deserialized from a checkpoint, + * it retains the child links, but does not automatically re-subscribe to the child changes. + */ + private fun resubscribeToChildren(tracker: ProgressTracker) { + tracker.steps.forEach { + val childTracker = tracker.getChildProgressTracker(it) + if (childTracker != null) { + tracker.setChildProgressTracker(it, childTracker) + resubscribeToChildren(childTracker) + } + } + } + + /** Replaces the deserialized [ProgressTracker] in the [newFlowLogic] with the old one to retain old subscribers. */ + private fun replaceTracker(newFlowLogic: FlowLogic<*>, oldProgressTracker: ProgressTracker?) { + val field = getProgressTrackerField(newFlowLogic) + field?.apply { + isAccessible = true + set(newFlowLogic, oldProgressTracker) + } + } + + private fun getProgressTrackerField(newFlowLogic: FlowLogic<*>): Field? { + // The progress tracker field may have been overridden in an abstract superclass, so we have to traverse up + // the hierarchy. + return FieldUtils.getAllFieldsList(newFlowLogic::class.java).find { it.name == "progressTracker" } + } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 52639cf12c..9ce5a4980f 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -41,7 +41,6 @@ import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInter import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor import net.corda.node.services.statemachine.interceptors.PrintingInterceptor import net.corda.node.utilities.AffinityExecutor -import net.corda.node.utilities.injectOldProgressTracker import net.corda.node.utilities.isEnabledTimedFlow import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction @@ -517,7 +516,8 @@ internal class SingleThreadedStateMachineManager( checkpointLoadingStatus.checkpoint, currentState.reloadCheckpointAfterSuspendCount, currentState.lock, - firstRestore = false + firstRestore = false, + progressTracker = currentState.flowLogic.progressTracker ) ?: return } checkpointLoadingStatus is CheckpointLoadingStatus.NotFound && currentState.isAnyCheckpointPersisted -> { @@ -540,7 +540,6 @@ internal class SingleThreadedStateMachineManager( sessionToFlow.remove(sessionId) } if (flow != null) { - injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic) addAndStartFlow(flowId, flow) } extractAndScheduleEventsForRetry(oldFlowLeftOver, currentState) @@ -812,7 +811,11 @@ internal class SingleThreadedStateMachineManager( decrementLiveFibers() //Setting flowState = FlowState.Paused means we don't hold the frozen fiber in memory. val checkpoint = currentState.checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED, flowState = FlowState.Paused) - val pausedFlow = NonResidentFlow(id, checkpoint, flow.resultFuture) + val pausedFlow = NonResidentFlow( + id, + checkpoint, flow.resultFuture, + progressTracker = currentState.flowLogic.progressTracker + ) val eventQueue = flow.fiber.transientValues.eventQueue extractAndQueueExternalEventsForPausedFlow(eventQueue, currentState.pendingDeduplicationHandlers, pausedFlow) pausedFlows.put(id, pausedFlow) diff --git a/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt b/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt deleted file mode 100644 index 99aeebc6d8..0000000000 --- a/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt +++ /dev/null @@ -1,66 +0,0 @@ -package net.corda.node.utilities - -import net.corda.core.flows.FlowLogic -import net.corda.core.utilities.ProgressTracker -import net.corda.node.services.statemachine.StateMachineManagerInternal -import org.apache.commons.lang3.reflect.FieldUtils -import java.lang.reflect.Field - -/** - * The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that - * any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress - * updates are correctly sent out after the flow is retried. - * - * If the new tracker contains any child trackers from sub-flows, we need to attach those to the old tracker as well. - */ -//TODO: instead of replacing the progress tracker after constructing the flow logic, we should inject it during fiber deserialization -internal fun StateMachineManagerInternal.injectOldProgressTracker(oldTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) { - if (oldTracker != null) { - val newTracker = newFlowLogic.progressTracker - if (newTracker != null) { - attachNewChildren(oldTracker, newTracker) - replaceTracker(newFlowLogic, oldTracker) - } - } -} - -private fun attachNewChildren(oldTracker: ProgressTracker, newTracker: ProgressTracker) { - oldTracker.currentStep = newTracker.currentStep - oldTracker.steps.forEachIndexed { index, step -> - val newStep = newTracker.steps[index] - val newChildTracker = newTracker.getChildProgressTracker(newStep) - newChildTracker?.let { child -> - oldTracker.setChildProgressTracker(step, child) - } - } - resubscribeToChildren(oldTracker) -} - -/** - * Re-subscribes to child tracker observables. When a nested progress tracker is deserialized from a checkpoint, - * it retains the child links, but does not automatically re-subscribe to the child changes. - */ -private fun resubscribeToChildren(tracker: ProgressTracker) { - tracker.steps.forEach { - val childTracker = tracker.getChildProgressTracker(it) - if (childTracker != null) { - tracker.setChildProgressTracker(it, childTracker) - resubscribeToChildren(childTracker) - } - } -} - -/** Replaces the deserialized [ProgressTracker] in the [newFlowLogic] with the old one to retain old subscribers. */ -private fun replaceTracker(newFlowLogic: FlowLogic<*>, oldProgressTracker: ProgressTracker?) { - val field = getProgressTrackerField(newFlowLogic) - field?.apply { - isAccessible = true - set(newFlowLogic, oldProgressTracker) - } -} - -private fun getProgressTrackerField(newFlowLogic: FlowLogic<*>): Field? { - // The progress tracker field may have been overridden in an abstract superclass, so we have to traverse up - // the hierarchy. - return FieldUtils.getAllFieldsList(newFlowLogic::class.java).find { it.name == "progressTracker" } -}