diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index c043fdeecd..117df07d75 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -27,6 +27,7 @@ import com.r3corda.node.services.events.NodeSchedulerService import com.r3corda.node.services.events.ScheduledActivityObserver import com.r3corda.node.services.identity.InMemoryIdentityService import com.r3corda.node.services.keys.E2ETestKeyManagementService +import com.r3corda.node.services.monitor.WalletMonitorService import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.InMemoryNetworkMapService import com.r3corda.node.services.network.NetworkMapService @@ -125,6 +126,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, lateinit var keyManagement: E2ETestKeyManagementService var inNodeNetworkMapService: NetworkMapService? = null var inNodeNotaryService: NotaryService? = null + var inNodeWalletMonitorService: WalletMonitorService? = null lateinit var identity: IdentityService lateinit var net: MessagingServiceInternal lateinit var netMapCache: NetworkMapCache @@ -167,6 +169,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, wallet = makeWalletService() identity = makeIdentityService() + // Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because // the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with // the identity key. But the infrastructure to make that easy isn't here yet. @@ -186,6 +189,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, checkpointStorage, serverThread) + inNodeWalletMonitorService = makeWalletMonitorService() // Note this HAS to be after smm is set buildAdvertisedServices() // TODO: this model might change but for now it provides some de-coupling @@ -338,6 +342,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, // TODO: sort out ordering of open & protected modifiers of functions in this class. protected open fun makeWalletService(): WalletService = NodeWalletService(services) + protected open fun makeWalletMonitorService(): WalletMonitorService = WalletMonitorService(net, smm, services) + open fun stop() { // TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the // network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop() diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt new file mode 100644 index 0000000000..3cc6fb8ef8 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt @@ -0,0 +1,86 @@ +package com.r3corda.node.services.monitor + +import com.r3corda.core.contracts.* +import com.r3corda.core.crypto.Party +import com.r3corda.core.serialization.OpaqueBytes +import com.r3corda.node.utilities.AddOrRemove +import java.security.PublicKey +import java.time.Instant +import java.util.* + +/** + * Events triggered by changes in the node, and sent to monitoring client(s). + */ +sealed class ServiceToClientEvent(val time: Instant) { + class OutputState(time: Instant, val consumed: Set<StateRef>, val produced: Set<StateAndRef<ContractState>>) : ServiceToClientEvent(time) + class StateMachine(time: Instant, val fiberId: Long, val label: String, val addOrRemove: AddOrRemove) : ServiceToClientEvent(time) + class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time) + class TransactionBuild(time: Instant, val id: UUID, val state: TransactionBuildResult) : ServiceToClientEvent(time) +} + +sealed class TransactionBuildResult { + /** + * State indicating the action undertaken has been completed (it was not complex enough to require a + * state machine starting). + * + * @param transaction the transaction created as a result. + */ + // TODO: We should have a consistent "Transaction your request triggered has been built" event, rather than these + // once-off results from a request. Unclear if that means all requests need to trigger a protocol state machine, + // so the client sees a consistent process, or if some other solution can be found. + class Complete(val transaction: SignedTransaction, val message: String?) : TransactionBuildResult() + + /** + * State indicating that a protocol is managing this request, and that the client should track protocol state machine + * updates for further information. The monitor will separately receive notification of the state machine having been + * added, as it would any other state machine. This response is used solely to enable the monitor to identify + * the state machine (and its progress) as associated with the request. + * + * @param transaction the transaction created as a result, in the case where the protocol has completed. + */ + class ProtocolStarted(val fiberId: Long, val transaction: SignedTransaction?, val message: String?) : TransactionBuildResult() + + /** + * State indicating the action undertaken failed, either directly (it is not something which requires a + * state machine), or before a state machine was started. + */ + class Failed(val message: String?) : TransactionBuildResult() +} + +/** + * A command from the monitoring client, to the node. + * + * @param id ID used to tag event(s) resulting from a command. + */ +sealed class ClientToServiceCommand(val id: UUID) { + // TODO: Replace with a generic event for starting a protocol which then passes back required information, rather + // than using an event for every conceivable action. + /** + * Issue cash state objects. + * + * @param currency the currency to issue. + * @param issueRef the reference to specify on the issuance, used to differentiate pools of cash. Convention is + * to use the single byte "0x01" as a default. + * @param pennies the amount to issue, in the smallest unit of the currency. + * @param recipient the public key of the recipient. + * @param notary the notary to use for this transaction. + * @param id the ID to be provided in events resulting from this request. + */ + class IssueCash(val currency: Currency, + val issueRef: OpaqueBytes, + val pennies: Long, + val recipient: PublicKey, + val notary: Party, + id: UUID = UUID.randomUUID()) + : ClientToServiceCommand(id) + class PayCash(val tokenDef: Issued<Currency>, val pennies: Long, val owner: PublicKey, + id: UUID = UUID.randomUUID()) + : ClientToServiceCommand(id) + + /** + * @param id the ID to be provided in events resulting from this request. + */ + class ExitCash(val currency: Currency, val issueRef: OpaqueBytes, val pennies: Long, + id: UUID = UUID.randomUUID()) + : ClientToServiceCommand(id) +} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/Messages.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/Messages.kt new file mode 100644 index 0000000000..e3eae18617 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/Messages.kt @@ -0,0 +1,18 @@ +package com.r3corda.node.services.monitor + +import com.r3corda.core.contracts.ContractState +import com.r3corda.core.messaging.SingleMessageRecipient +import com.r3corda.protocols.DirectRequestMessage + +data class RegisterRequest(override val replyToRecipient: SingleMessageRecipient, + override val sessionID: Long) : DirectRequestMessage + +data class RegisterResponse(val success: Boolean) +// TODO: This should have a shared secret the monitor was sent in the registration response, for security +data class DeregisterRequest(override val replyToRecipient: SingleMessageRecipient, + override val sessionID: Long) : DirectRequestMessage + +data class DeregisterResponse(val success: Boolean) +data class StateSnapshotMessage(val contractStates: Collection<ContractState>, val protocolStates: Collection<String>) + +data class ClientToServiceCommandMessage(override val sessionID: Long, override val replyToRecipient: SingleMessageRecipient, val command: ClientToServiceCommand) : DirectRequestMessage diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt new file mode 100644 index 0000000000..5b901af5fe --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt @@ -0,0 +1,225 @@ +package com.r3corda.node.services.monitor + +import co.paralleluniverse.common.util.VisibleForTesting +import com.google.common.util.concurrent.ListenableFuture +import com.r3corda.contracts.asset.Cash +import com.r3corda.contracts.asset.InsufficientBalanceException +import com.r3corda.core.contracts.* +import com.r3corda.core.crypto.toStringShort +import com.r3corda.core.messaging.Message +import com.r3corda.core.messaging.MessageRecipients +import com.r3corda.core.messaging.MessagingService +import com.r3corda.core.node.ServiceHub +import com.r3corda.core.node.services.DEFAULT_SESSION_ID +import com.r3corda.core.node.services.ServiceType +import com.r3corda.core.node.services.Wallet +import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.serialization.serialize +import com.r3corda.core.utilities.loggerFor +import com.r3corda.node.services.api.AbstractNodeService +import com.r3corda.node.services.persistence.DataVending +import com.r3corda.node.services.statemachine.StateMachineManager +import com.r3corda.node.utilities.AddOrRemove +import org.slf4j.LoggerFactory +import java.security.KeyPair +import java.security.PublicKey +import java.time.Instant +import java.util.* +import javax.annotation.concurrent.ThreadSafe + +/** + * Service which allows external clients to monitor the wallet service and state machine manager, as well as trigger + * actions within the node. The service also sends requests for user input back to clients, for example to enter + * additional information while a protocol runs, or confirm an action. + * + * This is intended to enable a range of tools from end user UI to ops tools which monitor health across a number of nodes. + */ +// TODO: Implement authorization controls+ +// TODO: Replace this entirely with a publish/subscribe based solution on a to-be-written service (likely JMS or similar), +// rather than implement authentication and publish/subscribe ourselves. +// TODO: Clients need to be able to indicate whether they support interactivity (no point in sending requests for input +// to a monitoring tool) +@ThreadSafe +class WalletMonitorService(net: MessagingService, val smm: StateMachineManager, val services: ServiceHub) + : AbstractNodeService(net, services.networkMapCache) { + companion object { + val REGISTER_TOPIC = "platform.wallet_monitor.register" + val DEREGISTER_TOPIC = "platform.wallet_monitor.deregister" + val STATE_TOPIC = "platform.wallet_monitor.state_snapshot" + val IN_EVENT_TOPIC = "platform.wallet_monitor.in" + val OUT_EVENT_TOPIC = "platform.wallet_monitor.out" + + val logger = loggerFor<WalletMonitorService>() + } + + val listeners: MutableSet<RegisteredListener> = HashSet() + + data class RegisteredListener(val recipients: MessageRecipients, val sessionID: Long) + + init { + addMessageHandler(REGISTER_TOPIC) { req: RegisterRequest -> processRegisterRequest(req) } + addMessageHandler(DEREGISTER_TOPIC) { req: DeregisterRequest -> processDeregisterRequest(req) } + addMessageHandler(OUT_EVENT_TOPIC) { req: ClientToServiceCommandMessage -> processEventRequest(req) } + + // Notify listeners on state changes + services.walletService.updates.subscribe { update -> notifyWalletUpdate(update) } + smm.changes.subscribe { change -> + val fiberId: Long = change.third + val logic: ProtocolLogic<*> = change.first + val progressTracker = logic.progressTracker + + notifyEvent(ServiceToClientEvent.StateMachine(Instant.now(), fiberId, logic.javaClass.name, change.second)) + if (progressTracker != null) { + when (change.second) { + AddOrRemove.ADD -> progressTracker.changes.subscribe { progress -> + notifyEvent(ServiceToClientEvent.Progress(Instant.now(), fiberId, progress.toString())) + } + AddOrRemove.REMOVE -> { + // Nothing to do + } + } + } + } + } + + @VisibleForTesting + internal fun notifyWalletUpdate(update: Wallet.Update) + = notifyEvent(ServiceToClientEvent.OutputState(Instant.now(), update.consumed, update.produced)) + + private fun processEventRequest(reqMessage: ClientToServiceCommandMessage) { + val req = reqMessage.command + val result: TransactionBuildResult? = + try { + when (req) { + is ClientToServiceCommand.IssueCash -> issueCash(req) + is ClientToServiceCommand.PayCash -> initatePayment(req) + is ClientToServiceCommand.ExitCash -> exitCash(req) + else -> throw IllegalArgumentException("Unknown request type ${req.javaClass.name}") + } + } catch(ex: Exception) { + TransactionBuildResult.Failed(ex.message) + } + + // Send back any result from the event. Not all events (especially TransactionInput) produce a + // result. + if (result != null) { + val event = ServiceToClientEvent.TransactionBuild(Instant.now(), req.id, result) + val respMessage = net.createMessage(IN_EVENT_TOPIC, reqMessage.sessionID, + event.serialize().bits) + net.send(respMessage, reqMessage.getReplyTo(services.networkMapCache)) + } + } + + /** + * Process a request from a monitor to remove them from the subscribers. + */ + fun processDeregisterRequest(req: DeregisterRequest) { + val message: Message + try { + // TODO: Session ID should be managed by the messaging layer, so it handles ensuring that the + // request comes from the same endpoint that registered at the start. + listeners.remove(RegisteredListener(req.replyToRecipient, req.sessionID)) + message = net.createMessage(DEREGISTER_TOPIC, req.sessionID, DeregisterResponse(true).serialize().bits) + } catch (ex: IllegalStateException) { + message = net.createMessage(DEREGISTER_TOPIC, req.sessionID, DeregisterResponse(false).serialize().bits) + } + net.send(message, req.replyToRecipient) + } + + /** + * Process a request from a monitor to add them to the subscribers. This includes hooks to authenticate the request, + * but currently all requests pass (and there's no access control on wallets, so it has no actual meaning). + */ + fun processRegisterRequest(req: RegisterRequest) { + val message: Message + try { + message = net.createMessage(REGISTER_TOPIC, req.sessionID, RegisterResponse(true).serialize().bits) + listeners.add(RegisteredListener(req.replyToRecipient, req.sessionID)) + val stateMessage = StateSnapshotMessage(services.walletService.currentWallet.states.map { it.state.data }.toList(), + smm.allStateMachines.map { it.javaClass.name }) + net.send(net.createMessage(STATE_TOPIC, DEFAULT_SESSION_ID, stateMessage.serialize().bits), req.replyToRecipient) + } catch (ex: IllegalStateException) { + message = net.createMessage(REGISTER_TOPIC, req.sessionID, RegisterResponse(false).serialize().bits) + } + net.send(message, req.replyToRecipient) + } + + private fun notifyEvent(event: ServiceToClientEvent) = listeners.forEach { monitor -> + net.send(net.createMessage(IN_EVENT_TOPIC, monitor.sessionID, event.serialize().bits), + monitor.recipients) + } + + /** + * Notifies the node associated with the [recipient] public key. Returns a future holding a Boolean of whether the + * node accepted the transaction or not. + */ + private fun notifyRecipientAboutTransaction( + recipient: PublicKey, + transaction: SignedTransaction + ): ListenableFuture<Unit> { + val recipientNodeInfo = services.networkMapCache.getNodeByPublicKey(recipient) ?: throw PublicKeyLookupFailed(recipient) + return DataVending.Service.notify(net, services.storageService.myLegalIdentity, + recipientNodeInfo, transaction) + } + + // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service + private fun initatePayment(req: ClientToServiceCommand.PayCash): TransactionBuildResult { + val builder: TransactionBuilder = TransactionType.General.Builder() + // TODO: Have some way of restricting this to states the caller controls + try { + Cash().generateSpend(builder, Amount(req.pennies, req.tokenDef.product), req.owner, + services.walletService.currentWallet.statesOfType<Cash.State>(), + setOf(req.tokenDef.issuer.party)) + .forEach { + val key = services.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}") + builder.signWith(KeyPair(it, key)) + } + val tx = builder.toSignedTransaction() + services.walletService.notify(tx.tx) + notifyRecipientAboutTransaction(req.owner, tx) + return TransactionBuildResult.Complete(tx, "Cash payment completed") + } catch(ex: InsufficientBalanceException) { + return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance") + } + } + + // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service + private fun exitCash(req: ClientToServiceCommand.ExitCash): TransactionBuildResult { + val builder: TransactionBuilder = TransactionType.General.Builder() + val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef) + Cash().generateExit(builder, Amount(req.pennies, Issued(issuer, req.currency)), + issuer.party.owningKey, services.walletService.currentWallet.statesOfType<Cash.State>()) + builder.signWith(services.storageService.myLegalIdentityKey) + val tx = builder.toSignedTransaction() + services.walletService.notify(tx.tx) + // Notify the owners + val inputStatesNullable = services.walletService.statesForRefs(tx.tx.inputs) + val inputStates = inputStatesNullable.values.filterNotNull().map { it.data } + if (inputStatesNullable.size != inputStates.size) { + val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key } + throw InputStateRefResolveFailed(unresolvedStateRefs) + } + inputStates.filterIsInstance<Cash.State>().map { it.owner }.toSet().forEach { + notifyRecipientAboutTransaction(it, tx) + } + return TransactionBuildResult.Complete(tx, "Cash destruction completed") + } + + // TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service + private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult { + val builder: TransactionBuilder = TransactionType.General.Builder(notary = req.notary) + val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef) + Cash().generateIssue(builder, Amount(req.pennies, Issued(issuer, req.currency)), req.recipient, req.notary) + builder.signWith(services.storageService.myLegalIdentityKey) + val tx = builder.toSignedTransaction() + services.walletService.notify(tx.tx) + notifyRecipientAboutTransaction(req.recipient, tx) + return TransactionBuildResult.Complete(tx, "Cash issuance completed") + } + + class PublicKeyLookupFailed(failedPublicKey: PublicKey) : + Exception("Failed to lookup public keys $failedPublicKey") + + class InputStateRefResolveFailed(stateRefs: List<StateRef>) : + Exception("Failed to resolve input StateRefs $stateRefs") +} diff --git a/node/src/test/kotlin/com/r3corda/node/services/WalletMonitorServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/WalletMonitorServiceTests.kt new file mode 100644 index 0000000000..853a52a641 --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/services/WalletMonitorServiceTests.kt @@ -0,0 +1,209 @@ +package com.r3corda.node.services + +import co.paralleluniverse.fibers.Suspendable +import com.r3corda.contracts.asset.Cash +import com.r3corda.core.contracts.* +import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.crypto.newSecureRandom +import com.r3corda.core.messaging.MessageHandlerRegistration +import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.DEFAULT_SESSION_ID +import com.r3corda.core.node.services.Wallet +import com.r3corda.core.protocols.ProtocolLogic +import com.r3corda.core.random63BitValue +import com.r3corda.core.serialization.OpaqueBytes +import com.r3corda.core.serialization.deserialize +import com.r3corda.core.serialization.serialize +import com.r3corda.core.testing.DUMMY_NOTARY +import com.r3corda.core.testing.DUMMY_PUBKEY_1 +import com.r3corda.node.internal.testing.MockNetwork +import com.r3corda.node.services.monitor.* +import org.junit.Before +import org.junit.Test +import java.util.* +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue +import kotlin.test.fail + +/** + * Unit tests for the wallet monitoring service. + */ +class WalletMonitorServiceTests { + lateinit var network: MockNetwork + + @Before + fun setup() { + network = MockNetwork() + } + + /** + * Authenticate the register node with the monitor service node. + */ + private fun authenticate(monitorServiceNode: MockNetwork.MockNode, registerNode: MockNetwork.MockNode): Long { + network.runNetwork() + val sessionID = random63BitValue() + val authenticatePsm = registerNode.smm.add(WalletMonitorService.REGISTER_TOPIC, + TestRegisterPSM(monitorServiceNode.info, sessionID)) + network.runNetwork() + authenticatePsm.get(1, TimeUnit.SECONDS) + return sessionID + } + + class TestReceiveWalletUpdatePSM(val sessionID: Long) + : ProtocolLogic<ServiceToClientEvent.OutputState>() { + override val topic: String get() = WalletMonitorService.IN_EVENT_TOPIC + @Suspendable + override fun call(): ServiceToClientEvent.OutputState + = receive<ServiceToClientEvent.OutputState>(sessionID).validate { it } + } + + class TestRegisterPSM(val server: NodeInfo, val sessionID: Long) + : ProtocolLogic<RegisterResponse>() { + override val topic: String get() = WalletMonitorService.REGISTER_TOPIC + @Suspendable + override fun call(): RegisterResponse { + val req = RegisterRequest(serviceHub.networkService.myAddress, sessionID) + return sendAndReceive<RegisterResponse>(server.identity, 0, sessionID, req).validate { it } + } + } + + /** + * Test a very simple case of trying to register against the service. + */ + @Test + fun `success with network`() { + val (monitorServiceNode, registerNode) = network.createTwoNodes() + + network.runNetwork() + val sessionID = random63BitValue() + val authenticatePsm = registerNode.smm.add(WalletMonitorService.REGISTER_TOPIC, + TestRegisterPSM(monitorServiceNode.info, sessionID)) + network.runNetwork() + val result = authenticatePsm.get(1, TimeUnit.SECONDS) + assertTrue(result.success) + } + + /** + * Test that having registered, changes are relayed correctly. + */ + @Test + fun `event received`() { + val (monitorServiceNode, registerNode) = network.createTwoNodes() + val sessionID = authenticate(monitorServiceNode, registerNode) + var receivePsm = registerNode.smm.add(WalletMonitorService.IN_EVENT_TOPIC, + TestReceiveWalletUpdatePSM(sessionID)) + var expected = Wallet.Update(emptySet(), emptySet()) + monitorServiceNode.inNodeWalletMonitorService!!.notifyWalletUpdate(expected) + network.runNetwork() + var actual = receivePsm.get(1, TimeUnit.SECONDS) + assertEquals(expected.consumed, actual.consumed) + assertEquals(expected.produced, actual.produced) + + // Check that states are passed through correctly + receivePsm = registerNode.smm.add(WalletMonitorService.IN_EVENT_TOPIC, + TestReceiveWalletUpdatePSM(sessionID)) + val consumed = setOf(StateRef(SecureHash.randomSHA256(), 0)) + val producedState = TransactionState(DummyContract.SingleOwnerState(newSecureRandom().nextInt(), DUMMY_PUBKEY_1), DUMMY_NOTARY) + val produced = setOf(StateAndRef(producedState, StateRef(SecureHash.randomSHA256(), 0))) + expected = Wallet.Update(consumed, produced) + monitorServiceNode.inNodeWalletMonitorService!!.notifyWalletUpdate(expected) + network.runNetwork() + actual = receivePsm.get(1, TimeUnit.SECONDS) + assertEquals(expected.produced, actual.produced) + assertEquals(expected.consumed, actual.consumed) + } + + @Test + fun `cash issue accepted`() { + val (monitorServiceNode, registerNode) = network.createTwoNodes() + val sessionID = authenticate(monitorServiceNode, registerNode) + val quantity = 1000L + val events = Collections.synchronizedList(ArrayList<ServiceToClientEvent>()) + val ref = OpaqueBytes(ByteArray(1) {1}) + + registerNode.net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC + ".0") { msg, reg -> + events.add(msg.data.deserialize<ServiceToClientEvent>()) + } + + // Check the monitoring service wallet is empty + assertFalse(monitorServiceNode.services.walletService.currentWallet.states.iterator().hasNext()) + + // Tell the monitoring service node to issue some cash + val recipientKey = monitorServiceNode.services.storageService.myLegalIdentityKey.public + val outEvent = ClientToServiceCommand.IssueCash(GBP, ref, quantity, recipientKey, DUMMY_NOTARY) + val message = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID, + ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits) + registerNode.net.send(message, monitorServiceNode.net.myAddress) + network.runNetwork() + + // Check we've received a response + events.forEach { event -> + when (event) { + is ServiceToClientEvent.TransactionBuild -> { + // Check the returned event is correct + val actual = event.state as TransactionBuildResult.Complete + val expected = TransactionBuildResult.Complete(actual.transaction, null) + assertEquals(expected, actual) + } + is ServiceToClientEvent.OutputState -> { + // Check the generated state is correct + val actual = event.produced.single().state.data + val expected = Cash.State(Amount(quantity, + Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)), + recipientKey) + assertEquals(expected, actual) + } + else -> fail("Unexpected in event ${event}") + } + } + } + + @Test + fun `cash move accepted`() { + val (monitorServiceNode, registerNode) = network.createTwoNodes() + val sessionID = authenticate(monitorServiceNode, registerNode) + val quantity = 1000L + val events = Collections.synchronizedList(ArrayList<ServiceToClientEvent>()) + val ref = OpaqueBytes(ByteArray(1) {1}) + var handlerReg: MessageHandlerRegistration? = null + + registerNode.net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC + ".0") { msg, reg -> + events.add(msg.data.deserialize<ServiceToClientEvent>()) + handlerReg = reg + } + + // Check the monitoring service wallet is empty + assertFalse(monitorServiceNode.services.walletService.currentWallet.states.iterator().hasNext()) + + // Tell the monitoring service node to issue some cash + val recipientKey = monitorServiceNode.services.storageService.myLegalIdentityKey.public + val outEvent = ClientToServiceCommand.IssueCash(GBP, ref, quantity, recipientKey, DUMMY_NOTARY) + val message = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID, + ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits) + registerNode.net.send(message, monitorServiceNode.net.myAddress) + network.runNetwork() + + // Check we've received a response + events.forEach { event -> + when (event) { + is ServiceToClientEvent.TransactionBuild -> { + // Check the returned event is correct + val actual = event.state as TransactionBuildResult.Complete + val expected = TransactionBuildResult.Complete(actual.transaction, null) + assertEquals(expected, actual) + } + is ServiceToClientEvent.OutputState -> { + // Check the generated state is correct + val actual = event.produced.single().state.data + val expected = Cash.State(Amount(quantity, + Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)), + recipientKey) + assertEquals(expected, actual) + } + else -> fail("Unexpected in event ${event}") + } + } + } +} \ No newline at end of file