CORDA-1610: Retain progress tracker during flow retry.

Make sure the same progress tracker object is re-used in the restarted flow so subscribers can keep receiving progress updates.
This commit is contained in:
Andrius Dagys 2018-06-20 16:00:21 +01:00
parent 3af5412d40
commit 227ca3b65b
3 changed files with 44 additions and 1 deletions

View File

@ -19,6 +19,7 @@ import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.CordaService import net.corda.core.node.services.CordaService
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.internal.StartedNode import net.corda.node.internal.StartedNode
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
@ -43,6 +44,8 @@ import org.junit.Test
import org.slf4j.MDC import org.slf4j.MDC
import java.security.PublicKey import java.security.PublicKey
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
class TimedFlowTests { class TimedFlowTests {
companion object { companion object {
@ -131,8 +134,15 @@ class TimedFlowTests {
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
} }
val flow = NotaryFlow.Client(issueTx) val flow = NotaryFlow.Client(issueTx)
val progressTracker = flow.progressTracker
assertNotEquals(ProgressTracker.DONE, progressTracker.currentStep)
val notarySignatures = services.startFlow(flow).resultFuture.get() val notarySignatures = services.startFlow(flow).resultFuture.get()
(issueTx + notarySignatures).verifyRequiredSignatures() (issueTx + notarySignatures).verifyRequiredSignatures()
assertEquals(
ProgressTracker.DONE,
progressTracker.currentStep,
"Ensure the same progress tracker object is re-used after flow restart"
)
} }
} }
@ -144,8 +154,15 @@ class TimedFlowTests {
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
} }
val flow = FinalityFlow(issueTx) val flow = FinalityFlow(issueTx)
val progressTracker = flow.progressTracker
val stx = services.startFlow(flow).resultFuture.get() val stx = services.startFlow(flow).resultFuture.get()
stx.verifyRequiredSignatures() stx.verifyRequiredSignatures()
assertEquals(
ProgressTracker.DONE,
progressTracker.currentStep,
"Ensure the same progress tracker object is re-used after flow restart"
)
} }
} }

View File

@ -33,6 +33,7 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.creat
import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.injectOldProgressTracker
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import net.corda.serialization.internal.SerializeAsTokenContextImpl import net.corda.serialization.internal.SerializeAsTokenContextImpl
@ -361,7 +362,10 @@ class SingleThreadedStateMachineManager(
for (sessionId in getFlowSessionIds(currentState.checkpoint)) { for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
sessionToFlow.remove(sessionId) sessionToFlow.remove(sessionId)
} }
if (flow != null) addAndStartFlow(flowId, flow) if (flow != null) {
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
// Deliver all the external events from the old flow instance. // Deliver all the external events from the old flow instance.
val unprocessedExternalEvents = mutableListOf<ExternalEvent>() val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
do { do {

View File

@ -0,0 +1,22 @@
package net.corda.node.utilities
import net.corda.core.flows.FlowLogic
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.statemachine.StateMachineManagerInternal
/**
* The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that
* any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress
* updates are correctly sent out after the flow is retried.
*/
fun StateMachineManagerInternal.injectOldProgressTracker(oldProgressTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) {
if (oldProgressTracker != null) {
try {
val field = newFlowLogic::class.java.getDeclaredField("progressTracker")
field.isAccessible = true
field.set(newFlowLogic, oldProgressTracker)
} catch (e: NoSuchFieldException) {
// The flow does not use a progress tracker.
}
}
}