From 76dc981b910acf77c3f122a1db476aa4992c452b Mon Sep 17 00:00:00 2001 From: JamesHR3 <45565019+JamesHR3@users.noreply.github.com> Date: Wed, 13 Mar 2019 16:33:29 +0000 Subject: [PATCH] [CORDA-2737] Buffer events from observables in ProgressTracker until subscribed to (#4882) --- .../corda/core/utilities/ProgressTracker.kt | 5 +-- .../core/utilities/ProgressTrackerTest.kt | 45 ++++++++++++++++--- .../finance/flows/CashIssueAndPaymentFlow.kt | 10 +++-- .../statemachine/FlowFrameworkTests.kt | 2 + .../shell/utlities/ANSIProgressRenderer.kt | 2 - 5 files changed, 49 insertions(+), 15 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt index 554526d71c..b511a26a27 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt @@ -5,7 +5,6 @@ import net.corda.core.internal.STRUCTURAL_STEP_PREFIX import net.corda.core.serialization.CordaSerializable import rx.Observable import rx.Subscription -import rx.subjects.PublishSubject import rx.subjects.ReplaySubject import java.util.* @@ -91,8 +90,8 @@ class ProgressTracker(vararg inputSteps: Step) { private var _allStepsCache: List> = _allSteps() // This field won't be serialized. - private val _changes by transient { PublishSubject.create() } - private val _stepsTreeChanges by transient { PublishSubject.create>>() } + private val _changes by transient { ReplaySubject.create() } + private val _stepsTreeChanges by transient { ReplaySubject.create>>() } private val _stepsTreeIndexChanges by transient { ReplaySubject.create() } var currentStep: Step diff --git a/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt b/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt index 0014cabbcd..70ccd7fddb 100644 --- a/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt +++ b/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt @@ -130,7 +130,7 @@ class ProgressTrackerTest { // Assert no structure changes and proper steps propagation. assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 2, 4, 6)) - assertThat(stepsTreeNotification).isEmpty() + assertThat(stepsTreeNotification).hasSize(1) // One entry per child progress tracker set } @Test @@ -165,7 +165,7 @@ class ProgressTrackerTest { // Assert no structure changes and proper steps propagation. assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 4, 7)) - assertThat(stepsTreeNotification).isEmpty() + assertThat(stepsTreeNotification).hasSize(2) // One entry per child progress tracker set } @Test @@ -179,7 +179,7 @@ class ProgressTrackerTest { } // Put current state as a first change for simplicity when asserting. - val stepsTreeNotification = mutableListOf(pt.allStepsLabels) + val stepsTreeNotification = mutableListOf>>() pt.stepsTreeChanges.subscribe { stepsTreeNotification += it } @@ -202,7 +202,7 @@ class ProgressTrackerTest { // Assert no structure changes and proper steps propagation. assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(2, 7, 10)) - assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state + assertThat(stepsTreeNotification).hasSize(2) // One state per child progress tracker set } @Test @@ -216,7 +216,7 @@ class ProgressTrackerTest { } // Put current state as a first change for simplicity when asserting. - val stepsTreeNotification = mutableListOf(pt.allStepsLabels) + val stepsTreeNotification = mutableListOf>>() pt.stepsTreeChanges.subscribe { stepsTreeNotification += it } @@ -237,7 +237,7 @@ class ProgressTrackerTest { // Assert no structure changes and proper steps propagation. assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(2, 5, 3)) - assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state. + assertThat(stepsTreeNotification).hasSize(2) // One state per child progress tracker set } @Test @@ -264,4 +264,37 @@ class ProgressTrackerTest { assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 2, 3)) } + + @Test + fun `all step changes seen if subscribed mid flow`() { + val steps = mutableListOf() + pt.nextStep() + pt.nextStep() + pt.nextStep() + pt.changes.subscribe { steps.add(it.toString())} + pt.nextStep() + pt.nextStep() + pt.nextStep() + assertEquals(listOf("Starting", "one", "two", "three", "four", "Done"), steps) + } + + @Test + fun `all tree changes seen if subscribed mid flow`() { + val stepTreeNotifications = mutableListOf>>() + pt.setChildProgressTracker(SimpleSteps.TWO, pt2) + + pt.currentStep = SimpleSteps.ONE + pt.currentStep = SimpleSteps.TWO + + pt.setChildProgressTracker(SimpleSteps.TWO, pt3) + pt.stepsTreeChanges.subscribe { stepTreeNotifications.add(it)} + + fun assertStepsTree(index: Int, step: ProgressTracker.Step) { + assertEquals(step.label, stepTreeNotifications[index][pt.stepsTreeIndex].second) + } + pt2.currentStep = ChildSteps.AYY + pt3.currentStep = BabySteps.UNOS + assertStepsTree(0, ChildSteps.AYY) + assertStepsTree(1, BabySteps.UNOS) + } } diff --git a/finance/workflows/src/main/kotlin/net/corda/finance/flows/CashIssueAndPaymentFlow.kt b/finance/workflows/src/main/kotlin/net/corda/finance/flows/CashIssueAndPaymentFlow.kt index 08618cabce..1370f59fc6 100644 --- a/finance/workflows/src/main/kotlin/net/corda/finance/flows/CashIssueAndPaymentFlow.kt +++ b/finance/workflows/src/main/kotlin/net/corda/finance/flows/CashIssueAndPaymentFlow.kt @@ -10,14 +10,16 @@ import net.corda.core.utilities.ProgressTracker import java.util.* /** - * Initiates a flow that self-issues cash (which should then be sent to recipient(s) using a payment transaction). + * Initiates a flow that self-issues cash and then send this to a recipient. * * We issue cash only to ourselves so that all KYC/AML checks on payments are enforced consistently, rather than risk * checks for issuance and payments differing. Outside of test scenarios it would be extremely unusual to issue cash * and immediately transfer it, so impact of this limitation is considered minimal. * * @param amount the amount of currency to issue. - * @param issuerBankPartyRef a reference to put on the issued currency. + * @param issueRef a reference to put on the issued currency. + * @param recipient the recipient of the currency + * @param anonymous if true, the recipient of the cash will be anonymous. Should be true for normal usage * @param notary the notary to set on the output states. */ @StartableByRPC @@ -31,9 +33,9 @@ class CashIssueAndPaymentFlow(val amount: Amount, issueRef: OpaqueBytes, recipient: Party, anonymous: Boolean, - notary: Party) : this(amount, issueRef, recipient, anonymous, notary, tracker()) + notary: Party) : this(amount, issueRef, recipient, anonymous, notary, ProgressTracker()) - constructor(request: IssueAndPaymentRequest) : this(request.amount, request.issueRef, request.recipient, request.anonymous, request.notary, tracker()) + constructor(request: IssueAndPaymentRequest) : this(request.amount, request.issueRef, request.recipient, request.anonymous, request.notary, ProgressTracker()) @Suspendable override fun call(): Result { diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 383605cd2c..b31c446986 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -193,6 +193,7 @@ class FlowFrameworkTests { assertThat(receivingFiber.state).isEqualTo(Strand.State.WAITING) assertThat((erroringFlow.get().stateMachine as FlowStateMachineImpl).state).isEqualTo(Strand.State.WAITING) assertThat(erroringFlowSteps.get()).containsExactly( + Notification.createOnNext(ProgressTracker.STARTING), Notification.createOnNext(ExceptionFlow.START_STEP), Notification.createOnError(erroringFlow.get().exceptionThrown) ) @@ -413,6 +414,7 @@ class FlowFrameworkTests { erroringFlowFuture.getOrThrow() val flowSteps = erroringFlowSteps.get() assertThat(flowSteps).containsExactly( + Notification.createOnNext(ProgressTracker.STARTING), Notification.createOnNext(ExceptionFlow.START_STEP), Notification.createOnError(erroringFlowFuture.get().exceptionThrown) ) diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt index 57ccf776da..341454ff77 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt @@ -78,7 +78,6 @@ abstract class ANSIProgressRenderer { flowProgressHandle?.apply { stepsTreeIndexFeed?.apply { - treeIndex = snapshot treeIndexProcessed.add(snapshot) subscriptionIndex = updates.subscribe({ treeIndex = it @@ -87,7 +86,6 @@ abstract class ANSIProgressRenderer { }, { done(it) }, { done(null) }) } stepsTreeFeed?.apply { - tree = snapshot subscriptionTree = updates.subscribe({ remapIndices(it) tree = it