ENT-5684 Reconnect flow's progress tracker when unpausing (#6640)

Previously we were just throwing this away when pausing, meaning
updates would not be passed back to the user.

The progress tracker is now maintained in the `NonResidentFlow`
allowing it to be reused in the flow when it is retried.
This commit is contained in:
William Vigor 2020-08-14 21:11:48 +01:00 committed by GitHub
parent 32cb085a53
commit be6b76ff89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 81 additions and 74 deletions

View File

@ -468,4 +468,4 @@ class GetCheckpointNumberOfStatusFlow(private val flowStatus: Checkpoint.FlowSta
}
}
}
}
}

View File

@ -15,6 +15,7 @@ import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.contextLogger
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
@ -23,6 +24,8 @@ import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.isEnabledTimedFlow
import net.corda.nodeapi.internal.persistence.CordaPersistence
import org.apache.activemq.artemis.utils.ReusableLatch
import org.apache.commons.lang3.reflect.FieldUtils
import java.lang.reflect.Field
import java.security.SecureRandom
import java.util.concurrent.Semaphore
@ -33,7 +36,8 @@ data class NonResidentFlow(
var checkpoint: Checkpoint,
val resultFuture: OpenFuture<Any?> = openFuture(),
val resumable: Boolean = true,
val hospitalized: Boolean = false
val hospitalized: Boolean = false,
val progressTracker: ProgressTracker? = null
) {
val events = mutableListOf<ExternalEvent>()
@ -42,6 +46,7 @@ data class NonResidentFlow(
}
}
@Suppress("TooManyFunctions")
class FlowCreator(
private val checkpointSerializationContext: CheckpointSerializationContext,
private val checkpointStorage: CheckpointStorage,
@ -71,7 +76,12 @@ class FlowCreator(
}
else -> nonResidentFlow.checkpoint
}
return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint, resultFuture = nonResidentFlow.resultFuture)
return createFlowFromCheckpoint(
nonResidentFlow.runId,
checkpoint,
resultFuture = nonResidentFlow.resultFuture,
progressTracker = nonResidentFlow.progressTracker
)
}
@Suppress("LongParameterList")
@ -81,7 +91,8 @@ class FlowCreator(
reloadCheckpointAfterSuspendCount: Int? = null,
lock: Semaphore = Semaphore(1),
resultFuture: OpenFuture<Any?> = openFuture(),
firstRestore: Boolean = true
firstRestore: Boolean = true,
progressTracker: ProgressTracker? = null
): Flow<*>? {
val fiber = oldCheckpoint.getFiberFromCheckpoint(runId, firstRestore)
var checkpoint = oldCheckpoint
@ -105,6 +116,7 @@ class FlowCreator(
?: if (reloadCheckpointAfterSuspend) checkpoint.checkpointState.numberOfSuspends else null,
lock = lock
)
injectOldProgressTracker(progressTracker, fiber.logic)
return Flow(fiber, resultFuture)
}
@ -250,4 +262,62 @@ class FlowCreator(
lock = lock
)
}
/**
* The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that
* any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress
* updates are correctly sent out after the flow is retried.
*
* If the new tracker contains any child trackers from sub-flows, we need to attach those to the old tracker as well.
*/
private fun injectOldProgressTracker(oldTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) {
if (oldTracker != null) {
val newTracker = newFlowLogic.progressTracker
if (newTracker != null) {
attachNewChildren(oldTracker, newTracker)
replaceTracker(newFlowLogic, oldTracker)
}
}
}
private fun attachNewChildren(oldTracker: ProgressTracker, newTracker: ProgressTracker) {
oldTracker.currentStep = newTracker.currentStep
oldTracker.steps.forEachIndexed { index, step ->
val newStep = newTracker.steps[index]
val newChildTracker = newTracker.getChildProgressTracker(newStep)
newChildTracker?.let { child ->
oldTracker.setChildProgressTracker(step, child)
}
}
resubscribeToChildren(oldTracker)
}
/**
* Re-subscribes to child tracker observables. When a nested progress tracker is deserialized from a checkpoint,
* it retains the child links, but does not automatically re-subscribe to the child changes.
*/
private fun resubscribeToChildren(tracker: ProgressTracker) {
tracker.steps.forEach {
val childTracker = tracker.getChildProgressTracker(it)
if (childTracker != null) {
tracker.setChildProgressTracker(it, childTracker)
resubscribeToChildren(childTracker)
}
}
}
/** Replaces the deserialized [ProgressTracker] in the [newFlowLogic] with the old one to retain old subscribers. */
private fun replaceTracker(newFlowLogic: FlowLogic<*>, oldProgressTracker: ProgressTracker?) {
val field = getProgressTrackerField(newFlowLogic)
field?.apply {
isAccessible = true
set(newFlowLogic, oldProgressTracker)
}
}
private fun getProgressTrackerField(newFlowLogic: FlowLogic<*>): Field? {
// The progress tracker field may have been overridden in an abstract superclass, so we have to traverse up
// the hierarchy.
return FieldUtils.getAllFieldsList(newFlowLogic::class.java).find { it.name == "progressTracker" }
}
}

View File

@ -41,7 +41,6 @@ import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInter
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.injectOldProgressTracker
import net.corda.node.utilities.isEnabledTimedFlow
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
@ -517,7 +516,8 @@ internal class SingleThreadedStateMachineManager(
checkpointLoadingStatus.checkpoint,
currentState.reloadCheckpointAfterSuspendCount,
currentState.lock,
firstRestore = false
firstRestore = false,
progressTracker = currentState.flowLogic.progressTracker
) ?: return
}
checkpointLoadingStatus is CheckpointLoadingStatus.NotFound && currentState.isAnyCheckpointPersisted -> {
@ -540,7 +540,6 @@ internal class SingleThreadedStateMachineManager(
sessionToFlow.remove(sessionId)
}
if (flow != null) {
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
extractAndScheduleEventsForRetry(oldFlowLeftOver, currentState)
@ -812,7 +811,11 @@ internal class SingleThreadedStateMachineManager(
decrementLiveFibers()
//Setting flowState = FlowState.Paused means we don't hold the frozen fiber in memory.
val checkpoint = currentState.checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED, flowState = FlowState.Paused)
val pausedFlow = NonResidentFlow(id, checkpoint, flow.resultFuture)
val pausedFlow = NonResidentFlow(
id,
checkpoint, flow.resultFuture,
progressTracker = currentState.flowLogic.progressTracker
)
val eventQueue = flow.fiber.transientValues.eventQueue
extractAndQueueExternalEventsForPausedFlow(eventQueue, currentState.pendingDeduplicationHandlers, pausedFlow)
pausedFlows.put(id, pausedFlow)

View File

@ -1,66 +0,0 @@
package net.corda.node.utilities
import net.corda.core.flows.FlowLogic
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.statemachine.StateMachineManagerInternal
import org.apache.commons.lang3.reflect.FieldUtils
import java.lang.reflect.Field
/**
* The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that
* any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress
* updates are correctly sent out after the flow is retried.
*
* If the new tracker contains any child trackers from sub-flows, we need to attach those to the old tracker as well.
*/
//TODO: instead of replacing the progress tracker after constructing the flow logic, we should inject it during fiber deserialization
internal fun StateMachineManagerInternal.injectOldProgressTracker(oldTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) {
if (oldTracker != null) {
val newTracker = newFlowLogic.progressTracker
if (newTracker != null) {
attachNewChildren(oldTracker, newTracker)
replaceTracker(newFlowLogic, oldTracker)
}
}
}
private fun attachNewChildren(oldTracker: ProgressTracker, newTracker: ProgressTracker) {
oldTracker.currentStep = newTracker.currentStep
oldTracker.steps.forEachIndexed { index, step ->
val newStep = newTracker.steps[index]
val newChildTracker = newTracker.getChildProgressTracker(newStep)
newChildTracker?.let { child ->
oldTracker.setChildProgressTracker(step, child)
}
}
resubscribeToChildren(oldTracker)
}
/**
* Re-subscribes to child tracker observables. When a nested progress tracker is deserialized from a checkpoint,
* it retains the child links, but does not automatically re-subscribe to the child changes.
*/
private fun resubscribeToChildren(tracker: ProgressTracker) {
tracker.steps.forEach {
val childTracker = tracker.getChildProgressTracker(it)
if (childTracker != null) {
tracker.setChildProgressTracker(it, childTracker)
resubscribeToChildren(childTracker)
}
}
}
/** Replaces the deserialized [ProgressTracker] in the [newFlowLogic] with the old one to retain old subscribers. */
private fun replaceTracker(newFlowLogic: FlowLogic<*>, oldProgressTracker: ProgressTracker?) {
val field = getProgressTrackerField(newFlowLogic)
field?.apply {
isAccessible = true
set(newFlowLogic, oldProgressTracker)
}
}
private fun getProgressTrackerField(newFlowLogic: FlowLogic<*>): Field? {
// The progress tracker field may have been overridden in an abstract superclass, so we have to traverse up
// the hierarchy.
return FieldUtils.getAllFieldsList(newFlowLogic::class.java).find { it.name == "progressTracker" }
}