From e235375a89b5bd9088a91a86f66ecd821e51545b Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 6 Sep 2016 12:29:23 +0100 Subject: [PATCH] client: Add stubs/todo for handling state snapshots --- .../kotlin/com/r3corda/client/WalletMonitorClient.kt | 9 ++++++--- .../com/r3corda/client/model/ContractStateModel.kt | 3 +++ .../com/r3corda/client/model/WalletMonitorModel.kt | 7 ++++++- .../com/r3corda/client/WalletMonitorClientTests.kt | 10 +++++----- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt b/client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt index bc48f8ed06..02efa1741c 100644 --- a/client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt +++ b/client/src/main/kotlin/com/r3corda/client/WalletMonitorClient.kt @@ -24,7 +24,8 @@ class WalletMonitorClient( val net: MessagingService, val node: NodeInfo, val outEvents: Observable, - val inEvents: Observer + val inEvents: Observer, + val snapshot: Observer ) { private val sessionID = random63BitValue() @@ -37,8 +38,10 @@ class WalletMonitorClient( net.removeMessageHandler(reg) future.set(resp.success) } - net.addMessageHandler(WalletMonitorService.STATE_TOPIC, sessionID) { msg, req -> - // TODO + net.addMessageHandler(WalletMonitorService.STATE_TOPIC, sessionID) { msg, reg -> + val snapshotMessage = msg.data.deserialize() + net.removeMessageHandler(reg) + snapshot.onNext(snapshotMessage) } net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg -> diff --git a/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt b/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt index cba8c774f1..a16225c6d1 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt @@ -6,6 +6,7 @@ import com.r3corda.core.contracts.StateAndRef import com.r3corda.core.contracts.StateRef import com.r3corda.client.fxutils.foldToObservableList import com.r3corda.node.services.monitor.ServiceToClientEvent +import com.r3corda.node.services.monitor.StateSnapshotMessage import javafx.collections.ObservableList import kotlinx.support.jdk8.collections.removeIf import rx.Observable @@ -20,10 +21,12 @@ class StatesDiff( */ class ContractStateModel { private val serviceToClient: Observable by observable(WalletMonitorModel::serviceToClient) + private val snapshot: Observable by observable(WalletMonitorModel::snapshot) private val outputStates = serviceToClient.ofType(ServiceToClientEvent.OutputState::class.java) val contractStatesDiff = outputStates.map { StatesDiff(it.produced, it.consumed) } // We filter the diff first rather than the complete contract state list. + // TODO wire up snapshot once it holds StateAndRefs val cashStatesDiff = contractStatesDiff.map { StatesDiff(it.added.filterIsInstance>(), it.removed) } diff --git a/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt b/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt index beac467349..5aba8756b0 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt @@ -5,6 +5,7 @@ import com.r3corda.core.contracts.ClientToServiceCommand import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.NodeInfo import com.r3corda.node.services.monitor.ServiceToClientEvent +import com.r3corda.node.services.monitor.StateSnapshotMessage import rx.Observable import rx.Observer import rx.subjects.PublishSubject @@ -19,6 +20,9 @@ class WalletMonitorModel { private val serviceToClientSource = PublishSubject.create() val serviceToClient: Observable = serviceToClientSource + private val snapshotSource = PublishSubject.create() + val snapshot: Observable = snapshotSource + /** * Register for updates to/from a given wallet. * @param messagingService The messaging to use for communication. @@ -30,7 +34,8 @@ class WalletMonitorModel { messagingService, walletMonitorNodeInfo, clientToServiceSource, - serviceToClientSource + serviceToClientSource, + snapshotSource ) require(monitorClient.register().get()) } diff --git a/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt b/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt index fb88da5fb0..bc7d228089 100644 --- a/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt +++ b/client/src/test/kotlin/com/r3corda/client/WalletMonitorClientTests.kt @@ -15,9 +15,9 @@ import org.slf4j.LoggerFactory import rx.subjects.PublishSubject import kotlin.test.fail -val log: Logger = LoggerFactory.getLogger(WalletMonitorServiceTests::class.java) +val log: Logger = LoggerFactory.getLogger(WalletMonitorClientTests::class.java) -class WalletMonitorServiceTests { +class WalletMonitorClientTests { @Test fun cashIssueWorksEndToEnd() { driver { @@ -34,7 +34,7 @@ class WalletMonitorServiceTests { val aliceInStream = PublishSubject.create() val aliceOutStream = PublishSubject.create() - val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) + val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create()) require(aliceMonitorClient.register().get()) aliceOutStream.onNext(ClientToServiceCommand.IssueCash( @@ -77,7 +77,7 @@ class WalletMonitorServiceTests { val aliceInStream = PublishSubject.create() val aliceOutStream = PublishSubject.create() - val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) + val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create()) require(aliceMonitorClient.register().get()) aliceOutStream.onNext(ClientToServiceCommand.IssueCash( @@ -185,7 +185,7 @@ class WalletMonitorServiceTests { val aliceInStream = PublishSubject.create() val aliceOutStream = PublishSubject.create() - val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) + val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create()) require(aliceMonitorClient.register().get()) aliceOutStream.onNext(ClientToServiceCommand.IssueCash(