Tool for prototyping and load testing notary implementations (#823)

This commit is contained in:
Andrius Dagys 2018-05-17 20:02:19 +01:00 committed by GitHub
parent 18393f27dd
commit 1c575b5364
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 444 additions and 0 deletions

View File

@ -56,6 +56,7 @@ include 'tools:explorer'
include 'tools:explorer:capsule'
include 'tools:demobench'
include 'tools:loadtest'
include 'tools:notarytest'
include 'tools:graphs'
include 'tools:bootstrapper'
include 'tools:dbmigration'

View File

@ -0,0 +1,7 @@
# Notary test tool
Provides building blocks for experimenting and load testing new notary implementations:
* Deploying a custom notary service locally.
* Invoking a load generating flow on a notary node.
No scripts for deploying nodes to a remote test cluster are provided.

View File

@ -0,0 +1,61 @@
import net.corda.plugins.Cordform
apply plugin: 'java'
apply plugin: 'kotlin'
apply plugin: 'idea'
apply plugin: 'net.corda.plugins.quasar-utils'
apply plugin: 'net.corda.plugins.publish-utils'
apply plugin: 'net.corda.plugins.cordapp'
apply plugin: 'net.corda.plugins.cordformation'
apply plugin: 'maven-publish'
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version"
testCompile "junit:junit:$junit_version"
// Corda integration dependencies
cordaCompile project(path: ":node:capsule", configuration: 'runtimeArtifacts')
cordaCompile project(':core')
cordaCompile project(':client:rpc')
cordaCompile project(':node-driver')
compile project(':client:mock')
compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6'
compile group: 'io.dropwizard.metrics', name: 'metrics-graphite', version: '3.2.5'
}
idea {
module {
downloadJavadoc = true // defaults to false
downloadSources = true
}
}
publishing {
publications {
jarAndSources(MavenPublication) {
from components.java
artifactId 'notarytest'
artifact sourceJar
artifact javadocJar
}
}
}
task deployJDBC(type: Cordform, dependsOn: 'jar') {
definitionClass = 'net.corda.notarytest.JDBCNotaryCordform'
}
task runTest(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'net.corda.notarytest.MainKt'
}
jar {
manifest {
attributes(
'Automatic-Module-Name': 'net.corda.notarytest'
)
}
}

View File

@ -0,0 +1,82 @@
package net.corda.notarytest
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformDefinition
import net.corda.cordform.CordformNode
import net.corda.core.identity.CordaX500Name
import net.corda.node.services.Permissions
import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.testing.node.User
import net.corda.testing.node.internal.demorun.*
fun main(args: Array<String>) = JDBCNotaryCordform().nodeRunner().deployAndRunNodes()
internal val notaryDemoUser = User("demou", "demop", setOf(Permissions.all()))
class JDBCNotaryCordform : CordformDefinition() {
private val clusterName = CordaX500Name("Mysql Notary", "Zurich", "CH")
private val notaryNames = createNotaryNames(3)
private fun createNotaryNames(clusterSize: Int) = (0 until clusterSize).map {
CordaX500Name("Notary Service $it", "Zurich", "CH")
}
init {
fun notaryNode(index: Int, configure: CordformNode.() -> Unit) = node {
name(notaryNames[index])
notary(
NotaryConfig(
validating = true,
custom = true
)
)
extraConfig = mapOf("custom" to
mapOf(
"mysql" to mapOf(
"dataSource" to mapOf(
// Update the db address/port accordingly
"jdbcUrl" to "jdbc:mysql://localhost:330${6 + index}/corda?rewriteBatchedStatements=true&useSSL=false&failOverReadOnly=false",
"username" to "corda",
"password" to "awesome",
"autoCommit" to "false")
),
"graphiteAddress" to "performance-metrics.northeurope.cloudapp.azure.com:2004"
)
)
configure()
}
notaryNode(0) {
p2pPort(10009)
rpcSettings {
address("localhost:10010")
adminAddress("localhost:10110")
}
rpcUsers(notaryDemoUser)
}
notaryNode(1) {
p2pPort(10013)
rpcSettings {
address("localhost:10014")
adminAddress("localhost:10114")
}
rpcUsers(notaryDemoUser)
}
notaryNode(2) {
p2pPort(10017)
rpcSettings {
address("localhost:10018")
adminAddress("localhost:10118")
}
rpcUsers(notaryDemoUser)
}
}
override fun setup(context: CordformContext) {
DevIdentityGenerator.generateDistributedNotarySingularIdentity(
notaryNames.map { context.baseDirectory(it.toString()) },
clusterName
)
}
}

View File

@ -0,0 +1,67 @@
package net.corda.notarytest
import com.google.common.base.Stopwatch
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.notarytest.service.JDBCLoadTestFlow
import java.io.File
import java.io.PrintWriter
import java.time.Instant
import java.util.concurrent.TimeUnit
/** The number of test flows to run on each notary node */
const val TEST_RUNS = 1
/** Total number of transactions to generate and notarise. */
const val TRANSACTION_COUNT = 10000000
/** Number of transactions to submit before awaiting completion. */
const val BATCH_SIZE = 1000
fun main(args: Array<String>) {
// Provide a list of notary node addresses to invoke the load generation flow on
val addresses = listOf(
NetworkHostAndPort("localhost", 10010),
NetworkHostAndPort("localhost", 11014),
NetworkHostAndPort("localhost", 11018)
)
addresses.parallelStream().forEach {
val node = it
println("Connecting to the recipient node ($node)")
CordaRPCClient(it).start(notaryDemoUser.username, notaryDemoUser.password).use {
println(it.proxy.nodeInfo())
val totalTime = Stopwatch.createStarted()
val durations = run(it.proxy, 1)
totalTime.stop()
val totalTx = TEST_RUNS * TRANSACTION_COUNT
println("Total duration for $totalTx transactions: ${totalTime.elapsed(TimeUnit.MILLISECONDS)} ms")
println("Average tx/s: ${totalTx.toDouble() / totalTime.elapsed(TimeUnit.MILLISECONDS).toDouble() * 1000}")
// Uncomment to generate a CSV report
// printCSV(node, durations, TEST_RUNS, BATCH_SIZE)
}
}
}
private fun run(rpc: CordaRPCOps, inputStateCount: Int? = null): List<Long> {
return (1..TEST_RUNS).map { i ->
val timer = Stopwatch.createStarted()
val commitDuration = rpc.startFlow(::JDBCLoadTestFlow, TRANSACTION_COUNT, BATCH_SIZE, inputStateCount).returnValue.get()
val flowDuration = timer.stop().elapsed(TimeUnit.MILLISECONDS)
println("#$i: Duration: $flowDuration ms, commit duration: $commitDuration ms")
flowDuration
}
}
private fun printCSV(node: NetworkHostAndPort, durations: List<Long>, testRuns: Int, batchSize: Int) {
val pw = PrintWriter(File("notarytest-${Instant.now()}-${node.host}${node.port}-${testRuns}x$batchSize.csv"))
val sb = StringBuilder()
sb.append("$testRuns, $batchSize")
sb.append('\n')
sb.append(durations.joinToString())
pw.write(sb.toString())
pw.close()
}

View File

@ -0,0 +1,86 @@
package net.corda.notarytest.flows
import co.paralleluniverse.fibers.Suspendable
import com.google.common.base.Stopwatch
import net.corda.client.mock.Generator
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotarisationRequest
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.notary.AsyncCFTNotaryService
import net.corda.core.internal.notary.AsyncUniquenessProvider
import net.corda.core.internal.notary.generateSignature
import java.math.BigInteger
import java.util.*
import java.util.concurrent.TimeUnit
@StartableByRPC
open class AsyncLoadTestFlow<T : AsyncCFTNotaryService>(
private val serviceType: Class<T>,
private val transactionCount: Int,
private val batchSize: Int = 100,
/**
* Number of input states per transaction.
* If *null*, variable sized transactions will be created with median size 4.
*/
private val inputStateCount: Int? = null
) : FlowLogic<Long>() {
private val keyPairGenerator = Generator.long().map { entropyToKeyPair(BigInteger.valueOf(it)) }
private val publicKeyGeneratorSingle = Generator.pure(generateKeyPair().public)
private val partyGenerator: Generator<Party> = Generator.int().combine(publicKeyGeneratorSingle) { n, key ->
Party(CordaX500Name(organisation = "Party$n", locality = "London", country = "GB"), key)
}
private val txIdGenerator = Generator.bytes(32).map { SecureHash.sha256(it) }
private val stateRefGenerator = txIdGenerator.combine(Generator.intRange(0, 10)) { id, pos -> StateRef(id, pos) }
@Suspendable
override fun call(): Long {
var current = 0
var totalDuration = 0L
while (current < transactionCount) {
val batch = Math.min(batchSize, transactionCount - current)
totalDuration += runBatch(batch)
current += batch
}
return totalDuration
}
private val random = SplittableRandom()
private fun runBatch(transactionCount: Int): Long {
val stopwatch = Stopwatch.createStarted()
val futures = mutableListOf<CordaFuture<AsyncUniquenessProvider.Result>>()
val service = serviceHub.cordaService(serviceType)
for (i in 1..batchSize) {
val txId: SecureHash = txIdGenerator.generateOrFail(random)
val callerParty = partyGenerator.generateOrFail(random)
val inputGenerator = if (inputStateCount == null) {
Generator.replicatePoisson(4.0, stateRefGenerator, true)
} else {
Generator.replicate(inputStateCount, stateRefGenerator)
}
val inputs = inputGenerator.generateOrFail(random)
val requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub)
futures += AsyncCFTNotaryService.CommitOperation(service, inputs, txId, callerParty, requestSignature, null).execute()
}
futures.transpose().get()
stopwatch.stop()
val duration = stopwatch.elapsed(TimeUnit.MILLISECONDS)
logger.info("Committed $transactionCount transactions in $duration ms, avg ${duration.toDouble() / transactionCount} ms")
return duration
}
}

View File

@ -0,0 +1,67 @@
package net.corda.notarytest.flows
import co.paralleluniverse.fibers.Suspendable
import com.google.common.base.Stopwatch
import net.corda.client.mock.Generator
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotarisationRequest
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.notary.TrustedAuthorityNotaryService
import net.corda.core.internal.notary.generateSignature
import java.math.BigInteger
import java.util.*
import java.util.concurrent.TimeUnit
@StartableByRPC
open class LoadTestFlow<T : TrustedAuthorityNotaryService>(
private val serviceType: Class<T>,
private val transactionCount: Int,
/**
* Number of input states per transaction.
* If *null*, variable sized transactions will be created with median size 4.
*/
private val inputStateCount: Int?
) : FlowLogic<Long>() {
private val keyPairGenerator = Generator.long().map { entropyToKeyPair(BigInteger.valueOf(it)) }
private val publicKeyGenerator = keyPairGenerator.map { it.public }
private val publicKeyGenerator2 = Generator.pure(generateKeyPair().public)
private val partyGenerator: Generator<Party> = Generator.int().combine(publicKeyGenerator2) { n, key ->
Party(CordaX500Name(organisation = "Party$n", locality = "London", country = "GB"), key)
}
private val txIdGenerator = Generator.bytes(32).map { SecureHash.sha256(it) }
private val stateRefGenerator = Generator.intRange(0, 10).map { StateRef(SecureHash.randomSHA256(), it) }
@Suspendable
override fun call(): Long {
val stopwatch = Stopwatch.createStarted()
val random = SplittableRandom()
for (i in 1..transactionCount) {
val txId: SecureHash = txIdGenerator.generateOrFail(random)
val callerParty = partyGenerator.generateOrFail(random)
val inputGenerator = if (inputStateCount == null) {
Generator.replicatePoisson(4.0, stateRefGenerator, true)
} else {
Generator.replicate(inputStateCount, stateRefGenerator)
}
val inputs = inputGenerator.generateOrFail(random)
val localStopwatch = Stopwatch.createStarted()
val sig = NotarisationRequest(inputs, txId).generateSignature(serviceHub)
serviceHub.cordaService(serviceType).commitInputStates(inputs, txId, callerParty, sig, null)
logger.info("Committed a transaction ${txId.toString().take(10)} with ${inputs.size} inputs in ${localStopwatch.stop().elapsed(TimeUnit.MILLISECONDS)} ms")
}
stopwatch.stop()
val duration = stopwatch.elapsed(TimeUnit.MILLISECONDS)
logger.info("Committed $transactionCount transactions in $duration, avg ${duration.toDouble() / transactionCount} ms")
return duration
}
}

View File

@ -0,0 +1,73 @@
package net.corda.notarytest.service
import com.codahale.metrics.MetricFilter
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.graphite.GraphiteReporter
import com.codahale.metrics.graphite.PickledGraphite
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.notary.AsyncCFTNotaryService
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.MySQLConfiguration
import net.corda.node.services.transactions.MySQLUniquenessProvider
import net.corda.node.services.transactions.NonValidatingNotaryFlow
import net.corda.nodeapi.internal.config.parseAs
import net.corda.notarytest.flows.AsyncLoadTestFlow
import java.net.InetAddress
import java.net.InetSocketAddress
import java.nio.file.Paths
import java.security.PublicKey
import java.util.concurrent.TimeUnit
@CordaService
class JDBCNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() {
private val appConfig = ConfigHelper.loadConfig(Paths.get(".")).getConfig("custom")
override val asyncUniquenessProvider: MySQLUniquenessProvider = createUniquenessProvider()
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = NonValidatingNotaryFlow(otherPartySession, this)
override fun start() {
asyncUniquenessProvider.createTable()
}
override fun stop() {
asyncUniquenessProvider.stop()
}
private fun createMetricsRegistry(): MetricRegistry {
val graphiteAddress = appConfig.getString("graphiteAddress").let { NetworkHostAndPort.parse(it) }
val hostName = InetAddress.getLocalHost().hostName.replace(".", "_")
val nodeName = services.myInfo.legalIdentities.first().name.organisation
.toLowerCase()
.replace(" ", "_")
.replace(".", "_")
val pickledGraphite = PickledGraphite(
InetSocketAddress(graphiteAddress.host, graphiteAddress.port)
)
val metrics = MetricRegistry()
GraphiteReporter.forRegistry(metrics)
.prefixedWith("corda.$hostName.$nodeName")
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(MetricFilter.ALL)
.build(pickledGraphite)
.start(10, TimeUnit.SECONDS)
return metrics
}
private fun createUniquenessProvider(): MySQLUniquenessProvider {
val mysqlConfig = appConfig.getConfig("mysql").parseAs<MySQLConfiguration>()
return MySQLUniquenessProvider(createMetricsRegistry(), services.clock, mysqlConfig)
}
}
@StartableByRPC
class JDBCLoadTestFlow(transactionCount: Int,
batchSize: Int,
inputStateCount: Int?
) : AsyncLoadTestFlow<JDBCNotaryService>(JDBCNotaryService::class.java, transactionCount, batchSize, inputStateCount)