From 60520412c88bd14806fc00efc6b38e93e09d762d Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Wed, 8 Mar 2017 17:03:25 +0000 Subject: [PATCH 1/2] rpc: Fix deadlock caused by deliver() and close() --- .../services/messaging/CordaRPCClientImpl.kt | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt index fc8017f15a..b46dad17ac 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt @@ -284,6 +284,10 @@ class CordaRPCClientImpl(private val session: ClientSession, private val rootShared = root.doOnUnsubscribe { close() }.share() // This could be made more efficient by using a specialised IntMap + /** + * When handling this map we synchronise on it explicitly instead of on [this], otherwise there is a race + * condition between close() and deliver() + */ private val observables = HashMap>() private var consumer: ClientConsumer? = null @@ -320,24 +324,25 @@ class CordaRPCClientImpl(private val session: ClientSession, } } - @Synchronized fun getForHandle(handle: Int): Observable { - return observables.getOrPut(handle) { - /** - * Note that the order of bufferUntilSubscribed() -> dematerialize() is very important here. - * - * In particular doing it the other way around may result in the following edge case: - * The RPC returns two (or more) Observables. The first Observable unsubscribes *during serialisation*, - * before the second one is hit, causing the [rootShared] to unsubscribe and consequently closing - * the underlying artemis queue, even though the second Observable was not even registered. - * - * The buffer -> dematerialize order ensures that the Observable may not unsubscribe until the caller - * subscribes, which must be after full deserialisation and registering of all top level Observables. - * - * In addition, when subscribe and unsubscribe is called on the [Observable] returned here, we - * reference count a hard reference to this [QueuedObservable] to prevent premature GC. - */ - rootShared.filter { it.forHandle == handle }.map { it.what }.bufferUntilSubscribed().dematerialize().doOnSubscribe { refCountUp() }.doOnUnsubscribe { refCountDown() }.share() + synchronized(observables) { + return observables.getOrPut(handle) { + /** + * Note that the order of bufferUntilSubscribed() -> dematerialize() is very important here. + * + * In particular doing it the other way around may result in the following edge case: + * The RPC returns two (or more) Observables. The first Observable unsubscribes *during serialisation*, + * before the second one is hit, causing the [rootShared] to unsubscribe and consequently closing + * the underlying artemis queue, even though the second Observable was not even registered. + * + * The buffer -> dematerialize order ensures that the Observable may not unsubscribe until the caller + * subscribes, which must be after full deserialisation and registering of all top level Observables. + * + * In addition, when subscribe and unsubscribe is called on the [Observable] returned here, we + * reference count a hard reference to this [QueuedObservable] to prevent premature GC. + */ + rootShared.filter { it.forHandle == handle }.map { it.what }.bufferUntilSubscribed().dematerialize().doOnSubscribe { refCountUp() }.doOnUnsubscribe { refCountDown() }.share() + } } } @@ -346,7 +351,7 @@ class CordaRPCClientImpl(private val session: ClientSession, val kryo = createRPCKryo(observableSerializer = observableDeserializer) val received: MarshalledObservation = msg.deserialize(kryo) rpcLog.debug { "<- Observable [$rpcName] <- Received $received" } - synchronized(this) { + synchronized(observables) { // Force creation of the buffer if it doesn't already exist. getForHandle(received.forHandle) root.onNext(received) From 389685a31e4be1528707934702349f85c1cbf76c Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 14 Mar 2017 15:17:06 +0000 Subject: [PATCH 2/2] PR 339 Address comments --- .../corda/node/services/messaging/CordaRPCClientImpl.kt | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt index b46dad17ac..9afacf8ebd 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt @@ -284,11 +284,8 @@ class CordaRPCClientImpl(private val session: ClientSession, private val rootShared = root.doOnUnsubscribe { close() }.share() // This could be made more efficient by using a specialised IntMap - /** - * When handling this map we synchronise on it explicitly instead of on [this], otherwise there is a race - * condition between close() and deliver() - */ - private val observables = HashMap>() + // When handling this map we don't synchronise on [this], otherwise there is a race condition between close() and deliver() + private val observables = Collections.synchronizedMap(HashMap>()) private var consumer: ClientConsumer? = null