Client observable improvement (#56)

* merge foldToObservableList and foldToObservableMap to fold
* added a 1 second buffer to the rx observable subscription to batch up the incoming updates, to avoid flooding FX UI thread with runnable
* renamed GatheredTransactionDataModel to TransactionDataModel
This commit is contained in:
Patrick Kuo 2016-12-15 12:48:27 +00:00 committed by GitHub
parent 978ab7e35e
commit 64732f8701
6 changed files with 38 additions and 64 deletions

View File

@ -7,6 +7,7 @@ import javafx.collections.FXCollections
import javafx.collections.ObservableList
import javafx.collections.ObservableMap
import rx.Observable
import java.util.concurrent.TimeUnit
/**
* Simple utilities for converting an [rx.Observable] into a javafx [ObservableValue]/[ObservableList]
@ -29,76 +30,46 @@ fun <A, B> Observable<A>.foldToObservableValue(initial: B, folderFun: (A, B) ->
}
/**
* [foldToObservableList] takes an [rx.Observable] stream and creates an [ObservableList] out of it, while maintaining
* an accumulator.
* @param initialAccumulator The initial value of the accumulator.
* [fold] takes an [rx.Observable] stream and applies fold function on it, and collects all elements using the accumulator.
* @param accumulator The accumulator for accumulating elements.
* @param folderFun The transformation function to be called on the observable list when a new element is emitted on
* the stream, which should modify the list as needed.
*/
fun <A, B, C> Observable<A>.foldToObservableList(
initialAccumulator: C, folderFun: (A, C, ObservableList<B>) -> C
): ObservableList<B> {
val result = FXCollections.observableArrayList<B>()
fun <T, R> Observable<T>.fold(accumulator: R, folderFun: (R, T) -> Unit): R {
/**
* This capture is fine, as [Platform.runLater] runs closures in order
* This capture is fine, as [Platform.runLater] runs closures in order.
* The buffer is to avoid flooding FX thread with runnable.
*/
var currentAccumulator = initialAccumulator
subscribe {
Platform.runLater {
currentAccumulator = folderFun(it, currentAccumulator, result)
buffer(1, TimeUnit.SECONDS).subscribe {
if (it.isNotEmpty()) {
Platform.runLater {
it.fold(accumulator) { list, item ->
folderFun.invoke(list, item)
list
}
}
}
}
return result
return accumulator
}
/**
* [recordInSequence] records incoming events on the [rx.Observable] in sequence.
*/
fun <A> Observable<A>.recordInSequence(): ObservableList<A> {
return foldToObservableList(Unit) { newElement, _unit, list ->
return fold(FXCollections.observableArrayList()) { list, newElement ->
list.add(newElement)
}
}
/**
* [foldToObservableMap] takes an [rx.Observable] stream and creates an [ObservableMap] out of it, while maintaining
* an accumulator.
* @param initialAccumulator The initial value of the accumulator.
* @param folderFun The transformation function to be called on the observable map when a new element is emitted on
* the stream, which should modify the map as needed.
*/
fun <A, B, K, C> Observable<A>.foldToObservableMap(
initialAccumulator: C, folderFun: (A, C, ObservableMap<K, B>) -> C
): ObservableMap<K, out B> {
val result = FXCollections.observableHashMap<K, B>()
/**
* This capture is fine, as [Platform.runLater] runs closures in order
*/
var currentAccumulator = initialAccumulator
subscribe {
Platform.runLater {
currentAccumulator = folderFun(it, currentAccumulator, result)
}
}
return result
}
/**
* This variant simply associates each event with its key.
* @param toKey Function retrieving the key to associate with.
* @param merge The function to be called if there is an existing element at the key.
*/
fun <A, K> Observable<A>.recordAsAssociation(
toKey: (A) -> K,
merge: (K, oldValue: A, newValue: A) -> A = { _key, _oldValue, newValue -> newValue }
): ObservableMap<K, out A> {
return foldToObservableMap(Unit) { newElement, _unit, map ->
val key = toKey(newElement)
val oldValue = map.get(key)
if (oldValue != null) {
map.set(key, merge(key, oldValue, newElement))
} else {
map.set(key, newElement)
}
fun <A, K> Observable<A>.recordAsAssociation(toKey: (A) -> K, merge: (K, oldValue: A, newValue: A) -> A = { _key, _oldValue, newValue -> newValue }): ObservableMap<K, A> {
return fold(FXCollections.observableHashMap<K, A>()) { map, item ->
val key = toKey(item)
map[key] = map[key]?.let { merge(key, it, item) } ?: item
}
}

View File

@ -1,8 +1,9 @@
package net.corda.client.model
import javafx.collections.FXCollections
import javafx.collections.ObservableList
import kotlinx.support.jdk8.collections.removeIf
import net.corda.client.fxutils.foldToObservableList
import net.corda.client.fxutils.fold
import net.corda.client.fxutils.map
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.ContractState
@ -29,11 +30,10 @@ class ContractStateModel {
// We can't filter removed hashes here as we don't have type info
Diff(it.added.filterCashStateAndRefs(), it.removed)
}
val cashStates: ObservableList<StateAndRef<Cash.State>> =
cashStatesDiff.foldToObservableList(Unit) { statesDiff, _accumulator, observableList ->
observableList.removeIf { it.ref in statesDiff.removed }
observableList.addAll(statesDiff.added)
}
val cashStates: ObservableList<StateAndRef<Cash.State>> = cashStatesDiff.fold(FXCollections.observableArrayList()) { list, statesDiff ->
list.removeIf { it.ref in statesDiff.removed }
list.addAll(statesDiff.added)
}
val cash = cashStates.map { it.state.data.amount }

View File

@ -1,11 +1,12 @@
package net.corda.client.model
import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections
import javafx.collections.ObservableList
import kotlinx.support.jdk8.collections.removeIf
import net.corda.client.fxutils.firstOrDefault
import net.corda.client.fxutils.firstOrNullObservable
import net.corda.client.fxutils.foldToObservableList
import net.corda.client.fxutils.fold
import net.corda.client.fxutils.map
import net.corda.core.crypto.CompositeKey
import net.corda.core.node.NodeInfo
@ -17,15 +18,15 @@ class NetworkIdentityModel {
private val networkIdentityObservable by observable(NodeMonitorModel::networkMap)
val networkIdentities: ObservableList<NodeInfo> =
networkIdentityObservable.foldToObservableList(Unit) { update, _accumulator, observableList ->
observableList.removeIf {
networkIdentityObservable.fold(FXCollections.observableArrayList()) { list, update ->
list.removeIf {
when (update) {
is MapChange.Removed -> it == update.node
is MapChange.Modified -> it == update.previousNode
else -> false
}
}
observableList.addAll(update.node)
list.addAll(update.node)
}
private val rpcProxy by observableValue(NodeMonitorModel::proxyObservable)

View File

@ -2,6 +2,7 @@ package net.corda.client.model
import javafx.beans.property.SimpleObjectProperty
import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections
import javafx.collections.ObservableList
import javafx.collections.ObservableMap
import net.corda.client.fxutils.*
@ -80,7 +81,7 @@ data class StateMachineData(
/**
* This model provides an observable list of transactions and what state machines/flows recorded them
*/
class GatheredTransactionDataModel {
class TransactionDataModel {
private val transactions by observable(NodeMonitorModel::transactions)
private val stateMachineUpdates by observable(NodeMonitorModel::stateMachineUpdates)
private val progressTracking by observable(NodeMonitorModel::progressTracking)
@ -89,7 +90,7 @@ class GatheredTransactionDataModel {
private val collectedTransactions = transactions.recordInSequence()
private val transactionMap = collectedTransactions.associateBy(SignedTransaction::id)
private val progressEvents = progressTracking.recordAsAssociation(ProgressTrackingEvent::stateMachineId)
private val stateMachineStatus = stateMachineUpdates.foldToObservableMap(Unit) { update, _unit, map: ObservableMap<StateMachineRunId, SimpleObjectProperty<StateMachineStatus>> ->
private val stateMachineStatus = stateMachineUpdates.fold(FXCollections.observableHashMap<StateMachineRunId, SimpleObjectProperty<StateMachineStatus>>()) { map, update ->
when (update) {
is StateMachineUpdate.Added -> {
val added: SimpleObjectProperty<StateMachineStatus> =
@ -102,6 +103,7 @@ class GatheredTransactionDataModel {
added.set(StateMachineStatus.Removed(added.value.stateMachineName))
}
}
map
}
private val stateMachineDataList = LeftOuterJoinedMap(stateMachineStatus, progressEvents) { id, status, progress ->
StateMachineData(id, progress.map { it?.let { FlowStatus(it.message) } }, status)

View File

@ -38,7 +38,7 @@ class Network : CordaView() {
val myIdentity by observableValue(NetworkIdentityModel::myIdentity)
val notaries by observableList(NetworkIdentityModel::notaries)
val peers by observableList(NetworkIdentityModel::parties)
val transactions by observableList(GatheredTransactionDataModel::partiallyResolvedTransactions)
val transactions by observableList(TransactionDataModel::partiallyResolvedTransactions)
// UI components
private val myIdentityPane by fxid<BorderPane>()
private val notaryList by fxid<VBox>()

View File

@ -44,7 +44,7 @@ class TransactionViewer : CordaView("Transactions") {
private val transactionViewTable by fxid<TableView<Transaction>>()
private val matchingTransactionsLabel by fxid<Label>()
// Inject data
private val transactions by observableListReadOnly(GatheredTransactionDataModel::partiallyResolvedTransactions)
private val transactions by observableListReadOnly(TransactionDataModel::partiallyResolvedTransactions)
private val reportingExchange by observableValue(ReportingCurrencyModel::reportingExchange)
private val reportingCurrency by observableValue(ReportingCurrencyModel::reportingCurrency)
private val myIdentity by observableValue(NetworkIdentityModel::myIdentity)
@ -155,7 +155,7 @@ class TransactionViewer : CordaView("Transactions") {
private fun ObservableList<StateAndRef<ContractState>>.toText() = map { it.contract().javaClass.simpleName }.groupBy { it }.map { "${it.key} (${it.value.size})" }.joinToString()
private class TransactionWidget() : BorderPane() {
private val partiallyResolvedTransactions by observableListReadOnly(GatheredTransactionDataModel::partiallyResolvedTransactions)
private val partiallyResolvedTransactions by observableListReadOnly(TransactionDataModel::partiallyResolvedTransactions)
// TODO : Add a scrolling table to show latest transaction.
// TODO : Add a chart to show types of transactions.