CashFlow progress tracking, remove unused clientToService stream

This commit is contained in:
Andras Slemmer 2016-12-14 10:09:10 +00:00
parent 4ffad426c1
commit 2f12f79f19
3 changed files with 21 additions and 14 deletions

View File

@ -9,7 +9,9 @@ import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.USD import net.corda.core.contracts.USD
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.getOrThrow import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineUpdate import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
@ -32,19 +34,18 @@ import net.corda.testing.expectEvents
import net.corda.testing.sequence import net.corda.testing.sequence
import org.junit.Test import org.junit.Test
import rx.Observable import rx.Observable
import rx.Observer
class NodeMonitorModelTest : DriverBasedTest() { class NodeMonitorModelTest : DriverBasedTest() {
lateinit var aliceNode: NodeInfo lateinit var aliceNode: NodeInfo
lateinit var notaryNode: NodeInfo lateinit var notaryNode: NodeInfo
lateinit var rpc: CordaRPCOps
lateinit var stateMachineTransactionMapping: Observable<StateMachineTransactionMapping> lateinit var stateMachineTransactionMapping: Observable<StateMachineTransactionMapping>
lateinit var stateMachineUpdates: Observable<StateMachineUpdate> lateinit var stateMachineUpdates: Observable<StateMachineUpdate>
lateinit var progressTracking: Observable<ProgressTrackingEvent> lateinit var progressTracking: Observable<ProgressTrackingEvent>
lateinit var transactions: Observable<SignedTransaction> lateinit var transactions: Observable<SignedTransaction>
lateinit var vaultUpdates: Observable<Vault.Update> lateinit var vaultUpdates: Observable<Vault.Update>
lateinit var networkMapUpdates: Observable<NetworkMapCache.MapChange> lateinit var networkMapUpdates: Observable<NetworkMapCache.MapChange>
lateinit var clientToService: Observer<CashCommand>
lateinit var newNode: (String) -> NodeInfo lateinit var newNode: (String) -> NodeInfo
override fun setup() = driver { override fun setup() = driver {
@ -63,9 +64,9 @@ class NodeMonitorModelTest : DriverBasedTest() {
transactions = monitor.transactions.bufferUntilSubscribed() transactions = monitor.transactions.bufferUntilSubscribed()
vaultUpdates = monitor.vaultUpdates.bufferUntilSubscribed() vaultUpdates = monitor.vaultUpdates.bufferUntilSubscribed()
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed() networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
clientToService = monitor.clientToService
monitor.register(ArtemisMessagingComponent.toHostAndPort(aliceNode.address), configureTestSSL(), cashUser.username, cashUser.password) monitor.register(ArtemisMessagingComponent.toHostAndPort(aliceNode.address), configureTestSSL(), cashUser.username, cashUser.password)
rpc = monitor.proxyObservable.value!!
runTest() runTest()
} }
@ -93,7 +94,7 @@ class NodeMonitorModelTest : DriverBasedTest() {
@Test @Test
fun `cash issue works end to end`() { fun `cash issue works end to end`() {
clientToService.onNext(CashCommand.IssueCash( rpc.startFlow(::CashFlow, CashCommand.IssueCash(
amount = Amount(100, USD), amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })), issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.legalIdentity, recipient = aliceNode.legalIdentity,
@ -118,14 +119,14 @@ class NodeMonitorModelTest : DriverBasedTest() {
@Test @Test
fun `cash issue and move`() { fun `cash issue and move`() {
clientToService.onNext(CashCommand.IssueCash( rpc.startFlow(::CashFlow, CashCommand.IssueCash(
amount = Amount(100, USD), amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })), issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.legalIdentity, recipient = aliceNode.legalIdentity,
notary = notaryNode.notaryIdentity notary = notaryNode.notaryIdentity
)) ))
clientToService.onNext(CashCommand.PayCash( rpc.startFlow(::CashFlow, CashCommand.PayCash(
amount = Amount(100, Issued(PartyAndReference(aliceNode.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), amount = Amount(100, Issued(PartyAndReference(aliceNode.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),
recipient = aliceNode.legalIdentity recipient = aliceNode.legalIdentity
)) ))

View File

@ -48,9 +48,6 @@ class NodeMonitorModel {
val progressTracking: Observable<ProgressTrackingEvent> = progressTrackingSubject val progressTracking: Observable<ProgressTrackingEvent> = progressTrackingSubject
val networkMap: Observable<MapChange> = networkMapSubject val networkMap: Observable<MapChange> = networkMapSubject
private val clientToServiceSource = PublishSubject.create<CashCommand>()
val clientToService: PublishSubject<CashCommand> = clientToServiceSource
val proxyObservable = SimpleObjectProperty<CordaRPCOps?>() val proxyObservable = SimpleObjectProperty<CordaRPCOps?>()
/** /**
@ -98,10 +95,6 @@ class NodeMonitorModel {
val (parties, futurePartyUpdate) = proxy.networkMapUpdates() val (parties, futurePartyUpdate) = proxy.networkMapUpdates()
futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe(networkMapSubject) futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe(networkMapSubject)
// Client -> Service
clientToServiceSource.subscribe {
proxy.startFlow(::CashFlow, it)
}
proxyObservable.set(proxy) proxyObservable.set(proxy)
} }
} }

View File

@ -11,6 +11,7 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.serialization.OpaqueBytes import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import java.security.KeyPair import java.security.KeyPair
import java.util.* import java.util.*
@ -19,7 +20,16 @@ import java.util.*
* *
* @param command Indicates what Cash transaction to create with what parameters. * @param command Indicates what Cash transaction to create with what parameters.
*/ */
class CashFlow(val command: CashCommand) : FlowLogic<CashFlowResult>() { class CashFlow(val command: CashCommand, override val progressTracker: ProgressTracker) : FlowLogic<CashFlowResult>() {
constructor(command: CashCommand) : this(command, tracker())
companion object {
object ISSUING : ProgressTracker.Step("Issuing cash")
object PAYING : ProgressTracker.Step("Paying cash")
object EXITING : ProgressTracker.Step("Exiting cash")
fun tracker() = ProgressTracker(ISSUING, PAYING, EXITING)
}
@Suspendable @Suspendable
override fun call(): CashFlowResult { override fun call(): CashFlowResult {
@ -33,6 +43,7 @@ class CashFlow(val command: CashCommand) : FlowLogic<CashFlowResult>() {
// TODO check with the recipient if they want to accept the cash. // TODO check with the recipient if they want to accept the cash.
@Suspendable @Suspendable
private fun initiatePayment(req: CashCommand.PayCash): CashFlowResult { private fun initiatePayment(req: CashCommand.PayCash): CashFlowResult {
progressTracker.currentStep = PAYING
val builder: TransactionBuilder = TransactionType.General.Builder(null) val builder: TransactionBuilder = TransactionType.General.Builder(null)
// TODO: Have some way of restricting this to states the caller controls // TODO: Have some way of restricting this to states the caller controls
try { try {
@ -59,6 +70,7 @@ class CashFlow(val command: CashCommand) : FlowLogic<CashFlowResult>() {
@Suspendable @Suspendable
private fun exitCash(req: CashCommand.ExitCash): CashFlowResult { private fun exitCash(req: CashCommand.ExitCash): CashFlowResult {
progressTracker.currentStep = EXITING
val builder: TransactionBuilder = TransactionType.General.Builder(null) val builder: TransactionBuilder = TransactionType.General.Builder(null)
try { try {
val issuer = PartyAndReference(serviceHub.myInfo.legalIdentity, req.issueRef) val issuer = PartyAndReference(serviceHub.myInfo.legalIdentity, req.issueRef)
@ -94,6 +106,7 @@ class CashFlow(val command: CashCommand) : FlowLogic<CashFlowResult>() {
@Suspendable @Suspendable
private fun issueCash(req: CashCommand.IssueCash): CashFlowResult { private fun issueCash(req: CashCommand.IssueCash): CashFlowResult {
progressTracker.currentStep = ISSUING
val builder: TransactionBuilder = TransactionType.General.Builder(notary = null) val builder: TransactionBuilder = TransactionType.General.Builder(notary = null)
val issuer = PartyAndReference(serviceHub.myInfo.legalIdentity, req.issueRef) val issuer = PartyAndReference(serviceHub.myInfo.legalIdentity, req.issueRef)
Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary) Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary)