From 9cda99bbccaa4d019f7a891bc81ace0957a09510 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Fri, 26 Aug 2016 14:29:25 +0100 Subject: [PATCH 01/11] client: Add client submodule --- .gitignore | 1 + .idea/modules.xml | 3 +++ client/build.gradle | 53 +++++++++++++++++++++++++++++++++++++++++++++ settings.gradle | 1 + 4 files changed, 58 insertions(+) create mode 100644 client/build.gradle diff --git a/.gitignore b/.gitignore index 295c86e572..b7ebccf366 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ tags /experimental/build /docs/build/doctrees /test-utils/build +/client/build # gradle's buildSrc build/ /buildSrc/build/ diff --git a/.idea/modules.xml b/.idea/modules.xml index e284ae5b60..df2780c199 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -5,6 +5,9 @@ + + + diff --git a/client/build.gradle b/client/build.gradle new file mode 100644 index 0000000000..5fe9e7006b --- /dev/null +++ b/client/build.gradle @@ -0,0 +1,53 @@ +apply plugin: 'kotlin' +apply plugin: QuasarPlugin + +repositories { + mavenLocal() + mavenCentral() + maven { + url 'http://oss.sonatype.org/content/repositories/snapshots' + } + jcenter() + maven { + url 'https://dl.bintray.com/kotlin/exposed' + } +} + + +//noinspection GroovyAssignabilityCheck +configurations { + + // we don't want isolated.jar in classPath, since we want to test jar being dynamically loaded as an attachment + runtime.exclude module: 'isolated' +} + +sourceSets { + test { + resources { + srcDir "../config/test" + } + } +} + +// To find potential version conflicts, run "gradle htmlDependencyReport" and then look in +// build/reports/project/dependencies/index.html for green highlighted parts of the tree. + +dependencies { + compile project(':node') + + // Log4J: logging framework (with SLF4J bindings) + compile "org.apache.logging.log4j:log4j-slf4j-impl:${log4j_version}" + compile "org.apache.logging.log4j:log4j-core:${log4j_version}" + + compile "com.google.guava:guava:19.0" + + // ReactFX: Functional reactive UI programming. + compile 'org.reactfx:reactfx:2.0-M5' + compile 'org.fxmisc.easybind:easybind:1.0.3' + + // Unit testing helpers. + testCompile 'junit:junit:4.12' + testCompile "org.assertj:assertj-core:${assertj_version}" +} + +quasarScan.dependsOn('classes', ':core:classes', ':contracts:classes') diff --git a/settings.gradle b/settings.gradle index ee2860a7e5..43a8b99cde 100644 --- a/settings.gradle +++ b/settings.gradle @@ -3,6 +3,7 @@ include 'contracts' include 'contracts:isolated' include 'core' include 'node' +include 'client' include 'experimental' include 'test-utils' From cfebccc49557c9e221cf4bfb1e3d583e59aacfd3 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Fri, 26 Aug 2016 14:30:38 +0100 Subject: [PATCH 02/11] node: Remove TransactionBuildResult.Complete, add pretty printing of ServiceToClientEvent --- .../r3corda/node/services/monitor/Events.kt | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt index 748af3f9c4..01a38b0dc8 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt @@ -1,10 +1,7 @@ package com.r3corda.node.services.monitor import com.r3corda.core.contracts.* -import com.r3corda.core.crypto.Party -import com.r3corda.core.serialization.OpaqueBytes import com.r3corda.node.utilities.AddOrRemove -import java.security.PublicKey import java.time.Instant import java.util.* @@ -12,25 +9,34 @@ import java.util.* * Events triggered by changes in the node, and sent to monitoring client(s). */ sealed class ServiceToClientEvent(val time: Instant) { - class Transaction(time: Instant, val transaction: SignedTransaction) : ServiceToClientEvent(time) - class OutputState(time: Instant, val consumed: Set, val produced: Set>) : ServiceToClientEvent(time) - class StateMachine(time: Instant, val fiberId: Long, val label: String, val addOrRemove: AddOrRemove) : ServiceToClientEvent(time) - class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time) - class TransactionBuild(time: Instant, val id: UUID, val state: TransactionBuildResult) : ServiceToClientEvent(time) + class Transaction(time: Instant, val transaction: SignedTransaction) : ServiceToClientEvent(time) { + override fun toString() = "Transaction(${transaction.tx.commands})" + } + class OutputState( + time: Instant, + val consumed: Set, + val produced: Set> + ) : ServiceToClientEvent(time) { + override fun toString() = "OutputState(consumed=$consumed, produced=${produced.map { it.state.data.javaClass.simpleName } })" + } + class StateMachine( + time: Instant, + val fiberId: Long, + val label: String, + val addOrRemove: AddOrRemove + ) : ServiceToClientEvent(time) { + override fun toString() = "StateMachine(${addOrRemove.name})" + } + class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time) { + override fun toString() = "Progress($message)" + } + class TransactionBuild(time: Instant, val id: UUID, val state: TransactionBuildResult) : ServiceToClientEvent(time) { + override fun toString() = "TransactionBuild($state)" + } + } sealed class TransactionBuildResult { - /** - * State indicating the action undertaken has been completed (it was not complex enough to require a - * state machine starting). - * - * @param transaction the transaction created as a result. - */ - // TODO: We should have a consistent "Transaction your request triggered has been built" event, rather than these - // once-off results from a request. Unclear if that means all requests need to trigger a protocol state machine, - // so the client sees a consistent process, or if some other solution can be found. - class Complete(val transaction: SignedTransaction, val message: String?) : TransactionBuildResult() - /** * State indicating that a protocol is managing this request, and that the client should track protocol state machine * updates for further information. The monitor will separately receive notification of the state machine having been @@ -39,11 +45,15 @@ sealed class TransactionBuildResult { * * @param transaction the transaction created as a result, in the case where the protocol has completed. */ - class ProtocolStarted(val fiberId: Long, val transaction: SignedTransaction?, val message: String?) : TransactionBuildResult() + class ProtocolStarted(val fiberId: Long, val transaction: SignedTransaction?, val message: String?) : TransactionBuildResult() { + override fun toString() = "Started($message)" + } /** * State indicating the action undertaken failed, either directly (it is not something which requires a * state machine), or before a state machine was started. */ - class Failed(val message: String?) : TransactionBuildResult() + class Failed(val message: String?) : TransactionBuildResult() { + override fun toString() = "Failed($message)" + } } From 2f7b022c8e5243042afeeda73392ffa1c7b579ea Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Fri, 26 Aug 2016 14:31:25 +0100 Subject: [PATCH 03/11] client: Add WalletMonitorClient, Expect DSL and some tests for the client --- .../com/r3corda/client/WalletMonitorClient.kt | 61 ++++ .../com/r3corda/client/testing/Expect.kt | 196 +++++++++++++ .../client/WalletMonitorClientTests.kt | 273 ++++++++++++++++++ 3 files changed, 530 insertions(+) create mode 100644 client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt create mode 100644 client/src/main/kotlin/com/r3corda/client/testing/Expect.kt create mode 100644 client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt diff --git a/client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt b/client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt new file mode 100644 index 0000000000..7b96d086dd --- /dev/null +++ b/client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt @@ -0,0 +1,61 @@ +package com.r3corda.client + +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture +import com.r3corda.core.contracts.ClientToServiceCommand +import com.r3corda.core.messaging.MessagingService +import com.r3corda.core.node.NodeInfo +import com.r3corda.core.random63BitValue +import com.r3corda.core.serialization.deserialize +import com.r3corda.core.serialization.serialize +import com.r3corda.node.services.monitor.* +import org.reactfx.EventSink +import org.reactfx.EventStream +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +/** + * Worked example of a client which communicates with the wallet monitor service. + */ + +private val log: Logger = LoggerFactory.getLogger("WalletMonitorClient") + +class WalletMonitorClient( + val net: MessagingService, + val node: NodeInfo, + val outEvents: EventStream, + val inEvents: EventSink +) { + private val sessionID = random63BitValue() + + fun register(): ListenableFuture { + + val future = SettableFuture.create() + log.info("Registering with ID $sessionID. I am ${net.myAddress}") + net.addMessageHandler(WalletMonitorService.REGISTER_TOPIC, sessionID) { msg, reg -> + val resp = msg.data.deserialize() + net.removeMessageHandler(reg) + future.set(resp.success) + } + net.addMessageHandler(WalletMonitorService.STATE_TOPIC, sessionID) { msg, req -> + // TODO + } + + net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg -> + val event = msg.data.deserialize() + inEvents.push(event) + } + + val req = RegisterRequest(net.myAddress, sessionID) + val registerMessage = net.createMessage(WalletMonitorService.REGISTER_TOPIC, 0, req.serialize().bits) + net.send(registerMessage, node.address) + + outEvents.subscribe { event -> + val envelope = ClientToServiceCommandMessage(sessionID, net.myAddress, event) + val message = net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, 0, envelope.serialize().bits) + net.send(message, node.address) + } + + return future + } +} diff --git a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt new file mode 100644 index 0000000000..22bc15ae64 --- /dev/null +++ b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt @@ -0,0 +1,196 @@ +package com.r3corda.client.testing + +import co.paralleluniverse.strands.SettableFuture +import com.r3corda.core.ThreadBox +import org.reactfx.EventStream +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +/** + * This file defines a simple DSL for testing non-deterministic sequence of events arriving on an [EventStream]. + * + * [sequence] is used to impose ordering invariants on the stream, whereas [parallel] allows events to arrive in any order. + * + * The only restriction on [parallel] is that we should be able to discriminate which branch to take based on the + * arrived event's type. If this is ambiguous the first matching piece of DSL will be run. + + * [sequence]s and [parallel]s can be nested arbitrarily + * + * Example usage: + * + * val stream: EventStream = (..) + * stream.expectEvents( + * sequence( + * expect { event: SomeEvent.A -> require(event.isOk()) }, + * parallel( + * expect { event.SomeEvent.B -> }, + * expect { event.SomeEvent.C -> } + * ) + * ) + * ) + * + * The above will test our expectation that the stream should first emit an A, and then a B and C in unspecified order. + */ + +val log: Logger = LoggerFactory.getLogger("Expect") + +sealed class ExpectCompose { + class Single(val expect: Expect) : ExpectCompose() + class Sequential(val sequence: List>) : ExpectCompose() + class Parallel(val parallel: List>) : ExpectCompose() +} + +data class Expect( + val clazz: Class, + val expectClosure: (T) -> Unit +) + +inline fun expect(noinline expectClosure: (T) -> Unit): ExpectCompose { + return ExpectCompose.Single(Expect(T::class.java, expectClosure)) +} + +/** + * Tests that events arrive in the specified order + * + * @param expects The pieces of DSL that should run sequentially when events arrive + */ +fun sequence(vararg expects: ExpectCompose) = + ExpectCompose.Sequential(listOf(*expects)) + +/** + * Tests that events arrive in unspecified order + * + * @param expects The pieces of DSL all of which should run but in an unspecified order depending on what sequence events arrive + */ +fun parallel(vararg expects: ExpectCompose) = + ExpectCompose.Parallel(listOf(*expects)) + +sealed class ExpectComposeState{ + class Finished : ExpectComposeState() + class Single(val single: ExpectCompose.Single) : ExpectComposeState() + class Sequential( + val sequential: ExpectCompose.Sequential, + val index: Int, + val state: ExpectComposeState + ) : ExpectComposeState() + class Parallel( + val parallel: + ExpectCompose.Parallel, + val states: List> + ) : ExpectComposeState() + + companion object { + fun fromExpectCompose(expectCompose: ExpectCompose): ExpectComposeState { + return when (expectCompose) { + is ExpectCompose.Single -> Single(expectCompose) + is ExpectCompose.Sequential -> { + if (expectCompose.sequence.size > 0) { + Sequential(expectCompose, 0, fromExpectCompose(expectCompose.sequence[0])) + } else { + Finished() + } + } + is ExpectCompose.Parallel -> { + if (expectCompose.parallel.size > 0) { + Parallel(expectCompose, expectCompose.parallel.map { fromExpectCompose(it) }) + } else { + Finished() + } + } + } + } + } + + fun getExpectedEvents(): List> { + return when (this) { + is ExpectComposeState.Finished -> listOf() + is ExpectComposeState.Single -> listOf(single.expect.clazz) + is ExpectComposeState.Sequential -> state.getExpectedEvents() + is ExpectComposeState.Parallel -> states.flatMap { it.getExpectedEvents() } + } + } + + fun nextState(event: E): Pair<() -> Unit, ExpectComposeState>? { + return when (this) { + is ExpectComposeState.Finished -> null + is ExpectComposeState.Single -> { + if (single.expect.clazz.isAssignableFrom(event.javaClass)) { + @Suppress("UNCHECKED_CAST") + Pair({ single.expect.expectClosure(event) }, Finished()) + } else { + null + } + } + is ExpectComposeState.Sequential -> { + val next = state.nextState(event) + if (next == null) { + null + } else if (next.second is Finished) { + if (index == sequential.sequence.size - 1) { + Pair(next.first, Finished()) + } else { + val nextState = fromExpectCompose(sequential.sequence[index + 1]) + if (nextState is Finished) { + Pair(next.first, Finished()) + } else { + Pair(next.first, Sequential(sequential, index + 1, nextState)) + } + } + } else { + Pair(next.first, Sequential(sequential, index, next.second)) + } + } + is ExpectComposeState.Parallel -> { + states.forEachIndexed { stateIndex, state -> + val next = state.nextState(event) + if (next != null) { + val nextStates = states.mapIndexed { i, expectComposeState -> + if (i == stateIndex) next.second else expectComposeState + } + if (nextStates.all { it is Finished }) { + return Pair(next.first, Finished()) + } else { + return Pair(next.first, Parallel(parallel, nextStates)) + } + } + } + null + } + } + } +} + +fun EventStream.expectEvents(expectCompose: ExpectCompose) { + val finishFuture = SettableFuture() + val lockedState = ThreadBox(object { var state = ExpectComposeState.fromExpectCompose(expectCompose) }) + subscribe { event -> + lockedState.locked { + if (state is ExpectComposeState.Finished) { + log.warn("Got event $event, but was expecting no further events") + return@subscribe + } + val next = state.nextState(event) + log.info("$event :: ${state.getExpectedEvents()} -> ${next?.second?.getExpectedEvents()}") + if (next == null) { + val expectedStates = state.getExpectedEvents() + finishFuture.setException(Exception( + "Got $event, expected one of $expectedStates" + )) + state = ExpectComposeState.Finished() + } else { + state = next.second + try { + next.first() + } catch (exception: Exception) { + finishFuture.setException(exception) + } + if (state is ExpectComposeState.Finished) { + finishFuture.set(Unit) + } + } + } + } + finishFuture.get() +} + + diff --git a/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt b/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt new file mode 100644 index 0000000000..de0f7ed2b1 --- /dev/null +++ b/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt @@ -0,0 +1,273 @@ +package com.r3corda.client + +import co.paralleluniverse.strands.SettableFuture +import com.r3corda.client.testing.* +import com.r3corda.core.contracts.* +import com.r3corda.core.serialization.OpaqueBytes +import com.r3corda.node.driver.driver +import com.r3corda.node.driver.startClient +import com.r3corda.node.services.monitor.ServiceToClientEvent +import com.r3corda.node.services.monitor.TransactionBuildResult +import com.r3corda.node.services.transactions.SimpleNotaryService +import com.r3corda.node.utilities.AddOrRemove +import org.junit.Test +import org.reactfx.EventSource +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import kotlin.test.fail + +val log: Logger = LoggerFactory.getLogger(WalletMonitorServiceTests::class.java) + +class WalletMonitorServiceTests { + @Test + fun cashIssueWorksEndToEnd() { + + driver { + val aliceNodeFuture = startNode("Alice") + val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type)) + + val aliceNode = aliceNodeFuture.get() + val notaryNode = notaryNodeFuture.get() + val client = startClient(aliceNode).get() + + log.info("Alice is ${aliceNode.identity}") + log.info("Notary is ${notaryNode.identity}") + + val aliceInStream = EventSource() + val aliceOutStream = EventSource() + + val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) + require(aliceMonitorClient.register().get()) + + aliceOutStream.push(ClientToServiceCommand.IssueCash( + amount = Amount(100, USD), + issueRef = OpaqueBytes(ByteArray(1, { 1 })), + recipient = aliceNode.identity, + notary = notaryNode.identity + )) + + val buildFuture = SettableFuture() + val eventFuture = SettableFuture() + aliceInStream.subscribe { + if (it is ServiceToClientEvent.OutputState) + eventFuture.set(it) + else if (it is ServiceToClientEvent.TransactionBuild) + buildFuture.set(it) + else + log.warn("Unexpected event $it") + } + + val buildEvent = buildFuture.get() + val state = buildEvent.state + if (state is TransactionBuildResult.Failed) { + fail(state.message) + } + + val outputEvent = eventFuture.get() + require(outputEvent.consumed.size == 0) + require(outputEvent.produced.size == 1) + } + } + + @Test + fun issueAndMoveWorks() { + driver { + val aliceNodeFuture = startNode("Alice") + val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type)) + + val aliceNode = aliceNodeFuture.get() + val notaryNode = notaryNodeFuture.get() + val client = startClient(aliceNode).get() + + log.info("Alice is ${aliceNode.identity}") + log.info("Notary is ${notaryNode.identity}") + + val aliceInStream = EventSource() + val aliceOutStream = EventSource() + + val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) + require(aliceMonitorClient.register().get()) + + aliceOutStream.push(ClientToServiceCommand.IssueCash( + amount = Amount(100, USD), + issueRef = OpaqueBytes(ByteArray(1, { 1 })), + recipient = aliceNode.identity, + notary = notaryNode.identity + )) + + aliceOutStream.push(ClientToServiceCommand.PayCash( + amount = Amount(100, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), + recipient = aliceNode.identity + )) + + aliceInStream.expectEvents(sequence( + // ISSUE + parallel( + sequence( + expect { add: ServiceToClientEvent.StateMachine -> + require(add.addOrRemove == AddOrRemove.ADD) + }, + expect { remove: ServiceToClientEvent.StateMachine -> + require(remove.addOrRemove == AddOrRemove.REMOVE) + } + ), + expect { tx: ServiceToClientEvent.Transaction -> + require(tx.transaction.tx.inputs.isEmpty()) + require(tx.transaction.tx.outputs.size == 1) + val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet() + // Only Alice signed + require(signaturePubKeys.size == 1) + require(signaturePubKeys.contains(aliceNode.identity.owningKey)) + }, + expect { build: ServiceToClientEvent.TransactionBuild -> + val state = build.state + when (state) { + is TransactionBuildResult.ProtocolStarted -> { + } + is TransactionBuildResult.Failed -> fail(state.message) + } + }, + expect { output: ServiceToClientEvent.OutputState -> + require(output.consumed.size == 0) + require(output.produced.size == 1) + } + ), + + // MOVE + parallel( + sequence( + expect { add: ServiceToClientEvent.StateMachine -> + require(add.addOrRemove == AddOrRemove.ADD) + }, + expect { add: ServiceToClientEvent.StateMachine -> + require(add.addOrRemove == AddOrRemove.REMOVE) + } + ), + expect { tx: ServiceToClientEvent.Transaction -> + require(tx.transaction.tx.inputs.size == 1) + require(tx.transaction.tx.outputs.size == 1) + val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet() + // Alice and Notary signed + require(signaturePubKeys.size == 2) + require(signaturePubKeys.contains(aliceNode.identity.owningKey)) + require(signaturePubKeys.contains(notaryNode.identity.owningKey)) + }, + sequence( + expect { build: ServiceToClientEvent.TransactionBuild -> + val state = build.state + when (state) { + is TransactionBuildResult.ProtocolStarted -> { + log.info("${state.message}") + } + is TransactionBuildResult.Failed -> fail(state.message) + } + }, + expect { build: ServiceToClientEvent.Progress -> + // Requesting signature by notary service + }, + expect { build: ServiceToClientEvent.Progress -> + // Structural step change in child of Requesting signature by notary service + }, + expect { build: ServiceToClientEvent.Progress -> + // Requesting signature by notary service + }, + expect { build: ServiceToClientEvent.Progress -> + // Validating response from Notary service + }, + expect { build: ServiceToClientEvent.Progress -> + // Done + }, + expect { build: ServiceToClientEvent.Progress -> + // Broadcasting transaction to participants + }, + expect { build: ServiceToClientEvent.Progress -> + // Done + } + ), + expect { output: ServiceToClientEvent.OutputState -> + require(output.consumed.size == 1) + require(output.produced.size == 1) + } + ) + )) + } + } + + @Test + fun movingCashOfDifferentIssueRefsFails() { + driver { + val aliceNodeFuture = startNode("Alice") + val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type)) + + val aliceNode = aliceNodeFuture.get() + val notaryNode = notaryNodeFuture.get() + val client = startClient(aliceNode).get() + + log.info("Alice is ${aliceNode.identity}") + log.info("Notary is ${notaryNode.identity}") + + val aliceInStream = EventSource() + val aliceOutStream = EventSource() + + val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) + require(aliceMonitorClient.register().get()) + + aliceOutStream.push(ClientToServiceCommand.IssueCash( + amount = Amount(100, USD), + issueRef = OpaqueBytes(ByteArray(1, { 1 })), + recipient = aliceNode.identity, + notary = notaryNode.identity + )) + + aliceOutStream.push(ClientToServiceCommand.IssueCash( + amount = Amount(100, USD), + issueRef = OpaqueBytes(ByteArray(1, { 2 })), + recipient = aliceNode.identity, + notary = notaryNode.identity + )) + + aliceOutStream.push(ClientToServiceCommand.PayCash( + amount = Amount(200, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), + recipient = aliceNode.identity + )) + + aliceInStream.expectEvents(sequence( + // ISSUE 1 + parallel( + sequence( + expect { add: ServiceToClientEvent.StateMachine -> + require(add.addOrRemove == AddOrRemove.ADD) + }, + expect { remove: ServiceToClientEvent.StateMachine -> + require(remove.addOrRemove == AddOrRemove.REMOVE) + } + ), + expect { tx: ServiceToClientEvent.Transaction -> }, + expect { build: ServiceToClientEvent.TransactionBuild -> }, + expect { output: ServiceToClientEvent.OutputState -> } + ), + + // ISSUE 2 + parallel( + sequence( + expect { add: ServiceToClientEvent.StateMachine -> + require(add.addOrRemove == AddOrRemove.ADD) + }, + expect { remove: ServiceToClientEvent.StateMachine -> + require(remove.addOrRemove == AddOrRemove.REMOVE) + } + ), + expect { tx: ServiceToClientEvent.Transaction -> }, + expect { build: ServiceToClientEvent.TransactionBuild -> }, + expect { output: ServiceToClientEvent.OutputState -> } + ), + + // MOVE, should fail + expect { build: ServiceToClientEvent.TransactionBuild -> + val state = build.state + require(state is TransactionBuildResult.Failed) + } + )) + } + } +} From 86d6ee011092bcec2b26cb9a9519ffde5b532c7b Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Fri, 26 Aug 2016 17:30:27 +0100 Subject: [PATCH 04/11] node: Cash Issue doesn't require notary --- .../com/r3corda/node/services/monitor/WalletMonitorService.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt index 8c7690f0cb..ebefa7b7bf 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt @@ -201,7 +201,7 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager, // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult { - val builder: TransactionBuilder = TransactionType.General.Builder(notary = req.notary) + val builder: TransactionBuilder = TransactionType.General.Builder(notary = null) val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef) Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary) builder.signWith(services.storageService.myLegalIdentityKey) From a2ace973ffa852c022a9ca060377095e895bc91a Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 30 Aug 2016 15:24:51 +0100 Subject: [PATCH 05/11] client: reactfx->rx --- .../kotlin/com/r3corda/client/WalletMonitorClient.kt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt b/client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt index 7b96d086dd..bc48f8ed06 100644 --- a/client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt +++ b/client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt @@ -9,10 +9,10 @@ import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize import com.r3corda.node.services.monitor.* -import org.reactfx.EventSink -import org.reactfx.EventStream import org.slf4j.Logger import org.slf4j.LoggerFactory +import rx.Observable +import rx.Observer /** * Worked example of a client which communicates with the wallet monitor service. @@ -23,8 +23,8 @@ private val log: Logger = LoggerFactory.getLogger("WalletMonitorClient") class WalletMonitorClient( val net: MessagingService, val node: NodeInfo, - val outEvents: EventStream, - val inEvents: EventSink + val outEvents: Observable, + val inEvents: Observer ) { private val sessionID = random63BitValue() @@ -43,7 +43,7 @@ class WalletMonitorClient( net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg -> val event = msg.data.deserialize() - inEvents.push(event) + inEvents.onNext(event) } val req = RegisterRequest(net.myAddress, sessionID) From f682d7f1731dd873d0545ca988f1a648768512c8 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 30 Aug 2016 15:51:16 +0100 Subject: [PATCH 06/11] client: Hide public Expect.kt types --- .../com/r3corda/client/testing/Expect.kt | 132 ++++++++++-------- 1 file changed, 70 insertions(+), 62 deletions(-) diff --git a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt index 22bc15ae64..8de89f40fe 100644 --- a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt +++ b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt @@ -32,40 +32,83 @@ import org.slf4j.LoggerFactory * The above will test our expectation that the stream should first emit an A, and then a B and C in unspecified order. */ -val log: Logger = LoggerFactory.getLogger("Expect") +private val log: Logger = LoggerFactory.getLogger("Expect") -sealed class ExpectCompose { - class Single(val expect: Expect) : ExpectCompose() - class Sequential(val sequence: List>) : ExpectCompose() - class Parallel(val parallel: List>) : ExpectCompose() +/** + * Expect an event of type [T] and run [expectClosure] on it + */ +inline fun expect(noinline expectClosure: (T) -> Unit) = expect(T::class.java, expectClosure) +fun expect(klass: Class, expectClosure: (T) -> Unit): ExpectCompose { + return ExpectCompose.Single(Expect(klass, expectClosure)) } -data class Expect( +/** + * Tests that events arrive in the specified order. + * + * @param expectations The pieces of DSL that should run sequentially when events arrive. + */ +fun sequence(vararg expectations: ExpectCompose): ExpectCompose = ExpectCompose.Sequential(listOf(*expectations)) + +/** + * Tests that events arrive in unspecified order. + * + * @param expectations The pieces of DSL all of which should run but in an unspecified order depending on what sequence events arrive. + */ +fun parallel(vararg expectations: ExpectCompose): ExpectCompose = ExpectCompose.Parallel(listOf(*expectations)) + +/** + * Run the specified DSL against the event stream. + * @param isStrict If false non-matched events are disregarded (so the DSL will only check a subset of events). + * @param expectCompose The DSL we expect to match against the stream of events. + */ +fun EventStream.expectEvents(isStrict: Boolean = true, expectCompose: () -> ExpectCompose) { + val finishFuture = SettableFuture() + val lockedState = ThreadBox(object { var state = ExpectComposeState.fromExpectCompose(expectCompose()) }) + subscribe { event -> + lockedState.locked { + if (state is ExpectComposeState.Finished) { + log.warn("Got event $event, but was expecting no further events") + return@subscribe + } + val next = state.nextState(event) + log.info("$event :: ${state.getExpectedEvents()} -> ${next?.second?.getExpectedEvents()}") + if (next == null) { + val expectedStates = state.getExpectedEvents() + val message = "Got $event, expected one of $expectedStates" + if (isStrict) { + finishFuture.setException(Exception(message)) + state = ExpectComposeState.Finished() + } else { + log.warn("$message, discarding event as isStrict=false") + } + } else { + state = next.second + try { + next.first() + } catch (exception: Exception) { + finishFuture.setException(exception) + } + if (state is ExpectComposeState.Finished) { + finishFuture.set(Unit) + } + } + } + } + finishFuture.get() +} + +sealed class ExpectCompose { + internal class Single(val expect: Expect) : ExpectCompose() + internal class Sequential(val sequence: List>) : ExpectCompose() + internal class Parallel(val parallel: List>) : ExpectCompose() +} + +internal data class Expect( val clazz: Class, val expectClosure: (T) -> Unit ) -inline fun expect(noinline expectClosure: (T) -> Unit): ExpectCompose { - return ExpectCompose.Single(Expect(T::class.java, expectClosure)) -} - -/** - * Tests that events arrive in the specified order - * - * @param expects The pieces of DSL that should run sequentially when events arrive - */ -fun sequence(vararg expects: ExpectCompose) = - ExpectCompose.Sequential(listOf(*expects)) - -/** - * Tests that events arrive in unspecified order - * - * @param expects The pieces of DSL all of which should run but in an unspecified order depending on what sequence events arrive - */ -fun parallel(vararg expects: ExpectCompose) = - ExpectCompose.Parallel(listOf(*expects)) - -sealed class ExpectComposeState{ +private sealed class ExpectComposeState{ class Finished : ExpectComposeState() class Single(val single: ExpectCompose.Single) : ExpectComposeState() class Sequential( @@ -159,38 +202,3 @@ sealed class ExpectComposeState{ } } } - -fun EventStream.expectEvents(expectCompose: ExpectCompose) { - val finishFuture = SettableFuture() - val lockedState = ThreadBox(object { var state = ExpectComposeState.fromExpectCompose(expectCompose) }) - subscribe { event -> - lockedState.locked { - if (state is ExpectComposeState.Finished) { - log.warn("Got event $event, but was expecting no further events") - return@subscribe - } - val next = state.nextState(event) - log.info("$event :: ${state.getExpectedEvents()} -> ${next?.second?.getExpectedEvents()}") - if (next == null) { - val expectedStates = state.getExpectedEvents() - finishFuture.setException(Exception( - "Got $event, expected one of $expectedStates" - )) - state = ExpectComposeState.Finished() - } else { - state = next.second - try { - next.first() - } catch (exception: Exception) { - finishFuture.setException(exception) - } - if (state is ExpectComposeState.Finished) { - finishFuture.set(Unit) - } - } - } - } - finishFuture.get() -} - - From 20ed97ff96c3674755b599c1dd8f5e3a6edc72d7 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 30 Aug 2016 15:55:00 +0100 Subject: [PATCH 07/11] client: Port Expect reactfx->rx --- .../com/r3corda/client/testing/Expect.kt | 3 +- .../client/WalletMonitorClientTests.kt | 276 +++++++++--------- 2 files changed, 142 insertions(+), 137 deletions(-) diff --git a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt index 8de89f40fe..220bfba04e 100644 --- a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt +++ b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt @@ -5,6 +5,7 @@ import com.r3corda.core.ThreadBox import org.reactfx.EventStream import org.slf4j.Logger import org.slf4j.LoggerFactory +import rx.Observable /** * This file defines a simple DSL for testing non-deterministic sequence of events arriving on an [EventStream]. @@ -61,7 +62,7 @@ fun parallel(vararg expectations: ExpectCompose): ExpectCompose = Expe * @param isStrict If false non-matched events are disregarded (so the DSL will only check a subset of events). * @param expectCompose The DSL we expect to match against the stream of events. */ -fun EventStream.expectEvents(isStrict: Boolean = true, expectCompose: () -> ExpectCompose) { +fun Observable.expectEvents(isStrict: Boolean = true, expectCompose: () -> ExpectCompose) { val finishFuture = SettableFuture() val lockedState = ThreadBox(object { var state = ExpectComposeState.fromExpectCompose(expectCompose()) }) subscribe { event -> diff --git a/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt b/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt index de0f7ed2b1..ce85694854 100644 --- a/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt +++ b/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt @@ -11,9 +11,9 @@ import com.r3corda.node.services.monitor.TransactionBuildResult import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.utilities.AddOrRemove import org.junit.Test -import org.reactfx.EventSource import org.slf4j.Logger import org.slf4j.LoggerFactory +import rx.subjects.PublishSubject import kotlin.test.fail val log: Logger = LoggerFactory.getLogger(WalletMonitorServiceTests::class.java) @@ -33,13 +33,13 @@ class WalletMonitorServiceTests { log.info("Alice is ${aliceNode.identity}") log.info("Notary is ${notaryNode.identity}") - val aliceInStream = EventSource() - val aliceOutStream = EventSource() + val aliceInStream = PublishSubject.create() + val aliceOutStream = PublishSubject.create() val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) require(aliceMonitorClient.register().get()) - aliceOutStream.push(ClientToServiceCommand.IssueCash( + aliceOutStream.onNext(ClientToServiceCommand.IssueCash( amount = Amount(100, USD), issueRef = OpaqueBytes(ByteArray(1, { 1 })), recipient = aliceNode.identity, @@ -82,114 +82,116 @@ class WalletMonitorServiceTests { log.info("Alice is ${aliceNode.identity}") log.info("Notary is ${notaryNode.identity}") - val aliceInStream = EventSource() - val aliceOutStream = EventSource() + val aliceInStream = PublishSubject.create() + val aliceOutStream = PublishSubject.create() val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) require(aliceMonitorClient.register().get()) - aliceOutStream.push(ClientToServiceCommand.IssueCash( + aliceOutStream.onNext(ClientToServiceCommand.IssueCash( amount = Amount(100, USD), issueRef = OpaqueBytes(ByteArray(1, { 1 })), recipient = aliceNode.identity, notary = notaryNode.identity )) - aliceOutStream.push(ClientToServiceCommand.PayCash( + aliceOutStream.onNext(ClientToServiceCommand.PayCash( amount = Amount(100, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), recipient = aliceNode.identity )) - aliceInStream.expectEvents(sequence( - // ISSUE - parallel( - sequence( - expect { add: ServiceToClientEvent.StateMachine -> - require(add.addOrRemove == AddOrRemove.ADD) - }, - expect { remove: ServiceToClientEvent.StateMachine -> - require(remove.addOrRemove == AddOrRemove.REMOVE) - } - ), - expect { tx: ServiceToClientEvent.Transaction -> - require(tx.transaction.tx.inputs.isEmpty()) - require(tx.transaction.tx.outputs.size == 1) - val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet() - // Only Alice signed - require(signaturePubKeys.size == 1) - require(signaturePubKeys.contains(aliceNode.identity.owningKey)) - }, - expect { build: ServiceToClientEvent.TransactionBuild -> - val state = build.state - when (state) { - is TransactionBuildResult.ProtocolStarted -> { - } - is TransactionBuildResult.Failed -> fail(state.message) - } - }, - expect { output: ServiceToClientEvent.OutputState -> - require(output.consumed.size == 0) - require(output.produced.size == 1) - } - ), - - // MOVE - parallel( - sequence( - expect { add: ServiceToClientEvent.StateMachine -> - require(add.addOrRemove == AddOrRemove.ADD) - }, - expect { add: ServiceToClientEvent.StateMachine -> - require(add.addOrRemove == AddOrRemove.REMOVE) - } - ), - expect { tx: ServiceToClientEvent.Transaction -> - require(tx.transaction.tx.inputs.size == 1) - require(tx.transaction.tx.outputs.size == 1) - val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet() - // Alice and Notary signed - require(signaturePubKeys.size == 2) - require(signaturePubKeys.contains(aliceNode.identity.owningKey)) - require(signaturePubKeys.contains(notaryNode.identity.owningKey)) - }, - sequence( - expect { build: ServiceToClientEvent.TransactionBuild -> - val state = build.state - when (state) { - is TransactionBuildResult.ProtocolStarted -> { - log.info("${state.message}") - } - is TransactionBuildResult.Failed -> fail(state.message) + aliceInStream.expectEvents { + sequence( + // ISSUE + parallel( + sequence( + expect { add: ServiceToClientEvent.StateMachine -> + require(add.addOrRemove == AddOrRemove.ADD) + }, + expect { remove: ServiceToClientEvent.StateMachine -> + require(remove.addOrRemove == AddOrRemove.REMOVE) } - }, - expect { build: ServiceToClientEvent.Progress -> - // Requesting signature by notary service - }, - expect { build: ServiceToClientEvent.Progress -> - // Structural step change in child of Requesting signature by notary service - }, - expect { build: ServiceToClientEvent.Progress -> - // Requesting signature by notary service - }, - expect { build: ServiceToClientEvent.Progress -> - // Validating response from Notary service - }, - expect { build: ServiceToClientEvent.Progress -> - // Done - }, - expect { build: ServiceToClientEvent.Progress -> - // Broadcasting transaction to participants - }, - expect { build: ServiceToClientEvent.Progress -> - // Done + ), + expect { tx: ServiceToClientEvent.Transaction -> + require(tx.transaction.tx.inputs.isEmpty()) + require(tx.transaction.tx.outputs.size == 1) + val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet() + // Only Alice signed + require(signaturePubKeys.size == 1) + require(signaturePubKeys.contains(aliceNode.identity.owningKey)) + }, + expect { build: ServiceToClientEvent.TransactionBuild -> + val state = build.state + when (state) { + is TransactionBuildResult.ProtocolStarted -> { + } + is TransactionBuildResult.Failed -> fail(state.message) } - ), - expect { output: ServiceToClientEvent.OutputState -> - require(output.consumed.size == 1) - require(output.produced.size == 1) - } - ) - )) + }, + expect { output: ServiceToClientEvent.OutputState -> + require(output.consumed.size == 0) + require(output.produced.size == 1) + } + ), + + // MOVE + parallel( + sequence( + expect { add: ServiceToClientEvent.StateMachine -> + require(add.addOrRemove == AddOrRemove.ADD) + }, + expect { add: ServiceToClientEvent.StateMachine -> + require(add.addOrRemove == AddOrRemove.REMOVE) + } + ), + expect { tx: ServiceToClientEvent.Transaction -> + require(tx.transaction.tx.inputs.size == 1) + require(tx.transaction.tx.outputs.size == 1) + val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet() + // Alice and Notary signed + require(signaturePubKeys.size == 2) + require(signaturePubKeys.contains(aliceNode.identity.owningKey)) + require(signaturePubKeys.contains(notaryNode.identity.owningKey)) + }, + sequence( + expect { build: ServiceToClientEvent.TransactionBuild -> + val state = build.state + when (state) { + is TransactionBuildResult.ProtocolStarted -> { + log.info("${state.message}") + } + is TransactionBuildResult.Failed -> fail(state.message) + } + }, + expect { build: ServiceToClientEvent.Progress -> + // Requesting signature by notary service + }, + expect { build: ServiceToClientEvent.Progress -> + // Structural step change in child of Requesting signature by notary service + }, + expect { build: ServiceToClientEvent.Progress -> + // Requesting signature by notary service + }, + expect { build: ServiceToClientEvent.Progress -> + // Validating response from Notary service + }, + expect { build: ServiceToClientEvent.Progress -> + // Done + }, + expect { build: ServiceToClientEvent.Progress -> + // Broadcasting transaction to participants + }, + expect { build: ServiceToClientEvent.Progress -> + // Done + } + ), + expect { output: ServiceToClientEvent.OutputState -> + require(output.consumed.size == 1) + require(output.produced.size == 1) + } + ) + ) + } } } @@ -206,68 +208,70 @@ class WalletMonitorServiceTests { log.info("Alice is ${aliceNode.identity}") log.info("Notary is ${notaryNode.identity}") - val aliceInStream = EventSource() - val aliceOutStream = EventSource() + val aliceInStream = PublishSubject.create() + val aliceOutStream = PublishSubject.create() val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) require(aliceMonitorClient.register().get()) - aliceOutStream.push(ClientToServiceCommand.IssueCash( + aliceOutStream.onNext(ClientToServiceCommand.IssueCash( amount = Amount(100, USD), issueRef = OpaqueBytes(ByteArray(1, { 1 })), recipient = aliceNode.identity, notary = notaryNode.identity )) - aliceOutStream.push(ClientToServiceCommand.IssueCash( + aliceOutStream.onNext(ClientToServiceCommand.IssueCash( amount = Amount(100, USD), issueRef = OpaqueBytes(ByteArray(1, { 2 })), recipient = aliceNode.identity, notary = notaryNode.identity )) - aliceOutStream.push(ClientToServiceCommand.PayCash( + aliceOutStream.onNext(ClientToServiceCommand.PayCash( amount = Amount(200, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), recipient = aliceNode.identity )) - aliceInStream.expectEvents(sequence( - // ISSUE 1 - parallel( - sequence( - expect { add: ServiceToClientEvent.StateMachine -> - require(add.addOrRemove == AddOrRemove.ADD) - }, - expect { remove: ServiceToClientEvent.StateMachine -> - require(remove.addOrRemove == AddOrRemove.REMOVE) - } - ), - expect { tx: ServiceToClientEvent.Transaction -> }, - expect { build: ServiceToClientEvent.TransactionBuild -> }, - expect { output: ServiceToClientEvent.OutputState -> } - ), + aliceInStream.expectEvents { + sequence( + // ISSUE 1 + parallel( + sequence( + expect { add: ServiceToClientEvent.StateMachine -> + require(add.addOrRemove == AddOrRemove.ADD) + }, + expect { remove: ServiceToClientEvent.StateMachine -> + require(remove.addOrRemove == AddOrRemove.REMOVE) + } + ), + expect { tx: ServiceToClientEvent.Transaction -> }, + expect { build: ServiceToClientEvent.TransactionBuild -> }, + expect { output: ServiceToClientEvent.OutputState -> } + ), - // ISSUE 2 - parallel( - sequence( - expect { add: ServiceToClientEvent.StateMachine -> - require(add.addOrRemove == AddOrRemove.ADD) - }, - expect { remove: ServiceToClientEvent.StateMachine -> - require(remove.addOrRemove == AddOrRemove.REMOVE) - } - ), - expect { tx: ServiceToClientEvent.Transaction -> }, - expect { build: ServiceToClientEvent.TransactionBuild -> }, - expect { output: ServiceToClientEvent.OutputState -> } - ), + // ISSUE 2 + parallel( + sequence( + expect { add: ServiceToClientEvent.StateMachine -> + require(add.addOrRemove == AddOrRemove.ADD) + }, + expect { remove: ServiceToClientEvent.StateMachine -> + require(remove.addOrRemove == AddOrRemove.REMOVE) + } + ), + expect { tx: ServiceToClientEvent.Transaction -> }, + expect { build: ServiceToClientEvent.TransactionBuild -> }, + expect { output: ServiceToClientEvent.OutputState -> } + ), - // MOVE, should fail - expect { build: ServiceToClientEvent.TransactionBuild -> - val state = build.state - require(state is TransactionBuildResult.Failed) - } - )) + // MOVE, should fail + expect { build: ServiceToClientEvent.TransactionBuild -> + val state = build.state + require(state is TransactionBuildResult.Failed) + } + ) + } } } } From af641ab9778ff738da65e35c396ea193cae529ce Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 30 Aug 2016 16:28:09 +0100 Subject: [PATCH 08/11] client: Small cosmetics --- .../main/kotlin/com/r3corda/client/testing/Expect.kt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt index 220bfba04e..27dcd5f6a3 100644 --- a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt +++ b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt @@ -64,9 +64,10 @@ fun parallel(vararg expectations: ExpectCompose): ExpectCompose = Expe */ fun Observable.expectEvents(isStrict: Boolean = true, expectCompose: () -> ExpectCompose) { val finishFuture = SettableFuture() - val lockedState = ThreadBox(object { var state = ExpectComposeState.fromExpectCompose(expectCompose()) }) + val stateLock = object {} + var state = ExpectComposeState.fromExpectCompose(expectCompose()) subscribe { event -> - lockedState.locked { + synchronized(stateLock) { if (state is ExpectComposeState.Finished) { log.warn("Got event $event, but was expecting no further events") return@subscribe @@ -109,7 +110,7 @@ internal data class Expect( val expectClosure: (T) -> Unit ) -private sealed class ExpectComposeState{ +private sealed class ExpectComposeState { class Finished : ExpectComposeState() class Single(val single: ExpectCompose.Single) : ExpectComposeState() class Sequential( @@ -118,8 +119,7 @@ private sealed class ExpectComposeState{ val state: ExpectComposeState ) : ExpectComposeState() class Parallel( - val parallel: - ExpectCompose.Parallel, + val parallel: ExpectCompose.Parallel, val states: List> ) : ExpectComposeState() From 0f54aec6aca4fb9cada5ff41aba8a41639b6f5c7 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 1 Sep 2016 09:35:13 +0100 Subject: [PATCH 09/11] client: Add repeat\(n\) to Expect dsl --- .../com/r3corda/client/testing/Expect.kt | 13 ++++++++--- .../client/WalletMonitorClientTests.kt | 22 ++----------------- 2 files changed, 12 insertions(+), 23 deletions(-) diff --git a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt index 27dcd5f6a3..cc3cdba34d 100644 --- a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt +++ b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt @@ -1,14 +1,12 @@ package com.r3corda.client.testing import co.paralleluniverse.strands.SettableFuture -import com.r3corda.core.ThreadBox -import org.reactfx.EventStream import org.slf4j.Logger import org.slf4j.LoggerFactory import rx.Observable /** - * This file defines a simple DSL for testing non-deterministic sequence of events arriving on an [EventStream]. + * This file defines a simple DSL for testing non-deterministic sequence of events arriving on an [Observable]. * * [sequence] is used to impose ordering invariants on the stream, whereas [parallel] allows events to arrive in any order. * @@ -57,8 +55,17 @@ fun sequence(vararg expectations: ExpectCompose): ExpectCompose = Expe */ fun parallel(vararg expectations: ExpectCompose): ExpectCompose = ExpectCompose.Parallel(listOf(*expectations)) +/** + * Tests that N events of the same type arrive + * + * @param number The number of events expected. + * @param expectation The piece of DSL to run on each event, with the index of the event passed in. + */ +inline fun repeat(number: Int, expectation: (Int) -> ExpectCompose) = sequence(*Array(number) { expectation(it) }) + /** * Run the specified DSL against the event stream. + * * @param isStrict If false non-matched events are disregarded (so the DSL will only check a subset of events). * @param expectCompose The DSL we expect to match against the stream of events. */ diff --git a/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt b/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt index ce85694854..9626185e28 100644 --- a/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt +++ b/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt @@ -163,26 +163,8 @@ class WalletMonitorServiceTests { is TransactionBuildResult.Failed -> fail(state.message) } }, - expect { build: ServiceToClientEvent.Progress -> - // Requesting signature by notary service - }, - expect { build: ServiceToClientEvent.Progress -> - // Structural step change in child of Requesting signature by notary service - }, - expect { build: ServiceToClientEvent.Progress -> - // Requesting signature by notary service - }, - expect { build: ServiceToClientEvent.Progress -> - // Validating response from Notary service - }, - expect { build: ServiceToClientEvent.Progress -> - // Done - }, - expect { build: ServiceToClientEvent.Progress -> - // Broadcasting transaction to participants - }, - expect { build: ServiceToClientEvent.Progress -> - // Done + repeat(7) { + expect { build: ServiceToClientEvent.Progress -> } } ), expect { output: ServiceToClientEvent.OutputState -> From 5d04bd2cadd0a70f1ab0d356d6af62aea4d3da02 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 1 Sep 2016 09:38:04 +0100 Subject: [PATCH 10/11] client: Small comment correction --- client/src/main/kotlin/com/r3corda/client/testing/Expect.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt index cc3cdba34d..9281af5b85 100644 --- a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt +++ b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt @@ -17,7 +17,7 @@ import rx.Observable * * Example usage: * - * val stream: EventStream = (..) + * val stream: Ovservable = (..) * stream.expectEvents( * sequence( * expect { event: SomeEvent.A -> require(event.isOk()) }, From 1f095533be0ea5b35da9fb47fc9fe841127b8093 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 1 Sep 2016 10:14:37 +0100 Subject: [PATCH 11/11] client: Remove `when (this)` --- .../com/r3corda/client/testing/Expect.kt | 126 +++++++++--------- 1 file changed, 63 insertions(+), 63 deletions(-) diff --git a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt index 9281af5b85..f3d1113dc8 100644 --- a/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt +++ b/client/src/main/kotlin/com/r3corda/client/testing/Expect.kt @@ -17,7 +17,7 @@ import rx.Observable * * Example usage: * - * val stream: Ovservable = (..) + * val stream: Observable = (..) * stream.expectEvents( * sequence( * expect { event: SomeEvent.A -> require(event.isOk()) }, @@ -118,17 +118,75 @@ internal data class Expect( ) private sealed class ExpectComposeState { - class Finished : ExpectComposeState() - class Single(val single: ExpectCompose.Single) : ExpectComposeState() + + abstract fun nextState(event: E): Pair<() -> Unit, ExpectComposeState>? + abstract fun getExpectedEvents(): List> + + class Finished : ExpectComposeState() { + override fun nextState(event: E) = null + override fun getExpectedEvents(): List> = listOf() + } + class Single(val single: ExpectCompose.Single) : ExpectComposeState() { + override fun nextState(event: E): Pair<() -> Unit, ExpectComposeState>? = + if (single.expect.clazz.isAssignableFrom(event.javaClass)) { + @Suppress("UNCHECKED_CAST") + Pair({ single.expect.expectClosure(event) }, Finished()) + } else { + null + } + override fun getExpectedEvents() = listOf(single.expect.clazz) + } + class Sequential( val sequential: ExpectCompose.Sequential, val index: Int, val state: ExpectComposeState - ) : ExpectComposeState() + ) : ExpectComposeState() { + override fun nextState(event: E): Pair<() -> Unit, ExpectComposeState>? { + val next = state.nextState(event) + return if (next == null) { + null + } else if (next.second is Finished) { + if (index == sequential.sequence.size - 1) { + Pair(next.first, Finished()) + } else { + val nextState = fromExpectCompose(sequential.sequence[index + 1]) + if (nextState is Finished) { + Pair(next.first, Finished()) + } else { + Pair(next.first, Sequential(sequential, index + 1, nextState)) + } + } + } else { + Pair(next.first, Sequential(sequential, index, next.second)) + } + } + + override fun getExpectedEvents() = state.getExpectedEvents() + } + class Parallel( val parallel: ExpectCompose.Parallel, val states: List> - ) : ExpectComposeState() + ) : ExpectComposeState() { + override fun nextState(event: E): Pair<() -> Unit, ExpectComposeState>? { + states.forEachIndexed { stateIndex, state -> + val next = state.nextState(event) + if (next != null) { + val nextStates = states.mapIndexed { i, expectComposeState -> + if (i == stateIndex) next.second else expectComposeState + } + if (nextStates.all { it is Finished }) { + return Pair(next.first, Finished()) + } else { + return Pair(next.first, Parallel(parallel, nextStates)) + } + } + } + return null + } + override fun getExpectedEvents() = states.flatMap { it.getExpectedEvents() } + } companion object { fun fromExpectCompose(expectCompose: ExpectCompose): ExpectComposeState { @@ -151,62 +209,4 @@ private sealed class ExpectComposeState { } } } - - fun getExpectedEvents(): List> { - return when (this) { - is ExpectComposeState.Finished -> listOf() - is ExpectComposeState.Single -> listOf(single.expect.clazz) - is ExpectComposeState.Sequential -> state.getExpectedEvents() - is ExpectComposeState.Parallel -> states.flatMap { it.getExpectedEvents() } - } - } - - fun nextState(event: E): Pair<() -> Unit, ExpectComposeState>? { - return when (this) { - is ExpectComposeState.Finished -> null - is ExpectComposeState.Single -> { - if (single.expect.clazz.isAssignableFrom(event.javaClass)) { - @Suppress("UNCHECKED_CAST") - Pair({ single.expect.expectClosure(event) }, Finished()) - } else { - null - } - } - is ExpectComposeState.Sequential -> { - val next = state.nextState(event) - if (next == null) { - null - } else if (next.second is Finished) { - if (index == sequential.sequence.size - 1) { - Pair(next.first, Finished()) - } else { - val nextState = fromExpectCompose(sequential.sequence[index + 1]) - if (nextState is Finished) { - Pair(next.first, Finished()) - } else { - Pair(next.first, Sequential(sequential, index + 1, nextState)) - } - } - } else { - Pair(next.first, Sequential(sequential, index, next.second)) - } - } - is ExpectComposeState.Parallel -> { - states.forEachIndexed { stateIndex, state -> - val next = state.nextState(event) - if (next != null) { - val nextStates = states.mapIndexed { i, expectComposeState -> - if (i == stateIndex) next.second else expectComposeState - } - if (nextStates.all { it is Finished }) { - return Pair(next.first, Finished()) - } else { - return Pair(next.first, Parallel(parallel, nextStates)) - } - } - } - null - } - } - } }