Added throttling to the load test tool and added a stability test. (#847)

* Added throttling to the load test tool.
* Added a stability test to the tool.
* Some refactoring to allow configuration via VM options.
* Updated docs.
* Gradle now passes system properties to the load test.
* Uses Guava's RateLimiter instead of executing at a fixed rate.
Patrick Kuo 2017-06-16 15:30:11 +01:00 committed by GitHub
parent 1f42997915
commit 28afb6d3ff
14 changed files with 442 additions and 306 deletions

View File

@@ -31,9 +31,13 @@ In order to run the loadtests you need to have an active SSH-agent running with
You can use either IntelliJ or the gradle command line to start the tests.
To use gradle: ``./gradlew tools:loadtest:run -Ploadtest-config=PATH_TO_LOADTEST_CONF``
To use gradle with configuration file: ``./gradlew tools:loadtest:run -Ploadtest-config=PATH_TO_LOADTEST_CONF``
To use IntelliJ simply run Main.kt with the config path supplied as an argument.
To use gradle with system properties: ``./gradlew tools:loadtest:run -Dloadtest.mode=LOAD_TEST -Dloadtest.nodeHosts.0=node0.myhost.com``
.. note:: You can provide or override any configuration using system properties; all properties must be prefixed with ``loadtest.``.
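For example, ``-Dloadtest.executionFrequency=10`` overrides the configured execution rate.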
To use IntelliJ, simply run Main.kt with the config path supplied as an argument, or with system properties set as VM options.
Configuration of individual load tests
--------------------------------------
@@ -112,3 +116,12 @@ The ``gatherRemoteState`` function should check the actual remote nodes' states
The reason it gets the previous state boils down to allowing non-deterministic predictions about the nodes' remote states. Say some piece of work triggers an asynchronous notification of a node. We need to account both for the case when the node hasn't received the notification and for the case when it has. In these cases ``S`` should somehow represent a collection of possible states, and ``gatherRemoteState`` should "collapse" the collection based on the observations it makes. Of course we don't need this for the simple case of the Self Issue test.
The last parameter ``isConsistent`` is used to poll for eventual consistency at the end of a load test. This is not needed for self-issuance.
Stability Test
--------------
The stability test is a variation of the load test: instead of flooding the nodes with requests, it limits the execution frequency to achieve a constant execution rate.
To run the stability test, set the load test mode to STABILITY_TEST (``mode=STABILITY_TEST`` in the config file or ``-Dloadtest.mode=STABILITY_TEST`` as a system property).
The stability test first self-issues cash using ``StabilityTest.selfIssueTest``, then randomly pays and exits cash using ``StabilityTest.crossCashTest`` for P2P testing. Unlike the load test, the stability test runs without any disruptions.
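Internally, the constant rate is enforced with Guava's ``RateLimiter``: each worker thread acquires a permit before executing a command, which smooths bursts into a steady rate. A minimal standalone sketch of this pacing pattern (the rate and pool size here are illustrative, not the tool's defaults):

import com.google.common.util.concurrent.RateLimiter
import java.util.concurrent.Executors

fun main() {
    val rateLimiter = RateLimiter.create(10.0) // permits (command executions) per second
    val executor = Executors.newFixedThreadPool(4)
    (1..100).forEach { i ->
        executor.execute {
            rateLimiter.acquire() // blocks until a permit is available
            println("Executing command $i")
        }
    }
    executor.shutdown()
}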

View File

@@ -25,4 +25,8 @@ run {
if (project.hasProperty('loadtest-config')) {
args project["loadtest-config"]
}
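// Forward any loadtest.* system properties set on the Gradle JVM to the load test JVM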
System.getProperties().forEach { k, v ->
if (k.toString().startsWith("loadtest."))
systemProperty k, v
}
}

View File

@@ -1,17 +1,15 @@
package net.corda.loadtest
import com.google.common.net.HostAndPort
import com.jcraft.jsch.*
import com.jcraft.jsch.Buffer
import com.jcraft.jsch.Identity
import com.jcraft.jsch.IdentityRepository
import com.jcraft.jsch.JSch
import com.jcraft.jsch.agentproxy.AgentProxy
import com.jcraft.jsch.agentproxy.connector.SSHAgentConnector
import com.jcraft.jsch.agentproxy.usocket.JNAUSocketFactory
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.core.messaging.CordaRPCOps
import net.corda.testing.driver.PortAllocation
import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream
import java.io.Closeable
import java.util.*
import kotlin.streams.toList
@@ -62,27 +60,23 @@ fun setupJSchWithSshAgent(): JSch {
}
}
class ConnectionManager(private val username: String, private val jSch: JSch) {
fun connectToNode(
nodeHost: String,
remoteMessagingPort: Int,
localTunnelAddress: HostAndPort,
rpcUsername: String,
rpcPassword: String
): NodeConnection {
val session = jSch.getSession(username, nodeHost, 22)
class ConnectionManager(private val jSch: JSch) {
fun connectToNode(remoteNode: RemoteNode, localTunnelAddress: HostAndPort): NodeConnection {
val session = jSch.getSession(remoteNode.sshUserName, remoteNode.hostname, 22)
// We don't check the host fingerprints because they may change often
session.setConfig("StrictHostKeyChecking", "no")
log.info("Connecting to $nodeHost...")
log.info("Connecting to ${remoteNode.hostname}...")
session.connect()
log.info("Connected to $nodeHost!")
log.info("Connected to ${remoteNode.hostname}!")
log.info("Creating tunnel from $nodeHost:$remoteMessagingPort to $localTunnelAddress...")
session.setPortForwardingL(localTunnelAddress.port, localTunnelAddress.host, remoteMessagingPort)
log.info("Creating tunnel from ${remoteNode.hostname} to $localTunnelAddress...")
session.setPortForwardingL(localTunnelAddress.port, localTunnelAddress.host, remoteNode.rpcPort)
log.info("Tunnel created!")
val connection = NodeConnection(nodeHost, session, localTunnelAddress, rpcUsername, rpcPassword)
connection.startClient()
val connection = NodeConnection(remoteNode, session, localTunnelAddress)
connection.startNode()
connection.waitUntilUp()
connection.startRPCClient()
return connection
}
}
@@ -98,26 +92,11 @@ class ConnectionManager(private val username: String, private val jSch: JSch) {
* @param withConnections An action to run once we're connected to the nodes.
* @return The return value of [withConnections]
*/
fun <A> connectToNodes(
username: String,
nodeHosts: List<String>,
remoteMessagingPort: Int,
tunnelPortAllocation: PortAllocation,
rpcUsername: String,
rpcPassword: String,
withConnections: (List<NodeConnection>) -> A
): A {
val manager = ConnectionManager(username, setupJSchWithSshAgent())
val connections = nodeHosts.parallelStream().map { nodeHost ->
manager.connectToNode(
nodeHost = nodeHost,
remoteMessagingPort = remoteMessagingPort,
localTunnelAddress = tunnelPortAllocation.nextHostAndPort(),
rpcUsername = rpcUsername,
rpcPassword = rpcPassword
)
fun <A> connectToNodes(remoteNodes: List<RemoteNode>, tunnelPortAllocation: PortAllocation, withConnections: (List<NodeConnection>) -> A): A {
val manager = ConnectionManager(setupJSchWithSshAgent())
val connections = remoteNodes.parallelStream().map { remoteNode ->
manager.connectToNode(remoteNode, tunnelPortAllocation.nextHostAndPort())
}.toList()
return try {
withConnections(connections)
} finally {
@@ -125,108 +104,6 @@ fun <A> connectToNodes(
}
}
/**
* [NodeConnection] allows executing remote shell commands on the node as well as executing RPCs.
* The RPC Client start/stop must be controlled externally with [startClient] and [doWhileClientStopped]. For example
* if we want to do some action on the node that requires bringing down of the node we should nest it in a
* [doWhileClientStopped], otherwise the RPC link will be broken.
*/
class NodeConnection(
val hostName: String,
private val jSchSession: Session,
private val localTunnelAddress: HostAndPort,
private val rpcUsername: String,
private val rpcPassword: String
) : Closeable {
private val client = CordaRPCClient(localTunnelAddress)
private var connection: CordaRPCConnection? = null
val proxy: CordaRPCOps get() = connection?.proxy ?: throw IllegalStateException("proxy requested, but the client is not running")
data class ShellCommandOutput(
val originalShellCommand: String,
val exitCode: Int,
val stdout: String,
val stderr: String
) {
fun getResultOrThrow(): String {
if (exitCode != 0) {
val diagnostic =
"There was a problem running \"$originalShellCommand\":\n" +
" stdout:\n$stdout" +
" stderr:\n$stderr"
log.error(diagnostic)
throw Exception(diagnostic)
} else {
return stdout
}
}
}
fun <A> doWhileClientStopped(action: () -> A): A {
val connection = connection
require(connection != null) { "doWhileClientStopped called with no running client" }
log.info("Stopping RPC proxy to $hostName, tunnel at $localTunnelAddress")
connection!!.close()
try {
return action()
} finally {
log.info("Starting new RPC proxy to $hostName, tunnel at $localTunnelAddress")
// TODO expose these somehow?
val newConnection = client.start(rpcUsername, rpcPassword)
this.connection = newConnection
}
}
fun startClient() {
log.info("Creating RPC proxy to $hostName, tunnel at $localTunnelAddress")
connection = client.start(rpcUsername, rpcPassword)
log.info("Proxy created")
}
/**
* @return Pair of (stdout, stderr) of command
*/
fun runShellCommandGetOutput(command: String): ShellCommandOutput {
log.info("Running '$command' on $hostName")
val (exitCode, pair) = withChannelExec(command) { channel ->
val stdoutStream = ByteArrayOutputStream()
val stderrStream = ByteArrayOutputStream()
channel.outputStream = stdoutStream
channel.setErrStream(stderrStream)
channel.connect()
poll { channel.isEOF }
Pair(stdoutStream.toString(), stderrStream.toString())
}
return ShellCommandOutput(
originalShellCommand = command,
exitCode = exitCode,
stdout = pair.first,
stderr = pair.second
)
}
/**
* @param function should call [ChannelExec.connect]
* @return A pair of (exit code, [function] return value)
*/
private fun <A> withChannelExec(command: String, function: (ChannelExec) -> A): Pair<Int, A> {
val channel = jSchSession.openChannel("exec") as ChannelExec
channel.setCommand(command)
try {
val result = function(channel)
poll { channel.isEOF }
return Pair(channel.exitStatus, result)
} finally {
channel.disconnect()
}
}
override fun close() {
connection?.close()
jSchSession.disconnect()
}
}
fun poll(intervalMilliseconds: Long = 500, function: () -> Boolean) {
while (!function()) {
Thread.sleep(intervalMilliseconds)
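For illustration, a hypothetical caller of the refactored API (hostname, credentials, ports and paths below are made up; User is assumed to be net.corda.nodeapi.User):

import java.nio.file.Paths
import net.corda.nodeapi.User
import net.corda.testing.driver.PortAllocation

fun example() {
    val remoteNodes = listOf(
            RemoteNode("node0.myhost.com", "corda", "ubuntu",
                    User("corda", "not_blockchain", emptySet()), 10003, Paths.get("/opt/corda")))
    connectToNodes(remoteNodes, PortAllocation.Incremental(10000)) { connections ->
        // Each NodeConnection owns its SSH session, local tunnel and RPC client.
        connections.forEach { println("Connected to ${it.remoteNode.hostname}") }
    }
}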

View File

@@ -16,11 +16,11 @@ private val log = LoggerFactory.getLogger(Disruption::class.java)
// DOCS START 1
data class Disruption(
val name: String,
val disrupt: (NodeHandle, SplittableRandom) -> Unit
val disrupt: (NodeConnection, SplittableRandom) -> Unit
)
data class DisruptionSpec(
val nodeFilter: (NodeHandle) -> Boolean,
val nodeFilter: (NodeConnection) -> Boolean,
val disruption: Disruption,
val noDisruptionWindowMs: LongRange
)
@@ -43,8 +43,8 @@ data class DisruptionSpec(
* * Randomly duplicate messages, perhaps to other queues even.
*/
val isNetworkMap = { node: NodeHandle -> node.info.advertisedServices.any { it.info.type == NetworkMapService.type } }
val isNotary = { node: NodeHandle -> node.info.advertisedServices.any { it.info.type.isNotary() } }
val isNetworkMap = { node: NodeConnection -> node.info.advertisedServices.any { it.info.type == NetworkMapService.type } }
val isNotary = { node: NodeConnection -> node.info.advertisedServices.any { it.info.type.isNotary() } }
fun <A> ((A) -> Boolean).or(other: (A) -> Boolean): (A) -> Boolean = { this(it) || other(it) }
fun hang(hangIntervalRange: LongRange) = Disruption("Hang randomly") { node, random ->
@@ -52,21 +52,21 @@ fun hang(hangIntervalRange: LongRange) = Disruption("Hang randomly") { node, ran
node.doWhileSigStopped { Thread.sleep(hangIntervalMs) }
}
val restart = Disruption("Restart randomly") { (configuration, connection), _ ->
connection.runShellCommandGetOutput("sudo systemctl restart ${configuration.remoteSystemdServiceName}").getResultOrThrow()
val restart = Disruption("Restart randomly") { connection, _ ->
connection.restartNode()
connection.waitUntilUp()
}
val kill = Disruption("Kill randomly") { node, _ ->
val pid = node.getNodePid()
node.connection.runShellCommandGetOutput("sudo kill $pid")
node.kill()
}
val deleteDb = Disruption("Delete persistence database without restart") { (configuration, connection), _ ->
connection.runShellCommandGetOutput("sudo rm ${configuration.remoteNodeDirectory}/persistence.mv.db").getResultOrThrow()
val deleteDb = Disruption("Delete persistence database without restart") { connection, _ ->
connection.runShellCommandGetOutput("sudo rm ${connection.remoteNode.nodeDirectory}/persistence.mv.db").getResultOrThrow()
}
// DOCS START 2
fun strainCpu(parallelism: Int, durationSeconds: Int) = Disruption("Put strain on cpu") { (_, connection), _ ->
fun strainCpu(parallelism: Int, durationSeconds: Int) = Disruption("Put strain on cpu") { connection, _ ->
val shell = "for c in {1..$parallelism} ; do openssl enc -aes-128-cbc -in /dev/urandom -pass pass: -e > /dev/null & done && JOBS=\$(jobs -p) && (sleep $durationSeconds && kill \$JOBS) & wait"
connection.runShellCommandGetOutput(shell).getResultOrThrow()
}
@@ -90,7 +90,7 @@ fun <A> Nodes.withDisruptions(disruptions: List<DisruptionSpec>, mainRandom: Spl
executor.invokeAll(nodes.map { node ->
val nodeRandom = random.split()
Callable {
log.info("Disrupting ${node.connection.hostName} with '${disruption.disruption.name}'")
log.info("Disrupting ${node.remoteNode.hostname} with '${disruption.disruption.name}'")
disruption.disruption.disrupt(node, nodeRandom)
}
})
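A sketch of how a disruption pattern might be assembled against the new NodeConnection-based types (the interval values are illustrative):

val exampleDisruptions = listOf(
        DisruptionSpec(
                nodeFilter = isNetworkMap.or(isNotary),  // only target infrastructure nodes
                disruption = hang(5000L..10000L),        // SIGSTOP the process for 5-10 seconds
                noDisruptionWindowMs = 60000L..120000L   // leave 1-2 minutes between disruptions
        ),
        DisruptionSpec(
                nodeFilter = { true },                   // any node
                disruption = restart,                    // systemctl restart, then wait until up
                noDisruptionWindowMs = 120000L..300000L
        )
)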

View File

@@ -1,14 +1,16 @@
package net.corda.loadtest
import com.google.common.util.concurrent.RateLimiter
import net.corda.client.mock.Generator
import net.corda.client.rpc.notUsed
import net.corda.core.crypto.toBase58String
import net.corda.testing.driver.PortAllocation
import net.corda.node.services.network.NetworkMapService
import net.corda.testing.driver.PortAllocation
import org.slf4j.LoggerFactory
import java.util.*
import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
private val log = LoggerFactory.getLogger(LoadTest::class.java)
@@ -61,6 +63,7 @@ data class LoadTest<T, S>(
val parallelism: Int,
val generateCount: Int,
val clearDatabaseBeforeRun: Boolean,
val executionFrequency: Int?,
val gatherFrequency: Int,
val disruptionPatterns: List<List<DisruptionSpec>>
)
@@ -77,12 +80,19 @@ data class LoadTest<T, S>(
}
}
val rateLimiter = parameters.executionFrequency?.let {
log.info("Execution rate limited to $it per second.")
RateLimiter.create(it.toDouble())
}
val executor = Executors.newFixedThreadPool(parameters.parallelism)
parameters.disruptionPatterns.forEach { disruptions ->
log.info("Running test '$testName' with disruptions ${disruptions.map { it.disruption.name }}")
nodes.withDisruptions(disruptions, random) {
var state = nodes.gatherRemoteState(null)
var count = parameters.generateCount
var countSinceLastCheck = 0
while (count > 0) {
log.info("$count remaining commands, state:\n$state")
// Generate commands
@@ -92,21 +102,21 @@
// Interpret commands
val newState = commands.fold(state, interpret)
// Execute commands
val queue = ConcurrentLinkedQueue(commands)
(1..parameters.parallelism).toList().parallelStream().forEach {
var next = queue.poll()
while (next != null) {
log.info("Executing $next")
try {
nodes.execute(next)
next = queue.poll()
} catch (exception: Throwable) {
val diagnostic = executeDiagnostic(state, newState, next, exception)
log.error(diagnostic)
throw Exception(diagnostic)
executor.invokeAll(
commands.map {
Callable<Unit> {
rateLimiter?.acquire()
log.info("Executing $it")
try {
nodes.execute(it)
} catch (exception: Throwable) {
val diagnostic = executeDiagnostic(state, newState, it, exception)
log.error(diagnostic)
throw Exception(diagnostic)
}
}
}
}
}
)
countSinceLastCheck += commands.size
if (countSinceLastCheck >= parameters.gatherFrequency) {
log.info("Checking consistency...")
@@ -129,7 +139,7 @@ data class LoadTest<T, S>(
log.info("'$testName' done!")
}
}
executor.shutdown()
}
companion object {
@@ -143,9 +153,9 @@ data class LoadTest<T, S>(
}
data class Nodes(
val notary: NodeHandle,
val networkMap: NodeHandle,
val simpleNodes: List<NodeHandle>
val notary: NodeConnection,
val networkMap: NodeConnection,
val simpleNodes: List<NodeConnection>
) {
val allNodes by lazy { (listOf(notary, networkMap) + simpleNodes).associateBy { it.info }.values }
}
@@ -157,53 +167,44 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List<Pair<LoadTest
val seed = configuration.seed ?: Random().nextLong()
log.info("Using seed $seed")
val random = SplittableRandom(seed)
connectToNodes(
configuration.sshUser,
configuration.nodeHosts,
configuration.remoteMessagingPort,
PortAllocation.Incremental(configuration.localTunnelStartingPort),
configuration.rpcUsername,
configuration.rpcPassword
) { connections ->
val remoteNodes = configuration.nodeHosts.map { hostname ->
configuration.let {
RemoteNode(hostname, it.remoteSystemdServiceName, it.sshUser, it.rpcUser, it.rpcPort, it.remoteNodeDirectory)
}
}
connectToNodes(remoteNodes, PortAllocation.Incremental(configuration.localTunnelStartingPort)) { connections ->
log.info("Connected to all nodes!")
val hostNodeHandleMap = ConcurrentHashMap<String, NodeHandle>()
val hostNodeMap = ConcurrentHashMap<String, NodeConnection>()
connections.parallelStream().forEach { connection ->
log.info("Getting node info of ${connection.hostName}")
val nodeInfo = connection.proxy.nodeIdentity()
log.info("Got node info of ${connection.hostName}: $nodeInfo!")
val (otherNodeInfos, nodeInfoUpdates) = connection.proxy.networkMapUpdates()
nodeInfoUpdates.notUsed()
val pubkeysString = otherNodeInfos.map {
log.info("Getting node info of ${connection.remoteNode.hostname}")
val info = connection.info
log.info("Got node info of ${connection.remoteNode.hostname}: $info!")
val (otherInfo, infoUpdates) = connection.proxy.networkMapUpdates()
infoUpdates.notUsed()
val pubKeysString = otherInfo.map {
" ${it.legalIdentity.name}: ${it.legalIdentity.owningKey.toBase58String()}"
}.joinToString("\n")
log.info("${connection.hostName} waiting for network map")
log.info("${connection.remoteNode.hostname} waiting for network map")
connection.proxy.waitUntilRegisteredWithNetworkMap().get()
log.info("${connection.hostName} sees\n$pubkeysString")
val nodeHandle = NodeHandle(configuration, connection, nodeInfo)
nodeHandle.waitUntilUp()
hostNodeHandleMap.put(connection.hostName, nodeHandle)
}
val networkMapNode = hostNodeHandleMap.toList().single {
it.second.info.advertisedServices.any { it.info.type == NetworkMapService.type }
}
val notaryNode = hostNodeHandleMap.toList().single {
it.second.info.advertisedServices.any { it.info.type.isNotary() }
log.info("${connection.remoteNode.hostname} sees\n$pubKeysString")
hostNodeMap.put(connection.remoteNode.hostname, connection)
}
val networkMapNode = hostNodeMap.values.single { it.info.advertisedServices.any { it.info.type == NetworkMapService.type } }
val notaryNode = hostNodeMap.values.single { it.info.advertisedServices.any { it.info.type.isNotary() } }
val nodes = Nodes(
notary = notaryNode.second,
networkMap = networkMapNode.second,
simpleNodes = hostNodeHandleMap.values.filter {
notary = notaryNode,
networkMap = networkMapNode,
simpleNodes = hostNodeMap.values.filter {
it.info.advertisedServices.none {
it.info.type == NetworkMapService.type || it.info.type.isNotary()
}
}
)
tests.forEach {
val (test, parameters) = it
tests.forEach { (test, parameters) ->
test.run(nodes, parameters, random)
}
}

View File

@@ -1,35 +1,45 @@
package net.corda.loadtest
import com.typesafe.config.Config
import net.corda.nodeapi.config.getValue
import net.corda.nodeapi.User
import java.nio.file.Path
import java.util.concurrent.ForkJoinPool
/**
* @param sshUser The UNIX username to use for SSH auth.
* @param localCertificatesBaseDirectory The base directory to put node certificates in.
* @param localTunnelStartingPort The local starting port to allocate tunneling ports from.
* @param nodeHosts The nodes' resolvable addresses.
* @param rpcUsername The RPC user's name to establish the RPC connection as.
* @param rpcPassword The RPC user's password.
* @param rpcUser The RPC user's name and password used to establish the RPC connection.
* @param remoteNodeDirectory The remote node directory.
* @param remoteMessagingPort The remote Artemis messaging port.
* @param rpcPort The remote Artemis messaging port for RPC.
* @param remoteSystemdServiceName The name of the node's systemd service.
* @param seed An optional starting seed for the [SplittableRandom] RNG. Note that specifying the seed may not be enough
* to make a load test reproducible due to unpredictable node behaviour, but it should make the local number
* generation deterministic as long as [SplittableRandom.split] is used as required. This RNG is also used as input
* for disruptions.
* @param mode Indicates the type of test.
* @param executionFrequency Indicates how many commands we should execute per second.
* @param generateCount Number of total commands to generate. Note that the actual number of generated commands may
* exceed this; it is used just as a cutoff.
* @param parallelism Number of concurrent threads to use to run commands. Note that the actual parallelism may be
* further limited by the batches that [generate] returns.
*/
data class LoadTestConfiguration(
val config: Config
) {
val sshUser: String by config
val localCertificatesBaseDirectory: Path by config
val localTunnelStartingPort: Int by config
val nodeHosts: List<String> = config.getStringList("nodeHosts")
val rpcUsername: String by config
val rpcPassword: String by config
val remoteNodeDirectory: Path by config
val remoteMessagingPort: Int by config
val remoteSystemdServiceName: String by config
val seed: Long? by config
val sshUser: String = System.getProperty("user.name"),
val localTunnelStartingPort: Int,
val nodeHosts: List<String>,
val rpcUser: User,
val remoteNodeDirectory: Path,
val rpcPort: Int,
val remoteSystemdServiceName: String,
val seed: Long?,
val mode: TestMode = TestMode.LOAD_TEST,
val executionFrequency: Int = 20,
val generateCount: Int = 10000,
val parallelism: Int = ForkJoinPool.getCommonPoolParallelism())
data class RemoteNode(val hostname: String, val systemdServiceName: String, val sshUserName: String, val rpcUser: User, val rpcPort: Int, val nodeDirectory: Path)
enum class TestMode {
LOAD_TEST,
STABILITY_TEST
}
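Since the configuration is now a plain data class, it can be built straight from HOCON. A minimal sketch of loading it, mirroring what Main.kt below does (the file name is illustrative):

import com.typesafe.config.ConfigFactory
import net.corda.nodeapi.config.parseAs
import java.io.File

fun loadConfig(): LoadTestConfiguration {
    val config = ConfigFactory.parseFile(File("loadtest.conf"))
            .withFallback(ConfigFactory.parseResources("loadtest-reference.conf"))
            .resolve()
    return config.parseAs<LoadTestConfiguration>()
}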

View File

@@ -2,8 +2,10 @@ package net.corda.loadtest
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import net.corda.loadtest.tests.StabilityTest
import net.corda.loadtest.tests.crossCashTest
import net.corda.loadtest.tests.selfIssueTest
import net.corda.nodeapi.config.parseAs
import java.io.File
/**
@@ -33,6 +35,11 @@ import java.io.File
* disruption is basically an infinite loop of wait->mess something up->repeat. Invariants should hold under these
* conditions as well.
*
* Configuration:
* The load test will look for configuration in the location provided by the program argument, or the configuration can
* be provided via system properties using VM arguments, e.g. -Dloadtest.nodeHosts.0="host". See [LoadTestConfiguration]
* for the list of configurable properties.
*
* Diagnostic:
* TODO currently the diagnostic is quite poor, all we can say is that the predicted state is different from the real
* one, or that some piece of work failed to execute in some state. Logs need to be checked manually.
@@ -43,26 +50,33 @@
*/
fun main(args: Array<String>) {
if (args.isEmpty()) {
throw IllegalArgumentException("Usage: <binary> PATH_TO_CONFIG")
val customConfig = if (args.isNotEmpty()) {
ConfigFactory.parseFile(File(args[0]), ConfigParseOptions.defaults().setAllowMissing(false))
} else {
// This allows us to provide configuration via TeamCity.
ConfigFactory.parseProperties(System.getProperties()).getConfig("loadtest")
}
val defaultConfig = ConfigFactory.parseResources("loadtest-reference.conf", ConfigParseOptions.defaults().setAllowMissing(false))
val defaultSshUserConfig = ConfigFactory.parseMap(
if (defaultConfig.hasPath("sshUser")) emptyMap() else mapOf("sshUser" to System.getProperty("user.name"))
)
val customConfig = ConfigFactory.parseFile(File(args[0]), ConfigParseOptions.defaults().setAllowMissing(false))
val resolvedConfig = customConfig.withFallback(defaultConfig).withFallback(defaultSshUserConfig).resolve()
val loadTestConfiguration = LoadTestConfiguration(resolvedConfig)
val resolvedConfig = customConfig.withFallback(defaultConfig).resolve()
val loadTestConfiguration = resolvedConfig.parseAs<LoadTestConfiguration>()
if (loadTestConfiguration.nodeHosts.isEmpty()) {
throw IllegalArgumentException("Please specify at least one node host")
}
when (loadTestConfiguration.mode) {
TestMode.LOAD_TEST -> runLoadTest(loadTestConfiguration)
TestMode.STABILITY_TEST -> runStabilityTest(loadTestConfiguration)
}
}
private fun runLoadTest(loadTestConfiguration: LoadTestConfiguration) {
runLoadTests(loadTestConfiguration, listOf(
selfIssueTest to LoadTest.RunParameters(
parallelism = 100,
generateCount = 10000,
clearDatabaseBeforeRun = false,
executionFrequency = 1000,
gatherFrequency = 1000,
disruptionPatterns = listOf(
listOf(), // no disruptions
@@ -91,6 +105,7 @@ fun main(args: Array<String>) {
parallelism = 4,
generateCount = 2000,
clearDatabaseBeforeRun = false,
executionFrequency = 1000,
gatherFrequency = 10,
disruptionPatterns = listOf(
listOf(),
@@ -115,3 +130,26 @@ fun main(args: Array<String>) {
)
))
}
private fun runStabilityTest(loadTestConfiguration: LoadTestConfiguration) {
runLoadTests(loadTestConfiguration, listOf(
// Self issue cash.
StabilityTest.selfIssueTest to LoadTest.RunParameters(
parallelism = loadTestConfiguration.parallelism,
generateCount = loadTestConfiguration.generateCount,
clearDatabaseBeforeRun = false,
executionFrequency = loadTestConfiguration.executionFrequency,
gatherFrequency = 100,
disruptionPatterns = listOf(listOf()) // no disruptions
),
// Send cash to a random party or exit cash; commands are generated randomly.
StabilityTest.crossCashTest to LoadTest.RunParameters(
parallelism = loadTestConfiguration.parallelism,
generateCount = loadTestConfiguration.generateCount,
clearDatabaseBeforeRun = false,
executionFrequency = loadTestConfiguration.executionFrequency,
gatherFrequency = 100,
disruptionPatterns = listOf(listOf())
)
))
}

View File

@@ -0,0 +1,169 @@
package net.corda.loadtest
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import com.jcraft.jsch.ChannelExec
import com.jcraft.jsch.Session
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.core.future
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.addShutdownHook
import java.io.ByteArrayOutputStream
import java.io.Closeable
import java.io.OutputStream
/**
* [NodeConnection] allows executing remote shell commands on the node as well as executing RPCs.
* The RPC client start/stop must be controlled externally with [startRPCClient] and [doWhileClientStopped]. For example,
* if we want to do some action on the node that requires bringing the node down, we should nest it in a
* [doWhileClientStopped], otherwise the RPC link will be broken.
* TODO: Auto-reconnect has been enabled for the RPC connection; investigate whether we still need [doWhileClientStopped].
*/
class NodeConnection(val remoteNode: RemoteNode, private val jSchSession: Session, private val localTunnelAddress: HostAndPort) : Closeable {
companion object {
val log = loggerFor<NodeConnection>()
}
init {
addShutdownHook {
close()
}
}
private val client = CordaRPCClient(localTunnelAddress)
private var rpcConnection: CordaRPCConnection? = null
val proxy: CordaRPCOps get() = rpcConnection?.proxy ?: throw IllegalStateException("proxy requested, but the client is not running")
val info: NodeInfo by lazy { proxy.nodeIdentity() }
fun <A> doWhileClientStopped(action: () -> A): A {
val connection = rpcConnection
require(connection != null) { "doWhileClientStopped called with no running client" }
log.info("Stopping RPC proxy to ${remoteNode.hostname}, tunnel at $localTunnelAddress")
connection!!.close()
try {
return action()
} finally {
log.info("Starting new RPC proxy to ${remoteNode.hostname}, tunnel at $localTunnelAddress")
// TODO expose these somehow?
val newConnection = client.start(remoteNode.rpcUser.username, remoteNode.rpcUser.password)
this.rpcConnection = newConnection
}
}
fun startRPCClient() {
log.info("Creating RPC proxy to ${remoteNode.hostname}, tunnel at $localTunnelAddress")
rpcConnection = client.start(remoteNode.rpcUser.username, remoteNode.rpcUser.password)
log.info("Proxy created")
}
/**
* @param function should call [ChannelExec.connect]
* @return A pair of (exit code, [function] return value)
*/
private fun <A> withChannelExec(command: String, function: (ChannelExec) -> A): Pair<Int, A> {
val channel = jSchSession.openChannel("exec") as ChannelExec
channel.setCommand(command)
try {
val result = function(channel)
poll { channel.isEOF }
return Pair(channel.exitStatus, result)
} finally {
channel.disconnect()
}
}
/**
* @return Pair of (stdout, stderr) of command
*/
fun runShellCommandGetOutput(command: String): ShellCommandOutput {
val stdoutStream = ByteArrayOutputStream()
val stderrStream = ByteArrayOutputStream()
val exitCode = runShellCommand(command, stdoutStream, stderrStream).get()
return ShellCommandOutput(command, exitCode, stdoutStream.toString(), stderrStream.toString())
}
private fun runShellCommand(command: String, stdout: OutputStream, stderr: OutputStream): ListenableFuture<Int> {
log.info("Running '$command' on ${remoteNode.hostname}")
return future {
val (exitCode, _) = withChannelExec(command) { channel ->
channel.outputStream = stdout
channel.setErrStream(stderr)
channel.connect()
poll { channel.isEOF }
}
exitCode
}
}
data class ShellCommandOutput(val originalShellCommand: String, val exitCode: Int, val stdout: String, val stderr: String) {
fun getResultOrThrow(): String {
if (exitCode != 0) {
val diagnostic =
"There was a problem running \"$originalShellCommand\":\n" +
" stdout:\n$stdout" +
" stderr:\n$stderr"
log.error(diagnostic)
throw Exception(diagnostic)
} else {
return stdout
}
}
}
fun startNode() {
runShellCommandGetOutput("sudo systemctl start ${remoteNode.systemdServiceName}").getResultOrThrow()
}
fun stopNode() {
runShellCommandGetOutput("sudo systemctl stop ${remoteNode.systemdServiceName}").getResultOrThrow()
}
fun restartNode() {
runShellCommandGetOutput("sudo systemctl restart ${remoteNode.systemdServiceName}").getResultOrThrow()
}
fun waitUntilUp() {
log.info("Waiting for ${remoteNode.hostname} to come online")
runShellCommandGetOutput("until sudo netstat -tlpn | grep ${remoteNode.rpcPort} > /dev/null ; do sleep 1 ; done")
}
fun getNodePid(): String {
return runShellCommandGetOutput("sudo netstat -tlpn | grep ${remoteNode.rpcPort} | awk '{print $7}' | grep -oE '[0-9]+'").getResultOrThrow().replace("\n", "")
}
fun <A> doWhileStopped(action: () -> A): A {
return doWhileClientStopped {
stopNode()
try {
action()
} finally {
startNode()
}
}
}
fun kill() {
runShellCommandGetOutput("sudo kill ${getNodePid()}")
}
fun <A> doWhileSigStopped(action: () -> A): A {
val pid = getNodePid()
log.info("PID is $pid")
runShellCommandGetOutput("sudo kill -SIGSTOP $pid").getResultOrThrow()
try {
return action()
} finally {
runShellCommandGetOutput("sudo kill -SIGCONT $pid").getResultOrThrow()
}
}
fun clearDb() = doWhileStopped { runShellCommandGetOutput("sudo rm ${remoteNode.nodeDirectory}/persistence.mv.db").getResultOrThrow() }
override fun close() {
rpcConnection?.close()
jSchSession.disconnect()
}
}
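A hypothetical usage sketch, assuming connection was obtained from connectToNodes:

// Run an action while the node and its RPC client are down; doWhileStopped
// restarts the node and re-establishes the RPC connection afterwards.
connection.doWhileStopped { println("${connection.remoteNode.hostname} is down for maintenance") }

// Wipe the node's database: stop the node, delete persistence.mv.db, start it again.
connection.clearDb()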

View File

@@ -1,48 +0,0 @@
package net.corda.loadtest
import net.corda.core.node.NodeInfo
import org.slf4j.LoggerFactory
private val log = LoggerFactory.getLogger(NodeHandle::class.java)
data class NodeHandle(
val configuration: LoadTestConfiguration,
val connection: NodeConnection,
val info: NodeInfo
)
fun <A> NodeHandle.doWhileStopped(action: NodeHandle.() -> A): A {
return connection.doWhileClientStopped {
connection.runShellCommandGetOutput("sudo systemctl stop ${configuration.remoteSystemdServiceName}").getResultOrThrow()
try {
action()
} finally {
connection.runShellCommandGetOutput("sudo systemctl start ${configuration.remoteSystemdServiceName}").getResultOrThrow()
waitUntilUp()
}
}
}
fun <A> NodeHandle.doWhileSigStopped(action: NodeHandle.() -> A): A {
val pid = getNodePid()
log.info("PID is $pid")
connection.runShellCommandGetOutput("sudo kill -SIGSTOP $pid").getResultOrThrow()
try {
return action()
} finally {
connection.runShellCommandGetOutput("sudo kill -SIGCONT $pid").getResultOrThrow()
}
}
fun NodeHandle.clearDb() = doWhileStopped {
connection.runShellCommandGetOutput("sudo rm ${configuration.remoteNodeDirectory}/persistence.mv.db").getResultOrThrow()
}
fun NodeHandle.waitUntilUp() {
log.info("Waiting for ${info.legalIdentity} to come online")
connection.runShellCommandGetOutput("until sudo netstat -tlpn | grep ${configuration.remoteMessagingPort} > /dev/null ; do sleep 1 ; done")
}
fun NodeHandle.getNodePid(): String {
return connection.runShellCommandGetOutput("sudo netstat -tlpn | grep ${configuration.remoteMessagingPort} | awk '{print $7}' | grep -oE '[0-9]+'").getResultOrThrow()
}

View File

@@ -7,13 +7,13 @@ import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.USD
import net.corda.core.identity.AbstractParty
import net.corda.core.failure
import net.corda.core.identity.AbstractParty
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.success
import net.corda.flows.CashFlowCommand
import net.corda.loadtest.LoadTest
import net.corda.loadtest.NodeHandle
import net.corda.loadtest.NodeConnection
import org.slf4j.LoggerFactory
import java.util.*
@@ -27,7 +27,7 @@ private val log = LoggerFactory.getLogger("CrossCash")
data class CrossCashCommand(
val command: CashFlowCommand,
val node: NodeHandle
val node: NodeConnection
) {
override fun toString(): String {
return when (command) {
@@ -115,12 +115,12 @@ data class CrossCashState(
val crossCashTest = LoadTest<CrossCashCommand, CrossCashState>(
"Creating Cash transactions randomly",
generate = { state, parallelism ->
generate = { (nodeVaults), parallelism ->
val nodeMap = simpleNodes.associateBy { it.info.legalIdentity }
Generator.pickN(parallelism, simpleNodes).bind { nodes ->
Generator.sequence(
nodes.map { node ->
val quantities = state.nodeVaults[node.info.legalIdentity] ?: mapOf()
val quantities = nodeVaults[node.info.legalIdentity] ?: mapOf()
val possibleRecipients = nodeMap.keys.toList()
val moves = quantities.map {
it.value.toDouble() / 1000 to generateMove(it.value, USD, node.info.legalIdentity, possibleRecipients)
@@ -205,7 +205,7 @@ val crossCashTest = LoadTest<CrossCashCommand, CrossCashState>(
},
execute = { command ->
val result = command.command.startFlow(command.node.connection.proxy).returnValue
val result = command.command.startFlow(command.node.proxy).returnValue
result.failure {
log.error("Failure[$command]", it)
}
@@ -219,7 +219,7 @@ val crossCashTest = LoadTest<CrossCashCommand, CrossCashState>(
val currentNodeVaults = HashMap<AbstractParty, HashMap<AbstractParty, Long>>()
simpleNodes.forEach {
val quantities = HashMap<AbstractParty, Long>()
val (vault, vaultUpdates) = it.connection.proxy.vaultAndUpdates()
val (vault, vaultUpdates) = it.proxy.vaultAndUpdates()
vaultUpdates.notUsed()
vault.forEach {
val state = it.state.data
@@ -313,10 +313,10 @@ private fun <A> searchForState(
consumedTxs[originator] = 0
searchForStateHelper(state, diffIx + 1, consumedTxs, matched)
var currentState = state
queue.forEachIndexed { index, pair ->
queue.forEachIndexed { index, (issuer, quantity) ->
consumedTxs[originator] = index + 1
// Prune search if we exceeded the searched quantity anyway
currentState = applyDiff(pair.first, pair.second, currentState, searchedState) ?: return
currentState = applyDiff(issuer, quantity, currentState, searchedState) ?: return
searchForStateHelper(currentState, diffIx + 1, consumedTxs, matched)
}
}

View File

@@ -13,17 +13,17 @@ import net.corda.core.success
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.FinalityFlow
import net.corda.loadtest.LoadTest
import net.corda.loadtest.NodeHandle
import net.corda.loadtest.NodeConnection
import org.slf4j.LoggerFactory
private val log = LoggerFactory.getLogger("NotaryTest")
data class NotariseCommand(val issueTx: SignedTransaction, val moveTx: SignedTransaction, val node: NodeHandle)
data class NotariseCommand(val issueTx: SignedTransaction, val moveTx: SignedTransaction, val node: NodeConnection)
val dummyNotarisationTest = LoadTest<NotariseCommand, Unit>(
"Notarising dummy transactions",
generate = { _, _ ->
val generateTx = Generator.pickOne(simpleNodes).bind { node: NodeHandle ->
val generateTx = Generator.pickOne(simpleNodes).bind { node ->
Generator.int().map {
val issueTx = DummyContract.generateInitial(it, notary.info.notaryIdentity, DUMMY_CASH_ISSUER).apply {
signWith(DUMMY_CASH_ISSUER_KEY)
@@ -40,7 +40,7 @@ val dummyNotarisationTest = LoadTest<NotariseCommand, Unit>(
interpret = { _, _ -> },
execute = { (issueTx, moveTx, node) ->
try {
val proxy = node.connection.proxy
val proxy = node.proxy
val issueFlow = proxy.startFlow(::FinalityFlow, issueTx)
issueFlow.returnValue.success {
val moveFlow = proxy.startFlow(::FinalityFlow, moveTx)

View File

@@ -7,12 +7,12 @@ import net.corda.client.mock.replicatePoisson
import net.corda.client.rpc.notUsed
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.USD
import net.corda.core.identity.AbstractParty
import net.corda.core.flows.FlowException
import net.corda.core.getOrThrow
import net.corda.core.identity.AbstractParty
import net.corda.flows.CashFlowCommand
import net.corda.loadtest.LoadTest
import net.corda.loadtest.NodeHandle
import net.corda.loadtest.NodeConnection
import org.slf4j.LoggerFactory
import java.util.*
@@ -21,7 +21,7 @@ private val log = LoggerFactory.getLogger("SelfIssue")
// DOCS START 1
data class SelfIssueCommand(
val command: CashFlowCommand.IssueCash,
val node: NodeHandle
val node: NodeConnection
)
data class SelfIssueState(
@@ -37,7 +37,7 @@ val selfIssueTest = LoadTest<SelfIssueCommand, SelfIssueState>(
"Self issuing cash randomly",
generate = { _, parallelism ->
val generateIssue = Generator.pickOne(simpleNodes).bind { node: NodeHandle ->
val generateIssue = Generator.pickOne(simpleNodes).bind { node ->
generateIssue(1000, USD, notary.info.notaryIdentity, listOf(node.info.legalIdentity)).map {
SelfIssueCommand(it, node)
}
@@ -61,7 +61,7 @@ val selfIssueTest = LoadTest<SelfIssueCommand, SelfIssueState>(
execute = { command ->
try {
val result = command.command.startFlow(command.node.connection.proxy).returnValue.getOrThrow()
val result = command.command.startFlow(command.node.proxy).returnValue.getOrThrow()
log.info("Success: $result")
} catch (e: FlowException) {
log.error("Failure", e)
@@ -70,14 +70,14 @@ val selfIssueTest = LoadTest<SelfIssueCommand, SelfIssueState>(
gatherRemoteState = { previousState ->
val selfIssueVaults = HashMap<AbstractParty, Long>()
simpleNodes.forEach { (_, connection, info) ->
simpleNodes.forEach { connection ->
val (vault, vaultUpdates) = connection.proxy.vaultAndUpdates()
vaultUpdates.notUsed()
vault.forEach {
val state = it.state.data
if (state is Cash.State) {
val issuer = state.amount.token.issuer.party
if (issuer == info.legalIdentity as AbstractParty) {
if (issuer == connection.info.legalIdentity as AbstractParty) {
selfIssueVaults.put(issuer, (selfIssueVaults[issuer] ?: 0L) + state.amount.quantity)
}
}

View File

@@ -0,0 +1,70 @@
package net.corda.loadtest.tests
import net.corda.client.mock.Generator
import net.corda.client.mock.pickOne
import net.corda.client.mock.replicatePoisson
import net.corda.core.contracts.USD
import net.corda.core.failure
import net.corda.core.flows.FlowException
import net.corda.core.getOrThrow
import net.corda.core.success
import net.corda.core.utilities.loggerFor
import net.corda.loadtest.LoadTest
object StabilityTest {
private val log = loggerFor<StabilityTest>()
val crossCashTest = LoadTest<CrossCashCommand, Unit>(
"Creating Cash transactions randomly",
generate = { _, _ ->
val nodeMap = simpleNodes.associateBy { it.info.legalIdentity }
Generator.sequence(simpleNodes.map { node ->
val possibleRecipients = nodeMap.keys.toList()
val moves = 0.5 to generateMove(1, USD, node.info.legalIdentity, possibleRecipients)
val exits = 0.5 to generateExit(1, USD)
val command = Generator.frequency(listOf(moves, exits))
command.map { CrossCashCommand(it, nodeMap[node.info.legalIdentity]!!) }
})
},
interpret = { _, _ -> },
execute = { command ->
val result = command.command.startFlow(command.node.proxy).returnValue
result.failure {
log.error("Failure[$command]", it)
}
result.success {
log.info("Success[$command]: $result")
}
},
gatherRemoteState = {}
)
val selfIssueTest = LoadTest<SelfIssueCommand, Unit>(
"Self issuing cash randomly",
generate = { _, parallelism ->
val generateIssue = Generator.pickOne(simpleNodes).bind { node ->
generateIssue(1000, USD, notary.info.notaryIdentity, listOf(node.info.legalIdentity)).map {
SelfIssueCommand(it, node)
}
}
Generator.replicatePoisson(parallelism.toDouble(), generateIssue).bind {
// We need to generate at least one
if (it.isEmpty()) {
Generator.sequence(listOf(generateIssue))
} else {
Generator.pure(it)
}
}
},
interpret = { _, _ -> },
execute = { command ->
try {
val result = command.command.startFlow(command.node.proxy).returnValue.getOrThrow()
log.info("Success: $result")
} catch (e: FlowException) {
log.error("Failure", e)
}
},
gatherRemoteState = {}
)
}

View File

@@ -1,9 +1,11 @@
# nodeHosts = ["host1", "host2"]
# sshUser = "someusername", defaults to the "user.name" system property.
# executionFrequency = <number of executions per second>, optional, defaults to 20 flow executions per second.
# generateCount = <number of commands to generate>, optional, defaults to 10000.
# parallelism = <number of threads used to execute the commands>, optional, defaults to [ForkJoinPool]'s default parallelism.
localCertificatesBaseDirectory = "build/load-test/certificates"
localTunnelStartingPort = 10000
remoteNodeDirectory = "/opt/corda"
remoteMessagingPort = 10003
rpcPort = 10003
remoteSystemdServiceName = "corda"
rpcUsername = "corda"
rpcPassword = "rgb"
rpcUser = {username = corda, password = not_blockchain, permissions = []}