Merge commit '2f1b8ff23e5a0400f72ff603da10b849604c6b04' into aslemmer-merge-2f1b8ff23e5a0400f72ff603da10b849604c6b04

This commit is contained in:
Andras Slemmer 2018-04-03 17:10:33 +01:00
commit fbafe36643
2 changed files with 27 additions and 9 deletions

View File

@ -35,7 +35,7 @@ class SignedNodeInfo(val raw: SerializedBytes<NodeInfo>, val signatures: List<Di
fun verified(): NodeInfo { fun verified(): NodeInfo {
val nodeInfo = raw.deserialize() val nodeInfo = raw.deserialize()
val identities = nodeInfo.legalIdentities.filterNot { it.owningKey is CompositeKey } val identities = nodeInfo.legalIdentities.filterNot { it.owningKey is CompositeKey }
require(identities.isNotEmpty()) { "At least one identity with a non-composite key needs to be specified." }
if (identities.size < signatures.size) { if (identities.size < signatures.size) {
throw SignatureException("Extra signatures. Found ${signatures.size} expected ${identities.size}") throw SignatureException("Extra signatures. Found ${signatures.size} expected ${identities.size}")
} }

View File

@ -18,9 +18,6 @@ import com.esotericsoftware.kryo.io.Output
import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalListener import com.github.benmanes.caffeine.cache.RemovalListener
import com.google.common.collect.HashMultimap
import com.google.common.collect.Multimaps
import com.google.common.collect.SetMultimap
import com.google.common.util.concurrent.ThreadFactoryBuilder import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCException
import net.corda.core.context.Actor import net.corda.core.context.Actor
@ -122,7 +119,7 @@ class RPCServer(
/** The observable subscription mapping. */ /** The observable subscription mapping. */
private val observableMap = createObservableSubscriptionMap() private val observableMap = createObservableSubscriptionMap()
/** A mapping from client addresses to IDs of associated Observables */ /** A mapping from client addresses to IDs of associated Observables */
private val clientAddressToObservables = Multimaps.synchronizedSetMultimap(HashMultimap.create<SimpleString, InvocationId>()) private val clientAddressToObservables = ConcurrentHashMap<SimpleString, HashSet<InvocationId>>()
/** The scheduled reaper handle. */ /** The scheduled reaper handle. */
private var reaperScheduledFuture: ScheduledFuture<*>? = null private var reaperScheduledFuture: ScheduledFuture<*>? = null
@ -301,8 +298,10 @@ class RPCServer(
// Observables may be serialised and thus registered. // Observables may be serialised and thus registered.
private fun invalidateClient(clientAddress: SimpleString) { private fun invalidateClient(clientAddress: SimpleString) {
lifeCycle.requireState(State.STARTED) lifeCycle.requireState(State.STARTED)
val observableIds = clientAddressToObservables.removeAll(clientAddress) val observableIds = clientAddressToObservables.remove(clientAddress)
if (observableIds != null) {
observableMap.invalidateAll(observableIds) observableMap.invalidateAll(observableIds)
}
responseMessageBuffer.remove(clientAddress) responseMessageBuffer.remove(clientAddress)
} }
@ -429,7 +428,7 @@ class RPCServer(
*/ */
inner class ObservableContext( inner class ObservableContext(
val observableMap: ObservableSubscriptionMap, val observableMap: ObservableSubscriptionMap,
val clientAddressToObservables: SetMultimap<SimpleString, InvocationId>, val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<InvocationId>>,
val deduplicationIdentity: String, val deduplicationIdentity: String,
val clientAddress: SimpleString val clientAddress: SimpleString
) { ) {
@ -535,11 +534,30 @@ object RpcServerObservableSerializer : Serializer<Observable<*>>() {
} }
override fun onCompleted() { override fun onCompleted() {
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
if (observables != null) {
observables.remove(observableId)
if (observables.isEmpty()) {
null
} else {
observables
}
} else {
null
}
}
} }
} }
) )
) )
observableContext.clientAddressToObservables.put(observableContext.clientAddress, observableId) observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
if (observables == null) {
hashSetOf(observableId)
} else {
observables.add(observableId)
observables
}
}
observableContext.observableMap.put(observableId, observableWithSubscription) observableContext.observableMap.put(observableId, observableWithSubscription)
} }