mirror of
https://github.com/corda/corda.git
synced 2025-04-07 19:34:41 +00:00
Merged in aslemmer-node-client (pull request #305)
Aslemmer node client
This commit is contained in:
commit
6af7945dc6
1
.gitignore
vendored
1
.gitignore
vendored
@ -17,6 +17,7 @@ tags
|
||||
/experimental/build
|
||||
/docs/build/doctrees
|
||||
/test-utils/build
|
||||
/client/build
|
||||
|
||||
# gradle's buildSrc build/
|
||||
/buildSrc/build/
|
||||
|
3
.idea/modules.xml
generated
3
.idea/modules.xml
generated
@ -5,6 +5,9 @@
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/modules/buildSrc.iml" filepath="$PROJECT_DIR$/.idea/modules/buildSrc.iml" />
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/modules/buildSrc_main.iml" filepath="$PROJECT_DIR$/.idea/modules/buildSrc_main.iml" group="buildSrc" />
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/modules/buildSrc_test.iml" filepath="$PROJECT_DIR$/.idea/modules/buildSrc_test.iml" group="buildSrc" />
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/modules/client/client.iml" filepath="$PROJECT_DIR$/.idea/modules/client/client.iml" group="client" />
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/modules/client/client_main.iml" filepath="$PROJECT_DIR$/.idea/modules/client/client_main.iml" group="client" />
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/modules/client/client_test.iml" filepath="$PROJECT_DIR$/.idea/modules/client/client_test.iml" group="client" />
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/modules/contracts/contracts.iml" filepath="$PROJECT_DIR$/.idea/modules/contracts/contracts.iml" group="contracts" />
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/modules/contracts/contracts_main.iml" filepath="$PROJECT_DIR$/.idea/modules/contracts/contracts_main.iml" group="contracts" />
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/modules/contracts/contracts_test.iml" filepath="$PROJECT_DIR$/.idea/modules/contracts/contracts_test.iml" group="contracts" />
|
||||
|
53
client/build.gradle
Normal file
53
client/build.gradle
Normal file
@ -0,0 +1,53 @@
|
||||
apply plugin: 'kotlin'
|
||||
apply plugin: QuasarPlugin
|
||||
|
||||
repositories {
|
||||
mavenLocal()
|
||||
mavenCentral()
|
||||
maven {
|
||||
url 'http://oss.sonatype.org/content/repositories/snapshots'
|
||||
}
|
||||
jcenter()
|
||||
maven {
|
||||
url 'https://dl.bintray.com/kotlin/exposed'
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//noinspection GroovyAssignabilityCheck
|
||||
configurations {
|
||||
|
||||
// we don't want isolated.jar in classPath, since we want to test jar being dynamically loaded as an attachment
|
||||
runtime.exclude module: 'isolated'
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
test {
|
||||
resources {
|
||||
srcDir "../config/test"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// To find potential version conflicts, run "gradle htmlDependencyReport" and then look in
|
||||
// build/reports/project/dependencies/index.html for green highlighted parts of the tree.
|
||||
|
||||
dependencies {
|
||||
compile project(':node')
|
||||
|
||||
// Log4J: logging framework (with SLF4J bindings)
|
||||
compile "org.apache.logging.log4j:log4j-slf4j-impl:${log4j_version}"
|
||||
compile "org.apache.logging.log4j:log4j-core:${log4j_version}"
|
||||
|
||||
compile "com.google.guava:guava:19.0"
|
||||
|
||||
// ReactFX: Functional reactive UI programming.
|
||||
compile 'org.reactfx:reactfx:2.0-M5'
|
||||
compile 'org.fxmisc.easybind:easybind:1.0.3'
|
||||
|
||||
// Unit testing helpers.
|
||||
testCompile 'junit:junit:4.12'
|
||||
testCompile "org.assertj:assertj-core:${assertj_version}"
|
||||
}
|
||||
|
||||
quasarScan.dependsOn('classes', ':core:classes', ':contracts:classes')
|
@ -0,0 +1,61 @@
|
||||
package com.r3corda.client
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.core.contracts.ClientToServiceCommand
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.node.services.monitor.*
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Observable
|
||||
import rx.Observer
|
||||
|
||||
/**
|
||||
* Worked example of a client which communicates with the wallet monitor service.
|
||||
*/
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger("WalletMonitorClient")
|
||||
|
||||
class WalletMonitorClient(
|
||||
val net: MessagingService,
|
||||
val node: NodeInfo,
|
||||
val outEvents: Observable<ClientToServiceCommand>,
|
||||
val inEvents: Observer<ServiceToClientEvent>
|
||||
) {
|
||||
private val sessionID = random63BitValue()
|
||||
|
||||
fun register(): ListenableFuture<Boolean> {
|
||||
|
||||
val future = SettableFuture.create<Boolean>()
|
||||
log.info("Registering with ID $sessionID. I am ${net.myAddress}")
|
||||
net.addMessageHandler(WalletMonitorService.REGISTER_TOPIC, sessionID) { msg, reg ->
|
||||
val resp = msg.data.deserialize<RegisterResponse>()
|
||||
net.removeMessageHandler(reg)
|
||||
future.set(resp.success)
|
||||
}
|
||||
net.addMessageHandler(WalletMonitorService.STATE_TOPIC, sessionID) { msg, req ->
|
||||
// TODO
|
||||
}
|
||||
|
||||
net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg ->
|
||||
val event = msg.data.deserialize<ServiceToClientEvent>()
|
||||
inEvents.onNext(event)
|
||||
}
|
||||
|
||||
val req = RegisterRequest(net.myAddress, sessionID)
|
||||
val registerMessage = net.createMessage(WalletMonitorService.REGISTER_TOPIC, 0, req.serialize().bits)
|
||||
net.send(registerMessage, node.address)
|
||||
|
||||
outEvents.subscribe { event ->
|
||||
val envelope = ClientToServiceCommandMessage(sessionID, net.myAddress, event)
|
||||
val message = net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, 0, envelope.serialize().bits)
|
||||
net.send(message, node.address)
|
||||
}
|
||||
|
||||
return future
|
||||
}
|
||||
}
|
212
client/src/main/kotlin/com/r3corda/client/testing/Expect.kt
Normal file
212
client/src/main/kotlin/com/r3corda/client/testing/Expect.kt
Normal file
@ -0,0 +1,212 @@
|
||||
package com.r3corda.client.testing
|
||||
|
||||
import co.paralleluniverse.strands.SettableFuture
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Observable
|
||||
|
||||
/**
|
||||
* This file defines a simple DSL for testing non-deterministic sequence of events arriving on an [Observable].
|
||||
*
|
||||
* [sequence] is used to impose ordering invariants on the stream, whereas [parallel] allows events to arrive in any order.
|
||||
*
|
||||
* The only restriction on [parallel] is that we should be able to discriminate which branch to take based on the
|
||||
* arrived event's type. If this is ambiguous the first matching piece of DSL will be run.
|
||||
|
||||
* [sequence]s and [parallel]s can be nested arbitrarily
|
||||
*
|
||||
* Example usage:
|
||||
*
|
||||
* val stream: Observable<SomeEvent> = (..)
|
||||
* stream.expectEvents(
|
||||
* sequence(
|
||||
* expect { event: SomeEvent.A -> require(event.isOk()) },
|
||||
* parallel(
|
||||
* expect { event.SomeEvent.B -> },
|
||||
* expect { event.SomeEvent.C -> }
|
||||
* )
|
||||
* )
|
||||
* )
|
||||
*
|
||||
* The above will test our expectation that the stream should first emit an A, and then a B and C in unspecified order.
|
||||
*/
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger("Expect")
|
||||
|
||||
/**
|
||||
* Expect an event of type [T] and run [expectClosure] on it
|
||||
*/
|
||||
inline fun <E : Any, reified T : E> expect(noinline expectClosure: (T) -> Unit) = expect(T::class.java, expectClosure)
|
||||
fun <E : Any, T : E> expect(klass: Class<T>, expectClosure: (T) -> Unit): ExpectCompose<E> {
|
||||
return ExpectCompose.Single(Expect(klass, expectClosure))
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that events arrive in the specified order.
|
||||
*
|
||||
* @param expectations The pieces of DSL that should run sequentially when events arrive.
|
||||
*/
|
||||
fun <E> sequence(vararg expectations: ExpectCompose<E>): ExpectCompose<E> = ExpectCompose.Sequential(listOf(*expectations))
|
||||
|
||||
/**
|
||||
* Tests that events arrive in unspecified order.
|
||||
*
|
||||
* @param expectations The pieces of DSL all of which should run but in an unspecified order depending on what sequence events arrive.
|
||||
*/
|
||||
fun <E> parallel(vararg expectations: ExpectCompose<E>): ExpectCompose<E> = ExpectCompose.Parallel(listOf(*expectations))
|
||||
|
||||
/**
|
||||
* Tests that N events of the same type arrive
|
||||
*
|
||||
* @param number The number of events expected.
|
||||
* @param expectation The piece of DSL to run on each event, with the index of the event passed in.
|
||||
*/
|
||||
inline fun <E> repeat(number: Int, expectation: (Int) -> ExpectCompose<E>) = sequence(*Array(number) { expectation(it) })
|
||||
|
||||
/**
|
||||
* Run the specified DSL against the event stream.
|
||||
*
|
||||
* @param isStrict If false non-matched events are disregarded (so the DSL will only check a subset of events).
|
||||
* @param expectCompose The DSL we expect to match against the stream of events.
|
||||
*/
|
||||
fun <E : Any> Observable<E>.expectEvents(isStrict: Boolean = true, expectCompose: () -> ExpectCompose<E>) {
|
||||
val finishFuture = SettableFuture<Unit>()
|
||||
val stateLock = object {}
|
||||
var state = ExpectComposeState.fromExpectCompose(expectCompose())
|
||||
subscribe { event ->
|
||||
synchronized(stateLock) {
|
||||
if (state is ExpectComposeState.Finished) {
|
||||
log.warn("Got event $event, but was expecting no further events")
|
||||
return@subscribe
|
||||
}
|
||||
val next = state.nextState(event)
|
||||
log.info("$event :: ${state.getExpectedEvents()} -> ${next?.second?.getExpectedEvents()}")
|
||||
if (next == null) {
|
||||
val expectedStates = state.getExpectedEvents()
|
||||
val message = "Got $event, expected one of $expectedStates"
|
||||
if (isStrict) {
|
||||
finishFuture.setException(Exception(message))
|
||||
state = ExpectComposeState.Finished()
|
||||
} else {
|
||||
log.warn("$message, discarding event as isStrict=false")
|
||||
}
|
||||
} else {
|
||||
state = next.second
|
||||
try {
|
||||
next.first()
|
||||
} catch (exception: Exception) {
|
||||
finishFuture.setException(exception)
|
||||
}
|
||||
if (state is ExpectComposeState.Finished) {
|
||||
finishFuture.set(Unit)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finishFuture.get()
|
||||
}
|
||||
|
||||
sealed class ExpectCompose<out E> {
|
||||
internal class Single<E>(val expect: Expect<E, E>) : ExpectCompose<E>()
|
||||
internal class Sequential<E>(val sequence: List<ExpectCompose<E>>) : ExpectCompose<E>()
|
||||
internal class Parallel<E>(val parallel: List<ExpectCompose<E>>) : ExpectCompose<E>()
|
||||
}
|
||||
|
||||
internal data class Expect<E, T : E>(
|
||||
val clazz: Class<T>,
|
||||
val expectClosure: (T) -> Unit
|
||||
)
|
||||
|
||||
private sealed class ExpectComposeState<E : Any> {
|
||||
|
||||
abstract fun nextState(event: E): Pair<() -> Unit, ExpectComposeState<E>>?
|
||||
abstract fun getExpectedEvents(): List<Class<out E>>
|
||||
|
||||
class Finished<E : Any> : ExpectComposeState<E>() {
|
||||
override fun nextState(event: E) = null
|
||||
override fun getExpectedEvents(): List<Class<out E>> = listOf()
|
||||
}
|
||||
class Single<E : Any>(val single: ExpectCompose.Single<E>) : ExpectComposeState<E>() {
|
||||
override fun nextState(event: E): Pair<() -> Unit, ExpectComposeState<E>>? =
|
||||
if (single.expect.clazz.isAssignableFrom(event.javaClass)) {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
Pair({ single.expect.expectClosure(event) }, Finished())
|
||||
} else {
|
||||
null
|
||||
}
|
||||
override fun getExpectedEvents() = listOf(single.expect.clazz)
|
||||
}
|
||||
|
||||
class Sequential<E : Any>(
|
||||
val sequential: ExpectCompose.Sequential<E>,
|
||||
val index: Int,
|
||||
val state: ExpectComposeState<E>
|
||||
) : ExpectComposeState<E>() {
|
||||
override fun nextState(event: E): Pair<() -> Unit, ExpectComposeState<E>>? {
|
||||
val next = state.nextState(event)
|
||||
return if (next == null) {
|
||||
null
|
||||
} else if (next.second is Finished) {
|
||||
if (index == sequential.sequence.size - 1) {
|
||||
Pair(next.first, Finished<E>())
|
||||
} else {
|
||||
val nextState = fromExpectCompose(sequential.sequence[index + 1])
|
||||
if (nextState is Finished) {
|
||||
Pair(next.first, Finished<E>())
|
||||
} else {
|
||||
Pair(next.first, Sequential(sequential, index + 1, nextState))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Pair(next.first, Sequential(sequential, index, next.second))
|
||||
}
|
||||
}
|
||||
|
||||
override fun getExpectedEvents() = state.getExpectedEvents()
|
||||
}
|
||||
|
||||
class Parallel<E : Any>(
|
||||
val parallel: ExpectCompose.Parallel<E>,
|
||||
val states: List<ExpectComposeState<E>>
|
||||
) : ExpectComposeState<E>() {
|
||||
override fun nextState(event: E): Pair<() -> Unit, ExpectComposeState<E>>? {
|
||||
states.forEachIndexed { stateIndex, state ->
|
||||
val next = state.nextState(event)
|
||||
if (next != null) {
|
||||
val nextStates = states.mapIndexed { i, expectComposeState ->
|
||||
if (i == stateIndex) next.second else expectComposeState
|
||||
}
|
||||
if (nextStates.all { it is Finished }) {
|
||||
return Pair(next.first, Finished())
|
||||
} else {
|
||||
return Pair(next.first, Parallel(parallel, nextStates))
|
||||
}
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
override fun getExpectedEvents() = states.flatMap { it.getExpectedEvents() }
|
||||
}
|
||||
|
||||
companion object {
|
||||
fun <E : Any> fromExpectCompose(expectCompose: ExpectCompose<E>): ExpectComposeState<E> {
|
||||
return when (expectCompose) {
|
||||
is ExpectCompose.Single -> Single(expectCompose)
|
||||
is ExpectCompose.Sequential -> {
|
||||
if (expectCompose.sequence.size > 0) {
|
||||
Sequential(expectCompose, 0, fromExpectCompose(expectCompose.sequence[0]))
|
||||
} else {
|
||||
Finished()
|
||||
}
|
||||
}
|
||||
is ExpectCompose.Parallel -> {
|
||||
if (expectCompose.parallel.size > 0) {
|
||||
Parallel(expectCompose, expectCompose.parallel.map { fromExpectCompose(it) })
|
||||
} else {
|
||||
Finished()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,259 @@
|
||||
package com.r3corda.client
|
||||
|
||||
import co.paralleluniverse.strands.SettableFuture
|
||||
import com.r3corda.client.testing.*
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.serialization.OpaqueBytes
|
||||
import com.r3corda.node.driver.driver
|
||||
import com.r3corda.node.driver.startClient
|
||||
import com.r3corda.node.services.monitor.ServiceToClientEvent
|
||||
import com.r3corda.node.services.monitor.TransactionBuildResult
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import org.junit.Test
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.subjects.PublishSubject
|
||||
import kotlin.test.fail
|
||||
|
||||
val log: Logger = LoggerFactory.getLogger(WalletMonitorServiceTests::class.java)
|
||||
|
||||
class WalletMonitorServiceTests {
|
||||
@Test
|
||||
fun cashIssueWorksEndToEnd() {
|
||||
|
||||
driver {
|
||||
val aliceNodeFuture = startNode("Alice")
|
||||
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type))
|
||||
|
||||
val aliceNode = aliceNodeFuture.get()
|
||||
val notaryNode = notaryNodeFuture.get()
|
||||
val client = startClient(aliceNode).get()
|
||||
|
||||
log.info("Alice is ${aliceNode.identity}")
|
||||
log.info("Notary is ${notaryNode.identity}")
|
||||
|
||||
val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
|
||||
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
|
||||
|
||||
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
|
||||
require(aliceMonitorClient.register().get())
|
||||
|
||||
aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
|
||||
amount = Amount(100, USD),
|
||||
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
|
||||
recipient = aliceNode.identity,
|
||||
notary = notaryNode.identity
|
||||
))
|
||||
|
||||
val buildFuture = SettableFuture<ServiceToClientEvent.TransactionBuild>()
|
||||
val eventFuture = SettableFuture<ServiceToClientEvent.OutputState>()
|
||||
aliceInStream.subscribe {
|
||||
if (it is ServiceToClientEvent.OutputState)
|
||||
eventFuture.set(it)
|
||||
else if (it is ServiceToClientEvent.TransactionBuild)
|
||||
buildFuture.set(it)
|
||||
else
|
||||
log.warn("Unexpected event $it")
|
||||
}
|
||||
|
||||
val buildEvent = buildFuture.get()
|
||||
val state = buildEvent.state
|
||||
if (state is TransactionBuildResult.Failed) {
|
||||
fail(state.message)
|
||||
}
|
||||
|
||||
val outputEvent = eventFuture.get()
|
||||
require(outputEvent.consumed.size == 0)
|
||||
require(outputEvent.produced.size == 1)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun issueAndMoveWorks() {
|
||||
driver {
|
||||
val aliceNodeFuture = startNode("Alice")
|
||||
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type))
|
||||
|
||||
val aliceNode = aliceNodeFuture.get()
|
||||
val notaryNode = notaryNodeFuture.get()
|
||||
val client = startClient(aliceNode).get()
|
||||
|
||||
log.info("Alice is ${aliceNode.identity}")
|
||||
log.info("Notary is ${notaryNode.identity}")
|
||||
|
||||
val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
|
||||
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
|
||||
|
||||
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
|
||||
require(aliceMonitorClient.register().get())
|
||||
|
||||
aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
|
||||
amount = Amount(100, USD),
|
||||
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
|
||||
recipient = aliceNode.identity,
|
||||
notary = notaryNode.identity
|
||||
))
|
||||
|
||||
aliceOutStream.onNext(ClientToServiceCommand.PayCash(
|
||||
amount = Amount(100, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),
|
||||
recipient = aliceNode.identity
|
||||
))
|
||||
|
||||
aliceInStream.expectEvents {
|
||||
sequence(
|
||||
// ISSUE
|
||||
parallel(
|
||||
sequence(
|
||||
expect { add: ServiceToClientEvent.StateMachine ->
|
||||
require(add.addOrRemove == AddOrRemove.ADD)
|
||||
},
|
||||
expect { remove: ServiceToClientEvent.StateMachine ->
|
||||
require(remove.addOrRemove == AddOrRemove.REMOVE)
|
||||
}
|
||||
),
|
||||
expect { tx: ServiceToClientEvent.Transaction ->
|
||||
require(tx.transaction.tx.inputs.isEmpty())
|
||||
require(tx.transaction.tx.outputs.size == 1)
|
||||
val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet()
|
||||
// Only Alice signed
|
||||
require(signaturePubKeys.size == 1)
|
||||
require(signaturePubKeys.contains(aliceNode.identity.owningKey))
|
||||
},
|
||||
expect { build: ServiceToClientEvent.TransactionBuild ->
|
||||
val state = build.state
|
||||
when (state) {
|
||||
is TransactionBuildResult.ProtocolStarted -> {
|
||||
}
|
||||
is TransactionBuildResult.Failed -> fail(state.message)
|
||||
}
|
||||
},
|
||||
expect { output: ServiceToClientEvent.OutputState ->
|
||||
require(output.consumed.size == 0)
|
||||
require(output.produced.size == 1)
|
||||
}
|
||||
),
|
||||
|
||||
// MOVE
|
||||
parallel(
|
||||
sequence(
|
||||
expect { add: ServiceToClientEvent.StateMachine ->
|
||||
require(add.addOrRemove == AddOrRemove.ADD)
|
||||
},
|
||||
expect { add: ServiceToClientEvent.StateMachine ->
|
||||
require(add.addOrRemove == AddOrRemove.REMOVE)
|
||||
}
|
||||
),
|
||||
expect { tx: ServiceToClientEvent.Transaction ->
|
||||
require(tx.transaction.tx.inputs.size == 1)
|
||||
require(tx.transaction.tx.outputs.size == 1)
|
||||
val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet()
|
||||
// Alice and Notary signed
|
||||
require(signaturePubKeys.size == 2)
|
||||
require(signaturePubKeys.contains(aliceNode.identity.owningKey))
|
||||
require(signaturePubKeys.contains(notaryNode.identity.owningKey))
|
||||
},
|
||||
sequence(
|
||||
expect { build: ServiceToClientEvent.TransactionBuild ->
|
||||
val state = build.state
|
||||
when (state) {
|
||||
is TransactionBuildResult.ProtocolStarted -> {
|
||||
log.info("${state.message}")
|
||||
}
|
||||
is TransactionBuildResult.Failed -> fail(state.message)
|
||||
}
|
||||
},
|
||||
repeat(7) {
|
||||
expect { build: ServiceToClientEvent.Progress -> }
|
||||
}
|
||||
),
|
||||
expect { output: ServiceToClientEvent.OutputState ->
|
||||
require(output.consumed.size == 1)
|
||||
require(output.produced.size == 1)
|
||||
}
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun movingCashOfDifferentIssueRefsFails() {
|
||||
driver {
|
||||
val aliceNodeFuture = startNode("Alice")
|
||||
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type))
|
||||
|
||||
val aliceNode = aliceNodeFuture.get()
|
||||
val notaryNode = notaryNodeFuture.get()
|
||||
val client = startClient(aliceNode).get()
|
||||
|
||||
log.info("Alice is ${aliceNode.identity}")
|
||||
log.info("Notary is ${notaryNode.identity}")
|
||||
|
||||
val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
|
||||
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
|
||||
|
||||
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
|
||||
require(aliceMonitorClient.register().get())
|
||||
|
||||
aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
|
||||
amount = Amount(100, USD),
|
||||
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
|
||||
recipient = aliceNode.identity,
|
||||
notary = notaryNode.identity
|
||||
))
|
||||
|
||||
aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
|
||||
amount = Amount(100, USD),
|
||||
issueRef = OpaqueBytes(ByteArray(1, { 2 })),
|
||||
recipient = aliceNode.identity,
|
||||
notary = notaryNode.identity
|
||||
))
|
||||
|
||||
aliceOutStream.onNext(ClientToServiceCommand.PayCash(
|
||||
amount = Amount(200, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),
|
||||
recipient = aliceNode.identity
|
||||
))
|
||||
|
||||
aliceInStream.expectEvents {
|
||||
sequence(
|
||||
// ISSUE 1
|
||||
parallel(
|
||||
sequence(
|
||||
expect { add: ServiceToClientEvent.StateMachine ->
|
||||
require(add.addOrRemove == AddOrRemove.ADD)
|
||||
},
|
||||
expect { remove: ServiceToClientEvent.StateMachine ->
|
||||
require(remove.addOrRemove == AddOrRemove.REMOVE)
|
||||
}
|
||||
),
|
||||
expect { tx: ServiceToClientEvent.Transaction -> },
|
||||
expect { build: ServiceToClientEvent.TransactionBuild -> },
|
||||
expect { output: ServiceToClientEvent.OutputState -> }
|
||||
),
|
||||
|
||||
// ISSUE 2
|
||||
parallel(
|
||||
sequence(
|
||||
expect { add: ServiceToClientEvent.StateMachine ->
|
||||
require(add.addOrRemove == AddOrRemove.ADD)
|
||||
},
|
||||
expect { remove: ServiceToClientEvent.StateMachine ->
|
||||
require(remove.addOrRemove == AddOrRemove.REMOVE)
|
||||
}
|
||||
),
|
||||
expect { tx: ServiceToClientEvent.Transaction -> },
|
||||
expect { build: ServiceToClientEvent.TransactionBuild -> },
|
||||
expect { output: ServiceToClientEvent.OutputState -> }
|
||||
),
|
||||
|
||||
// MOVE, should fail
|
||||
expect { build: ServiceToClientEvent.TransactionBuild ->
|
||||
val state = build.state
|
||||
require(state is TransactionBuildResult.Failed)
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,10 +1,7 @@
|
||||
package com.r3corda.node.services.monitor
|
||||
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.serialization.OpaqueBytes
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
@ -12,25 +9,34 @@ import java.util.*
|
||||
* Events triggered by changes in the node, and sent to monitoring client(s).
|
||||
*/
|
||||
sealed class ServiceToClientEvent(val time: Instant) {
|
||||
class Transaction(time: Instant, val transaction: SignedTransaction) : ServiceToClientEvent(time)
|
||||
class OutputState(time: Instant, val consumed: Set<StateRef>, val produced: Set<StateAndRef<ContractState>>) : ServiceToClientEvent(time)
|
||||
class StateMachine(time: Instant, val fiberId: Long, val label: String, val addOrRemove: AddOrRemove) : ServiceToClientEvent(time)
|
||||
class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time)
|
||||
class TransactionBuild(time: Instant, val id: UUID, val state: TransactionBuildResult) : ServiceToClientEvent(time)
|
||||
class Transaction(time: Instant, val transaction: SignedTransaction) : ServiceToClientEvent(time) {
|
||||
override fun toString() = "Transaction(${transaction.tx.commands})"
|
||||
}
|
||||
class OutputState(
|
||||
time: Instant,
|
||||
val consumed: Set<StateRef>,
|
||||
val produced: Set<StateAndRef<ContractState>>
|
||||
) : ServiceToClientEvent(time) {
|
||||
override fun toString() = "OutputState(consumed=$consumed, produced=${produced.map { it.state.data.javaClass.simpleName } })"
|
||||
}
|
||||
class StateMachine(
|
||||
time: Instant,
|
||||
val fiberId: Long,
|
||||
val label: String,
|
||||
val addOrRemove: AddOrRemove
|
||||
) : ServiceToClientEvent(time) {
|
||||
override fun toString() = "StateMachine(${addOrRemove.name})"
|
||||
}
|
||||
class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time) {
|
||||
override fun toString() = "Progress($message)"
|
||||
}
|
||||
class TransactionBuild(time: Instant, val id: UUID, val state: TransactionBuildResult) : ServiceToClientEvent(time) {
|
||||
override fun toString() = "TransactionBuild($state)"
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
sealed class TransactionBuildResult {
|
||||
/**
|
||||
* State indicating the action undertaken has been completed (it was not complex enough to require a
|
||||
* state machine starting).
|
||||
*
|
||||
* @param transaction the transaction created as a result.
|
||||
*/
|
||||
// TODO: We should have a consistent "Transaction your request triggered has been built" event, rather than these
|
||||
// once-off results from a request. Unclear if that means all requests need to trigger a protocol state machine,
|
||||
// so the client sees a consistent process, or if some other solution can be found.
|
||||
class Complete(val transaction: SignedTransaction, val message: String?) : TransactionBuildResult()
|
||||
|
||||
/**
|
||||
* State indicating that a protocol is managing this request, and that the client should track protocol state machine
|
||||
* updates for further information. The monitor will separately receive notification of the state machine having been
|
||||
@ -39,11 +45,15 @@ sealed class TransactionBuildResult {
|
||||
*
|
||||
* @param transaction the transaction created as a result, in the case where the protocol has completed.
|
||||
*/
|
||||
class ProtocolStarted(val fiberId: Long, val transaction: SignedTransaction?, val message: String?) : TransactionBuildResult()
|
||||
class ProtocolStarted(val fiberId: Long, val transaction: SignedTransaction?, val message: String?) : TransactionBuildResult() {
|
||||
override fun toString() = "Started($message)"
|
||||
}
|
||||
|
||||
/**
|
||||
* State indicating the action undertaken failed, either directly (it is not something which requires a
|
||||
* state machine), or before a state machine was started.
|
||||
*/
|
||||
class Failed(val message: String?) : TransactionBuildResult()
|
||||
class Failed(val message: String?) : TransactionBuildResult() {
|
||||
override fun toString() = "Failed($message)"
|
||||
}
|
||||
}
|
||||
|
@ -201,7 +201,7 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
|
||||
|
||||
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
|
||||
private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult {
|
||||
val builder: TransactionBuilder = TransactionType.General.Builder(notary = req.notary)
|
||||
val builder: TransactionBuilder = TransactionType.General.Builder(notary = null)
|
||||
val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef)
|
||||
Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary)
|
||||
builder.signWith(services.storageService.myLegalIdentityKey)
|
||||
|
@ -3,6 +3,7 @@ include 'contracts'
|
||||
include 'contracts:isolated'
|
||||
include 'core'
|
||||
include 'node'
|
||||
include 'client'
|
||||
include 'experimental'
|
||||
include 'test-utils'
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user