From 54fdd12a2d20a85e7945ce61dde055c60578ad45 Mon Sep 17 00:00:00 2001 From: Chris Cochrane <78791827+chriscochrane@users.noreply.github.com> Date: Thu, 7 Jul 2022 12:33:51 +0100 Subject: [PATCH] ENT-6866 support quick RPCs that are not processed via the RPC thread pool (#7213) * ENT-6866 support quick RPCs that are not processed via the RPC thread pool --- .../corda/client/rpc/QuickRPCRequestTests.kt | 109 ++++++++++++++++++ .../net/corda/node/services/rpc/RPCServer.kt | 54 ++++++++- 2 files changed, 159 insertions(+), 4 deletions(-) create mode 100644 client/rpc/src/test/kotlin/net/corda/client/rpc/QuickRPCRequestTests.kt diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/QuickRPCRequestTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/QuickRPCRequestTests.kt new file mode 100644 index 0000000000..22110edafc --- /dev/null +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/QuickRPCRequestTests.kt @@ -0,0 +1,109 @@ +package net.corda.client.rpc + +import net.corda.core.crypto.random63BitValue +import net.corda.core.internal.concurrent.fork +import net.corda.core.messaging.RPCOps +import net.corda.core.utilities.millis +import net.corda.coretesting.internal.testThreadFactory +import net.corda.node.services.rpc.RPCServerConfiguration +import net.corda.testing.node.internal.RPCDriverDSL +import net.corda.testing.node.internal.rpcDriver +import org.junit.After +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import kotlin.test.assertTrue + +@RunWith(Parameterized::class) +class QuickRPCRequestTests : AbstractRPCTest() { + + companion object { + // indicate when all RPC threads are busy + val threadBusyLatches = ConcurrentHashMap() + + val numRpcThreads = 4 + } + + interface TestOps : RPCOps { + fun newLatch(numberOfDowns: Int): Long + fun waitLatch(id: Long) + fun downLatch(id: Long) + } + + class TestOpsImpl : TestOps { + private val latches = ConcurrentHashMap() + override val protocolVersion = 1000 + + override fun newLatch(numberOfDowns: Int): Long { + val id = random63BitValue() + val latch = CountDownLatch(numberOfDowns) + latches[id] = latch + return id + } + + override fun waitLatch(id: Long) { + threadBusyLatches[id]!!.countDown() + latches[id]!!.await() + } + + override fun downLatch(id: Long) { + latches[id]!!.countDown() + } + } + + private fun RPCDriverDSL.testProxy(): TestProxy { + return testProxy( + TestOpsImpl(), + clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy( + reapInterval = 100.millis + ), + serverConfiguration = RPCServerConfiguration.DEFAULT.copy( + rpcThreadPoolSize = numRpcThreads + ) + ) + } + + private val pool = Executors.newFixedThreadPool(10, testThreadFactory()) + @After + fun shutdown() { + pool.shutdown() + } + + @Test(timeout=300_000) + fun `quick RPCs by-pass the standard RPC thread pool`() { + /* + 1. Set up a node with N RPC threads + 2. Send a call to a blocking RPC on each thread + 3. When all RPC threads are blocked, call a quick RPC + 4. Check the quick RPC returns, whilst the RPC threads are still blocked + */ + rpcDriver { + val proxy = testProxy() + val numberOfDownsRequired = 1 + val id = proxy.ops.newLatch(numberOfDownsRequired) + + val newThreadLatch = CountDownLatch(numRpcThreads) + threadBusyLatches[id] = newThreadLatch + + // Flood the RPC threads with blocking calls + for (n in 1..numRpcThreads) { + pool.fork { + proxy.ops.waitLatch(id) + } + } + // wait until all the RPC threads are blocked + threadBusyLatches[id]!!.await() + // try a quick RPC - getProtocolVersion() is always quick + val quickResult = proxy.ops.protocolVersion.toString() + + // the fact that a result is returned is proof enough that the quick-RPC has by-passed the + // flooded RPC thread pool + assertTrue(quickResult.isNotEmpty()) + + // The failure condition is that the test times out. + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/rpc/RPCServer.kt index fdda466c2d..2b1b344aad 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/RPCServer.kt @@ -63,6 +63,7 @@ import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit +import java.util.function.Predicate import kotlin.concurrent.thread private typealias ObservableSubscriptionMap = Cache @@ -359,6 +360,22 @@ class RPCServer( } private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { + + /* + Local function for actually executing an RPC, either directly or through the thread pool + */ + fun executeRpc(context : RpcAuthContext, clientToServer : RPCApi.ClientToServer.RpcRequest, arguments : Try.Success>, isQuickRpc : Boolean) { + if (isQuickRpc) { + val result = invokeRpc(context, clientToServer.methodName, arguments.value) + sendReply(clientToServer.replyId, clientToServer.clientAddress, result) + } else { + rpcExecutor!!.submit { + val result = invokeRpc(context, clientToServer.methodName, arguments.value) + sendReply(clientToServer.replyId, clientToServer.clientAddress, result) + } + } + } + lifeCycle.requireState(State.STARTED) val clientToServer = RPCApi.ClientToServer.fromClientMessage(artemisMessage) if (log.isDebugEnabled) { @@ -387,16 +404,45 @@ class RPCServer( val arguments = Try.on { clientToServer.serialisedArguments.deserialize>(context = RPC_SERVER_CONTEXT) } + log.debug("Received RPC request for [${clientToServer.methodName}]") + + /* + The supplied method name may consist of #. + If just a method name is supplied then it is a call made via CordaRPCOps because a quirk of the + stored method names is that CordaRPCOps methods are stored without their class name. + + The list of predicates below describes how to match quick RPC methods. + If at least one predicate returns true for the supplied method then it is treated as + a quick RPC. + */ + val quickRpcsList = listOf>( + // getProtocolVersion for any class + Predicate() { req -> + req.methodName.substringAfter(CLASS_METHOD_DIVIDER) == "getProtocolVersion" + }, + // currentNodeTime for CordaRPCOps + Predicate() { req -> + req.methodName == "currentNodeTime" + } + // Add more predicates as and when needed + ) + + val isQuickRpc = if (quickRpcsList.any { + it.test(clientToServer) + }) { + log.debug("Handling [${clientToServer.methodName}] as a quick RPC") + true + } else { + false + } + val context: RpcAuthContext when (arguments) { is Try.Success -> { context = artemisMessage.context(clientToServer.sessionId, arguments.value) context.invocation.pushToLoggingContext() log.debug { "Arguments: ${arguments.value.toTypedArray().contentDeepToString()}" } - rpcExecutor!!.submit { - val result = invokeRpc(context, clientToServer.methodName, arguments.value) - sendReply(clientToServer.replyId, clientToServer.clientAddress, result) - } + executeRpc(context, clientToServer, arguments, isQuickRpc) } is Try.Failure -> { context = artemisMessage.context(clientToServer.sessionId, emptyList())