diff --git a/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt b/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt index d725020614..fb88da5fb0 100644 --- a/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt +++ b/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt @@ -1,6 +1,5 @@ package com.r3corda.client -import co.paralleluniverse.strands.SettableFuture import com.r3corda.core.contracts.* import com.r3corda.core.serialization.OpaqueBytes import com.r3corda.node.driver.driver @@ -21,7 +20,6 @@ val log: Logger = LoggerFactory.getLogger(WalletMonitorServiceTests::class.java) class WalletMonitorServiceTests { @Test fun cashIssueWorksEndToEnd() { - driver { val aliceNodeFuture = startNode("Alice") val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type)) @@ -46,26 +44,20 @@ class WalletMonitorServiceTests { notary = notaryNode.identity )) - val buildFuture = SettableFuture() - val eventFuture = SettableFuture() - aliceInStream.subscribe { - if (it is ServiceToClientEvent.OutputState) - eventFuture.set(it) - else if (it is ServiceToClientEvent.TransactionBuild) - buildFuture.set(it) - else - log.warn("Unexpected event $it") + aliceInStream.expectEvents(isStrict = false) { + parallel( + expect { build: ServiceToClientEvent.TransactionBuild -> + val state = build.state + if (state is TransactionBuildResult.Failed) { + fail(state.message) + } + }, + expect { output: ServiceToClientEvent.OutputState -> + require(output.consumed.size == 0) + require(output.produced.size == 1) + } + ) } - - val buildEvent = buildFuture.get() - val state = buildEvent.state - if (state is TransactionBuildResult.Failed) { - fail(state.message) - } - - val outputEvent = eventFuture.get() - require(outputEvent.consumed.size == 0) - require(outputEvent.produced.size == 1) } } diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/Expect.kt b/test-utils/src/main/kotlin/com/r3corda/testing/Expect.kt index 8228f7bec4..0f7e082aef 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/Expect.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/Expect.kt @@ -13,12 +13,12 @@ import rx.Observable * The only restriction on [parallel] is that we should be able to discriminate which branch to take based on the * arrived event's type. If this is ambiguous the first matching piece of DSL will be run. - * [sequence]s and [parallel]s can be nested arbitrarily + * [sequence]s and [parallel]s can be nested arbitrarily. * * Example usage: * - * val stream: Observable = (..) - * stream.expectEvents( + * val stream: EventStream = (..) + * stream.expectEvents { * sequence( * expect { event: SomeEvent.A -> require(event.isOk()) }, * parallel( @@ -26,7 +26,7 @@ import rx.Observable * expect { event.SomeEvent.C -> } * ) * ) - * ) + * } * * The above will test our expectation that the stream should first emit an A, and then a B and C in unspecified order. */