mirror of
https://github.com/corda/corda.git
synced 2025-03-14 00:06:45 +00:00
ENT-2106 Change RPC connection pooling in JMeter samplers to avoid running out of file descriptors (#1033)
This commit is contained in:
parent
9b5a099302
commit
5685b9f2db
@ -13,19 +13,25 @@ package com.r3.corda.jmeter
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCConnection
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.internal.LazyPool
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import org.apache.jmeter.config.Argument
|
||||
import org.apache.jmeter.config.Arguments
|
||||
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient
|
||||
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext
|
||||
import org.apache.jmeter.samplers.SampleResult
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* Do most of the work for firing flow start requests via RPC at a Corda node.
|
||||
*/
|
||||
abstract class BaseFlowSampler() : AbstractJavaSamplerClient() {
|
||||
companion object {
|
||||
private data class RPCParams(val address: NetworkHostAndPort, val user: String, val password: String)
|
||||
private data class RPCClient(val rpcClient: CordaRPCClient, val rpcConnection: CordaRPCConnection, val ops: CordaRPCOps)
|
||||
|
||||
val label = Argument("label", "\${__samplerName}", "<meta>", "The value in the label column in the resulting CSV file to dissambiguate this test run from others.")
|
||||
val host = Argument("host", "localhost", "<meta>", "The remote network address (hostname or IP address) to connect to for RPC.")
|
||||
val port = Argument("port", "10000", "<meta>", "The remote port to connect to for RPC.")
|
||||
@ -33,11 +39,15 @@ abstract class BaseFlowSampler() : AbstractJavaSamplerClient() {
|
||||
val password = Argument("password", "corda_is_awesome", "<meta>", "The password for the RPC user.")
|
||||
|
||||
val allArgs = setOf(label, host, port, username, password)
|
||||
|
||||
val log = loggerFor<BaseFlowSampler>()
|
||||
|
||||
private val rpcClientPools = Collections.synchronizedMap(mutableMapOf<RPCParams, LazyPool<RPCClient>>())
|
||||
}
|
||||
|
||||
var rpcClient: CordaRPCClient? = null
|
||||
var rpcConnection: CordaRPCConnection? = null
|
||||
var rpcProxy: CordaRPCOps? = null
|
||||
private var rpcParams: RPCParams? = null
|
||||
private var rpcPool: LazyPool<RPCClient>? = null
|
||||
|
||||
var labelValue: String? = null
|
||||
|
||||
override fun getDefaultParameters(): Arguments {
|
||||
@ -54,14 +64,23 @@ abstract class BaseFlowSampler() : AbstractJavaSamplerClient() {
|
||||
|
||||
override fun setupTest(context: JavaSamplerContext) {
|
||||
super.setupTest(context)
|
||||
rpcClient = CordaRPCClient(NetworkHostAndPort(context.getParameter(host.name), context.getIntParameter(port.name)))
|
||||
rpcConnection = rpcClient!!.start(context.getParameter(username.name), context.getParameter(password.name))
|
||||
rpcProxy = rpcConnection!!.proxy
|
||||
rpcParams = RPCParams(NetworkHostAndPort(context.getParameter(host.name), context.getIntParameter(port.name)), context.getParameter(username.name), context.getParameter(password.name))
|
||||
labelValue = context.getParameter(label.name)
|
||||
if (labelValue.isNullOrBlank()) {
|
||||
labelValue = null
|
||||
}
|
||||
setupTest(rpcProxy!!, context)
|
||||
rpcPool = rpcClientPools.computeIfAbsent(rpcParams) {
|
||||
LazyPool<RPCClient> {
|
||||
val rpcClient = CordaRPCClient(it.address)
|
||||
val rpcConnection = rpcClient.start(it.user, it.password)
|
||||
val rpcProxy = rpcConnection.proxy
|
||||
RPCClient(rpcClient, rpcConnection, rpcProxy)
|
||||
}
|
||||
}
|
||||
log.info("Set up test with rpcParams = $rpcParams, rpcPool = $rpcPool")
|
||||
rpcPool?.run {
|
||||
setupTest(it.ops, context)
|
||||
}
|
||||
}
|
||||
|
||||
protected open fun additionalFlowResponseProcessing(context: JavaSamplerContext, sample: SampleResult, response: Any?) {
|
||||
@ -69,35 +88,40 @@ abstract class BaseFlowSampler() : AbstractJavaSamplerClient() {
|
||||
}
|
||||
|
||||
override fun runTest(context: JavaSamplerContext): SampleResult {
|
||||
val flowInvoke = createFlowInvoke(rpcProxy!!, context)
|
||||
val result = SampleResult()
|
||||
result.sampleStart()
|
||||
val handle = rpcProxy!!.startFlowDynamic(flowInvoke.flowLogicClass, *(flowInvoke.args))
|
||||
result.sampleLabel = labelValue ?: flowInvoke.flowLogicClass.simpleName
|
||||
result.latencyEnd()
|
||||
try {
|
||||
val flowResult = handle.returnValue.get()
|
||||
result.sampleEnd()
|
||||
return result.apply {
|
||||
isSuccessful = true
|
||||
additionalFlowResponseProcessing(context, this, flowResult)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
result.sampleEnd()
|
||||
e.printStackTrace()
|
||||
return result.apply {
|
||||
isSuccessful = false
|
||||
additionalFlowResponseProcessing(context, this, e)
|
||||
return rpcPool!!.run {
|
||||
val flowInvoke = createFlowInvoke(it.ops, context)
|
||||
val result = SampleResult()
|
||||
result.sampleStart()
|
||||
val handle = it.ops.startFlowDynamic(flowInvoke.flowLogicClass, *(flowInvoke.args))
|
||||
result.sampleLabel = labelValue ?: flowInvoke.flowLogicClass.simpleName
|
||||
result.latencyEnd()
|
||||
try {
|
||||
val flowResult = handle.returnValue.get()
|
||||
result.sampleEnd()
|
||||
return result.apply {
|
||||
isSuccessful = true
|
||||
additionalFlowResponseProcessing(context, this, flowResult)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
result.sampleEnd()
|
||||
e.printStackTrace()
|
||||
return result.apply {
|
||||
isSuccessful = false
|
||||
additionalFlowResponseProcessing(context, this, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun teardownTest(context: JavaSamplerContext) {
|
||||
teardownTest(rpcProxy!!, context)
|
||||
rpcProxy = null
|
||||
rpcConnection!!.close()
|
||||
rpcConnection = null
|
||||
rpcClient = null
|
||||
log.info("Tear down test with rpcParams = $rpcParams, rpcPool = $rpcPool")
|
||||
for(rpcClient in rpcPool?.close() ?: emptyList()) {
|
||||
teardownTest(rpcClient.ops, context)
|
||||
rpcClient.rpcConnection.close()
|
||||
}
|
||||
rpcClientPools.remove(rpcParams)
|
||||
rpcPool = null
|
||||
rpcParams = null
|
||||
labelValue = null
|
||||
super.teardownTest(context)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user