diff --git a/client/src/main/kotlin/com/r3corda/client/fxutils/EventStreamFold.kt b/client/src/main/kotlin/com/r3corda/client/fxutils/EventStreamFold.kt index 7f8e61bde7..65795bdab1 100644 --- a/client/src/main/kotlin/com/r3corda/client/fxutils/EventStreamFold.kt +++ b/client/src/main/kotlin/com/r3corda/client/fxutils/EventStreamFold.kt @@ -5,13 +5,13 @@ import javafx.beans.property.SimpleObjectProperty import javafx.beans.value.ObservableValue import javafx.collections.FXCollections import javafx.collections.ObservableList -import org.reactfx.EventStream +import rx.Observable /** - * Simple utilities for converting an [EventStream] into an [ObservableValue]/[ObservableList] + * Simple utilities for converting an [rx.Observable] into an [ObservableValue]/[ObservableList] */ -fun <A, B> EventStream<A>.foldToObservable(initial: B, folderFun: (A, B) -> B): ObservableValue<B> { +fun <A, B> Observable<A>.foldToObservable(initial: B, folderFun: (A, B) -> B): ObservableValue<B> { val result = SimpleObjectProperty<B>(initial) subscribe { Platform.runLater { @@ -21,7 +21,7 @@ fun <A, B> EventStream<A>.foldToObservable(initial: B, folderFun: (A, B) -> B): return result } -fun <A, B, C> EventStream<A>.foldToObservableList( +fun <A, B, C> Observable<A>.foldToObservableList( initialAccumulator: C, folderFun: (A, C, ObservableList<B>) -> C ): ObservableList<B> { val result = FXCollections.observableArrayList<B>() diff --git a/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt b/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt index a19e86a5ed..42b070d20d 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/ContractStateModel.kt @@ -9,6 +9,7 @@ import com.r3corda.node.services.monitor.ServiceToClientEvent import javafx.collections.ObservableList import kotlinx.support.jdk8.collections.removeIf import org.reactfx.EventStream +import rx.Observable class StatesDiff<out T : ContractState>( val added: Collection<StateAndRef<T>>, @@ -19,8 +20,8 @@ class StatesDiff<out T : ContractState>( * This model exposes the list of owned contract states. */ class ContractStateModel { - private val serviceToClient: EventStream<ServiceToClientEvent> by stream(WalletMonitorModel::serviceToClient) - private val outputStates = serviceToClient.filter(ServiceToClientEvent.OutputState::class.java) + private val serviceToClient: Observable<ServiceToClientEvent> by observable(WalletMonitorModel::serviceToClient) + private val outputStates = serviceToClient.ofType(ServiceToClientEvent.OutputState::class.java) val contractStatesDiff = outputStates.map { StatesDiff(it.produced, it.consumed) } // We filter the diff first rather than the complete contract state list. diff --git a/client/src/main/kotlin/com/r3corda/client/model/Models.kt b/client/src/main/kotlin/com/r3corda/client/model/Models.kt index 47e32d5dd8..e0490439a0 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/Models.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/Models.kt @@ -6,6 +6,8 @@ import javafx.beans.value.WritableValue import javafx.collections.ObservableList import org.reactfx.EventSink import org.reactfx.EventStream +import rx.Observable +import rx.Observer import java.util.* import kotlin.reflect.KClass import kotlin.reflect.KProperty @@ -20,16 +22,22 @@ import kotlin.reflect.KProperty * General rule of thumb: A stream/observable should be a model if it may be reused several times. * * Usage: - * // Inject service -> client stream - * private val serviceToClient: EventStream<ServiceToClientEvent> by stream(WalletMonitorModel::serviceToClient) + * // Inject service -> client event stream + * private val serviceToClient: EventStream<ServiceToClientEvent> by eventStream(WalletMonitorModel::serviceToClient) * */ -inline fun <reified M : Any, T> stream(noinline streamProperty: (M) -> EventStream<T>) = - TrackedDelegate.Stream(M::class, streamProperty) +inline fun <reified M : Any, T> observable(noinline observableProperty: (M) -> Observable<T>) = + TrackedDelegate.ObservableDelegate(M::class, observableProperty) -inline fun <reified M : Any, T> sink(noinline sinkProperty: (M) -> EventSink<T>) = - TrackedDelegate.Sink(M::class, sinkProperty) +inline fun <reified M : Any, T> observer(noinline observerProperty: (M) -> Observer<T>) = + TrackedDelegate.ObserverDelegate(M::class, observerProperty) + +inline fun <reified M : Any, T> eventStream(noinline streamProperty: (M) -> EventStream<T>) = + TrackedDelegate.EventStreamDelegate(M::class, streamProperty) + +inline fun <reified M : Any, T> eventSink(noinline sinkProperty: (M) -> EventSink<T>) = + TrackedDelegate.EventSinkDelegate(M::class, sinkProperty) inline fun <reified M : Any, T> observableValue(noinline observableValueProperty: (M) -> ObservableValue<T>) = TrackedDelegate.ObservableValueDelegate(M::class, observableValueProperty) @@ -67,14 +75,24 @@ object Models { sealed class TrackedDelegate<M : Any>(val klass: KClass<M>) { init { Models.initModel(klass) } - class Stream<M : Any, T> (klass: KClass<M>, val streamProperty: (M) -> EventStream<T>) : TrackedDelegate<M>(klass) { - operator fun getValue(thisRef: Any, property: KProperty<*>): EventStream<T> { - return streamProperty(Models.get(klass, thisRef.javaClass.kotlin)) + class ObservableDelegate<M : Any, T> (klass: KClass<M>, val eventStreamProperty: (M) -> Observable<T>) : TrackedDelegate<M>(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): Observable<T> { + return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin)) } } - class Sink<M : Any, T> (klass: KClass<M>, val sinkProperty: (M) -> EventSink<T>) : TrackedDelegate<M>(klass) { - operator fun getValue(thisRef: Any, property: KProperty<*>): EventSink<T> { - return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin)) + class ObserverDelegate<M : Any, T> (klass: KClass<M>, val eventStreamProperty: (M) -> Observer<T>) : TrackedDelegate<M>(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): Observer<T> { + return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } + class EventStreamDelegate<M : Any, T> (klass: KClass<M>, val eventStreamProperty: (M) -> org.reactfx.EventStream<T>) : TrackedDelegate<M>(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): org.reactfx.EventStream<T> { + return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin)) + } + } + class EventSinkDelegate<M : Any, T> (klass: KClass<M>, val eventSinkProperty: (M) -> org.reactfx.EventSink<T>) : TrackedDelegate<M>(klass) { + operator fun getValue(thisRef: Any, property: KProperty<*>): org.reactfx.EventSink<T> { + return eventSinkProperty(Models.get(klass, thisRef.javaClass.kotlin)) } } class ObservableValueDelegate<M : Any, T>(klass: KClass<M>, val observableValueProperty: (M) -> ObservableValue<T>) : TrackedDelegate<M>(klass) { diff --git a/client/src/main/kotlin/com/r3corda/client/model/TransactionCreateStateModel.kt b/client/src/main/kotlin/com/r3corda/client/model/TransactionCreateStateModel.kt index 988381c114..a938d7ab26 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/TransactionCreateStateModel.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/TransactionCreateStateModel.kt @@ -8,9 +8,9 @@ import com.r3corda.node.utilities.AddOrRemove import javafx.beans.property.SimpleObjectProperty import javafx.beans.value.ObservableValue import javafx.collections.ObservableList -import org.reactfx.EventStream +import rx.Observable import java.time.Instant -import java.util.* +import java.util.UUID sealed class TransactionCreateStatus() { class Started(val message: String?) : TransactionCreateStatus() @@ -63,7 +63,7 @@ data class TransactionCreateStateWritable( */ class TransactionCreateStateModel { - private val serviceToClient: EventStream<ServiceToClientEvent> by stream(WalletMonitorModel::serviceToClient) + private val serviceToClient: Observable<ServiceToClientEvent> by observable(WalletMonitorModel::serviceToClient) /** * Aggregation of updates to transactions. We use the observable list as the only container and do linear search for diff --git a/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt b/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt index 0b692bb49d..beac467349 100644 --- a/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt +++ b/client/src/main/kotlin/com/r3corda/client/model/WalletMonitorModel.kt @@ -5,19 +5,19 @@ import com.r3corda.core.contracts.ClientToServiceCommand import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.NodeInfo import com.r3corda.node.services.monitor.ServiceToClientEvent -import org.reactfx.EventSink -import org.reactfx.EventSource -import org.reactfx.EventStream +import rx.Observable +import rx.Observer +import rx.subjects.PublishSubject /** * This model exposes raw event streams to and from the [WalletMonitorService] through a [WalletMonitorClient] */ class WalletMonitorModel { - private val clientToServiceSource = EventSource<ClientToServiceCommand>() - val clientToService: EventSink<ClientToServiceCommand> = clientToServiceSource + private val clientToServiceSource = PublishSubject.create<ClientToServiceCommand>() + val clientToService: Observer<ClientToServiceCommand> = clientToServiceSource - private val serviceToClientSource = EventSource<ServiceToClientEvent>() - val serviceToClient: EventStream<ServiceToClientEvent> = serviceToClientSource + private val serviceToClientSource = PublishSubject.create<ServiceToClientEvent>() + val serviceToClient: Observable<ServiceToClientEvent> = serviceToClientSource /** * Register for updates to/from a given wallet.