client: Add javafx utilities and basic WalletMonitor models

This commit is contained in:
Andras Slemmer 2016-08-30 08:53:02 +01:00
parent e36eef8935
commit bdb22e1b74
9 changed files with 611 additions and 0 deletions

View File

@ -0,0 +1,110 @@
package com.r3corda.client.fxutils
import javafx.collections.FXCollections
import javafx.collections.ListChangeListener
import javafx.collections.ObservableList
import javafx.collections.transformation.TransformationList
import kotlin.comparisons.compareValues
/**
* [AggregatedList] provides an [ObservableList] that is an aggregate of the underlying list based on key [K].
* Internally it uses a sorted list TODO think of a more efficient representation
*/
class AggregatedList<A, E, K : Any>(
list: ObservableList<out E>,
val toKey: (E) -> K,
val assemble: (K, ObservableList<E>) -> A
) : TransformationList<A, E>(list) {
private class AggregationGroup<E, out A>(
val key: Int,
val value: A,
val elements: ObservableList<E>
)
// Invariant: sorted by K
private val aggregationList = mutableListOf<AggregationGroup<E, A>>()
init {
list.forEach { addItem(it) }
}
override fun get(index: Int): A? = aggregationList.getOrNull(index)?.value
/**
* We cannot implement this as aggregations are one to many
*/
override fun getSourceIndex(index: Int): Int {
throw UnsupportedOperationException()
}
override val size: Int get() = aggregationList.size
override fun sourceChanged(c: ListChangeListener.Change<out E>) {
beginChange()
while (c.next()) {
if (c.wasPermutated()) {
// Permutation should not change aggregation
} else if (c.wasUpdated()) {
// Update should not change aggregation
} else {
for (removedSourceItem in c.removed) {
val removedPair = removeItem(removedSourceItem)
if (removedPair != null) {
nextRemove(removedPair.first, removedPair.second.value)
}
}
for (addedItem in c.addedSubList) {
val insertIndex = addItem(addedItem)
if (insertIndex != null) {
nextAdd(insertIndex, insertIndex + 1)
}
}
}
}
endChange()
}
private fun removeItem(removedItem: E): Pair<Int, AggregationGroup<E, A>>? {
val key = toKey(removedItem)
val keyHashCode = key.hashCode()
val index = aggregationList.binarySearch(
comparison = { group -> compareValues(keyHashCode, group.key.hashCode()) }
)
if (index < 0) {
throw IllegalStateException("Removed element $removedItem does not map to an existing aggregation")
} else {
val aggregationGroup = aggregationList[index]
if (aggregationGroup.elements.size == 1) {
return Pair(index, aggregationList.removeAt(index))
}
aggregationGroup.elements.remove(removedItem)
}
return null
}
private fun addItem(addedItem: E): Int? {
val key = toKey(addedItem)
val keyHashCode = key.hashCode()
val index = aggregationList.binarySearch(
comparison = { group -> compareValues(keyHashCode, group.key.hashCode()) }
)
if (index < 0) {
// New aggregation
val observableGroupElements = FXCollections.observableArrayList<E>()
observableGroupElements.add(addedItem)
val aggregationGroup = AggregationGroup(
key = keyHashCode,
value = assemble(key, observableGroupElements),
elements = observableGroupElements
)
val insertIndex = -index - 1
aggregationList.add(insertIndex, aggregationGroup)
return insertIndex
} else {
aggregationList[index].elements.add(addedItem)
return null
}
}
}

View File

@ -0,0 +1,50 @@
package com.r3corda.client.fxutils
import com.r3corda.client.model.ExchangeRate
import com.r3corda.core.contracts.Amount
import javafx.beans.binding.Bindings
import javafx.beans.value.ObservableValue
import javafx.collections.ObservableList
import kotlinx.support.jdk8.collections.stream
import org.fxmisc.easybind.EasyBind
import java.util.*
import java.util.stream.Collectors
class AmountBindings {
companion object {
fun <T> sum(amounts: ObservableList<Amount<T>>, token: T) = EasyBind.map(
Bindings.createLongBinding({
amounts.stream().collect(Collectors.summingLong {
require(it.token == token)
it.quantity
})
}, arrayOf(amounts))
) { sum -> Amount(sum.toLong(), token) }
fun exchange(
currency: ObservableValue<Currency>,
exchangeRate: ObservableValue<ExchangeRate>
): ObservableValue<Pair<Currency, (Amount<Currency>) -> Long>> {
return EasyBind.combine(currency, exchangeRate) { currency, exchangeRate ->
Pair(currency) { amount: Amount<Currency> ->
(exchangeRate.rate(amount.token, currency) * amount.quantity).toLong()
}
}
}
fun sumAmountExchange(
amounts: ObservableList<Amount<Currency>>,
currency: ObservableValue<Currency>,
exchangeRate: ObservableValue<ExchangeRate>
): ObservableValue<Amount<Currency>> {
return EasyBind.monadic(exchange(currency, exchangeRate)).flatMap {
val (currencyValue, exchange: (Amount<Currency>) -> Long) = it
EasyBind.map(
Bindings.createLongBinding({
amounts.stream().collect(Collectors.summingLong { exchange(it) })
} , arrayOf(amounts))
) { Amount(it.toLong(), currencyValue) }
}
}
}
}

View File

@ -0,0 +1,52 @@
package com.r3corda.client.fxutils
import javafx.beans.Observable
import javafx.beans.value.ObservableValue
import javafx.collections.ListChangeListener
import javafx.collections.ObservableList
import javafx.collections.ObservableListBase
/**
* [ChosenList] is essentially a monadic join of an [ObservableValue] of an [ObservableList] into an [ObservableList].
* Whenever the underlying [ObservableValue] changes the exposed list changes to the new value. Changes to the list are
* simply propagated.
*/
class ChosenList<E>(
private val chosenListObservable: ObservableValue<ObservableList<E>>
): ObservableListBase<E>() {
private var currentList = chosenListObservable.value
private val listener = object : ListChangeListener<E> {
override fun onChanged(change: ListChangeListener.Change<out E>) = fireChange(change)
}
init {
chosenListObservable.addListener { observable: Observable -> rechoose() }
currentList.addListener(listener)
beginChange()
nextAdd(0, currentList.size)
endChange()
}
override fun get(index: Int) = currentList.get(index)
override val size: Int get() = currentList.size
private fun rechoose() {
val chosenList = chosenListObservable.value
if (currentList != chosenList) {
pick(chosenList)
}
}
private fun pick(list: ObservableList<E>) {
currentList.removeListener(listener)
list.addListener(listener)
beginChange()
nextRemove(0, currentList)
currentList = list
nextAdd(0, list.size)
endChange()
}
}

View File

@ -0,0 +1,38 @@
package com.r3corda.client.fxutils
import javafx.application.Platform
import javafx.beans.property.SimpleObjectProperty
import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections
import javafx.collections.ObservableList
import org.reactfx.EventStream
/**
* Simple utilities for converting an [EventStream] into an [ObservableValue]/[ObservableList]
*/
fun <A, B> EventStream<A>.foldToObservable(initial: B, folderFun: (A, B) -> B): ObservableValue<B> {
val result = SimpleObjectProperty<B>(initial)
subscribe {
Platform.runLater {
result.set(folderFun(it, result.get()))
}
}
return result
}
fun <A, B, C> EventStream<A>.foldToObservableList(
initialAccumulator: C, folderFun: (A, C, ObservableList<B>) -> C
): ObservableList<B> {
val result = FXCollections.observableArrayList<B>()
/**
* This capture is fine, as [Platform.runLater] runs closures in order
*/
var currentAccumulator = initialAccumulator
subscribe {
Platform.runLater {
currentAccumulator = folderFun(it, currentAccumulator, result)
}
}
return result
}

View File

@ -0,0 +1,29 @@
package com.r3corda.client.model
import com.r3corda.contracts.asset.Cash
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.contracts.StateRef
import com.r3corda.client.fxutils.foldToObservableList
import com.r3corda.node.services.monitor.ServiceToClientEvent
import javafx.collections.ObservableList
import kotlinx.support.jdk8.collections.removeIf
import org.reactfx.EventStream
class StatesDiff<out T : ContractState>(val added: Collection<StateAndRef<T>>, val removed: Collection<StateRef>)
class ContractStateModel {
private val serviceToClient: EventStream<ServiceToClientEvent> by stream(WalletMonitorModel::serviceToClient)
private val outputStates = serviceToClient.filter(ServiceToClientEvent.OutputState::class.java)
val contractStatesDiff = outputStates.map { StatesDiff(it.produced, it.consumed) }
val cashStatesDiff = contractStatesDiff.map {
StatesDiff(it.added.filterIsInstance<StateAndRef<Cash.State>>(), it.removed)
}
val cashStates: ObservableList<StateAndRef<Cash.State>> =
cashStatesDiff.foldToObservableList(Unit) { statesDiff, _accumulator, observableList ->
observableList.removeIf { it.ref in statesDiff.removed }
observableList.addAll(statesDiff.added)
}
}

View File

@ -0,0 +1,21 @@
package com.r3corda.client.model
import com.r3corda.core.contracts.Amount
import javafx.beans.property.SimpleObjectProperty
import java.util.*
interface ExchangeRate {
fun rate(from: Currency, to: Currency): Double
}
fun ExchangeRate.exchangeAmount(amount: Amount<Currency>, to: Currency) =
Amount(exchangeDouble(amount, to).toLong(), to)
fun ExchangeRate.exchangeDouble(amount: Amount<Currency>, to: Currency) =
rate(amount.token, to) * amount.quantity
class ExchangeRateModel {
// TODO hook up an actual oracle
val exchangeRate = SimpleObjectProperty<ExchangeRate>(object : ExchangeRate {
override fun rate(from: Currency, to: Currency) = 1.0
})
}

View File

@ -0,0 +1,102 @@
package com.r3corda.client.model
import javafx.beans.property.ObjectProperty
import javafx.beans.value.ObservableValue
import javafx.beans.value.WritableValue
import javafx.collections.ObservableList
import org.reactfx.EventSink
import org.reactfx.EventStream
import java.util.*
import kotlin.reflect.KClass
import kotlin.reflect.KProperty
/**
* This file defines a global [Models] store and delegates to inject event streams/sinks. Note that all streams here
* are global to the app.
*
* This allows us to decouple UI logic from stream initialisation and provides us with a central place to inspect data flows.
* It also allows detecting of looping logic by constructing a stream dependency graph TODO do this
*
* Usage: In your piece of UI use the [stream] and [sink] delegates to access external streams. If you have a reusable
* stream put it in a Model. See [NetworkModel] for an example
*/
inline fun <reified M : Any, T> stream(noinline streamProperty: (M) -> EventStream<T>) =
TrackedDelegate.Stream(M::class, streamProperty)
inline fun <reified M : Any, T> sink(noinline sinkProperty: (M) -> EventSink<T>) =
TrackedDelegate.Sink(M::class, sinkProperty)
inline fun <reified M : Any, T> observableValue(noinline observableValueProperty: (M) -> ObservableValue<T>) =
TrackedDelegate.ObservableValueDelegate(M::class, observableValueProperty)
inline fun <reified M : Any, T> writableValue(noinline writableValueProperty: (M) -> WritableValue<T>) =
TrackedDelegate.WritableValueDelegate(M::class, writableValueProperty)
inline fun <reified M : Any, T> objectProperty(noinline objectProperty: (M) -> ObjectProperty<T>) =
TrackedDelegate.ObjectPropertyDelegate(M::class, objectProperty)
inline fun <reified M : Any, T> observableList(noinline observableListProperty: (M) -> ObservableList<T>) =
TrackedDelegate.ObservableListDelegate(M::class, observableListProperty)
inline fun <reified M : Any, T> observableListReadOnly(noinline observableListProperty: (M) -> ObservableList<out T>) =
TrackedDelegate.ObservableListDelegateReadOnly(M::class, observableListProperty)
object Models {
private val modelStore = HashMap<KClass<*>, Any>()
private val dependencyGraph = HashMap<KClass<*>, MutableSet<KClass<*>>>()
fun <M : Any> initModel(klass: KClass<M>) = modelStore.getOrPut(klass) { klass.java.newInstance() }
fun <M : Any> get(klass: KClass<M>, origin: KClass<*>) : M {
dependencyGraph.getOrPut(origin) { mutableSetOf<KClass<*>>() }.add(klass)
val model = initModel(klass)
if (model.javaClass != klass.java) {
throw IllegalStateException("Model stored as ${klass.qualifiedName} has type ${model.javaClass}")
}
@Suppress("UNCHECKED_CAST")
return model as M
}
inline fun <reified M : Any> get(origin: KClass<*>) : M = get(M::class, origin)
}
sealed class TrackedDelegate<M : Any>(val klass: KClass<M>) {
init { Models.initModel(klass) }
class Stream<M : Any, T> (klass: KClass<M>, val streamProperty: (M) -> EventStream<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): EventStream<T> {
return streamProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class Sink<M : Any, T> (klass: KClass<M>, val sinkProperty: (M) -> EventSink<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): EventSink<T> {
return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class ObservableValueDelegate<M : Any, T>(klass: KClass<M>, val sinkProperty: (M) -> ObservableValue<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): ObservableValue<T> {
return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class WritableValueDelegate<M : Any, T>(klass: KClass<M>, val sinkProperty: (M) -> WritableValue<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): WritableValue<T> {
return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class ObservableListDelegate<M : Any, T>(klass: KClass<M>, val sinkProperty: (M) -> ObservableList<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): ObservableList<T> {
return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class ObservableListDelegateReadOnly<M : Any, T>(klass: KClass<M>, val sinkProperty: (M) -> ObservableList<out T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): ObservableList<out T> {
return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class ObjectPropertyDelegate<M : Any, T>(klass: KClass<M>, val sinkProperty: (M) -> ObjectProperty<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): ObjectProperty<T> {
return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
}

View File

@ -0,0 +1,177 @@
package com.r3corda.client.model
import com.r3corda.client.fxutils.foldToObservableList
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.node.services.monitor.ServiceToClientEvent
import com.r3corda.node.services.monitor.TransactionBuildResult
import com.r3corda.node.utilities.AddOrRemove
import javafx.beans.property.SimpleObjectProperty
import javafx.beans.value.ObservableValue
import javafx.collections.ObservableList
import org.reactfx.EventStream
import java.time.Instant
import java.util.*
sealed class TransactionCreateStatus() {
class Started(val message: String?) : TransactionCreateStatus()
class Failed(val message: String?) : TransactionCreateStatus()
override fun toString(): String {
return when (this) {
is TransactionCreateStatus.Started -> message ?: "Started"
is TransactionCreateStatus.Failed -> message ?: "Failed"
}
}
}
sealed class ProtocolStatus() {
object Added: ProtocolStatus()
object Removed: ProtocolStatus()
class InProgress(val status: String): ProtocolStatus()
override fun toString(): String {
return when (this) {
ProtocolStatus.Added -> "Added"
ProtocolStatus.Removed -> "Removed"
is ProtocolStatus.InProgress -> status
}
}
}
interface TransactionCreateState {
val fiberId: ObservableValue<Long?>
val uuid: ObservableValue<UUID?>
val protocolName: ObservableValue<String?>
val protocolStatus: ObservableValue<ProtocolStatus?>
val transaction: ObservableValue<SignedTransaction?>
val status: ObservableValue<TransactionCreateStatus?>
val lastUpdate: ObservableValue<Instant>
}
data class TransactionCreateStateWritable(
override val fiberId: SimpleObjectProperty<Long?> = SimpleObjectProperty(null),
override val uuid: SimpleObjectProperty<UUID?> = SimpleObjectProperty(null),
override val protocolName: SimpleObjectProperty<String?> = SimpleObjectProperty(null),
override val protocolStatus: SimpleObjectProperty<ProtocolStatus?> = SimpleObjectProperty(null),
override val transaction: SimpleObjectProperty<SignedTransaction?> = SimpleObjectProperty(null),
override val status: SimpleObjectProperty<TransactionCreateStatus?> = SimpleObjectProperty(null),
override val lastUpdate: SimpleObjectProperty<Instant>
) : TransactionCreateState
class TransactionCreateStateModel {
private val serviceToClient: EventStream<ServiceToClientEvent> by stream(WalletMonitorModel::serviceToClient)
/**
* Aggregation of updates to transactions. We use the observable list as the only container and do linear search for
* matching transactions because we have two keys(fiber ID and UUID) and this way it's easier to avoid syncing issues
* TODO: Make this more efficient by maintaining and syncing two maps (for the two keys) in the accumulator
* (Note that a transaction may be mapped by one or both)
* TODO: Expose a writable stream to combine [serviceToClient] with to allow recording of transactions made locally(UUID)
*/
val transactionCreateStates: ObservableList<out TransactionCreateState> =
serviceToClient.foldToObservableList<ServiceToClientEvent, TransactionCreateStateWritable, Unit>(
initialAccumulator = Unit,
folderFun = { serviceToClientEvent, _unit, transactionStates ->
return@foldToObservableList when (serviceToClientEvent) {
is ServiceToClientEvent.Transaction -> {
// TODO handle this once we have some id to associate the tx with
}
is ServiceToClientEvent.OutputState -> {}
is ServiceToClientEvent.StateMachine -> {
newFiberIdTransactionStateOrModify(transactionStates,
fiberId = serviceToClientEvent.fiberId,
lastUpdate = serviceToClientEvent.time,
tweak = {
protocolName.set(serviceToClientEvent.label)
protocolStatus.set(when (serviceToClientEvent.addOrRemove) {
AddOrRemove.ADD -> ProtocolStatus.Added
AddOrRemove.REMOVE -> ProtocolStatus.Removed
})
}
)
}
is ServiceToClientEvent.Progress -> {
newFiberIdTransactionStateOrModify(transactionStates,
fiberId = serviceToClientEvent.fiberId,
lastUpdate = serviceToClientEvent.time,
tweak = {
protocolStatus.set(ProtocolStatus.InProgress(serviceToClientEvent.message))
}
)
}
is ServiceToClientEvent.TransactionBuild -> {
val state = serviceToClientEvent.state
newUuidTransactionStateOrModify(transactionStates,
uuid = serviceToClientEvent.id,
fiberId = when (state) {
is TransactionBuildResult.ProtocolStarted -> state.fiberId
is TransactionBuildResult.Failed -> null
},
lastUpdate = serviceToClientEvent.time,
tweak = {
return@newUuidTransactionStateOrModify when (state) {
is TransactionBuildResult.ProtocolStarted -> {
transaction.set(state.transaction)
status.set(TransactionCreateStatus.Started(state.message))
}
is TransactionBuildResult.Failed -> {
status.set(TransactionCreateStatus.Failed(state.message))
}
}
}
)
}
}
}
)
companion object {
private fun newFiberIdTransactionStateOrModify(
transactionStates: ObservableList<TransactionCreateStateWritable>,
fiberId: Long,
lastUpdate: Instant,
tweak: TransactionCreateStateWritable.() -> Unit
) {
val index = transactionStates.indexOfFirst { it.fiberId.value == fiberId }
if (index < 0) {
val newState = TransactionCreateStateWritable(
fiberId = SimpleObjectProperty(fiberId),
lastUpdate = SimpleObjectProperty(lastUpdate)
)
tweak(newState)
transactionStates.add(newState)
} else {
val existingState = transactionStates[index]
existingState.lastUpdate.set(lastUpdate)
tweak(existingState)
}
}
private fun newUuidTransactionStateOrModify(
transactionStates: ObservableList<TransactionCreateStateWritable>,
uuid: UUID,
fiberId: Long?,
lastUpdate: Instant,
tweak: TransactionCreateStateWritable.() -> Unit
) {
val index = transactionStates.indexOfFirst {
it.uuid.value == uuid || (fiberId != null && it.fiberId.value == fiberId)
}
if (index < 0) {
val newState = TransactionCreateStateWritable(
uuid = SimpleObjectProperty(uuid),
fiberId = SimpleObjectProperty(fiberId),
lastUpdate = SimpleObjectProperty(lastUpdate)
)
tweak(newState)
transactionStates.add(newState)
} else {
val existingState = transactionStates[index]
existingState.lastUpdate.set(lastUpdate)
tweak(existingState)
}
}
}
}

View File

@ -0,0 +1,32 @@
package com.r3corda.client.model
import com.r3corda.client.WalletMonitorClient
import com.r3corda.core.contracts.Amount
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.NodeInfo
import com.r3corda.node.services.monitor.ServiceToClientEvent
import javafx.beans.property.SimpleObjectProperty
import org.reactfx.EventSink
import org.reactfx.EventSource
import org.reactfx.EventStream
import java.util.*
class WalletMonitorModel {
private val clientToServiceSource = EventSource<ClientToServiceCommand>()
val clientToService: EventSink<ClientToServiceCommand> = clientToServiceSource
private val serviceToClientSource = EventSource<ServiceToClientEvent>()
val serviceToClient: EventStream<ServiceToClientEvent> = serviceToClientSource
// TODO provide an unsubscribe mechanism
fun register(messagingService: MessagingService, walletMonitorNodeInfo: NodeInfo) {
val monitorClient = WalletMonitorClient(
messagingService,
walletMonitorNodeInfo,
clientToServiceSource,
serviceToClientSource
)
require(monitorClient.register().get())
}
}