mirror of
https://github.com/corda/corda.git
synced 2025-01-31 08:25:50 +00:00
Add comment and update kdoc explaining how to subscribe with SafeSubscriber to FlowSafeSubject
This commit is contained in:
parent
17b94c45e8
commit
2c0e4396d1
@ -14,10 +14,17 @@ import rx.subjects.Subject
|
||||
*
|
||||
* Upon [rx.Observable.subscribe] it will wrap everything that is a non [SafeSubscriber] with a [FlowSafeSubscriber] the same way
|
||||
* [rx.subjects.PublishSubject] wraps everything that is a non [SafeSubscriber] with a [SafeSubscriber].
|
||||
*
|
||||
* In case we need to subscribe with a [SafeSubscriber] to a [FlowSafeSubject], we have to:
|
||||
* 1. Declare a custom Subscriber that will extend [SafeSubscriber].
|
||||
* 2. Wrap with the custom Subscriber the [rx.Observer] to be subscribed to [FlowSafeSubject].
|
||||
* 3. Subscribe to [FlowSafeSubject] passing the custom Subscriber.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
class FlowSafeSubject<T, R>(private val actual: Subject<T, R>) : Observer<T> by actual,
|
||||
Subject<T, R>(OnSubscribe<R> { subscriber ->
|
||||
// we used '==' instead of 'is', so that we replace only instances of SafeSubscriber with FlowSafeSubscriber,
|
||||
// but leave untouched instances of a classes extending SafeSubscriber. That way, we allow subscribing SafeSubscribers.
|
||||
if (subscriber::class == SafeSubscriber::class) {
|
||||
actual.unsafeSubscribe(FlowSafeSubscriber((subscriber as SafeSubscriber).actual))
|
||||
} else {
|
||||
|
Loading…
x
Reference in New Issue
Block a user