mirror of
https://github.com/corda/corda.git
synced 2025-04-05 10:27:11 +00:00
Revert "Wrap PublishSubjects with FlowSafeSubjects in all tests that test Observer.tee"
This reverts commit e419af86
This commit is contained in:
parent
91d8de32c4
commit
4517457e16
@ -175,21 +175,15 @@ class ObservablesTests {
|
||||
val source2 = PublishSubject.create<Int>()
|
||||
val source3 = PublishSubject.create<Int>()
|
||||
|
||||
// wrap PublishSubjects with FlowSafeSubjects, and pass in observers through them,
|
||||
// that way the subscribers under PublishSubjects will be FlowSafeSubscribers instead of SafeSubscribers
|
||||
val flowSafeSource1 = FlowSafeSubject(source1)
|
||||
val flowSafeSource2 = FlowSafeSubject(source2)
|
||||
val flowSafeSource3 = FlowSafeSubject(source3)
|
||||
|
||||
val event1 = SettableFuture.create<Int>()
|
||||
val event2 = SettableFuture.create<Int>()
|
||||
val event3 = SettableFuture.create<Int>()
|
||||
|
||||
flowSafeSource1.subscribe { event1.set(it) }
|
||||
flowSafeSource2.subscribe { event2.set(it) }
|
||||
flowSafeSource3.subscribe { event3.set(it) }
|
||||
source1.subscribe { event1.set(it) }
|
||||
source2.subscribe { event2.set(it) }
|
||||
source3.subscribe { event3.set(it) }
|
||||
|
||||
val tee = flowSafeSource1.tee(flowSafeSource2, flowSafeSource3)
|
||||
val tee = source1.tee(source2, source3)
|
||||
tee.onNext(0)
|
||||
|
||||
assertThat(event1.isDone).isTrue()
|
||||
@ -200,23 +194,21 @@ class ObservablesTests {
|
||||
assertThat(event3.get()).isEqualTo(0)
|
||||
|
||||
tee.onCompleted()
|
||||
// PublishSubjects underneath the FlowSafeSubjects are just the same,
|
||||
// so all rest behaviour is there e.g. PublishSubject.hasCompleted()
|
||||
assertThat(source1.hasCompleted()).isTrue()
|
||||
assertThat(source2.hasCompleted()).isTrue()
|
||||
assertThat(source3.hasCompleted()).isTrue()
|
||||
}
|
||||
|
||||
/**
|
||||
* tee combines [FlowSafeSubject]s under one PublishSubject. We need to make sure that they are not wrapped with a [SafeSubscriber].
|
||||
* tee combines [PublishSubject]s under one PublishSubject. We need to make sure that they are not wrapped with a [SafeSubscriber].
|
||||
* Otherwise, if a non Rx exception gets thrown from a subscriber under one of the PublishSubject it will get caught by the
|
||||
* SafeSubscriber wrapping that PublishSubject and will call [PublishSubject.PublishSubjectState.onError], which will
|
||||
* eventually shut down all of the subscribers under that PublishSubject.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `error in unsafe subscriber won't shutdown subscribers under same publish subject, after tee`() {
|
||||
val source1 = FlowSafeSubject(PublishSubject.create<Int>())
|
||||
val source2 = FlowSafeSubject(PublishSubject.create<Int>())
|
||||
val source1 = PublishSubject.create<Int>()
|
||||
val source2 = PublishSubject.create<Int>()
|
||||
var count = 0
|
||||
|
||||
source1.subscribe { count += it } // safe subscriber
|
||||
@ -376,8 +368,8 @@ class ObservablesTests {
|
||||
fun `combine tee and bufferUntilDatabaseCommit`() {
|
||||
val database = createDatabase()
|
||||
|
||||
val source = FlowSafeSubject(PublishSubject.create<Int>())
|
||||
val teed = FlowSafeSubject(PublishSubject.create<Int>())
|
||||
val source = PublishSubject.create<Int>()
|
||||
val teed = PublishSubject.create<Int>()
|
||||
|
||||
val observable: Observable<Int> = source
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user