Move FlowSafeSubscriber and flowSafeSubscribe under their own package

This commit is contained in:
Kyriakos Tharrouniatis 2020-02-18 15:55:19 +00:00
parent 5f939661ec
commit 3d0ad702de
4 changed files with 51 additions and 45 deletions

View File

@ -0,0 +1,44 @@
package net.corda.core.observable
import net.corda.core.observable.internal.FlowSafeSubscriber
import rx.Observable
import rx.Subscriber
import rx.observers.SafeSubscriber
/**
* [Observable.flowSafeSubscribe] is used to return an Observable, through which we can subscribe non unsubscribing [rx.Observer]s
* to the source [Observable].
*
* It unwraps an [Observer] from a [SafeSubscriber], re-wraps it with a [FlowSafeSubscriber] and then subscribes it
* to the source [Observable].
*
* In case we need to subscribe with a [SafeSubscriber] via [flowSafeSubscribe], we have to:
* 1. Declare a custom SafeSubscriber extending [SafeSubscriber].
* 2. Wrap with the custom SafeSubscriber the [rx.Observer] to be subscribed to the source [Observable].
* 3. Call [Observable.flowSafeSubscribe] with [strictMode] = false
* 4. Subscribe to the returned [Observable] passing in as argument the custom SafeSubscriber.
*/
fun <T> Observable<T>.flowSafeSubscribe(strictMode: Boolean = false): Observable<T> {
class OnFlowSafeSubscribe<T>(val source: Observable<T>): Observable.OnSubscribe<T> {
override fun call(subscriber: Subscriber<in T>) {
if (isSafeSubscriber(subscriber)) {
source.unsafeSubscribe(FlowSafeSubscriber((subscriber as SafeSubscriber).actual))
} else {
source.unsafeSubscribe(subscriber)
}
}
private fun isSafeSubscriber(subscriber: Subscriber<*>): Boolean {
return if (strictMode) {
// In strictMode mode we capture SafeSubscriber subclasses as well
SafeSubscriber::class.java.isAssignableFrom(subscriber::class.java)
} else {
subscriber::class == SafeSubscriber::class
}
}
}
return Observable.unsafeCreate(OnFlowSafeSubscribe(this))
}

View File

@ -1,7 +1,6 @@
package net.corda.core.internal package net.corda.core.observable.internal
import rx.Observable import net.corda.core.internal.VisibleForTesting
import rx.Observable.unsafeCreate
import rx.Observer import rx.Observer
import rx.Subscriber import rx.Subscriber
import rx.exceptions.CompositeException import rx.exceptions.CompositeException
@ -83,41 +82,3 @@ class FlowSafeSubscriber<T>(actual: Subscriber<in T>) : SafeSubscriber<T>(actual
*/ */
@VisibleForTesting @VisibleForTesting
class OnNextFailedException(message: String, cause: Throwable) : OnErrorNotImplementedException(message, cause) class OnNextFailedException(message: String, cause: Throwable) : OnErrorNotImplementedException(message, cause)
/**
* [Observable.flowSafeSubscribe] is used to return an Observable, through which we can subscribe non unsubscribing [rx.Observer]s
* to the source [Observable].
*
* It unwraps an [Observer] from a [SafeSubscriber], re-wraps it with a [FlowSafeSubscriber] and then subscribes it
* to the source [Observable].
*
* In case we need to subscribe with a [SafeSubscriber] via [flowSafeSubscribe], we have to:
* 1. Declare a custom SafeSubscriber extending [SafeSubscriber].
* 2. Wrap with the custom SafeSubscriber the [rx.Observer] to be subscribed to the source [Observable].
* 3. Call [Observable.flowSafeSubscribe] with [strictMode] = false
* 4. Subscribe to the returned [Observable] passing in as argument the custom SafeSubscriber.
*/
fun <T> Observable<T>.flowSafeSubscribe(strictMode: Boolean = false): Observable<T> {
class OnFlowSafeSubscribe<T>(val source: Observable<T>): Observable.OnSubscribe<T> {
override fun call(subscriber: Subscriber<in T>) {
if (isSafeSubscriber(subscriber)) {
source.unsafeSubscribe(FlowSafeSubscriber((subscriber as SafeSubscriber).actual))
} else {
source.unsafeSubscribe(subscriber)
}
}
private fun isSafeSubscriber(subscriber: Subscriber<*>): Boolean {
return if (strictMode) {
// In strictMode mode we capture SafeSubscriber subclasses as well
SafeSubscriber::class.java.isAssignableFrom(subscriber::class.java)
} else {
subscriber::class == SafeSubscriber::class
}
}
}
return unsafeCreate(OnFlowSafeSubscribe(this))
}

View File

@ -14,6 +14,7 @@ import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.* import net.corda.core.node.services.*
import net.corda.core.node.services.Vault.ConstraintInfo.Companion.constraintInfo import net.corda.core.node.services.Vault.ConstraintInfo.Companion.constraintInfo
import net.corda.core.node.services.vault.* import net.corda.core.node.services.vault.*
import net.corda.core.observable.flowSafeSubscribe
import net.corda.core.schemas.PersistentStateRef import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.* import net.corda.core.transactions.*

View File

@ -3,9 +3,9 @@ package net.corda.node.utilities
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.tee import net.corda.core.internal.tee
import net.corda.core.internal.FlowSafeSubscriber import net.corda.core.observable.internal.FlowSafeSubscriber
import net.corda.core.internal.OnNextFailedException import net.corda.core.observable.internal.OnNextFailedException
import net.corda.core.internal.flowSafeSubscribe import net.corda.core.observable.flowSafeSubscribe
import net.corda.nodeapi.internal.persistence.* import net.corda.nodeapi.internal.persistence.*
import net.corda.testing.internal.configureDatabase import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties