From 6d5e08b44e60cb6dade5142d146b7b078095d61a Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 3 Apr 2018 11:43:17 +0100 Subject: [PATCH 1/3] CORDA-1295: Clean up client->observable association on observable finish --- .../main/kotlin/net/corda/node/services/messaging/RPCServer.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index e70f80d39d..d3717de4ec 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -525,6 +525,7 @@ object RpcServerObservableSerializer : Serializer>() { } override fun onCompleted() { + observableContext.clientAddressToObservables.remove(observableContext.clientAddress, observableId) } } ) From ef723b1b6854fb22d97a90ccde02997e0966da62 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 3 Apr 2018 12:06:59 +0100 Subject: [PATCH 2/3] CORDA-1295: Use ConcurrentHashMap instead of synchronised multimap --- .../node/services/messaging/RPCServer.kt | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index d3717de4ec..39c8a8c3b5 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -8,9 +8,6 @@ import com.esotericsoftware.kryo.io.Output import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.RemovalListener -import com.google.common.collect.HashMultimap -import com.google.common.collect.Multimaps -import com.google.common.collect.SetMultimap import com.google.common.util.concurrent.ThreadFactoryBuilder import net.corda.client.rpc.RPCException import net.corda.core.context.Actor @@ -112,7 +109,7 @@ class RPCServer( /** The observable subscription mapping. */ private val observableMap = createObservableSubscriptionMap() /** A mapping from client addresses to IDs of associated Observables */ - private val clientAddressToObservables = Multimaps.synchronizedSetMultimap(HashMultimap.create()) + private val clientAddressToObservables = ConcurrentHashMap>() /** The scheduled reaper handle. */ private var reaperScheduledFuture: ScheduledFuture<*>? = null @@ -291,8 +288,10 @@ class RPCServer( // Observables may be serialised and thus registered. private fun invalidateClient(clientAddress: SimpleString) { lifeCycle.requireState(State.STARTED) - val observableIds = clientAddressToObservables.removeAll(clientAddress) - observableMap.invalidateAll(observableIds) + val observableIds = clientAddressToObservables.remove(clientAddress) + if (observableIds != null) { + observableMap.invalidateAll(observableIds) + } responseMessageBuffer.remove(clientAddress) } @@ -419,7 +418,7 @@ class RPCServer( */ inner class ObservableContext( val observableMap: ObservableSubscriptionMap, - val clientAddressToObservables: SetMultimap, + val clientAddressToObservables: ConcurrentHashMap>, val deduplicationIdentity: String, val clientAddress: SimpleString ) { @@ -525,12 +524,30 @@ object RpcServerObservableSerializer : Serializer>() { } override fun onCompleted() { - observableContext.clientAddressToObservables.remove(observableContext.clientAddress, observableId) + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables != null) { + observables.remove(observableId) + if (observables.isEmpty()) { + null + } else { + observables + } + } else { + null + } + } } } ) ) - observableContext.clientAddressToObservables.put(observableContext.clientAddress, observableId) + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables == null) { + hashSetOf(observableId) + } else { + observables.add(observableId) + observables + } + } observableContext.observableMap.put(observableId, observableWithSubscription) } From 2f1b8ff23e5a0400f72ff603da10b849604c6b04 Mon Sep 17 00:00:00 2001 From: Michal Kit Date: Tue, 3 Apr 2018 15:49:06 +0100 Subject: [PATCH 3/3] Fixing SignedNodeInfoe security issue (#2908) --- .../main/kotlin/net/corda/nodeapi/internal/SignedNodeInfo.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/SignedNodeInfo.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/SignedNodeInfo.kt index b1fb7054d4..385ae81eba 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/SignedNodeInfo.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/SignedNodeInfo.kt @@ -25,7 +25,7 @@ class SignedNodeInfo(val raw: SerializedBytes, val signatures: List