mirror of
https://github.com/corda/corda.git
synced 2025-02-26 11:20:44 +00:00
Introducing Observable.flowSafeSubscribe public API method to subscribe with -non unsubscribing- Rx.Subscribers to Observables. It also replaced FlowSafeSubject
This commit is contained in:
parent
b3941809ef
commit
6e0ae53156
@ -1,5 +1,7 @@
|
||||
package net.corda.core.internal
|
||||
|
||||
import rx.Observable
|
||||
import rx.Observable.unsafeCreate
|
||||
import rx.Observer
|
||||
import rx.Subscriber
|
||||
import rx.exceptions.CompositeException
|
||||
@ -80,4 +82,42 @@ class FlowSafeSubscriber<T>(actual: Subscriber<in T>) : SafeSubscriber<T>(actual
|
||||
* the exception will be re-thrown at [Exceptions.throwOrReport].
|
||||
*/
|
||||
@VisibleForTesting
|
||||
class OnNextFailedException(message: String, cause: Throwable) : OnErrorNotImplementedException(message, cause)
|
||||
class OnNextFailedException(message: String, cause: Throwable) : OnErrorNotImplementedException(message, cause)
|
||||
|
||||
/**
|
||||
* [flowSafeSubscribe] is used to return an Observable, through which we can subscribe non unsubscribing [rx.Observer]s
|
||||
* to the source [Observable].
|
||||
*
|
||||
* It unwraps an [Observer] from a [SafeSubscriber], re-wraps it with a [FlowSafeSubscriber] and then subscribes it
|
||||
* to the source [Observable].
|
||||
*
|
||||
* In case we need to subscribe with a [SafeSubscriber] via [flowSafeSubscribe], we have to:
|
||||
* 1. Call it with [strictMode] = false
|
||||
* 1. Declare a custom Subscriber that will extend [SafeSubscriber].
|
||||
* 2. Wrap with the custom Subscriber the [rx.Observer] to be subscribed to [FlowSafeSubject].
|
||||
* 3. Subscribe to [FlowSafeSubject] passing the custom Subscriber.
|
||||
*/
|
||||
fun <T> Observable<T>.flowSafeSubscribe(strictMode: Boolean = false): Observable<T> {
|
||||
|
||||
class OnFlowSafeSubscribe<T>(val source: Observable<T>): Observable.OnSubscribe<T> {
|
||||
|
||||
override fun call(subscriber: Subscriber<in T>) {
|
||||
if (isSafeSubscriber(subscriber)) {
|
||||
source.unsafeSubscribe(FlowSafeSubscriber((subscriber as SafeSubscriber).actual))
|
||||
} else {
|
||||
source.unsafeSubscribe(subscriber)
|
||||
}
|
||||
}
|
||||
|
||||
private fun isSafeSubscriber(subscriber: Subscriber<*>): Boolean {
|
||||
return if (strictMode) {
|
||||
// In strictMode mode we capture SafeSubscriber subclasses as well
|
||||
SafeSubscriber::class.java.isAssignableFrom(subscriber::class.java)
|
||||
} else {
|
||||
subscriber::class == SafeSubscriber::class
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return unsafeCreate(OnFlowSafeSubscribe(this))
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
import net.corda.core.internal.FlowSafeSubscriber
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import rx.Observable.OnSubscribe
|
||||
import rx.Observer
|
||||
import rx.observers.SafeSubscriber
|
||||
import rx.subjects.Subject
|
||||
|
||||
/**
|
||||
* [FlowSafeSubject] is used to unwrap an [Observer] from a [SafeSubscriber], re-wrap it with a [FlowSafeSubscriber]
|
||||
* and then subscribe it to its underlying [Subject]. It is only used, to provide its underlying [Subject] with
|
||||
* subscribing of non unsubscribing [rx.Observer]s.
|
||||
*
|
||||
* Upon [rx.Observable.subscribe] it will wrap everything that is a non [SafeSubscriber] with a [FlowSafeSubscriber] the same way
|
||||
* [rx.subjects.PublishSubject] wraps everything that is a non [SafeSubscriber] with a [SafeSubscriber].
|
||||
*
|
||||
* In case we need to subscribe with a [SafeSubscriber] to a [FlowSafeSubject], we have to:
|
||||
* 1. Declare a custom Subscriber that will extend [SafeSubscriber].
|
||||
* 2. Wrap with the custom Subscriber the [rx.Observer] to be subscribed to [FlowSafeSubject].
|
||||
* 3. Subscribe to [FlowSafeSubject] passing the custom Subscriber.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
class FlowSafeSubject<T, R>(private val actual: Subject<T, R>) : Observer<T> by actual,
|
||||
Subject<T, R>(OnSubscribe<R> { subscriber ->
|
||||
// we used '==' instead of 'is', so that we replace only instances of SafeSubscriber with FlowSafeSubscriber,
|
||||
// but leave untouched instances of a classes extending SafeSubscriber. That way, we allow subscribing SafeSubscribers.
|
||||
if (subscriber::class == SafeSubscriber::class) {
|
||||
actual.unsafeSubscribe(FlowSafeSubscriber((subscriber as SafeSubscriber).actual))
|
||||
} else {
|
||||
actual.unsafeSubscribe(subscriber)
|
||||
}
|
||||
}) {
|
||||
|
||||
override fun hasObservers(): Boolean {
|
||||
return actual.hasObservers()
|
||||
}
|
||||
}
|
@ -18,7 +18,6 @@ import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.*
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.node.internal.FlowSafeSubject
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.api.VaultServiceInternal
|
||||
import net.corda.node.services.schema.PersistentStateService
|
||||
@ -230,7 +229,7 @@ class NodeVaultService(
|
||||
}
|
||||
// we are not inside a flow, we are most likely inside a CordaService;
|
||||
// we will expose, by default, subscribing of -non unsubscribing- rx.Observers to rawUpdates.
|
||||
return FlowSafeSubject(_rawUpdatesPublisher)
|
||||
return _rawUpdatesPublisher.flowSafeSubscribe(strictMode = true)
|
||||
}
|
||||
|
||||
override val updates: Observable<Vault.Update<ContractState>>
|
||||
|
@ -3,9 +3,9 @@ package net.corda.node.utilities
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.tee
|
||||
import net.corda.node.internal.FlowSafeSubject
|
||||
import net.corda.core.internal.FlowSafeSubscriber
|
||||
import net.corda.core.internal.OnNextFailedException
|
||||
import net.corda.core.internal.flowSafeSubscribe
|
||||
import net.corda.nodeapi.internal.persistence.*
|
||||
import net.corda.testing.internal.configureDatabase
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
@ -13,6 +13,7 @@ import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
import rx.exceptions.CompositeException
|
||||
import rx.exceptions.OnErrorFailedException
|
||||
import rx.exceptions.OnErrorNotImplementedException
|
||||
@ -21,6 +22,7 @@ import rx.observers.SafeSubscriber
|
||||
import rx.observers.Subscribers
|
||||
import rx.subjects.PublishSubject
|
||||
import java.io.Closeable
|
||||
import java.lang.IllegalArgumentException
|
||||
import java.lang.IllegalStateException
|
||||
import java.lang.RuntimeException
|
||||
import java.util.*
|
||||
@ -227,15 +229,16 @@ class ObservablesTests {
|
||||
fun `FlowSafeSubject subscribes by default FlowSafeSubscribers, wrapped Observers will survive errors from onNext`() {
|
||||
var heartBeat1 = 0
|
||||
var heartBeat2 = 0
|
||||
val source = FlowSafeSubject(PublishSubject.create<Int>())
|
||||
source.subscribe { runNo ->
|
||||
val source = PublishSubject.create<Int>()
|
||||
val flowSafeSubscriber = source.flowSafeSubscribe()
|
||||
flowSafeSubscriber.subscribe { runNo ->
|
||||
// subscribes with a FlowSafeSubscriber
|
||||
heartBeat1++
|
||||
if (runNo == 1) {
|
||||
throw IllegalStateException()
|
||||
}
|
||||
}
|
||||
source.subscribe { runNo ->
|
||||
flowSafeSubscriber.subscribe { runNo ->
|
||||
// subscribes with a FlowSafeSubscriber
|
||||
heartBeat2++
|
||||
if (runNo == 2) {
|
||||
@ -257,13 +260,13 @@ class ObservablesTests {
|
||||
@Test
|
||||
fun `FlowSafeSubject unsubscribes FlowSafeSubscribers only upon explicitly calling onError`() {
|
||||
var heartBeat = 0
|
||||
val source = FlowSafeSubject(PublishSubject.create<Int>())
|
||||
source.subscribe { heartBeat += it }
|
||||
source.subscribe { heartBeat += it }
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.flowSafeSubscribe().subscribe { heartBeat += it }
|
||||
source.flowSafeSubscribe().subscribe { heartBeat += it }
|
||||
source.onNext(1)
|
||||
// send an onError event
|
||||
assertFailsWith<CompositeException> {
|
||||
source.onError(IllegalStateException()) // all FlowSafeSubscribers under FlowSafeSubject get unsubscribed here
|
||||
source.onError(IllegalStateException()) // all FlowSafeSubscribers under PublishSubject get unsubscribed here
|
||||
}
|
||||
source.onNext(1)
|
||||
assertEquals(2, heartBeat)
|
||||
@ -272,15 +275,15 @@ class ObservablesTests {
|
||||
@Test
|
||||
fun `FlowSafeSubject wrapped with a SafeSubscriber shuts down the whole structure, if one of them is unsafe and it throws`() {
|
||||
var heartBeat = 0
|
||||
val source = FlowSafeSubject(PublishSubject.create<Int>())
|
||||
source.unsafeSubscribe(Subscribers.create { runNo ->
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.unsafeSubscribe(Subscribers.create { runNo -> // subscribes unsafe; It does not wrap with FlowSafeSubscriber
|
||||
heartBeat++
|
||||
if (runNo == 1) {
|
||||
throw IllegalStateException()
|
||||
}
|
||||
})
|
||||
source.subscribe { heartBeat += it }
|
||||
// wrapping FlowSafeSubject with a SafeSubscriber
|
||||
source.flowSafeSubscribe().subscribe { heartBeat += it }
|
||||
// wrapping PublishSubject with a SafeSubscriber
|
||||
val sourceWrapper = SafeSubscriber(Subscribers.from(source))
|
||||
assertFailsWith<OnErrorFailedException> {
|
||||
sourceWrapper.onNext(1)
|
||||
@ -299,15 +302,15 @@ class ObservablesTests {
|
||||
@Test
|
||||
fun `FlowSafeSubject wrapped with a FlowSafeSubscriber will preserve the structure, if one of them is unsafe and it throws`() {
|
||||
var heartBeat = 0
|
||||
val source = FlowSafeSubject(PublishSubject.create<Int>())
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.unsafeSubscribe(Subscribers.create { runNo ->
|
||||
heartBeat++
|
||||
if (runNo == 1) {
|
||||
throw IllegalStateException()
|
||||
}
|
||||
})
|
||||
source.subscribe { heartBeat++ }
|
||||
// wrap FlowSafeSubject with a FlowSafeSubscriber
|
||||
source.flowSafeSubscribe().subscribe { heartBeat++ }
|
||||
// wrap PublishSubject with a FlowSafeSubscriber
|
||||
val sourceWrapper = FlowSafeSubscriber(Subscribers.from(source))
|
||||
assertFailsWith<OnNextFailedException>("Observer.onNext failed, this is a non leaf FlowSafeSubscriber, therefore onError will be skipped") {
|
||||
sourceWrapper.onNext(1)
|
||||
@ -320,9 +323,9 @@ class ObservablesTests {
|
||||
fun `throwing inside onNext of a FlowSafeSubscriber leaf subscriber will call onError`() {
|
||||
var heartBeatOnNext = 0
|
||||
var heartBeatOnError = 0
|
||||
val source = FlowSafeSubject(PublishSubject.create<Int>())
|
||||
val source = PublishSubject.create<Int>()
|
||||
// add a leaf FlowSafeSubscriber
|
||||
source.subscribe({
|
||||
source.flowSafeSubscribe().subscribe({
|
||||
heartBeatOnNext++
|
||||
throw IllegalStateException()
|
||||
}, {
|
||||
@ -363,7 +366,7 @@ class ObservablesTests {
|
||||
|
||||
/**
|
||||
* In this test we create a chain of Subscribers with this the following order:
|
||||
* FlowSafeSubscriber_X -> FlowSafeSubject -> PublishSubject -> FlowSafeSubscriber_Y
|
||||
* FlowSafeSubscriber_X -> PublishSubject -> FlowSafeSubscriber_Y
|
||||
*
|
||||
* FlowSafeSubscriber_Y.onNext throws an error, since FlowSafeSubscriber_Y.onError is not defined,
|
||||
* it will throw a OnErrorNotImplementedException. Then it will be propagated back until FlowSafeSubscriber_X.
|
||||
@ -371,8 +374,8 @@ class ObservablesTests {
|
||||
*/
|
||||
@Test
|
||||
fun `propagated Rx exception will be rethrown at FlowSafeSubscriber onError`() {
|
||||
val source = FlowSafeSubject(PublishSubject.create<Int>())
|
||||
source.subscribe { throw IllegalStateException("123") } // will give a leaf FlowSafeSubscriber
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.flowSafeSubscribe().subscribe { throw IllegalStateException("123") } // will give a leaf FlowSafeSubscriber
|
||||
val sourceWrapper = FlowSafeSubscriber(Subscribers.from(source)) // will give an inner FlowSafeSubscriber
|
||||
|
||||
assertFailsWith<OnNextFailedException>("Observer.onNext failed, this is a non leaf FlowSafeSubscriber, therefore onError will be skipped") {
|
||||
@ -382,6 +385,44 @@ class ObservablesTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test flowSafeSubscribe strictMode = true replaces SafeSubscriber subclass`() {
|
||||
class CustomSafeSubscriber<T>(actual: Subscriber<in T>): SafeSubscriber<T>(actual)
|
||||
|
||||
var heartBeat = 0
|
||||
val customSafeSubscriber = CustomSafeSubscriber(
|
||||
Subscribers.create<Int> {
|
||||
heartBeat++
|
||||
throw IllegalArgumentException()
|
||||
})
|
||||
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.flowSafeSubscribe(strictMode = true).subscribe(customSafeSubscriber) // it should replace CustomSafeSubscriber with FlowSafeSubscriber
|
||||
|
||||
assertFailsWith<OnErrorNotImplementedException> { source.onNext(1) }
|
||||
assertFailsWith<OnErrorNotImplementedException> { source.onNext(1) }
|
||||
assertEquals(2, heartBeat)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test flowSafeSubscribe strictMode = false will not replace SafeSubscriber subclass`() {
|
||||
class CustomSafeSubscriber<T>(actual: Subscriber<in T>): SafeSubscriber<T>(actual)
|
||||
|
||||
var heartBeat = 0
|
||||
val customSafeSubscriber = CustomSafeSubscriber(
|
||||
Subscribers.create<Int> {
|
||||
heartBeat++
|
||||
throw IllegalArgumentException()
|
||||
})
|
||||
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.flowSafeSubscribe(strictMode = false).subscribe(customSafeSubscriber) // it should not replace CustomSafeSubscriber with FlowSafeSubscriber
|
||||
|
||||
assertFailsWith<OnErrorNotImplementedException> { source.onNext(1) }
|
||||
source.onNext(1)
|
||||
assertEquals(1, heartBeat)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `combine tee and bufferUntilDatabaseCommit`() {
|
||||
val database = createDatabase()
|
||||
|
Loading…
x
Reference in New Issue
Block a user