diff --git a/settings.gradle b/settings.gradle index 82ada2052d..fe2770a53a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -56,6 +56,7 @@ include 'tools:explorer' include 'tools:explorer:capsule' include 'tools:demobench' include 'tools:loadtest' +include 'tools:notarytest' include 'tools:graphs' include 'tools:bootstrapper' include 'tools:dbmigration' diff --git a/tools/notarytest/README.md b/tools/notarytest/README.md new file mode 100644 index 0000000000..d369668d5e --- /dev/null +++ b/tools/notarytest/README.md @@ -0,0 +1,7 @@ +# Notary test tool + +Provides building blocks for experimenting and load testing new notary implementations: +* Deploying a custom notary service locally. +* Invoking a load generating flow on a notary node. + +No scripts for deploying nodes to a remote test cluster are provided. \ No newline at end of file diff --git a/tools/notarytest/build.gradle b/tools/notarytest/build.gradle new file mode 100644 index 0000000000..98191d76bd --- /dev/null +++ b/tools/notarytest/build.gradle @@ -0,0 +1,61 @@ +import net.corda.plugins.Cordform + +apply plugin: 'java' +apply plugin: 'kotlin' +apply plugin: 'idea' +apply plugin: 'net.corda.plugins.quasar-utils' +apply plugin: 'net.corda.plugins.publish-utils' +apply plugin: 'net.corda.plugins.cordapp' +apply plugin: 'net.corda.plugins.cordformation' +apply plugin: 'maven-publish' + +dependencies { + compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version" + testCompile "junit:junit:$junit_version" + + // Corda integration dependencies + cordaCompile project(path: ":node:capsule", configuration: 'runtimeArtifacts') + cordaCompile project(':core') + cordaCompile project(':client:rpc') + cordaCompile project(':node-driver') + compile project(':client:mock') + compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6' + compile group: 'io.dropwizard.metrics', name: 'metrics-graphite', version: '3.2.5' + +} + +idea { + module { + downloadJavadoc = true // defaults to false + downloadSources = true + } +} + +publishing { + publications { + jarAndSources(MavenPublication) { + from components.java + artifactId 'notarytest' + + artifact sourceJar + artifact javadocJar + } + } +} + +task deployJDBC(type: Cordform, dependsOn: 'jar') { + definitionClass = 'net.corda.notarytest.JDBCNotaryCordform' +} + +task runTest(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + main = 'net.corda.notarytest.MainKt' +} + +jar { + manifest { + attributes( + 'Automatic-Module-Name': 'net.corda.notarytest' + ) + } +} diff --git a/tools/notarytest/src/main/kotlin/net/corda/notarytest/JDBCNotaryCordform.kt b/tools/notarytest/src/main/kotlin/net/corda/notarytest/JDBCNotaryCordform.kt new file mode 100644 index 0000000000..47489a1cc3 --- /dev/null +++ b/tools/notarytest/src/main/kotlin/net/corda/notarytest/JDBCNotaryCordform.kt @@ -0,0 +1,82 @@ +package net.corda.notarytest + +import net.corda.cordform.CordformContext +import net.corda.cordform.CordformDefinition +import net.corda.cordform.CordformNode +import net.corda.core.identity.CordaX500Name +import net.corda.node.services.Permissions +import net.corda.node.services.config.NotaryConfig +import net.corda.nodeapi.internal.DevIdentityGenerator +import net.corda.testing.node.User +import net.corda.testing.node.internal.demorun.* + +fun main(args: Array) = JDBCNotaryCordform().nodeRunner().deployAndRunNodes() + +internal val notaryDemoUser = User("demou", "demop", setOf(Permissions.all())) + +class JDBCNotaryCordform : CordformDefinition() { + private val clusterName = CordaX500Name("Mysql Notary", "Zurich", "CH") + private val notaryNames = createNotaryNames(3) + + private fun createNotaryNames(clusterSize: Int) = (0 until clusterSize).map { + CordaX500Name("Notary Service $it", "Zurich", "CH") + } + + init { + fun notaryNode(index: Int, configure: CordformNode.() -> Unit) = node { + name(notaryNames[index]) + notary( + NotaryConfig( + validating = true, + custom = true + ) + ) + extraConfig = mapOf("custom" to + mapOf( + "mysql" to mapOf( + "dataSource" to mapOf( + // Update the db address/port accordingly + "jdbcUrl" to "jdbc:mysql://localhost:330${6 + index}/corda?rewriteBatchedStatements=true&useSSL=false&failOverReadOnly=false", + "username" to "corda", + "password" to "awesome", + "autoCommit" to "false") + ), + "graphiteAddress" to "performance-metrics.northeurope.cloudapp.azure.com:2004" + ) + ) + configure() + } + + notaryNode(0) { + p2pPort(10009) + rpcSettings { + address("localhost:10010") + adminAddress("localhost:10110") + } + rpcUsers(notaryDemoUser) + } + notaryNode(1) { + p2pPort(10013) + rpcSettings { + address("localhost:10014") + adminAddress("localhost:10114") + } + rpcUsers(notaryDemoUser) + } + notaryNode(2) { + p2pPort(10017) + rpcSettings { + address("localhost:10018") + adminAddress("localhost:10118") + } + rpcUsers(notaryDemoUser) + } + } + + override fun setup(context: CordformContext) { + DevIdentityGenerator.generateDistributedNotarySingularIdentity( + notaryNames.map { context.baseDirectory(it.toString()) }, + clusterName + ) + } +} \ No newline at end of file diff --git a/tools/notarytest/src/main/kotlin/net/corda/notarytest/Main.kt b/tools/notarytest/src/main/kotlin/net/corda/notarytest/Main.kt new file mode 100644 index 0000000000..b966f6fbd6 --- /dev/null +++ b/tools/notarytest/src/main/kotlin/net/corda/notarytest/Main.kt @@ -0,0 +1,67 @@ +package net.corda.notarytest + +import com.google.common.base.Stopwatch +import net.corda.client.rpc.CordaRPCClient +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.notarytest.service.JDBCLoadTestFlow +import java.io.File +import java.io.PrintWriter +import java.time.Instant +import java.util.concurrent.TimeUnit + +/** The number of test flows to run on each notary node */ +const val TEST_RUNS = 1 +/** Total number of transactions to generate and notarise. */ +const val TRANSACTION_COUNT = 10000000 +/** Number of transactions to submit before awaiting completion. */ +const val BATCH_SIZE = 1000 + +fun main(args: Array) { + // Provide a list of notary node addresses to invoke the load generation flow on + val addresses = listOf( + NetworkHostAndPort("localhost", 10010), + NetworkHostAndPort("localhost", 11014), + NetworkHostAndPort("localhost", 11018) + ) + + addresses.parallelStream().forEach { + val node = it + println("Connecting to the recipient node ($node)") + + CordaRPCClient(it).start(notaryDemoUser.username, notaryDemoUser.password).use { + println(it.proxy.nodeInfo()) + val totalTime = Stopwatch.createStarted() + val durations = run(it.proxy, 1) + totalTime.stop() + + val totalTx = TEST_RUNS * TRANSACTION_COUNT + println("Total duration for $totalTx transactions: ${totalTime.elapsed(TimeUnit.MILLISECONDS)} ms") + println("Average tx/s: ${totalTx.toDouble() / totalTime.elapsed(TimeUnit.MILLISECONDS).toDouble() * 1000}") + + // Uncomment to generate a CSV report + // printCSV(node, durations, TEST_RUNS, BATCH_SIZE) + } + } +} + +private fun run(rpc: CordaRPCOps, inputStateCount: Int? = null): List { + return (1..TEST_RUNS).map { i -> + val timer = Stopwatch.createStarted() + val commitDuration = rpc.startFlow(::JDBCLoadTestFlow, TRANSACTION_COUNT, BATCH_SIZE, inputStateCount).returnValue.get() + val flowDuration = timer.stop().elapsed(TimeUnit.MILLISECONDS) + println("#$i: Duration: $flowDuration ms, commit duration: $commitDuration ms") + flowDuration + } +} + +private fun printCSV(node: NetworkHostAndPort, durations: List, testRuns: Int, batchSize: Int) { + val pw = PrintWriter(File("notarytest-${Instant.now()}-${node.host}${node.port}-${testRuns}x$batchSize.csv")) + val sb = StringBuilder() + sb.append("$testRuns, $batchSize") + sb.append('\n') + sb.append(durations.joinToString()) + pw.write(sb.toString()) + pw.close() +} \ No newline at end of file diff --git a/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/AsyncLoadTestFlow.kt b/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/AsyncLoadTestFlow.kt new file mode 100644 index 0000000000..c6d272637c --- /dev/null +++ b/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/AsyncLoadTestFlow.kt @@ -0,0 +1,86 @@ +package net.corda.notarytest.flows + +import co.paralleluniverse.fibers.Suspendable +import com.google.common.base.Stopwatch +import net.corda.client.mock.Generator +import net.corda.core.concurrent.CordaFuture +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.entropyToKeyPair +import net.corda.core.crypto.generateKeyPair +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.NotarisationRequest +import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.transpose +import net.corda.core.internal.notary.AsyncCFTNotaryService +import net.corda.core.internal.notary.AsyncUniquenessProvider +import net.corda.core.internal.notary.generateSignature +import java.math.BigInteger +import java.util.* +import java.util.concurrent.TimeUnit + +@StartableByRPC +open class AsyncLoadTestFlow( + private val serviceType: Class, + private val transactionCount: Int, + private val batchSize: Int = 100, + /** + * Number of input states per transaction. + * If *null*, variable sized transactions will be created with median size 4. + */ + private val inputStateCount: Int? = null +) : FlowLogic() { + private val keyPairGenerator = Generator.long().map { entropyToKeyPair(BigInteger.valueOf(it)) } + private val publicKeyGeneratorSingle = Generator.pure(generateKeyPair().public) + private val partyGenerator: Generator = Generator.int().combine(publicKeyGeneratorSingle) { n, key -> + Party(CordaX500Name(organisation = "Party$n", locality = "London", country = "GB"), key) + } + private val txIdGenerator = Generator.bytes(32).map { SecureHash.sha256(it) } + private val stateRefGenerator = txIdGenerator.combine(Generator.intRange(0, 10)) { id, pos -> StateRef(id, pos) } + + @Suspendable + override fun call(): Long { + var current = 0 + var totalDuration = 0L + while (current < transactionCount) { + val batch = Math.min(batchSize, transactionCount - current) + totalDuration += runBatch(batch) + current += batch + } + return totalDuration + } + + private val random = SplittableRandom() + + private fun runBatch(transactionCount: Int): Long { + val stopwatch = Stopwatch.createStarted() + val futures = mutableListOf>() + + val service = serviceHub.cordaService(serviceType) + + for (i in 1..batchSize) { + val txId: SecureHash = txIdGenerator.generateOrFail(random) + val callerParty = partyGenerator.generateOrFail(random) + val inputGenerator = if (inputStateCount == null) { + Generator.replicatePoisson(4.0, stateRefGenerator, true) + } else { + Generator.replicate(inputStateCount, stateRefGenerator) + } + val inputs = inputGenerator.generateOrFail(random) + val requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub) + + futures += AsyncCFTNotaryService.CommitOperation(service, inputs, txId, callerParty, requestSignature, null).execute() + } + + futures.transpose().get() + + stopwatch.stop() + val duration = stopwatch.elapsed(TimeUnit.MILLISECONDS) + logger.info("Committed $transactionCount transactions in $duration ms, avg ${duration.toDouble() / transactionCount} ms") + + return duration + } +} \ No newline at end of file diff --git a/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/LoadTestFlow.kt b/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/LoadTestFlow.kt new file mode 100644 index 0000000000..b0b78e084e --- /dev/null +++ b/tools/notarytest/src/main/kotlin/net/corda/notarytest/flows/LoadTestFlow.kt @@ -0,0 +1,67 @@ +package net.corda.notarytest.flows + +import co.paralleluniverse.fibers.Suspendable +import com.google.common.base.Stopwatch +import net.corda.client.mock.Generator +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.entropyToKeyPair +import net.corda.core.crypto.generateKeyPair +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.NotarisationRequest +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.internal.notary.TrustedAuthorityNotaryService +import net.corda.core.internal.notary.generateSignature +import java.math.BigInteger +import java.util.* +import java.util.concurrent.TimeUnit + +@StartableByRPC +open class LoadTestFlow( + private val serviceType: Class, + private val transactionCount: Int, + /** + * Number of input states per transaction. + * If *null*, variable sized transactions will be created with median size 4. + */ + private val inputStateCount: Int? +) : FlowLogic() { + private val keyPairGenerator = Generator.long().map { entropyToKeyPair(BigInteger.valueOf(it)) } + private val publicKeyGenerator = keyPairGenerator.map { it.public } + + private val publicKeyGenerator2 = Generator.pure(generateKeyPair().public) + private val partyGenerator: Generator = Generator.int().combine(publicKeyGenerator2) { n, key -> + Party(CordaX500Name(organisation = "Party$n", locality = "London", country = "GB"), key) + } + private val txIdGenerator = Generator.bytes(32).map { SecureHash.sha256(it) } + private val stateRefGenerator = Generator.intRange(0, 10).map { StateRef(SecureHash.randomSHA256(), it) } + + @Suspendable + override fun call(): Long { + val stopwatch = Stopwatch.createStarted() + val random = SplittableRandom() + + for (i in 1..transactionCount) { + val txId: SecureHash = txIdGenerator.generateOrFail(random) + val callerParty = partyGenerator.generateOrFail(random) + val inputGenerator = if (inputStateCount == null) { + Generator.replicatePoisson(4.0, stateRefGenerator, true) + } else { + Generator.replicate(inputStateCount, stateRefGenerator) + } + val inputs = inputGenerator.generateOrFail(random) + val localStopwatch = Stopwatch.createStarted() + val sig = NotarisationRequest(inputs, txId).generateSignature(serviceHub) + serviceHub.cordaService(serviceType).commitInputStates(inputs, txId, callerParty, sig, null) + logger.info("Committed a transaction ${txId.toString().take(10)} with ${inputs.size} inputs in ${localStopwatch.stop().elapsed(TimeUnit.MILLISECONDS)} ms") + } + + stopwatch.stop() + val duration = stopwatch.elapsed(TimeUnit.MILLISECONDS) + logger.info("Committed $transactionCount transactions in $duration, avg ${duration.toDouble() / transactionCount} ms") + + return duration + } +} \ No newline at end of file diff --git a/tools/notarytest/src/main/kotlin/net/corda/notarytest/service/JDBCNotaryService.kt b/tools/notarytest/src/main/kotlin/net/corda/notarytest/service/JDBCNotaryService.kt new file mode 100644 index 0000000000..b81ec0863a --- /dev/null +++ b/tools/notarytest/src/main/kotlin/net/corda/notarytest/service/JDBCNotaryService.kt @@ -0,0 +1,73 @@ +package net.corda.notarytest.service + +import com.codahale.metrics.MetricFilter +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.graphite.GraphiteReporter +import com.codahale.metrics.graphite.PickledGraphite +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.StartableByRPC +import net.corda.core.internal.notary.AsyncCFTNotaryService +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.services.config.ConfigHelper +import net.corda.node.services.config.MySQLConfiguration +import net.corda.node.services.transactions.MySQLUniquenessProvider +import net.corda.node.services.transactions.NonValidatingNotaryFlow +import net.corda.nodeapi.internal.config.parseAs +import net.corda.notarytest.flows.AsyncLoadTestFlow +import java.net.InetAddress +import java.net.InetSocketAddress +import java.nio.file.Paths +import java.security.PublicKey +import java.util.concurrent.TimeUnit + +@CordaService +class JDBCNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : AsyncCFTNotaryService() { + private val appConfig = ConfigHelper.loadConfig(Paths.get(".")).getConfig("custom") + + override val asyncUniquenessProvider: MySQLUniquenessProvider = createUniquenessProvider() + + override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = NonValidatingNotaryFlow(otherPartySession, this) + + override fun start() { + asyncUniquenessProvider.createTable() + } + + override fun stop() { + asyncUniquenessProvider.stop() + } + + private fun createMetricsRegistry(): MetricRegistry { + val graphiteAddress = appConfig.getString("graphiteAddress").let { NetworkHostAndPort.parse(it) } + val hostName = InetAddress.getLocalHost().hostName.replace(".", "_") + val nodeName = services.myInfo.legalIdentities.first().name.organisation + .toLowerCase() + .replace(" ", "_") + .replace(".", "_") + val pickledGraphite = PickledGraphite( + InetSocketAddress(graphiteAddress.host, graphiteAddress.port) + ) + val metrics = MetricRegistry() + GraphiteReporter.forRegistry(metrics) + .prefixedWith("corda.$hostName.$nodeName") + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .filter(MetricFilter.ALL) + .build(pickledGraphite) + .start(10, TimeUnit.SECONDS) + return metrics + } + + private fun createUniquenessProvider(): MySQLUniquenessProvider { + val mysqlConfig = appConfig.getConfig("mysql").parseAs() + return MySQLUniquenessProvider(createMetricsRegistry(), services.clock, mysqlConfig) + } +} + +@StartableByRPC +class JDBCLoadTestFlow(transactionCount: Int, + batchSize: Int, + inputStateCount: Int? +) : AsyncLoadTestFlow(JDBCNotaryService::class.java, transactionCount, batchSize, inputStateCount) \ No newline at end of file