Merge pull request #291 from corda/aslemmer-loadtest-fixes

Aslemmer loadtest fixes
This commit is contained in:
Andras Slemmer
2017-02-27 14:15:10 +00:00
committed by GitHub
7 changed files with 62 additions and 47 deletions

View File

@ -176,14 +176,15 @@ fun <A : Any> Generator.Companion.replicatePoisson(meanSize: Double, generator:
fun <A : Any> Generator.Companion.pickOne(list: List<A>) = Generator.intRange(0, list.size - 1).map { list[it] } fun <A : Any> Generator.Companion.pickOne(list: List<A>) = Generator.intRange(0, list.size - 1).map { list[it] }
fun <A : Any> Generator.Companion.pickN(number: Int, list: List<A>) = Generator<List<A>> { fun <A : Any> Generator.Companion.pickN(number: Int, list: List<A>) = Generator<List<A>> {
val mask = BitSet(list.size) val mask = BitSet(list.size)
for (i in 0..Math.min(list.size, number) - 1) { val size = Math.min(list.size, number)
mask[i] = 1 for (i in 0..size - 1) {
mask[i] = true
} }
for (i in 0..mask.size() - 1) { for (i in 0..size - 1) {
val byte = mask[i] val bit = mask[i]
val swapIndex = i + it.nextInt(mask.size() - i) val swapIndex = i + it.nextInt(size - i)
mask[i] = mask[swapIndex] mask[i] = mask[swapIndex]
mask[swapIndex] = byte mask[swapIndex] = bit
} }
val resultList = ArrayList<A>() val resultList = ArrayList<A>()
list.forEachIndexed { index, a -> list.forEachIndexed { index, a ->

View File

@ -72,7 +72,9 @@ class ConnectionManager(private val username: String, private val jSch: JSch) {
remoteMessagingPort: Int, remoteMessagingPort: Int,
localTunnelAddress: HostAndPort, localTunnelAddress: HostAndPort,
certificatesBaseDirectory: Path, certificatesBaseDirectory: Path,
remoteCertificatesDirectory: Path remoteCertificatesDirectory: Path,
rpcUsername: String,
rpcPassword: String
): NodeConnection { ): NodeConnection {
val session = jSch.getSession(username, nodeHost, 22) val session = jSch.getSession(username, nodeHost, 22)
// We don't check the host fingerprints because they may change often // We don't check the host fingerprints because they may change often
@ -97,7 +99,7 @@ class ConnectionManager(private val username: String, private val jSch: JSch) {
channel.disconnect() channel.disconnect()
log.info("Certificates copied!") log.info("Certificates copied!")
val connection = NodeConnection(nodeHost, session, localTunnelAddress, certificatesDirectory) val connection = NodeConnection(nodeHost, session, localTunnelAddress, certificatesDirectory, rpcUsername, rpcPassword)
connection.startClient() connection.startClient()
return connection return connection
} }
@ -121,6 +123,8 @@ fun <A> connectToNodes(
remoteMessagingPort: Int, remoteMessagingPort: Int,
tunnelPortAllocation: PortAllocation, tunnelPortAllocation: PortAllocation,
certificatesBaseDirectory: Path, certificatesBaseDirectory: Path,
rpcUsername: String,
rpcPassword: String,
withConnections: (List<NodeConnection>) -> A withConnections: (List<NodeConnection>) -> A
): A { ): A {
val manager = ConnectionManager(username, setupJSchWithSshAgent()) val manager = ConnectionManager(username, setupJSchWithSshAgent())
@ -130,7 +134,9 @@ fun <A> connectToNodes(
remoteMessagingPort = remoteMessagingPort, remoteMessagingPort = remoteMessagingPort,
localTunnelAddress = tunnelPortAllocation.nextHostAndPort(), localTunnelAddress = tunnelPortAllocation.nextHostAndPort(),
certificatesBaseDirectory = certificatesBaseDirectory, certificatesBaseDirectory = certificatesBaseDirectory,
remoteCertificatesDirectory = nodeHostAndCertificatesPath.second remoteCertificatesDirectory = nodeHostAndCertificatesPath.second,
rpcUsername = rpcUsername,
rpcPassword = rpcPassword
) )
}.toList() }.toList()
@ -151,7 +157,9 @@ class NodeConnection(
val hostName: String, val hostName: String,
private val jSchSession: Session, private val jSchSession: Session,
private val localTunnelAddress: HostAndPort, private val localTunnelAddress: HostAndPort,
private val certificatesDirectory: Path private val certificatesDirectory: Path,
private val rpcUsername: String,
private val rpcPassword: String
) : Closeable { ) : Closeable {
private val sslConfig = object : SSLConfiguration { private val sslConfig = object : SSLConfiguration {
@ -187,7 +195,7 @@ class NodeConnection(
fun <A> doWhileClientStopped(action: () -> A): A { fun <A> doWhileClientStopped(action: () -> A): A {
val client = client val client = client
val proxy = _proxy val proxy = _proxy
check(client == null || proxy == null) { "doWhileClientStopped called with no running client" } require(client != null && proxy != null) { "doWhileClientStopped called with no running client" }
log.info("Stopping RPC proxy to $hostName, tunnel at $localTunnelAddress") log.info("Stopping RPC proxy to $hostName, tunnel at $localTunnelAddress")
client!!.close() client!!.close()
try { try {
@ -196,7 +204,7 @@ class NodeConnection(
log.info("Starting new RPC proxy to $hostName, tunnel at $localTunnelAddress") log.info("Starting new RPC proxy to $hostName, tunnel at $localTunnelAddress")
val newClient = CordaRPCClient(localTunnelAddress, sslConfig) val newClient = CordaRPCClient(localTunnelAddress, sslConfig)
// TODO expose these somehow? // TODO expose these somehow?
newClient.start("user1", "test") newClient.start(rpcUsername, rpcPassword)
val newProxy = newClient.proxy() val newProxy = newClient.proxy()
this.client = newClient this.client = newClient
this._proxy = newProxy this._proxy = newProxy
@ -206,7 +214,7 @@ class NodeConnection(
fun startClient() { fun startClient() {
log.info("Creating RPC proxy to $hostName, tunnel at $localTunnelAddress") log.info("Creating RPC proxy to $hostName, tunnel at $localTunnelAddress")
val client = CordaRPCClient(localTunnelAddress, sslConfig) val client = CordaRPCClient(localTunnelAddress, sslConfig)
client.start("user1", "test") client.start(rpcUsername, rpcPassword)
val proxy = client.proxy() val proxy = client.proxy()
log.info("Proxy created") log.info("Proxy created")
this.client = client this.client = client

View File

@ -163,7 +163,9 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List<Pair<LoadTest
configuration.nodeHosts.map { it to configuration.remoteNodeDirectory / "certificates" }, configuration.nodeHosts.map { it to configuration.remoteNodeDirectory / "certificates" },
configuration.remoteMessagingPort, configuration.remoteMessagingPort,
PortAllocation.Incremental(configuration.localTunnelStartingPort), PortAllocation.Incremental(configuration.localTunnelStartingPort),
configuration.localCertificatesBaseDirectory configuration.localCertificatesBaseDirectory,
configuration.rpcUsername,
configuration.rpcPassword
) { connections -> ) { connections ->
log.info("Connected to all nodes!") log.info("Connected to all nodes!")
val hostNodeHandleMap = ConcurrentHashMap<String, NodeHandle>() val hostNodeHandleMap = ConcurrentHashMap<String, NodeHandle>()
@ -175,6 +177,8 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List<Pair<LoadTest
val pubkeysString = otherNodeInfos.map { val pubkeysString = otherNodeInfos.map {
" ${it.legalIdentity.name}: ${it.legalIdentity.owningKey.toBase58String()}" " ${it.legalIdentity.name}: ${it.legalIdentity.owningKey.toBase58String()}"
}.joinToString("\n") }.joinToString("\n")
log.info("${connection.hostName} waiting for network map")
connection.proxy.waitUntilRegisteredWithNetworkMap().get()
log.info("${connection.hostName} sees\n$pubkeysString") log.info("${connection.hostName} sees\n$pubkeysString")
val nodeHandle = NodeHandle(configuration, connection, nodeInfo) val nodeHandle = NodeHandle(configuration, connection, nodeInfo)
nodeHandle.waitUntilUp() nodeHandle.waitUntilUp()

View File

@ -1,12 +1,16 @@
package net.corda.loadtest package net.corda.loadtest
import com.typesafe.config.Config
import java.nio.file.Path import java.nio.file.Path
import net.corda.node.services.config.*
/** /**
* @param sshUser The UNIX username to use for SSH auth. * @param sshUser The UNIX username to use for SSH auth.
* @param localCertificatesBaseDirectory The base directory to put node certificates in. * @param localCertificatesBaseDirectory The base directory to put node certificates in.
* @param localTunnelStartingPort The local starting port to allocate tunneling ports from. * @param localTunnelStartingPort The local starting port to allocate tunneling ports from.
* @param nodeHosts The nodes' resolvable addresses. * @param nodeHosts The nodes' resolvable addresses.
* @param rpcUsername The RPC user's name to establish the RPC connection as.
* @param rpcPassword The RPC user's password.
* @param remoteNodeDirectory The remote node directory. * @param remoteNodeDirectory The remote node directory.
* @param remoteMessagingPort The remote Artemis messaging port. * @param remoteMessagingPort The remote Artemis messaging port.
* @param remoteSystemdServiceName The name of the node's systemd service * @param remoteSystemdServiceName The name of the node's systemd service
@ -16,12 +20,16 @@ import java.nio.file.Path
* for disruptions. * for disruptions.
*/ */
data class LoadTestConfiguration( data class LoadTestConfiguration(
val sshUser: String, val config: Config
val localCertificatesBaseDirectory: Path, ) {
val localTunnelStartingPort: Int, val sshUser: String by config
val nodeHosts: List<String>, val localCertificatesBaseDirectory: Path by config
val remoteNodeDirectory: Path, val localTunnelStartingPort: Int by config
val remoteMessagingPort: Int, val nodeHosts: List<String> = config.getStringList("nodeHosts")
val remoteSystemdServiceName: String, val rpcUsername: String by config
val seed: Long? val rpcPassword: String by config
) val remoteNodeDirectory: Path by config
val remoteMessagingPort: Int by config
val remoteSystemdServiceName: String by config
val seed: Long? by config
}

View File

@ -44,24 +44,16 @@ import java.nio.file.Paths
*/ */
fun main(args: Array<String>) { fun main(args: Array<String>) {
if (args.isEmpty()) { if (args.isEmpty()) {
throw IllegalArgumentException("Usage: <binary> PATH_TO_CONFIG") throw IllegalArgumentException("Usage: <binary> PATH_TO_CONFIG")
} }
val defaultConfig = ConfigFactory.parseResources("loadtest-reference.conf", ConfigParseOptions.defaults().setAllowMissing(false)) val defaultConfig = ConfigFactory.parseResources("loadtest-reference.conf", ConfigParseOptions.defaults().setAllowMissing(false))
val customConfig = ConfigFactory.parseFile(File(args[0]), ConfigParseOptions.defaults().setAllowMissing(false)) val defaultSshUserConfig = ConfigFactory.parseMap(
val resolvedConfig = customConfig.withFallback(defaultConfig).resolve() if (defaultConfig.hasPath("sshUser")) emptyMap() else mapOf("sshUser" to System.getProperty("user.name"))
val loadTestConfiguration = LoadTestConfiguration(
sshUser = if (resolvedConfig.hasPath("sshUser")) resolvedConfig.getString("sshUser") else System.getProperty("user.name"),
localCertificatesBaseDirectory = Paths.get(resolvedConfig.getString("localCertificatesBaseDirectory")),
localTunnelStartingPort = resolvedConfig.getInt("localTunnelStartingPort"),
nodeHosts = resolvedConfig.getStringList("nodeHosts"),
remoteNodeDirectory = Paths.get("/opt/r3cev"),
remoteMessagingPort = 31337,
remoteSystemdServiceName = "r3cev-node",
seed = if (resolvedConfig.hasPath("seed")) resolvedConfig.getLong("seed") else null
) )
val customConfig = ConfigFactory.parseFile(File(args[0]), ConfigParseOptions.defaults().setAllowMissing(false))
val resolvedConfig = customConfig.withFallback(defaultConfig).withFallback(defaultSshUserConfig).resolve()
val loadTestConfiguration = LoadTestConfiguration(resolvedConfig)
if (loadTestConfiguration.nodeHosts.isEmpty()) { if (loadTestConfiguration.nodeHosts.isEmpty()) {
throw IllegalArgumentException("Please specify at least one node host") throw IllegalArgumentException("Please specify at least one node host")
@ -99,7 +91,7 @@ fun main(args: Array<String>) {
crossCashTest to LoadTest.RunParameters( crossCashTest to LoadTest.RunParameters(
parallelism = 4, parallelism = 4,
generateCount = 2000, generateCount = 2000,
clearDatabaseBeforeRun = true, clearDatabaseBeforeRun = false,
gatherFrequency = 10, gatherFrequency = 10,
disruptionPatterns = listOf( disruptionPatterns = listOf(
listOf(), listOf(),

View File

@ -3,16 +3,15 @@ package net.corda.loadtest.tests
import net.corda.client.mock.Generator import net.corda.client.mock.Generator
import net.corda.client.mock.pickN import net.corda.client.mock.pickN
import net.corda.contracts.asset.Cash import net.corda.contracts.asset.Cash
import net.corda.core.*
import net.corda.core.contracts.Issued import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.USD import net.corda.core.contracts.USD
import net.corda.core.crypto.AbstractParty import net.corda.core.crypto.AbstractParty
import net.corda.core.crypto.AnonymousParty import net.corda.core.crypto.AnonymousParty
import net.corda.core.flows.FlowException import net.corda.core.flows.FlowException
import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.serialization.OpaqueBytes import net.corda.core.serialization.OpaqueBytes
import net.corda.core.toFuture
import net.corda.flows.CashException import net.corda.flows.CashException
import net.corda.flows.CashFlowCommand import net.corda.flows.CashFlowCommand
import net.corda.loadtest.LoadTest import net.corda.loadtest.LoadTest
@ -208,11 +207,12 @@ val crossCashTest = LoadTest<CrossCashCommand, CrossCashState>(
}, },
execute = { command -> execute = { command ->
try { val result = command.command.startFlow(command.node.connection.proxy).returnValue
val result = command.command.startFlow(command.node.connection.proxy).returnValue.getOrThrow() result.failure {
log.info("Success: $result") log.error("Failure[$command]", it)
} catch (e: FlowException) { }
log.error("Failure", e) result.success {
log.info("Success[$command]: $result")
} }
}, },

View File

@ -2,6 +2,8 @@
# sshUser = "someusername", by default it uses the System property "user.name" # sshUser = "someusername", by default it uses the System property "user.name"
localCertificatesBaseDirectory = "build/load-test/certificates" localCertificatesBaseDirectory = "build/load-test/certificates"
localTunnelStartingPort = 10000 localTunnelStartingPort = 10000
remoteNodeDirectory = "/opt/r3cev" remoteNodeDirectory = "/opt/corda"
remoteMessagingPort = 31337 remoteMessagingPort = 10002
remoteSystemdServiceName = "r3cev-node" remoteSystemdServiceName = "corda"
rpcUsername = "corda"
rpcPassword = "rgb"