diff --git a/client/src/main/kotlin/com/r3corda/client/fxutils/AggregatedList.kt b/client/src/main/kotlin/com/r3corda/client/fxutils/AggregatedList.kt new file mode 100644 index 0000000000..55425cc662 --- /dev/null +++ b/client/src/main/kotlin/com/r3corda/client/fxutils/AggregatedList.kt @@ -0,0 +1,110 @@ +package com.r3corda.client.fxutils + +import javafx.collections.FXCollections +import javafx.collections.ListChangeListener +import javafx.collections.ObservableList +import javafx.collections.transformation.TransformationList +import kotlin.comparisons.compareValues + +/** + * [AggregatedList] provides an [ObservableList] that is an aggregate of the underlying list based on key [K]. + * Internally it uses a sorted list TODO think of a more efficient representation + */ +class AggregatedList( + list: ObservableList, + val toKey: (E) -> K, + val assemble: (K, ObservableList) -> A +) : TransformationList(list) { + + private class AggregationGroup( + val key: Int, + val value: A, + val elements: ObservableList + ) + + // Invariant: sorted by K + private val aggregationList = mutableListOf>() + + init { + list.forEach { addItem(it) } + } + + override fun get(index: Int): A? = aggregationList.getOrNull(index)?.value + + /** + * We cannot implement this as aggregations are one to many + */ + override fun getSourceIndex(index: Int): Int { + throw UnsupportedOperationException() + } + + override val size: Int get() = aggregationList.size + + override fun sourceChanged(c: ListChangeListener.Change) { + beginChange() + while (c.next()) { + if (c.wasPermutated()) { + // Permutation should not change aggregation + } else if (c.wasUpdated()) { + // Update should not change aggregation + } else { + for (removedSourceItem in c.removed) { + val removedPair = removeItem(removedSourceItem) + if (removedPair != null) { + nextRemove(removedPair.first, removedPair.second.value) + } + } + for (addedItem in c.addedSubList) { + val insertIndex = addItem(addedItem) + if (insertIndex != null) { + nextAdd(insertIndex, insertIndex + 1) + } + } + } + } + endChange() + } + + private fun removeItem(removedItem: E): Pair>? { + val key = toKey(removedItem) + val keyHashCode = key.hashCode() + + val index = aggregationList.binarySearch( + comparison = { group -> compareValues(keyHashCode, group.key.hashCode()) } + ) + if (index < 0) { + throw IllegalStateException("Removed element $removedItem does not map to an existing aggregation") + } else { + val aggregationGroup = aggregationList[index] + if (aggregationGroup.elements.size == 1) { + return Pair(index, aggregationList.removeAt(index)) + } + aggregationGroup.elements.remove(removedItem) + } + return null + } + + private fun addItem(addedItem: E): Int? { + val key = toKey(addedItem) + val keyHashCode = key.hashCode() + val index = aggregationList.binarySearch( + comparison = { group -> compareValues(keyHashCode, group.key.hashCode()) } + ) + if (index < 0) { + // New aggregation + val observableGroupElements = FXCollections.observableArrayList() + observableGroupElements.add(addedItem) + val aggregationGroup = AggregationGroup( + key = keyHashCode, + value = assemble(key, observableGroupElements), + elements = observableGroupElements + ) + val insertIndex = -index - 1 + aggregationList.add(insertIndex, aggregationGroup) + return insertIndex + } else { + aggregationList[index].elements.add(addedItem) + return null + } + } +} diff --git a/client/src/main/kotlin/com/r3corda/client/fxutils/AmountBindings.kt b/client/src/main/kotlin/com/r3corda/client/fxutils/AmountBindings.kt new file mode 100644 index 0000000000..ea3c517331 --- /dev/null +++ b/client/src/main/kotlin/com/r3corda/client/fxutils/AmountBindings.kt @@ -0,0 +1,50 @@ +package com.r3corda.client.fxutils + +import com.r3corda.client.model.ExchangeRate +import com.r3corda.core.contracts.Amount +import javafx.beans.binding.Bindings +import javafx.beans.value.ObservableValue +import javafx.collections.ObservableList +import kotlinx.support.jdk8.collections.stream +import org.fxmisc.easybind.EasyBind +import java.util.* +import java.util.stream.Collectors + +class AmountBindings { + companion object { + fun sum(amounts: ObservableList>, token: T) = EasyBind.map( + Bindings.createLongBinding({ + amounts.stream().collect(Collectors.summingLong { + require(it.token == token) + it.quantity + }) + }, arrayOf(amounts)) + ) { sum -> Amount(sum.toLong(), token) } + + fun exchange( + currency: ObservableValue, + exchangeRate: ObservableValue + ): ObservableValue) -> Long>> { + return EasyBind.combine(currency, exchangeRate) { currency, exchangeRate -> + Pair(currency) { amount: Amount -> + (exchangeRate.rate(amount.token, currency) * amount.quantity).toLong() + } + } + } + + fun sumAmountExchange( + amounts: ObservableList>, + currency: ObservableValue, + exchangeRate: ObservableValue + ): ObservableValue> { + return EasyBind.monadic(exchange(currency, exchangeRate)).flatMap { + val (currencyValue, exchange: (Amount) -> Long) = it + EasyBind.map( + Bindings.createLongBinding({ + amounts.stream().collect(Collectors.summingLong { exchange(it) }) + } , arrayOf(amounts)) + ) { Amount(it.toLong(), currencyValue) } + } + } + } +} diff --git a/client/src/main/kotlin/com/r3corda/client/fxutils/ChosenList.kt b/client/src/main/kotlin/com/r3corda/client/fxutils/ChosenList.kt new file mode 100644 index 0000000000..3f98811283 --- /dev/null +++ b/client/src/main/kotlin/com/r3corda/client/fxutils/ChosenList.kt @@ -0,0 +1,52 @@ +package com.r3corda.client.fxutils + +import javafx.beans.Observable +import javafx.beans.value.ObservableValue +import javafx.collections.ListChangeListener +import javafx.collections.ObservableList +import javafx.collections.ObservableListBase + +/** + * [ChosenList] is essentially a monadic join of an [ObservableValue] of an [ObservableList] into an [ObservableList]. + * Whenever the underlying [ObservableValue] changes the exposed list changes to the new value. Changes to the list are + * simply propagated. + */ +class ChosenList( + private val chosenListObservable: ObservableValue> +): ObservableListBase() { + + private var currentList = chosenListObservable.value + + private val listener = object : ListChangeListener { + override fun onChanged(change: ListChangeListener.Change) = fireChange(change) + } + + init { + chosenListObservable.addListener { observable: Observable -> rechoose() } + currentList.addListener(listener) + beginChange() + nextAdd(0, currentList.size) + endChange() + } + + override fun get(index: Int) = currentList.get(index) + override val size: Int get() = currentList.size + + private fun rechoose() { + val chosenList = chosenListObservable.value + if (currentList != chosenList) { + pick(chosenList) + } + } + + private fun pick(list: ObservableList) { + currentList.removeListener(listener) + list.addListener(listener) + beginChange() + nextRemove(0, currentList) + currentList = list + nextAdd(0, list.size) + endChange() + } + +} diff --git a/client/src/main/kotlin/com/r3corda/client/fxutils/EventStreamFold.kt b/client/src/main/kotlin/com/r3corda/client/fxutils/EventStreamFold.kt new file mode 100644 index 0000000000..7f8e61bde7 --- /dev/null +++ b/client/src/main/kotlin/com/r3corda/client/fxutils/EventStreamFold.kt @@ -0,0 +1,38 @@ +package com.r3corda.client.fxutils + +import javafx.application.Platform +import javafx.beans.property.SimpleObjectProperty +import javafx.beans.value.ObservableValue +import javafx.collections.FXCollections +import javafx.collections.ObservableList +import org.reactfx.EventStream + +/** + * Simple utilities for converting an [EventStream] into an [ObservableValue]/[ObservableList] + */ + +fun EventStream.foldToObservable(initial: B, folderFun: (A, B) -> B): ObservableValue { + val result = SimpleObjectProperty(initial) + subscribe { + Platform.runLater { + result.set(folderFun(it, result.get())) + } + } + return result +} + +fun EventStream.foldToObservableList( + initialAccumulator: C, folderFun: (A, C, ObservableList) -> C +): ObservableList { + val result = FXCollections.observableArrayList() + /** + * This capture is fine, as [Platform.runLater] runs closures in order + */ + var currentAccumulator = initialAccumulator + subscribe { + Platform.runLater { + currentAccumulator = folderFun(it, currentAccumulator, result) + } + } + return result +} diff --git a/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt b/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt new file mode 100644 index 0000000000..aad38d9159 --- /dev/null +++ b/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt @@ -0,0 +1,29 @@ +package com.r3corda.client.model + +import com.r3corda.contracts.asset.Cash +import com.r3corda.core.contracts.ContractState +import com.r3corda.core.contracts.StateAndRef +import com.r3corda.core.contracts.StateRef +import com.r3corda.client.fxutils.foldToObservableList +import com.r3corda.node.services.monitor.ServiceToClientEvent +import javafx.collections.ObservableList +import kotlinx.support.jdk8.collections.removeIf +import org.reactfx.EventStream + +class StatesDiff(val added: Collection>, val removed: Collection) + +class ContractStateModel { + private val serviceToClient: EventStream by stream(WalletMonitorModel::serviceToClient) + private val outputStates = serviceToClient.filter(ServiceToClientEvent.OutputState::class.java) + + val contractStatesDiff = outputStates.map { StatesDiff(it.produced, it.consumed) } + val cashStatesDiff = contractStatesDiff.map { + StatesDiff(it.added.filterIsInstance>(), it.removed) + } + val cashStates: ObservableList> = + cashStatesDiff.foldToObservableList(Unit) { statesDiff, _accumulator, observableList -> + observableList.removeIf { it.ref in statesDiff.removed } + observableList.addAll(statesDiff.added) + } + +} diff --git a/client/src/main/kotlin/com/r3corda/client/model/ExchangeRateModel.kt b/client/src/main/kotlin/com/r3corda/client/model/ExchangeRateModel.kt new file mode 100644 index 0000000000..cf2b8333ee --- /dev/null +++ b/client/src/main/kotlin/com/r3corda/client/model/ExchangeRateModel.kt @@ -0,0 +1,21 @@ +package com.r3corda.client.model + +import com.r3corda.core.contracts.Amount +import javafx.beans.property.SimpleObjectProperty +import java.util.* + + +interface ExchangeRate { + fun rate(from: Currency, to: Currency): Double +} +fun ExchangeRate.exchangeAmount(amount: Amount, to: Currency) = + Amount(exchangeDouble(amount, to).toLong(), to) +fun ExchangeRate.exchangeDouble(amount: Amount, to: Currency) = + rate(amount.token, to) * amount.quantity + +class ExchangeRateModel { + // TODO hook up an actual oracle + val exchangeRate = SimpleObjectProperty(object : ExchangeRate { + override fun rate(from: Currency, to: Currency) = 1.0 + }) +} diff --git a/client/src/main/kotlin/com/r3corda/client/model/Models.kt b/client/src/main/kotlin/com/r3corda/client/model/Models.kt new file mode 100644 index 0000000000..3ff63765f4 --- /dev/null +++ b/client/src/main/kotlin/com/r3corda/client/model/Models.kt @@ -0,0 +1,102 @@ +package com.r3corda.client.model + +import javafx.beans.property.ObjectProperty +import javafx.beans.value.ObservableValue +import javafx.beans.value.WritableValue +import javafx.collections.ObservableList +import org.reactfx.EventSink +import org.reactfx.EventStream +import java.util.* +import kotlin.reflect.KClass +import kotlin.reflect.KProperty + +/** + * This file defines a global [Models] store and delegates to inject event streams/sinks. Note that all streams here + * are global to the app. + * + * This allows us to decouple UI logic from stream initialisation and provides us with a central place to inspect data flows. + * It also allows detecting of looping logic by constructing a stream dependency graph TODO do this + * + * Usage: In your piece of UI use the [stream] and [sink] delegates to access external streams. If you have a reusable + * stream put it in a Model. See [NetworkModel] for an example + */ + +inline fun stream(noinline streamProperty: (M) -> EventStream) = + TrackedDelegate.Stream(M::class, streamProperty) + +inline fun sink(noinline sinkProperty: (M) -> EventSink) = + TrackedDelegate.Sink(M::class, sinkProperty) + +inline fun observableValue(noinline observableValueProperty: (M) -> ObservableValue) = + TrackedDelegate.ObservableValueDelegate(M::class, observableValueProperty) + +inline fun writableValue(noinline writableValueProperty: (M) -> WritableValue) = + TrackedDelegate.WritableValueDelegate(M::class, writableValueProperty) + +inline fun objectProperty(noinline objectProperty: (M) -> ObjectProperty) = + TrackedDelegate.ObjectPropertyDelegate(M::class, objectProperty) + +inline fun observableList(noinline observableListProperty: (M) -> ObservableList) = + TrackedDelegate.ObservableListDelegate(M::class, observableListProperty) + +inline fun observableListReadOnly(noinline observableListProperty: (M) -> ObservableList) = + TrackedDelegate.ObservableListDelegateReadOnly(M::class, observableListProperty) + +object Models { + private val modelStore = HashMap, Any>() + private val dependencyGraph = HashMap, MutableSet>>() + + fun initModel(klass: KClass) = modelStore.getOrPut(klass) { klass.java.newInstance() } + fun get(klass: KClass, origin: KClass<*>) : M { + dependencyGraph.getOrPut(origin) { mutableSetOf>() }.add(klass) + val model = initModel(klass) + if (model.javaClass != klass.java) { + throw IllegalStateException("Model stored as ${klass.qualifiedName} has type ${model.javaClass}") + } + + @Suppress("UNCHECKED_CAST") + return model as M + } + inline fun get(origin: KClass<*>) : M = get(M::class, origin) +} + +sealed class TrackedDelegate(val klass: KClass) { + init { Models.initModel(klass) } + + class Stream (klass: KClass, val streamProperty: (M) -> EventStream) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): EventStream { + return streamProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } + + class Sink (klass: KClass, val sinkProperty: (M) -> EventSink) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): EventSink { + return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } + class ObservableValueDelegate(klass: KClass, val sinkProperty: (M) -> ObservableValue) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): ObservableValue { + return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } + class WritableValueDelegate(klass: KClass, val sinkProperty: (M) -> WritableValue) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): WritableValue { + return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } + class ObservableListDelegate(klass: KClass, val sinkProperty: (M) -> ObservableList) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): ObservableList { + return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } + class ObservableListDelegateReadOnly(klass: KClass, val sinkProperty: (M) -> ObservableList) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): ObservableList { + return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } + class ObjectPropertyDelegate(klass: KClass, val sinkProperty: (M) -> ObjectProperty) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): ObjectProperty { + return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } +} diff --git a/client/src/main/kotlin/com/r3corda/client/model/TransactionCreateStateModel.kt b/client/src/main/kotlin/com/r3corda/client/model/TransactionCreateStateModel.kt new file mode 100644 index 0000000000..974cb1182f --- /dev/null +++ b/client/src/main/kotlin/com/r3corda/client/model/TransactionCreateStateModel.kt @@ -0,0 +1,177 @@ +package com.r3corda.client.model + +import com.r3corda.client.fxutils.foldToObservableList +import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.node.services.monitor.ServiceToClientEvent +import com.r3corda.node.services.monitor.TransactionBuildResult +import com.r3corda.node.utilities.AddOrRemove +import javafx.beans.property.SimpleObjectProperty +import javafx.beans.value.ObservableValue +import javafx.collections.ObservableList +import org.reactfx.EventStream +import java.time.Instant +import java.util.* + +sealed class TransactionCreateStatus() { + class Started(val message: String?) : TransactionCreateStatus() + class Failed(val message: String?) : TransactionCreateStatus() + + override fun toString(): String { + return when (this) { + is TransactionCreateStatus.Started -> message ?: "Started" + is TransactionCreateStatus.Failed -> message ?: "Failed" + } + } +} + +sealed class ProtocolStatus() { + object Added: ProtocolStatus() + object Removed: ProtocolStatus() + class InProgress(val status: String): ProtocolStatus() + + override fun toString(): String { + return when (this) { + ProtocolStatus.Added -> "Added" + ProtocolStatus.Removed -> "Removed" + is ProtocolStatus.InProgress -> status + } + } +} + +interface TransactionCreateState { + val fiberId: ObservableValue + val uuid: ObservableValue + val protocolName: ObservableValue + val protocolStatus: ObservableValue + val transaction: ObservableValue + val status: ObservableValue + val lastUpdate: ObservableValue +} + +data class TransactionCreateStateWritable( + override val fiberId: SimpleObjectProperty = SimpleObjectProperty(null), + override val uuid: SimpleObjectProperty = SimpleObjectProperty(null), + override val protocolName: SimpleObjectProperty = SimpleObjectProperty(null), + override val protocolStatus: SimpleObjectProperty = SimpleObjectProperty(null), + override val transaction: SimpleObjectProperty = SimpleObjectProperty(null), + override val status: SimpleObjectProperty = SimpleObjectProperty(null), + override val lastUpdate: SimpleObjectProperty +) : TransactionCreateState + +class TransactionCreateStateModel { + + private val serviceToClient: EventStream by stream(WalletMonitorModel::serviceToClient) + + /** + * Aggregation of updates to transactions. We use the observable list as the only container and do linear search for + * matching transactions because we have two keys(fiber ID and UUID) and this way it's easier to avoid syncing issues + * TODO: Make this more efficient by maintaining and syncing two maps (for the two keys) in the accumulator + * (Note that a transaction may be mapped by one or both) + * TODO: Expose a writable stream to combine [serviceToClient] with to allow recording of transactions made locally(UUID) + */ + val transactionCreateStates: ObservableList = + serviceToClient.foldToObservableList( + initialAccumulator = Unit, + folderFun = { serviceToClientEvent, _unit, transactionStates -> + return@foldToObservableList when (serviceToClientEvent) { + is ServiceToClientEvent.Transaction -> { + // TODO handle this once we have some id to associate the tx with + } + is ServiceToClientEvent.OutputState -> {} + is ServiceToClientEvent.StateMachine -> { + newFiberIdTransactionStateOrModify(transactionStates, + fiberId = serviceToClientEvent.fiberId, + lastUpdate = serviceToClientEvent.time, + tweak = { + protocolName.set(serviceToClientEvent.label) + protocolStatus.set(when (serviceToClientEvent.addOrRemove) { + AddOrRemove.ADD -> ProtocolStatus.Added + AddOrRemove.REMOVE -> ProtocolStatus.Removed + }) + } + ) + } + is ServiceToClientEvent.Progress -> { + newFiberIdTransactionStateOrModify(transactionStates, + fiberId = serviceToClientEvent.fiberId, + lastUpdate = serviceToClientEvent.time, + tweak = { + protocolStatus.set(ProtocolStatus.InProgress(serviceToClientEvent.message)) + } + ) + } + is ServiceToClientEvent.TransactionBuild -> { + val state = serviceToClientEvent.state + newUuidTransactionStateOrModify(transactionStates, + uuid = serviceToClientEvent.id, + fiberId = when (state) { + is TransactionBuildResult.ProtocolStarted -> state.fiberId + is TransactionBuildResult.Failed -> null + }, + lastUpdate = serviceToClientEvent.time, + tweak = { + return@newUuidTransactionStateOrModify when (state) { + is TransactionBuildResult.ProtocolStarted -> { + transaction.set(state.transaction) + status.set(TransactionCreateStatus.Started(state.message)) + } + is TransactionBuildResult.Failed -> { + status.set(TransactionCreateStatus.Failed(state.message)) + } + } + } + ) + } + } + } + ) + + companion object { + private fun newFiberIdTransactionStateOrModify( + transactionStates: ObservableList, + fiberId: Long, + lastUpdate: Instant, + tweak: TransactionCreateStateWritable.() -> Unit + ) { + val index = transactionStates.indexOfFirst { it.fiberId.value == fiberId } + if (index < 0) { + val newState = TransactionCreateStateWritable( + fiberId = SimpleObjectProperty(fiberId), + lastUpdate = SimpleObjectProperty(lastUpdate) + ) + tweak(newState) + transactionStates.add(newState) + } else { + val existingState = transactionStates[index] + existingState.lastUpdate.set(lastUpdate) + tweak(existingState) + } + } + + private fun newUuidTransactionStateOrModify( + transactionStates: ObservableList, + uuid: UUID, + fiberId: Long?, + lastUpdate: Instant, + tweak: TransactionCreateStateWritable.() -> Unit + ) { + val index = transactionStates.indexOfFirst { + it.uuid.value == uuid || (fiberId != null && it.fiberId.value == fiberId) + } + if (index < 0) { + val newState = TransactionCreateStateWritable( + uuid = SimpleObjectProperty(uuid), + fiberId = SimpleObjectProperty(fiberId), + lastUpdate = SimpleObjectProperty(lastUpdate) + ) + tweak(newState) + transactionStates.add(newState) + } else { + val existingState = transactionStates[index] + existingState.lastUpdate.set(lastUpdate) + tweak(existingState) + } + } + } + +} diff --git a/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt b/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt new file mode 100644 index 0000000000..6ff6c22ac0 --- /dev/null +++ b/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt @@ -0,0 +1,32 @@ +package com.r3corda.client.model + +import com.r3corda.client.WalletMonitorClient +import com.r3corda.core.contracts.Amount +import com.r3corda.core.contracts.ClientToServiceCommand +import com.r3corda.core.messaging.MessagingService +import com.r3corda.core.node.NodeInfo +import com.r3corda.node.services.monitor.ServiceToClientEvent +import javafx.beans.property.SimpleObjectProperty +import org.reactfx.EventSink +import org.reactfx.EventSource +import org.reactfx.EventStream +import java.util.* + +class WalletMonitorModel { + private val clientToServiceSource = EventSource() + val clientToService: EventSink = clientToServiceSource + + private val serviceToClientSource = EventSource() + val serviceToClient: EventStream = serviceToClientSource + + // TODO provide an unsubscribe mechanism + fun register(messagingService: MessagingService, walletMonitorNodeInfo: NodeInfo) { + val monitorClient = WalletMonitorClient( + messagingService, + walletMonitorNodeInfo, + clientToServiceSource, + serviceToClientSource + ) + require(monitorClient.register().get()) + } +}