mirror of
https://github.com/corda/corda.git
synced 2025-05-13 14:03:27 +00:00
Wrap PublishSubjects with FlowSafeSubjects in all tests that test Observer.tee
This commit is contained in:
parent
e41fb714c1
commit
cfcca4ac18
@ -134,7 +134,6 @@ class ObservablesTests {
|
|||||||
assertThat(secondEvent.get()).isEqualTo(2 to false)
|
assertThat(secondEvent.get()).isEqualTo(2 to false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: change all below PublishSubject to FlowSafeSubject
|
|
||||||
@Test(timeout=300_000)
|
@Test(timeout=300_000)
|
||||||
fun `bufferUntilDatabaseCommit delays until transaction closed repeatable`() {
|
fun `bufferUntilDatabaseCommit delays until transaction closed repeatable`() {
|
||||||
val database = createDatabase()
|
val database = createDatabase()
|
||||||
@ -169,7 +168,6 @@ class ObservablesTests {
|
|||||||
assertThat(secondEvent.get()).isEqualTo(1 to false)
|
assertThat(secondEvent.get()).isEqualTo(1 to false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: change all below PublishSubject to FlowSafeSubject
|
|
||||||
@Test(timeout=300_000)
|
@Test(timeout=300_000)
|
||||||
fun `tee correctly copies observations to multiple observers`() {
|
fun `tee correctly copies observations to multiple observers`() {
|
||||||
|
|
||||||
@ -177,15 +175,21 @@ class ObservablesTests {
|
|||||||
val source2 = PublishSubject.create<Int>()
|
val source2 = PublishSubject.create<Int>()
|
||||||
val source3 = PublishSubject.create<Int>()
|
val source3 = PublishSubject.create<Int>()
|
||||||
|
|
||||||
|
// wrap PublishSubjects with FlowSafeSubjects, and pass in observers through them,
|
||||||
|
// that way the subscribers under PublishSubjects will be FlowSafeSubscribers instead of SafeSubscribers
|
||||||
|
val flowSafeSource1 = FlowSafeSubject(source1)
|
||||||
|
val flowSafeSource2 = FlowSafeSubject(source2)
|
||||||
|
val flowSafeSource3 = FlowSafeSubject(source3)
|
||||||
|
|
||||||
val event1 = SettableFuture.create<Int>()
|
val event1 = SettableFuture.create<Int>()
|
||||||
val event2 = SettableFuture.create<Int>()
|
val event2 = SettableFuture.create<Int>()
|
||||||
val event3 = SettableFuture.create<Int>()
|
val event3 = SettableFuture.create<Int>()
|
||||||
|
|
||||||
source1.subscribe { event1.set(it) }
|
flowSafeSource1.subscribe { event1.set(it) }
|
||||||
source2.subscribe { event2.set(it) }
|
flowSafeSource2.subscribe { event2.set(it) }
|
||||||
source3.subscribe { event3.set(it) }
|
flowSafeSource3.subscribe { event3.set(it) }
|
||||||
|
|
||||||
val tee = source1.tee(source2, source3)
|
val tee = flowSafeSource1.tee(flowSafeSource2, flowSafeSource3)
|
||||||
tee.onNext(0)
|
tee.onNext(0)
|
||||||
|
|
||||||
assertThat(event1.isDone).isTrue()
|
assertThat(event1.isDone).isTrue()
|
||||||
@ -196,22 +200,23 @@ class ObservablesTests {
|
|||||||
assertThat(event3.get()).isEqualTo(0)
|
assertThat(event3.get()).isEqualTo(0)
|
||||||
|
|
||||||
tee.onCompleted()
|
tee.onCompleted()
|
||||||
|
// PublishSubjects underneath the FlowSafeSubjects are just the same,
|
||||||
|
// so all rest behaviour is there e.g. PublishSubject.hasCompleted()
|
||||||
assertThat(source1.hasCompleted()).isTrue()
|
assertThat(source1.hasCompleted()).isTrue()
|
||||||
assertThat(source2.hasCompleted()).isTrue()
|
assertThat(source2.hasCompleted()).isTrue()
|
||||||
assertThat(source3.hasCompleted()).isTrue()
|
assertThat(source3.hasCompleted()).isTrue()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* tee combines [PublishSubject]s under one PublishSubject. We need to make sure that they are not wrapped with a [SafeSubscriber].
|
* tee combines [FlowSafeSubject]s under one PublishSubject. We need to make sure that they are not wrapped with a [SafeSubscriber].
|
||||||
* Otherwise, if a non Rx exception gets thrown from a subscriber under one of the PublishSubject it will get caught by the
|
* Otherwise, if a non Rx exception gets thrown from a subscriber under one of the PublishSubject it will get caught by the
|
||||||
* SafeSubscriber wrapping that PublishSubject and will call [PublishSubject.PublishSubjectState.onError], which will
|
* SafeSubscriber wrapping that PublishSubject and will call [PublishSubject.PublishSubjectState.onError], which will
|
||||||
* eventually shut down all of the subscribers under that PublishSubjectState.
|
* eventually shut down all of the subscribers under that PublishSubject.
|
||||||
*/
|
*/
|
||||||
// TODO: change all below PublishSubject to FlowSafeSubject
|
|
||||||
@Test(timeout=300_000)
|
@Test(timeout=300_000)
|
||||||
fun `error in unsafe subscriber won't shutdown subscribers under same publish subject, after tee`() {
|
fun `error in unsafe subscriber won't shutdown subscribers under same publish subject, after tee`() {
|
||||||
val source1 = PublishSubject.create<Int>()
|
val source1 = FlowSafeSubject(PublishSubject.create<Int>())
|
||||||
val source2 = PublishSubject.create<Int>()
|
val source2 = FlowSafeSubject(PublishSubject.create<Int>())
|
||||||
var count = 0
|
var count = 0
|
||||||
|
|
||||||
source1.subscribe { count += it } // safe subscriber
|
source1.subscribe { count += it } // safe subscriber
|
||||||
@ -354,25 +359,25 @@ class ObservablesTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. check that FlowSafeSubscriber will just rethrow a propagated Rx Exception
|
|
||||||
@Test
|
@Test
|
||||||
fun `propagated Rx exception will be rethrown at ConsistentSafeSubscriber onError`() {
|
fun `propagated Rx exception will be rethrown at ConsistentSafeSubscriber onError`() {
|
||||||
val source = FlowSafeSubject(PublishSubject.create<Int>())
|
val source = FlowSafeSubject(PublishSubject.create<Int>())
|
||||||
source.subscribe { throw IllegalStateException("123") }
|
source.subscribe { throw IllegalStateException("123") } // will give a leaf FlowSafeSubscriber
|
||||||
val sourceWrapper = FlowSafeSubscriber(Subscribers.from(source))
|
val sourceWrapper = FlowSafeSubscriber(Subscribers.from(source)) // will give an inner FlowSafeSubscriber
|
||||||
|
|
||||||
assertFailsWith<OnErrorNotImplementedException>("123") {
|
assertFailsWith<OnErrorNotImplementedException>("123") {
|
||||||
|
// IllegalStateException will be wrapped and rethrown as a OnErrorNotImplementedException in leaf FlowSafeSubscriber,
|
||||||
|
// will be caught by inner FlowSafeSubscriber and just be rethrown
|
||||||
sourceWrapper.onNext(1)
|
sourceWrapper.onNext(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: change all PublishSubject to FlowSafeSubject
|
|
||||||
@Test(timeout=300_000)
|
@Test(timeout=300_000)
|
||||||
fun `combine tee and bufferUntilDatabaseCommit`() {
|
fun `combine tee and bufferUntilDatabaseCommit`() {
|
||||||
val database = createDatabase()
|
val database = createDatabase()
|
||||||
|
|
||||||
val source = PublishSubject.create<Int>()
|
val source = FlowSafeSubject(PublishSubject.create<Int>())
|
||||||
val teed = PublishSubject.create<Int>()
|
val teed = FlowSafeSubject(PublishSubject.create<Int>())
|
||||||
|
|
||||||
val observable: Observable<Int> = source
|
val observable: Observable<Int> = source
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user