diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt index 5ccc04a7c4..198fa779c5 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt @@ -1,5 +1,6 @@ package net.corda.client.jfx.utils +import javafx.application.Platform import javafx.beans.binding.Bindings import javafx.beans.binding.BooleanBinding import javafx.beans.property.ReadOnlyObjectWrapper @@ -10,7 +11,13 @@ import javafx.collections.MapChangeListener import javafx.collections.ObservableList import javafx.collections.ObservableMap import javafx.collections.transformation.FilteredList +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.StateAndRef +import net.corda.core.messaging.DataFeed +import net.corda.core.node.services.Vault import org.fxmisc.easybind.EasyBind +import rx.Observable +import rx.schedulers.Schedulers import java.util.function.Predicate /** @@ -313,3 +320,36 @@ fun ObservableList.firstOrDefault(default: ObservableValue, predicate fun ObservableList.firstOrNullObservable(predicate: (A) -> Boolean): ObservableValue { return Bindings.createObjectBinding({ this.firstOrNull(predicate) }, arrayOf(this)) } + +/** + * Modifies the given Rx observable such that emissions are run on the JavaFX GUI thread. Use this when you have an Rx + * observable that may emit in the background e.g. from the network and you wish to link it to the user interface. + * + * Note: you should use the returned observable, not the original one this method is called on. + */ +fun Observable.observeOnFXThread(): Observable = observeOn(Schedulers.from(Platform::runLater)) + +/** + * Given a [DataFeed] that contains the results of a vault query and a subsequent stream of changes, returns a JavaFX + * [ObservableList] that mirrors the streamed results on the UI thread. Note that the paging is *not* respected by this + * function: if a state is added that would not have appeared in the page in the initial query, it will still be added + * to the observable list. + * + * @see toFXListOfStates if you want just the state objects and not the ledger pointers too. + */ +fun DataFeed, Vault.Update>.toFXListOfStateRefs(): ObservableList> { + val list = FXCollections.observableArrayList(snapshot.states) + updates.observeOnFXThread().subscribe { (consumed, produced) -> + list.removeAll(consumed) + list.addAll(produced) + } + return list +} + +/** + * Returns the same list as [toFXListOfStateRefs] but which contains the states instead of [StateAndRef] wrappers. + * The same notes apply as with that function. + */ +fun DataFeed, Vault.Update>.toFXListOfStates(): ObservableList { + return toFXListOfStateRefs().map { it.state.data } +} \ No newline at end of file