client: Port Expect reactfx->rx

This commit is contained in:
Andras Slemmer 2016-08-30 15:55:00 +01:00
parent f682d7f173
commit 20ed97ff96
2 changed files with 142 additions and 137 deletions

View File

@ -5,6 +5,7 @@ import com.r3corda.core.ThreadBox
import org.reactfx.EventStream import org.reactfx.EventStream
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import rx.Observable
/** /**
* This file defines a simple DSL for testing non-deterministic sequence of events arriving on an [EventStream]. * This file defines a simple DSL for testing non-deterministic sequence of events arriving on an [EventStream].
@ -61,7 +62,7 @@ fun <E> parallel(vararg expectations: ExpectCompose<E>): ExpectCompose<E> = Expe
* @param isStrict If false non-matched events are disregarded (so the DSL will only check a subset of events). * @param isStrict If false non-matched events are disregarded (so the DSL will only check a subset of events).
* @param expectCompose The DSL we expect to match against the stream of events. * @param expectCompose The DSL we expect to match against the stream of events.
*/ */
fun <E : Any> EventStream<E>.expectEvents(isStrict: Boolean = true, expectCompose: () -> ExpectCompose<E>) { fun <E : Any> Observable<E>.expectEvents(isStrict: Boolean = true, expectCompose: () -> ExpectCompose<E>) {
val finishFuture = SettableFuture<Unit>() val finishFuture = SettableFuture<Unit>()
val lockedState = ThreadBox(object { var state = ExpectComposeState.fromExpectCompose(expectCompose()) }) val lockedState = ThreadBox(object { var state = ExpectComposeState.fromExpectCompose(expectCompose()) })
subscribe { event -> subscribe { event ->

View File

@ -11,9 +11,9 @@ import com.r3corda.node.services.monitor.TransactionBuildResult
import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.utilities.AddOrRemove import com.r3corda.node.utilities.AddOrRemove
import org.junit.Test import org.junit.Test
import org.reactfx.EventSource
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import rx.subjects.PublishSubject
import kotlin.test.fail import kotlin.test.fail
val log: Logger = LoggerFactory.getLogger(WalletMonitorServiceTests::class.java) val log: Logger = LoggerFactory.getLogger(WalletMonitorServiceTests::class.java)
@ -33,13 +33,13 @@ class WalletMonitorServiceTests {
log.info("Alice is ${aliceNode.identity}") log.info("Alice is ${aliceNode.identity}")
log.info("Notary is ${notaryNode.identity}") log.info("Notary is ${notaryNode.identity}")
val aliceInStream = EventSource<ServiceToClientEvent>() val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
val aliceOutStream = EventSource<ClientToServiceCommand>() val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
require(aliceMonitorClient.register().get()) require(aliceMonitorClient.register().get())
aliceOutStream.push(ClientToServiceCommand.IssueCash( aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD), amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })), issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.identity, recipient = aliceNode.identity,
@ -82,25 +82,26 @@ class WalletMonitorServiceTests {
log.info("Alice is ${aliceNode.identity}") log.info("Alice is ${aliceNode.identity}")
log.info("Notary is ${notaryNode.identity}") log.info("Notary is ${notaryNode.identity}")
val aliceInStream = EventSource<ServiceToClientEvent>() val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
val aliceOutStream = EventSource<ClientToServiceCommand>() val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
require(aliceMonitorClient.register().get()) require(aliceMonitorClient.register().get())
aliceOutStream.push(ClientToServiceCommand.IssueCash( aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD), amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })), issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.identity, recipient = aliceNode.identity,
notary = notaryNode.identity notary = notaryNode.identity
)) ))
aliceOutStream.push(ClientToServiceCommand.PayCash( aliceOutStream.onNext(ClientToServiceCommand.PayCash(
amount = Amount(100, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), amount = Amount(100, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),
recipient = aliceNode.identity recipient = aliceNode.identity
)) ))
aliceInStream.expectEvents(sequence( aliceInStream.expectEvents {
sequence(
// ISSUE // ISSUE
parallel( parallel(
sequence( sequence(
@ -189,7 +190,8 @@ class WalletMonitorServiceTests {
require(output.produced.size == 1) require(output.produced.size == 1)
} }
) )
)) )
}
} }
} }
@ -206,32 +208,33 @@ class WalletMonitorServiceTests {
log.info("Alice is ${aliceNode.identity}") log.info("Alice is ${aliceNode.identity}")
log.info("Notary is ${notaryNode.identity}") log.info("Notary is ${notaryNode.identity}")
val aliceInStream = EventSource<ServiceToClientEvent>() val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
val aliceOutStream = EventSource<ClientToServiceCommand>() val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
require(aliceMonitorClient.register().get()) require(aliceMonitorClient.register().get())
aliceOutStream.push(ClientToServiceCommand.IssueCash( aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD), amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })), issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.identity, recipient = aliceNode.identity,
notary = notaryNode.identity notary = notaryNode.identity
)) ))
aliceOutStream.push(ClientToServiceCommand.IssueCash( aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD), amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 2 })), issueRef = OpaqueBytes(ByteArray(1, { 2 })),
recipient = aliceNode.identity, recipient = aliceNode.identity,
notary = notaryNode.identity notary = notaryNode.identity
)) ))
aliceOutStream.push(ClientToServiceCommand.PayCash( aliceOutStream.onNext(ClientToServiceCommand.PayCash(
amount = Amount(200, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), amount = Amount(200, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),
recipient = aliceNode.identity recipient = aliceNode.identity
)) ))
aliceInStream.expectEvents(sequence( aliceInStream.expectEvents {
sequence(
// ISSUE 1 // ISSUE 1
parallel( parallel(
sequence( sequence(
@ -267,7 +270,8 @@ class WalletMonitorServiceTests {
val state = build.state val state = build.state
require(state is TransactionBuildResult.Failed) require(state is TransactionBuildResult.Failed)
} }
)) )
}
} }
} }
} }