From e41fb714c19e5333e7181f04222d4c8b1b4d5414 Mon Sep 17 00:00:00 2001 From: Kyriakos Tharrouniatis Date: Tue, 11 Feb 2020 14:19:45 +0000 Subject: [PATCH] Introducing not unsubscribing version of Rx.Subscriber --- .../corda/core/internal/FlowSafeSubscriber.kt | 68 ++++++++ .../corda/core/utilities/FlowSafeSubject.kt | 32 ++++ .../corda/node/utilities/ObservablesTests.kt | 153 ++++++++++++++++++ 3 files changed, 253 insertions(+) create mode 100644 core/src/main/kotlin/net/corda/core/internal/FlowSafeSubscriber.kt create mode 100644 core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowSafeSubscriber.kt b/core/src/main/kotlin/net/corda/core/internal/FlowSafeSubscriber.kt new file mode 100644 index 0000000000..e81de4ef73 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/FlowSafeSubscriber.kt @@ -0,0 +1,68 @@ +package net.corda.core.internal + +import rx.Subscriber +import rx.exceptions.CompositeException +import rx.exceptions.Exceptions +import rx.exceptions.OnErrorFailedException +import rx.exceptions.OnErrorNotImplementedException +import rx.internal.util.ActionSubscriber +import rx.observers.SafeSubscriber +import rx.plugins.RxJavaHooks +import rx.plugins.RxJavaPlugins + +/** + * Extends [SafeSubscriber] to override [SafeSubscriber.onNext], [SafeSubscriber.onError] and [SafeSubscriber._onError]. + */ +@VisibleForTesting +class FlowSafeSubscriber(actual: Subscriber) : SafeSubscriber(actual) { + + /** + * Duplicate of [SafeSubscriber.onNext], however it only delegates to [SafeSubscriber.onError] if it + * wraps an [ActionSubscriber] which is a leaf in an Subscribers' tree structure. + */ + override fun onNext(t: T) { + try { + actual.onNext(t) + } catch(e: Throwable) { + if(actual is ActionSubscriber) { + // this Subscriber wraps an ActionSubscriber which is always a leaf Observer, then call user-defined onError + Exceptions.throwOrReport(e, this) + } else { + // this Subscriber may wrap a non leaf Observer. In case the wrapped Observer is a PublishSubject then we + // should not call onError because PublishSubjectState.onError will shut down all of the Observers under it + throw OnNextFailedException( + "Observer.onNext failed, this is a non leaf FlowSafeSubscriber, therefore onError will be skipped", e + ) + } + } + } + + /** + * Duplicate of [SafeSubscriber.onError]. However, it will not set [SafeSubscriber.done] flag to true. + */ + override fun onError(e: Throwable) { + Exceptions.throwIfFatal(e) + _onError(e) + } + + /** + * Duplicate of [SafeSubscriber._onError]. However, it will not call [Subscriber.unsubscribe]. + */ + override fun _onError(e: Throwable) { + RxJavaPlugins.getInstance().errorHandler.handleError(e) + try { + actual.onError(e) + } catch (e: OnErrorNotImplementedException) { + throw e + } catch (e2: Throwable) { + RxJavaHooks.onError(e2) + throw OnErrorFailedException( + "Error occurred when trying to propagate error to Observer.onError", CompositeException(listOf(e, e2)) + ) + } + } + +} + +@VisibleForTesting +class OnNextFailedException(message: String, cause: Throwable): OnErrorNotImplementedException(message, cause) \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt b/core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt new file mode 100644 index 0000000000..4d38fbd3a2 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt @@ -0,0 +1,32 @@ +package net.corda.core.utilities + +import net.corda.core.internal.FlowSafeSubscriber +import net.corda.core.internal.VisibleForTesting +import rx.Observable.OnSubscribe +import rx.Observer +import rx.Subscriber +import rx.observers.SafeSubscriber +import rx.subjects.Subject + +/** + * The [FlowSafeSubject] is used to unwrap a [SafeSubscriber] to prevent the observer from unsubscribing from the base observable when any + * error occurs. Calls to [SafeSubscriber._onError] call [Subscriber.unsubscribe] multiple times, which stops the observer receiving updates + * from the observable. + * + * Preventing this is useful to observers that are subscribed to important observables to prevent them from ever unsubscribing due to an + * error. Unsubscribing could lead to a malfunctioning CorDapp, for the rest of the current run time, due to a single isolated error. + */ +@VisibleForTesting +class FlowSafeSubject(private val actual: Subject) : Observer by actual, + Subject(OnSubscribe { subscriber -> + if (subscriber::class == SafeSubscriber::class) { + actual.unsafeSubscribe(FlowSafeSubscriber((subscriber as SafeSubscriber).actual)) + } else { + actual.unsafeSubscribe(subscriber) + } + }) { + + override fun hasObservers(): Boolean { + return actual.hasObservers() + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt index e9817111d3..cd984012e9 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt @@ -3,6 +3,9 @@ package net.corda.node.utilities import com.google.common.util.concurrent.SettableFuture import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.tee +import net.corda.core.utilities.FlowSafeSubject +import net.corda.core.internal.FlowSafeSubscriber +import net.corda.core.internal.OnNextFailedException import net.corda.nodeapi.internal.persistence.* import net.corda.testing.internal.configureDatabase import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties @@ -10,9 +13,15 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Test import rx.Observable +import rx.exceptions.CompositeException +import rx.exceptions.OnErrorFailedException +import rx.exceptions.OnErrorNotImplementedException +import rx.internal.util.ActionSubscriber +import rx.observers.SafeSubscriber import rx.observers.Subscribers import rx.subjects.PublishSubject import java.io.Closeable +import java.lang.IllegalStateException import java.lang.RuntimeException import java.util.* import kotlin.test.assertEquals @@ -125,6 +134,7 @@ class ObservablesTests { assertThat(secondEvent.get()).isEqualTo(2 to false) } + // TODO: change all below PublishSubject to FlowSafeSubject @Test(timeout=300_000) fun `bufferUntilDatabaseCommit delays until transaction closed repeatable`() { val database = createDatabase() @@ -159,6 +169,7 @@ class ObservablesTests { assertThat(secondEvent.get()).isEqualTo(1 to false) } + // TODO: change all below PublishSubject to FlowSafeSubject @Test(timeout=300_000) fun `tee correctly copies observations to multiple observers`() { @@ -196,6 +207,7 @@ class ObservablesTests { * SafeSubscriber wrapping that PublishSubject and will call [PublishSubject.PublishSubjectState.onError], which will * eventually shut down all of the subscribers under that PublishSubjectState. */ + // TODO: change all below PublishSubject to FlowSafeSubject @Test(timeout=300_000) fun `error in unsafe subscriber won't shutdown subscribers under same publish subject, after tee`() { val source1 = PublishSubject.create() @@ -214,6 +226,147 @@ class ObservablesTests { assertEquals(2, count) } + @Test + fun `FlowSafeSubject subscribes by default FlowSafeSubscribers, wrapped Observers will survive errors from onNext`() { + var heartBeat = 0 + val source = FlowSafeSubject(PublishSubject.create()) + source.subscribe { runNo -> + // subscribes with a FlowSafeSubscriber + heartBeat++ + if (runNo == 1) { + throw IllegalStateException() + } + } + source.subscribe { runNo -> + // subscribes with a FlowSafeSubscriber + heartBeat++ + if (runNo == 2) { + throw IllegalStateException() + } + } + + assertFailsWith { + source.onNext(1) // first observer only will run and throw + } + assertFailsWith { + source.onNext(2) // first observer will run, second observer will run and throw + } + source.onNext(3) // both observers will run + assertThat(heartBeat == 5) + } + + @Test + fun `FlowSafeSubject unsubscribes FlowSafeSubscribers only upon explicitly calling onError`() { + var heartBeat = 0 + val source = FlowSafeSubject(PublishSubject.create()) + source.subscribe { heartBeat += it } + source.subscribe { heartBeat += it } + source.onNext(1) + // send an onError event + assertFailsWith { + source.onError(IllegalStateException()) // all FlowSafeSubscribers under FlowSafeSubject get unsubscribed here + } + source.onNext(1) + assertThat(heartBeat == 2) + } + + @Test + fun `FlowSafeSubject wrapped with a SafeSubscriber shuts down the whole structure, if one of them is unsafe and it throws`() { + var heartBeat = 0 + val source = FlowSafeSubject(PublishSubject.create()) + source.unsafeSubscribe(Subscribers.create { runNo -> + heartBeat++ + if (runNo == 1) { + throw IllegalStateException() + } + }) + source.subscribe { heartBeat += it } + // wrap FlowSafeSubject with a FlowSafeSubscriber + val sourceWrapper = SafeSubscriber(Subscribers.from(source)) + assertFailsWith { + sourceWrapper.onNext(1) + } + sourceWrapper.onNext(2) + assertThat(heartBeat == 1) + } + + /** + * A FlowSafeSubscriber that is not a leaf in a Subscribers structure, if it throws at onNext, + * it will not call its onError, because in case it wraps a [PublishSubject] it will then call [PublishSubject.onError] + * which will shut down all of the Subscribers under it. + */ + @Test + fun `FlowSafeSubject wrapped with a FlowSafeSubscriber will preserve the structure, if one of them is unsafe and it throws`() { + var heartBeat = 0 + val source = FlowSafeSubject(PublishSubject.create()) + source.unsafeSubscribe(Subscribers.create { runNo -> + heartBeat++ + if (runNo == 1) { + throw IllegalStateException() + } + }) + source.subscribe { heartBeat++ } + // wrap FlowSafeSubject with a FlowSafeSubscriber + val sourceWrapper = FlowSafeSubscriber(Subscribers.from(source)) + assertFailsWith("Observer.onNext failed, this is a non leaf FlowSafeSubscriber, therefore onError will be skipped") { + sourceWrapper.onNext(1) + } + sourceWrapper.onNext(2) + assertThat(heartBeat == 3) + } + + @Test + fun `throwing FlowSafeSubscriber as a leaf will call onError`() { + var heartBeat = 0 + val source = FlowSafeSubject(PublishSubject.create()) + // add a leaf FlowSafeSubscriber + source.subscribe(/*onNext*/{ + heartBeat++ + throw IllegalStateException() + },/*onError*/{ + heartBeat++ + }) + + source.onNext(1) + source.onNext(1) + assertThat(heartBeat == 4) + } + + @Test + fun `throwing FlowSafeSubscriber at onNext will wrap with a Rx OnErrorNotImplementedException`() { + val flowSafeSubscriber = FlowSafeSubscriber(Subscribers.create { throw IllegalStateException() }) + assertFailsWith { + flowSafeSubscriber.onNext(1) + } + } + + @Test + fun `throwing FlowSafeSubscriber at onError will wrap with a Rx OnErrorFailedException`() { + val flowSafeSubscriber = FlowSafeSubscriber( + ActionSubscriber( + { throw IllegalStateException() }, + { throw IllegalStateException() }, + null + ) + ) + assertFailsWith { + flowSafeSubscriber.onNext(1) + } + } + + // 3. check that FlowSafeSubscriber will just rethrow a propagated Rx Exception + @Test + fun `propagated Rx exception will be rethrown at ConsistentSafeSubscriber onError`() { + val source = FlowSafeSubject(PublishSubject.create()) + source.subscribe { throw IllegalStateException("123") } + val sourceWrapper = FlowSafeSubscriber(Subscribers.from(source)) + + assertFailsWith("123") { + sourceWrapper.onNext(1) + } + } + + // TODO: change all PublishSubject to FlowSafeSubject @Test(timeout=300_000) fun `combine tee and bufferUntilDatabaseCommit`() { val database = createDatabase()