client: reactfx->rx

This commit is contained in:
Andras Slemmer 2016-08-30 15:26:13 +01:00
parent 4c36072849
commit 435754a043
5 changed files with 47 additions and 28 deletions

@ -5,13 +5,13 @@ import javafx.beans.property.SimpleObjectProperty
import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections
import javafx.collections.ObservableList
import org.reactfx.EventStream
import rx.Observable
/**
* Simple utilities for converting an [EventStream] into an [ObservableValue]/[ObservableList]
* Simple utilities for converting an [rx.Observable] into an [ObservableValue]/[ObservableList]
*/
fun <A, B> EventStream<A>.foldToObservable(initial: B, folderFun: (A, B) -> B): ObservableValue<B> {
fun <A, B> Observable<A>.foldToObservable(initial: B, folderFun: (A, B) -> B): ObservableValue<B> {
val result = SimpleObjectProperty<B>(initial)
subscribe {
Platform.runLater {
@ -21,7 +21,7 @@ fun <A, B> EventStream<A>.foldToObservable(initial: B, folderFun: (A, B) -> B):
return result
}
fun <A, B, C> EventStream<A>.foldToObservableList(
fun <A, B, C> Observable<A>.foldToObservableList(
initialAccumulator: C, folderFun: (A, C, ObservableList<B>) -> C
): ObservableList<B> {
val result = FXCollections.observableArrayList<B>()

@ -9,6 +9,7 @@ import com.r3corda.node.services.monitor.ServiceToClientEvent
import javafx.collections.ObservableList
import kotlinx.support.jdk8.collections.removeIf
import org.reactfx.EventStream
import rx.Observable
class StatesDiff<out T : ContractState>(
val added: Collection<StateAndRef<T>>,
@ -19,8 +20,8 @@ class StatesDiff<out T : ContractState>(
* This model exposes the list of owned contract states.
*/
class ContractStateModel {
private val serviceToClient: EventStream<ServiceToClientEvent> by stream(WalletMonitorModel::serviceToClient)
private val outputStates = serviceToClient.filter(ServiceToClientEvent.OutputState::class.java)
private val serviceToClient: Observable<ServiceToClientEvent> by observable(WalletMonitorModel::serviceToClient)
private val outputStates = serviceToClient.ofType(ServiceToClientEvent.OutputState::class.java)
val contractStatesDiff = outputStates.map { StatesDiff(it.produced, it.consumed) }
// We filter the diff first rather than the complete contract state list.

@ -6,6 +6,8 @@ import javafx.beans.value.WritableValue
import javafx.collections.ObservableList
import org.reactfx.EventSink
import org.reactfx.EventStream
import rx.Observable
import rx.Observer
import java.util.*
import kotlin.reflect.KClass
import kotlin.reflect.KProperty
@ -20,16 +22,22 @@ import kotlin.reflect.KProperty
* General rule of thumb: A stream/observable should be a model if it may be reused several times.
*
* Usage:
* // Inject service -> client stream
* private val serviceToClient: EventStream<ServiceToClientEvent> by stream(WalletMonitorModel::serviceToClient)
* // Inject service -> client event stream
* private val serviceToClient: EventStream<ServiceToClientEvent> by eventStream(WalletMonitorModel::serviceToClient)
*
*/
inline fun <reified M : Any, T> stream(noinline streamProperty: (M) -> EventStream<T>) =
TrackedDelegate.Stream(M::class, streamProperty)
inline fun <reified M : Any, T> observable(noinline observableProperty: (M) -> Observable<T>) =
TrackedDelegate.ObservableDelegate(M::class, observableProperty)
inline fun <reified M : Any, T> sink(noinline sinkProperty: (M) -> EventSink<T>) =
TrackedDelegate.Sink(M::class, sinkProperty)
inline fun <reified M : Any, T> observer(noinline observerProperty: (M) -> Observer<T>) =
TrackedDelegate.ObserverDelegate(M::class, observerProperty)
inline fun <reified M : Any, T> eventStream(noinline streamProperty: (M) -> EventStream<T>) =
TrackedDelegate.EventStreamDelegate(M::class, streamProperty)
inline fun <reified M : Any, T> eventSink(noinline sinkProperty: (M) -> EventSink<T>) =
TrackedDelegate.EventSinkDelegate(M::class, sinkProperty)
inline fun <reified M : Any, T> observableValue(noinline observableValueProperty: (M) -> ObservableValue<T>) =
TrackedDelegate.ObservableValueDelegate(M::class, observableValueProperty)
@ -67,14 +75,24 @@ object Models {
sealed class TrackedDelegate<M : Any>(val klass: KClass<M>) {
init { Models.initModel(klass) }
class Stream<M : Any, T> (klass: KClass<M>, val streamProperty: (M) -> EventStream<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): EventStream<T> {
return streamProperty(Models.get(klass, thisRef.javaClass.kotlin))
class ObservableDelegate<M : Any, T> (klass: KClass<M>, val eventStreamProperty: (M) -> Observable<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): Observable<T> {
return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class Sink<M : Any, T> (klass: KClass<M>, val sinkProperty: (M) -> EventSink<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): EventSink<T> {
return sinkProperty(Models.get(klass, thisRef.javaClass.kotlin))
class ObserverDelegate<M : Any, T> (klass: KClass<M>, val eventStreamProperty: (M) -> Observer<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): Observer<T> {
return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class EventStreamDelegate<M : Any, T> (klass: KClass<M>, val eventStreamProperty: (M) -> org.reactfx.EventStream<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): org.reactfx.EventStream<T> {
return eventStreamProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class EventSinkDelegate<M : Any, T> (klass: KClass<M>, val eventSinkProperty: (M) -> org.reactfx.EventSink<T>) : TrackedDelegate<M>(klass) {
operator fun getValue(thisRef: Any, property: KProperty<*>): org.reactfx.EventSink<T> {
return eventSinkProperty(Models.get(klass, thisRef.javaClass.kotlin))
}
}
class ObservableValueDelegate<M : Any, T>(klass: KClass<M>, val observableValueProperty: (M) -> ObservableValue<T>) : TrackedDelegate<M>(klass) {

@ -8,9 +8,9 @@ import com.r3corda.node.utilities.AddOrRemove
import javafx.beans.property.SimpleObjectProperty
import javafx.beans.value.ObservableValue
import javafx.collections.ObservableList
import org.reactfx.EventStream
import rx.Observable
import java.time.Instant
import java.util.*
import java.util.UUID
sealed class TransactionCreateStatus() {
class Started(val message: String?) : TransactionCreateStatus()
@ -63,7 +63,7 @@ data class TransactionCreateStateWritable(
*/
class TransactionCreateStateModel {
private val serviceToClient: EventStream<ServiceToClientEvent> by stream(WalletMonitorModel::serviceToClient)
private val serviceToClient: Observable<ServiceToClientEvent> by observable(WalletMonitorModel::serviceToClient)
/**
* Aggregation of updates to transactions. We use the observable list as the only container and do linear search for

@ -5,19 +5,19 @@ import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.NodeInfo
import com.r3corda.node.services.monitor.ServiceToClientEvent
import org.reactfx.EventSink
import org.reactfx.EventSource
import org.reactfx.EventStream
import rx.Observable
import rx.Observer
import rx.subjects.PublishSubject
/**
* This model exposes raw event streams to and from the [WalletMonitorService] through a [WalletMonitorClient]
*/
class WalletMonitorModel {
private val clientToServiceSource = EventSource<ClientToServiceCommand>()
val clientToService: EventSink<ClientToServiceCommand> = clientToServiceSource
private val clientToServiceSource = PublishSubject.create<ClientToServiceCommand>()
val clientToService: Observer<ClientToServiceCommand> = clientToServiceSource
private val serviceToClientSource = EventSource<ServiceToClientEvent>()
val serviceToClient: EventStream<ServiceToClientEvent> = serviceToClientSource
private val serviceToClientSource = PublishSubject.create<ServiceToClientEvent>()
val serviceToClient: Observable<ServiceToClientEvent> = serviceToClientSource
/**
* Register for updates to/from a given wallet.