From 515d1088d54bd17629147ca74e521ea0d404a0a6 Mon Sep 17 00:00:00 2001 From: Ryan Fowler Date: Wed, 9 Oct 2019 17:43:11 +0100 Subject: [PATCH] CORDA-2979: Remove quasar from RPC client (#5572) * CORDA-2979: Remove QUASAR from rpc-client * CORDA-2979: Remove usage of QUASAR class since it's not used. This eventually triggers a check for the java agent that won't be run for RPC clients. --- .../rpc/internal/RPCClientProxyHandler.kt | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 7c1d7cdc2d..a3b717b76e 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -1,10 +1,10 @@ package net.corda.client.rpc.internal -import co.paralleluniverse.common.util.SameThreadExecutor import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.RemovalCause import com.github.benmanes.caffeine.cache.RemovalListener +import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.ThreadFactoryBuilder import net.corda.client.rpc.ConnectionFailureException @@ -132,7 +132,10 @@ class RPCClientProxyHandler( private var sendExecutor: ExecutorService? = null // A sticky pool for running Observable.onNext()s. We need the stickiness to preserve the observation ordering. - private val observationExecutorThreadFactory = ThreadFactoryBuilder().setNameFormat("rpc-client-observation-pool-%d").setDaemon(true).build() + private val observationExecutorThreadFactory = ThreadFactoryBuilder() + .setNameFormat("rpc-client-observation-pool-%d") + .setDaemon(true) + .build() private val observationExecutorPool = LazyStickyPool(rpcConfiguration.observationExecutorPoolSize) { Executors.newFixedThreadPool(1, observationExecutorThreadFactory) } @@ -156,7 +159,8 @@ class RPCClientProxyHandler( private val observablesToReap = ThreadBox(object { var observables = ArrayList() }) - private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext) + private val serializationContextWithObservableContext = RpcClientObservableDeSerializer + .createContext(serializationContext, observableContext) private fun createRpcObservableMap(): RpcObservableMap { val onObservableRemove = RemovalListener>> { key, _, cause -> @@ -175,7 +179,13 @@ class RPCClientProxyHandler( } observablesToReap.locked { observables.add(observableId) } } - return cacheFactory.buildNamed(Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()), "RpcClientProxyHandler_rpcObservable") + return cacheFactory.buildNamed( + Caffeine.newBuilder() + .weakValues() + .removalListener(onObservableRemove) + .executor(MoreExecutors.directExecutor()), + "RpcClientProxyHandler_rpcObservable" + ) } private var sessionFactory: ClientSessionFactory? = null