diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index fcfd9a5986..3d170290b5 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -192,6 +192,9 @@ fun <T> Observable<T>.bufferUntilSubscribed(): Observable<T> { @DeleteForDJVM fun <T> Observer<T>.tee(vararg teeTo: Observer<T>): Observer<T> { val subject = PublishSubject.create<T>() + // use unsafe subscribe, so that the teed subscribers will not get wrapped with SafeSubscribers, + // therefore a potential raw exception (non Rx) coming from a child -unsafe subscribed- observer + // will not unsubscribe all of the subscribers under the PublishSubject. subject.unsafeSubscribe(Subscribers.from(this)) teeTo.forEach { subject.unsafeSubscribe(Subscribers.from(it)) } return subject