diff --git a/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorClientTests.kt b/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorClientTests.kt deleted file mode 100644 index 8d282fdd6b..0000000000 --- a/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorClientTests.kt +++ /dev/null @@ -1,252 +0,0 @@ -package com.r3corda.client - -import com.r3corda.core.contracts.* -import com.r3corda.core.node.services.ServiceInfo -import com.r3corda.core.serialization.OpaqueBytes -import com.r3corda.node.driver.driver -import com.r3corda.node.driver.startClient -import com.r3corda.node.services.monitor.ServiceToClientEvent -import com.r3corda.node.services.monitor.TransactionBuildResult -import com.r3corda.node.services.transactions.SimpleNotaryService -import com.r3corda.node.utilities.AddOrRemove -import com.r3corda.testing.* -import org.junit.Test -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import rx.subjects.PublishSubject -import kotlin.test.fail - -val log: Logger = LoggerFactory.getLogger(NodeMonitorClientTests::class.java) - -class NodeMonitorClientTests { - @Test - fun cashIssueWorksEndToEnd() { - driver { - val aliceNodeFuture = startNode("Alice") - val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.Type))) - - val aliceNode = aliceNodeFuture.get() - val notaryNode = notaryNodeFuture.get() - val client = startClient(aliceNode).get() - - log.info("Alice is ${aliceNode.identity}") - log.info("Notary is ${notaryNode.identity}") - - val aliceInStream = PublishSubject.create() - val aliceOutStream = PublishSubject.create() - - val aliceMonitorClient = NodeMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create()) - require(aliceMonitorClient.register().get()) - - aliceOutStream.onNext(ClientToServiceCommand.IssueCash( - amount = Amount(100, USD), - issueRef = OpaqueBytes(ByteArray(1, { 1 })), - recipient = aliceNode.identity, - notary = notaryNode.identity - )) - - aliceInStream.expectEvents(isStrict = false) { - parallel( - expect { build: ServiceToClientEvent.TransactionBuild -> - val state = build.state - if (state is TransactionBuildResult.Failed) { - fail(state.message) - } - }, - expect { output: ServiceToClientEvent.OutputState -> - require(output.consumed.size == 0) - require(output.produced.size == 1) - } - ) - } - } - } - - @Test - fun issueAndMoveWorks() { - driver { - val aliceNodeFuture = startNode("Alice") - val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.Type))) - - val aliceNode = aliceNodeFuture.get() - val notaryNode = notaryNodeFuture.get() - val client = startClient(aliceNode).get() - - log.info("Alice is ${aliceNode.identity}") - log.info("Notary is ${notaryNode.identity}") - - val aliceInStream = PublishSubject.create() - val aliceOutStream = PublishSubject.create() - - val aliceMonitorClient = NodeMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create()) - require(aliceMonitorClient.register().get()) - - aliceOutStream.onNext(ClientToServiceCommand.IssueCash( - amount = Amount(100, USD), - issueRef = OpaqueBytes(ByteArray(1, { 1 })), - recipient = aliceNode.identity, - notary = notaryNode.identity - )) - - aliceOutStream.onNext(ClientToServiceCommand.PayCash( - amount = Amount(100, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), - recipient = aliceNode.identity - )) - - aliceInStream.expectEvents { - sequence( - // ISSUE - parallel( - sequence( - expect { add: ServiceToClientEvent.StateMachine -> - require(add.addOrRemove == AddOrRemove.ADD) - }, - expect { remove: ServiceToClientEvent.StateMachine -> - require(remove.addOrRemove == AddOrRemove.REMOVE) - } - ), - expect { tx: ServiceToClientEvent.Transaction -> - require(tx.transaction.tx.inputs.isEmpty()) - require(tx.transaction.tx.outputs.size == 1) - val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet() - // Only Alice signed - require(signaturePubKeys.size == 1) - require(signaturePubKeys.contains(aliceNode.identity.owningKey)) - }, - expect { build: ServiceToClientEvent.TransactionBuild -> - val state = build.state - when (state) { - is TransactionBuildResult.ProtocolStarted -> { - } - is TransactionBuildResult.Failed -> fail(state.message) - } - }, - expect { output: ServiceToClientEvent.OutputState -> - require(output.consumed.size == 0) - require(output.produced.size == 1) - } - ), - - // MOVE - parallel( - sequence( - expect { add: ServiceToClientEvent.StateMachine -> - require(add.addOrRemove == AddOrRemove.ADD) - }, - expect { add: ServiceToClientEvent.StateMachine -> - require(add.addOrRemove == AddOrRemove.REMOVE) - } - ), - expect { tx: ServiceToClientEvent.Transaction -> - require(tx.transaction.tx.inputs.size == 1) - require(tx.transaction.tx.outputs.size == 1) - val signaturePubKeys = tx.transaction.sigs.map { it.by }.toSet() - // Alice and Notary signed - require(signaturePubKeys.size == 2) - require(signaturePubKeys.contains(aliceNode.identity.owningKey)) - require(signaturePubKeys.contains(notaryNode.identity.owningKey)) - }, - sequence( - expect { build: ServiceToClientEvent.TransactionBuild -> - val state = build.state - when (state) { - is TransactionBuildResult.ProtocolStarted -> { - log.info("${state.message}") - } - is TransactionBuildResult.Failed -> fail(state.message) - } - }, - replicate(7) { - expect { build: ServiceToClientEvent.Progress -> } - } - ), - expect { output: ServiceToClientEvent.OutputState -> - require(output.consumed.size == 1) - require(output.produced.size == 1) - } - ) - ) - } - } - } - - @Test - fun movingCashOfDifferentIssueRefsFails() { - driver { - val aliceNodeFuture = startNode("Alice") - val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.Type))) - - val aliceNode = aliceNodeFuture.get() - val notaryNode = notaryNodeFuture.get() - val client = startClient(aliceNode).get() - - log.info("Alice is ${aliceNode.identity}") - log.info("Notary is ${notaryNode.identity}") - - val aliceInStream = PublishSubject.create() - val aliceOutStream = PublishSubject.create() - - val aliceMonitorClient = NodeMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create()) - require(aliceMonitorClient.register().get()) - - aliceOutStream.onNext(ClientToServiceCommand.IssueCash( - amount = Amount(100, USD), - issueRef = OpaqueBytes(ByteArray(1, { 1 })), - recipient = aliceNode.identity, - notary = notaryNode.identity - )) - - aliceOutStream.onNext(ClientToServiceCommand.IssueCash( - amount = Amount(100, USD), - issueRef = OpaqueBytes(ByteArray(1, { 2 })), - recipient = aliceNode.identity, - notary = notaryNode.identity - )) - - aliceOutStream.onNext(ClientToServiceCommand.PayCash( - amount = Amount(200, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), - recipient = aliceNode.identity - )) - - aliceInStream.expectEvents { - sequence( - // ISSUE 1 - parallel( - sequence( - expect { add: ServiceToClientEvent.StateMachine -> - require(add.addOrRemove == AddOrRemove.ADD) - }, - expect { remove: ServiceToClientEvent.StateMachine -> - require(remove.addOrRemove == AddOrRemove.REMOVE) - } - ), - expect { tx: ServiceToClientEvent.Transaction -> }, - expect { build: ServiceToClientEvent.TransactionBuild -> }, - expect { output: ServiceToClientEvent.OutputState -> } - ), - - // ISSUE 2 - parallel( - sequence( - expect { add: ServiceToClientEvent.StateMachine -> - require(add.addOrRemove == AddOrRemove.ADD) - }, - expect { remove: ServiceToClientEvent.StateMachine -> - require(remove.addOrRemove == AddOrRemove.REMOVE) - } - ), - expect { tx: ServiceToClientEvent.Transaction -> }, - expect { build: ServiceToClientEvent.TransactionBuild -> }, - expect { output: ServiceToClientEvent.OutputState -> } - ), - - // MOVE, should fail - expect { build: ServiceToClientEvent.TransactionBuild -> - val state = build.state - require(state is TransactionBuildResult.Failed) - } - ) - } - } - } -} diff --git a/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorModelTest.kt b/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorModelTest.kt new file mode 100644 index 0000000000..115511957e --- /dev/null +++ b/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorModelTest.kt @@ -0,0 +1,200 @@ +package com.r3corda.client + +import com.google.common.util.concurrent.SettableFuture +import com.r3corda.client.model.NodeMonitorModel +import com.r3corda.client.model.ProgressTrackingEvent +import com.r3corda.core.bufferUntilSubscribed +import com.r3corda.core.contracts.* +import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.StateMachineTransactionMapping +import com.r3corda.core.node.services.Vault +import com.r3corda.core.protocols.StateMachineRunId +import com.r3corda.core.serialization.OpaqueBytes +import com.r3corda.core.transactions.SignedTransaction +import com.r3corda.node.driver.driver +import com.r3corda.node.driver.startClient +import com.r3corda.node.services.messaging.NodeMessagingClient +import com.r3corda.node.services.messaging.StateMachineUpdate +import com.r3corda.node.services.transactions.SimpleNotaryService +import com.r3corda.testing.* +import org.junit.* +import rx.Observable +import rx.Observer +import kotlin.concurrent.thread + +class NodeMonitorModelTest { + + lateinit var aliceNode: NodeInfo + lateinit var notaryNode: NodeInfo + lateinit var aliceClient: NodeMessagingClient + val driverStarted = SettableFuture.create() + val stopDriver = SettableFuture.create() + val driverStopped = SettableFuture.create() + + lateinit var stateMachineTransactionMapping: Observable + lateinit var stateMachineUpdates: Observable + lateinit var progressTracking: Observable + lateinit var transactions: Observable + lateinit var vaultUpdates: Observable + lateinit var clientToService: Observer + + @Before + fun start() { + thread { + driver { + val aliceNodeFuture = startNode("Alice") + val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type)) + + aliceNode = aliceNodeFuture.get() + notaryNode = notaryNodeFuture.get() + aliceClient = startClient(aliceNode).get() + + val monitor = NodeMonitorModel() + + stateMachineTransactionMapping = monitor.stateMachineTransactionMapping.bufferUntilSubscribed() + stateMachineUpdates = monitor.stateMachineUpdates.bufferUntilSubscribed() + progressTracking = monitor.progressTracking.bufferUntilSubscribed() + transactions = monitor.transactions.bufferUntilSubscribed() + vaultUpdates = monitor.vaultUpdates.bufferUntilSubscribed() + clientToService = monitor.clientToService + + monitor.register(aliceNode, aliceClient.config.certificatesPath) + driverStarted.set(Unit) + stopDriver.get() + } + driverStopped.set(Unit) + } + driverStarted.get() + } + + @After + fun stop() { + stopDriver.set(Unit) + driverStopped.get() + } + + @Test + fun cashIssueWorksEndToEnd() { + clientToService.onNext(ClientToServiceCommand.IssueCash( + amount = Amount(100, USD), + issueRef = OpaqueBytes(ByteArray(1, { 1 })), + recipient = aliceNode.identity, + notary = notaryNode.identity + )) + + vaultUpdates.expectEvents(isStrict = false) { + sequence( + // SNAPSHOT + expect { output: Vault.Update -> + require(output.consumed.size == 0) { output.consumed.size } + require(output.produced.size == 0) { output.produced.size } + }, + // ISSUE + expect { output: Vault.Update -> + require(output.consumed.size == 0) { output.consumed.size } + require(output.produced.size == 1) { output.produced.size } + } + ) + } + } + + @Test + fun issueAndMoveWorks() { + + clientToService.onNext(ClientToServiceCommand.IssueCash( + amount = Amount(100, USD), + issueRef = OpaqueBytes(ByteArray(1, { 1 })), + recipient = aliceNode.identity, + notary = notaryNode.identity + )) + + clientToService.onNext(ClientToServiceCommand.PayCash( + amount = Amount(100, Issued(PartyAndReference(aliceNode.identity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), + recipient = aliceNode.identity + )) + + var issueSmId: StateMachineRunId? = null + var moveSmId: StateMachineRunId? = null + var issueTx: SignedTransaction? = null + var moveTx: SignedTransaction? = null + stateMachineUpdates.expectEvents { + sequence( + // ISSUE + expect { add: StateMachineUpdate.Added -> + issueSmId = add.id + }, + expect { remove: StateMachineUpdate.Removed -> + require(remove.id == issueSmId) + }, + // MOVE + expect { add: StateMachineUpdate.Added -> + moveSmId = add.id + }, + expect { remove: StateMachineUpdate.Removed -> + require(remove.id == moveSmId) + } + ) + } + + transactions.expectEvents { + sequence( + // ISSUE + expect { tx -> + require(tx.tx.inputs.isEmpty()) + require(tx.tx.outputs.size == 1) + val signaturePubKeys = tx.sigs.map { it.by }.toSet() + // Only Alice signed + require(signaturePubKeys.size == 1) + require(signaturePubKeys.contains(aliceNode.identity.owningKey)) + issueTx = tx + }, + // MOVE + expect { tx -> + require(tx.tx.inputs.size == 1) + require(tx.tx.outputs.size == 1) + val signaturePubKeys = tx.sigs.map { it.by }.toSet() + // Alice and Notary signed + require(signaturePubKeys.size == 2) + require(signaturePubKeys.contains(aliceNode.identity.owningKey)) + require(signaturePubKeys.contains(notaryNode.identity.owningKey)) + moveTx = tx + } + ) + } + + vaultUpdates.expectEvents { + sequence( + // SNAPSHOT + expect { output: Vault.Update -> + require(output.consumed.size == 0) { output.consumed.size } + require(output.produced.size == 0) { output.produced.size } + }, + // ISSUE + expect { update -> + require(update.consumed.size == 0) { update.consumed.size } + require(update.produced.size == 1) { update.produced.size } + }, + // MOVE + expect { update -> + require(update.consumed.size == 1) { update.consumed.size } + require(update.produced.size == 1) { update.produced.size } + } + ) + } + + stateMachineTransactionMapping.expectEvents { + sequence( + // ISSUE + expect { mapping -> + require(mapping.stateMachineRunId == issueSmId) + require(mapping.transactionId == issueTx!!.id) + }, + // MOVE + expect { mapping -> + require(mapping.stateMachineRunId == moveSmId) + require(mapping.transactionId == moveTx!!.id) + } + ) + } + } +} diff --git a/client/src/main/kotlin/com/r3corda/client/NodeMonitorClient.kt b/client/src/main/kotlin/com/r3corda/client/NodeMonitorClient.kt deleted file mode 100644 index 64f7fbdbfe..0000000000 --- a/client/src/main/kotlin/com/r3corda/client/NodeMonitorClient.kt +++ /dev/null @@ -1,63 +0,0 @@ -package com.r3corda.client - -import com.google.common.util.concurrent.ListenableFuture -import com.r3corda.core.contracts.ClientToServiceCommand -import com.r3corda.core.map -import com.r3corda.core.messaging.MessagingService -import com.r3corda.core.messaging.createMessage -import com.r3corda.core.messaging.onNext -import com.r3corda.core.node.NodeInfo -import com.r3corda.core.random63BitValue -import com.r3corda.core.serialization.deserialize -import com.r3corda.core.serialization.serialize -import com.r3corda.core.success -import com.r3corda.core.utilities.loggerFor -import com.r3corda.node.services.monitor.* -import com.r3corda.node.services.monitor.NodeMonitorService.Companion.IN_EVENT_TOPIC -import com.r3corda.node.services.monitor.NodeMonitorService.Companion.OUT_EVENT_TOPIC -import com.r3corda.node.services.monitor.NodeMonitorService.Companion.REGISTER_TOPIC -import com.r3corda.node.services.monitor.NodeMonitorService.Companion.STATE_TOPIC -import rx.Observable -import rx.Observer - -/** - * Worked example of a client which communicates with the wallet monitor service. - */ -class NodeMonitorClient( - val net: MessagingService, - val node: NodeInfo, - val outEvents: Observable, - val inEvents: Observer, - val snapshot: Observer -) { - - companion object { - private val log = loggerFor() - } - - fun register(): ListenableFuture { - val sessionID = random63BitValue() - - log.info("Registering with ID $sessionID. I am ${net.myAddress}") - val future = net.onNext(REGISTER_TOPIC, sessionID).map { it.success } - - net.onNext(STATE_TOPIC, sessionID).success { snapshot.onNext(it) } - - net.addMessageHandler(IN_EVENT_TOPIC, sessionID) { msg, reg -> - val event = msg.data.deserialize() - inEvents.onNext(event) - } - - val req = RegisterRequest(net.myAddress, sessionID) - val registerMessage = net.createMessage(REGISTER_TOPIC, 0, req.serialize().bits) - net.send(registerMessage, node.address) - - outEvents.subscribe { event -> - val envelope = ClientToServiceCommandMessage(sessionID, net.myAddress, event) - val message = net.createMessage(OUT_EVENT_TOPIC, 0, envelope.serialize().bits) - net.send(message, node.address) - } - - return future - } -} diff --git a/client/src/main/kotlin/com/r3corda/client/mock/EventGenerator.kt b/client/src/main/kotlin/com/r3corda/client/mock/EventGenerator.kt index ff11a6e50b..02bf7dc6e6 100644 --- a/client/src/main/kotlin/com/r3corda/client/mock/EventGenerator.kt +++ b/client/src/main/kotlin/com/r3corda/client/mock/EventGenerator.kt @@ -5,7 +5,6 @@ import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party import com.r3corda.core.serialization.OpaqueBytes import com.r3corda.core.transactions.TransactionBuilder -import com.r3corda.node.services.monitor.ServiceToClientEvent import java.time.Instant /** @@ -34,7 +33,7 @@ class EventGenerator( val partyGenerator = Generator.oneOf(parties) val cashStateGenerator = amountIssuedGenerator.combine(publicKeyGenerator) { amount, from -> - val builder = TransactionBuilder() + val builder = TransactionBuilder(notary = notary) builder.addOutputState(Cash.State(amount, from)) builder.addCommand(Command(Cash.Commands.Issue(), amount.token.issuer.party.owningKey)) builder.toWireTransaction().outRef(0) @@ -60,10 +59,6 @@ class EventGenerator( } ) - val outputStateGenerator = consumedGenerator.combine(producedGenerator) { consumed, produced -> - ServiceToClientEvent.OutputState(Instant.now(), consumed, produced) - } - val issueRefGenerator = Generator.intRange(0, 1).map { number -> OpaqueBytes(ByteArray(1, { number.toByte() })) } val amountGenerator = Generator.intRange(0, 10000).combine(currencyGenerator) { quantity, currency -> Amount(quantity.toLong(), currency) } @@ -96,10 +91,6 @@ class EventGenerator( ) } - val serviceToClientEventGenerator = Generator.frequency( - 1.0 to outputStateGenerator - ) - val clientToServiceCommandGenerator = Generator.frequency( 0.4 to issueCashGenerator, 0.5 to moveCashGenerator, diff --git a/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt b/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt index 820a48927a..ae18d20bd3 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt @@ -6,52 +6,33 @@ import com.r3corda.contracts.asset.Cash import com.r3corda.core.contracts.ContractState import com.r3corda.core.contracts.StateAndRef import com.r3corda.core.contracts.StateRef -import com.r3corda.node.services.monitor.ServiceToClientEvent -import com.r3corda.node.services.monitor.StateSnapshotMessage +import com.r3corda.core.node.services.Vault import javafx.collections.ObservableList import kotlinx.support.jdk8.collections.removeIf import rx.Observable -sealed class StatesModification{ - class Diff( - val added: Collection>, - val removed: Collection - ) : StatesModification() - class Reset(val states: Collection>) : StatesModification() -} +data class Diff( + val added: Collection>, + val removed: Collection +) /** * This model exposes the list of owned contract states. */ class ContractStateModel { - private val serviceToClient: Observable by observable(NodeMonitorModel::serviceToClient) - private val snapshot: Observable by observable(NodeMonitorModel::snapshot) - private val outputStates = serviceToClient.ofType(ServiceToClientEvent.OutputState::class.java) + private val vaultUpdates: Observable by observable(NodeMonitorModel::vaultUpdates) - val contractStatesDiff: Observable> = - outputStates.map { StatesModification.Diff(it.produced, it.consumed) } - // We filter the diff first rather than the complete contract state list. - val cashStatesModification: Observable> = Observable.merge( - arrayOf( - contractStatesDiff.map { - StatesModification.Diff(it.added.filterCashStateAndRefs(), it.removed) - }, - snapshot.map { - StatesModification.Reset(it.contractStates.filterCashStateAndRefs()) - } - ) - ) + val contractStatesDiff: Observable> = vaultUpdates.map { + Diff(it.produced, it.consumed) + } + val cashStatesDiff: Observable> = contractStatesDiff.map { + // We can't filter removed hashes here as we don't have type info + Diff(it.added.filterCashStateAndRefs(), it.removed) + } val cashStates: ObservableList> = - cashStatesModification.foldToObservableList(Unit) { statesDiff, _accumulator, observableList -> - when (statesDiff) { - is StatesModification.Diff -> { - observableList.removeIf { it.ref in statesDiff.removed } - observableList.addAll(statesDiff.added) - } - is StatesModification.Reset -> { - observableList.setAll(statesDiff.states) - } - } + cashStatesDiff.foldToObservableList(Unit) { statesDiff, _accumulator, observableList -> + observableList.removeIf { it.ref in statesDiff.removed } + observableList.addAll(statesDiff.added) } diff --git a/client/src/main/kotlin/com/r3corda/client/model/GatheredTransactionDataModel.kt b/client/src/main/kotlin/com/r3corda/client/model/GatheredTransactionDataModel.kt index 6a59605bed..3efb21c857 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/GatheredTransactionDataModel.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/GatheredTransactionDataModel.kt @@ -1,40 +1,27 @@ package com.r3corda.client.model -import com.r3corda.client.fxutils.foldToObservableList -import com.r3corda.client.fxutils.getObservableValue +import com.r3corda.client.fxutils.* import com.r3corda.core.contracts.ContractState import com.r3corda.core.contracts.StateAndRef import com.r3corda.core.contracts.StateRef import com.r3corda.client.fxutils.recordInSequence import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.node.services.StateMachineTransactionMapping import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.core.transactions.SignedTransaction -import com.r3corda.node.services.monitor.ServiceToClientEvent -import com.r3corda.node.services.monitor.TransactionBuildResult -import com.r3corda.node.utilities.AddOrRemove +import com.r3corda.node.services.messaging.StateMachineUpdate import javafx.beans.property.SimpleObjectProperty import javafx.beans.value.ObservableValue -import javafx.collections.FXCollections import javafx.collections.ObservableList import javafx.collections.ObservableMap import org.fxmisc.easybind.EasyBind -import org.jetbrains.exposed.sql.transactions.transaction import org.slf4j.LoggerFactory import rx.Observable -import java.time.Instant -import java.util.UUID -import kotlin.reflect.KProperty1 -interface GatheredTransactionData { - val stateMachineRunId: ObservableValue - val uuid: ObservableValue - val protocolStatus: ObservableValue - val stateMachineStatus: ObservableValue - val transaction: ObservableValue - val status: ObservableValue - val lastUpdate: ObservableValue - val allEvents: ObservableList -} +data class GatheredTransactionData( + val transaction: PartiallyResolvedTransaction, + val stateMachines: ObservableList +) /** * [PartiallyResolvedTransaction] holds a [SignedTransaction] that has zero or more inputs resolved. The intent is @@ -79,248 +66,71 @@ sealed class TransactionCreateStatus(val message: String?) { data class ProtocolStatus( val status: String ) + sealed class StateMachineStatus(val stateMachineName: String) { class Added(stateMachineName: String): StateMachineStatus(stateMachineName) class Removed(stateMachineName: String): StateMachineStatus(stateMachineName) override fun toString(): String = "${javaClass.simpleName}($stateMachineName)" } -data class GatheredTransactionDataWritable( - override val stateMachineRunId: SimpleObjectProperty = SimpleObjectProperty(null), - override val uuid: SimpleObjectProperty = SimpleObjectProperty(null), - override val stateMachineStatus: SimpleObjectProperty = SimpleObjectProperty(null), - override val protocolStatus: SimpleObjectProperty = SimpleObjectProperty(null), - override val transaction: SimpleObjectProperty = SimpleObjectProperty(null), - override val status: SimpleObjectProperty = SimpleObjectProperty(null), - override val lastUpdate: SimpleObjectProperty, - override val allEvents: ObservableList = FXCollections.observableArrayList() -) : GatheredTransactionData - -private val log = LoggerFactory.getLogger(GatheredTransactionDataModel::class.java) +data class StateMachineData( + val id: StateMachineRunId, + val protocolStatus: ObservableValue, + val stateMachineStatus: ObservableValue +) /** - * This model provides an observable list of states relating to the creation of a transaction not yet on ledger. + * This model provides an observable list of transactions and what state machines/protocols recorded them */ class GatheredTransactionDataModel { - private val serviceToClient: Observable by observable(NodeMonitorModel::serviceToClient) + private val transactions: Observable by observable(NodeMonitorModel::transactions) + private val stateMachineUpdates: Observable by observable(NodeMonitorModel::stateMachineUpdates) + private val progressTracking: Observable by observable(NodeMonitorModel::progressTracking) + private val stateMachineTransactionMapping: Observable by observable(NodeMonitorModel::stateMachineTransactionMapping) - /** - * Aggregation of updates to transactions. We use the observable list as the only container and do linear search for - * matching transactions because we have three keys(fiber ID, UUID, tx id) and this way it's easier to avoid syncing issues. - * - * The Fiber ID is used to identify events that relate to the same transaction server-side, whereas the UUID is - * generated on the UI and is used to identify events with the UI action that triggered them. Currently a UUID is - * generated for each outgoing [ClientToServiceCommand]. - * - * TODO: Make this more efficient by maintaining and syncing two maps (for the two keys) in the accumulator - * (Note that a transaction may be mapped by one or both) - * TODO: Expose a writable stream to combine [serviceToClient] with to allow recording of transactions made locally(UUID) - */ - val gatheredTransactionDataList: ObservableList = - serviceToClient.foldToObservableList>( - initialAccumulator = FXCollections.observableHashMap(), - folderFun = { serviceToClientEvent, transactions, transactionStates -> - val _unit = when (serviceToClientEvent) { - is ServiceToClientEvent.Transaction -> { - transactions.set(serviceToClientEvent.transaction.id, serviceToClientEvent.transaction) - val somewhatResolvedTransaction = PartiallyResolvedTransaction.fromSignedTransaction( - serviceToClientEvent.transaction, - transactions - ) - newTransactionIdTransactionStateOrModify(transactionStates, serviceToClientEvent, - transaction = somewhatResolvedTransaction, - tweak = {} - ) - } - is ServiceToClientEvent.OutputState -> { - } - is ServiceToClientEvent.StateMachine -> { - newFiberIdTransactionStateOrModify(transactionStates, serviceToClientEvent, - stateMachineRunId = serviceToClientEvent.id, - tweak = { - stateMachineStatus.set(when (serviceToClientEvent.addOrRemove) { - AddOrRemove.ADD -> StateMachineStatus.Added(serviceToClientEvent.label) - AddOrRemove.REMOVE -> { - val currentStatus = stateMachineStatus.value - if (currentStatus is StateMachineStatus.Added) { - StateMachineStatus.Removed(currentStatus.stateMachineName) - } else { - StateMachineStatus.Removed(serviceToClientEvent.label) - } - } - }) - } - ) - } - is ServiceToClientEvent.Progress -> { - newFiberIdTransactionStateOrModify(transactionStates, serviceToClientEvent, - stateMachineRunId = serviceToClientEvent.id, - tweak = { - protocolStatus.set(ProtocolStatus(serviceToClientEvent.message)) - } - ) - } - is ServiceToClientEvent.TransactionBuild -> { - val state = serviceToClientEvent.state - - when (state) { - is TransactionBuildResult.ProtocolStarted -> { - state.transaction?.let { - transactions.set(it.id, it) - } - } - } - - newUuidTransactionStateOrModify(transactionStates, serviceToClientEvent, - uuid = serviceToClientEvent.id, - stateMachineRunId = when (state) { - is TransactionBuildResult.ProtocolStarted -> state.id - is TransactionBuildResult.Failed -> null - }, - transactionId = when (state) { - is TransactionBuildResult.ProtocolStarted -> state.transaction?.id - is TransactionBuildResult.Failed -> null - }, - tweak = { - return@newUuidTransactionStateOrModify when (state) { - is TransactionBuildResult.ProtocolStarted -> { - state.transaction?.let { - transaction.set(PartiallyResolvedTransaction.fromSignedTransaction(it, transactions)) - } - status.set(TransactionCreateStatus.Started(state.message)) - } - is TransactionBuildResult.Failed -> { - status.set(TransactionCreateStatus.Failed(state.message)) - } - } - } - ) - } - } - transactions + val collectedTransactions = transactions.recordInSequence() + val transactionMap = collectedTransactions.associateBy(SignedTransaction::id) + val progressEvents = progressTracking.recordAsAssociation(ProgressTrackingEvent::stateMachineId) + val stateMachineStatus: ObservableMap> = + stateMachineUpdates.foldToObservableMap(Unit) { update, _unit, map: ObservableMap> -> + when (update) { + is StateMachineUpdate.Added -> { + val added: SimpleObjectProperty = + SimpleObjectProperty(StateMachineStatus.Added(update.stateMachineInfo.protocolLogicClassName)) + map[update.id] = added + } + is StateMachineUpdate.Removed -> { + val added = map[update.id] + added ?: throw Exception("State machine removed with unknown id ${update.id}") + added.set(StateMachineStatus.Removed(added.value.stateMachineName)) } - ) - - companion object { - - private fun newTransactionIdTransactionStateOrModify( - transactionStates: ObservableList, - event: ServiceToClientEvent, - transaction: PartiallyResolvedTransaction, - tweak: GatheredTransactionDataWritable.() -> Unit - ) { - val index = transactionStates.indexOfFirst { transaction.id == it.transaction.value?.id } - val state = if (index < 0) { - val newState = GatheredTransactionDataWritable( - transaction = SimpleObjectProperty(transaction), - lastUpdate = SimpleObjectProperty(event.time) - ) - tweak(newState) - transactionStates.add(newState) - newState - } else { - val existingState = transactionStates[index] - existingState.lastUpdate.set(event.time) - tweak(existingState) - existingState - } - state.allEvents.add(event) - } - - private fun newFiberIdTransactionStateOrModify( - transactionStates: ObservableList, - event: ServiceToClientEvent, - stateMachineRunId: StateMachineRunId, - tweak: GatheredTransactionDataWritable.() -> Unit - ) { - val index = transactionStates.indexOfFirst { it.stateMachineRunId.value == stateMachineRunId } - val state = if (index < 0) { - val newState = GatheredTransactionDataWritable( - stateMachineRunId = SimpleObjectProperty(stateMachineRunId), - lastUpdate = SimpleObjectProperty(event.time) - ) - tweak(newState) - transactionStates.add(newState) - newState - } else { - val existingState = transactionStates[index] - existingState.lastUpdate.set(event.time) - tweak(existingState) - existingState - } - state.allEvents.add(event) - } - - private fun newUuidTransactionStateOrModify( - transactionStates: ObservableList, - event: ServiceToClientEvent, - uuid: UUID, - stateMachineRunId: StateMachineRunId?, - transactionId: SecureHash?, - tweak: GatheredTransactionDataWritable.() -> Unit - ) { - val matchingStates = transactionStates.filtered { - it.uuid.value == uuid || - (stateMachineRunId != null && it.stateMachineRunId.value == stateMachineRunId) || - (transactionId != null && it.transaction.value?.transaction?.id == transactionId) - } - val mergedState = mergeGatheredData(matchingStates) - for (i in 0 .. matchingStates.size - 1) { - transactionStates.removeAt(matchingStates.getSourceIndex(i)) - } - val state = if (mergedState == null) { - val newState = GatheredTransactionDataWritable( - uuid = SimpleObjectProperty(uuid), - stateMachineRunId = SimpleObjectProperty(stateMachineRunId), - lastUpdate = SimpleObjectProperty(event.time) - ) - transactionStates.add(newState) - newState - } else { - mergedState.lastUpdate.set(event.time) - mergedState - } - tweak(state) - state.allEvents.add(event) - } - - private fun mergeGatheredData( - gatheredDataList: List - ): GatheredTransactionDataWritable? { - var gathered: GatheredTransactionDataWritable? = null - // Modify the last one if we can - gatheredDataList.asReversed().forEach { - val localGathered = gathered - if (localGathered == null) { - gathered = it - } else { - mergeField(it, localGathered, GatheredTransactionDataWritable::stateMachineRunId) - mergeField(it, localGathered, GatheredTransactionDataWritable::uuid) - mergeField(it, localGathered, GatheredTransactionDataWritable::stateMachineStatus) - mergeField(it, localGathered, GatheredTransactionDataWritable::protocolStatus) - mergeField(it, localGathered, GatheredTransactionDataWritable::transaction) - mergeField(it, localGathered, GatheredTransactionDataWritable::status) - localGathered.allEvents.addAll(it.allEvents) } } - return gathered - } - - private fun mergeField( - from: GatheredTransactionDataWritable, - to: GatheredTransactionDataWritable, - field: KProperty1>) { - val fromValue = field(from).value - if (fromValue != null) { - val toField = field(to) - val toValue = toField.value - if (toValue != null && fromValue != toValue) { - log.warn("Conflicting data for field ${field.name}: $fromValue vs $toValue") - } - toField.set(fromValue) - } - } + val stateMachineDataList: ObservableList = + LeftOuterJoinedMap(stateMachineStatus, progressEvents) { id, status, progress -> + StateMachineData(id, progress.map { it?.let { ProtocolStatus(it.message) } }, status) + }.getObservableValues() + val stateMachineDataMap = stateMachineDataList.associateBy(StateMachineData::id) + val smTxMappingList = stateMachineTransactionMapping.recordInSequence() + val partiallyResolvedTransactions = collectedTransactions.map { + PartiallyResolvedTransaction.fromSignedTransaction(it, transactionMap) } + /** + * We JOIN the transaction list with state machines + */ + val gatheredTransactionDataList: ObservableList = + partiallyResolvedTransactions.leftOuterJoin( + smTxMappingList, + PartiallyResolvedTransaction::id, + StateMachineTransactionMapping::transactionId + ) { transaction, mappings -> + GatheredTransactionData( + transaction, + mappings.map { mapping -> + stateMachineDataMap.getObservableValue(mapping.stateMachineRunId) + }.flatten().filterNotNull() + ) + } } diff --git a/client/src/main/kotlin/com/r3corda/client/model/Models.kt b/client/src/main/kotlin/com/r3corda/client/model/Models.kt index 605c2c4226..ce7a40f33d 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/Models.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/Models.kt @@ -8,6 +8,7 @@ import org.reactfx.EventSink import org.reactfx.EventStream import rx.Observable import rx.Observer +import rx.subjects.Subject import java.util.* import kotlin.reflect.KClass import kotlin.reflect.KProperty @@ -72,6 +73,9 @@ inline fun observable(noinline observableProperty: (M) -> O inline fun observer(noinline observerProperty: (M) -> Observer) = TrackedDelegate.ObserverDelegate(M::class, observerProperty) +inline fun subject(noinline subjectProperty: (M) -> Subject) = + TrackedDelegate.SubjectDelegate(M::class, subjectProperty) + inline fun eventStream(noinline streamProperty: (M) -> EventStream) = TrackedDelegate.EventStreamDelegate(M::class, streamProperty) @@ -118,14 +122,19 @@ object Models { sealed class TrackedDelegate(val klass: KClass) { init { Models.initModel(klass) } - class ObservableDelegate (klass: KClass, val eventStreamProperty: (M) -> Observable) : TrackedDelegate(klass) { + class ObservableDelegate (klass: KClass, val observableProperty: (M) -> Observable) : TrackedDelegate(klass) { operator fun getValue(thisRef: Any, property: KProperty<*>): Observable { - return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin)) + return observableProperty(Models.get(klass, thisRef.javaClass.kotlin)) } } - class ObserverDelegate (klass: KClass, val eventStreamProperty: (M) -> Observer) : TrackedDelegate(klass) { + class ObserverDelegate (klass: KClass, val observerProperty: (M) -> Observer) : TrackedDelegate(klass) { operator fun getValue(thisRef: Any, property: KProperty<*>): Observer { - return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin)) + return observerProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } + class SubjectDelegate (klass: KClass, val subjectProperty: (M) -> Subject) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): Subject { + return subjectProperty(Models.get(klass, thisRef.javaClass.kotlin)) } } class EventStreamDelegate (klass: KClass, val eventStreamProperty: (M) -> org.reactfx.EventStream) : TrackedDelegate(klass) { diff --git a/client/src/main/kotlin/com/r3corda/client/model/NodeMonitorModel.kt b/client/src/main/kotlin/com/r3corda/client/model/NodeMonitorModel.kt index e0d4e7436e..15dc047e5d 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/NodeMonitorModel.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/NodeMonitorModel.kt @@ -1,42 +1,97 @@ package com.r3corda.client.model -import com.r3corda.client.NodeMonitorClient +import com.r3corda.client.CordaRPCClient import com.r3corda.core.contracts.ClientToServiceCommand -import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.NodeInfo -import com.r3corda.node.services.monitor.ServiceToClientEvent -import com.r3corda.node.services.monitor.StateSnapshotMessage +import com.r3corda.core.node.services.StateMachineTransactionMapping +import com.r3corda.core.node.services.Vault +import com.r3corda.core.protocols.StateMachineRunId +import com.r3corda.core.transactions.SignedTransaction +import com.r3corda.node.services.messaging.ArtemisMessagingComponent +import com.r3corda.node.services.messaging.StateMachineInfo +import com.r3corda.node.services.messaging.StateMachineUpdate import rx.Observable -import rx.Observer import rx.subjects.PublishSubject +import java.nio.file.Path + +data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val message: String) { + companion object { + fun createStreamFromStateMachineInfo(stateMachine: StateMachineInfo): Observable? { + return stateMachine.progressTrackerStepAndUpdates?.let { pair -> + val (current, future) = pair + future.map { ProgressTrackingEvent(stateMachine.id, it) }.startWith(ProgressTrackingEvent(stateMachine.id, current)) + } + } + } +} /** - * This model exposes raw event streams to and from the [NodeMonitorService] through a [NodeMonitorClient] + * This model exposes raw event streams to and from the node. */ class NodeMonitorModel { + + private val stateMachineUpdatesSubject = PublishSubject.create() + private val vaultUpdatesSubject = PublishSubject.create() + private val transactionsSubject = PublishSubject.create() + private val stateMachineTransactionMappingSubject = PublishSubject.create() + private val progressTrackingSubject = PublishSubject.create() + + val stateMachineUpdates: Observable = stateMachineUpdatesSubject + val vaultUpdates: Observable = vaultUpdatesSubject + val transactions: Observable = transactionsSubject + val stateMachineTransactionMapping: Observable = stateMachineTransactionMappingSubject + val progressTracking: Observable = progressTrackingSubject + private val clientToServiceSource = PublishSubject.create() - val clientToService: Observer = clientToServiceSource - - private val serviceToClientSource = PublishSubject.create() - val serviceToClient: Observable = serviceToClientSource - - private val snapshotSource = PublishSubject.create() - val snapshot: Observable = snapshotSource + val clientToService: PublishSubject = clientToServiceSource /** - * Register for updates to/from a given wallet. + * Register for updates to/from a given vault. * @param messagingService The messaging to use for communication. * @param monitorNodeInfo the [Node] to connect to. * TODO provide an unsubscribe mechanism */ - fun register(messagingService: MessagingService, monitorNodeInfo: NodeInfo) { - val monitorClient = NodeMonitorClient( - messagingService, - monitorNodeInfo, - clientToServiceSource, - serviceToClientSource, - snapshotSource - ) - require(monitorClient.register().get()) + fun register(vaultMonitorNodeInfo: NodeInfo, certificatesPath: Path) { + + val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(vaultMonitorNodeInfo.address), certificatesPath) + client.start() + val proxy = client.proxy() + + val (stateMachines, stateMachineUpdates) = proxy.stateMachinesAndUpdates() + // Extract the protocol tracking stream + // TODO is there a nicer way of doing this? Stream of streams in general results in code like this... + val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine -> + ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachine) + } + val futureProgressTrackerUpdates = stateMachineUpdatesSubject.map { stateMachineUpdate -> + if (stateMachineUpdate is StateMachineUpdate.Added) { + ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) ?: Observable.empty() + } else { + Observable.empty() + } + } + futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.subscribe(progressTrackingSubject) + + // Now the state machines + val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) } + stateMachineUpdates.startWith(currentStateMachines).subscribe(stateMachineUpdatesSubject) + + // Vault updates + val (vault, vaultUpdates) = proxy.vaultAndUpdates() + val initialVaultUpdate = Vault.Update(setOf(), vault.toSet()) + vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject) + + // Transactions + val (transactions, newTransactions) = proxy.verifiedTransactions() + newTransactions.startWith(transactions).subscribe(transactionsSubject) + + // SM -> TX mapping + val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMapping() + futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject) + + // Client -> Service + clientToServiceSource.subscribe { + proxy.executeCommand(it) + } } } diff --git a/explorer/src/main/kotlin/com/r3corda/explorer/Main.kt b/explorer/src/main/kotlin/com/r3corda/explorer/Main.kt index 04f3180e0d..79a9293916 100644 --- a/explorer/src/main/kotlin/com/r3corda/explorer/Main.kt +++ b/explorer/src/main/kotlin/com/r3corda/explorer/Main.kt @@ -1,29 +1,24 @@ package com.r3corda.explorer -import com.r3corda.client.NodeMonitorClient import com.r3corda.client.mock.EventGenerator -import com.r3corda.client.mock.Generator -import com.r3corda.client.mock.oneOf import com.r3corda.client.model.Models import com.r3corda.client.model.NodeMonitorModel -import com.r3corda.client.model.observer +import com.r3corda.client.model.subject import com.r3corda.core.contracts.ClientToServiceCommand import com.r3corda.core.node.services.ServiceInfo import com.r3corda.explorer.model.IdentityModel import com.r3corda.node.driver.PortAllocation import com.r3corda.node.driver.driver import com.r3corda.node.driver.startClient -import com.r3corda.node.services.monitor.ServiceToClientEvent import com.r3corda.node.services.transactions.SimpleNotaryService import javafx.stage.Stage -import rx.Observer -import rx.subjects.PublishSubject +import rx.subjects.Subject import tornadofx.App import java.util.* class Main : App() { override val primaryView = MainWindow::class - val aliceOutStream: Observer by observer(NodeMonitorModel::clientToService) + val aliceOutStream: Subject by subject(NodeMonitorModel::clientToService) override fun start(stage: Stage) { @@ -42,35 +37,26 @@ class Main : App() { driver(portAllocation = portAllocation) { val aliceNodeFuture = startNode("Alice") - val bobNodeFuture = startNode("Bob") val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.Type))) val aliceNode = aliceNodeFuture.get() - val bobNode = bobNodeFuture.get() val notaryNode = notaryNodeFuture.get() val aliceClient = startClient(aliceNode).get() Models.get(Main::class).myIdentity.set(aliceNode.identity) - Models.get(Main::class).register(aliceClient, aliceNode) - - val bobInStream = PublishSubject.create() - val bobOutStream = PublishSubject.create() - - val bobClient = startClient(bobNode).get() - val bobMonitorClient = NodeMonitorClient(bobClient, bobNode, bobOutStream, bobInStream, PublishSubject.create()) - assert(bobMonitorClient.register().get()) + Models.get(Main::class).register(aliceNode, aliceClient.config.certificatesPath) for (i in 0 .. 10000) { Thread.sleep(500) val eventGenerator = EventGenerator( - parties = listOf(aliceNode.identity, bobNode.identity), + parties = listOf(aliceNode.identity), notary = notaryNode.identity ) - eventGenerator.clientToServiceCommandGenerator.combine(Generator.oneOf(listOf(aliceOutStream, bobOutStream))) { - command, stream -> stream.onNext(command) + eventGenerator.clientToServiceCommandGenerator.map { command -> + aliceOutStream.onNext(command) }.generate(Random()) } @@ -80,4 +66,3 @@ class Main : App() { }).start() } } - diff --git a/explorer/src/main/kotlin/com/r3corda/explorer/views/TransactionViewer.kt b/explorer/src/main/kotlin/com/r3corda/explorer/views/TransactionViewer.kt index f0374c85cc..121ae3adc9 100644 --- a/explorer/src/main/kotlin/com/r3corda/explorer/views/TransactionViewer.kt +++ b/explorer/src/main/kotlin/com/r3corda/explorer/views/TransactionViewer.kt @@ -16,7 +16,6 @@ import com.r3corda.explorer.model.IdentityModel import com.r3corda.explorer.model.ReportingCurrencyModel import com.r3corda.explorer.sign import com.r3corda.explorer.ui.* -import com.r3corda.node.services.monitor.ServiceToClientEvent import javafx.beans.binding.Bindings import javafx.beans.value.ObservableValue import javafx.collections.FXCollections @@ -31,7 +30,6 @@ import javafx.scene.layout.VBox import javafx.scene.paint.Color import tornadofx.View import java.security.PublicKey -import java.time.Instant import java.util.* class TransactionViewer: View() { @@ -74,11 +72,6 @@ class TransactionViewer: View() { private val signaturesTitledPane: TitledPane by fxid() private val signaturesList: ListView by fxid() - private val lowLevelEventsTitledPane: TitledPane by fxid() - private val lowLevelEventsTable: TableView by fxid() - private val lowLevelEventsTimestamp: TableColumn by fxid() - private val lowLevelEventsEvent: TableColumn by fxid() - private val matchingTransactionsLabel: Label by fxid() // Inject data @@ -93,18 +86,13 @@ class TransactionViewer: View() { * have the data. */ data class ViewerNode( - val transactionId: ObservableValue, + val transaction: PartiallyResolvedTransaction, + val transactionId: SecureHash, val stateMachineRunId: ObservableValue, - val clientUuid: ObservableValue, - val originator: ObservableValue, - val transactionStatus: ObservableValue, - val stateMachineStatus: ObservableValue, - val protocolStatus: ObservableValue, - val statusUpdated: ObservableValue, - val commandTypes: ObservableValue>>, - val totalValueEquiv: ObservableValue?>, - val transaction: ObservableValue, - val allEvents: ObservableList + val stateMachineStatus: ObservableValue, + val protocolStatus: ObservableValue, + val commandTypes: Collection>, + val totalValueEquiv: ObservableValue?> ) /** @@ -119,59 +107,40 @@ class TransactionViewer: View() { * We map the gathered data about transactions almost one-to-one to the nodes. */ private val viewerNodes = gatheredTransactionDataList.map { + // TODO in theory there may be several associated state machines, we should at least give a warning if there are + // several, currently we just throw others away + val stateMachine = it.stateMachines.first() + fun stateMachineProperty(property: (StateMachineData) -> ObservableValue): ObservableValue { + return stateMachine.map { it?.let(property) }.bindOut { it ?: null.lift() } + } ViewerNode( - transactionId = it.transaction.map { it?.id }, - stateMachineRunId = it.stateMachineRunId, - clientUuid = it.uuid, - /** - * We can't really do any better based on uuid, we need to store explicit data for this TODO - */ - originator = it.uuid.map { uuid -> - if (uuid == null) { - "Someone" - } else { - "Us" - } - }, - transactionStatus = it.status, - protocolStatus = it.protocolStatus, - stateMachineStatus = it.stateMachineStatus, - statusUpdated = it.lastUpdate, - commandTypes = it.transaction.map { - val commands = mutableSetOf>() - it?.transaction?.tx?.commands?.forEach { - commands.add(it.value.javaClass) - } - commands - }, - totalValueEquiv = it.transaction.bind { transaction -> - if (transaction == null) { - null.lift?>() - } else { - - val resolvedInputs = transaction.inputs.sequence().map { resolution -> - when (resolution) { - is PartiallyResolvedTransaction.InputResolution.Unresolved -> null - is PartiallyResolvedTransaction.InputResolution.Resolved -> resolution.stateAndRef - } - }.foldObservable(listOf()) { inputs: List>?, state: StateAndRef? -> - if (inputs != null && state != null) { - inputs + state - } else { - null - } - } - - ::calculateTotalEquiv.lift( - myIdentity, - reportingExchange, - resolvedInputs, - transaction.transaction.tx.outputs.lift() - ) - } - }, transaction = it.transaction, - allEvents = it.allEvents + transactionId = it.transaction.id, + stateMachineRunId = stateMachine.map { it?.id }, + protocolStatus = stateMachineProperty { it.protocolStatus }, + stateMachineStatus = stateMachineProperty { it.stateMachineStatus }, + commandTypes = it.transaction.transaction.tx.commands.map { it.value.javaClass }, + totalValueEquiv = { + val resolvedInputs = it.transaction.inputs.sequence().map { resolution -> + when (resolution) { + is PartiallyResolvedTransaction.InputResolution.Unresolved -> null + is PartiallyResolvedTransaction.InputResolution.Resolved -> resolution.stateAndRef + } + }.fold(listOf()) { inputs: List>?, state: StateAndRef? -> + if (inputs != null && state != null) { + inputs + state + } else { + null + } + } + + ::calculateTotalEquiv.lift( + myIdentity, + reportingExchange, + resolvedInputs.lift(), + it.transaction.transaction.tx.outputs.lift() + ) + }() ) } @@ -179,9 +148,9 @@ class TransactionViewer: View() { * The detail panes are only filled out if a transaction is selected */ private val selectedViewerNode = transactionViewTable.singleRowSelection() - private val selectedTransaction = selectedViewerNode.bindOut { + private val selectedTransaction = selectedViewerNode.map { when (it) { - is SingleRowSelection.None -> null.lift() + is SingleRowSelection.None -> null is SingleRowSelection.Selected -> it.node.transaction } } @@ -215,21 +184,13 @@ class TransactionViewer: View() { } }) - private val lowLevelEvents = ChosenList(selectedViewerNode.map { - when (it) { - is SingleRowSelection.None -> FXCollections.emptyObservableList() - is SingleRowSelection.Selected -> it.node.allEvents - } - }) - /** * We only display the detail panes if there is a node selected. */ private val allNodesShown = FXCollections.observableArrayList( transactionViewTable, contractStatesTitledPane, - signaturesTitledPane, - lowLevelEventsTitledPane + signaturesTitledPane ) private val onlyTransactionsTableShown = FXCollections.observableArrayList( transactionViewTable @@ -326,11 +287,9 @@ class TransactionViewer: View() { Math.floor(tableWidthWithoutPaddingAndBorder.toDouble() / transactionViewTable.columns.size).toInt() } - transactionViewTransactionId.setCellValueFactory { it.value.transactionId.map { "${it ?: ""}" } } + transactionViewTransactionId.setCellValueFactory { "${it.value.transactionId}".lift() } transactionViewStateMachineId.setCellValueFactory { it.value.stateMachineRunId.map { "${it?.uuid ?: ""}" } } - transactionViewClientUuid.setCellValueFactory { it.value.clientUuid.map { "${it ?: ""}" } } transactionViewProtocolStatus.setCellValueFactory { it.value.protocolStatus.map { "${it ?: ""}" } } - transactionViewTransactionStatus.setCellValueFactory { it.value.transactionStatus } transactionViewTransactionStatus.setCustomCellFactory { val label = Label() val backgroundFill = when (it) { @@ -342,7 +301,7 @@ class TransactionViewer: View() { label.text = "$it" label } - transactionViewStateMachineStatus.setCellValueFactory { it.value.stateMachineStatus } + transactionViewStateMachineStatus.setCellValueFactory { it.value.stateMachineStatus.map { it } } transactionViewStateMachineStatus.setCustomCellFactory { val label = Label() val backgroundFill = when (it) { @@ -356,7 +315,7 @@ class TransactionViewer: View() { } transactionViewCommandTypes.setCellValueFactory { - it.value.commandTypes.map { it.map { it.simpleName }.joinToString(",") } + it.value.commandTypes.map { it.simpleName }.joinToString(",").lift() } transactionViewTotalValueEquiv.setCellValueFactory> { it.value.totalValueEquiv } transactionViewTotalValueEquiv.cellFactory = object : Formatter> { @@ -394,14 +353,6 @@ class TransactionViewer: View() { override fun format(value: PublicKey) = value.toStringShort() }.toListCellFactory() - // Low level events - Bindings.bindContent(lowLevelEventsTable.items, lowLevelEvents) - lowLevelEventsTimestamp.setCellValueFactory { it.value.time.lift() } - lowLevelEventsEvent.setCellValueFactory { it.value.lift() } - lowLevelEventsTable.setColumnPrefWidthPolicy { tableWidthWithoutPaddingAndBorder, column -> - Math.floor(tableWidthWithoutPaddingAndBorder.toDouble() / lowLevelEventsTable.columns.size).toInt() - } - matchingTransactionsLabel.textProperty().bind(Bindings.size(viewerNodes).map { "$it matching transaction${if (it == 1) "" else "s"}" }) diff --git a/explorer/src/main/resources/com/r3corda/explorer/views/TransactionViewer.fxml b/explorer/src/main/resources/com/r3corda/explorer/views/TransactionViewer.fxml index 3697e5901c..b886208120 100644 --- a/explorer/src/main/resources/com/r3corda/explorer/views/TransactionViewer.fxml +++ b/explorer/src/main/resources/com/r3corda/explorer/views/TransactionViewer.fxml @@ -40,7 +40,7 @@ - + @@ -121,21 +121,11 @@ - + - - - - - - - - - - diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 4c6139b966..2866e8512a 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -33,7 +33,6 @@ import com.r3corda.node.services.events.ScheduledActivityObserver import com.r3corda.node.services.identity.InMemoryIdentityService import com.r3corda.node.services.keys.PersistentKeyManagementService import com.r3corda.node.services.messaging.CordaRPCOps -import com.r3corda.node.services.monitor.NodeMonitorService import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC @@ -140,7 +139,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap lateinit var vault: VaultService lateinit var keyManagement: KeyManagementService var inNodeNetworkMapService: NetworkMapService? = null - var inNodeMonitorService: NodeMonitorService? = null var inNodeNotaryService: NotaryService? = null var uniquenessProvider: UniquenessProvider? = null lateinit var identity: IdentityService @@ -231,7 +229,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap } } - inNodeMonitorService = makeMonitorService() // Note this HAS to be after smm is set buildAdvertisedServices() // TODO: this model might change but for now it provides some de-coupling @@ -410,8 +407,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap // TODO: sort out ordering of open & protected modifiers of functions in this class. protected open fun makeVaultService(): VaultService = NodeVaultService(services) - protected open fun makeMonitorService(): NodeMonitorService = NodeMonitorService(services, smm) - open fun stop() { // TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the // network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop() diff --git a/node/src/main/kotlin/com/r3corda/node/internal/ServerRPCOps.kt b/node/src/main/kotlin/com/r3corda/node/internal/ServerRPCOps.kt index 4aec40a792..13286064c4 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/ServerRPCOps.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/ServerRPCOps.kt @@ -1,24 +1,33 @@ package com.r3corda.node.internal -import com.r3corda.core.contracts.ContractState -import com.r3corda.core.contracts.StateAndRef +import com.r3corda.contracts.asset.Cash +import com.r3corda.contracts.asset.InsufficientBalanceException +import com.r3corda.core.contracts.* +import com.r3corda.core.crypto.Party +import com.r3corda.core.crypto.toStringShort +import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.Vault +import com.r3corda.core.transactions.TransactionBuilder import com.r3corda.node.services.api.ServiceHubInternal import com.r3corda.node.services.messaging.CordaRPCOps import com.r3corda.node.services.messaging.StateMachineInfo import com.r3corda.node.services.messaging.StateMachineUpdate +import com.r3corda.node.services.messaging.TransactionBuildResult import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.utilities.databaseTransaction +import com.r3corda.protocols.BroadcastTransactionProtocol +import com.r3corda.protocols.FinalityProtocol import org.jetbrains.exposed.sql.Database import rx.Observable +import java.security.KeyPair /** * Server side implementations of RPCs available to MQ based client tools. Execution takes place on the server * thread (i.e. serially). Arguments are serialised and deserialised automatically. */ class ServerRPCOps( - val services: ServiceHubInternal, - val stateMachineManager: StateMachineManager, + val services: ServiceHub, + val smm: StateMachineManager, val database: Database ) : CordaRPCOps { override val protocolVersion: Int = 0 @@ -31,11 +40,100 @@ class ServerRPCOps( } override fun verifiedTransactions() = services.storageService.validatedTransactions.track() override fun stateMachinesAndUpdates(): Pair, Observable> { - val (allStateMachines, changes) = stateMachineManager.track() + val (allStateMachines, changes) = smm.track() return Pair( allStateMachines.map { StateMachineInfo.fromProtocolStateMachineImpl(it) }, changes.map { StateMachineUpdate.fromStateMachineChange(it) } ) } override fun stateMachineRecordedTransactionMapping() = services.storageService.stateMachineRecordedTransactionMapping.track() + + override fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult { + return databaseTransaction(database) { + when (command) { + is ClientToServiceCommand.IssueCash -> issueCash(command) + is ClientToServiceCommand.PayCash -> initiatePayment(command) + is ClientToServiceCommand.ExitCash -> exitCash(command) + } + } + } + + // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service + private fun initiatePayment(req: ClientToServiceCommand.PayCash): TransactionBuildResult { + val builder: TransactionBuilder = TransactionType.General.Builder(null) + // TODO: Have some way of restricting this to states the caller controls + try { + Cash().generateSpend(builder, req.amount.withoutIssuer(), req.recipient.owningKey, + // TODO: Move cash state filtering by issuer down to the contract itself + services.vaultService.currentVault.statesOfType().filter { it.state.data.amount.token == req.amount.token }, + setOf(req.amount.token.issuer.party)) + .forEach { + val key = services.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}") + builder.signWith(KeyPair(it, key)) + } + val tx = builder.toSignedTransaction(checkSufficientSignatures = false) + val protocol = FinalityProtocol(tx, setOf(req), setOf(req.recipient)) + return TransactionBuildResult.ProtocolStarted( + smm.add(BroadcastTransactionProtocol.TOPIC, protocol).id, + tx, + "Cash payment transaction generated" + ) + } catch(ex: InsufficientBalanceException) { + return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance") + } + } + + // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service + private fun exitCash(req: ClientToServiceCommand.ExitCash): TransactionBuildResult { + val builder: TransactionBuilder = TransactionType.General.Builder(null) + try { + val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef) + Cash().generateExit(builder, req.amount.issuedBy(issuer), + services.vaultService.currentVault.statesOfType().filter { it.state.data.owner == issuer.party.owningKey }) + builder.signWith(services.storageService.myLegalIdentityKey) + + // Work out who the owners of the burnt states were + val inputStatesNullable = services.vaultService.statesForRefs(builder.inputStates()) + val inputStates = inputStatesNullable.values.filterNotNull().map { it.data } + if (inputStatesNullable.size != inputStates.size) { + val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key } + throw InputStateRefResolveFailed(unresolvedStateRefs) + } + + // TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them + // count as a reason to fail? + val participants: Set = inputStates.filterIsInstance().map { services.identityService.partyFromKey(it.owner) }.filterNotNull().toSet() + + // Commit the transaction + val tx = builder.toSignedTransaction(checkSufficientSignatures = false) + val protocol = FinalityProtocol(tx, setOf(req), participants) + return TransactionBuildResult.ProtocolStarted( + smm.add(BroadcastTransactionProtocol.TOPIC, protocol).id, + tx, + "Cash destruction transaction generated" + ) + } catch (ex: InsufficientBalanceException) { + return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance") + } + } + + // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service + private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult { + val builder: TransactionBuilder = TransactionType.General.Builder(notary = null) + val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef) + Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary) + builder.signWith(services.storageService.myLegalIdentityKey) + val tx = builder.toSignedTransaction(checkSufficientSignatures = true) + // Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it + val protocol = BroadcastTransactionProtocol(tx, setOf(req), setOf(req.recipient)) + return TransactionBuildResult.ProtocolStarted( + smm.add(BroadcastTransactionProtocol.TOPIC, protocol).id, + tx, + "Cash issuance completed" + ) + } + + class InputStateRefResolveFailed(stateRefs: List) : + Exception("Failed to resolve input StateRefs $stateRefs") + } diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt index 095a039596..de263fc8df 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt @@ -1,8 +1,8 @@ package com.r3corda.node.services.messaging +import com.r3corda.core.contracts.ClientToServiceCommand import com.r3corda.core.contracts.ContractState import com.r3corda.core.contracts.StateAndRef -import com.r3corda.core.crypto.SecureHash import com.r3corda.core.node.services.StateMachineTransactionMapping import com.r3corda.core.node.services.Vault import com.r3corda.core.protocols.StateMachineRunId @@ -28,9 +28,9 @@ data class StateMachineInfo( } } -sealed class StateMachineUpdate { - class Added(val stateMachineInfo: StateMachineInfo) : StateMachineUpdate() - class Removed(val stateMachineRunId: StateMachineRunId) : StateMachineUpdate() +sealed class StateMachineUpdate(val id: StateMachineRunId) { + class Added(val stateMachineInfo: StateMachineInfo) : StateMachineUpdate(stateMachineInfo.id) + class Removed(id: StateMachineRunId) : StateMachineUpdate(id) companion object { fun fromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate { @@ -51,6 +51,28 @@ sealed class StateMachineUpdate { } } +sealed class TransactionBuildResult { + /** + * State indicating that a protocol is managing this request, and that the client should track protocol state machine + * updates for further information. The monitor will separately receive notification of the state machine having been + * added, as it would any other state machine. This response is used solely to enable the monitor to identify + * the state machine (and its progress) as associated with the request. + * + * @param transaction the transaction created as a result, in the case where the protocol has completed. + */ + class ProtocolStarted(val id: StateMachineRunId, val transaction: SignedTransaction?, val message: String?) : TransactionBuildResult() { + override fun toString() = "Started($message)" + } + + /** + * State indicating the action undertaken failed, either directly (it is not something which requires a + * state machine), or before a state machine was started. + */ + class Failed(val message: String?) : TransactionBuildResult() { + override fun toString() = "Failed($message)" + } +} + /** * RPC operations that the node exposes to clients using the Java client library. These can be called from * client apps and are implemented by the node in the [ServerRPCOps] class. @@ -73,6 +95,15 @@ interface CordaRPCOps : RPCOps { */ @RPCReturnsObservables fun verifiedTransactions(): Pair, Observable> + + /** + * Returns a pair of state machine id - recorded transaction hash pairs + */ @RPCReturnsObservables fun stateMachineRecordedTransactionMapping(): Pair, Observable> + + /** + * Executes the given command, possibly triggering cash creation etc. + */ + fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult } diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCStructures.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCStructures.kt index bc027c7358..2702aaadf9 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCStructures.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCStructures.kt @@ -5,13 +5,23 @@ import com.esotericsoftware.kryo.Registration import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output +import com.esotericsoftware.kryo.serializers.DefaultSerializers +import com.r3corda.contracts.asset.Cash import com.r3corda.core.ErrorOr +import com.r3corda.core.contracts.* +import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party +import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.node.services.StateMachineTransactionMapping +import com.r3corda.core.node.services.Vault +import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.core.serialization.* import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.WireTransaction import de.javakaffee.kryoserializers.ArraysAsListSerializer import de.javakaffee.kryoserializers.guava.* +import net.i2p.crypto.eddsa.EdDSAPrivateKey +import net.i2p.crypto.eddsa.EdDSAPublicKey import org.apache.activemq.artemis.api.core.client.ClientMessage import org.objenesis.strategy.StdInstantiatorStrategy import org.slf4j.LoggerFactory @@ -118,7 +128,41 @@ private class RPCKryo(private val observableSerializer: Serializer().javaClass) // EmptyList + register(IllegalStateException::class.java) + register(Pair::class.java) + register(StateMachineUpdate.Added::class.java) + register(StateMachineUpdate.Removed::class.java) + register(StateMachineInfo::class.java) + register(DigitalSignature.WithKey::class.java) + register(DigitalSignature.LegallyIdentifiable::class.java) + register(ByteArray::class.java) + register(EdDSAPublicKey::class.java, Ed25519PublicKeySerializer) + register(EdDSAPrivateKey::class.java, Ed25519PrivateKeySerializer) + register(Vault::class.java) + register(Vault.Update::class.java) + register(StateMachineRunId::class.java) + register(StateMachineTransactionMapping::class.java) + register(UUID::class.java) + register(LinkedHashSet::class.java) + register(StateAndRef::class.java) + register(setOf().javaClass) // EmptySet + register(StateRef::class.java) + register(SecureHash.SHA256::class.java) + register(TransactionState::class.java) + register(Cash.State::class.java) + register(Amount::class.java) + register(Issued::class.java) + register(PartyAndReference::class.java) + register(OpaqueBytes::class.java) + register(Currency::class.java) + register(Cash::class.java) + register(Cash.Clauses.ConserveAmount::class.java) + register(listOf(Unit).javaClass) // SingletonList + register(setOf(Unit).javaClass) // SingletonSet + register(TransactionBuildResult.ProtocolStarted::class.java) + register(TransactionBuildResult.Failed::class.java) // Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway. register(IllegalArgumentException::class.java) @@ -139,4 +183,4 @@ private class RPCKryo(private val observableSerializer: Serializer>? = null): Kryo = RPCKryo(observableSerializer) \ No newline at end of file +fun createRPCKryo(observableSerializer: Serializer>? = null): Kryo = RPCKryo(observableSerializer) diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt deleted file mode 100644 index 6863005e26..0000000000 --- a/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt +++ /dev/null @@ -1,60 +0,0 @@ -package com.r3corda.node.services.monitor - -import com.r3corda.core.contracts.* -import com.r3corda.core.protocols.StateMachineRunId -import com.r3corda.core.transactions.SignedTransaction -import com.r3corda.node.utilities.AddOrRemove -import java.time.Instant -import java.util.* - -/** - * Events triggered by changes in the node, and sent to monitoring client(s). - */ -sealed class ServiceToClientEvent(val time: Instant) { - class Transaction(time: Instant, val transaction: SignedTransaction) : ServiceToClientEvent(time) { - override fun toString() = "Transaction(${transaction.tx.commands})" - } - class OutputState( - time: Instant, - val consumed: Set, - val produced: Set> - ) : ServiceToClientEvent(time) { - override fun toString() = "OutputState(consumed=$consumed, produced=${produced.map { it.state.data.javaClass.simpleName } })" - } - class StateMachine( - time: Instant, - val id: StateMachineRunId, - val label: String, - val addOrRemove: AddOrRemove - ) : ServiceToClientEvent(time) { - override fun toString() = "StateMachine($label, ${addOrRemove.name})" - } - class Progress(time: Instant, val id: StateMachineRunId, val message: String) : ServiceToClientEvent(time) { - override fun toString() = "Progress($message)" - } - class TransactionBuild(time: Instant, val id: UUID, val state: TransactionBuildResult) : ServiceToClientEvent(time) { - override fun toString() = "TransactionBuild($state)" - } -} - -sealed class TransactionBuildResult { - /** - * State indicating that a protocol is managing this request, and that the client should track protocol state machine - * updates for further information. The monitor will separately receive notification of the state machine having been - * added, as it would any other state machine. This response is used solely to enable the monitor to identify - * the state machine (and its progress) as associated with the request. - * - * @param transaction the transaction created as a result, in the case where the protocol has completed. - */ - class ProtocolStarted(val id: StateMachineRunId, val transaction: SignedTransaction?, val message: String?) : TransactionBuildResult() { - override fun toString() = "Started($message)" - } - - /** - * State indicating the action undertaken failed, either directly (it is not something which requires a - * state machine), or before a state machine was started. - */ - class Failed(val message: String?) : TransactionBuildResult() { - override fun toString() = "Failed($message)" - } -} diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/Messages.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/Messages.kt deleted file mode 100644 index 610770ecdb..0000000000 --- a/node/src/main/kotlin/com/r3corda/node/services/monitor/Messages.kt +++ /dev/null @@ -1,20 +0,0 @@ -package com.r3corda.node.services.monitor - -import com.r3corda.core.contracts.ClientToServiceCommand -import com.r3corda.core.contracts.ContractState -import com.r3corda.core.contracts.StateAndRef -import com.r3corda.core.messaging.SingleMessageRecipient -import com.r3corda.protocols.DirectRequestMessage - -data class RegisterRequest(override val replyToRecipient: SingleMessageRecipient, - override val sessionID: Long) : DirectRequestMessage - -data class RegisterResponse(val success: Boolean) -// TODO: This should have a shared secret the monitor was sent in the registration response, for security -data class DeregisterRequest(override val replyToRecipient: SingleMessageRecipient, - override val sessionID: Long) : DirectRequestMessage - -data class DeregisterResponse(val success: Boolean) -data class StateSnapshotMessage(val contractStates: Collection>, val protocolStates: Collection) - -data class ClientToServiceCommandMessage(override val sessionID: Long, override val replyToRecipient: SingleMessageRecipient, val command: ClientToServiceCommand) : DirectRequestMessage diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/NodeMonitorService.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/NodeMonitorService.kt deleted file mode 100644 index acea7d6148..0000000000 --- a/node/src/main/kotlin/com/r3corda/node/services/monitor/NodeMonitorService.kt +++ /dev/null @@ -1,233 +0,0 @@ -package com.r3corda.node.services.monitor - -import co.paralleluniverse.common.util.VisibleForTesting -import com.r3corda.contracts.asset.Cash -import com.r3corda.contracts.asset.InsufficientBalanceException -import com.r3corda.core.contracts.* -import com.r3corda.core.crypto.Party -import com.r3corda.core.crypto.toStringShort -import com.r3corda.core.messaging.MessageRecipients -import com.r3corda.core.messaging.createMessage -import com.r3corda.core.node.services.DEFAULT_SESSION_ID -import com.r3corda.core.node.services.Vault -import com.r3corda.core.protocols.ProtocolLogic -import com.r3corda.core.protocols.StateMachineRunId -import com.r3corda.core.serialization.serialize -import com.r3corda.core.transactions.SignedTransaction -import com.r3corda.core.transactions.TransactionBuilder -import com.r3corda.core.utilities.loggerFor -import com.r3corda.node.services.api.AbstractNodeService -import com.r3corda.node.services.api.ServiceHubInternal -import com.r3corda.node.services.statemachine.StateMachineManager -import com.r3corda.node.utilities.AddOrRemove -import com.r3corda.protocols.BroadcastTransactionProtocol -import com.r3corda.protocols.FinalityProtocol -import java.security.KeyPair -import java.time.Instant -import java.util.* -import javax.annotation.concurrent.ThreadSafe - -/** - * Service which allows external clients to monitor the node's vault and state machine manager, as well as trigger - * actions within the node. The service also sends requests for user input back to clients, for example to enter - * additional information while a protocol runs, or confirm an action. - * - * This is intended to enable a range of tools from end user UI to ops tools which monitor health across a number of nodes. - */ -// TODO: Implement authorization controls+ -// TODO: Replace this entirely with a publish/subscribe based solution on a to-be-written service (likely JMS or similar), -// rather than implement authentication and publish/subscribe ourselves. -// TODO: Clients need to be able to indicate whether they support interactivity (no point in sending requests for input -// to a monitoring tool) -@ThreadSafe -class NodeMonitorService(services: ServiceHubInternal, val smm: StateMachineManager) : AbstractNodeService(services) { - companion object { - val REGISTER_TOPIC = "platform.monitor.register" - val DEREGISTER_TOPIC = "platform.monitor.deregister" - val STATE_TOPIC = "platform.monitor.state_snapshot" - val IN_EVENT_TOPIC = "platform.monitor.in" - val OUT_EVENT_TOPIC = "platform.monitor.out" - - val logger = loggerFor() - } - - val listeners: MutableSet = HashSet() - - data class RegisteredListener(val recipients: MessageRecipients, val sessionID: Long) - - init { - addMessageHandler(REGISTER_TOPIC) { req: RegisterRequest -> processRegisterRequest(req) } - addMessageHandler(DEREGISTER_TOPIC) { req: DeregisterRequest -> processDeregisterRequest(req) } - addMessageHandler(OUT_EVENT_TOPIC) { req: ClientToServiceCommandMessage -> processEventRequest(req) } - - // Notify listeners on state changes - services.storageService.validatedTransactions.updates.subscribe { tx -> notifyTransaction(tx) } - services.vaultService.updates.subscribe { update -> notifyVaultUpdate(update) } - smm.changes.subscribe { change -> - val id: StateMachineRunId = change.id - val logic: ProtocolLogic<*> = change.logic - val progressTracker = logic.progressTracker - - notifyEvent(ServiceToClientEvent.StateMachine(Instant.now(), id, logic.javaClass.name, change.addOrRemove)) - if (progressTracker != null) { - when (change.addOrRemove) { - AddOrRemove.ADD -> progressTracker.changes.subscribe { progress -> - notifyEvent(ServiceToClientEvent.Progress(Instant.now(), id, progress.toString())) - } - AddOrRemove.REMOVE -> { - // Nothing to do - } - } - } - } - } - - @VisibleForTesting - internal fun notifyVaultUpdate(update: Vault.Update) - = notifyEvent(ServiceToClientEvent.OutputState(Instant.now(), update.consumed, update.produced)) - - @VisibleForTesting - internal fun notifyTransaction(transaction: SignedTransaction) - = notifyEvent(ServiceToClientEvent.Transaction(Instant.now(), transaction)) - - private fun processEventRequest(reqMessage: ClientToServiceCommandMessage) { - val req = reqMessage.command - val result: TransactionBuildResult? = - try { - when (req) { - is ClientToServiceCommand.IssueCash -> issueCash(req) - is ClientToServiceCommand.PayCash -> initiatePayment(req) - is ClientToServiceCommand.ExitCash -> exitCash(req) - else -> throw IllegalArgumentException("Unknown request type ${req.javaClass.name}") - } - } catch(ex: Exception) { - logger.warn("Exception while processing message of type ${req.javaClass.simpleName}", ex) - TransactionBuildResult.Failed(ex.message) - } - - // Send back any result from the event. Not all events (especially TransactionInput) produce a - // result. - if (result != null) { - val event = ServiceToClientEvent.TransactionBuild(Instant.now(), req.id, result) - val respMessage = net.createMessage(IN_EVENT_TOPIC, reqMessage.sessionID, - event.serialize().bits) - net.send(respMessage, reqMessage.getReplyTo(services.networkMapCache)) - } - } - - /** - * Process a request from a monitor to remove them from the subscribers. - */ - fun processDeregisterRequest(req: DeregisterRequest) { - val message = try { - // TODO: Session ID should be managed by the messaging layer, so it handles ensuring that the - // request comes from the same endpoint that registered at the start. - listeners.remove(RegisteredListener(req.replyToRecipient, req.sessionID)) - net.createMessage(DEREGISTER_TOPIC, req.sessionID, DeregisterResponse(true).serialize().bits) - } catch (ex: IllegalStateException) { - net.createMessage(DEREGISTER_TOPIC, req.sessionID, DeregisterResponse(false).serialize().bits) - } - net.send(message, req.replyToRecipient) - } - - /** - * Process a request from a monitor to add them to the subscribers. This includes hooks to authenticate the request, - * but currently all requests pass (and there's no access control on vaults, so it has no actual meaning). - */ - fun processRegisterRequest(req: RegisterRequest) { - try { - listeners.add(RegisteredListener(req.replyToRecipient, req.sessionID)) - val stateMessage = StateSnapshotMessage(services.vaultService.currentVault.states.toList(), - smm.allStateMachines.map { it.javaClass.name }) - net.send(net.createMessage(STATE_TOPIC, DEFAULT_SESSION_ID, stateMessage.serialize().bits), req.replyToRecipient) - - val message = net.createMessage(REGISTER_TOPIC, req.sessionID, RegisterResponse(true).serialize().bits) - net.send(message, req.replyToRecipient) - } catch (ex: IllegalStateException) { - val message = net.createMessage(REGISTER_TOPIC, req.sessionID, RegisterResponse(false).serialize().bits) - net.send(message, req.replyToRecipient) - } - } - - private fun notifyEvent(event: ServiceToClientEvent) = listeners.forEach { monitor -> - net.send(net.createMessage(IN_EVENT_TOPIC, monitor.sessionID, event.serialize().bits), monitor.recipients) - } - - // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service - private fun initiatePayment(req: ClientToServiceCommand.PayCash): TransactionBuildResult { - val builder: TransactionBuilder = TransactionType.General.Builder(null) - // TODO: Have some way of restricting this to states the caller controls - try { - Cash().generateSpend(builder, req.amount.withoutIssuer(), req.recipient.owningKey, - // TODO: Move cash state filtering by issuer down to the contract itself - services.vaultService.currentVault.statesOfType().filter { it.state.data.amount.token == req.amount.token }, - setOf(req.amount.token.issuer.party)) - .forEach { - val key = services.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}") - builder.signWith(KeyPair(it, key)) - } - val tx = builder.toSignedTransaction(checkSufficientSignatures = false) - val protocol = FinalityProtocol(tx, setOf(req), setOf(req.recipient)) - return TransactionBuildResult.ProtocolStarted( - smm.add("broadcast", protocol).id, - tx, - "Cash payment transaction generated" - ) - } catch(ex: InsufficientBalanceException) { - return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance") - } - } - - // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service - private fun exitCash(req: ClientToServiceCommand.ExitCash): TransactionBuildResult { - val builder: TransactionBuilder = TransactionType.General.Builder(null) - try { - val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef) - Cash().generateExit(builder, req.amount.issuedBy(issuer), - services.vaultService.currentVault.statesOfType().filter { it.state.data.owner == issuer.party.owningKey }) - builder.signWith(services.storageService.myLegalIdentityKey) - - // Work out who the owners of the burnt states were - val inputStatesNullable = services.vaultService.statesForRefs(builder.inputStates()) - val inputStates = inputStatesNullable.values.filterNotNull().map { it.data } - if (inputStatesNullable.size != inputStates.size) { - val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key } - throw InputStateRefResolveFailed(unresolvedStateRefs) - } - - // TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them - // count as a reason to fail? - val participants: Set = inputStates.filterIsInstance().map { services.identityService.partyFromKey(it.owner) }.filterNotNull().toSet() - - // Commit the transaction - val tx = builder.toSignedTransaction(checkSufficientSignatures = false) - val protocol = FinalityProtocol(tx, setOf(req), participants) - return TransactionBuildResult.ProtocolStarted( - smm.add("broadcast", protocol).id, - tx, - "Cash destruction transaction generated" - ) - } catch (ex: InsufficientBalanceException) { - return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance") - } - } - - // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service - private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult { - val builder: TransactionBuilder = TransactionType.General.Builder(notary = null) - val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef) - Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary) - builder.signWith(services.storageService.myLegalIdentityKey) - val tx = builder.toSignedTransaction(checkSufficientSignatures = true) - // Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it - val protocol = BroadcastTransactionProtocol(tx, setOf(req), setOf(req.recipient)) - return TransactionBuildResult.ProtocolStarted( - smm.add("broadcast", protocol).id, - tx, - "Cash issuance completed" - ) - } - - class InputStateRefResolveFailed(stateRefs: List) : - Exception("Failed to resolve input StateRefs $stateRefs") -} diff --git a/node/src/test/kotlin/com/r3corda/node/ServerRPCTest.kt b/node/src/test/kotlin/com/r3corda/node/ServerRPCTest.kt new file mode 100644 index 0000000000..42cde455bc --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/ServerRPCTest.kt @@ -0,0 +1,174 @@ +package com.r3corda.node + +import com.r3corda.contracts.asset.Cash +import com.r3corda.core.contracts.* +import com.r3corda.core.node.services.Vault +import com.r3corda.core.protocols.StateMachineRunId +import com.r3corda.core.serialization.OpaqueBytes +import com.r3corda.core.transactions.SignedTransaction +import com.r3corda.core.utilities.DUMMY_NOTARY +import com.r3corda.node.internal.ServerRPCOps +import com.r3corda.node.services.messaging.StateMachineUpdate +import com.r3corda.node.services.network.NetworkMapService +import com.r3corda.node.services.transactions.ValidatingNotaryService +import com.r3corda.testing.expect +import com.r3corda.testing.expectEvents +import com.r3corda.testing.node.MockNetwork +import com.r3corda.testing.node.MockNetwork.MockNode +import com.r3corda.testing.sequence +import org.junit.Before +import org.junit.Test +import rx.Observable +import kotlin.test.assertEquals +import kotlin.test.assertFalse + +/** + * Unit tests for the node monitoring service. + */ +class ServerRPCTest { + lateinit var network: MockNetwork + lateinit var aliceNode: MockNode + lateinit var notaryNode: MockNode + lateinit var rpc: ServerRPCOps + lateinit var stateMachineUpdates: Observable + lateinit var transactions: Observable + lateinit var vaultUpdates: Observable + + @Before + fun setup() { + network = MockNetwork() + val networkMap = network.createNode(advertisedServices = NetworkMapService.Type) + aliceNode = network.createNode(networkMapAddress = networkMap.info.address) + notaryNode = network.createNode(advertisedServices = ValidatingNotaryService.Type, networkMapAddress = networkMap.info.address) + rpc = ServerRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database) + + stateMachineUpdates = rpc.stateMachinesAndUpdates().second + transactions = rpc.verifiedTransactions().second + vaultUpdates = rpc.vaultAndUpdates().second + } + + @Test + fun `cash issue accepted`() { + val quantity = 1000L + val ref = OpaqueBytes(ByteArray(1) {1}) + + // Check the monitoring service wallet is empty + assertFalse(aliceNode.services.vaultService.currentVault.states.iterator().hasNext()) + + // Tell the monitoring service node to issue some cash + val recipient = aliceNode.services.storageService.myLegalIdentity + val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, DUMMY_NOTARY) + rpc.executeCommand(outEvent) + network.runNetwork() + + val expectedState = Cash.State(Amount(quantity, + Issued(aliceNode.services.storageService.myLegalIdentity.ref(ref), GBP)), + recipient.owningKey) + + var issueSmId: StateMachineRunId? = null + stateMachineUpdates.expectEvents { + sequence( + // ISSUE + expect { add: StateMachineUpdate.Added -> + issueSmId = add.id + }, + expect { remove: StateMachineUpdate.Removed -> + require(remove.id == issueSmId) + } + ) + } + + transactions.expectEvents { + expect { tx -> + assertEquals(expectedState, tx.tx.outputs.single().data) + } + } + + vaultUpdates.expectEvents { + expect { update -> + val actual = update.produced.single().state.data + assertEquals(expectedState, actual) + } + } + } + + @Test + fun issueAndMoveWorks() { + + rpc.executeCommand(ClientToServiceCommand.IssueCash( + amount = Amount(100, USD), + issueRef = OpaqueBytes(ByteArray(1, { 1 })), + recipient = aliceNode.services.storageService.myLegalIdentity, + notary = notaryNode.services.storageService.myLegalIdentity + )) + + network.runNetwork() + + rpc.executeCommand(ClientToServiceCommand.PayCash( + amount = Amount(100, Issued(PartyAndReference(aliceNode.services.storageService.myLegalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)), + recipient = aliceNode.services.storageService.myLegalIdentity + )) + + network.runNetwork() + + var issueSmId: StateMachineRunId? = null + var moveSmId: StateMachineRunId? = null + stateMachineUpdates.expectEvents { + sequence( + // ISSUE + expect { add: StateMachineUpdate.Added -> + issueSmId = add.id + }, + expect { remove: StateMachineUpdate.Removed -> + require(remove.id == issueSmId) + }, + // MOVE + expect { add: StateMachineUpdate.Added -> + moveSmId = add.id + }, + expect { remove: StateMachineUpdate.Removed -> + require(remove.id == moveSmId) + } + ) + } + + transactions.expectEvents { + sequence( + // ISSUE + expect { tx -> + require(tx.tx.inputs.isEmpty()) + require(tx.tx.outputs.size == 1) + val signaturePubKeys = tx.sigs.map { it.by }.toSet() + // Only Alice signed + require(signaturePubKeys.size == 1) + require(signaturePubKeys.contains(aliceNode.services.storageService.myLegalIdentity.owningKey)) + }, + // MOVE + expect { tx -> + require(tx.tx.inputs.size == 1) + require(tx.tx.outputs.size == 1) + val signaturePubKeys = tx.sigs.map { it.by }.toSet() + // Alice and Notary signed + require(signaturePubKeys.size == 2) + require(signaturePubKeys.contains(aliceNode.services.storageService.myLegalIdentity.owningKey)) + require(signaturePubKeys.contains(notaryNode.services.storageService.myLegalIdentity.owningKey)) + } + ) + } + + vaultUpdates.expectEvents { + sequence( + // ISSUE + expect { update -> + require(update.consumed.size == 0) { update.consumed.size } + require(update.produced.size == 1) { update.produced.size } + }, + // MOVE + expect { update -> + require(update.consumed.size == 1) { update.consumed.size } + require(update.produced.size == 1) { update.produced.size } + } + ) + } + } +} diff --git a/node/src/test/kotlin/com/r3corda/node/services/NodeMonitorServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/NodeMonitorServiceTests.kt deleted file mode 100644 index 4e0f42e566..0000000000 --- a/node/src/test/kotlin/com/r3corda/node/services/NodeMonitorServiceTests.kt +++ /dev/null @@ -1,234 +0,0 @@ -package com.r3corda.node.services - -import com.google.common.util.concurrent.ListenableFuture -import com.r3corda.contracts.asset.Cash -import com.r3corda.core.contracts.* -import com.r3corda.core.crypto.SecureHash -import com.r3corda.core.crypto.newSecureRandom -import com.r3corda.core.messaging.createMessage -import com.r3corda.core.node.services.DEFAULT_SESSION_ID -import com.r3corda.core.node.services.Vault -import com.r3corda.core.random63BitValue -import com.r3corda.core.serialization.OpaqueBytes -import com.r3corda.core.serialization.deserialize -import com.r3corda.core.serialization.serialize -import com.r3corda.core.utilities.DUMMY_NOTARY -import com.r3corda.core.utilities.DUMMY_PUBKEY_1 -import com.r3corda.node.services.monitor.* -import com.r3corda.node.services.monitor.NodeMonitorService.Companion.IN_EVENT_TOPIC -import com.r3corda.node.services.monitor.NodeMonitorService.Companion.REGISTER_TOPIC -import com.r3corda.node.utilities.AddOrRemove -import com.r3corda.testing.expect -import com.r3corda.testing.expectEvents -import com.r3corda.testing.node.MockNetwork -import com.r3corda.testing.node.MockNetwork.MockNode -import com.r3corda.testing.parallel -import com.r3corda.testing.sequence -import org.junit.Before -import org.junit.Test -import rx.subjects.ReplaySubject -import java.util.concurrent.TimeUnit -import kotlin.test.assertEquals -import kotlin.test.assertFalse -import kotlin.test.assertNotNull -import kotlin.test.assertTrue - -/** - * Unit tests for the node monitoring service. - */ -class NodeMonitorServiceTests { - lateinit var network: MockNetwork - - @Before - fun setup() { - network = MockNetwork() - } - - /** - * Authenticate the register node with the monitor service node. - */ - private fun authenticate(monitorServiceNode: MockNode, registerNode: MockNode): Long { - network.runNetwork() - val sessionId = random63BitValue() - val authenticatePsm = register(registerNode, monitorServiceNode, sessionId) - network.runNetwork() - authenticatePsm.get(1, TimeUnit.SECONDS) - return sessionId - } - - /** - * Test a very simple case of trying to register against the service. - */ - @Test - fun `success with network`() { - val (monitorServiceNode, registerNode) = network.createTwoNodes() - - network.runNetwork() - val authenticatePsm = register(registerNode, monitorServiceNode, random63BitValue()) - network.runNetwork() - val result = authenticatePsm.get(1, TimeUnit.SECONDS) - assertTrue(result.success) - } - - /** - * Test that having registered, changes are relayed correctly. - */ - @Test - fun `event received`() { - val (monitorServiceNode, registerNode) = network.createTwoNodes() - val sessionID = authenticate(monitorServiceNode, registerNode) - var receivePsm = receiveWalletUpdate(registerNode, sessionID) - var expected = Vault.Update(emptySet(), emptySet()) - monitorServiceNode.inNodeMonitorService!!.notifyVaultUpdate(expected) - network.runNetwork() - var actual = receivePsm.get(1, TimeUnit.SECONDS) - assertEquals(expected.consumed, actual.consumed) - assertEquals(expected.produced, actual.produced) - - // Check that states are passed through correctly - receivePsm = receiveWalletUpdate(registerNode, sessionID) - val consumed = setOf(StateRef(SecureHash.randomSHA256(), 0)) - val producedState = TransactionState(DummyContract.SingleOwnerState(newSecureRandom().nextInt(), DUMMY_PUBKEY_1), DUMMY_NOTARY) - val produced = setOf(StateAndRef(producedState, StateRef(SecureHash.randomSHA256(), 0))) - expected = Vault.Update(consumed, produced) - monitorServiceNode.inNodeMonitorService!!.notifyVaultUpdate(expected) - network.runNetwork() - actual = receivePsm.get(1, TimeUnit.SECONDS) - assertEquals(expected.produced, actual.produced) - assertEquals(expected.consumed, actual.consumed) - } - - @Test - fun `cash issue accepted`() { - val (monitorServiceNode, registerNode) = network.createTwoNodes() - val sessionID = authenticate(monitorServiceNode, registerNode) - val quantity = 1000L - val events = ReplaySubject.create() - val ref = OpaqueBytes(ByteArray(1) {1}) - - registerNode.net.addMessageHandler(IN_EVENT_TOPIC, sessionID) { msg, reg -> - events.onNext(msg.data.deserialize()) - } - - // Check the monitoring service wallet is empty - assertFalse(monitorServiceNode.services.vaultService.currentVault.states.iterator().hasNext()) - - // Tell the monitoring service node to issue some cash - val recipient = monitorServiceNode.services.storageService.myLegalIdentity - val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, DUMMY_NOTARY) - val message = registerNode.net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID, - ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits) - registerNode.net.send(message, monitorServiceNode.net.myAddress) - network.runNetwork() - - val expectedState = Cash.State(Amount(quantity, - Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)), - recipient.owningKey) - - // Check we've received a response - events.expectEvents { - parallel( - sequence( - expect { event: ServiceToClientEvent.StateMachine -> - require(event.addOrRemove == AddOrRemove.ADD) - }, - expect { event: ServiceToClientEvent.StateMachine -> - require(event.addOrRemove == AddOrRemove.REMOVE) - } - ), - expect { event: ServiceToClientEvent.Transaction -> }, - expect { event: ServiceToClientEvent.TransactionBuild -> - // Check the returned event is correct - val tx = (event.state as TransactionBuildResult.ProtocolStarted).transaction - assertNotNull(tx) - assertEquals(expectedState, tx!!.tx.outputs.single().data) - }, - expect { event: ServiceToClientEvent.OutputState -> - // Check the generated state is correct - val actual = event.produced.single().state.data - assertEquals(expectedState, actual) - } - ) - } - } - - @Test - fun `cash move accepted`() { - val (monitorServiceNode, registerNode) = network.createTwoNodes() - val sessionID = authenticate(monitorServiceNode, registerNode) - val quantity = 1000L - val events = ReplaySubject.create() - - registerNode.net.addMessageHandler(IN_EVENT_TOPIC, sessionID) { msg, reg -> - events.onNext(msg.data.deserialize()) - } - - val recipient = monitorServiceNode.services.storageService.myLegalIdentity - - // Tell the monitoring service node to issue some cash so we can spend it later - val issueCommand = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), OpaqueBytes.of(0), recipient, recipient) - val issueMessage = registerNode.net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID, - ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, issueCommand).serialize().bits) - registerNode.net.send(issueMessage, monitorServiceNode.net.myAddress) - val payCommand = ClientToServiceCommand.PayCash(Amount(quantity, Issued(recipient.ref(0), GBP)), recipient) - val payMessage = registerNode.net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID, - ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, payCommand).serialize().bits) - registerNode.net.send(payMessage, monitorServiceNode.net.myAddress) - network.runNetwork() - - events.expectEvents(isStrict = false) { - sequence( - // ISSUE - parallel( - sequence( - expect { event: ServiceToClientEvent.StateMachine -> - require(event.addOrRemove == AddOrRemove.ADD) - }, - expect { event: ServiceToClientEvent.StateMachine -> - require(event.addOrRemove == AddOrRemove.REMOVE) - } - ), - expect { event: ServiceToClientEvent.Transaction -> }, - expect { event: ServiceToClientEvent.TransactionBuild -> }, - expect { event: ServiceToClientEvent.OutputState -> } - ), - // MOVE - parallel( - sequence( - expect { event: ServiceToClientEvent.StateMachine -> - require(event.addOrRemove == AddOrRemove.ADD) - }, - expect { event: ServiceToClientEvent.StateMachine -> - require(event.addOrRemove == AddOrRemove.REMOVE) - } - ), - expect { event: ServiceToClientEvent.Transaction -> - require(event.transaction.sigs.size == 1) - event.transaction.sigs.map { it.by }.containsAll( - listOf( - monitorServiceNode.services.storageService.myLegalIdentity.owningKey - ) - ) - }, - expect { event: ServiceToClientEvent.TransactionBuild -> - require(event.state is TransactionBuildResult.ProtocolStarted) - }, - expect { event: ServiceToClientEvent.OutputState -> - require(event.consumed.size == 1) - require(event.produced.size == 1) - } - ) - ) - } - } - - private fun register(registerNode: MockNode, monitorServiceNode: MockNode, sessionId: Long): ListenableFuture { - val req = RegisterRequest(registerNode.services.networkService.myAddress, sessionId) - return registerNode.sendAndReceive(REGISTER_TOPIC, monitorServiceNode, req) - } - - private fun receiveWalletUpdate(registerNode: MockNode, sessionId: Long): ListenableFuture { - return registerNode.receive(IN_EVENT_TOPIC, sessionId) - } - -}