From 435754a043380e832b4f13cb5288d79ddd18774d Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 30 Aug 2016 15:26:13 +0100 Subject: [PATCH] client: reactfx->rx --- .../r3corda/client/fxutils/EventStreamFold.kt | 8 ++-- .../client/model/ContractStateModel.kt | 5 ++- .../kotlin/com/r3corda/client/model/Models.kt | 42 +++++++++++++------ .../model/TransactionCreateStateModel.kt | 6 +-- .../client/model/WalletMonitorModel.kt | 14 +++---- 5 files changed, 47 insertions(+), 28 deletions(-) diff --git a/client/src/main/kotlin/com/r3corda/client/fxutils/EventStreamFold.kt b/client/src/main/kotlin/com/r3corda/client/fxutils/EventStreamFold.kt index 7f8e61bde7..65795bdab1 100644 --- a/client/src/main/kotlin/com/r3corda/client/fxutils/EventStreamFold.kt +++ b/client/src/main/kotlin/com/r3corda/client/fxutils/EventStreamFold.kt @@ -5,13 +5,13 @@ import javafx.beans.property.SimpleObjectProperty import javafx.beans.value.ObservableValue import javafx.collections.FXCollections import javafx.collections.ObservableList -import org.reactfx.EventStream +import rx.Observable /** - * Simple utilities for converting an [EventStream] into an [ObservableValue]/[ObservableList] + * Simple utilities for converting an [rx.Observable] into an [ObservableValue]/[ObservableList] */ -fun EventStream.foldToObservable(initial: B, folderFun: (A, B) -> B): ObservableValue { +fun Observable.foldToObservable(initial: B, folderFun: (A, B) -> B): ObservableValue { val result = SimpleObjectProperty(initial) subscribe { Platform.runLater { @@ -21,7 +21,7 @@ fun EventStream.foldToObservable(initial: B, folderFun: (A, B) -> B): return result } -fun EventStream.foldToObservableList( +fun Observable.foldToObservableList( initialAccumulator: C, folderFun: (A, C, ObservableList) -> C ): ObservableList { val result = FXCollections.observableArrayList() diff --git a/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt b/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt index a19e86a5ed..42b070d20d 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt @@ -9,6 +9,7 @@ import com.r3corda.node.services.monitor.ServiceToClientEvent import javafx.collections.ObservableList import kotlinx.support.jdk8.collections.removeIf import org.reactfx.EventStream +import rx.Observable class StatesDiff( val added: Collection>, @@ -19,8 +20,8 @@ class StatesDiff( * This model exposes the list of owned contract states. */ class ContractStateModel { - private val serviceToClient: EventStream by stream(WalletMonitorModel::serviceToClient) - private val outputStates = serviceToClient.filter(ServiceToClientEvent.OutputState::class.java) + private val serviceToClient: Observable by observable(WalletMonitorModel::serviceToClient) + private val outputStates = serviceToClient.ofType(ServiceToClientEvent.OutputState::class.java) val contractStatesDiff = outputStates.map { StatesDiff(it.produced, it.consumed) } // We filter the diff first rather than the complete contract state list. diff --git a/client/src/main/kotlin/com/r3corda/client/model/Models.kt b/client/src/main/kotlin/com/r3corda/client/model/Models.kt index 47e32d5dd8..e0490439a0 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/Models.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/Models.kt @@ -6,6 +6,8 @@ import javafx.beans.value.WritableValue import javafx.collections.ObservableList import org.reactfx.EventSink import org.reactfx.EventStream +import rx.Observable +import rx.Observer import java.util.* import kotlin.reflect.KClass import kotlin.reflect.KProperty @@ -20,16 +22,22 @@ import kotlin.reflect.KProperty * General rule of thumb: A stream/observable should be a model if it may be reused several times. * * Usage: - * // Inject service -> client stream - * private val serviceToClient: EventStream by stream(WalletMonitorModel::serviceToClient) + * // Inject service -> client event stream + * private val serviceToClient: EventStream by eventStream(WalletMonitorModel::serviceToClient) * */ -inline fun stream(noinline streamProperty: (M) -> EventStream) = - TrackedDelegate.Stream(M::class, streamProperty) +inline fun observable(noinline observableProperty: (M) -> Observable) = + TrackedDelegate.ObservableDelegate(M::class, observableProperty) -inline fun sink(noinline sinkProperty: (M) -> EventSink) = - TrackedDelegate.Sink(M::class, sinkProperty) +inline fun observer(noinline observerProperty: (M) -> Observer) = + TrackedDelegate.ObserverDelegate(M::class, observerProperty) + +inline fun eventStream(noinline streamProperty: (M) -> EventStream) = + TrackedDelegate.EventStreamDelegate(M::class, streamProperty) + +inline fun eventSink(noinline sinkProperty: (M) -> EventSink) = + TrackedDelegate.EventSinkDelegate(M::class, sinkProperty) inline fun observableValue(noinline observableValueProperty: (M) -> ObservableValue) = TrackedDelegate.ObservableValueDelegate(M::class, observableValueProperty) @@ -67,14 +75,24 @@ object Models { sealed class TrackedDelegate(val klass: KClass) { init { Models.initModel(klass) } - class Stream (klass: KClass, val streamProperty: (M) -> EventStream) : TrackedDelegate(klass) { - operator fun getValue(thisRef: Any, property: KProperty<*>): EventStream { - return streamProperty(Models.get(klass, thisRef.javaClass.kotlin)) + class ObservableDelegate (klass: KClass, val eventStreamProperty: (M) -> Observable) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): Observable { + return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin)) } } - class Sink (klass: KClass, val sinkProperty: (M) -> EventSink) : TrackedDelegate(klass) { - operator fun getValue(thisRef: Any, property: KProperty<*>): EventSink { - return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin)) + class ObserverDelegate (klass: KClass, val eventStreamProperty: (M) -> Observer) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): Observer { + return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } + class EventStreamDelegate (klass: KClass, val eventStreamProperty: (M) -> org.reactfx.EventStream) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): org.reactfx.EventStream { + return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } + class EventSinkDelegate (klass: KClass, val eventSinkProperty: (M) -> org.reactfx.EventSink) : TrackedDelegate(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): org.reactfx.EventSink { + return eventSinkProperty(Models.get(klass, thisRef.javaClass.kotlin)) } } class ObservableValueDelegate(klass: KClass, val observableValueProperty: (M) -> ObservableValue) : TrackedDelegate(klass) { diff --git a/client/src/main/kotlin/com/r3corda/client/model/TransactionCreateStateModel.kt b/client/src/main/kotlin/com/r3corda/client/model/TransactionCreateStateModel.kt index 988381c114..a938d7ab26 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/TransactionCreateStateModel.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/TransactionCreateStateModel.kt @@ -8,9 +8,9 @@ import com.r3corda.node.utilities.AddOrRemove import javafx.beans.property.SimpleObjectProperty import javafx.beans.value.ObservableValue import javafx.collections.ObservableList -import org.reactfx.EventStream +import rx.Observable import java.time.Instant -import java.util.* +import java.util.UUID sealed class TransactionCreateStatus() { class Started(val message: String?) : TransactionCreateStatus() @@ -63,7 +63,7 @@ data class TransactionCreateStateWritable( */ class TransactionCreateStateModel { - private val serviceToClient: EventStream by stream(WalletMonitorModel::serviceToClient) + private val serviceToClient: Observable by observable(WalletMonitorModel::serviceToClient) /** * Aggregation of updates to transactions. We use the observable list as the only container and do linear search for diff --git a/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt b/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt index 0b692bb49d..beac467349 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt @@ -5,19 +5,19 @@ import com.r3corda.core.contracts.ClientToServiceCommand import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.NodeInfo import com.r3corda.node.services.monitor.ServiceToClientEvent -import org.reactfx.EventSink -import org.reactfx.EventSource -import org.reactfx.EventStream +import rx.Observable +import rx.Observer +import rx.subjects.PublishSubject /** * This model exposes raw event streams to and from the [WalletMonitorService] through a [WalletMonitorClient] */ class WalletMonitorModel { - private val clientToServiceSource = EventSource() - val clientToService: EventSink = clientToServiceSource + private val clientToServiceSource = PublishSubject.create() + val clientToService: Observer = clientToServiceSource - private val serviceToClientSource = EventSource() - val serviceToClient: EventStream = serviceToClientSource + private val serviceToClientSource = PublishSubject.create() + val serviceToClient: Observable = serviceToClientSource /** * Register for updates to/from a given wallet.