ENT-6866 support quick RPCs that are not processed via the RPC thread pool (#7213)

* ENT-6866 support quick RPCs that are not processed via the RPC thread pool
This commit is contained in:
Chris Cochrane 2022-07-07 12:33:51 +01:00 committed by GitHub
parent ac6bac5a87
commit 54fdd12a2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 159 additions and 4 deletions

View File

@ -0,0 +1,109 @@
package net.corda.client.rpc
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.millis
import net.corda.coretesting.internal.testThreadFactory
import net.corda.node.services.rpc.RPCServerConfiguration
import net.corda.testing.node.internal.RPCDriverDSL
import net.corda.testing.node.internal.rpcDriver
import org.junit.After
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import kotlin.test.assertTrue
@RunWith(Parameterized::class)
class QuickRPCRequestTests : AbstractRPCTest() {
companion object {
// indicate when all RPC threads are busy
val threadBusyLatches = ConcurrentHashMap<Long, CountDownLatch>()
val numRpcThreads = 4
}
interface TestOps : RPCOps {
fun newLatch(numberOfDowns: Int): Long
fun waitLatch(id: Long)
fun downLatch(id: Long)
}
class TestOpsImpl : TestOps {
private val latches = ConcurrentHashMap<Long, CountDownLatch>()
override val protocolVersion = 1000
override fun newLatch(numberOfDowns: Int): Long {
val id = random63BitValue()
val latch = CountDownLatch(numberOfDowns)
latches[id] = latch
return id
}
override fun waitLatch(id: Long) {
threadBusyLatches[id]!!.countDown()
latches[id]!!.await()
}
override fun downLatch(id: Long) {
latches[id]!!.countDown()
}
}
private fun RPCDriverDSL.testProxy(): TestProxy<TestOps> {
return testProxy<TestOps>(
TestOpsImpl(),
clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(
reapInterval = 100.millis
),
serverConfiguration = RPCServerConfiguration.DEFAULT.copy(
rpcThreadPoolSize = numRpcThreads
)
)
}
private val pool = Executors.newFixedThreadPool(10, testThreadFactory())
@After
fun shutdown() {
pool.shutdown()
}
@Test(timeout=300_000)
fun `quick RPCs by-pass the standard RPC thread pool`() {
/*
1. Set up a node with N RPC threads
2. Send a call to a blocking RPC on each thread
3. When all RPC threads are blocked, call a quick RPC
4. Check the quick RPC returns, whilst the RPC threads are still blocked
*/
rpcDriver {
val proxy = testProxy()
val numberOfDownsRequired = 1
val id = proxy.ops.newLatch(numberOfDownsRequired)
val newThreadLatch = CountDownLatch(numRpcThreads)
threadBusyLatches[id] = newThreadLatch
// Flood the RPC threads with blocking calls
for (n in 1..numRpcThreads) {
pool.fork {
proxy.ops.waitLatch(id)
}
}
// wait until all the RPC threads are blocked
threadBusyLatches[id]!!.await()
// try a quick RPC - getProtocolVersion() is always quick
val quickResult = proxy.ops.protocolVersion.toString()
// the fact that a result is returned is proof enough that the quick-RPC has by-passed the
// flooded RPC thread pool
assertTrue(quickResult.isNotEmpty())
// The failure condition is that the test times out.
}
}
}

View File

@ -63,6 +63,7 @@ import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.function.Predicate
import kotlin.concurrent.thread
private typealias ObservableSubscriptionMap = Cache<InvocationId, ObservableSubscription>
@ -359,6 +360,22 @@ class RPCServer(
}
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
/*
Local function for actually executing an RPC, either directly or through the thread pool
*/
fun executeRpc(context : RpcAuthContext, clientToServer : RPCApi.ClientToServer.RpcRequest, arguments : Try.Success<List<Any?>>, isQuickRpc : Boolean) {
if (isQuickRpc) {
val result = invokeRpc(context, clientToServer.methodName, arguments.value)
sendReply(clientToServer.replyId, clientToServer.clientAddress, result)
} else {
rpcExecutor!!.submit {
val result = invokeRpc(context, clientToServer.methodName, arguments.value)
sendReply(clientToServer.replyId, clientToServer.clientAddress, result)
}
}
}
lifeCycle.requireState(State.STARTED)
val clientToServer = RPCApi.ClientToServer.fromClientMessage(artemisMessage)
if (log.isDebugEnabled) {
@ -387,16 +404,45 @@ class RPCServer(
val arguments = Try.on {
clientToServer.serialisedArguments.deserialize<List<Any?>>(context = RPC_SERVER_CONTEXT)
}
log.debug("Received RPC request for [${clientToServer.methodName}]")
/*
The supplied method name may consist of <class>#<method>.
If just a method name is supplied then it is a call made via CordaRPCOps because a quirk of the
stored method names is that CordaRPCOps methods are stored without their class name.
The list of predicates below describes how to match quick RPC methods.
If at least one predicate returns true for the supplied method then it is treated as
a quick RPC.
*/
val quickRpcsList = listOf<Predicate<RPCApi.ClientToServer.RpcRequest>>(
// getProtocolVersion for any class
Predicate() { req ->
req.methodName.substringAfter(CLASS_METHOD_DIVIDER) == "getProtocolVersion"
},
// currentNodeTime for CordaRPCOps
Predicate() { req ->
req.methodName == "currentNodeTime"
}
// Add more predicates as and when needed
)
val isQuickRpc = if (quickRpcsList.any {
it.test(clientToServer)
}) {
log.debug("Handling [${clientToServer.methodName}] as a quick RPC")
true
} else {
false
}
val context: RpcAuthContext
when (arguments) {
is Try.Success -> {
context = artemisMessage.context(clientToServer.sessionId, arguments.value)
context.invocation.pushToLoggingContext()
log.debug { "Arguments: ${arguments.value.toTypedArray().contentDeepToString()}" }
rpcExecutor!!.submit {
val result = invokeRpc(context, clientToServer.methodName, arguments.value)
sendReply(clientToServer.replyId, clientToServer.clientAddress, result)
}
executeRpc(context, clientToServer, arguments, isQuickRpc)
}
is Try.Failure -> {
context = artemisMessage.context(clientToServer.sessionId, emptyList())