diff --git a/client/src/main/kotlin/net/corda/client/fxutils/ObservableFold.kt b/client/src/main/kotlin/net/corda/client/fxutils/ObservableFold.kt index 6e77fcdc4e..d9c34b11dd 100644 --- a/client/src/main/kotlin/net/corda/client/fxutils/ObservableFold.kt +++ b/client/src/main/kotlin/net/corda/client/fxutils/ObservableFold.kt @@ -7,6 +7,7 @@ import javafx.collections.FXCollections import javafx.collections.ObservableList import javafx.collections.ObservableMap import rx.Observable +import java.util.concurrent.TimeUnit /** * Simple utilities for converting an [rx.Observable] into a javafx [ObservableValue]/[ObservableList] @@ -29,76 +30,46 @@ fun Observable.foldToObservableValue(initial: B, folderFun: (A, B) -> } /** - * [foldToObservableList] takes an [rx.Observable] stream and creates an [ObservableList] out of it, while maintaining - * an accumulator. - * @param initialAccumulator The initial value of the accumulator. + * [fold] takes an [rx.Observable] stream and applies fold function on it, and collects all elements using the accumulator. + * @param accumulator The accumulator for accumulating elements. * @param folderFun The transformation function to be called on the observable list when a new element is emitted on * the stream, which should modify the list as needed. */ -fun Observable.foldToObservableList( - initialAccumulator: C, folderFun: (A, C, ObservableList) -> C -): ObservableList { - val result = FXCollections.observableArrayList() +fun Observable.fold(accumulator: R, folderFun: (R, T) -> Unit): R { /** - * This capture is fine, as [Platform.runLater] runs closures in order + * This capture is fine, as [Platform.runLater] runs closures in order. + * The buffer is to avoid flooding FX thread with runnable. */ - var currentAccumulator = initialAccumulator - subscribe { - Platform.runLater { - currentAccumulator = folderFun(it, currentAccumulator, result) + buffer(1, TimeUnit.SECONDS).subscribe { + if (it.isNotEmpty()) { + Platform.runLater { + it.fold(accumulator) { list, item -> + folderFun.invoke(list, item) + list + } + } } } - return result + return accumulator } /** * [recordInSequence] records incoming events on the [rx.Observable] in sequence. */ fun Observable.recordInSequence(): ObservableList { - return foldToObservableList(Unit) { newElement, _unit, list -> + return fold(FXCollections.observableArrayList()) { list, newElement -> list.add(newElement) } } -/** - * [foldToObservableMap] takes an [rx.Observable] stream and creates an [ObservableMap] out of it, while maintaining - * an accumulator. - * @param initialAccumulator The initial value of the accumulator. - * @param folderFun The transformation function to be called on the observable map when a new element is emitted on - * the stream, which should modify the map as needed. - */ -fun Observable.foldToObservableMap( - initialAccumulator: C, folderFun: (A, C, ObservableMap) -> C -): ObservableMap { - val result = FXCollections.observableHashMap() - /** - * This capture is fine, as [Platform.runLater] runs closures in order - */ - var currentAccumulator = initialAccumulator - subscribe { - Platform.runLater { - currentAccumulator = folderFun(it, currentAccumulator, result) - } - } - return result -} - /** * This variant simply associates each event with its key. * @param toKey Function retrieving the key to associate with. * @param merge The function to be called if there is an existing element at the key. */ -fun Observable.recordAsAssociation( - toKey: (A) -> K, - merge: (K, oldValue: A, newValue: A) -> A = { _key, _oldValue, newValue -> newValue } -): ObservableMap { - return foldToObservableMap(Unit) { newElement, _unit, map -> - val key = toKey(newElement) - val oldValue = map.get(key) - if (oldValue != null) { - map.set(key, merge(key, oldValue, newElement)) - } else { - map.set(key, newElement) - } +fun Observable.recordAsAssociation(toKey: (A) -> K, merge: (K, oldValue: A, newValue: A) -> A = { _key, _oldValue, newValue -> newValue }): ObservableMap { + return fold(FXCollections.observableHashMap()) { map, item -> + val key = toKey(item) + map[key] = map[key]?.let { merge(key, it, item) } ?: item } } diff --git a/client/src/main/kotlin/net/corda/client/model/ContractStateModel.kt b/client/src/main/kotlin/net/corda/client/model/ContractStateModel.kt index 8eb3c6bb5d..affa3aa7b1 100644 --- a/client/src/main/kotlin/net/corda/client/model/ContractStateModel.kt +++ b/client/src/main/kotlin/net/corda/client/model/ContractStateModel.kt @@ -1,8 +1,9 @@ package net.corda.client.model +import javafx.collections.FXCollections import javafx.collections.ObservableList import kotlinx.support.jdk8.collections.removeIf -import net.corda.client.fxutils.foldToObservableList +import net.corda.client.fxutils.fold import net.corda.client.fxutils.map import net.corda.contracts.asset.Cash import net.corda.core.contracts.ContractState @@ -29,11 +30,10 @@ class ContractStateModel { // We can't filter removed hashes here as we don't have type info Diff(it.added.filterCashStateAndRefs(), it.removed) } - val cashStates: ObservableList> = - cashStatesDiff.foldToObservableList(Unit) { statesDiff, _accumulator, observableList -> - observableList.removeIf { it.ref in statesDiff.removed } - observableList.addAll(statesDiff.added) - } + val cashStates: ObservableList> = cashStatesDiff.fold(FXCollections.observableArrayList()) { list, statesDiff -> + list.removeIf { it.ref in statesDiff.removed } + list.addAll(statesDiff.added) + } val cash = cashStates.map { it.state.data.amount } diff --git a/client/src/main/kotlin/net/corda/client/model/NetworkIdentityModel.kt b/client/src/main/kotlin/net/corda/client/model/NetworkIdentityModel.kt index c1673238a4..2b5cea50bb 100644 --- a/client/src/main/kotlin/net/corda/client/model/NetworkIdentityModel.kt +++ b/client/src/main/kotlin/net/corda/client/model/NetworkIdentityModel.kt @@ -1,11 +1,12 @@ package net.corda.client.model import javafx.beans.value.ObservableValue +import javafx.collections.FXCollections import javafx.collections.ObservableList import kotlinx.support.jdk8.collections.removeIf import net.corda.client.fxutils.firstOrDefault import net.corda.client.fxutils.firstOrNullObservable -import net.corda.client.fxutils.foldToObservableList +import net.corda.client.fxutils.fold import net.corda.client.fxutils.map import net.corda.core.crypto.CompositeKey import net.corda.core.node.NodeInfo @@ -17,15 +18,15 @@ class NetworkIdentityModel { private val networkIdentityObservable by observable(NodeMonitorModel::networkMap) val networkIdentities: ObservableList = - networkIdentityObservable.foldToObservableList(Unit) { update, _accumulator, observableList -> - observableList.removeIf { + networkIdentityObservable.fold(FXCollections.observableArrayList()) { list, update -> + list.removeIf { when (update) { is MapChange.Removed -> it == update.node is MapChange.Modified -> it == update.previousNode else -> false } } - observableList.addAll(update.node) + list.addAll(update.node) } private val rpcProxy by observableValue(NodeMonitorModel::proxyObservable) diff --git a/client/src/main/kotlin/net/corda/client/model/GatheredTransactionDataModel.kt b/client/src/main/kotlin/net/corda/client/model/TransactionDataModel.kt similarity index 95% rename from client/src/main/kotlin/net/corda/client/model/GatheredTransactionDataModel.kt rename to client/src/main/kotlin/net/corda/client/model/TransactionDataModel.kt index 69a466b39b..596979ccef 100644 --- a/client/src/main/kotlin/net/corda/client/model/GatheredTransactionDataModel.kt +++ b/client/src/main/kotlin/net/corda/client/model/TransactionDataModel.kt @@ -2,6 +2,7 @@ package net.corda.client.model import javafx.beans.property.SimpleObjectProperty import javafx.beans.value.ObservableValue +import javafx.collections.FXCollections import javafx.collections.ObservableList import javafx.collections.ObservableMap import net.corda.client.fxutils.* @@ -80,7 +81,7 @@ data class StateMachineData( /** * This model provides an observable list of transactions and what state machines/flows recorded them */ -class GatheredTransactionDataModel { +class TransactionDataModel { private val transactions by observable(NodeMonitorModel::transactions) private val stateMachineUpdates by observable(NodeMonitorModel::stateMachineUpdates) private val progressTracking by observable(NodeMonitorModel::progressTracking) @@ -89,7 +90,7 @@ class GatheredTransactionDataModel { private val collectedTransactions = transactions.recordInSequence() private val transactionMap = collectedTransactions.associateBy(SignedTransaction::id) private val progressEvents = progressTracking.recordAsAssociation(ProgressTrackingEvent::stateMachineId) - private val stateMachineStatus = stateMachineUpdates.foldToObservableMap(Unit) { update, _unit, map: ObservableMap> -> + private val stateMachineStatus = stateMachineUpdates.fold(FXCollections.observableHashMap>()) { map, update -> when (update) { is StateMachineUpdate.Added -> { val added: SimpleObjectProperty = @@ -102,6 +103,7 @@ class GatheredTransactionDataModel { added.set(StateMachineStatus.Removed(added.value.stateMachineName)) } } + map } private val stateMachineDataList = LeftOuterJoinedMap(stateMachineStatus, progressEvents) { id, status, progress -> StateMachineData(id, progress.map { it?.let { FlowStatus(it.message) } }, status) diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt index 699a89b996..4b6d92ee8c 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt @@ -38,7 +38,7 @@ class Network : CordaView() { val myIdentity by observableValue(NetworkIdentityModel::myIdentity) val notaries by observableList(NetworkIdentityModel::notaries) val peers by observableList(NetworkIdentityModel::parties) - val transactions by observableList(GatheredTransactionDataModel::partiallyResolvedTransactions) + val transactions by observableList(TransactionDataModel::partiallyResolvedTransactions) // UI components private val myIdentityPane by fxid() private val notaryList by fxid() diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/views/TransactionViewer.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/views/TransactionViewer.kt index 5c5188210f..403cda67eb 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/views/TransactionViewer.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/views/TransactionViewer.kt @@ -44,7 +44,7 @@ class TransactionViewer : CordaView("Transactions") { private val transactionViewTable by fxid>() private val matchingTransactionsLabel by fxid