mirror of
https://github.com/corda/corda.git
synced 2025-01-28 07:04:12 +00:00
Merge pull request #672 from corda/aslemmer-merge-2f1b8ff23e5a0400f72ff603da10b849604c6b04
Merge OS up to 2f1b8ff23e5a0400f72ff603da10b849604c6b04
This commit is contained in:
commit
4334d38bf5
@ -35,7 +35,7 @@ class SignedNodeInfo(val raw: SerializedBytes<NodeInfo>, val signatures: List<Di
|
||||
fun verified(): NodeInfo {
|
||||
val nodeInfo = raw.deserialize()
|
||||
val identities = nodeInfo.legalIdentities.filterNot { it.owningKey is CompositeKey }
|
||||
|
||||
require(identities.isNotEmpty()) { "At least one identity with a non-composite key needs to be specified." }
|
||||
if (identities.size < signatures.size) {
|
||||
throw SignatureException("Extra signatures. Found ${signatures.size} expected ${identities.size}")
|
||||
}
|
||||
|
@ -18,9 +18,6 @@ import com.esotericsoftware.kryo.io.Output
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.RemovalListener
|
||||
import com.google.common.collect.HashMultimap
|
||||
import com.google.common.collect.Multimaps
|
||||
import com.google.common.collect.SetMultimap
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
||||
import net.corda.client.rpc.RPCException
|
||||
import net.corda.core.context.Actor
|
||||
@ -122,7 +119,7 @@ class RPCServer(
|
||||
/** The observable subscription mapping. */
|
||||
private val observableMap = createObservableSubscriptionMap()
|
||||
/** A mapping from client addresses to IDs of associated Observables */
|
||||
private val clientAddressToObservables = Multimaps.synchronizedSetMultimap(HashMultimap.create<SimpleString, InvocationId>())
|
||||
private val clientAddressToObservables = ConcurrentHashMap<SimpleString, HashSet<InvocationId>>()
|
||||
/** The scheduled reaper handle. */
|
||||
private var reaperScheduledFuture: ScheduledFuture<*>? = null
|
||||
|
||||
@ -301,8 +298,10 @@ class RPCServer(
|
||||
// Observables may be serialised and thus registered.
|
||||
private fun invalidateClient(clientAddress: SimpleString) {
|
||||
lifeCycle.requireState(State.STARTED)
|
||||
val observableIds = clientAddressToObservables.removeAll(clientAddress)
|
||||
observableMap.invalidateAll(observableIds)
|
||||
val observableIds = clientAddressToObservables.remove(clientAddress)
|
||||
if (observableIds != null) {
|
||||
observableMap.invalidateAll(observableIds)
|
||||
}
|
||||
responseMessageBuffer.remove(clientAddress)
|
||||
}
|
||||
|
||||
@ -429,7 +428,7 @@ class RPCServer(
|
||||
*/
|
||||
inner class ObservableContext(
|
||||
val observableMap: ObservableSubscriptionMap,
|
||||
val clientAddressToObservables: SetMultimap<SimpleString, InvocationId>,
|
||||
val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<InvocationId>>,
|
||||
val deduplicationIdentity: String,
|
||||
val clientAddress: SimpleString
|
||||
) {
|
||||
@ -535,11 +534,30 @@ object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
||||
}
|
||||
|
||||
override fun onCompleted() {
|
||||
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||
if (observables != null) {
|
||||
observables.remove(observableId)
|
||||
if (observables.isEmpty()) {
|
||||
null
|
||||
} else {
|
||||
observables
|
||||
}
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
observableContext.clientAddressToObservables.put(observableContext.clientAddress, observableId)
|
||||
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||
if (observables == null) {
|
||||
hashSetOf(observableId)
|
||||
} else {
|
||||
observables.add(observableId)
|
||||
observables
|
||||
}
|
||||
}
|
||||
observableContext.observableMap.put(observableId, observableWithSubscription)
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user