Merged in aslemmer-loadtest-dsl (pull request #441)

Load testing framework
This commit is contained in:
Andras Slemmer 2016-11-15 17:21:08 +00:00
commit 7335fba633
27 changed files with 1550 additions and 46 deletions

.idea/modules.xml generated

@ -33,6 +33,9 @@
<module fileurl="file://$PROJECT_DIR$/.idea/modules/finance/isolated/isolated.iml" filepath="$PROJECT_DIR$/.idea/modules/finance/isolated/isolated.iml" group="finance/isolated" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/finance/isolated/isolated_main.iml" filepath="$PROJECT_DIR$/.idea/modules/finance/isolated/isolated_main.iml" group="finance/isolated" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/finance/isolated/isolated_test.iml" filepath="$PROJECT_DIR$/.idea/modules/finance/isolated/isolated_test.iml" group="finance/isolated" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/tools/loadtest/loadtest.iml" filepath="$PROJECT_DIR$/.idea/modules/tools/loadtest/loadtest.iml" group="tools/loadtest" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/tools/loadtest/loadtest_main.iml" filepath="$PROJECT_DIR$/.idea/modules/tools/loadtest/loadtest_main.iml" group="tools/loadtest" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/tools/loadtest/loadtest_test.iml" filepath="$PROJECT_DIR$/.idea/modules/tools/loadtest/loadtest_test.iml" group="tools/loadtest" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/node/node.iml" filepath="$PROJECT_DIR$/.idea/modules/node/node.iml" group="node" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/node/node_integrationTest.iml" filepath="$PROJECT_DIR$/.idea/modules/node/node_integrationTest.iml" group="node" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/node/node_main.iml" filepath="$PROJECT_DIR$/.idea/modules/node/node_main.iml" group="node" />


@ -24,13 +24,11 @@ class EventGenerator(
val currencies = setOf(USD, GBP, CHF).toList() // + Currency.getAvailableCurrencies().toList().subList(0, 3).toSet()).toList()
val currencyGenerator = Generator.pickOne(currencies)
val amountIssuedGenerator =
Generator.intRange(1, 10000).combine(issuerGenerator, currencyGenerator) { amount, issuer, currency ->
Amount(amount.toLong(), Issued(issuer, currency))
}
val issuedGenerator = issuerGenerator.combine(currencyGenerator) { issuer, currency -> Issued(issuer, currency) }
val amountIssuedGenerator = generateAmount(1, 10000, issuedGenerator)
val publicKeyGenerator = Generator.oneOf(parties.map { it.owningKey })
val partyGenerator = Generator.oneOf(parties)
val publicKeyGenerator = Generator.pickOne(parties.map { it.owningKey })
val partyGenerator = Generator.pickOne(parties)
val cashStateGenerator = amountIssuedGenerator.combine(publicKeyGenerator) { amount, from ->
val builder = TransactionBuilder(notary = notary)


@ -26,42 +26,42 @@ import java.util.*
* 0.2 to birdsGenerator,
* 0.8 to mammalsGenerator
* )
* val animals = animalsGenerator.generate(Random()).getOrThrow()
* val animals = animalsGenerator.generate(SplittableRandom()).getOrThrow()
*
* The above will generate a random list of animals.
*/
class Generator<out A>(val generate: (Random) -> ErrorOr<A>) {
class Generator<out A : Any>(val generate: (SplittableRandom) -> ErrorOr<A>) {
// Functor
fun <B> map(function: (A) -> B): Generator<B> =
fun <B : Any> map(function: (A) -> B): Generator<B> =
Generator { generate(it).map(function) }
// Applicative
fun <B> product(other: Generator<(A) -> B>) =
fun <B : Any> product(other: Generator<(A) -> B>) =
Generator { generate(it).combine(other.generate(it)) { a, f -> f(a) } }
fun <B, R> combine(other1: Generator<B>, function: (A, B) -> R) =
fun <B : Any, R : Any> combine(other1: Generator<B>, function: (A, B) -> R) =
product<R>(other1.product(pure({ b -> { a -> function(a, b) } })))
fun <B, C, R> combine(other1: Generator<B>, other2: Generator<C>, function: (A, B, C) -> R) =
fun <B : Any, C : Any, R : Any> combine(other1: Generator<B>, other2: Generator<C>, function: (A, B, C) -> R) =
product<R>(other1.product(other2.product(pure({ c -> { b -> { a -> function(a, b, c) } } }))))
fun <B, C, D, R> combine(other1: Generator<B>, other2: Generator<C>, other3: Generator<D>, function: (A, B, C, D) -> R) =
fun <B : Any, C : Any, D : Any, R : Any> combine(other1: Generator<B>, other2: Generator<C>, other3: Generator<D>, function: (A, B, C, D) -> R) =
product<R>(other1.product(other2.product(other3.product(pure({ d -> { c -> { b -> { a -> function(a, b, c, d) } } } })))))
fun <B, C, D, E, R> combine(other1: Generator<B>, other2: Generator<C>, other3: Generator<D>, other4: Generator<E>, function: (A, B, C, D, E) -> R) =
fun <B : Any, C : Any, D : Any, E : Any, R : Any> combine(other1: Generator<B>, other2: Generator<C>, other3: Generator<D>, other4: Generator<E>, function: (A, B, C, D, E) -> R) =
product<R>(other1.product(other2.product(other3.product(other4.product(pure({ e -> { d -> { c -> { b -> { a -> function(a, b, c, d, e) } } } } }))))))
// Monad
fun <B> bind(function: (A) -> Generator<B>) =
fun <B : Any> bind(function: (A) -> Generator<B>) =
Generator { generate(it).bind { a -> function(a).generate(it) } }
companion object {
fun <A> pure(value: A) = Generator { ErrorOr(value) }
fun <A> impure(valueClosure: () -> A) = Generator { ErrorOr(valueClosure()) }
fun <A> fail(error: Exception) = Generator<A> { ErrorOr.of(error) }
fun <A : Any> pure(value: A) = Generator { ErrorOr(value) }
fun <A : Any> impure(valueClosure: () -> A) = Generator { ErrorOr(valueClosure()) }
fun <A : Any> fail(error: Exception) = Generator<A> { ErrorOr.of(error) }
// Alternative
fun <A> choice(generators: List<Generator<A>>) = intRange(0, generators.size - 1).bind { generators[it] }
fun <A : Any> choice(generators: List<Generator<A>>) = intRange(0, generators.size - 1).bind { generators[it] }
fun <A> success(generate: (Random) -> A) = Generator { ErrorOr(generate(it)) }
fun <A> frequency(vararg generators: Pair<Double, Generator<A>>): Generator<A> {
fun <A : Any> success(generate: (SplittableRandom) -> A) = Generator { ErrorOr(generate(it)) }
fun <A : Any> frequency(generators: List<Pair<Double, Generator<A>>>): Generator<A> {
val ranges = mutableListOf<Pair<Double, Double>>()
var current = 0.0
generators.forEach {
@ -82,7 +82,7 @@ class Generator<out A>(val generate: (Random) -> ErrorOr<A>) {
}
}
fun <A> sequence(generators: List<Generator<A>>) = Generator<List<A>> {
fun <A : Any> sequence(generators: List<Generator<A>>) = Generator<List<A>> {
val result = mutableListOf<A>()
for (generator in generators) {
val element = generator.generate(it)
@ -98,9 +98,9 @@ class Generator<out A>(val generate: (Random) -> ErrorOr<A>) {
}
}
fun <A> Generator.Companion.oneOf(list: List<A>) = intRange(0, list.size - 1).map { list[it] }
fun <A : Any> Generator.Companion.frequency(vararg generators: Pair<Double, Generator<A>>) = frequency(generators.toList())
fun <A> Generator<A>.generateOrFail(random: Random, numberOfTries: Int = 1): A {
fun <A : Any> Generator<A>.generateOrFail(random: SplittableRandom, numberOfTries: Int = 1): A {
var error: Throwable? = null
for (i in 0 .. numberOfTries - 1) {
val result = generate(random)
@ -118,16 +118,24 @@ fun <A> Generator<A>.generateOrFail(random: Random, numberOfTries: Int = 1): A {
}
}
fun Generator.Companion.int() = Generator.success { it.nextInt() }
fun Generator.Companion.int() = Generator.success(SplittableRandom::nextInt)
fun Generator.Companion.bytes(size: Int): Generator<ByteArray> = Generator.success { random ->
ByteArray(size) { random.nextInt().toByte() }
}
fun Generator.Companion.intRange(range: IntRange) = intRange(range.first, range.last)
fun Generator.Companion.intRange(from: Int, to: Int): Generator<Int> = Generator.success {
(from + Math.abs(it.nextInt()) % (to - from + 1)).toInt()
}
fun Generator.Companion.longRange(range: LongRange) = longRange(range.first, range.last)
fun Generator.Companion.longRange(from: Long, to: Long): Generator<Long> = Generator.success {
(from + Math.abs(it.nextLong()) % (to - from + 1)).toLong()
}
fun Generator.Companion.double() = Generator.success { it.nextDouble() }
fun Generator.Companion.doubleRange(from: Double, to: Double): Generator<Double> = Generator.success {
from + it.nextDouble() % (to - from)
from + it.nextDouble() * (to - from)
}
fun <A> Generator.Companion.replicate(number: Int, generator: Generator<A>): Generator<List<A>> {
fun <A : Any> Generator.Companion.replicate(number: Int, generator: Generator<A>): Generator<List<A>> {
val generators = mutableListOf<Generator<A>>()
for (i in 1 .. number) {
generators.add(generator)
@ -136,7 +144,7 @@ fun <A> Generator.Companion.replicate(number: Int, generator: Generator<A>): Gen
}
fun <A> Generator.Companion.replicatePoisson(meanSize: Double, generator: Generator<A>) = Generator<List<A>> {
fun <A : Any> Generator.Companion.replicatePoisson(meanSize: Double, generator: Generator<A>) = Generator<List<A>> {
val chance = (meanSize - 1) / meanSize
val result = mutableListOf<A>()
var finish = false
@ -157,7 +165,26 @@ fun <A> Generator.Companion.replicatePoisson(meanSize: Double, generator: Genera
ErrorOr(result)
}
fun <A> Generator.Companion.pickOne(list: List<A>) = Generator.intRange(0, list.size - 1).map { list[it] }
fun <A : Any> Generator.Companion.pickOne(list: List<A>) = Generator.intRange(0, list.size - 1).map { list[it] }
fun <A : Any> Generator.Companion.pickN(number: Int, list: List<A>) = Generator<List<A>> {
val mask = BitSet(list.size)
for (i in 0 .. Math.min(list.size, number) - 1) {
mask[i] = true
}
for (i in 0 .. mask.size() - 1) {
val byte = mask[i]
val swapIndex = i + it.nextInt(mask.size() - i)
mask[i] = mask[swapIndex]
mask[swapIndex] = byte
}
val resultList = ArrayList<A>()
list.forEachIndexed { index, a ->
if (mask[index]) {
resultList.add(a)
}
}
ErrorOr(resultList)
}
fun <A> Generator.Companion.sampleBernoulli(maxRatio: Double = 1.0, vararg collection: A) =
sampleBernoulli(listOf(collection), maxRatio)


@ -0,0 +1,21 @@
package net.corda.client.mock
import net.corda.core.contracts.Amount
import net.corda.core.serialization.OpaqueBytes
import java.util.*
fun generateCurrency(): Generator<Currency> {
return Generator.pickOne(Currency.getAvailableCurrencies().toList())
}
fun <T : Any> generateAmount(min: Long, max: Long, tokenGenerator: Generator<T>): Generator<Amount<T>> {
return Generator.longRange(min, max).combine(tokenGenerator) { quantity, token -> Amount(quantity, token) }
}
fun generateCurrencyAmount(min: Long, max: Long): Generator<Amount<Currency>> {
return generateAmount(min, max, generateCurrency())
}
fun generateIssueRef(size: Int): Generator<OpaqueBytes> {
return Generator.bytes(size).map { OpaqueBytes(it) }
}


@ -285,12 +285,12 @@ fun extractZipFile(zipPath: Path, toPath: Path) {
val Throwable.rootCause: Throwable get() = Throwables.getRootCause(this)
/** Representation of an operation that may have thrown an error. */
data class ErrorOr<out A> private constructor(val value: A?, val error: Throwable?) {
data class ErrorOr<out A : Any> private constructor(val value: A?, val error: Throwable?) {
constructor(value: A) : this(value, null)
companion object {
/** Runs the given lambda and wraps the result. */
inline fun <T> catch(body: () -> T): ErrorOr<T> = try { ErrorOr(body()) } catch (t: Throwable) { ErrorOr.of(t) }
inline fun <T : Any> catch(body: () -> T): ErrorOr<T> = try { ErrorOr(body()) } catch (t: Throwable) { ErrorOr.of(t) }
fun of(t: Throwable) = ErrorOr(null, t)
}
@ -311,15 +311,15 @@ data class ErrorOr<out A> private constructor(val value: A?, val error: Throwabl
}
// Functor
fun <B> map(function: (A) -> B) = ErrorOr(value?.let(function), error)
fun <B : Any> map(function: (A) -> B) = ErrorOr(value?.let(function), error)
// Applicative
fun <B, C> combine(other: ErrorOr<B>, function: (A, B) -> C): ErrorOr<C> {
fun <B : Any, C : Any> combine(other: ErrorOr<B>, function: (A, B) -> C): ErrorOr<C> {
return ErrorOr(value?.let { a -> other.value?.let { b -> function(a, b) } }, error ?: other.error)
}
// Monad
fun <B> bind(function: (A) -> ErrorOr<B>) = value?.let(function) ?: ErrorOr.of(error!!)
fun <B : Any> bind(function: (A) -> ErrorOr<B>) = value?.let(function) ?: ErrorOr.of(error!!)
}
/**


@ -170,5 +170,7 @@ sealed class NotaryError {
class TransactionInvalid : NotaryError()
class SignaturesMissing(val missingSigners: Set<PublicKeyTree>) : NotaryError()
class SignaturesMissing(val missingSigners: Set<PublicKeyTree>) : NotaryError() {
override fun toString() = "Missing signatures from: ${missingSigners.map { it.toBase58String() }}"
}
}

docs/source/loadtesting.rst Normal file

@ -0,0 +1,114 @@
Load testing
============
This section explains how to apply random load to nodes in order to stress test them. The framework also allows the specification of disruptions that strain different resources, so that we can inspect the nodes' behaviour under extreme conditions.
The load-testing framework is incomplete and is not currently part of CI, but the basic pieces are there.
Configuration of the load testing cluster
-----------------------------------------
The load-testing framework currently assumes the following about the node cluster:
* The nodes are managed as systemd services
* The node directories are the same across the cluster
* The messaging ports are the same across the cluster
* The executing identity of the load-test has SSH access to all machines
* There is a single network map service node
* There is a single notary node
* Some disruptions also assume that other tools (such as openssl) are present
Note that these points could and should be relaxed as needed.
The load test ``Main`` expects a single command-line argument that points to a configuration file specifying the cluster hosts and optional overrides for the default configuration (a hypothetical example override is shown below the reference file):
.. literalinclude:: ../../tools/loadtest/src/main/resources/loadtest-reference.conf
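A hypothetical override file might look like the following; the key names are the ones read by ``Main.kt``, while the host names and paths are purely illustrative:
.. sourcecode:: none
    sshUser = "corda"
    nodeHosts = ["node0.example.com", "node1.example.com", "node2.example.com"]
    localCertificatesBaseDirectory = "load-test/certificates"
    localTunnelStartingPort = 10000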
Running the load tests
----------------------
In order to run the load tests you need an active SSH agent running with a single identity added that has SSH access to the load-test cluster.
You can use either IntelliJ or the gradle command line to start the tests.
To use Gradle: ``./gradlew tools:loadtest:run -Ploadtest-config=PATH_TO_LOADTEST_CONF``
To use IntelliJ, simply run ``Main.kt`` with the config path supplied as an argument.
Configuration of individual load tests
--------------------------------------
The load-testing configurations are not set in stone; they are meant to be experimented with to see how the nodes react.
There are a couple of top-level knobs to tweak test behaviour:
.. literalinclude:: ../../tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt
:language: kotlin
:start-after: DOCS START 2
:end-before: DOCS END 2
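For reference, the Self Issue test in ``Main.kt`` is run with parameters along the following lines (``selfIssueParameters`` is just an illustrative name; the values mirror those used in ``Main.kt``, simplified to a single disruption-free pattern):
.. sourcecode:: kotlin
    val selfIssueParameters = LoadTest.RunParameters(
            parallelism = 100,
            generateCount = 10000,
            clearDatabaseBeforeRun = false,
            gatherFrequency = 1000,
            disruptionPatterns = listOf(listOf()) // simplified: a single pattern with no disruptions
    )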
The one thing of note is ``disruptionPatterns``, which may be used to specify ways of disrupting the normal running of the load tests.
.. literalinclude:: ../../tools/loadtest/src/main/kotlin/net/corda/loadtest/Disruption.kt
:language: kotlin
:start-after: DOCS START 1
:end-before: DOCS END 1
Disruptions run concurrently in loops on randomly chosen nodes filtered by ``nodeFilter`` at somewhat random intervals.
As an example, take ``strainCpu``, which over-utilises the processor:
.. literalinclude:: ../../tools/loadtest/src/main/kotlin/net/corda/loadtest/Disruption.kt
:language: kotlin
:start-after: DOCS START 2
:end-before: DOCS END 2
We can use this by specifying a ``DisruptionSpec`` in the load test's ``RunParameters``:
.. literalinclude:: ../../tools/loadtest/src/main/kotlin/net/corda/loadtest/Main.kt
:language: kotlin
:start-after: DOCS START 1
:end-before: DOCS END 1
:dedent: 30
This means that every 5-10 seconds at least one randomly chosen node's cores will be spinning at 100% for 10 seconds.
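As a further illustration, a custom disruption can be written against the same ``Disruption`` and ``DisruptionSpec`` types. The following is a hypothetical sketch; the shell command, file size and time window are made up for the example:
.. sourcecode:: kotlin
    // Hypothetical disruption: temporarily use up disk space by writing a large file
    // to /tmp and deleting it again. The shell one-liner is illustrative only.
    fun fillTmp(megabytes: Int) = Disruption("Fill up /tmp") { node, random ->
        node.connection.runShellCommandGetOutput(
                "dd if=/dev/zero of=/tmp/loadtest-filler bs=1M count=$megabytes && rm /tmp/loadtest-filler"
        ).getResultOrThrow()
    }
    // Wrap it in a DisruptionSpec: target any node, waiting 5-10 seconds between disruptions.
    val fillTmpSpec = DisruptionSpec(
            disruption = fillTmp(megabytes = 512),
            nodeFilter = { true },
            noDisruptionWindowMs = 5000L..10000L
    )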
How to write a load test
------------------------
A load test is essentially defined by a random data structure generator that specifies a unit of work a node should perform, a function that performs this work, and a function that predicts what state the node should end up in as a result:
.. literalinclude:: ../../tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt
:language: kotlin
:start-after: DOCS START 1
:end-before: DOCS END 1
``LoadTest`` is parameterised over ``T``, the unit of work, and ``S``, the state type that aims to track remote node states. As an example let's look at the Self Issue test. This test simply creates Cash Issues from nodes to themselves, and then checks the vault to see if the numbers add up:
.. literalinclude:: ../../tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt
:language: kotlin
:start-after: DOCS START 1
:end-before: DOCS END 1
The unit of work ``SelfIssueCommand`` simply holds an Issue and a handle to a node where the issue should be submitted. The ``generate`` method should provide a generator for these.
The state ``SelfIssueState`` then holds a map from node identities to a Long that describes the sum quantity of the generated issues (we fixed the currency to be USD).
The invariant we want to hold is then simply that the sum of the submitted Issues equals the sum of the quantities in the vaults: for example, if a node issues 100 USD and then 200 USD to itself, its vault should end up containing 300 USD issued by that node.
The ``interpret`` function should take a ``SelfIssueCommand`` and update ``SelfIssueState`` to reflect the change we're expecting in the remote nodes. In our case this will simply be adding the issued amount to the corresponding node's Long.
The ``execute`` function should perform the action on the cluster. In our case it will simply take the node handle and submit an RPC request for the Issue.
The ``gatherRemoteState`` function should check the actual remote nodes' states and see whether they conflict with our local predictions (and should throw if they do). This function deserves its own paragraph.
.. literalinclude:: ../../tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt
:language: kotlin
:start-after: val execute
:end-before: val isConsistent
``gatherRemoteState`` gets as input handles to all the nodes, and the current predicted state, or null if this is the initial gathering.
The reason it gets the previous state boils down to allowing non-deterministic predictions about the nodes' remote states. Say some piece of work triggers an asynchronous notification of a node. We need to account both for the case when the node hasn't received the notification and for the case when it has. In these cases ``S`` should somehow represent a collection of possible states, and ``gatherRemoteState`` should "collapse" the collection based on the observations it makes. Of course we don't need this for the simple case of the Self Issue test.
The last parameter ``isConsistent`` is used to poll for eventual consistency at the end of a load test. This is not needed for Self Issue.
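Putting the pieces together, the overall shape of a load test can be sketched as follows. This is a hypothetical, do-nothing "ping" test; only ``LoadTest``, ``Nodes``, ``NodeHandle`` and the ``Generator`` combinators come from the framework, everything else is made up for illustration:
.. sourcecode:: kotlin
    // Hypothetical load test: the unit of work is an RPC round-trip to a randomly chosen
    // node, and the tracked state is just a counter of executed commands.
    data class PingCommand(val node: NodeHandle)
    val pingTest = LoadTest<PingCommand, Int>(
            "Pinging nodes with no-op RPCs",
            generate = { _, parallelism ->
                Generator.replicate(parallelism, Generator.pickOne(simpleNodes).map { PingCommand(it) })
            },
            interpret = { count, _ -> count + 1 },
            execute = { command -> command.node.connection.proxy.nodeIdentity() },
            // A real test would query the remote nodes here and compare with the prediction.
            gatherRemoteState = { previous -> previous ?: 0 },
            isConsistent = { true }
    )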


@ -31,6 +31,7 @@ import java.util.*
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
/**
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
@ -77,8 +78,9 @@ sealed class PortAllocation {
abstract fun nextPort(): Int
fun nextHostAndPort(): HostAndPort = HostAndPort.fromParts("localhost", nextPort())
class Incremental(private var portCounter: Int) : PortAllocation() {
override fun nextPort() = portCounter++
class Incremental(startingPort: Int) : PortAllocation() {
val portCounter = AtomicInteger(startingPort)
override fun nextPort() = portCounter.andIncrement
}
class RandomFree(): PortAllocation() {
override fun nextPort(): Int {


@ -159,4 +159,3 @@ data class ProtocolHandle<A>(
val progress: Observable<ProgressTracker.Change>,
val returnValue: Observable<A>
)


@ -71,7 +71,7 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService) {
fun dispatch(msg: ClientRPCRequestMessage) {
val (argsBytes, replyTo, observationsTo, methodName) = msg
val response: ErrorOr<Any?> = ErrorOr.catch {
val response: ErrorOr<Any> = ErrorOr.catch {
val method = methodTable[methodName] ?: throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?")
if (method.isAnnotationPresent(RPCReturnsObservables::class.java) && observationsTo == null)
throw RPCException("Received RPC without any destination for observations, but the RPC returns observables")


@ -3,6 +3,7 @@
package net.corda.node.services.messaging
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Registration
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
@ -13,6 +14,7 @@ import de.javakaffee.kryoserializers.ArraysAsListSerializer
import de.javakaffee.kryoserializers.guava.*
import net.corda.contracts.asset.Cash
import net.corda.core.ErrorOr
import net.corda.core.TransientProperty
import net.corda.core.contracts.*
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.Party
@ -31,11 +33,13 @@ import net.corda.node.services.User
import net.corda.protocols.CashProtocolResult
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.apache.activemq.artemis.api.core.SimpleString
import org.objenesis.strategy.StdInstantiatorStrategy
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Notification
import rx.Observable
import java.security.PublicKey
import java.time.Instant
import java.util.*
@ -198,6 +202,8 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(ServiceType.parse("ab").javaClass)
register(WorldCoordinate::class.java)
register(HostAndPort::class.java)
register(SimpleString::class.java)
register(ServiceEntry::class.java)
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
register(IllegalArgumentException::class.java)
// Kryo couldn't serialize Collections.unmodifiableCollection in Throwable correctly, causing null pointer exception when try to access the deserialize object.
@ -207,6 +213,8 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(Collections.unmodifiableList(emptyList<String>()).javaClass)
register(PermissionException::class.java)
register(ProtocolHandle::class.java)
register(KryoException::class.java)
register(StringBuffer::class.java)
}
// Helper method, attempt to reduce boiler plate code


@ -7,6 +7,7 @@ include 'client'
include 'experimental'
include 'test-utils'
include 'tools:explorer'
include 'tools:loadtest'
include 'gradle-plugins:quasar-utils'
include 'gradle-plugins:publish-utils'
include 'gradle-plugins:cordformation'


@ -70,7 +70,8 @@ fun main(args: Array<String>) {
)
eventGenerator.clientToServiceCommandGenerator.map { command ->
rpcProxy?.startProtocol(::CashProtocol, command)
}.generate(Random())
Unit
}.generate(SplittableRandom())
}
waitForAllNodesToFinish()
}


@ -0,0 +1,45 @@
apply plugin: 'kotlin'
apply plugin: 'application'
repositories {
mavenLocal()
mavenCentral()
maven {
url 'http://oss.sonatype.org/content/repositories/snapshots'
}
jcenter()
maven {
url 'https://dl.bintray.com/kotlin/exposed'
}
}
sourceSets {
main {
resources {
srcDir "../../config/dev"
}
}
}
mainClassName = 'net.corda.loadtest.MainKt'
dependencies {
compile project(':client')
// https://mvnrepository.com/artifact/com.jcraft/jsch
compile group: 'com.jcraft', name: 'jsch', version: '0.1.54'
compile group: 'com.jcraft', name: 'jsch.agentproxy.core', version: '0.0.9'
compile group: 'com.jcraft', name: 'jsch.agentproxy.sshagent', version: '0.0.9'
compile group: 'com.jcraft', name: 'jsch.agentproxy.usocket-jna', version: '0.0.9'
// https://mvnrepository.com/artifact/de.danielbechler/java-object-diff
compile group: 'de.danielbechler', name: 'java-object-diff', version: '0.10.2'
compile "com.typesafe:config:1.3.0"
}
run {
if (project.hasProperty('loadtest-config') ) {
args project["loadtest-config"]
}
}


@ -0,0 +1,260 @@
package net.corda.loadtest
import com.google.common.net.HostAndPort
import com.jcraft.jsch.*
import com.jcraft.jsch.agentproxy.AgentProxy
import com.jcraft.jsch.agentproxy.connector.SSHAgentConnector
import com.jcraft.jsch.agentproxy.usocket.JNAUSocketFactory
import kotlinx.support.jdk8.collections.parallelStream
import kotlinx.support.jdk8.streams.toList
import net.corda.client.CordaRPCClient
import net.corda.core.createDirectories
import net.corda.core.div
import net.corda.node.driver.PortAllocation
import net.corda.node.services.config.NodeSSLConfiguration
import net.corda.node.services.messaging.CordaRPCOps
import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream
import java.io.Closeable
import java.nio.file.Path
import java.util.*
private val log = LoggerFactory.getLogger(ConnectionManager::class.java)
/**
* Creates a new [JSch] instance with identities loaded from the running SSH agent.
*/
fun setupJSchWithSshAgent(): JSch {
val connector = SSHAgentConnector(JNAUSocketFactory())
val agentProxy = AgentProxy(connector)
val identities = agentProxy.identities
require(identities.isNotEmpty()) { "No SSH identities found, please add one to the agent" }
require(identities.size == 1) { "Multiple SSH identities found, don't know which one to pick" }
val identity = identities[0]
log.info("Using SSH identity ${String(identity.comment)}")
return JSch().apply {
identityRepository = object : IdentityRepository {
override fun getStatus(): Int {
if (connector.isAvailable) {
return IdentityRepository.RUNNING
} else {
return IdentityRepository.UNAVAILABLE
}
}
override fun getName() = connector.name
override fun getIdentities(): Vector<Identity> = Vector(listOf(
object : Identity {
override fun clear() {}
override fun getAlgName() = String(Buffer(identity.blob).string)
override fun getName() = String(identity.comment)
override fun isEncrypted() = false
override fun getSignature(data: ByteArray?) = agentProxy.sign(identity.blob, data)
override fun decrypt() = true
override fun getPublicKeyBlob() = identity.blob
override fun setPassphrase(passphrase: ByteArray?) = true
}
))
override fun remove(blob: ByteArray?) = throw UnsupportedOperationException()
override fun removeAll() = throw UnsupportedOperationException()
override fun add(identity: ByteArray?) = throw UnsupportedOperationException()
}
}
}
class ConnectionManager(private val username: String, private val jSch: JSch) {
fun connectToNode(
nodeHost: String,
remoteMessagingPort: Int,
localTunnelAddress: HostAndPort,
certificatesBaseDirectory: Path,
remoteCertificatesDirectory: Path
): NodeConnection {
val session = jSch.getSession(username, nodeHost, 22)
// We don't check the host fingerprints because they may change often
session.setConfig("StrictHostKeyChecking", "no")
log.info("Connecting to $nodeHost...")
session.connect()
log.info("Connected to $nodeHost!")
log.info("Creating tunnel from $nodeHost:$remoteMessagingPort to $localTunnelAddress...")
session.setPortForwardingL(localTunnelAddress.port, localTunnelAddress.hostText, remoteMessagingPort)
log.info("Tunnel created!")
val certificatesDirectory = certificatesBaseDirectory / nodeHost
val sslKeyStoreFileName = "sslkeystore.jks"
val trustStoreFileName = "truststore.jks"
log.info("Copying server certificates to $certificatesDirectory")
certificatesDirectory.createDirectories()
val channel = session.openChannel("sftp") as ChannelSftp
channel.connect()
channel.get((remoteCertificatesDirectory / sslKeyStoreFileName).toString(), certificatesDirectory.toString())
channel.get((remoteCertificatesDirectory / trustStoreFileName).toString(), certificatesDirectory.toString())
channel.disconnect()
log.info("Certificates copied!")
val connection = NodeConnection(nodeHost, session, localTunnelAddress, certificatesDirectory)
connection.startClient()
return connection
}
}
/**
* Connects to a list of nodes and executes the passed in action with the connections as parameter. The connections are
* safely cleaned up if an exception is thrown.
*
* @param username The UNIX username to use for SSH authentication.
* @param nodeHostsAndCertificatesPaths The list of hosts and associated remote paths to the nodes' certificate directories.
* @param remoteMessagingPort The Artemis messaging port nodes are listening on.
* @param tunnelPortAllocation A local port allocation strategy for creating SSH tunnels.
* @param certificatesBaseDirectory A local directory to put downloaded certificates in.
* @param withConnections An action to run once we're connected to the nodes.
* @return The return value of [withConnections]
*/
fun <A> connectToNodes(
username: String,
nodeHostsAndCertificatesPaths: List<Pair<String, Path>>,
remoteMessagingPort: Int,
tunnelPortAllocation: PortAllocation,
certificatesBaseDirectory: Path,
withConnections: (List<NodeConnection>) -> A
): A {
val manager = ConnectionManager(username, setupJSchWithSshAgent())
val connections = nodeHostsAndCertificatesPaths.parallelStream().map { nodeHostAndCertificatesPath ->
manager.connectToNode(
nodeHost = nodeHostAndCertificatesPath.first,
remoteMessagingPort = remoteMessagingPort,
localTunnelAddress = tunnelPortAllocation.nextHostAndPort(),
certificatesBaseDirectory = certificatesBaseDirectory,
remoteCertificatesDirectory = nodeHostAndCertificatesPath.second
)
}.toList()
return try {
withConnections(connections)
} finally {
connections.forEach(NodeConnection::close)
}
}
/**
* [NodeConnection] allows executing remote shell commands on the node as well as executing RPCs.
* The RPC Client start/stop must be controlled externally with [startClient] and [doWhileClientStopped]. For example
* if we want to do some action on the node that requires bringing down of the node we should nest it in a
* [doWhileClientStopped], otherwise the RPC link will be broken.
*/
class NodeConnection(
val hostName: String,
private val jSchSession: Session,
private val localTunnelAddress: HostAndPort,
private val certificatesDirectory: Path
): Closeable {
private val sslConfig = object : NodeSSLConfiguration {
override val certificatesPath = certificatesDirectory
override val keyStorePassword: String get() = "cordacadevpass"
override val trustStorePassword: String get() = "trustpass"
}
private var client: CordaRPCClient? = null
private var _proxy: CordaRPCOps? = null
val proxy: CordaRPCOps get() = _proxy ?: throw IllegalStateException("proxy requested, but the client is not running")
data class ShellCommandOutput(
val originalShellCommand: String,
val exitCode: Int,
val stdout: String,
val stderr: String
) {
fun getResultOrThrow(): String {
if (exitCode != 0) {
val diagnostic =
"There was a problem running \"$originalShellCommand\":\n" +
" stdout:\n$stdout" +
" stderr:\n$stderr"
log.error(diagnostic)
throw Exception(diagnostic)
} else {
return stdout
}
}
}
fun <A> doWhileClientStopped(action: () -> A): A {
val client = client
val proxy = _proxy
check(client != null && proxy != null) { "doWhileClientStopped called with no running client" }
log.info("Stopping RPC proxy to $hostName, tunnel at $localTunnelAddress")
client!!.close()
try {
return action()
} finally {
log.info("Starting new RPC proxy to $hostName, tunnel at $localTunnelAddress")
val newClient = CordaRPCClient(localTunnelAddress, sslConfig)
// TODO expose these somehow?
newClient.start("user1", "test")
val newProxy = newClient.proxy()
this.client = newClient
this._proxy = newProxy
}
}
fun startClient() {
log.info("Creating RPC proxy to $hostName, tunnel at $localTunnelAddress")
val client = CordaRPCClient(localTunnelAddress, sslConfig)
client.start("user1", "test")
val proxy = client.proxy()
log.info("Proxy created")
this.client = client
this._proxy = proxy
}
/**
* @return Pair of (stdout, stderr) of command
*/
fun runShellCommandGetOutput(command: String): ShellCommandOutput {
log.info("Running '$command' on $hostName")
val (exitCode, pair) = withChannelExec(command) { channel ->
val stdoutStream = ByteArrayOutputStream()
val stderrStream = ByteArrayOutputStream()
channel.outputStream = stdoutStream
channel.setErrStream(stderrStream)
channel.connect()
poll { channel.isEOF }
Pair(stdoutStream.toString(), stderrStream.toString())
}
return ShellCommandOutput(
originalShellCommand = command,
exitCode = exitCode,
stdout = pair.first,
stderr = pair.second
)
}
/**
* @param function should call [ChannelExec.connect]
* @return A pair of (exit code, [function] return value)
*/
private fun <A> withChannelExec(command: String, function: (ChannelExec) -> A): Pair<Int, A> {
val channel = jSchSession.openChannel("exec") as ChannelExec
channel.setCommand(command)
try {
val result = function(channel)
poll { channel.isEOF }
return Pair(channel.exitStatus, result)
} finally {
channel.disconnect()
}
}
override fun close() {
client?.close()
jSchSession.disconnect()
}
}
fun poll(intervalMilliseconds: Long = 500, function: () -> Boolean) {
while (!function()) {
Thread.sleep(intervalMilliseconds)
}
}


@ -0,0 +1,105 @@
package net.corda.loadtest
import net.corda.client.mock.*
import net.corda.node.services.network.NetworkMapService
import org.slf4j.LoggerFactory
import java.util.*
import java.util.concurrent.Callable
import java.util.concurrent.Executors
private val log = LoggerFactory.getLogger(Disruption::class.java)
/**
* A [Disruption] puts strain on the passed-in node in some way. Each disruption runs in its own thread in a tight loop
* and may be interrupted at will; plan your cleanup accordingly so that nodes are left in a usable state.
*/
// DOCS START 1
data class Disruption(
val name: String,
val disrupt: (NodeHandle, SplittableRandom) -> Unit
)
data class DisruptionSpec(
val nodeFilter: (NodeHandle) -> Boolean,
val disruption: Disruption,
val noDisruptionWindowMs: LongRange
)
// DOCS END 1
/**
* TODO Further Disruptions to add:
* * Strain on filesystem.
* * Keep clearing fs caches?
* * Keep allocating lots of fds.
* * Exhaust disk space
* * Delete non-DB stored files like attachments.
* * Strain on DB.
* * In theory starting protocols that hang in a tight loop should do the job.
* * We could also mess with the database directly.
* * Strain on ActiveMQ.
* * Requires exposing of the Artemis client in [NodeConnection].
* * Fuzz inputs.
* * Randomly block queues.
* * Randomly duplicate messages, perhaps to other queues even.
*/
val isNetworkMap = { node: NodeHandle -> node.info.advertisedServices.any { it.info.type == NetworkMapService.type } }
val isNotary = { node: NodeHandle -> node.info.advertisedServices.any { it.info.type.isNotary() } }
fun <A> ((A) -> Boolean).or(other: (A) -> Boolean): (A) -> Boolean = { this(it) || other(it) }
fun hang(hangIntervalRange: LongRange) = Disruption("Hang randomly") { node, random ->
val hangIntervalMs = Generator.longRange(hangIntervalRange).generateOrFail(random)
node.doWhileSigStopped { Thread.sleep(hangIntervalMs) }
}
val restart = Disruption("Restart randomly") { node, random ->
node.connection.runShellCommandGetOutput("sudo systemctl restart ${node.configuration.remoteSystemdServiceName}").getResultOrThrow()
}
val kill = Disruption("Kill randomly") { node, random ->
val pid = node.getNodePid()
node.connection.runShellCommandGetOutput("sudo kill $pid")
}
val deleteDb = Disruption("Delete persistence database without restart") { node, random ->
node.connection.runShellCommandGetOutput("sudo rm ${node.configuration.remoteNodeDirectory}/persistence.mv.db").getResultOrThrow()
}
// DOCS START 2
fun strainCpu(parallelism: Int, durationSeconds: Int) = Disruption("Put strain on cpu") { node, random ->
val shell = "for c in {1..$parallelism} ; do openssl enc -aes-128-cbc -in /dev/urandom -pass pass: -e > /dev/null & done && JOBS=\$(jobs -p) && (sleep $durationSeconds && kill \$JOBS) & wait"
node.connection.runShellCommandGetOutput(shell).getResultOrThrow()
}
// DOCS END 2
fun <A> Nodes.withDisruptions(disruptions: List<DisruptionSpec>, mainRandom: SplittableRandom, action: () -> A): A {
val executor = Executors.newCachedThreadPool()
disruptions.map { disruption ->
val random = mainRandom.split()
val relevantNodes = allNodes.filter(disruption.nodeFilter)
executor.submit {
while (true) {
val noDisruptionIntervalMs = Generator.longRange(disruption.noDisruptionWindowMs).generateOrFail(random)
Thread.sleep(noDisruptionIntervalMs)
val randomNodes = Generator.sampleBernoulli(relevantNodes).generateOrFail(random)
val nodes = if (randomNodes.isEmpty()) {
listOf(Generator.pickOne(relevantNodes).generateOrFail(random))
} else {
randomNodes
}
executor.invokeAll(nodes.map { node ->
val nodeRandom = random.split()
Callable {
log.info("Disrupting ${node.connection.hostName} with '${disruption.disruption.name}'")
disruption.disruption.disrupt(node, nodeRandom)
}
})
}
}
}
try {
return action()
} finally {
executor.shutdownNow()
}
}


@ -0,0 +1,208 @@
package net.corda.loadtest
import kotlinx.support.jdk8.collections.parallelStream
import net.corda.client.mock.Generator
import net.corda.core.crypto.toBase58String
import net.corda.core.div
import net.corda.node.driver.PortAllocation
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.ValidatingNotaryService
import org.slf4j.LoggerFactory
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
private val log = LoggerFactory.getLogger(LoadTest::class.java)
/**
* @param T The type of generated object in the load test. This should describe the basic unit of execution, for example
* a single transaction to execute.
* @param S The type of state that describes the state of the load test, for example a hashmap of vaults. Note that this
* most probably won't be the actual vault, because that includes [StateRef]s which we cannot predict in advance
* in [interpret] due to usage of nonces.
* @param generate Generator function for [T]s. (e.g. generate payment transactions of random quantity). It takes as
* input a number indicating roughly how many objects it should generate. The exact count need not match, but the generator
* must generate the objects so that they run consistently when executed in parallel. (e.g. if Alice has 100 USD we
* cannot generate two Spend(80 USD) txs, even though individually they are consistent).
* @param interpret A pure function that applies the generated object to the abstract state. (e.g. subtract/add payment
* quantity to relevant vaults)
* @param execute A function that executes the generated object by executing IO (e.g. make RPC call to execute tx).
* @param gatherRemoteState A function that assembles the abstract state from the real world (e.g. by getting snapshots
* from nodes) and the current simulated state. When run the simulated state will be replaced by the returned value.
* It should throw an exception if a divergence from the expected state is detected.
* @param isConsistent Should be specified if the abstract state tracks non-determinism, in which case it should return
* false if the state is not yet consistent, true otherwise. The final convergence check will poll this value on
* gathered states.
*
* TODO Perhaps an interface would be more idiomatic here
*/
// DOCS START 1
data class LoadTest<T, S>(
val testName: String,
val generate: Nodes.(S, Int) -> Generator<List<T>>,
val interpret: (S, T) -> S,
val execute: Nodes.(T) -> Unit,
val gatherRemoteState: Nodes.(S?) -> S,
val isConsistent: (S) -> Boolean = { true }
) {
// DOCS END 1
// DOCS START 2
/**
* @param parallelism Number of concurrent threads to use to run commands. Note that the actual parallelism may be
* further limited by the batches that [generate] returns.
* @param generateCount Number of total commands to generate. Note that the actual number of generated commands may
* exceed this, it is used just for cutoff.
* @param clearDatabaseBeforeRun Indicates whether the node databases should be cleared before running the test. May
* significantly slow down testing as this requires bringing the nodes down and up again.
* @param gatherFrequency Indicates after how many commands we should gather the remote states.
* @param disruptionPatterns A list of disruption-lists. The test will be run for each such list, and the test will
* be interleaved with the specified disruptions.
*/
data class RunParameters(
val parallelism: Int,
val generateCount: Int,
val clearDatabaseBeforeRun: Boolean,
val gatherFrequency: Int,
val disruptionPatterns: List<List<DisruptionSpec>>
)
// DOCS END 2
fun run(nodes: Nodes, parameters: RunParameters, random: SplittableRandom) {
log.info("Running '$testName' with parameters $parameters")
if (parameters.clearDatabaseBeforeRun) {
log.info("Clearing databases as clearDatabaseBeforeRun=true")
// We need to clear the network map first so that other nodes register fine
nodes.networkMap.clearDb()
(nodes.simpleNodes + listOf(nodes.notary)).parallelStream().forEach {
it.clearDb()
}
}
parameters.disruptionPatterns.forEach { disruptions ->
log.info("Running test '$testName' with disruptions ${disruptions.map { it.disruption.name }}")
nodes.withDisruptions(disruptions, random) {
var state = nodes.gatherRemoteState(null)
var count = parameters.generateCount
var countSinceLastCheck = 0
while (count > 0) {
log.info("$count remaining commands, state:\n$state")
// Generate commands
val commands = nodes.generate(state, parameters.parallelism).generate(random).getOrThrow()
require(commands.size > 0)
log.info("Generated command batch of size ${commands.size}: $commands")
// Interpret commands
val newState = commands.fold(state, interpret)
// Execute commands
val queue = ConcurrentLinkedQueue(commands)
(1 .. parameters.parallelism).toList().parallelStream().forEach {
var next = queue.poll()
while (next != null) {
log.info("Executing $next")
try {
nodes.execute(next)
next = queue.poll()
} catch (exception: Throwable) {
val diagnostic = executeDiagnostic(state, newState, next, exception)
log.error(diagnostic)
throw Exception(diagnostic)
}
}
}
countSinceLastCheck += commands.size
if (countSinceLastCheck >= parameters.gatherFrequency) {
log.info("Checking consistency...")
countSinceLastCheck %= parameters.gatherFrequency
state = nodes.gatherRemoteState(newState)
} else {
state = newState
}
count -= commands.size
}
log.info("Checking final consistency...")
poll {
state = nodes.gatherRemoteState(state)
isConsistent(state).apply {
if (!this) {
log.warn("State is not yet consistent: $state")
}
}
}
log.info("'$testName' done!")
}
}
}
companion object {
fun <T, S> executeDiagnostic(oldState: S, newState: S, failedCommand: T, exception: Throwable): String {
return "There was a problem executing command $failedCommand." +
"\nOld simulated state: $oldState" +
"\nNew simulated state(after batch): $newState" +
"\nException: $exception"
}
}
}
data class Nodes(
val notary: NodeHandle,
val networkMap: NodeHandle,
val simpleNodes: List<NodeHandle>
) {
val allNodes by lazy { (listOf(notary, networkMap) + simpleNodes).associateBy { it.info }.values }
}
/**
* Runs the given [LoadTest]s using the given configuration.
*/
fun runLoadTests(configuration: LoadTestConfiguration, tests: List<Pair<LoadTest<*, *>, LoadTest.RunParameters>>) {
val seed = configuration.seed ?: Random().nextLong()
log.info("Using seed $seed")
val random = SplittableRandom(seed)
connectToNodes(
configuration.sshUser,
configuration.nodeHosts.map { it to configuration.remoteNodeDirectory / "certificates" },
configuration.remoteMessagingPort,
PortAllocation.Incremental(configuration.localTunnelStartingPort),
configuration.localCertificatesBaseDirectory
) { connections ->
log.info("Connected to all nodes!")
val hostNodeHandleMap = ConcurrentHashMap<String, NodeHandle>()
connections.parallelStream().forEach { connection ->
log.info("Getting node info of ${connection.hostName}")
val nodeInfo = connection.proxy.nodeIdentity()
log.info("Got node info of ${connection.hostName}: $nodeInfo!")
val otherNodeInfos = connection.proxy.networkMapUpdates().first
val pubkeysString = otherNodeInfos.map {
" ${it.legalIdentity.name}: ${it.legalIdentity.owningKey.toBase58String()}"
}.joinToString("\n")
log.info("${connection.hostName} sees\n$pubkeysString")
val nodeHandle = NodeHandle(configuration, connection, nodeInfo)
nodeHandle.waitUntilUp()
hostNodeHandleMap.put(connection.hostName, nodeHandle)
}
val networkMapNode = hostNodeHandleMap.toList().single {
it.second.info.advertisedServices.any { it.info.type == NetworkMapService.type }
}
val notaryNode = hostNodeHandleMap.toList().single {
it.second.info.advertisedServices.any { it.info.type.isNotary() }
}
val nodes = Nodes(
notary = notaryNode.second,
networkMap = networkMapNode.second,
simpleNodes = hostNodeHandleMap.values.filter {
it.info.advertisedServices.filter {
it.info.type in setOf(NetworkMapService.type, ValidatingNotaryService.type)
}.isEmpty()
}
)
tests.forEach {
val (test, parameters) = it
test.run(nodes, parameters, random)
}
}
}


@ -0,0 +1,27 @@
package net.corda.loadtest
import java.nio.file.Path
/**
* @param sshUser The UNIX username to use for SSH auth.
* @param localCertificatesBaseDirectory The base directory to put node certificates in.
* @param localTunnelStartingPort The local starting port to allocate tunneling ports from.
* @param nodeHosts The nodes' resolvable addresses.
* @param remoteNodeDirectory The remote node directory.
* @param remoteMessagingPort The remote Artemis messaging port.
* @param remoteSystemdServiceName The name of the node's systemd service
* @param seed An optional starting seed for the [SplittableRandom] RNG. Note that specifying the seed may not be enough
* to make a load test reproducible due to unpredictable node behaviour, but it should make the local number
* generation deterministic as long as [SplittableRandom.split] is used as required. This RNG is also used as input
* for disruptions.
*/
data class LoadTestConfiguration(
val sshUser: String,
val localCertificatesBaseDirectory: Path,
val localTunnelStartingPort: Int,
val nodeHosts: List<String>,
val remoteNodeDirectory: Path,
val remoteMessagingPort: Int,
val remoteSystemdServiceName: String,
val seed: Long?
)


@ -0,0 +1,126 @@
package net.corda.loadtest
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import net.corda.loadtest.tests.crossCashTest
import net.corda.loadtest.tests.selfIssueTest
import java.io.File
import java.nio.file.Paths
/**
* This is how load testing works:
*
* Setup:
* The load test assumes that there is an active SSH Agent running with an added identity it can use to connect to all
* remote nodes. To run intellij with the ssh-agent:
* $ ssh-agent $SHELL
* $ ssh-add
* $ exec idea.sh # 'exec' is required so we can detach from the surrounding shell without quitting the agent.
*
* In order to make our life easier we download the remote node certificates to localhost and use those directly. The
* reasoning being that if somebody has SSH access to the nodes they have access to the certificates anyway.
* TODO Still, is this ok? Perhaps we should include a warning in the docs.
* We then tunnel the remote Artemis messaging ports to localhost and establish an RPC link.
*
* Running the tests:
* The [LoadTest] API assumes that each load test will keep generating some kind of work to push to the nodes. It also
* assumes that the nodes' behaviour will be somewhat predictable, which is tracked in a state. For example say we
* want to self-issue Cash on each node (see [SelfIssueTest]). We can predict that if we submit an Issue request of
* 100 USD and 200 USD we should end up with 300 USD issued by the node. Each load test can define its own such
* invariant and should check for it in [LoadTest.gatherRemoteState].
* We then simply keep generating pieces of work and check that the invariants hold (see [LoadTest.RunParameters] on
* how to configure the generation).
* In addition for each test a number of disruptions may be specified that make the nodes' jobs harder. Each
* disruption is basically an infinite loop of wait->mess something up->repeat. Invariants should hold under these
* conditions as well.
*
* Diagnostic:
* TODO currently the diagnostic is quite poor, all we can say is that the predicted state is different from the real
* one, or that some piece of work failed to execute in some state. Logs need to be checked manually.
*
* TODO: Any load test that involves intra-node transactions will currently fail because the node re-picks the same states
* if tx creation requests arrive quickly, which result in notarisation failures. So this needs figuring out before we
* can run the load tests properly.
*/
fun main(args: Array<String>) {
if (args.isEmpty()) {
throw IllegalArgumentException("Usage: <binary> PATH_TO_CONFIG")
}
val defaultConfig = ConfigFactory.parseResources("loadtest-reference.conf", ConfigParseOptions.defaults().setAllowMissing(false))
val customConfig = ConfigFactory.parseFile(File(args[0]), ConfigParseOptions.defaults().setAllowMissing(false))
val resolvedConfig = customConfig.withFallback(defaultConfig).resolve()
val loadTestConfiguration = LoadTestConfiguration(
sshUser = if (resolvedConfig.hasPath("sshUser")) resolvedConfig.getString("sshUser") else System.getProperty("user.name"),
localCertificatesBaseDirectory = Paths.get(resolvedConfig.getString("localCertificatesBaseDirectory")),
localTunnelStartingPort = resolvedConfig.getInt("localTunnelStartingPort"),
nodeHosts = resolvedConfig.getStringList("nodeHosts"),
remoteNodeDirectory = Paths.get("/opt/r3cev"),
remoteMessagingPort = 31337,
remoteSystemdServiceName = "r3cev-node",
seed = if (resolvedConfig.hasPath("seed")) resolvedConfig.getLong("seed") else null
)
if (loadTestConfiguration.nodeHosts.isEmpty()) {
throw IllegalArgumentException("Please specify at least one node host")
}
runLoadTests(loadTestConfiguration, listOf(
selfIssueTest to LoadTest.RunParameters(
parallelism = 100,
generateCount = 10000,
clearDatabaseBeforeRun = false,
gatherFrequency = 1000,
disruptionPatterns = listOf(
listOf(), // no disruptions
listOf(
DisruptionSpec(
disruption = hang(2000L..4000L),
nodeFilter = { true },
noDisruptionWindowMs = 500L..1000L
),
DisruptionSpec(
disruption = kill,
nodeFilter = isNetworkMap.or(isNotary),
noDisruptionWindowMs = 10000L..20000L // Takes a while for it to restart
),
// DOCS START 1
DisruptionSpec(
disruption = strainCpu(parallelism = 4, durationSeconds = 10),
nodeFilter = { true },
noDisruptionWindowMs = 5000L..10000L
)
// DOCS END 1
)
)
),
crossCashTest to LoadTest.RunParameters(
parallelism = 4,
generateCount = 2000,
clearDatabaseBeforeRun = true,
gatherFrequency = 10,
disruptionPatterns = listOf(
listOf(),
listOf(
DisruptionSpec(
disruption = hang(2000L..4000L),
nodeFilter = { true },
noDisruptionWindowMs = 500L..1000L
),
DisruptionSpec(
disruption = kill,
nodeFilter = isNetworkMap.or(isNotary),
noDisruptionWindowMs = 10000L..20000L // Takes a while for it to restart
),
DisruptionSpec(
disruption = strainCpu(parallelism = 4, durationSeconds = 10),
nodeFilter = { true },
noDisruptionWindowMs = 5000L..10000L
)
)
)
)
))
}


@ -0,0 +1,48 @@
package net.corda.loadtest
import net.corda.core.node.NodeInfo
import org.slf4j.LoggerFactory
private val log = LoggerFactory.getLogger(NodeHandle::class.java)
data class NodeHandle(
val configuration: LoadTestConfiguration,
val connection: NodeConnection,
val info: NodeInfo
)
fun <A> NodeHandle.doWhileStopped(action: NodeHandle.() -> A): A {
return connection.doWhileClientStopped {
connection.runShellCommandGetOutput("sudo systemctl stop ${configuration.remoteSystemdServiceName}").getResultOrThrow()
try {
action()
} finally {
connection.runShellCommandGetOutput("sudo systemctl start ${configuration.remoteSystemdServiceName}").getResultOrThrow()
waitUntilUp()
}
}
}
fun <A> NodeHandle.doWhileSigStopped(action: NodeHandle.() -> A): A {
val pid = getNodePid()
log.info("PID is $pid")
connection.runShellCommandGetOutput("sudo kill -SIGSTOP $pid").getResultOrThrow()
try {
return action()
} finally {
connection.runShellCommandGetOutput("sudo kill -SIGCONT $pid").getResultOrThrow()
}
}
fun NodeHandle.clearDb() = doWhileStopped {
connection.runShellCommandGetOutput("sudo rm ${configuration.remoteNodeDirectory}/persistence.mv.db").getResultOrThrow()
}
fun NodeHandle.waitUntilUp() {
log.info("Waiting for ${info.legalIdentity} to come online")
connection.runShellCommandGetOutput("until sudo netstat -tlpn | grep ${configuration.remoteMessagingPort} > /dev/null ; do sleep 1 ; done")
}
fun NodeHandle.getNodePid(): String {
return connection.runShellCommandGetOutput("sudo netstat -tlpn | grep ${configuration.remoteMessagingPort} | awk '{print $7}' | grep -oE '[0-9]+'").getResultOrThrow()
}


@ -0,0 +1,346 @@
package net.corda.loadtest.tests
import net.corda.client.mock.Generator
import net.corda.client.mock.pickN
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.USD
import net.corda.core.crypto.Party
import net.corda.core.serialization.OpaqueBytes
import net.corda.loadtest.LoadTest
import net.corda.loadtest.NodeHandle
import net.corda.node.services.messaging.startProtocol
import net.corda.protocols.CashCommand
import net.corda.protocols.CashProtocol
import net.corda.protocols.CashProtocolResult
import org.slf4j.LoggerFactory
import java.util.*
private val log = LoggerFactory.getLogger("CrossCash")
/**
* Cross Cash test generates random issues, spends and exits between nodes and checks whether they succeeded fine. The
* logic is significantly more complicated than e.g. the Self Issue test because of the non-determinism of how
* transaction notifications arrive.
*/
data class CrossCashCommand(
val command: CashCommand,
val node: NodeHandle
) {
override fun toString(): String {
return when (command) {
is CashCommand.IssueCash -> {
"ISSUE ${node.info.legalIdentity} -> ${command.recipient} : ${command.amount}"
}
is CashCommand.PayCash -> {
"MOVE ${node.info.legalIdentity} -> ${command.recipient} : ${command.amount}"
}
is CashCommand.ExitCash -> {
"EXIT ${node.info.legalIdentity} : ${command.amount}"
}
}
}
}
/**
* Map from node to (map from issuer to USD quantity)
*/
data class CrossCashState(
val nodeVaults: Map<Party, Map<Party, Long>>,
// node -> (notifying node -> [(issuer, amount)])
// This map holds the queues that encode the non-determinism of how tx notifications arrive in the background.
// Only moves and issues create non-determinism on the receiver side.
// This together with [nodeVaults] should give the eventually consistent state of all nodes.
// We check a gathered state against this by searching for interleave patterns that produce the gathered state.
// If none is found we diverged. If several then we only pop from the queues so that all interleave patterns are
// still satisfied, as we don't know which one happened in reality. Most of the time we should find a single
// pattern where we can cut off the queues, thus collapsing the non-determinism. Note that we should do this
// frequently, otherwise the search blows up. (for queues of size [A,B,C] (A+1)*(B+1)*(C+1) states need to be
// checked)
// Alternative: We could track the transactions directly, which would remove the need for searching. However
// there is a sync issue between the vault's view and the tx db's view about the UTXOs. Furthermore the tracking
// itself would either require downloading the tx graph on every check or using the Observable stream which
// requires more concurrent code which is conceptually also more complex than the current design.
// TODO: Alternative: We may possibly reduce the complexity of the search even further using some form of
// knapsack instead of the naive search
val diffQueues: Map<Party, Map<Party, List<Pair<Party, Long>>>>
) {
fun copyVaults(): HashMap<Party, HashMap<Party, Long>> {
val newNodeVaults = HashMap<Party, HashMap<Party, Long>>()
for ((key, value) in nodeVaults) {
newNodeVaults[key] = HashMap(value)
}
return newNodeVaults
}
fun copyQueues(): HashMap<Party, HashMap<Party, ArrayList<Pair<Party, Long>>>> {
val newDiffQueues = HashMap<Party, HashMap<Party, ArrayList<Pair<Party, Long>>>>()
for ((node, queues) in diffQueues) {
val newQueues = HashMap<Party, ArrayList<Pair<Party, Long>>>()
for ((sender, value) in queues) {
newQueues[sender] = ArrayList(value)
}
newDiffQueues[node] = newQueues
}
return newDiffQueues
}
override fun toString(): String {
return "Base vault:\n" +
nodeVaults.map {
val node = it.key
" $node:\n" +
it.value.map {
val issuer = it.key
" $issuer: ${it.value}"
}.joinToString("\n")
}.joinToString("\n") +
"\nDiff queues:\n" +
diffQueues.map {
val node = it.key
" $node:\n" +
it.value.map {
val notifier = it.key
" $notifier: [" + it.value.map {
Issued(PartyAndReference(it.first, OpaqueBytes.of(0)), it.second)
}.joinToString(",") + "]"
}.joinToString("\n")
}.joinToString("\n")
}
}
val crossCashTest = LoadTest<CrossCashCommand, CrossCashState>(
"Creating Cash transactions randomly",
generate = { state, parallelism ->
val nodeMap = simpleNodes.associateBy { it.info.legalIdentity }
Generator.pickN(parallelism, simpleNodes).bind { nodes ->
Generator.sequence(
nodes.map { node ->
val quantities = state.nodeVaults[node.info.legalIdentity] ?: mapOf()
val possibleRecipients = nodeMap.keys.toList()
val moves = quantities.map {
it.value.toDouble() / 1000 to generateMove(it.value, USD, it.key, possibleRecipients)
}
val exits = quantities.mapNotNull {
if (it.key == node.info.legalIdentity) {
it.value.toDouble() / 3000 to generateExit(it.value, USD)
} else {
null
}
}
val command = Generator.frequency(
listOf(1.0 to generateIssue(10000, USD, notary.info.notaryIdentity, possibleRecipients)) + moves + exits
)
command.map { CrossCashCommand(it, nodeMap[node.info.legalIdentity]!!) }
}
)
}
},
interpret = { state, command ->
when (command.command) {
is CashCommand.IssueCash -> {
val newDiffQueues = state.copyQueues()
val originators = newDiffQueues.getOrPut(command.command.recipient, { HashMap() })
val issuer = command.node.info.legalIdentity
val quantity = command.command.amount.quantity
val originator = issuer
val queue = originators.getOrPut(originator, { ArrayList() })
queue.add(Pair(issuer, quantity))
CrossCashState(state.nodeVaults, newDiffQueues)
}
is CashCommand.PayCash -> {
val newNodeVaults = state.copyVaults()
val newDiffQueues = state.copyQueues()
val recipientOriginators = newDiffQueues.getOrPut(command.command.recipient, { HashMap() })
val senderQuantities = newNodeVaults[command.node.info.legalIdentity]!!
val quantity = command.command.amount.quantity
val issuer = command.command.amount.token.issuer.party
val originator = command.node.info.legalIdentity
val senderQuantity = senderQuantities[issuer] ?: throw Exception(
"Generated payment of ${command.command.amount} from ${command.node.info.legalIdentity}, " +
"however there is no cash from $issuer!"
)
if (senderQuantity < quantity) {
throw Exception(
"Generated payment of ${command.command.amount} from ${command.node.info.legalIdentity}, " +
"however they only have $senderQuantity!"
)
}
if (senderQuantity == quantity) {
senderQuantities.remove(issuer)
} else {
senderQuantities.put(issuer, senderQuantity - quantity)
}
val recipientQueue = recipientOriginators.getOrPut(originator, { ArrayList() })
recipientQueue.add(Pair(issuer, quantity))
CrossCashState(newNodeVaults, newDiffQueues)
}
is CashCommand.ExitCash -> {
val newNodeVaults = state.copyVaults()
val issuer = command.node.info.legalIdentity
val quantity = command.command.amount.quantity
val issuerQuantities = newNodeVaults[issuer]!!
val issuerQuantity = issuerQuantities[issuer] ?: throw Exception(
"Generated exit of ${command.command.amount} from $issuer, however there is no cash to exit!"
)
if (issuerQuantity < quantity) {
throw Exception(
"Generated payment of ${command.command.amount} from $issuer, " +
"however they only have $issuerQuantity!"
)
}
if (issuerQuantity == quantity) {
issuerQuantities.remove(issuer)
} else {
issuerQuantities.put(issuer, issuerQuantity - quantity)
}
CrossCashState(newNodeVaults, state.diffQueues)
}
}
},
execute = { command ->
val result = command.node.connection.proxy.startProtocol(::CashProtocol, command.command).returnValue.toBlocking().first()
when (result) {
is CashProtocolResult.Success -> {
log.info(result.message)
}
is CashProtocolResult.Failed -> {
log.error(result.message)
}
}
},
gatherRemoteState = { previousState ->
log.info("Reifying state...")
val currentNodeVaults = HashMap<Party, HashMap<Party, Long>>()
simpleNodes.forEach {
val quantities = HashMap<Party, Long>()
val vault = it.connection.proxy.vaultAndUpdates().first
vault.forEach {
val state = it.state.data
if (state is Cash.State) {
val issuer = state.amount.token.issuer.party
quantities.put(issuer, (quantities[issuer] ?: 0L) + state.amount.quantity)
}
}
currentNodeVaults.put(it.info.legalIdentity, quantities)
}
val (consistentVaults, diffQueues) = if (previousState == null) {
Pair(currentNodeVaults, mapOf<Party, Map<Party, List<Pair<Party, Long>>>>())
} else {
log.info("${previousState.diffQueues.values.sumBy { it.values.sumBy { it.size } }} txs in limbo")
val newDiffQueues = previousState.copyQueues()
val newConsistentVault = previousState.copyVaults()
previousState.diffQueues.forEach { entry ->
val (node, queues) = entry
val searchedState = currentNodeVaults[node]
val baseState = previousState.nodeVaults[node]
if (searchedState != null) {
val matches = searchForState(searchedState, baseState ?: mapOf(), queues)
if (matches.isEmpty()) {
log.warn(
"Divergence detected, the remote state doesn't match any of our possible predictions." +
"\nPredicted state/queues:\n$previousState" +
"\nActual gathered state:\n${CrossCashState(currentNodeVaults, mapOf())}"
)
                        // TODO: We should terminate here with an exception; we cannot carry on with an inconsistent model. We currently carry on anyway because we always diverge due to notarisation failures.
return@LoadTest CrossCashState(currentNodeVaults, mapOf<Party, Map<Party, List<Pair<Party, Long>>>>())
}
if (matches.size > 1) {
log.warn("Multiple predicted states match the remote state")
}
val minimumMatches = matches.fold<Map<Party, Int>, HashMap<Party, Int>?>(null) { minimum, next ->
if (minimum == null) {
HashMap(next)
} else {
next.forEach { entry ->
minimum.merge(entry.key, entry.value, Math::min)
}
minimum
}
}!!
// Now compute the new consistent state
val newNodeDiffQueues = newDiffQueues[node]
val newNodeVault = newConsistentVault.getOrPut(node) { HashMap() }
minimumMatches.forEach { originator, consumedTxs ->
if (consumedTxs > 0) {
newNodeDiffQueues!!
                        for (i in 0..consumedTxs - 1) {
val (issuer, quantity) = newNodeDiffQueues[originator]!!.removeAt(0)
newNodeVault.put(issuer, (newNodeVault[issuer] ?: 0L) + quantity)
}
}
}
} else {
require(baseState == null)
}
}
Pair(newConsistentVault, newDiffQueues)
}
CrossCashState(consistentVaults, diffQueues)
},
isConsistent = { state ->
state.diffQueues.all { it.value.all { it.value.isEmpty() } }
}
)
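// Roughly how the hooks above are expected to be driven (an editorial sketch only; the real loop lives in the
// LoadTest runner and may differ in detail — `iterations`, `rng` and `parallelism` are placeholder names):
//
//   var state = gatherRemoteState(null)                          // seed the model from the live vaults
//   repeat(iterations) {
//       val commands = generate(state, parallelism).generate(rng).getOrThrow()
//       commands.forEach { state = interpret(state, it) }        // advance the simulated model
//       commands.forEach { execute(it) }                         // fire the cash protocols on the nodes
//       state = gatherRemoteState(state)                         // reconcile the model with reality
//   }
//   // isConsistent(state) holds once every diff queue has drained.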
/**
 * @param searchedState The gathered state to search for.
 * @param baseState The consistent base knowledge.
 * @param diffQueues The pending notification queues to interleave on top of [baseState].
 * @return The list of (originator -> number of txs consumed) maps, each of which reproduces [searchedState].
 */
private fun <A> searchForState(
searchedState: Map<A, Long>,
baseState: Map<A, Long>,
diffQueues: Map<A, List<Pair<A, Long>>>
): List<Map<A, Int>> {
val diffQueuesList = diffQueues.toList()
fun searchForStateHelper(state: Map<A, Long>, diffIx: Int, consumedTxs: HashMap<A, Int>, matched: ArrayList<Map<A, Int>>) {
if (diffIx >= diffQueuesList.size) {
if (state == searchedState) {
matched.add(HashMap(consumedTxs))
}
} else {
val (originator, queue) = diffQueuesList[diffIx]
consumedTxs[originator] = 0
searchForStateHelper(state, diffIx + 1, consumedTxs, matched)
var currentState = state
queue.forEachIndexed { index, pair ->
consumedTxs[originator] = index + 1
// Prune search if we exceeded the searched quantity anyway
currentState = applyDiff(pair.first, pair.second, currentState, searchedState) ?: return
searchForStateHelper(currentState, diffIx + 1, consumedTxs, matched)
}
}
}
val matched = ArrayList<Map<A, Int>>()
searchForStateHelper(baseState, 0, HashMap(), matched)
return matched
}
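// Illustrative example (an editorial example, not part of the original change): with
//     baseState     = {Alice -> 100}
//     diffQueues    = {Bob -> [(Alice, 50), (Alice, 25)]}
//     searchedState = {Alice -> 150}
// the search tries consuming 0, 1 or 2 entries of Bob's queue. Consuming 0 gives {Alice -> 100} (no match),
// consuming 1 gives {Alice -> 150} (match), and consuming 2 would overshoot to {Alice -> 175}, so [applyDiff]
// prunes that branch. The result is [{Bob -> 1}]: exactly one of Bob's pending notifications must have landed.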
// Returns null if we exceeded the searched quantity.
private fun <A> applyDiff(
issuer: A,
quantity: Long,
state: Map<A, Long>,
searchedState: Map<A, Long>
): Map<A, Long>? {
val newState = HashMap(state)
val newQuantity = (newState[issuer] ?: 0L) + quantity
val searchedQuantity = searchedState[issuer]
if (searchedQuantity == null || newQuantity > searchedQuantity) {
return null
}
newState.put(issuer, newQuantity)
return newState
}

View File

@ -0,0 +1,47 @@
package net.corda.loadtest.tests
import net.corda.client.mock.Generator
import net.corda.client.mock.generateAmount
import net.corda.client.mock.pickOne
import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference
import net.corda.core.crypto.Party
import net.corda.core.serialization.OpaqueBytes
import net.corda.protocols.CashCommand
import java.util.*
fun generateIssue(
max: Long,
currency: Currency,
notary: Party,
possibleRecipients: List<Party>
): Generator<CashCommand.IssueCash> {
return generateAmount(0, max, Generator.pure(currency)).combine(
Generator.pure(OpaqueBytes.of(0)),
Generator.pickOne(possibleRecipients)
) { amount, ref, recipient ->
CashCommand.IssueCash(amount, ref, recipient, notary)
}
}
fun generateMove(
max: Long,
currency: Currency,
issuer: Party,
possibleRecipients: List<Party>
): Generator<CashCommand.PayCash> {
return generateAmount(1, max, Generator.pure(Issued(PartyAndReference(issuer, OpaqueBytes.of(0)), currency))).combine(
Generator.pickOne(possibleRecipients)
) { amount, recipient ->
CashCommand.PayCash(amount, recipient)
}
}
fun generateExit(
max: Long,
currency: Currency
): Generator<CashCommand.ExitCash> {
return generateAmount(1, max, Generator.pure(currency)).map { amount ->
CashCommand.ExitCash(amount, OpaqueBytes.of(0))
}
}
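// Illustrative usage (an editorial example, not part of the original change), assuming `alice` and `notaryParty`
// are Party instances in scope:
//   val rng = SplittableRandom()
//   val issue = generateIssue(10000, USD, notaryParty, listOf(alice)).generate(rng).getOrThrow()
// A single SplittableRandom drives the whole draw, so a fixed seed reproduces the same sequence of commands.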

View File

@ -0,0 +1,109 @@
package net.corda.loadtest.tests
import de.danielbechler.diff.ObjectDifferFactory
import net.corda.client.mock.Generator
import net.corda.client.mock.pickOne
import net.corda.client.mock.replicatePoisson
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.USD
import net.corda.core.crypto.Party
import net.corda.loadtest.LoadTest
import net.corda.loadtest.NodeHandle
import net.corda.node.services.messaging.startProtocol
import net.corda.protocols.CashCommand
import net.corda.protocols.CashProtocol
import net.corda.protocols.CashProtocolResult
import org.slf4j.LoggerFactory
import java.util.*
private val log = LoggerFactory.getLogger("SelfIssue")
// DOCS START 1
data class SelfIssueCommand(
val command: CashCommand.IssueCash,
val node: NodeHandle
)
data class SelfIssueState(
val vaultsSelfIssued: Map<Party, Long>
) {
fun copyVaults(): HashMap<Party, Long> {
return HashMap(vaultsSelfIssued)
}
}
val selfIssueTest = LoadTest<SelfIssueCommand, SelfIssueState>(
// DOCS END 1
"Self issuing cash randomly",
generate = { state, parallelism ->
val generateIssue = Generator.pickOne(simpleNodes).bind { node: NodeHandle ->
generateIssue(1000, USD, notary.info.notaryIdentity, listOf(node.info.legalIdentity)).map {
SelfIssueCommand(it, node)
}
}
Generator.replicatePoisson(parallelism.toDouble(), generateIssue).bind {
// We need to generate at least one
if (it.isEmpty()) {
Generator.sequence(listOf(generateIssue))
} else {
Generator.pure(it)
}
}
},
interpret = { state, command ->
val vaults = state.copyVaults()
val issuer = command.node.info.legalIdentity
vaults.put(issuer, (vaults[issuer] ?: 0L) + command.command.amount.quantity)
SelfIssueState(vaults)
},
execute = { command ->
val result = command.node.connection.proxy.startProtocol(::CashProtocol, command.command).returnValue.toBlocking().first()
when (result) {
is CashProtocolResult.Success -> {
log.info(result.message)
}
is CashProtocolResult.Failed -> {
log.error(result.message)
}
}
},
gatherRemoteState = { previousState ->
val selfIssueVaults = HashMap<Party, Long>()
simpleNodes.forEach { node ->
val vault = node.connection.proxy.vaultAndUpdates().first
vault.forEach {
val state = it.state.data
if (state is Cash.State) {
val issuer = state.amount.token.issuer.party
if (issuer == node.info.legalIdentity) {
selfIssueVaults.put(issuer, (selfIssueVaults[issuer] ?: 0L) + state.amount.quantity)
}
}
}
}
log.info("$selfIssueVaults")
if (previousState != null) {
val diff = ObjectDifferFactory.getInstance().compare(previousState.vaultsSelfIssued, selfIssueVaults)
if (!diff.isUntouched) {
var diffString = ""
diff.visit { node, visit ->
if (node.isChanged && node.children.all { !it.isChanged }) {
diffString += "${node.propertyPath}: simulated[${node.canonicalGet(previousState.vaultsSelfIssued)}], actual[${node.canonicalGet(selfIssueVaults)}]\n"
}
}
throw Exception(
"Simulated state diverged from actual state" +
"\nSimulated state:\n${previousState.vaultsSelfIssued}" +
"\nActual state:\n$selfIssueVaults" +
"\nDiff:\n$diffString"
)
}
}
SelfIssueState(selfIssueVaults)
}
)

View File

@ -0,0 +1,7 @@
# nodeHosts = ["host1", "host2"]
# sshUser = "someusername"  # defaults to the "user.name" system property
localCertificatesBaseDirectory = "build/load-test/certificates"
localTunnelStartingPort = 10000
remoteNodeDirectory = "/opt/r3cev"
remoteMessagingPort = 31337
remoteSystemdServiceName = "r3cev-node"