client: Add stubs/todo for handling state snapshots

This commit is contained in:
Andras Slemmer 2016-09-06 12:29:23 +01:00
parent 3acbda53e4
commit e235375a89
4 changed files with 20 additions and 9 deletions

View File

@ -24,7 +24,8 @@ class WalletMonitorClient(
val net: MessagingService, val net: MessagingService,
val node: NodeInfo, val node: NodeInfo,
val outEvents: Observable<ClientToServiceCommand>, val outEvents: Observable<ClientToServiceCommand>,
val inEvents: Observer<ServiceToClientEvent> val inEvents: Observer<ServiceToClientEvent>,
val snapshot: Observer<StateSnapshotMessage>
) { ) {
private val sessionID = random63BitValue() private val sessionID = random63BitValue()
@ -37,8 +38,10 @@ class WalletMonitorClient(
net.removeMessageHandler(reg) net.removeMessageHandler(reg)
future.set(resp.success) future.set(resp.success)
} }
net.addMessageHandler(WalletMonitorService.STATE_TOPIC, sessionID) { msg, req -> net.addMessageHandler(WalletMonitorService.STATE_TOPIC, sessionID) { msg, reg ->
// TODO val snapshotMessage = msg.data.deserialize<StateSnapshotMessage>()
net.removeMessageHandler(reg)
snapshot.onNext(snapshotMessage)
} }
net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg -> net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg ->

View File

@ -6,6 +6,7 @@ import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.contracts.StateRef import com.r3corda.core.contracts.StateRef
import com.r3corda.client.fxutils.foldToObservableList import com.r3corda.client.fxutils.foldToObservableList
import com.r3corda.node.services.monitor.ServiceToClientEvent import com.r3corda.node.services.monitor.ServiceToClientEvent
import com.r3corda.node.services.monitor.StateSnapshotMessage
import javafx.collections.ObservableList import javafx.collections.ObservableList
import kotlinx.support.jdk8.collections.removeIf import kotlinx.support.jdk8.collections.removeIf
import rx.Observable import rx.Observable
@ -20,10 +21,12 @@ class StatesDiff<out T : ContractState>(
*/ */
class ContractStateModel { class ContractStateModel {
private val serviceToClient: Observable<ServiceToClientEvent> by observable(WalletMonitorModel::serviceToClient) private val serviceToClient: Observable<ServiceToClientEvent> by observable(WalletMonitorModel::serviceToClient)
private val snapshot: Observable<StateSnapshotMessage> by observable(WalletMonitorModel::snapshot)
private val outputStates = serviceToClient.ofType(ServiceToClientEvent.OutputState::class.java) private val outputStates = serviceToClient.ofType(ServiceToClientEvent.OutputState::class.java)
val contractStatesDiff = outputStates.map { StatesDiff(it.produced, it.consumed) } val contractStatesDiff = outputStates.map { StatesDiff(it.produced, it.consumed) }
// We filter the diff first rather than the complete contract state list. // We filter the diff first rather than the complete contract state list.
// TODO wire up snapshot once it holds StateAndRefs
val cashStatesDiff = contractStatesDiff.map { val cashStatesDiff = contractStatesDiff.map {
StatesDiff(it.added.filterIsInstance<StateAndRef<Cash.State>>(), it.removed) StatesDiff(it.added.filterIsInstance<StateAndRef<Cash.State>>(), it.removed)
} }

View File

@ -5,6 +5,7 @@ import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.node.services.monitor.ServiceToClientEvent import com.r3corda.node.services.monitor.ServiceToClientEvent
import com.r3corda.node.services.monitor.StateSnapshotMessage
import rx.Observable import rx.Observable
import rx.Observer import rx.Observer
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
@ -19,6 +20,9 @@ class WalletMonitorModel {
private val serviceToClientSource = PublishSubject.create<ServiceToClientEvent>() private val serviceToClientSource = PublishSubject.create<ServiceToClientEvent>()
val serviceToClient: Observable<ServiceToClientEvent> = serviceToClientSource val serviceToClient: Observable<ServiceToClientEvent> = serviceToClientSource
private val snapshotSource = PublishSubject.create<StateSnapshotMessage>()
val snapshot: Observable<StateSnapshotMessage> = snapshotSource
/** /**
* Register for updates to/from a given wallet. * Register for updates to/from a given wallet.
* @param messagingService The messaging to use for communication. * @param messagingService The messaging to use for communication.
@ -30,7 +34,8 @@ class WalletMonitorModel {
messagingService, messagingService,
walletMonitorNodeInfo, walletMonitorNodeInfo,
clientToServiceSource, clientToServiceSource,
serviceToClientSource serviceToClientSource,
snapshotSource
) )
require(monitorClient.register().get()) require(monitorClient.register().get())
} }

View File

@ -15,9 +15,9 @@ import org.slf4j.LoggerFactory
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import kotlin.test.fail import kotlin.test.fail
val log: Logger = LoggerFactory.getLogger(WalletMonitorServiceTests::class.java) val log: Logger = LoggerFactory.getLogger(WalletMonitorClientTests::class.java)
class WalletMonitorServiceTests { class WalletMonitorClientTests {
@Test @Test
fun cashIssueWorksEndToEnd() { fun cashIssueWorksEndToEnd() {
driver { driver {
@ -34,7 +34,7 @@ class WalletMonitorServiceTests {
val aliceInStream = PublishSubject.create<ServiceToClientEvent>() val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>() val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
require(aliceMonitorClient.register().get()) require(aliceMonitorClient.register().get())
aliceOutStream.onNext(ClientToServiceCommand.IssueCash( aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
@ -77,7 +77,7 @@ class WalletMonitorServiceTests {
val aliceInStream = PublishSubject.create<ServiceToClientEvent>() val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>() val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
require(aliceMonitorClient.register().get()) require(aliceMonitorClient.register().get())
aliceOutStream.onNext(ClientToServiceCommand.IssueCash( aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
@ -185,7 +185,7 @@ class WalletMonitorServiceTests {
val aliceInStream = PublishSubject.create<ServiceToClientEvent>() val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>() val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream) val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
require(aliceMonitorClient.register().get()) require(aliceMonitorClient.register().get())
aliceOutStream.onNext(ClientToServiceCommand.IssueCash( aliceOutStream.onNext(ClientToServiceCommand.IssueCash(