diff --git a/core/src/main/kotlin/net/corda/core/observable/flowSafeSubscribe.kt b/core/src/main/kotlin/net/corda/core/observable/flowSafeSubscribe.kt new file mode 100644 index 0000000000..685c251467 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/observable/flowSafeSubscribe.kt @@ -0,0 +1,44 @@ +package net.corda.core.observable + +import net.corda.core.observable.internal.FlowSafeSubscriber +import rx.Observable +import rx.Subscriber +import rx.observers.SafeSubscriber + +/** + * [Observable.flowSafeSubscribe] is used to return an Observable, through which we can subscribe non unsubscribing [rx.Observer]s + * to the source [Observable]. + * + * It unwraps an [Observer] from a [SafeSubscriber], re-wraps it with a [FlowSafeSubscriber] and then subscribes it + * to the source [Observable]. + * + * In case we need to subscribe with a [SafeSubscriber] via [flowSafeSubscribe], we have to: + * 1. Declare a custom SafeSubscriber extending [SafeSubscriber]. + * 2. Wrap with the custom SafeSubscriber the [rx.Observer] to be subscribed to the source [Observable]. + * 3. Call [Observable.flowSafeSubscribe] with [strictMode] = false + * 4. Subscribe to the returned [Observable] passing in as argument the custom SafeSubscriber. + */ +fun Observable.flowSafeSubscribe(strictMode: Boolean = false): Observable { + + class OnFlowSafeSubscribe(val source: Observable): Observable.OnSubscribe { + + override fun call(subscriber: Subscriber) { + if (isSafeSubscriber(subscriber)) { + source.unsafeSubscribe(FlowSafeSubscriber((subscriber as SafeSubscriber).actual)) + } else { + source.unsafeSubscribe(subscriber) + } + } + + private fun isSafeSubscriber(subscriber: Subscriber<*>): Boolean { + return if (strictMode) { + // In strictMode mode we capture SafeSubscriber subclasses as well + SafeSubscriber::class.java.isAssignableFrom(subscriber::class.java) + } else { + subscriber::class == SafeSubscriber::class + } + } + } + + return Observable.unsafeCreate(OnFlowSafeSubscribe(this)) +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowSafeSubscriber.kt b/core/src/main/kotlin/net/corda/core/observable/internal/FlowSafeSubscriber.kt similarity index 65% rename from core/src/main/kotlin/net/corda/core/internal/FlowSafeSubscriber.kt rename to core/src/main/kotlin/net/corda/core/observable/internal/FlowSafeSubscriber.kt index 8e4d641f23..7f7e71a3d1 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowSafeSubscriber.kt +++ b/core/src/main/kotlin/net/corda/core/observable/internal/FlowSafeSubscriber.kt @@ -1,7 +1,6 @@ -package net.corda.core.internal +package net.corda.core.observable.internal -import rx.Observable -import rx.Observable.unsafeCreate +import net.corda.core.internal.VisibleForTesting import rx.Observer import rx.Subscriber import rx.exceptions.CompositeException @@ -82,42 +81,4 @@ class FlowSafeSubscriber(actual: Subscriber) : SafeSubscriber(actual * the exception will be re-thrown at [Exceptions.throwOrReport]. */ @VisibleForTesting -class OnNextFailedException(message: String, cause: Throwable) : OnErrorNotImplementedException(message, cause) - -/** - * [Observable.flowSafeSubscribe] is used to return an Observable, through which we can subscribe non unsubscribing [rx.Observer]s - * to the source [Observable]. - * - * It unwraps an [Observer] from a [SafeSubscriber], re-wraps it with a [FlowSafeSubscriber] and then subscribes it - * to the source [Observable]. - * - * In case we need to subscribe with a [SafeSubscriber] via [flowSafeSubscribe], we have to: - * 1. Declare a custom SafeSubscriber extending [SafeSubscriber]. - * 2. Wrap with the custom SafeSubscriber the [rx.Observer] to be subscribed to the source [Observable]. - * 3. Call [Observable.flowSafeSubscribe] with [strictMode] = false - * 4. Subscribe to the returned [Observable] passing in as argument the custom SafeSubscriber. - */ -fun Observable.flowSafeSubscribe(strictMode: Boolean = false): Observable { - - class OnFlowSafeSubscribe(val source: Observable): Observable.OnSubscribe { - - override fun call(subscriber: Subscriber) { - if (isSafeSubscriber(subscriber)) { - source.unsafeSubscribe(FlowSafeSubscriber((subscriber as SafeSubscriber).actual)) - } else { - source.unsafeSubscribe(subscriber) - } - } - - private fun isSafeSubscriber(subscriber: Subscriber<*>): Boolean { - return if (strictMode) { - // In strictMode mode we capture SafeSubscriber subclasses as well - SafeSubscriber::class.java.isAssignableFrom(subscriber::class.java) - } else { - subscriber::class == SafeSubscriber::class - } - } - } - - return unsafeCreate(OnFlowSafeSubscribe(this)) -} \ No newline at end of file +class OnNextFailedException(message: String, cause: Throwable) : OnErrorNotImplementedException(message, cause) \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 365a4598be..74ab36e58d 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -14,6 +14,7 @@ import net.corda.core.node.StatesToRecord import net.corda.core.node.services.* import net.corda.core.node.services.Vault.ConstraintInfo.Companion.constraintInfo import net.corda.core.node.services.vault.* +import net.corda.core.observable.flowSafeSubscribe import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.* diff --git a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt index d29ce5ac9e..fc42bc3984 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt @@ -3,9 +3,9 @@ package net.corda.node.utilities import com.google.common.util.concurrent.SettableFuture import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.tee -import net.corda.core.internal.FlowSafeSubscriber -import net.corda.core.internal.OnNextFailedException -import net.corda.core.internal.flowSafeSubscribe +import net.corda.core.observable.internal.FlowSafeSubscriber +import net.corda.core.observable.internal.OnNextFailedException +import net.corda.core.observable.flowSafeSubscribe import net.corda.nodeapi.internal.persistence.* import net.corda.testing.internal.configureDatabase import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties