From 227ca3b65bcf737f9b52d135e2df47923e4b1ea8 Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Wed, 20 Jun 2018 16:00:21 +0100 Subject: [PATCH] CORDA-1610: Retain progress tracker during flow retry. Make sure the same progress tracker object is re-used in the restarted flow so subscribers can keep receiving progress updates. --- .../net/corda/node/services/TimedFlowTests.kt | 17 ++++++++++++++ .../SingleThreadedStateMachineManager.kt | 6 ++++- .../utilities/StateMachineManagerUtils.kt | 22 +++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt diff --git a/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt index bc6bc655e7..9a81913f41 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt @@ -19,6 +19,7 @@ import net.corda.core.node.NotaryInfo import net.corda.core.node.services.CordaService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.seconds import net.corda.node.internal.StartedNode import net.corda.node.services.config.NodeConfiguration @@ -43,6 +44,8 @@ import org.junit.Test import org.slf4j.MDC import java.security.PublicKey import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.assertEquals +import kotlin.test.assertNotEquals class TimedFlowTests { companion object { @@ -131,8 +134,15 @@ class TimedFlowTests { addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) } val flow = NotaryFlow.Client(issueTx) + val progressTracker = flow.progressTracker + assertNotEquals(ProgressTracker.DONE, progressTracker.currentStep) val notarySignatures = services.startFlow(flow).resultFuture.get() (issueTx + notarySignatures).verifyRequiredSignatures() + assertEquals( + ProgressTracker.DONE, + progressTracker.currentStep, + "Ensure the same progress tracker object is re-used after flow restart" + ) } } @@ -144,8 +154,15 @@ class TimedFlowTests { addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) } val flow = FinalityFlow(issueTx) + val progressTracker = flow.progressTracker + val stx = services.startFlow(flow).resultFuture.get() stx.verifyRequiredSignatures() + assertEquals( + ProgressTracker.DONE, + progressTracker.currentStep, + "Ensure the same progress tracker object is re-used after flow restart" + ) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 77b10dd575..51ad2460c4 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -33,6 +33,7 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.creat import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.AffinityExecutor +import net.corda.node.utilities.injectOldProgressTracker import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import net.corda.serialization.internal.SerializeAsTokenContextImpl @@ -361,7 +362,10 @@ class SingleThreadedStateMachineManager( for (sessionId in getFlowSessionIds(currentState.checkpoint)) { sessionToFlow.remove(sessionId) } - if (flow != null) addAndStartFlow(flowId, flow) + if (flow != null) { + injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic) + addAndStartFlow(flowId, flow) + } // Deliver all the external events from the old flow instance. val unprocessedExternalEvents = mutableListOf() do { diff --git a/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt b/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt new file mode 100644 index 0000000000..f8b4294a8a --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt @@ -0,0 +1,22 @@ +package net.corda.node.utilities + +import net.corda.core.flows.FlowLogic +import net.corda.core.utilities.ProgressTracker +import net.corda.node.services.statemachine.StateMachineManagerInternal + +/** + * The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that + * any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress + * updates are correctly sent out after the flow is retried. + */ +fun StateMachineManagerInternal.injectOldProgressTracker(oldProgressTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) { + if (oldProgressTracker != null) { + try { + val field = newFlowLogic::class.java.getDeclaredField("progressTracker") + field.isAccessible = true + field.set(newFlowLogic, oldProgressTracker) + } catch (e: NoSuchFieldException) { + // The flow does not use a progress tracker. + } + } +} \ No newline at end of file