diff --git a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt index 22bc15ae64..8de89f40fe 100644 --- a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt +++ b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt @@ -32,40 +32,83 @@ import org.slf4j.LoggerFactory * The above will test our expectation that the stream should first emit an A, and then a B and C in unspecified order. */ -val log: Logger = LoggerFactory.getLogger("Expect") +private val log: Logger = LoggerFactory.getLogger("Expect") -sealed class ExpectCompose { - class Single(val expect: Expect) : ExpectCompose() - class Sequential(val sequence: List>) : ExpectCompose() - class Parallel(val parallel: List>) : ExpectCompose() +/** + * Expect an event of type [T] and run [expectClosure] on it + */ +inline fun expect(noinline expectClosure: (T) -> Unit) = expect(T::class.java, expectClosure) +fun expect(klass: Class, expectClosure: (T) -> Unit): ExpectCompose { + return ExpectCompose.Single(Expect(klass, expectClosure)) } -data class Expect( +/** + * Tests that events arrive in the specified order. + * + * @param expectations The pieces of DSL that should run sequentially when events arrive. + */ +fun sequence(vararg expectations: ExpectCompose): ExpectCompose = ExpectCompose.Sequential(listOf(*expectations)) + +/** + * Tests that events arrive in unspecified order. + * + * @param expectations The pieces of DSL all of which should run but in an unspecified order depending on what sequence events arrive. + */ +fun parallel(vararg expectations: ExpectCompose): ExpectCompose = ExpectCompose.Parallel(listOf(*expectations)) + +/** + * Run the specified DSL against the event stream. + * @param isStrict If false non-matched events are disregarded (so the DSL will only check a subset of events). + * @param expectCompose The DSL we expect to match against the stream of events. + */ +fun EventStream.expectEvents(isStrict: Boolean = true, expectCompose: () -> ExpectCompose) { + val finishFuture = SettableFuture() + val lockedState = ThreadBox(object { var state = ExpectComposeState.fromExpectCompose(expectCompose()) }) + subscribe { event -> + lockedState.locked { + if (state is ExpectComposeState.Finished) { + log.warn("Got event $event, but was expecting no further events") + return@subscribe + } + val next = state.nextState(event) + log.info("$event :: ${state.getExpectedEvents()} -> ${next?.second?.getExpectedEvents()}") + if (next == null) { + val expectedStates = state.getExpectedEvents() + val message = "Got $event, expected one of $expectedStates" + if (isStrict) { + finishFuture.setException(Exception(message)) + state = ExpectComposeState.Finished() + } else { + log.warn("$message, discarding event as isStrict=false") + } + } else { + state = next.second + try { + next.first() + } catch (exception: Exception) { + finishFuture.setException(exception) + } + if (state is ExpectComposeState.Finished) { + finishFuture.set(Unit) + } + } + } + } + finishFuture.get() +} + +sealed class ExpectCompose { + internal class Single(val expect: Expect) : ExpectCompose() + internal class Sequential(val sequence: List>) : ExpectCompose() + internal class Parallel(val parallel: List>) : ExpectCompose() +} + +internal data class Expect( val clazz: Class, val expectClosure: (T) -> Unit ) -inline fun expect(noinline expectClosure: (T) -> Unit): ExpectCompose { - return ExpectCompose.Single(Expect(T::class.java, expectClosure)) -} - -/** - * Tests that events arrive in the specified order - * - * @param expects The pieces of DSL that should run sequentially when events arrive - */ -fun sequence(vararg expects: ExpectCompose) = - ExpectCompose.Sequential(listOf(*expects)) - -/** - * Tests that events arrive in unspecified order - * - * @param expects The pieces of DSL all of which should run but in an unspecified order depending on what sequence events arrive - */ -fun parallel(vararg expects: ExpectCompose) = - ExpectCompose.Parallel(listOf(*expects)) - -sealed class ExpectComposeState{ +private sealed class ExpectComposeState{ class Finished : ExpectComposeState() class Single(val single: ExpectCompose.Single) : ExpectComposeState() class Sequential( @@ -159,38 +202,3 @@ sealed class ExpectComposeState{ } } } - -fun EventStream.expectEvents(expectCompose: ExpectCompose) { - val finishFuture = SettableFuture() - val lockedState = ThreadBox(object { var state = ExpectComposeState.fromExpectCompose(expectCompose) }) - subscribe { event -> - lockedState.locked { - if (state is ExpectComposeState.Finished) { - log.warn("Got event $event, but was expecting no further events") - return@subscribe - } - val next = state.nextState(event) - log.info("$event :: ${state.getExpectedEvents()} -> ${next?.second?.getExpectedEvents()}") - if (next == null) { - val expectedStates = state.getExpectedEvents() - finishFuture.setException(Exception( - "Got $event, expected one of $expectedStates" - )) - state = ExpectComposeState.Finished() - } else { - state = next.second - try { - next.first() - } catch (exception: Exception) { - finishFuture.setException(exception) - } - if (state is ExpectComposeState.Finished) { - finishFuture.set(Unit) - } - } - } - } - finishFuture.get() -} - -