diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index f2d22a00e9..18af90b922 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -308,8 +308,6 @@ abstract class FlowLogic { logger.debug { "Calling subflow: $subLogic" } val result = stateMachine.subFlow(subLogic) logger.debug { "Subflow finished with result ${result.toString().abbreviate(300)}" } - // It's easy to forget this when writing flows so we just step it to the DONE state when it completes. - subLogic.progressTracker?.currentStep = ProgressTracker.DONE return result } diff --git a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt index 7741500c31..7660866f00 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt @@ -5,6 +5,7 @@ import net.corda.core.serialization.CordaSerializable import rx.Observable import rx.Subscription import rx.subjects.PublishSubject +import rx.subjects.ReplaySubject import java.util.* /** @@ -86,7 +87,7 @@ class ProgressTracker(vararg inputSteps: Step) { // This field won't be serialized. private val _changes by transient { PublishSubject.create() } private val _stepsTreeChanges by transient { PublishSubject.create>>() } - private val _stepsTreeIndexChanges by transient { PublishSubject.create() } + private val _stepsTreeIndexChanges by transient { ReplaySubject.create() } var currentStep: Step get() = steps[stepIndex] @@ -147,8 +148,10 @@ class ProgressTracker(vararg inputSteps: Step) { /** The zero-bases index of the current step in a [allStepsLabels] list */ var stepsTreeIndex: Int = -1 private set(value) { - field = value - _stepsTreeIndexChanges.onNext(value) + if (value != field) { + field = value + _stepsTreeIndexChanges.onNext(value) + } } /** @@ -234,7 +237,7 @@ class ProgressTracker(vararg inputSteps: Step) { val result = ArrayList>() for (step in steps) { if (step == UNSTARTED) continue - if (level > 0 && step == DONE) continue + if (level > 0 && (step == DONE || step == STARTING)) continue result += Pair(level, step) getChildProgressTracker(step)?.let { result += it._allSteps(level + 1) } } diff --git a/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt b/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt index e476df1a8b..0014cabbcd 100644 --- a/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt +++ b/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt @@ -123,13 +123,13 @@ class ProgressTrackerTest { assertCurrentStepsTree(2, SimpleSteps.TWO) pt2.currentStep = ChildSteps.BEE - assertCurrentStepsTree(5, ChildSteps.BEE) + assertCurrentStepsTree(4, ChildSteps.BEE) pt.currentStep = SimpleSteps.THREE - assertCurrentStepsTree(7, SimpleSteps.THREE) + assertCurrentStepsTree(6, SimpleSteps.THREE) // Assert no structure changes and proper steps propagation. - assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 2, 5, 7)) + assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 2, 4, 6)) assertThat(stepsTreeNotification).isEmpty() } @@ -161,10 +161,10 @@ class ProgressTrackerTest { assertCurrentStepsTree(4, SimpleSteps.FOUR) pt2.currentStep = ChildSteps.SEA - assertCurrentStepsTree(8, ChildSteps.SEA) + assertCurrentStepsTree(7, ChildSteps.SEA) // Assert no structure changes and proper steps propagation. - assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 4, 8)) + assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 4, 7)) assertThat(stepsTreeNotification).isEmpty() } @@ -180,7 +180,6 @@ class ProgressTrackerTest { // Put current state as a first change for simplicity when asserting. val stepsTreeNotification = mutableListOf(pt.allStepsLabels) - println(pt.allStepsLabels) pt.stepsTreeChanges.subscribe { stepsTreeNotification += it } @@ -194,15 +193,15 @@ class ProgressTrackerTest { assertCurrentStepsTree(2, SimpleSteps.TWO) pt.currentStep = SimpleSteps.FOUR - assertCurrentStepsTree(8, SimpleSteps.FOUR) + assertCurrentStepsTree(7, SimpleSteps.FOUR) pt.setChildProgressTracker(SimpleSteps.THREE, pt3) - assertCurrentStepsTree(12, SimpleSteps.FOUR) + assertCurrentStepsTree(10, SimpleSteps.FOUR) // Assert no structure changes and proper steps propagation. - assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(2, 8, 12)) + assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(2, 7, 10)) assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state } @@ -230,14 +229,14 @@ class ProgressTrackerTest { pt.currentStep = SimpleSteps.TWO pt2.currentStep = ChildSteps.SEA pt3.currentStep = BabySteps.UNOS - assertCurrentStepsTree(6, ChildSteps.SEA) + assertCurrentStepsTree(5, ChildSteps.SEA) pt.setChildProgressTracker(SimpleSteps.TWO, pt3) - assertCurrentStepsTree(4, BabySteps.UNOS) + assertCurrentStepsTree(3, BabySteps.UNOS) // Assert no structure changes and proper steps propagation. - assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(2, 6, 4)) + assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(2, 5, 3)) assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state. } @@ -248,4 +247,21 @@ class ProgressTrackerTest { pt.currentStep = SimpleSteps.ONE assertEquals(SimpleSteps.TWO, pt.nextStep()) } + + @Test + fun `all index changes seen if subscribed mid flow`() { + pt.setChildProgressTracker(SimpleSteps.TWO, pt2) + + pt.currentStep = SimpleSteps.ONE + pt.currentStep = SimpleSteps.TWO + + val stepsIndexNotifications = LinkedList() + pt.stepsTreeIndexChanges.subscribe() { + stepsIndexNotifications += it + } + + pt2.currentStep = ChildSteps.AYY + + assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 2, 3)) + } }