rpc: Fix deadlock caused by deliver() and close()

This commit is contained in:
Andras Slemmer 2017-03-08 17:03:25 +00:00
parent a394adbb44
commit 60520412c8

View File

@ -284,6 +284,10 @@ class CordaRPCClientImpl(private val session: ClientSession,
private val rootShared = root.doOnUnsubscribe { close() }.share() private val rootShared = root.doOnUnsubscribe { close() }.share()
// This could be made more efficient by using a specialised IntMap // This could be made more efficient by using a specialised IntMap
/**
* When handling this map we synchronise on it explicitly instead of on [this], otherwise there is a race
* condition between close() and deliver()
*/
private val observables = HashMap<Int, Observable<Any>>() private val observables = HashMap<Int, Observable<Any>>()
private var consumer: ClientConsumer? = null private var consumer: ClientConsumer? = null
@ -320,24 +324,25 @@ class CordaRPCClientImpl(private val session: ClientSession,
} }
} }
@Synchronized
fun getForHandle(handle: Int): Observable<Any> { fun getForHandle(handle: Int): Observable<Any> {
return observables.getOrPut(handle) { synchronized(observables) {
/** return observables.getOrPut(handle) {
* Note that the order of bufferUntilSubscribed() -> dematerialize() is very important here. /**
* * Note that the order of bufferUntilSubscribed() -> dematerialize() is very important here.
* In particular doing it the other way around may result in the following edge case: *
* The RPC returns two (or more) Observables. The first Observable unsubscribes *during serialisation*, * In particular doing it the other way around may result in the following edge case:
* before the second one is hit, causing the [rootShared] to unsubscribe and consequently closing * The RPC returns two (or more) Observables. The first Observable unsubscribes *during serialisation*,
* the underlying artemis queue, even though the second Observable was not even registered. * before the second one is hit, causing the [rootShared] to unsubscribe and consequently closing
* * the underlying artemis queue, even though the second Observable was not even registered.
* The buffer -> dematerialize order ensures that the Observable may not unsubscribe until the caller *
* subscribes, which must be after full deserialisation and registering of all top level Observables. * The buffer -> dematerialize order ensures that the Observable may not unsubscribe until the caller
* * subscribes, which must be after full deserialisation and registering of all top level Observables.
* In addition, when subscribe and unsubscribe is called on the [Observable] returned here, we *
* reference count a hard reference to this [QueuedObservable] to prevent premature GC. * In addition, when subscribe and unsubscribe is called on the [Observable] returned here, we
*/ * reference count a hard reference to this [QueuedObservable] to prevent premature GC.
rootShared.filter { it.forHandle == handle }.map { it.what }.bufferUntilSubscribed().dematerialize<Any>().doOnSubscribe { refCountUp() }.doOnUnsubscribe { refCountDown() }.share() */
rootShared.filter { it.forHandle == handle }.map { it.what }.bufferUntilSubscribed().dematerialize<Any>().doOnSubscribe { refCountUp() }.doOnUnsubscribe { refCountDown() }.share()
}
} }
} }
@ -346,7 +351,7 @@ class CordaRPCClientImpl(private val session: ClientSession,
val kryo = createRPCKryo(observableSerializer = observableDeserializer) val kryo = createRPCKryo(observableSerializer = observableDeserializer)
val received: MarshalledObservation = msg.deserialize(kryo) val received: MarshalledObservation = msg.deserialize(kryo)
rpcLog.debug { "<- Observable [$rpcName] <- Received $received" } rpcLog.debug { "<- Observable [$rpcName] <- Received $received" }
synchronized(this) { synchronized(observables) {
// Force creation of the buffer if it doesn't already exist. // Force creation of the buffer if it doesn't already exist.
getForHandle(received.forHandle) getForHandle(received.forHandle)
root.onNext(received) root.onNext(received)