client: Add WalletMonitorClient, Expect DSL and some tests for the client

This commit is contained in:
Andras Slemmer 2016-08-26 14:31:25 +01:00
parent cfebccc495
commit 2f7b022c8e
3 changed files with 530 additions and 0 deletions

View File

@ -0,0 +1,61 @@
package com.r3corda.client
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.node.services.monitor.*
import org.reactfx.EventSink
import org.reactfx.EventStream
import org.slf4j.Logger
import org.slf4j.LoggerFactory
/**
* Worked example of a client which communicates with the wallet monitor service.
*/
private val log: Logger = LoggerFactory.getLogger("WalletMonitorClient")
class WalletMonitorClient(
val net: MessagingService,
val node: NodeInfo,
val outEvents: EventStream<ClientToServiceCommand>,
val inEvents: EventSink<ServiceToClientEvent>
) {
private val sessionID = random63BitValue()
fun register(): ListenableFuture<Boolean> {
val future = SettableFuture.create<Boolean>()
log.info("Registering with ID $sessionID. I am ${net.myAddress}")
net.addMessageHandler(WalletMonitorService.REGISTER_TOPIC, sessionID) { msg, reg ->
val resp = msg.data.deserialize<RegisterResponse>()
net.removeMessageHandler(reg)
future.set(resp.success)
}
net.addMessageHandler(WalletMonitorService.STATE_TOPIC, sessionID) { msg, req ->
// TODO
}
net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg ->
val event = msg.data.deserialize<ServiceToClientEvent>()
inEvents.push(event)
}
val req = RegisterRequest(net.myAddress, sessionID)
val registerMessage = net.createMessage(WalletMonitorService.REGISTER_TOPIC, 0, req.serialize().bits)
net.send(registerMessage, node.address)
outEvents.subscribe { event ->
val envelope = ClientToServiceCommandMessage(sessionID, net.myAddress, event)
val message = net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, 0, envelope.serialize().bits)
net.send(message, node.address)
}
return future
}
}

View File

@ -0,0 +1,196 @@
package com.r3corda.client.testing
import co.paralleluniverse.strands.SettableFuture
import com.r3corda.core.ThreadBox
import org.reactfx.EventStream
import org.slf4j.Logger
import org.slf4j.LoggerFactory
/**
* This file defines a simple DSL for testing non-deterministic sequence of events arriving on an [EventStream].
*
* [sequence] is used to impose ordering invariants on the stream, whereas [parallel] allows events to arrive in any order.
*
* The only restriction on [parallel] is that we should be able to discriminate which branch to take based on the
* arrived event's type. If this is ambiguous the first matching piece of DSL will be run.
* [sequence]s and [parallel]s can be nested arbitrarily
*
* Example usage:
*
* val stream: EventStream<SomeEvent> = (..)
* stream.expectEvents(
* sequence(
* expect { event: SomeEvent.A -> require(event.isOk()) },
* parallel(
* expect { event.SomeEvent.B -> },
* expect { event.SomeEvent.C -> }
* )
* )
* )
*
* The above will test our expectation that the stream should first emit an A, and then a B and C in unspecified order.
*/
val log: Logger = LoggerFactory.getLogger("Expect")
sealed class ExpectCompose<out E> {
class Single<E>(val expect: Expect<E, E>) : ExpectCompose<E>()
class Sequential<E>(val sequence: List<ExpectCompose<E>>) : ExpectCompose<E>()
class Parallel<E>(val parallel: List<ExpectCompose<E>>) : ExpectCompose<E>()
}
data class Expect<E, T : E>(
val clazz: Class<T>,
val expectClosure: (T) -> Unit
)
inline fun <E : Any, reified T : E> expect(noinline expectClosure: (T) -> Unit): ExpectCompose<E> {
return ExpectCompose.Single(Expect(T::class.java, expectClosure))
}
/**
* Tests that events arrive in the specified order
*
* @param expects The pieces of DSL that should run sequentially when events arrive
*/
fun <E> sequence(vararg expects: ExpectCompose<E>) =
ExpectCompose.Sequential(listOf(*expects))
/**
* Tests that events arrive in unspecified order
*
* @param expects The pieces of DSL all of which should run but in an unspecified order depending on what sequence events arrive
*/
fun <E> parallel(vararg expects: ExpectCompose<E>) =
ExpectCompose.Parallel(listOf(*expects))
sealed class ExpectComposeState<E : Any>{
class Finished<E : Any> : ExpectComposeState<E>()
class Single<E : Any>(val single: ExpectCompose.Single<E>) : ExpectComposeState<E>()
class Sequential<E : Any>(
val sequential: ExpectCompose.Sequential<E>,
val index: Int,
val state: ExpectComposeState<E>
) : ExpectComposeState<E>()
class Parallel<E : Any>(
val parallel:
ExpectCompose.Parallel<E>,
val states: List<ExpectComposeState<E>>
) : ExpectComposeState<E>()
companion object {
fun <E : Any> fromExpectCompose(expectCompose: ExpectCompose<E>): ExpectComposeState<E> {
return when (expectCompose) {
is ExpectCompose.Single -> Single(expectCompose)
is ExpectCompose.Sequential -> {
if (expectCompose.sequence.size > 0) {
Sequential(expectCompose, 0, fromExpectCompose(expectCompose.sequence[0]))
} else {
Finished()
}
}
is ExpectCompose.Parallel -> {
if (expectCompose.parallel.size > 0) {
Parallel(expectCompose, expectCompose.parallel.map { fromExpectCompose(it) })
} else {
Finished()
}
}
}
}
}
fun getExpectedEvents(): List<Class<out E>> {
return when (this) {
is ExpectComposeState.Finished -> listOf()
is ExpectComposeState.Single -> listOf(single.expect.clazz)
is ExpectComposeState.Sequential -> state.getExpectedEvents()
is ExpectComposeState.Parallel -> states.flatMap { it.getExpectedEvents() }
}
}
fun nextState(event: E): Pair<() -> Unit, ExpectComposeState<E>>? {
return when (this) {
is ExpectComposeState.Finished -> null
is ExpectComposeState.Single -> {
if (single.expect.clazz.isAssignableFrom(event.javaClass)) {
@Suppress("UNCHECKED_CAST")
Pair({ single.expect.expectClosure(event) }, Finished())
} else {
null
}
}
is ExpectComposeState.Sequential -> {
val next = state.nextState(event)
if (next == null) {
null
} else if (next.second is Finished) {
if (index == sequential.sequence.size - 1) {
Pair(next.first, Finished<E>())
} else {
val nextState = fromExpectCompose(sequential.sequence[index + 1])
if (nextState is Finished) {
Pair(next.first, Finished<E>())
} else {
Pair(next.first, Sequential(sequential, index + 1, nextState))
}
}
} else {
Pair(next.first, Sequential(sequential, index, next.second))
}
}
is ExpectComposeState.Parallel -> {
states.forEachIndexed { stateIndex, state ->
val next = state.nextState(event)
if (next != null) {
val nextStates = states.mapIndexed { i, expectComposeState ->
if (i == stateIndex) next.second else expectComposeState
}
if (nextStates.all { it is Finished }) {
return Pair(next.first, Finished())
} else {
return Pair(next.first, Parallel(parallel, nextStates))
}
}
}
null
}
}
}
}
fun <E : Any> EventStream<E>.expectEvents(expectCompose: ExpectCompose<E>) {
val finishFuture = SettableFuture<Unit>()
val lockedState = ThreadBox(object { var state = ExpectComposeState.fromExpectCompose(expectCompose) })
subscribe { event ->
lockedState.locked {
if (state is ExpectComposeState.Finished) {
log.warn("Got event $event, but was expecting no further events")
return@subscribe
}
val next = state.nextState(event)
log.info("$event :: ${state.getExpectedEvents()} -> ${next?.second?.getExpectedEvents()}")
if (next == null) {
val expectedStates = state.getExpectedEvents()
finishFuture.setException(Exception(
"Got $event, expected one of $expectedStates"
))
state = ExpectComposeState.Finished()
} else {
state = next.second
try {
next.first()
} catch (exception: Exception) {
finishFuture.setException(exception)
}
if (state is ExpectComposeState.Finished) {
finishFuture.set(Unit)
}
}
}
}
finishFuture.get()
}

View File

@ -0,0 +1,273 @@
package com.r3corda.client
import co.paralleluniverse.strands.SettableFuture
import com.r3corda.client.testing.*
import com.r3corda.core.contracts.*
import com.r3corda.core.serialization.OpaqueBytes
import com.r3corda.node.driver.driver
import com.r3corda.node.driver.startClient
import com.r3corda.node.services.monitor.ServiceToClientEvent
import com.r3corda.node.services.monitor.TransactionBuildResult
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.utilities.AddOrRemove
import org.junit.Test
import org.reactfx.EventSource
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import kotlin.test.fail
val log: Logger = LoggerFactory.getLogger(WalletMonitorServiceTests::class.java)
class WalletMonitorServiceTests {
@Test
fun cashIssueWorksEndToEnd() {
driver {
val aliceNodeFuture = startNode("Alice")
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type))
val aliceNode = aliceNodeFuture.get()
val notaryNode = notaryNodeFuture.get()
val client = startClient(aliceNode).get()
log.info("Alice is ${aliceNode.identity}")
log.info("Notary is ${notaryNode.identity}")
val aliceInStream = EventSource<ServiceToClientEvent>()
val aliceOutStream = EventSource<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
require(aliceMonitorClient.register().get())
aliceOutStream.push(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.identity,
notary = notaryNode.identity
))
val buildFuture = SettableFuture<ServiceToClientEvent.TransactionBuild>()
val eventFuture = SettableFuture<ServiceToClientEvent.OutputState>()
aliceInStream.subscribe {
if (it is ServiceToClientEvent.OutputState)
eventFuture.set(it)
else if (it is ServiceToClientEvent.TransactionBuild)
buildFuture.set(it)
else
log.warn("Unexpected event $it")
}
val buildEvent = buildFuture.get()
val state = buildEvent.state
if (state is TransactionBuildResult.Failed) {
fail(state.message)
}
val outputEvent = eventFuture.get()
require(outputEvent.consumed.size == 0)
require(outputEvent.produced.size == 1)
}
}
@Test
fun issueAndMoveWorks() {
driver {
val aliceNodeFuture = startNode("Alice")
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type))
val aliceNode = aliceNodeFuture.get()
val notaryNode = notaryNodeFuture.get()
val client = startClient(aliceNode).get()
log.info("Alice is ${aliceNode.identity}")
log.info("Notary is ${notaryNode.identity}")
val aliceInStream = EventSource<ServiceToClientEvent>()
val aliceOutStream = EventSource<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
require(aliceMonitorClient.register().get())
aliceOutStream.push(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.identity,
notary = notaryNode.identity
))
aliceOutStream.push(ClientToServiceCommand.PayCash(
amount = Amount(100, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),
recipient = aliceNode.identity
))
aliceInStream.expectEvents(sequence(
// ISSUE
parallel(
sequence(
expect { add: ServiceToClientEvent.StateMachine ->
require(add.addOrRemove == AddOrRemove.ADD)
},
expect { remove: ServiceToClientEvent.StateMachine ->
require(remove.addOrRemove == AddOrRemove.REMOVE)
}
),
expect { tx: ServiceToClientEvent.Transaction ->
require(tx.transaction.tx.inputs.isEmpty())
require(tx.transaction.tx.outputs.size == 1)
val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet()
// Only Alice signed
require(signaturePubKeys.size == 1)
require(signaturePubKeys.contains(aliceNode.identity.owningKey))
},
expect { build: ServiceToClientEvent.TransactionBuild ->
val state = build.state
when (state) {
is TransactionBuildResult.ProtocolStarted -> {
}
is TransactionBuildResult.Failed -> fail(state.message)
}
},
expect { output: ServiceToClientEvent.OutputState ->
require(output.consumed.size == 0)
require(output.produced.size == 1)
}
),
// MOVE
parallel(
sequence(
expect { add: ServiceToClientEvent.StateMachine ->
require(add.addOrRemove == AddOrRemove.ADD)
},
expect { add: ServiceToClientEvent.StateMachine ->
require(add.addOrRemove == AddOrRemove.REMOVE)
}
),
expect { tx: ServiceToClientEvent.Transaction ->
require(tx.transaction.tx.inputs.size == 1)
require(tx.transaction.tx.outputs.size == 1)
val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet()
// Alice and Notary signed
require(signaturePubKeys.size == 2)
require(signaturePubKeys.contains(aliceNode.identity.owningKey))
require(signaturePubKeys.contains(notaryNode.identity.owningKey))
},
sequence(
expect { build: ServiceToClientEvent.TransactionBuild ->
val state = build.state
when (state) {
is TransactionBuildResult.ProtocolStarted -> {
log.info("${state.message}")
}
is TransactionBuildResult.Failed -> fail(state.message)
}
},
expect { build: ServiceToClientEvent.Progress ->
// Requesting signature by notary service
},
expect { build: ServiceToClientEvent.Progress ->
// Structural step change in child of Requesting signature by notary service
},
expect { build: ServiceToClientEvent.Progress ->
// Requesting signature by notary service
},
expect { build: ServiceToClientEvent.Progress ->
// Validating response from Notary service
},
expect { build: ServiceToClientEvent.Progress ->
// Done
},
expect { build: ServiceToClientEvent.Progress ->
// Broadcasting transaction to participants
},
expect { build: ServiceToClientEvent.Progress ->
// Done
}
),
expect { output: ServiceToClientEvent.OutputState ->
require(output.consumed.size == 1)
require(output.produced.size == 1)
}
)
))
}
}
@Test
fun movingCashOfDifferentIssueRefsFails() {
driver {
val aliceNodeFuture = startNode("Alice")
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type))
val aliceNode = aliceNodeFuture.get()
val notaryNode = notaryNodeFuture.get()
val client = startClient(aliceNode).get()
log.info("Alice is ${aliceNode.identity}")
log.info("Notary is ${notaryNode.identity}")
val aliceInStream = EventSource<ServiceToClientEvent>()
val aliceOutStream = EventSource<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
require(aliceMonitorClient.register().get())
aliceOutStream.push(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.identity,
notary = notaryNode.identity
))
aliceOutStream.push(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 2 })),
recipient = aliceNode.identity,
notary = notaryNode.identity
))
aliceOutStream.push(ClientToServiceCommand.PayCash(
amount = Amount(200, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),
recipient = aliceNode.identity
))
aliceInStream.expectEvents(sequence(
// ISSUE 1
parallel(
sequence(
expect { add: ServiceToClientEvent.StateMachine ->
require(add.addOrRemove == AddOrRemove.ADD)
},
expect { remove: ServiceToClientEvent.StateMachine ->
require(remove.addOrRemove == AddOrRemove.REMOVE)
}
),
expect { tx: ServiceToClientEvent.Transaction -> },
expect { build: ServiceToClientEvent.TransactionBuild -> },
expect { output: ServiceToClientEvent.OutputState -> }
),
// ISSUE 2
parallel(
sequence(
expect { add: ServiceToClientEvent.StateMachine ->
require(add.addOrRemove == AddOrRemove.ADD)
},
expect { remove: ServiceToClientEvent.StateMachine ->
require(remove.addOrRemove == AddOrRemove.REMOVE)
}
),
expect { tx: ServiceToClientEvent.Transaction -> },
expect { build: ServiceToClientEvent.TransactionBuild -> },
expect { output: ServiceToClientEvent.OutputState -> }
),
// MOVE, should fail
expect { build: ServiceToClientEvent.TransactionBuild ->
val state = build.state
require(state is TransactionBuildResult.Failed)
}
))
}
}
}