diff --git a/core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt b/core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt index 900dcbdcf6..4a7077e6aa 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt @@ -4,17 +4,16 @@ import net.corda.core.internal.FlowSafeSubscriber import net.corda.core.internal.VisibleForTesting import rx.Observable.OnSubscribe import rx.Observer -import rx.Subscriber import rx.observers.SafeSubscriber import rx.subjects.Subject /** - * The [FlowSafeSubject] is used to unwrap a [SafeSubscriber] to prevent the observer from unsubscribing from the base observable when any - * error occurs. Calls to [SafeSubscriber._onError] call [Subscriber.unsubscribe] multiple times, which stops the observer receiving updates - * from the observable. + * [FlowSafeSubject] is used to unwrap an [Observer] from a [SafeSubscriber], re-wrap it with a [FlowSafeSubscriber] + * and then subscribe it to its underlying [Subject]. It is only used to provide therefore, its underlying [Subject] with + * non unsubscribing [rx.Observer]s. * - * Preventing this is useful to observers that are subscribed to important observables to prevent them from ever unsubscribing due to an - * error. Unsubscribing could lead to a malfunctioning CorDapp, for the rest of the current run time, due to a single isolated error. + * Upon [rx.Observable.subscribe] it will wrap everything that is a non [SafeSubscriber] with a [FlowSafeSubscriber] the same way + * [rx.subjects.PublishSubject] wraps everything that is a non [SafeSubscriber] with a [SafeSubscriber]. */ @VisibleForTesting class FlowSafeSubject(private val actual: Subject) : Observer by actual, @@ -32,7 +31,7 @@ class FlowSafeSubject(private val actual: Subject) : Observer by } /** - * The [PreventSubscriptionsSubject] is used to prevent any subscriptions to a [Subject]. + * The [PreventSubscriptionsSubject] is used to prevent any subscriptions to its underlying [Subject]. */ class PreventSubscriptionsSubject(private val actual: Subject, errorAction: () -> Unit) : Observer by actual, Subject(OnSubscribe { _ ->