mirror of
https://github.com/corda/corda.git
synced 2024-12-19 04:57:58 +00:00
Merge pull request #339 from corda/aslemmer-fix-rpc-close-deliver-deadlock
rpc: Fix deadlock caused by deliver() and close()
This commit is contained in:
commit
6a6698b598
@ -284,7 +284,8 @@ class CordaRPCClientImpl(private val session: ClientSession,
|
||||
private val rootShared = root.doOnUnsubscribe { close() }.share()
|
||||
|
||||
// This could be made more efficient by using a specialised IntMap
|
||||
private val observables = HashMap<Int, Observable<Any>>()
|
||||
// When handling this map we don't synchronise on [this], otherwise there is a race condition between close() and deliver()
|
||||
private val observables = Collections.synchronizedMap(HashMap<Int, Observable<Any>>())
|
||||
|
||||
private var consumer: ClientConsumer? = null
|
||||
|
||||
@ -320,24 +321,25 @@ class CordaRPCClientImpl(private val session: ClientSession,
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
fun getForHandle(handle: Int): Observable<Any> {
|
||||
return observables.getOrPut(handle) {
|
||||
/**
|
||||
* Note that the order of bufferUntilSubscribed() -> dematerialize() is very important here.
|
||||
*
|
||||
* In particular doing it the other way around may result in the following edge case:
|
||||
* The RPC returns two (or more) Observables. The first Observable unsubscribes *during serialisation*,
|
||||
* before the second one is hit, causing the [rootShared] to unsubscribe and consequently closing
|
||||
* the underlying artemis queue, even though the second Observable was not even registered.
|
||||
*
|
||||
* The buffer -> dematerialize order ensures that the Observable may not unsubscribe until the caller
|
||||
* subscribes, which must be after full deserialisation and registering of all top level Observables.
|
||||
*
|
||||
* In addition, when subscribe and unsubscribe is called on the [Observable] returned here, we
|
||||
* reference count a hard reference to this [QueuedObservable] to prevent premature GC.
|
||||
*/
|
||||
rootShared.filter { it.forHandle == handle }.map { it.what }.bufferUntilSubscribed().dematerialize<Any>().doOnSubscribe { refCountUp() }.doOnUnsubscribe { refCountDown() }.share()
|
||||
synchronized(observables) {
|
||||
return observables.getOrPut(handle) {
|
||||
/**
|
||||
* Note that the order of bufferUntilSubscribed() -> dematerialize() is very important here.
|
||||
*
|
||||
* In particular doing it the other way around may result in the following edge case:
|
||||
* The RPC returns two (or more) Observables. The first Observable unsubscribes *during serialisation*,
|
||||
* before the second one is hit, causing the [rootShared] to unsubscribe and consequently closing
|
||||
* the underlying artemis queue, even though the second Observable was not even registered.
|
||||
*
|
||||
* The buffer -> dematerialize order ensures that the Observable may not unsubscribe until the caller
|
||||
* subscribes, which must be after full deserialisation and registering of all top level Observables.
|
||||
*
|
||||
* In addition, when subscribe and unsubscribe is called on the [Observable] returned here, we
|
||||
* reference count a hard reference to this [QueuedObservable] to prevent premature GC.
|
||||
*/
|
||||
rootShared.filter { it.forHandle == handle }.map { it.what }.bufferUntilSubscribed().dematerialize<Any>().doOnSubscribe { refCountUp() }.doOnUnsubscribe { refCountDown() }.share()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -346,7 +348,7 @@ class CordaRPCClientImpl(private val session: ClientSession,
|
||||
val kryo = createRPCKryo(observableSerializer = observableDeserializer)
|
||||
val received: MarshalledObservation = msg.deserialize(kryo)
|
||||
rpcLog.debug { "<- Observable [$rpcName] <- Received $received" }
|
||||
synchronized(this) {
|
||||
synchronized(observables) {
|
||||
// Force creation of the buffer if it doesn't already exist.
|
||||
getForHandle(received.forHandle)
|
||||
root.onNext(received)
|
||||
|
Loading…
Reference in New Issue
Block a user