Fix a thread safety issue on session close in RPC

This commit is contained in:
Mike Hearn 2017-03-27 15:19:39 +02:00
parent aca00700aa
commit 3dea759587

View File

@ -294,6 +294,7 @@ class CordaRPCClientImpl(private val session: ClientSession,
// When handling this map we don't synchronise on [this], otherwise there is a race condition between close() and deliver()
private val observables = Collections.synchronizedMap(HashMap<Int, Observable<Any>>())
@GuardedBy("sessionLock")
private var consumer: ClientConsumer? = null
private val referenceCount = AtomicInteger(0)
@ -351,7 +352,7 @@ class CordaRPCClientImpl(private val session: ClientSession,
}
private fun deliver(msg: ClientMessage) {
msg.acknowledge()
sessionLock.withLock { msg.acknowledge() }
val kryo = createRPCKryoForDeserialization(this@CordaRPCClientImpl, qName, rpcName, rpcLocation)
val received: MarshalledObservation = try { msg.deserialize(kryo) } finally {
releaseRPCKryoForDeserialization(kryo)
@ -364,22 +365,31 @@ class CordaRPCClientImpl(private val session: ClientSession,
}
}
@Synchronized
fun close() {
rpcLog.debug("Closing queue observable for call to $rpcName : $qName")
consumer?.close()
consumer = null
sessionLock.withLock { session.deleteQueue(qName) }
sessionLock.withLock {
if (consumer != null) {
rpcLog.debug("Closing queue observable for call to $rpcName : $qName")
consumer?.close()
consumer = null
session.deleteQueue(qName)
}
}
}
@Suppress("UNUSED")
fun finalize() {
val c = synchronized(this) { consumer }
if (c != null) {
val closed = sessionLock.withLock {
if (consumer != null) {
consumer!!.close()
consumer = null
true
} else
false
}
if (closed) {
rpcLog.warn("A hot observable returned from an RPC ($rpcName) was never subscribed to. " +
"This wastes server-side resources because it was queueing observations for retrieval. " +
"It is being closed now, but please adjust your code to subscribe and unsubscribe from the observable to close it explicitly.", rpcLocation)
c.close()
}
}
}