Address more comments

This commit is contained in:
Andras Slemmer 2018-02-19 15:16:12 +00:00
parent 7f1bfac8b0
commit 32bcf0a06c
2 changed files with 6 additions and 4 deletions

View File

@ -22,6 +22,7 @@ class DeduplicationChecker(cacheExpiry: Duration) {
/**
* @param identity the identity that generates the sequence numbers.
* @param sequenceNumber the sequence number to check.
* @return true if the message is unique, false if it's a duplicate.
*/
fun checkDuplicateMessageId(identity: Any, sequenceNumber: Long): Boolean {
return watermarkCache[identity].getAndUpdate { maxOf(sequenceNumber, it) } >= sequenceNumber

View File

@ -412,10 +412,11 @@ class RPCServer(
return Pair(Actor(Id(validatedUser), securityManager.id, targetLegalIdentity), securityManager.buildSubject(validatedUser))
}
// We construct an observable context on each RPC request. If subsequently a nested Observable is
// encountered this same context is propagated by the instrumented KryoPool. This way all
// observations rooted in a single RPC will be muxed correctly. Note that the context construction
// itself is quite cheap.
/*
* We construct an observable context on each RPC request. If subsequently a nested Observable is encountered this
* same context is propagated by the instrumented KryoPool. This way all observations rooted in a single RPC will be
* muxed correctly. Note that the context construction itself is quite cheap.
*/
inner class ObservableContext(
val observableMap: ObservableSubscriptionMap,
val clientAddressToObservables: SetMultimap<SimpleString, InvocationId>,