diff --git a/tools/jmeter/src/main/kotlin/com/r3/corda/jmeter/BaseFlowSampler.kt b/tools/jmeter/src/main/kotlin/com/r3/corda/jmeter/BaseFlowSampler.kt index d985b37575..dd53ca819a 100644 --- a/tools/jmeter/src/main/kotlin/com/r3/corda/jmeter/BaseFlowSampler.kt +++ b/tools/jmeter/src/main/kotlin/com/r3/corda/jmeter/BaseFlowSampler.kt @@ -13,19 +13,25 @@ package com.r3.corda.jmeter import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCConnection import net.corda.core.flows.FlowLogic +import net.corda.core.internal.LazyPool import net.corda.core.messaging.CordaRPCOps import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.loggerFor import org.apache.jmeter.config.Argument import org.apache.jmeter.config.Arguments import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext import org.apache.jmeter.samplers.SampleResult +import java.util.* /** * Do most of the work for firing flow start requests via RPC at a Corda node. */ abstract class BaseFlowSampler() : AbstractJavaSamplerClient() { companion object { + private data class RPCParams(val address: NetworkHostAndPort, val user: String, val password: String) + private data class RPCClient(val rpcClient: CordaRPCClient, val rpcConnection: CordaRPCConnection, val ops: CordaRPCOps) + val label = Argument("label", "\${__samplerName}", "", "The value in the label column in the resulting CSV file to dissambiguate this test run from others.") val host = Argument("host", "localhost", "", "The remote network address (hostname or IP address) to connect to for RPC.") val port = Argument("port", "10000", "", "The remote port to connect to for RPC.") @@ -33,11 +39,15 @@ abstract class BaseFlowSampler() : AbstractJavaSamplerClient() { val password = Argument("password", "corda_is_awesome", "", "The password for the RPC user.") val allArgs = setOf(label, host, port, username, password) + + val log = loggerFor() + + private val rpcClientPools = Collections.synchronizedMap(mutableMapOf>()) } - var rpcClient: CordaRPCClient? = null - var rpcConnection: CordaRPCConnection? = null - var rpcProxy: CordaRPCOps? = null + private var rpcParams: RPCParams? = null + private var rpcPool: LazyPool? = null + var labelValue: String? = null override fun getDefaultParameters(): Arguments { @@ -54,14 +64,23 @@ abstract class BaseFlowSampler() : AbstractJavaSamplerClient() { override fun setupTest(context: JavaSamplerContext) { super.setupTest(context) - rpcClient = CordaRPCClient(NetworkHostAndPort(context.getParameter(host.name), context.getIntParameter(port.name))) - rpcConnection = rpcClient!!.start(context.getParameter(username.name), context.getParameter(password.name)) - rpcProxy = rpcConnection!!.proxy + rpcParams = RPCParams(NetworkHostAndPort(context.getParameter(host.name), context.getIntParameter(port.name)), context.getParameter(username.name), context.getParameter(password.name)) labelValue = context.getParameter(label.name) if (labelValue.isNullOrBlank()) { labelValue = null } - setupTest(rpcProxy!!, context) + rpcPool = rpcClientPools.computeIfAbsent(rpcParams) { + LazyPool { + val rpcClient = CordaRPCClient(it.address) + val rpcConnection = rpcClient.start(it.user, it.password) + val rpcProxy = rpcConnection.proxy + RPCClient(rpcClient, rpcConnection, rpcProxy) + } + } + log.info("Set up test with rpcParams = $rpcParams, rpcPool = $rpcPool") + rpcPool?.run { + setupTest(it.ops, context) + } } protected open fun additionalFlowResponseProcessing(context: JavaSamplerContext, sample: SampleResult, response: Any?) { @@ -69,35 +88,40 @@ abstract class BaseFlowSampler() : AbstractJavaSamplerClient() { } override fun runTest(context: JavaSamplerContext): SampleResult { - val flowInvoke = createFlowInvoke(rpcProxy!!, context) - val result = SampleResult() - result.sampleStart() - val handle = rpcProxy!!.startFlowDynamic(flowInvoke.flowLogicClass, *(flowInvoke.args)) - result.sampleLabel = labelValue ?: flowInvoke.flowLogicClass.simpleName - result.latencyEnd() - try { - val flowResult = handle.returnValue.get() - result.sampleEnd() - return result.apply { - isSuccessful = true - additionalFlowResponseProcessing(context, this, flowResult) - } - } catch (e: Exception) { - result.sampleEnd() - e.printStackTrace() - return result.apply { - isSuccessful = false - additionalFlowResponseProcessing(context, this, e) + return rpcPool!!.run { + val flowInvoke = createFlowInvoke(it.ops, context) + val result = SampleResult() + result.sampleStart() + val handle = it.ops.startFlowDynamic(flowInvoke.flowLogicClass, *(flowInvoke.args)) + result.sampleLabel = labelValue ?: flowInvoke.flowLogicClass.simpleName + result.latencyEnd() + try { + val flowResult = handle.returnValue.get() + result.sampleEnd() + return result.apply { + isSuccessful = true + additionalFlowResponseProcessing(context, this, flowResult) + } + } catch (e: Exception) { + result.sampleEnd() + e.printStackTrace() + return result.apply { + isSuccessful = false + additionalFlowResponseProcessing(context, this, e) + } } } } override fun teardownTest(context: JavaSamplerContext) { - teardownTest(rpcProxy!!, context) - rpcProxy = null - rpcConnection!!.close() - rpcConnection = null - rpcClient = null + log.info("Tear down test with rpcParams = $rpcParams, rpcPool = $rpcPool") + for(rpcClient in rpcPool?.close() ?: emptyList()) { + teardownTest(rpcClient.ops, context) + rpcClient.rpcConnection.close() + } + rpcClientPools.remove(rpcParams) + rpcPool = null + rpcParams = null labelValue = null super.teardownTest(context) }