From c396b80afe84cfd56092371d9c84ffa93d82091d Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Tue, 10 Jul 2018 10:05:07 +0100 Subject: [PATCH] ENT-1967: Illustration for Byteman library can be used in Node integration test. (#1204) * ENT-1967: Enable byteman library * ENT-1967: Add an integration test to experiment with Byteman. This needs to be running with: `-Dexperimental.test.enable` As in: `gradlew -Dexperimental.test.enable integrationTest` * ENT-1967: Modify Node driver to allow for optional instrumentation and use it in the integration test * ENT-1967: Rely on port allocation * ENT-1967: Install the rule that works * ENT-1967: Trying to introduce counter rule (doesn't work) * ENT-1967: Install rules that make correct use of countdown and also improve debug logging for Byteman * ENT-1967: Add assertion to validate that exception is indeed thrown as per rules installed. * ENT-1967: Less logging and more assertions * ENT-1967: Replace `fun` with `val` * ENT-1967: Un-break DriverDSL public API. * ENT-1967: Minor change * ENT-1967: Remove Byteman settings from NodeParameters and hide them inside InternalDriverDSL. * ENT-1967: Change the way how Jars resolved and use `Try` construct. --- experimental/ha-testing/build.gradle | 59 ++++--- .../byteman/InstrumentationTest.kt | 159 ++++++++++++++++++ .../testing/node/internal/DriverDSLImpl.kt | 90 ++++++++-- 3 files changed, 265 insertions(+), 43 deletions(-) create mode 100644 experimental/ha-testing/src/integration-test/kotlin/net/corda/instrumentation/byteman/InstrumentationTest.kt diff --git a/experimental/ha-testing/build.gradle b/experimental/ha-testing/build.gradle index 42355015ce..f4a6d58f8a 100644 --- a/experimental/ha-testing/build.gradle +++ b/experimental/ha-testing/build.gradle @@ -8,30 +8,7 @@ * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. */ -buildscript { - // For sharing constants between builds - Properties constants = new Properties() - file("$projectDir/../../constants.properties").withInputStream { constants.load(it) } - - ext.kotlin_version = constants.getProperty("kotlinVersion") - ext.byteman_version = "4.0.2" - - repositories { - mavenLocal() - mavenCentral() - jcenter() - } - - dependencies { - classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" - } -} - -repositories { - mavenLocal() - mavenCentral() - jcenter() -} +ext.byteman_version = "4.0.3" apply plugin: 'kotlin' apply plugin: 'idea' @@ -39,12 +16,30 @@ apply plugin: 'net.corda.plugins.cordapp' description 'A set of tools to perform Nodes High Availability testing' +configurations { + integrationTestCompile.extendsFrom testCompile + integrationTestRuntime.extendsFrom testRuntime +} + +sourceSets { + integrationTest { + kotlin { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integration-test/kotlin') + } + resources { + srcDir file('src/integration-test/resources') + } + } +} + dependencies { compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" cordaCompile project(":client:rpc") - cordaCompile project(':node-api') + cordaCompile project(":node-api") cordaCompile project(":finance") cordaCompile project(":perftestcordapp") @@ -57,7 +52,14 @@ dependencies { compile "net.sf.jopt-simple:jopt-simple:$jopt_simple_version" // Byteman for runtime (termination) rules injection on the running node - //compile "org.jboss.byteman:byteman:$byteman_version" + // Submission tool allowing to install rules on running nodes + compile "org.jboss.byteman:byteman-submit:$byteman_version" + // The actual Byteman agent which should only be in the classpath of the out of process nodes + integrationTestCompile "org.jboss.byteman:byteman:$byteman_version" + + + integrationTestCompile project(":test-utils") + integrationTestCompile project(":node-driver") } jar { @@ -70,4 +72,9 @@ jar { ) } from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } +} + +task integrationTest(type: Test) { + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath } \ No newline at end of file diff --git a/experimental/ha-testing/src/integration-test/kotlin/net/corda/instrumentation/byteman/InstrumentationTest.kt b/experimental/ha-testing/src/integration-test/kotlin/net/corda/instrumentation/byteman/InstrumentationTest.kt new file mode 100644 index 0000000000..918c33daa4 --- /dev/null +++ b/experimental/ha-testing/src/integration-test/kotlin/net/corda/instrumentation/byteman/InstrumentationTest.kt @@ -0,0 +1,159 @@ +package net.corda.instrumentation.byteman + +import net.corda.client.rpc.CordaRPCClient +import net.corda.core.contracts.Amount +import net.corda.core.identity.Party +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.getOrThrow +import net.corda.finance.POUNDS +import net.corda.finance.flows.CashIssueFlow +import net.corda.finance.flows.CashPaymentFlow +import net.corda.node.services.Permissions.Companion.invokeRpc +import net.corda.node.services.Permissions.Companion.startFlow +import net.corda.testing.core.* +import net.corda.testing.driver.* +import net.corda.testing.internal.IntegrationTest +import net.corda.testing.internal.IntegrationTestSchemas +import net.corda.testing.internal.toDatabaseSchemaName +import net.corda.testing.internal.toDatabaseSchemaNames +import net.corda.testing.node.NotarySpec +import net.corda.testing.node.User +import net.corda.testing.node.internal.DummyClusterSpec +import net.corda.testing.node.internal.internalDriver +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.jboss.byteman.agent.submit.ScriptText +import org.jboss.byteman.agent.submit.Submit +import org.junit.ClassRule +import org.junit.Test +import java.util.* + +class InstrumentationTest : IntegrationTest() { + private lateinit var alice: NodeHandle + private lateinit var notaryNodes: List + private lateinit var aliceProxy: CordaRPCOps + private lateinit var raftNotaryIdentity: Party + private var bytemanPort: Int = -1 + + companion object { + @ClassRule + @JvmField + val databaseSchemas = IntegrationTestSchemas(*DUMMY_NOTARY_NAME.toDatabaseSchemaNames("_0", "_1", "_2").toTypedArray(), + ALICE_NAME.toDatabaseSchemaName()) + + val logger = contextLogger() + } + private fun setup(compositeIdentity: Boolean = false, testBlock: () -> Unit) { + val testUser = User("test", "test", permissions = setOf( + startFlow(), + startFlow(), + invokeRpc(CordaRPCOps::nodeInfo), + invokeRpc(CordaRPCOps::stateMachinesFeed)) + ) + val portAllocation = PortAllocation.Incremental(10000) + + internalDriver( + extraCordappPackagesToScan = listOf("net.corda.finance.contracts", "net.corda.finance.schemas"), + portAllocation = portAllocation, + notarySpecs = listOf( + NotarySpec( + DUMMY_NOTARY_NAME, + rpcUsers = listOf(testUser), + cluster = DummyClusterSpec(clusterSize = 1, compositeServiceIdentity = compositeIdentity)) + ) + ) { + + bytemanPort = portAllocation.nextPort() + alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(testUser), bytemanPort = bytemanPort).getOrThrow() + raftNotaryIdentity = defaultNotaryIdentity + notaryNodes = defaultNotaryHandle.nodeHandles.getOrThrow().map { it as OutOfProcess } + + assertThat(notaryNodes).hasSize(1) + + for (notaryNode in notaryNodes) { + assertThat(notaryNode.nodeInfo.legalIdentities).contains(raftNotaryIdentity) + } + + // Check that each notary has different identity as a node. + assertThat(notaryNodes.flatMap { it.nodeInfo.legalIdentities - raftNotaryIdentity }.toSet()).hasSameSizeAs(notaryNodes) + + // Connect to Alice and the notaries + fun connectRpc(node: NodeHandle): CordaRPCOps { + val client = CordaRPCClient(node.rpcAddress) + return client.start("test", "test").proxy + } + aliceProxy = connectRpc(alice) + + testBlock() + } + } + + @Test + fun test() { + setup { + + val submit = Submit("localhost", bytemanPort) + logger.info("Byteman agent version used: " + submit.agentVersion) + logger.info("Remote system properties: " + submit.listSystemProperties()) + + val COUNTDOWN_REACHED_STR = "Countdown reached" + val deploymentOutcome = submit.addScripts(listOf(ScriptText("My test script", """ +RULE CashIssue invocation logging +CLASS net.corda.finance.flows.CashIssueFlow +METHOD call +AT ENTRY +IF TRUE +DO System.out.println("Installing paymentCounter countdown") +ENDRULE + +RULE Create CountDown +CLASS net.corda.finance.flows.CashIssueFlow +METHOD call +AT EXIT +IF TRUE +DO createCountDown("paymentCounter", 10) +ENDRULE + +RULE trace CashPaymentFlow.call +CLASS net.corda.finance.flows.CashPaymentFlow +METHOD call +AT ENTRY +IF TRUE +DO debug("CashPaymentFlow invoked") +ENDRULE + +RULE Decrement CountDown and throw +CLASS net.corda.finance.flows.CashPaymentFlow +METHOD call +AT EXIT +IF countDown("paymentCounter") +DO throw new java.lang.IllegalStateException("$COUNTDOWN_REACHED_STR") +ENDRULE +"""))) + assertThat(deploymentOutcome).contains("install rule Decrement CountDown and throw") + assertThat(submit.listAllRules()).contains(COUNTDOWN_REACHED_STR) + + // Issue 100 pounds, then pay ourselves 10x5 pounds + issueCash(100.POUNDS) + + // Submit 10 successful payments + for (i in 1..10) { + paySelf(5.POUNDS) + } + + // 11th payment should fail as countDown has been reached + assertThatThrownBy { paySelf(5.POUNDS) }.hasMessageContaining(COUNTDOWN_REACHED_STR) + } + } + + private fun issueCash(amount: Amount) { + aliceProxy.startFlow(::CashIssueFlow, amount, OpaqueBytes.of(0), raftNotaryIdentity).returnValue.getOrThrow() + } + + private fun paySelf(amount: Amount) { + aliceProxy.startFlow(::CashPaymentFlow, amount, alice.nodeInfo.singleIdentity()).returnValue.getOrThrow() + } +} \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt index 2230ded4fb..aa9196b5f1 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt @@ -28,10 +28,7 @@ import net.corda.core.messaging.CordaRPCOps import net.corda.core.node.NetworkParameters import net.corda.core.node.NotaryInfo import net.corda.core.node.services.NetworkMapCache -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.contextLogger -import net.corda.core.utilities.getOrThrow -import net.corda.core.utilities.millis +import net.corda.core.utilities.* import net.corda.node.NodeRegistrationOption import net.corda.node.VersionInfo import net.corda.node.internal.Node @@ -138,9 +135,17 @@ class DriverDSLImpl( private val state = ThreadBox(State()) //TODO: remove this once we can bundle quasar properly. - private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$") } + private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$").getOrThrow() } - private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$") } + private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$").getOrThrow() } + + private val bytemanJarPath: String? by lazy { + val maybeResolvedJar = resolveJar(".*byteman-\\d.*\\.jar$") + when (maybeResolvedJar) { + is Try.Success -> maybeResolvedJar.getOrThrow() + is Try.Failure -> null + } + } private fun NodeConfig.checkAndOverrideForInMemoryDB(): NodeConfig = this.run { if (inMemoryDB && corda.dataSourceProperties.getProperty("dataSource.url").startsWith("jdbc:h2:")) { @@ -152,16 +157,16 @@ class DriverDSLImpl( } } - private fun resolveJar(jarNamePattern: String): String { + private fun resolveJar(jarNamePattern: String): Try { return try { val cl = ClassLoader.getSystemClassLoader() val urls = (cl as URLClassLoader).urLs val jarPattern = jarNamePattern.toRegex() val jarFileUrl = urls.first { jarPattern.matches(it.path) } - jarFileUrl.toPath().toString() + Try.Success(jarFileUrl.toPath().toString()) } catch (e: Exception) { log.warn("Unable to locate JAR `$jarNamePattern` on classpath: ${e.message}", e) - throw e + Try.Failure(e) } } @@ -205,6 +210,28 @@ class DriverDSLImpl( customOverrides: Map, startInSameProcess: Boolean?, maximumHeapSize: String + ): CordaFuture { + return startNode( + defaultParameters, + providedName, + rpcUsers, + verifierType, + customOverrides, + startInSameProcess, + maximumHeapSize, + null + ) + } + + override fun startNode( + defaultParameters: NodeParameters, + providedName: CordaX500Name?, + rpcUsers: List, + verifierType: VerifierType, + customOverrides: Map, + startInSameProcess: Boolean?, + maximumHeapSize: String, + bytemanPort: Int? ): CordaFuture { val p2pAddress = portAllocation.nextHostAndPort() // TODO: Derive name from the full picked name, don't just wrap the common name @@ -220,7 +247,7 @@ class DriverDSLImpl( return registrationFuture.flatMap { networkMapAvailability.flatMap { // But starting the node proper does require the network map - startRegisteredNode(name, it, rpcUsers, verifierType, customOverrides, startInSameProcess, maximumHeapSize, p2pAddress) + startRegisteredNode(name, it, rpcUsers, verifierType, customOverrides, startInSameProcess, maximumHeapSize, p2pAddress, bytemanPort) } } } @@ -232,7 +259,8 @@ class DriverDSLImpl( customOverrides: Map, startInSameProcess: Boolean? = null, maximumHeapSize: String = "512m", - p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort()): CordaFuture { + p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort(), + bytemanPort: Int? = null): CordaFuture { val rpcAddress = portAllocation.nextHostAndPort() val rpcAdminAddress = portAllocation.nextHostAndPort() val webAddress = portAllocation.nextHostAndPort() @@ -261,7 +289,7 @@ class DriverDSLImpl( allowMissingConfig = true, configOverrides = if (overrides.hasPath("devMode")) overrides else overrides + mapOf("devMode" to true) )).checkAndOverrideForInMemoryDB() - return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize, localNetworkMap) + return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize, localNetworkMap, bytemanPort) } private fun startNodeRegistration(providedName: CordaX500Name, rootCert: X509Certificate, compatibilityZoneURL: URL): CordaFuture { @@ -402,7 +430,7 @@ class DriverDSLImpl( ) val cordaConfig = typesafe.parseAsNodeConfiguration() val config = NodeConfig(rawConfig, cordaConfig).checkAndOverrideForInMemoryDB() - return startNodeInternal(config, webAddress, null, "512m", localNetworkMap) + return startNodeInternal(config, webAddress, null, "512m", localNetworkMap, null) } @Suppress("DEPRECATION") @@ -621,6 +649,8 @@ class DriverDSLImpl( debugPort, jolokiaJarPath, monitorPort, + bytemanJarPath, + null, systemProperties, cordappPackages, "512m", @@ -636,7 +666,8 @@ class DriverDSLImpl( webAddress: NetworkHostAndPort, startInProcess: Boolean?, maximumHeapSize: String, - localNetworkMap: LocalNetworkMap?): CordaFuture { + localNetworkMap: LocalNetworkMap?, + bytemanPort: Int?): CordaFuture { val visibilityHandle = networkVisibilityController.register(config.corda.myLegalName) val baseDirectory = config.corda.baseDirectory.createDirectories() localNetworkMap?.networkParametersCopier?.install(baseDirectory) @@ -677,7 +708,7 @@ class DriverDSLImpl( } else { val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val monitorPort = if (jmxPolicy.startJmxHttpServer) jmxPolicy.jmxHttpServerPortAllocation?.nextPort() else null - val process = startOutOfProcessNode(config, quasarJarPath, debugPort, jolokiaJarPath, monitorPort, systemProperties, cordappPackages, maximumHeapSize) + val process = startOutOfProcessNode(config, quasarJarPath, debugPort, jolokiaJarPath, monitorPort, bytemanJarPath, bytemanPort, systemProperties, cordappPackages, maximumHeapSize) // Destroy the child process when the parent exits.This is needed even when `waitForAllNodesToFinish` is // true because we don't want orphaned processes in the case that the parent process is terminated by the @@ -800,6 +831,8 @@ class DriverDSLImpl( debugPort: Int?, jolokiaJarPath: String, monitorPort: Int?, + bytemanJarPath: String?, + bytemanPort: Int?, overriddenSystemProperties: Map, cordappPackages: List, maximumHeapSize: String, @@ -807,7 +840,8 @@ class DriverDSLImpl( ): Process { log.info("Starting out-of-process Node ${config.corda.myLegalName.organisation}, " + "debug port is " + (debugPort ?: "not enabled") + ", " + - "jolokia monitoring port is " + (monitorPort ?: "not enabled")) + "jolokia monitoring port is " + (monitorPort ?: "not enabled") + ", " + + "byteMan: " + if (bytemanJarPath == null) "not in classpath" else "port is " + (bytemanPort ?: "not enabled")) // Write node.conf writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe.toNodeOnly()) @@ -845,11 +879,22 @@ class DriverDSLImpl( it += extraCmdLineFlag }.toList() + val bytemanJvmArgs = { + val bytemanAgent = bytemanJarPath?.let { + bytemanPort?.let { + "-javaagent:$bytemanJarPath=port:$bytemanPort,listener:true" + } + } + listOfNotNull(bytemanAgent) + + if (bytemanAgent != null && debugPort != null) listOf("-Dorg.jboss.byteman.verbose=true", "-Dorg.jboss.byteman.debug=true") + else emptyList() + }.invoke() + return ProcessUtilities.startJavaProcess( className = "net.corda.node.Corda", // cannot directly get class for this, so just use string arguments = arguments, jdwpPort = debugPort, - extraJvmArguments = extraJvmArguments + listOfNotNull(jolokiaAgent), + extraJvmArguments = extraJvmArguments + listOfNotNull(jolokiaAgent) + bytemanJvmArgs, workingDirectory = config.corda.baseDirectory, maximumHeapSize = maximumHeapSize ) @@ -1017,6 +1062,17 @@ interface InternalDriverDSL : DriverDSL, CordformContext { fun start() fun shutdown() + + fun startNode( + defaultParameters: NodeParameters = NodeParameters(), + providedName: CordaX500Name? = defaultParameters.providedName, + rpcUsers: List = defaultParameters.rpcUsers, + verifierType: VerifierType = defaultParameters.verifierType, + customOverrides: Map = defaultParameters.customOverrides, + startInSameProcess: Boolean? = defaultParameters.startInSameProcess, + maximumHeapSize: String = defaultParameters.maximumHeapSize, + bytemanPort: Int? = null + ): CordaFuture } /**