Introducing not unsubscribing version of Rx.Subscriber

This commit is contained in:
Kyriakos Tharrouniatis 2020-02-11 14:19:45 +00:00
parent 5f446aea7e
commit e41fb714c1
3 changed files with 253 additions and 0 deletions

View File

@ -0,0 +1,68 @@
package net.corda.core.internal
import rx.Subscriber
import rx.exceptions.CompositeException
import rx.exceptions.Exceptions
import rx.exceptions.OnErrorFailedException
import rx.exceptions.OnErrorNotImplementedException
import rx.internal.util.ActionSubscriber
import rx.observers.SafeSubscriber
import rx.plugins.RxJavaHooks
import rx.plugins.RxJavaPlugins
/**
* Extends [SafeSubscriber] to override [SafeSubscriber.onNext], [SafeSubscriber.onError] and [SafeSubscriber._onError].
*/
@VisibleForTesting
class FlowSafeSubscriber<T>(actual: Subscriber<in T>) : SafeSubscriber<T>(actual) {
/**
* Duplicate of [SafeSubscriber.onNext], however it only delegates to [SafeSubscriber.onError] if it
* wraps an [ActionSubscriber] which is a leaf in an Subscribers' tree structure.
*/
override fun onNext(t: T) {
try {
actual.onNext(t)
} catch(e: Throwable) {
if(actual is ActionSubscriber) {
// this Subscriber wraps an ActionSubscriber which is always a leaf Observer, then call user-defined onError
Exceptions.throwOrReport(e, this)
} else {
// this Subscriber may wrap a non leaf Observer. In case the wrapped Observer is a PublishSubject then we
// should not call onError because PublishSubjectState.onError will shut down all of the Observers under it
throw OnNextFailedException(
"Observer.onNext failed, this is a non leaf FlowSafeSubscriber, therefore onError will be skipped", e
)
}
}
}
/**
* Duplicate of [SafeSubscriber.onError]. However, it will not set [SafeSubscriber.done] flag to true.
*/
override fun onError(e: Throwable) {
Exceptions.throwIfFatal(e)
_onError(e)
}
/**
* Duplicate of [SafeSubscriber._onError]. However, it will not call [Subscriber.unsubscribe].
*/
override fun _onError(e: Throwable) {
RxJavaPlugins.getInstance().errorHandler.handleError(e)
try {
actual.onError(e)
} catch (e: OnErrorNotImplementedException) {
throw e
} catch (e2: Throwable) {
RxJavaHooks.onError(e2)
throw OnErrorFailedException(
"Error occurred when trying to propagate error to Observer.onError", CompositeException(listOf(e, e2))
)
}
}
}
@VisibleForTesting
class OnNextFailedException(message: String, cause: Throwable): OnErrorNotImplementedException(message, cause)

View File

@ -0,0 +1,32 @@
package net.corda.core.utilities
import net.corda.core.internal.FlowSafeSubscriber
import net.corda.core.internal.VisibleForTesting
import rx.Observable.OnSubscribe
import rx.Observer
import rx.Subscriber
import rx.observers.SafeSubscriber
import rx.subjects.Subject
/**
* The [FlowSafeSubject] is used to unwrap a [SafeSubscriber] to prevent the observer from unsubscribing from the base observable when any
* error occurs. Calls to [SafeSubscriber._onError] call [Subscriber.unsubscribe] multiple times, which stops the observer receiving updates
* from the observable.
*
* Preventing this is useful to observers that are subscribed to important observables to prevent them from ever unsubscribing due to an
* error. Unsubscribing could lead to a malfunctioning CorDapp, for the rest of the current run time, due to a single isolated error.
*/
@VisibleForTesting
class FlowSafeSubject<T, R>(private val actual: Subject<T, R>) : Observer<T> by actual,
Subject<T, R>(OnSubscribe<R> { subscriber ->
if (subscriber::class == SafeSubscriber::class) {
actual.unsafeSubscribe(FlowSafeSubscriber((subscriber as SafeSubscriber).actual))
} else {
actual.unsafeSubscribe(subscriber)
}
}) {
override fun hasObservers(): Boolean {
return actual.hasObservers()
}
}

View File

@ -3,6 +3,9 @@ package net.corda.node.utilities
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.tee
import net.corda.core.utilities.FlowSafeSubject
import net.corda.core.internal.FlowSafeSubscriber
import net.corda.core.internal.OnNextFailedException
import net.corda.nodeapi.internal.persistence.*
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
@ -10,9 +13,15 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
import rx.Observable
import rx.exceptions.CompositeException
import rx.exceptions.OnErrorFailedException
import rx.exceptions.OnErrorNotImplementedException
import rx.internal.util.ActionSubscriber
import rx.observers.SafeSubscriber
import rx.observers.Subscribers
import rx.subjects.PublishSubject
import java.io.Closeable
import java.lang.IllegalStateException
import java.lang.RuntimeException
import java.util.*
import kotlin.test.assertEquals
@ -125,6 +134,7 @@ class ObservablesTests {
assertThat(secondEvent.get()).isEqualTo(2 to false)
}
// TODO: change all below PublishSubject to FlowSafeSubject
@Test(timeout=300_000)
fun `bufferUntilDatabaseCommit delays until transaction closed repeatable`() {
val database = createDatabase()
@ -159,6 +169,7 @@ class ObservablesTests {
assertThat(secondEvent.get()).isEqualTo(1 to false)
}
// TODO: change all below PublishSubject to FlowSafeSubject
@Test(timeout=300_000)
fun `tee correctly copies observations to multiple observers`() {
@ -196,6 +207,7 @@ class ObservablesTests {
* SafeSubscriber wrapping that PublishSubject and will call [PublishSubject.PublishSubjectState.onError], which will
* eventually shut down all of the subscribers under that PublishSubjectState.
*/
// TODO: change all below PublishSubject to FlowSafeSubject
@Test(timeout=300_000)
fun `error in unsafe subscriber won't shutdown subscribers under same publish subject, after tee`() {
val source1 = PublishSubject.create<Int>()
@ -214,6 +226,147 @@ class ObservablesTests {
assertEquals(2, count)
}
@Test
fun `FlowSafeSubject subscribes by default FlowSafeSubscribers, wrapped Observers will survive errors from onNext`() {
var heartBeat = 0
val source = FlowSafeSubject(PublishSubject.create<Int>())
source.subscribe { runNo ->
// subscribes with a FlowSafeSubscriber
heartBeat++
if (runNo == 1) {
throw IllegalStateException()
}
}
source.subscribe { runNo ->
// subscribes with a FlowSafeSubscriber
heartBeat++
if (runNo == 2) {
throw IllegalStateException()
}
}
assertFailsWith<OnErrorNotImplementedException> {
source.onNext(1) // first observer only will run and throw
}
assertFailsWith<OnErrorNotImplementedException> {
source.onNext(2) // first observer will run, second observer will run and throw
}
source.onNext(3) // both observers will run
assertThat(heartBeat == 5)
}
@Test
fun `FlowSafeSubject unsubscribes FlowSafeSubscribers only upon explicitly calling onError`() {
var heartBeat = 0
val source = FlowSafeSubject(PublishSubject.create<Int>())
source.subscribe { heartBeat += it }
source.subscribe { heartBeat += it }
source.onNext(1)
// send an onError event
assertFailsWith<CompositeException> {
source.onError(IllegalStateException()) // all FlowSafeSubscribers under FlowSafeSubject get unsubscribed here
}
source.onNext(1)
assertThat(heartBeat == 2)
}
@Test
fun `FlowSafeSubject wrapped with a SafeSubscriber shuts down the whole structure, if one of them is unsafe and it throws`() {
var heartBeat = 0
val source = FlowSafeSubject(PublishSubject.create<Int>())
source.unsafeSubscribe(Subscribers.create { runNo ->
heartBeat++
if (runNo == 1) {
throw IllegalStateException()
}
})
source.subscribe { heartBeat += it }
// wrap FlowSafeSubject with a FlowSafeSubscriber
val sourceWrapper = SafeSubscriber(Subscribers.from(source))
assertFailsWith<OnErrorFailedException> {
sourceWrapper.onNext(1)
}
sourceWrapper.onNext(2)
assertThat(heartBeat == 1)
}
/**
* A FlowSafeSubscriber that is not a leaf in a Subscribers structure, if it throws at onNext,
* it will not call its onError, because in case it wraps a [PublishSubject] it will then call [PublishSubject.onError]
* which will shut down all of the Subscribers under it.
*/
@Test
fun `FlowSafeSubject wrapped with a FlowSafeSubscriber will preserve the structure, if one of them is unsafe and it throws`() {
var heartBeat = 0
val source = FlowSafeSubject(PublishSubject.create<Int>())
source.unsafeSubscribe(Subscribers.create { runNo ->
heartBeat++
if (runNo == 1) {
throw IllegalStateException()
}
})
source.subscribe { heartBeat++ }
// wrap FlowSafeSubject with a FlowSafeSubscriber
val sourceWrapper = FlowSafeSubscriber(Subscribers.from(source))
assertFailsWith<OnNextFailedException>("Observer.onNext failed, this is a non leaf FlowSafeSubscriber, therefore onError will be skipped") {
sourceWrapper.onNext(1)
}
sourceWrapper.onNext(2)
assertThat(heartBeat == 3)
}
@Test
fun `throwing FlowSafeSubscriber as a leaf will call onError`() {
var heartBeat = 0
val source = FlowSafeSubject(PublishSubject.create<Int>())
// add a leaf FlowSafeSubscriber
source.subscribe(/*onNext*/{
heartBeat++
throw IllegalStateException()
},/*onError*/{
heartBeat++
})
source.onNext(1)
source.onNext(1)
assertThat(heartBeat == 4)
}
@Test
fun `throwing FlowSafeSubscriber at onNext will wrap with a Rx OnErrorNotImplementedException`() {
val flowSafeSubscriber = FlowSafeSubscriber<Int>(Subscribers.create { throw IllegalStateException() })
assertFailsWith<OnErrorNotImplementedException> {
flowSafeSubscriber.onNext(1)
}
}
@Test
fun `throwing FlowSafeSubscriber at onError will wrap with a Rx OnErrorFailedException`() {
val flowSafeSubscriber = FlowSafeSubscriber<Int>(
ActionSubscriber(
{ throw IllegalStateException() },
{ throw IllegalStateException() },
null
)
)
assertFailsWith<OnErrorFailedException> {
flowSafeSubscriber.onNext(1)
}
}
// 3. check that FlowSafeSubscriber will just rethrow a propagated Rx Exception
@Test
fun `propagated Rx exception will be rethrown at ConsistentSafeSubscriber onError`() {
val source = FlowSafeSubject(PublishSubject.create<Int>())
source.subscribe { throw IllegalStateException("123") }
val sourceWrapper = FlowSafeSubscriber(Subscribers.from(source))
assertFailsWith<OnErrorNotImplementedException>("123") {
sourceWrapper.onNext(1)
}
}
// TODO: change all PublishSubject to FlowSafeSubject
@Test(timeout=300_000)
fun `combine tee and bufferUntilDatabaseCommit`() {
val database = createDatabase()