CORDA-2979: Remove quasar from RPC client (#5572)

* CORDA-2979: Remove QUASAR from rpc-client

* CORDA-2979: Remove usage of QUASAR class since it's not used.  This
eventually triggers a check for the java agent that won't be run
for RPC clients.
This commit is contained in:
Ryan Fowler 2019-10-09 17:43:11 +01:00 committed by Rick Parker
parent 0433d026b7
commit 515d1088d5

View File

@ -1,10 +1,10 @@
package net.corda.client.rpc.internal
import co.paralleluniverse.common.util.SameThreadExecutor
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalCause
import com.github.benmanes.caffeine.cache.RemovalListener
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.ConnectionFailureException
@ -132,7 +132,10 @@ class RPCClientProxyHandler(
private var sendExecutor: ExecutorService? = null
// A sticky pool for running Observable.onNext()s. We need the stickiness to preserve the observation ordering.
private val observationExecutorThreadFactory = ThreadFactoryBuilder().setNameFormat("rpc-client-observation-pool-%d").setDaemon(true).build()
private val observationExecutorThreadFactory = ThreadFactoryBuilder()
.setNameFormat("rpc-client-observation-pool-%d")
.setDaemon(true)
.build()
private val observationExecutorPool = LazyStickyPool(rpcConfiguration.observationExecutorPoolSize) {
Executors.newFixedThreadPool(1, observationExecutorThreadFactory)
}
@ -156,7 +159,8 @@ class RPCClientProxyHandler(
private val observablesToReap = ThreadBox(object {
var observables = ArrayList<InvocationId>()
})
private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext)
private val serializationContextWithObservableContext = RpcClientObservableDeSerializer
.createContext(serializationContext, observableContext)
private fun createRpcObservableMap(): RpcObservableMap {
val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause ->
@ -175,7 +179,13 @@ class RPCClientProxyHandler(
}
observablesToReap.locked { observables.add(observableId) }
}
return cacheFactory.buildNamed(Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()), "RpcClientProxyHandler_rpcObservable")
return cacheFactory.buildNamed(
Caffeine.newBuilder()
.weakValues()
.removalListener(onObservableRemove)
.executor(MoreExecutors.directExecutor()),
"RpcClientProxyHandler_rpcObservable"
)
}
private var sessionFactory: ClientSessionFactory? = null