diff --git a/node/src/main/kotlin/net/corda/node/internal/FlowSafeSubject.kt b/node/src/main/kotlin/net/corda/node/internal/FlowSafeSubject.kt index 5336298a03..4570c8f653 100644 --- a/node/src/main/kotlin/net/corda/node/internal/FlowSafeSubject.kt +++ b/node/src/main/kotlin/net/corda/node/internal/FlowSafeSubject.kt @@ -14,10 +14,17 @@ import rx.subjects.Subject * * Upon [rx.Observable.subscribe] it will wrap everything that is a non [SafeSubscriber] with a [FlowSafeSubscriber] the same way * [rx.subjects.PublishSubject] wraps everything that is a non [SafeSubscriber] with a [SafeSubscriber]. + * + * In case we need to subscribe with a [SafeSubscriber] to a [FlowSafeSubject], we have to: + * 1. Declare a custom Subscriber that will extend [SafeSubscriber]. + * 2. Wrap with the custom Subscriber the [rx.Observer] to be subscribed to [FlowSafeSubject]. + * 3. Subscribe to [FlowSafeSubject] passing the custom Subscriber. */ @VisibleForTesting class FlowSafeSubject(private val actual: Subject) : Observer by actual, Subject(OnSubscribe { subscriber -> + // we used '==' instead of 'is', so that we replace only instances of SafeSubscriber with FlowSafeSubscriber, + // but leave untouched instances of a classes extending SafeSubscriber. That way, we allow subscribing SafeSubscribers. if (subscriber::class == SafeSubscriber::class) { actual.unsafeSubscribe(FlowSafeSubscriber((subscriber as SafeSubscriber).actual)) } else {