Merged in aslemmer-node-client-models (pull request #307)

Aslemmer node client models
This commit is contained in:
Andras Slemmer 2016-09-08 12:31:35 +01:00
commit abe9cfbd67
13 changed files with 836 additions and 32 deletions

View File

@ -24,7 +24,8 @@ class WalletMonitorClient(
val net: MessagingService,
val node: NodeInfo,
val outEvents: Observable<ClientToServiceCommand>,
val inEvents: Observer<ServiceToClientEvent>
val inEvents: Observer<ServiceToClientEvent>,
val snapshot: Observer<StateSnapshotMessage>
) {
private val sessionID = random63BitValue()
@ -37,8 +38,10 @@ class WalletMonitorClient(
net.removeMessageHandler(reg)
future.set(resp.success)
}
net.addMessageHandler(WalletMonitorService.STATE_TOPIC, sessionID) { msg, req ->
// TODO
net.addMessageHandler(WalletMonitorService.STATE_TOPIC, sessionID) { msg, reg ->
val snapshotMessage = msg.data.deserialize<StateSnapshotMessage>()
net.removeMessageHandler(reg)
snapshot.onNext(snapshotMessage)
}
net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg ->

View File

@ -0,0 +1,133 @@
package com.r3corda.client.fxutils
import javafx.collections.FXCollections
import javafx.collections.ListChangeListener
import javafx.collections.ObservableList
import javafx.collections.transformation.TransformationList
import kotlin.comparisons.compareValues
/**
* Given an [ObservableList]<[E]> and a grouping key [K], [AggregatedList] groups the elements by the key into a fresh
* [ObservableList]<[E]> for each group and exposes the groups as an observable list of [A]s by calling [assemble] on each.
*
* Changes done to elements of the input list are reflected in the observable list of the respective group, whereas
* additions/removals of elements in the underlying list are reflected in the exposed [ObservableList]<[A]> by
* adding/deleting aggregations as expected.
*
* The ordering of the exposed list is based on the [hashCode] of keys.
*
* Example:
* val statesGroupedByCurrency = AggregatedList(states, { state -> state.currency }) { currency, group ->
* object {
* val currency = currency
* val states = group
* }
* }
*
* The above creates an observable list of (currency, statesOfCurrency) pairs.
*
* Note that update events to the source list are discarded, assuming the key of elements does not change.
* TODO Should we handle this case? It requires additional bookkeeping of sourceIndex->(aggregationIndex, groupIndex)
*
* @param list The underlying list.
* @param toKey Function to extract the key from an element.
* @param assemble Function to assemble the aggregation into the exposed [A].
*/
class AggregatedList<A, E, K : Any>(
list: ObservableList<out E>,
val toKey: (E) -> K,
val assemble: (K, ObservableList<E>) -> A
) : TransformationList<A, E>(list) {
private class AggregationGroup<E, out A>(
val keyHashCode: Int,
val value: A,
val elements: ObservableList<E>
)
// Invariant: sorted by K.hashCode()
private val aggregationList = mutableListOf<AggregationGroup<E, A>>()
init {
list.forEach { addItem(it) }
}
override fun get(index: Int): A? = aggregationList.getOrNull(index)?.value
/**
* We cannot implement this as aggregations are one to many
*/
override fun getSourceIndex(index: Int): Int {
throw UnsupportedOperationException()
}
override val size: Int get() = aggregationList.size
override fun sourceChanged(c: ListChangeListener.Change<out E>) {
beginChange()
while (c.next()) {
if (c.wasPermutated()) {
// Permutation should not change aggregation
} else if (c.wasUpdated()) {
// Update should not change aggregation
} else {
for (removedSourceItem in c.removed) {
val removedPair = removeItem(removedSourceItem)
if (removedPair != null) {
nextRemove(removedPair.first, removedPair.second.value)
}
}
for (addedItem in c.addedSubList) {
val insertIndex = addItem(addedItem)
if (insertIndex != null) {
nextAdd(insertIndex, insertIndex + 1)
}
}
}
}
endChange()
}
private fun removeItem(removedItem: E): Pair<Int, AggregationGroup<E, A>>? {
val key = toKey(removedItem)
val keyHashCode = key.hashCode()
val index = aggregationList.binarySearch(
comparison = { group -> compareValues(keyHashCode, group.keyHashCode.hashCode()) }
)
if (index < 0) {
throw IllegalStateException("Removed element $removedItem does not map to an existing aggregation")
} else {
val aggregationGroup = aggregationList[index]
if (aggregationGroup.elements.size == 1) {
return Pair(index, aggregationList.removeAt(index))
}
aggregationGroup.elements.remove(removedItem)
}
return null
}
private fun addItem(addedItem: E): Int? {
val key = toKey(addedItem)
val keyHashCode = key.hashCode()
val index = aggregationList.binarySearch(
comparison = { group -> compareValues(keyHashCode, group.keyHashCode.hashCode()) }
)
if (index < 0) {
// New aggregation
val observableGroupElements = FXCollections.observableArrayList<E>()
observableGroupElements.add(addedItem)
val aggregationGroup = AggregationGroup(
keyHashCode = keyHashCode,
value = assemble(key, observableGroupElements),
elements = observableGroupElements
)
val insertIndex = -index - 1
aggregationList.add(insertIndex, aggregationGroup)
return insertIndex
} else {
aggregationList[index].elements.add(addedItem)
return null
}
}
}

View File

@ -0,0 +1,51 @@
package com.r3corda.client.fxutils
import com.r3corda.client.model.ExchangeRate
import com.r3corda.core.contracts.Amount
import javafx.beans.binding.Bindings
import javafx.beans.value.ObservableValue
import javafx.collections.ObservableList
import kotlinx.support.jdk8.collections.stream
import org.fxmisc.easybind.EasyBind
import java.util.*
import java.util.stream.Collectors
/**
* Utility bindings for the [Amount] type, similar in spirit to [Bindings]
*/
object AmountBindings {
fun <T> sum(amounts: ObservableList<Amount<T>>, token: T) = EasyBind.map(
Bindings.createLongBinding({
amounts.stream().collect(Collectors.summingLong {
require(it.token == token)
it.quantity
})
}, arrayOf(amounts))
) { sum -> Amount(sum.toLong(), token) }
fun exchange(
currency: ObservableValue<Currency>,
exchangeRate: ObservableValue<ExchangeRate>
): ObservableValue<Pair<Currency, (Amount<Currency>) -> Long>> {
return EasyBind.combine(currency, exchangeRate) { currency, exchangeRate ->
Pair(currency) { amount: Amount<Currency> ->
(exchangeRate.rate(amount.token, currency) * amount.quantity).toLong()
}
}
}
fun sumAmountExchange(
amounts: ObservableList<Amount<Currency>>,
currency: ObservableValue<Currency>,
exchangeRate: ObservableValue<ExchangeRate>
): ObservableValue<Amount<Currency>> {
return EasyBind.monadic(exchange(currency, exchangeRate)).flatMap {
val (currencyValue, exchange: (Amount<Currency>) -> Long) = it
EasyBind.map(
Bindings.createLongBinding({
amounts.stream().collect(Collectors.summingLong { exchange(it) })
} , arrayOf(amounts))
) { Amount(it.toLong(), currencyValue) }
}
}
}

View File

@ -0,0 +1,61 @@
package com.r3corda.client.fxutils
import javafx.beans.Observable
import javafx.beans.value.ObservableValue
import javafx.collections.ListChangeListener
import javafx.collections.ObservableList
import javafx.collections.ObservableListBase
/**
* [ChosenList] manages an [ObservableList] that may be changed by the wrapping [ObservableValue]. Whenever the underlying
* [ObservableValue] changes the exposed list changes to the new value. Changes to the list are simply propagated.
*
* Example:
* val filteredStates = ChosenList(EasyBind.map(filterCriteriaType) { type ->
* when (type) {
* is (ByCurrency) -> statesFilteredByCurrency
* is (ByIssuer) -> statesFilteredByIssuer
* }
* })
*
* The above will create a list that chooses and delegates to the appropriate filtered list based on the type of filter.
*/
class ChosenList<E>(
private val chosenListObservable: ObservableValue<ObservableList<E>>
): ObservableListBase<E>() {
private var currentList = chosenListObservable.value
private val listener = object : ListChangeListener<E> {
override fun onChanged(change: ListChangeListener.Change<out E>) = fireChange(change)
}
init {
chosenListObservable.addListener { observable: Observable -> rechoose() }
currentList.addListener(listener)
beginChange()
nextAdd(0, currentList.size)
endChange()
}
override fun get(index: Int) = currentList.get(index)
override val size: Int get() = currentList.size
private fun rechoose() {
val chosenList = chosenListObservable.value
if (currentList != chosenList) {
pick(chosenList)
}
}
private fun pick(list: ObservableList<E>) {
currentList.removeListener(listener)
list.addListener(listener)
beginChange()
nextRemove(0, currentList)
currentList = list
nextAdd(0, list.size)
endChange()
}
}

View File

@ -0,0 +1,38 @@
package com.r3corda.client.fxutils
import javafx.application.Platform
import javafx.beans.property.SimpleObjectProperty
import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections
import javafx.collections.ObservableList
import rx.Observable
/**
* Simple utilities for converting an [rx.Observable] into a javafx [ObservableValue]/[ObservableList]
*/
fun <A, B> Observable<A>.foldToObservableValue(initial: B, folderFun: (A, B) -> B): ObservableValue<B> {
val result = SimpleObjectProperty<B>(initial)
subscribe {
Platform.runLater {
result.set(folderFun(it, result.get()))
}
}
return result
}
fun <A, B, C> Observable<A>.foldToObservableList(
initialAccumulator: C, folderFun: (A, C, ObservableList<B>) -> C
): ObservableList<B> {
val result = FXCollections.observableArrayList<B>()
/**
* This capture is fine, as [Platform.runLater] runs closures in order
*/
var currentAccumulator = initialAccumulator
subscribe {
Platform.runLater {
currentAccumulator = folderFun(it, currentAccumulator, result)
}
}
return result
}

View File

@ -0,0 +1,39 @@
package com.r3corda.client.model
import com.r3corda.contracts.asset.Cash
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.contracts.StateRef
import com.r3corda.client.fxutils.foldToObservableList
import com.r3corda.node.services.monitor.ServiceToClientEvent
import com.r3corda.node.services.monitor.StateSnapshotMessage
import javafx.collections.ObservableList
import kotlinx.support.jdk8.collections.removeIf
import rx.Observable
class StatesDiff<out T : ContractState>(
val added: Collection<StateAndRef<T>>,
val removed: Collection<StateRef>
)
/**
* This model exposes the list of owned contract states.
*/
class ContractStateModel {
private val serviceToClient: Observable<ServiceToClientEvent> by observable(WalletMonitorModel::serviceToClient)
private val snapshot: Observable<StateSnapshotMessage> by observable(WalletMonitorModel::snapshot)
private val outputStates = serviceToClient.ofType(ServiceToClientEvent.OutputState::class.java)
val contractStatesDiff = outputStates.map { StatesDiff(it.produced, it.consumed) }
// We filter the diff first rather than the complete contract state list.
// TODO wire up snapshot once it holds StateAndRefs
val cashStatesDiff = contractStatesDiff.map {
StatesDiff(it.added.filterIsInstance<StateAndRef<Cash.State>>(), it.removed)
}
val cashStates: ObservableList<StateAndRef<Cash.State>> =
cashStatesDiff.foldToObservableList(Unit) { statesDiff, _accumulator, observableList ->
observableList.removeIf { it.ref in statesDiff.removed }
observableList.addAll(statesDiff.added)
}
}

View File

@ -0,0 +1,25 @@
package com.r3corda.client.model
import com.r3corda.core.contracts.Amount
import javafx.beans.property.SimpleObjectProperty
import javafx.beans.value.ObservableValue
import java.util.*
interface ExchangeRate {
fun rate(from: Currency, to: Currency): Double
}
fun ExchangeRate.exchangeAmount(amount: Amount<Currency>, to: Currency) =
Amount(exchangeDouble(amount, to).toLong(), to)
fun ExchangeRate.exchangeDouble(amount: Amount<Currency>, to: Currency) =
rate(amount.token, to) * amount.quantity
/**
* This model provides an exchange rate from arbitrary currency to arbitrary currency.
* TODO hook up an actual oracle
*/
class ExchangeRateModel {
val exchangeRate: ObservableValue<ExchangeRate> = SimpleObjectProperty<ExchangeRate>(object : ExchangeRate {
override fun rate(from: Currency, to: Currency) = 1.0
})
}

View File

@ -0,0 +1,172 @@
package com.r3corda.client.model
import com.r3corda.client.fxutils.foldToObservableList
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.node.services.monitor.ServiceToClientEvent
import com.r3corda.node.services.monitor.TransactionBuildResult
import com.r3corda.node.utilities.AddOrRemove
import javafx.beans.property.SimpleObjectProperty
import javafx.beans.value.ObservableValue
import javafx.collections.ObservableList
import rx.Observable
import java.time.Instant
import java.util.UUID
interface GatheredTransactionData {
val fiberId: ObservableValue<Long?>
val uuid: ObservableValue<UUID?>
val protocolName: ObservableValue<String?>
val protocolStatus: ObservableValue<ProtocolStatus?>
val transaction: ObservableValue<SignedTransaction?>
val status: ObservableValue<TransactionCreateStatus?>
val lastUpdate: ObservableValue<Instant>
}
sealed class TransactionCreateStatus(val message: String?) {
class Started(message: String?) : TransactionCreateStatus(message)
class Failed(message: String?) : TransactionCreateStatus(message)
override fun toString(): String = message ?: javaClass.simpleName
}
sealed class ProtocolStatus(val status: String?) {
object Added: ProtocolStatus(null)
object Removed: ProtocolStatus(null)
class InProgress(status: String): ProtocolStatus(status)
override fun toString(): String = status ?: javaClass.simpleName
}
data class GatheredTransactionDataWritable(
override val fiberId: SimpleObjectProperty<Long?> = SimpleObjectProperty(null),
override val uuid: SimpleObjectProperty<UUID?> = SimpleObjectProperty(null),
override val protocolName: SimpleObjectProperty<String?> = SimpleObjectProperty(null),
override val protocolStatus: SimpleObjectProperty<ProtocolStatus?> = SimpleObjectProperty(null),
override val transaction: SimpleObjectProperty<SignedTransaction?> = SimpleObjectProperty(null),
override val status: SimpleObjectProperty<TransactionCreateStatus?> = SimpleObjectProperty(null),
override val lastUpdate: SimpleObjectProperty<Instant>
) : GatheredTransactionData
/**
* This model provides an observable list of states relating to the creation of a transaction not yet on ledger.
*/
class GatheredTransactionDataModel {
private val serviceToClient: Observable<ServiceToClientEvent> by observable(WalletMonitorModel::serviceToClient)
/**
* Aggregation of updates to transactions. We use the observable list as the only container and do linear search for
* matching transactions because we have two keys(fiber ID and UUID) and this way it's easier to avoid syncing issues.
*
* The Fiber ID is used to identify events that relate to the same transaction server-side, whereas the UUID is
* generated on the UI and is used to identify events with the UI action that triggered them. Currently a UUID is
* generated for each outgoing [ClientToServiceCommand].
*
* TODO: Make this more efficient by maintaining and syncing two maps (for the two keys) in the accumulator
* (Note that a transaction may be mapped by one or both)
* TODO: Expose a writable stream to combine [serviceToClient] with to allow recording of transactions made locally(UUID)
*/
val gatheredGatheredTransactionDataList: ObservableList<out GatheredTransactionData> =
serviceToClient.foldToObservableList<ServiceToClientEvent, GatheredTransactionDataWritable, Unit>(
initialAccumulator = Unit,
folderFun = { serviceToClientEvent, _unit, transactionStates ->
return@foldToObservableList when (serviceToClientEvent) {
is ServiceToClientEvent.Transaction -> {
// TODO handle this once we have some id to associate the tx with
}
is ServiceToClientEvent.OutputState -> {}
is ServiceToClientEvent.StateMachine -> {
newFiberIdTransactionStateOrModify(transactionStates,
fiberId = serviceToClientEvent.fiberId,
lastUpdate = serviceToClientEvent.time,
tweak = {
protocolName.set(serviceToClientEvent.label)
protocolStatus.set(when (serviceToClientEvent.addOrRemove) {
AddOrRemove.ADD -> ProtocolStatus.Added
AddOrRemove.REMOVE -> ProtocolStatus.Removed
})
}
)
}
is ServiceToClientEvent.Progress -> {
newFiberIdTransactionStateOrModify(transactionStates,
fiberId = serviceToClientEvent.fiberId,
lastUpdate = serviceToClientEvent.time,
tweak = {
protocolStatus.set(ProtocolStatus.InProgress(serviceToClientEvent.message))
}
)
}
is ServiceToClientEvent.TransactionBuild -> {
val state = serviceToClientEvent.state
newUuidTransactionStateOrModify(transactionStates,
uuid = serviceToClientEvent.id,
fiberId = when (state) {
is TransactionBuildResult.ProtocolStarted -> state.fiberId
is TransactionBuildResult.Failed -> null
},
lastUpdate = serviceToClientEvent.time,
tweak = {
return@newUuidTransactionStateOrModify when (state) {
is TransactionBuildResult.ProtocolStarted -> {
transaction.set(state.transaction)
status.set(TransactionCreateStatus.Started(state.message))
}
is TransactionBuildResult.Failed -> {
status.set(TransactionCreateStatus.Failed(state.message))
}
}
}
)
}
}
}
)
companion object {
private fun newFiberIdTransactionStateOrModify(
transactionStates: ObservableList<GatheredTransactionDataWritable>,
fiberId: Long,
lastUpdate: Instant,
tweak: GatheredTransactionDataWritable.() -> Unit
) {
val index = transactionStates.indexOfFirst { it.fiberId.value == fiberId }
if (index < 0) {
val newState = GatheredTransactionDataWritable(
fiberId = SimpleObjectProperty(fiberId),
lastUpdate = SimpleObjectProperty(lastUpdate)
)
tweak(newState)
transactionStates.add(newState)
} else {
val existingState = transactionStates[index]
existingState.lastUpdate.set(lastUpdate)
tweak(existingState)
}
}
private fun newUuidTransactionStateOrModify(
transactionStates: ObservableList<GatheredTransactionDataWritable>,
uuid: UUID,
fiberId: Long?,
lastUpdate: Instant,
tweak: GatheredTransactionDataWritable.() -> Unit
) {
val index = transactionStates.indexOfFirst {
it.uuid.value == uuid || (fiberId != null && it.fiberId.value == fiberId)
}
if (index < 0) {
val newState = GatheredTransactionDataWritable(
uuid = SimpleObjectProperty(uuid),
fiberId = SimpleObjectProperty(fiberId),
lastUpdate = SimpleObjectProperty(lastUpdate)
)
tweak(newState)
transactionStates.add(newState)
} else {
val existingState = transactionStates[index]
existingState.lastUpdate.set(lastUpdate)
tweak(existingState)
}
}
}
}

View File

@ -0,0 +1,166 @@
package com.r3corda.client.model
import javafx.beans.property.ObjectProperty
import javafx.beans.value.ObservableValue
import javafx.beans.value.WritableValue
import javafx.collections.ObservableList
import org.reactfx.EventSink
import org.reactfx.EventStream
import rx.Observable
import rx.Observer
import java.util.*
import kotlin.reflect.KClass
import kotlin.reflect.KProperty
/**
* This file defines a global [Models] store and delegates to inject event streams/sinks. Note that all streams here
* are global.
*
* This allows decoupling of UI logic from stream initialisation and provides us with a central place to inspect data
* flows. It also allows detecting of looping logic by constructing a stream dependency graph TODO do this.
*
* Usage:
* // Inject service -> client event stream
* private val serviceToClient: EventStream<ServiceToClientEvent> by eventStream(WalletMonitorModel::serviceToClient)
*
* Each Screen code should have a code layout like this:
*
* class Screen {
* val root = (..)
*
* [ inject UI elements using fxid()/inject() ]
*
* [ inject observable dependencies using observable()/eventSink() etc]
*
* [ define screen-specific observables ]
*
* init {
* [ wire up UI elements ]
* }
* }
*
* For example if I wanted to display a list of all USD cash states:
* class USDCashStatesScreen {
* val root: Pane by fxml()
*
* val usdCashStatesListView: ListView<Cash.State> by fxid("USDCashStatesListView")
*
* val cashStates: ObservableList<Cash.State> by observableList(ContractStateModel::cashStates)
*
* val usdCashStates = cashStates.filter { it.(..).currency == USD }
*
* init {
* Bindings.bindContent(usdCashStatesListView.items, usdCashStates)
* usdCashStatesListView.setCellValueFactory(somethingsomething)
* }
* }
*
* The UI code can just assume that the cash state list comes from somewhere outside. The initialisation of that
* observable is decoupled, it may be mocked or be streamed from the network etc.
*
* Later on we may even want to move all screen-specific observables to a separate Model as well (like usdCashStates) - this
* would allow moving all of the aggregation logic to e.g. a different machine, all the UI will do is inject these and wire
* them up with the UI elements.
*
* Another advantage of this separation is that once we start adding a lot of screens we can still track data dependencies
* in a central place as opposed to ad-hoc wiring up the observables.
*/
inline fun <reified M : Any, T> observable(noinline observableProperty: (M) -> Observable<T>) =
TrackedDelegate.ObservableDelegate(M::class, observableProperty)
inline fun <reified M : Any, T> observer(noinline observerProperty: (M) -> Observer<T>) =
TrackedDelegate.ObserverDelegate(M::class, observerProperty)
inline fun <reified M : Any, T> eventStream(noinline streamProperty: (M) -> EventStream<T>) =
TrackedDelegate.EventStreamDelegate(M::class, streamProperty)
inline fun <reified M : Any, T> eventSink(noinline sinkProperty: (M) -> EventSink<T>) =
TrackedDelegate.EventSinkDelegate(M::class, sinkProperty)
inline fun <reified M : Any, T> observableValue(noinline observableValueProperty: (M) -> ObservableValue<T>) =
TrackedDelegate.ObservableValueDelegate(M::class, observableValueProperty)
inline fun <reified M : Any, T> writableValue(noinline writableValueProperty: (M) -> WritableValue<T>) =
TrackedDelegate.WritableValueDelegate(M::class, writableValueProperty)
inline fun <reified M : Any, T> objectProperty(noinline objectProperty: (M) -> ObjectProperty<T>) =
TrackedDelegate.ObjectPropertyDelegate(M::class, objectProperty)
inline fun <reified M : Any, T> observableList(noinline observableListProperty: (M) -> ObservableList<T>) =
TrackedDelegate.ObservableListDelegate(M::class, observableListProperty)
inline fun <reified M : Any, T> observableListReadOnly(noinline observableListProperty: (M) -> ObservableList<out T>) =
TrackedDelegate.ObservableListReadOnlyDelegate(M::class, observableListProperty)
object Models {
private val modelStore = HashMap<KClass<*>, Any>()
/**
* Holds a class->dependencies map that tracks what screens are depending on what model.
*/
private val dependencyGraph = HashMap<KClass<*>, MutableSet<KClass<*>>>()
fun <M : Any> initModel(klass: KClass<M>) = modelStore.getOrPut(klass) { klass.java.newInstance() }
fun <M : Any> get(klass: KClass<M>, origin: KClass<*>) : M {
dependencyGraph.getOrPut(origin) { mutableSetOf<KClass<*>>() }.add(klass)
val model = initModel(klass)
if (model.javaClass != klass.java) {
throw IllegalStateException("Model stored as ${klass.qualifiedName} has type ${model.javaClass}")
}
@Suppress("UNCHECKED_CAST")
return model as M
}
inline fun <reified M : Any> get(origin: KClass<*>) : M = get(M::class, origin)
}
sealed class TrackedDelegate<M : Any>(val klass: KClass<M>) {
init { Models.initModel(klass) }
class ObservableDelegate<M : Any, T> (klass: KClass<M>, val eventStreamProperty: (M) -> Observable<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): Observable<T> {
return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class ObserverDelegate<M : Any, T> (klass: KClass<M>, val eventStreamProperty: (M) -> Observer<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): Observer<T> {
return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class EventStreamDelegate<M : Any, T> (klass: KClass<M>, val eventStreamProperty: (M) -> org.reactfx.EventStream<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): org.reactfx.EventStream<T> {
return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class EventSinkDelegate<M : Any, T> (klass: KClass<M>, val eventSinkProperty: (M) -> org.reactfx.EventSink<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): org.reactfx.EventSink<T> {
return eventSinkProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class ObservableValueDelegate<M : Any, T>(klass: KClass<M>, val observableValueProperty: (M) -> ObservableValue<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): ObservableValue<T> {
return observableValueProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class WritableValueDelegate<M : Any, T>(klass: KClass<M>, val writableValueProperty: (M) -> WritableValue<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): WritableValue<T> {
return writableValueProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class ObservableListDelegate<M : Any, T>(klass: KClass<M>, val observableListProperty: (M) -> ObservableList<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): ObservableList<T> {
return observableListProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class ObservableListReadOnlyDelegate<M : Any, out T>(klass: KClass<M>, val observableListReadOnlyProperty: (M) -> ObservableList<out T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): ObservableList<out T> {
return observableListReadOnlyProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class ObjectPropertyDelegate<M : Any, T>(klass: KClass<M>, val objectPropertyProperty: (M) -> ObjectProperty<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): ObjectProperty<T> {
return objectPropertyProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
}

View File

@ -0,0 +1,42 @@
package com.r3corda.client.model
import com.r3corda.client.WalletMonitorClient
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.NodeInfo
import com.r3corda.node.services.monitor.ServiceToClientEvent
import com.r3corda.node.services.monitor.StateSnapshotMessage
import rx.Observable
import rx.Observer
import rx.subjects.PublishSubject
/**
* This model exposes raw event streams to and from the [WalletMonitorService] through a [WalletMonitorClient]
*/
class WalletMonitorModel {
private val clientToServiceSource = PublishSubject.create<ClientToServiceCommand>()
val clientToService: Observer<ClientToServiceCommand> = clientToServiceSource
private val serviceToClientSource = PublishSubject.create<ServiceToClientEvent>()
val serviceToClient: Observable<ServiceToClientEvent> = serviceToClientSource
private val snapshotSource = PublishSubject.create<StateSnapshotMessage>()
val snapshot: Observable<StateSnapshotMessage> = snapshotSource
/**
* Register for updates to/from a given wallet.
* @param messagingService The messaging to use for communication.
* @param walletMonitorNodeInfo the [Node] to connect to.
* TODO provide an unsubscribe mechanism
*/
fun register(messagingService: MessagingService, walletMonitorNodeInfo: NodeInfo) {
val monitorClient = WalletMonitorClient(
messagingService,
walletMonitorNodeInfo,
clientToServiceSource,
serviceToClientSource,
snapshotSource
)
require(monitorClient.register().get())
}
}

View File

@ -1,6 +1,5 @@
package com.r3corda.client
import co.paralleluniverse.strands.SettableFuture
import com.r3corda.core.contracts.*
import com.r3corda.core.serialization.OpaqueBytes
import com.r3corda.node.driver.driver
@ -16,12 +15,11 @@ import org.slf4j.LoggerFactory
import rx.subjects.PublishSubject
import kotlin.test.fail
val log: Logger = LoggerFactory.getLogger(WalletMonitorServiceTests::class.java)
val log: Logger = LoggerFactory.getLogger(WalletMonitorClientTests::class.java)
class WalletMonitorServiceTests {
class WalletMonitorClientTests {
@Test
fun cashIssueWorksEndToEnd() {
driver {
val aliceNodeFuture = startNode("Alice")
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(SimpleNotaryService.Type))
@ -36,7 +34,7 @@ class WalletMonitorServiceTests {
val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
require(aliceMonitorClient.register().get())
aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
@ -46,26 +44,20 @@ class WalletMonitorServiceTests {
notary = notaryNode.identity
))
val buildFuture = SettableFuture<ServiceToClientEvent.TransactionBuild>()
val eventFuture = SettableFuture<ServiceToClientEvent.OutputState>()
aliceInStream.subscribe {
if (it is ServiceToClientEvent.OutputState)
eventFuture.set(it)
else if (it is ServiceToClientEvent.TransactionBuild)
buildFuture.set(it)
else
log.warn("Unexpected event $it")
aliceInStream.expectEvents(isStrict = false) {
parallel(
expect { build: ServiceToClientEvent.TransactionBuild ->
val state = build.state
if (state is TransactionBuildResult.Failed) {
fail(state.message)
}
},
expect { output: ServiceToClientEvent.OutputState ->
require(output.consumed.size == 0)
require(output.produced.size == 1)
}
)
}
val buildEvent = buildFuture.get()
val state = buildEvent.state
if (state is TransactionBuildResult.Failed) {
fail(state.message)
}
val outputEvent = eventFuture.get()
require(outputEvent.consumed.size == 0)
require(outputEvent.produced.size == 1)
}
}
@ -85,7 +77,7 @@ class WalletMonitorServiceTests {
val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
require(aliceMonitorClient.register().get())
aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
@ -193,7 +185,7 @@ class WalletMonitorServiceTests {
val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream)
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
require(aliceMonitorClient.register().get())
aliceOutStream.onNext(ClientToServiceCommand.IssueCash(

View File

@ -0,0 +1,82 @@
package com.r3corda.client.fxutils
import javafx.collections.FXCollections
import org.junit.Before
import org.junit.Test
import kotlin.test.fail
class AggregatedListTest {
var sourceList = FXCollections.observableArrayList<Int>()
@Before
fun setup() {
sourceList = FXCollections.observableArrayList<Int>()
}
@Test
fun addWorks() {
val aggregatedList = AggregatedList(sourceList, { it % 3 }) { mod3, group -> Pair(mod3, group) }
require(aggregatedList.size == 0) { "Aggregation is empty is source list is" }
sourceList.add(9)
require(aggregatedList.size == 1) { "Aggregation list has one element if one was added to source list" }
require(aggregatedList[0]!!.first == 0)
sourceList.add(8)
require(aggregatedList.size == 2) { "Aggregation list has two elements if two were added to source list with different keys" }
sourceList.add(6)
require(aggregatedList.size == 2) { "Aggregation list's size doesn't change if element with existing key is added" }
aggregatedList.forEach {
when (it.first) {
0 -> require(it.second.toSet() == setOf(6, 9))
2 -> require(it.second.size == 1)
else -> fail("No aggregation expected with key ${it.first}")
}
}
}
@Test
fun removeWorks() {
val aggregatedList = AggregatedList(sourceList, { it % 3 }) { mod3, group -> Pair(mod3, group) }
sourceList.addAll(0, 1, 2, 3, 4)
require(aggregatedList.size == 3)
aggregatedList.forEach {
when (it.first) {
0 -> require(it.second.toSet() == setOf(0, 3))
1 -> require(it.second.toSet() == setOf(1, 4))
2 -> require(it.second.toSet() == setOf(2))
else -> fail("No aggregation expected with key ${it.first}")
}
}
sourceList.remove(4)
require(aggregatedList.size == 3)
aggregatedList.forEach {
when (it.first) {
0 -> require(it.second.toSet() == setOf(0, 3))
1 -> require(it.second.toSet() == setOf(1))
2 -> require(it.second.toSet() == setOf(2))
else -> fail("No aggregation expected with key ${it.first}")
}
}
sourceList.remove(2, 4)
require(aggregatedList.size == 2)
aggregatedList.forEach {
when (it.first) {
0 -> require(it.second.toSet() == setOf(0))
1 -> require(it.second.toSet() == setOf(1))
else -> fail("No aggregation expected with key ${it.first}")
}
}
sourceList.removeAll(0, 1)
require(aggregatedList.size == 0)
}
}

View File

@ -13,12 +13,12 @@ import rx.Observable
* The only restriction on [parallel] is that we should be able to discriminate which branch to take based on the
* arrived event's type. If this is ambiguous the first matching piece of DSL will be run.
* [sequence]s and [parallel]s can be nested arbitrarily
* [sequence]s and [parallel]s can be nested arbitrarily.
*
* Example usage:
*
* val stream: Observable<SomeEvent> = (..)
* stream.expectEvents(
* stream.expectEvents {
* sequence(
* expect { event: SomeEvent.A -> require(event.isOk()) },
* parallel(
@ -26,7 +26,7 @@ import rx.Observable
* expect { event.SomeEvent.C -> }
* )
* )
* )
* }
*
* The above will test our expectation that the stream should first emit an A, and then a B and C in unspecified order.
*/