From 4517457e16cb985815fe55e91cf4fe95ceb4b511 Mon Sep 17 00:00:00 2001 From: Kyriakos Tharrouniatis Date: Thu, 13 Feb 2020 15:15:32 +0000 Subject: [PATCH] Revert "Wrap PublishSubjects with FlowSafeSubjects in all tests that test Observer.tee" This reverts commit e419af86 --- .../corda/node/utilities/ObservablesTests.kt | 26 +++++++------------ 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt index 25196fadab..db2e6251db 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt @@ -175,21 +175,15 @@ class ObservablesTests { val source2 = PublishSubject.create() val source3 = PublishSubject.create() - // wrap PublishSubjects with FlowSafeSubjects, and pass in observers through them, - // that way the subscribers under PublishSubjects will be FlowSafeSubscribers instead of SafeSubscribers - val flowSafeSource1 = FlowSafeSubject(source1) - val flowSafeSource2 = FlowSafeSubject(source2) - val flowSafeSource3 = FlowSafeSubject(source3) - val event1 = SettableFuture.create() val event2 = SettableFuture.create() val event3 = SettableFuture.create() - flowSafeSource1.subscribe { event1.set(it) } - flowSafeSource2.subscribe { event2.set(it) } - flowSafeSource3.subscribe { event3.set(it) } + source1.subscribe { event1.set(it) } + source2.subscribe { event2.set(it) } + source3.subscribe { event3.set(it) } - val tee = flowSafeSource1.tee(flowSafeSource2, flowSafeSource3) + val tee = source1.tee(source2, source3) tee.onNext(0) assertThat(event1.isDone).isTrue() @@ -200,23 +194,21 @@ class ObservablesTests { assertThat(event3.get()).isEqualTo(0) tee.onCompleted() - // PublishSubjects underneath the FlowSafeSubjects are just the same, - // so all rest behaviour is there e.g. PublishSubject.hasCompleted() assertThat(source1.hasCompleted()).isTrue() assertThat(source2.hasCompleted()).isTrue() assertThat(source3.hasCompleted()).isTrue() } /** - * tee combines [FlowSafeSubject]s under one PublishSubject. We need to make sure that they are not wrapped with a [SafeSubscriber]. + * tee combines [PublishSubject]s under one PublishSubject. We need to make sure that they are not wrapped with a [SafeSubscriber]. * Otherwise, if a non Rx exception gets thrown from a subscriber under one of the PublishSubject it will get caught by the * SafeSubscriber wrapping that PublishSubject and will call [PublishSubject.PublishSubjectState.onError], which will * eventually shut down all of the subscribers under that PublishSubject. */ @Test(timeout=300_000) fun `error in unsafe subscriber won't shutdown subscribers under same publish subject, after tee`() { - val source1 = FlowSafeSubject(PublishSubject.create()) - val source2 = FlowSafeSubject(PublishSubject.create()) + val source1 = PublishSubject.create() + val source2 = PublishSubject.create() var count = 0 source1.subscribe { count += it } // safe subscriber @@ -376,8 +368,8 @@ class ObservablesTests { fun `combine tee and bufferUntilDatabaseCommit`() { val database = createDatabase() - val source = FlowSafeSubject(PublishSubject.create()) - val teed = FlowSafeSubject(PublishSubject.create()) + val source = PublishSubject.create() + val teed = PublishSubject.create() val observable: Observable = source