[CORDA-2737] Buffer events from observables in ProgressTracker until subscribed to (#4882)

This commit is contained in:
JamesHR3 2019-03-13 16:33:29 +00:00 committed by Shams Asari
parent 94d827ebe4
commit 76dc981b91
5 changed files with 49 additions and 15 deletions

View File

@ -5,7 +5,6 @@ import net.corda.core.internal.STRUCTURAL_STEP_PREFIX
import net.corda.core.serialization.CordaSerializable
import rx.Observable
import rx.Subscription
import rx.subjects.PublishSubject
import rx.subjects.ReplaySubject
import java.util.*
@ -91,8 +90,8 @@ class ProgressTracker(vararg inputSteps: Step) {
private var _allStepsCache: List<Pair<Int, Step>> = _allSteps()
// This field won't be serialized.
private val _changes by transient { PublishSubject.create<Change>() }
private val _stepsTreeChanges by transient { PublishSubject.create<List<Pair<Int, String>>>() }
private val _changes by transient { ReplaySubject.create<Change>() }
private val _stepsTreeChanges by transient { ReplaySubject.create<List<Pair<Int, String>>>() }
private val _stepsTreeIndexChanges by transient { ReplaySubject.create<Int>() }
var currentStep: Step

View File

@ -130,7 +130,7 @@ class ProgressTrackerTest {
// Assert no structure changes and proper steps propagation.
assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 2, 4, 6))
assertThat(stepsTreeNotification).isEmpty()
assertThat(stepsTreeNotification).hasSize(1) // One entry per child progress tracker set
}
@Test
@ -165,7 +165,7 @@ class ProgressTrackerTest {
// Assert no structure changes and proper steps propagation.
assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 4, 7))
assertThat(stepsTreeNotification).isEmpty()
assertThat(stepsTreeNotification).hasSize(2) // One entry per child progress tracker set
}
@Test
@ -179,7 +179,7 @@ class ProgressTrackerTest {
}
// Put current state as a first change for simplicity when asserting.
val stepsTreeNotification = mutableListOf(pt.allStepsLabels)
val stepsTreeNotification = mutableListOf<List<Pair<Int, String>>>()
pt.stepsTreeChanges.subscribe {
stepsTreeNotification += it
}
@ -202,7 +202,7 @@ class ProgressTrackerTest {
// Assert no structure changes and proper steps propagation.
assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(2, 7, 10))
assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state
assertThat(stepsTreeNotification).hasSize(2) // One state per child progress tracker set
}
@Test
@ -216,7 +216,7 @@ class ProgressTrackerTest {
}
// Put current state as a first change for simplicity when asserting.
val stepsTreeNotification = mutableListOf(pt.allStepsLabels)
val stepsTreeNotification = mutableListOf<List<Pair<Int, String>>>()
pt.stepsTreeChanges.subscribe {
stepsTreeNotification += it
}
@ -237,7 +237,7 @@ class ProgressTrackerTest {
// Assert no structure changes and proper steps propagation.
assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(2, 5, 3))
assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state.
assertThat(stepsTreeNotification).hasSize(2) // One state per child progress tracker set
}
@Test
@ -264,4 +264,37 @@ class ProgressTrackerTest {
assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 2, 3))
}
@Test
fun `all step changes seen if subscribed mid flow`() {
val steps = mutableListOf<String>()
pt.nextStep()
pt.nextStep()
pt.nextStep()
pt.changes.subscribe { steps.add(it.toString())}
pt.nextStep()
pt.nextStep()
pt.nextStep()
assertEquals(listOf("Starting", "one", "two", "three", "four", "Done"), steps)
}
@Test
fun `all tree changes seen if subscribed mid flow`() {
val stepTreeNotifications = mutableListOf<List<Pair<Int, String>>>()
pt.setChildProgressTracker(SimpleSteps.TWO, pt2)
pt.currentStep = SimpleSteps.ONE
pt.currentStep = SimpleSteps.TWO
pt.setChildProgressTracker(SimpleSteps.TWO, pt3)
pt.stepsTreeChanges.subscribe { stepTreeNotifications.add(it)}
fun assertStepsTree(index: Int, step: ProgressTracker.Step) {
assertEquals(step.label, stepTreeNotifications[index][pt.stepsTreeIndex].second)
}
pt2.currentStep = ChildSteps.AYY
pt3.currentStep = BabySteps.UNOS
assertStepsTree(0, ChildSteps.AYY)
assertStepsTree(1, BabySteps.UNOS)
}
}

View File

@ -10,14 +10,16 @@ import net.corda.core.utilities.ProgressTracker
import java.util.*
/**
* Initiates a flow that self-issues cash (which should then be sent to recipient(s) using a payment transaction).
* Initiates a flow that self-issues cash and then send this to a recipient.
*
* We issue cash only to ourselves so that all KYC/AML checks on payments are enforced consistently, rather than risk
* checks for issuance and payments differing. Outside of test scenarios it would be extremely unusual to issue cash
* and immediately transfer it, so impact of this limitation is considered minimal.
*
* @param amount the amount of currency to issue.
* @param issuerBankPartyRef a reference to put on the issued currency.
* @param issueRef a reference to put on the issued currency.
* @param recipient the recipient of the currency
* @param anonymous if true, the recipient of the cash will be anonymous. Should be true for normal usage
* @param notary the notary to set on the output states.
*/
@StartableByRPC
@ -31,9 +33,9 @@ class CashIssueAndPaymentFlow(val amount: Amount<Currency>,
issueRef: OpaqueBytes,
recipient: Party,
anonymous: Boolean,
notary: Party) : this(amount, issueRef, recipient, anonymous, notary, tracker())
notary: Party) : this(amount, issueRef, recipient, anonymous, notary, ProgressTracker())
constructor(request: IssueAndPaymentRequest) : this(request.amount, request.issueRef, request.recipient, request.anonymous, request.notary, tracker())
constructor(request: IssueAndPaymentRequest) : this(request.amount, request.issueRef, request.recipient, request.anonymous, request.notary, ProgressTracker())
@Suspendable
override fun call(): Result {

View File

@ -193,6 +193,7 @@ class FlowFrameworkTests {
assertThat(receivingFiber.state).isEqualTo(Strand.State.WAITING)
assertThat((erroringFlow.get().stateMachine as FlowStateMachineImpl).state).isEqualTo(Strand.State.WAITING)
assertThat(erroringFlowSteps.get()).containsExactly(
Notification.createOnNext(ProgressTracker.STARTING),
Notification.createOnNext(ExceptionFlow.START_STEP),
Notification.createOnError(erroringFlow.get().exceptionThrown)
)
@ -413,6 +414,7 @@ class FlowFrameworkTests {
erroringFlowFuture.getOrThrow()
val flowSteps = erroringFlowSteps.get()
assertThat(flowSteps).containsExactly(
Notification.createOnNext(ProgressTracker.STARTING),
Notification.createOnNext(ExceptionFlow.START_STEP),
Notification.createOnError(erroringFlowFuture.get().exceptionThrown)
)

View File

@ -78,7 +78,6 @@ abstract class ANSIProgressRenderer {
flowProgressHandle?.apply {
stepsTreeIndexFeed?.apply {
treeIndex = snapshot
treeIndexProcessed.add(snapshot)
subscriptionIndex = updates.subscribe({
treeIndex = it
@ -87,7 +86,6 @@ abstract class ANSIProgressRenderer {
}, { done(it) }, { done(null) })
}
stepsTreeFeed?.apply {
tree = snapshot
subscriptionTree = updates.subscribe({
remapIndices(it)
tree = it