CORDA-3381 Make internalUtilsKt Observer tee not wrap with SafeSubscriber (#5913)

* Make tee not wrap PublishSubjects in SafeSubscribers, otherwise a non Rx exception from an unsafe observer shuts down all other observers under the same PublishSubject

* Throw SQLException or PersistenceException plain, that may come out of an unsafe subscriber

* Revert "Throw SQLException or PersistenceException plain, that may come out of an unsafe subscriber"

This reverts commit c7b8af3fa6.

* Update Detekt baseline
This commit is contained in:
Kyriakos Tharrouniatis 2020-01-31 12:32:59 +00:00 committed by GitHub
parent cb6ed6042c
commit 9ca1dd59da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 32 additions and 2454 deletions

View File

@ -15,6 +15,7 @@ import net.corda.core.utilities.seconds
import org.slf4j.Logger import org.slf4j.Logger
import rx.Observable import rx.Observable
import rx.Observer import rx.Observer
import rx.observers.Subscribers
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject import rx.subjects.UnicastSubject
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
@ -172,8 +173,8 @@ fun <T> Observable<T>.bufferUntilSubscribed(): Observable<T> {
@DeleteForDJVM @DeleteForDJVM
fun <T> Observer<T>.tee(vararg teeTo: Observer<T>): Observer<T> { fun <T> Observer<T>.tee(vararg teeTo: Observer<T>): Observer<T> {
val subject = PublishSubject.create<T>() val subject = PublishSubject.create<T>()
subject.subscribe(this) subject.unsafeSubscribe(Subscribers.from(this))
teeTo.forEach { subject.subscribe(it) } teeTo.forEach { subject.unsafeSubscribe(Subscribers.from(it)) }
return subject return subject
} }

File diff suppressed because one or more lines are too long

View File

@ -10,9 +10,13 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.After import org.junit.After
import org.junit.Test import org.junit.Test
import rx.Observable import rx.Observable
import rx.observers.Subscribers
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.io.Closeable import java.io.Closeable
import java.lang.RuntimeException
import java.util.* import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class ObservablesTests { class ObservablesTests {
private fun isInDatabaseTransaction() = contextTransactionOrNull != null private fun isInDatabaseTransaction() = contextTransactionOrNull != null
@ -186,6 +190,30 @@ class ObservablesTests {
assertThat(source3.hasCompleted()).isTrue() assertThat(source3.hasCompleted()).isTrue()
} }
/**
* tee combines [PublishSubject]s under one PublishSubject. We need to make sure that they are not wrapped with a [SafeSubscriber].
* Otherwise, if a non Rx exception gets thrown from a subscriber under one of the PublishSubject it will get caught by the
* SafeSubscriber wrapping that PublishSubject and will call [PublishSubject.PublishSubjectState.onError], which will
* eventually shut down all of the subscribers under that PublishSubjectState.
*/
@Test
fun `error in unsafe subscriber won't shutdown subscribers under same publish subject, after tee`() {
val source1 = PublishSubject.create<Int>()
val source2 = PublishSubject.create<Int>()
var count = 0
source1.subscribe { count += it } // safe subscriber
source1.unsafeSubscribe(Subscribers.create { throw RuntimeException() }) // this subscriber should not shut down the above subscriber
assertFailsWith<RuntimeException> {
source1.tee(source2).onNext(1)
}
assertFailsWith<RuntimeException> {
source1.tee(source2).onNext(1)
}
assertEquals(2, count)
}
@Test @Test
fun `combine tee and bufferUntilDatabaseCommit`() { fun `combine tee and bufferUntilDatabaseCommit`() {
val database = createDatabase() val database = createDatabase()