Merge pull request #2907 from corda/aslemmer-CORDA-1295

CORDA-1295: Clean up client->observable association on observable finish
This commit is contained in:
Andras Slemmer 2018-04-03 14:20:05 +01:00 committed by GitHub
commit 623eae7da1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -8,9 +8,6 @@ import com.esotericsoftware.kryo.io.Output
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalListener
import com.google.common.collect.HashMultimap
import com.google.common.collect.Multimaps
import com.google.common.collect.SetMultimap
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.RPCException
import net.corda.core.context.Actor
@ -112,7 +109,7 @@ class RPCServer(
/** The observable subscription mapping. */
private val observableMap = createObservableSubscriptionMap()
/** A mapping from client addresses to IDs of associated Observables */
private val clientAddressToObservables = Multimaps.synchronizedSetMultimap(HashMultimap.create<SimpleString, InvocationId>())
private val clientAddressToObservables = ConcurrentHashMap<SimpleString, HashSet<InvocationId>>()
/** The scheduled reaper handle. */
private var reaperScheduledFuture: ScheduledFuture<*>? = null
@ -291,8 +288,10 @@ class RPCServer(
// Observables may be serialised and thus registered.
private fun invalidateClient(clientAddress: SimpleString) {
lifeCycle.requireState(State.STARTED)
val observableIds = clientAddressToObservables.removeAll(clientAddress)
observableMap.invalidateAll(observableIds)
val observableIds = clientAddressToObservables.remove(clientAddress)
if (observableIds != null) {
observableMap.invalidateAll(observableIds)
}
responseMessageBuffer.remove(clientAddress)
}
@ -419,7 +418,7 @@ class RPCServer(
*/
inner class ObservableContext(
val observableMap: ObservableSubscriptionMap,
val clientAddressToObservables: SetMultimap<SimpleString, InvocationId>,
val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<InvocationId>>,
val deduplicationIdentity: String,
val clientAddress: SimpleString
) {
@ -525,11 +524,30 @@ object RpcServerObservableSerializer : Serializer<Observable<*>>() {
}
override fun onCompleted() {
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
if (observables != null) {
observables.remove(observableId)
if (observables.isEmpty()) {
null
} else {
observables
}
} else {
null
}
}
}
}
)
)
observableContext.clientAddressToObservables.put(observableContext.clientAddress, observableId)
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
if (observables == null) {
hashSetOf(observableId)
} else {
observables.add(observableId)
observables
}
}
observableContext.observableMap.put(observableId, observableWithSubscription)
}