CORDA-1295: Clean up client->observable association on observable finish (#2915)

This commit is contained in:
Andras Slemmer 2018-04-04 12:40:32 +01:00 committed by Michele Sollecito
parent 6ae0142bc8
commit 7501e87a54

View File

@ -7,9 +7,6 @@ import com.esotericsoftware.kryo.io.Output
import com.google.common.cache.Cache import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder import com.google.common.cache.CacheBuilder
import com.google.common.cache.RemovalListener import com.google.common.cache.RemovalListener
import com.google.common.collect.HashMultimap
import com.google.common.collect.Multimaps
import com.google.common.collect.SetMultimap
import com.google.common.util.concurrent.ThreadFactoryBuilder import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCException
import net.corda.core.context.Actor import net.corda.core.context.Actor
@ -111,7 +108,7 @@ class RPCServer(
/** The observable subscription mapping. */ /** The observable subscription mapping. */
private val observableMap = createObservableSubscriptionMap() private val observableMap = createObservableSubscriptionMap()
/** A mapping from client addresses to IDs of associated Observables */ /** A mapping from client addresses to IDs of associated Observables */
private val clientAddressToObservables = Multimaps.synchronizedSetMultimap(HashMultimap.create<SimpleString, InvocationId>()) private val clientAddressToObservables = ConcurrentHashMap<SimpleString, HashSet<InvocationId>>()
/** The scheduled reaper handle. */ /** The scheduled reaper handle. */
private var reaperScheduledFuture: ScheduledFuture<*>? = null private var reaperScheduledFuture: ScheduledFuture<*>? = null
@ -290,8 +287,10 @@ class RPCServer(
// Observables may be serialised and thus registered. // Observables may be serialised and thus registered.
private fun invalidateClient(clientAddress: SimpleString) { private fun invalidateClient(clientAddress: SimpleString) {
lifeCycle.requireState(State.STARTED) lifeCycle.requireState(State.STARTED)
val observableIds = clientAddressToObservables.removeAll(clientAddress) val observableIds = clientAddressToObservables.remove(clientAddress)
if (observableIds != null) {
observableMap.invalidateAll(observableIds) observableMap.invalidateAll(observableIds)
}
responseMessageBuffer.remove(clientAddress) responseMessageBuffer.remove(clientAddress)
} }
@ -419,7 +418,7 @@ class RPCServer(
*/ */
inner class ObservableContext( inner class ObservableContext(
val observableMap: ObservableSubscriptionMap, val observableMap: ObservableSubscriptionMap,
val clientAddressToObservables: SetMultimap<SimpleString, InvocationId>, val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<InvocationId>>,
val deduplicationIdentity: String, val deduplicationIdentity: String,
val clientAddress: SimpleString val clientAddress: SimpleString
) { ) {
@ -525,11 +524,30 @@ object RpcServerObservableSerializer : Serializer<Observable<*>>() {
} }
override fun onCompleted() { override fun onCompleted() {
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
if (observables != null) {
observables.remove(observableId)
if (observables.isEmpty()) {
null
} else {
observables
}
} else {
null
}
}
} }
} }
) )
) )
observableContext.clientAddressToObservables.put(observableContext.clientAddress, observableId) observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
if (observables == null) {
hashSetOf(observableId)
} else {
observables.add(observableId)
observables
}
}
observableContext.observableMap.put(observableId, observableWithSubscription) observableContext.observableMap.put(observableId, observableWithSubscription)
} }