diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/SignedNodeInfo.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/SignedNodeInfo.kt index c0303c28ee..0b8657600b 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/SignedNodeInfo.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/SignedNodeInfo.kt @@ -35,7 +35,7 @@ class SignedNodeInfo(val raw: SerializedBytes, val signatures: List()) + private val clientAddressToObservables = ConcurrentHashMap>() /** The scheduled reaper handle. */ private var reaperScheduledFuture: ScheduledFuture<*>? = null @@ -301,8 +298,10 @@ class RPCServer( // Observables may be serialised and thus registered. private fun invalidateClient(clientAddress: SimpleString) { lifeCycle.requireState(State.STARTED) - val observableIds = clientAddressToObservables.removeAll(clientAddress) - observableMap.invalidateAll(observableIds) + val observableIds = clientAddressToObservables.remove(clientAddress) + if (observableIds != null) { + observableMap.invalidateAll(observableIds) + } responseMessageBuffer.remove(clientAddress) } @@ -429,7 +428,7 @@ class RPCServer( */ inner class ObservableContext( val observableMap: ObservableSubscriptionMap, - val clientAddressToObservables: SetMultimap, + val clientAddressToObservables: ConcurrentHashMap>, val deduplicationIdentity: String, val clientAddress: SimpleString ) { @@ -535,11 +534,30 @@ object RpcServerObservableSerializer : Serializer>() { } override fun onCompleted() { + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables != null) { + observables.remove(observableId) + if (observables.isEmpty()) { + null + } else { + observables + } + } else { + null + } + } } } ) ) - observableContext.clientAddressToObservables.put(observableContext.clientAddress, observableId) + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables == null) { + hashSetOf(observableId) + } else { + observables.add(observableId) + observables + } + } observableContext.observableMap.put(observableId, observableWithSubscription) }