ENT-1967: Illustration for Byteman library can be used in Node integration test. (#1204)

* ENT-1967: Enable byteman library

* ENT-1967: Add an integration test to experiment with Byteman.

This needs to be running with: `-Dexperimental.test.enable`
As in: `gradlew -Dexperimental.test.enable integrationTest`

* ENT-1967: Modify Node driver to allow for optional instrumentation and use it in the integration test

* ENT-1967: Rely on port allocation

* ENT-1967: Install the rule that works

* ENT-1967: Trying to introduce counter rule (doesn't work)

* ENT-1967: Install rules that make correct use of countdown and also improve debug logging for Byteman

* ENT-1967: Add assertion to validate that exception is indeed thrown as per rules installed.

* ENT-1967: Less logging and more assertions

* ENT-1967: Replace `fun` with `val`

* ENT-1967: Un-break DriverDSL public API.

* ENT-1967: Minor change

* ENT-1967: Remove Byteman settings from NodeParameters and hide them inside InternalDriverDSL.

* ENT-1967: Change the way how Jars resolved and use `Try` construct.
This commit is contained in:
Viktor Kolomeyko 2018-07-10 10:05:07 +01:00 committed by GitHub
parent 13af5e00b6
commit c396b80afe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 265 additions and 43 deletions

View File

@ -8,30 +8,7 @@
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/ */
buildscript { ext.byteman_version = "4.0.3"
// For sharing constants between builds
Properties constants = new Properties()
file("$projectDir/../../constants.properties").withInputStream { constants.load(it) }
ext.kotlin_version = constants.getProperty("kotlinVersion")
ext.byteman_version = "4.0.2"
repositories {
mavenLocal()
mavenCentral()
jcenter()
}
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
}
}
repositories {
mavenLocal()
mavenCentral()
jcenter()
}
apply plugin: 'kotlin' apply plugin: 'kotlin'
apply plugin: 'idea' apply plugin: 'idea'
@ -39,12 +16,30 @@ apply plugin: 'net.corda.plugins.cordapp'
description 'A set of tools to perform Nodes High Availability testing' description 'A set of tools to perform Nodes High Availability testing'
configurations {
integrationTestCompile.extendsFrom testCompile
integrationTestRuntime.extendsFrom testRuntime
}
sourceSets {
integrationTest {
kotlin {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integration-test/kotlin')
}
resources {
srcDir file('src/integration-test/resources')
}
}
}
dependencies { dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
cordaCompile project(":client:rpc") cordaCompile project(":client:rpc")
cordaCompile project(':node-api') cordaCompile project(":node-api")
cordaCompile project(":finance") cordaCompile project(":finance")
cordaCompile project(":perftestcordapp") cordaCompile project(":perftestcordapp")
@ -57,7 +52,14 @@ dependencies {
compile "net.sf.jopt-simple:jopt-simple:$jopt_simple_version" compile "net.sf.jopt-simple:jopt-simple:$jopt_simple_version"
// Byteman for runtime (termination) rules injection on the running node // Byteman for runtime (termination) rules injection on the running node
//compile "org.jboss.byteman:byteman:$byteman_version" // Submission tool allowing to install rules on running nodes
compile "org.jboss.byteman:byteman-submit:$byteman_version"
// The actual Byteman agent which should only be in the classpath of the out of process nodes
integrationTestCompile "org.jboss.byteman:byteman:$byteman_version"
integrationTestCompile project(":test-utils")
integrationTestCompile project(":node-driver")
} }
jar { jar {
@ -71,3 +73,8 @@ jar {
} }
from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
} }
task integrationTest(type: Test) {
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath
}

View File

@ -0,0 +1,159 @@
package net.corda.instrumentation.byteman
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.contracts.Amount
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.finance.POUNDS
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.testing.core.*
import net.corda.testing.driver.*
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.internal.toDatabaseSchemaName
import net.corda.testing.internal.toDatabaseSchemaNames
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.User
import net.corda.testing.node.internal.DummyClusterSpec
import net.corda.testing.node.internal.internalDriver
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.jboss.byteman.agent.submit.ScriptText
import org.jboss.byteman.agent.submit.Submit
import org.junit.ClassRule
import org.junit.Test
import java.util.*
class InstrumentationTest : IntegrationTest() {
private lateinit var alice: NodeHandle
private lateinit var notaryNodes: List<OutOfProcess>
private lateinit var aliceProxy: CordaRPCOps
private lateinit var raftNotaryIdentity: Party
private var bytemanPort: Int = -1
companion object {
@ClassRule
@JvmField
val databaseSchemas = IntegrationTestSchemas(*DUMMY_NOTARY_NAME.toDatabaseSchemaNames("_0", "_1", "_2").toTypedArray(),
ALICE_NAME.toDatabaseSchemaName())
val logger = contextLogger()
}
private fun setup(compositeIdentity: Boolean = false, testBlock: () -> Unit) {
val testUser = User("test", "test", permissions = setOf(
startFlow<CashIssueFlow>(),
startFlow<CashPaymentFlow>(),
invokeRpc(CordaRPCOps::nodeInfo),
invokeRpc(CordaRPCOps::stateMachinesFeed))
)
val portAllocation = PortAllocation.Incremental(10000)
internalDriver(
extraCordappPackagesToScan = listOf("net.corda.finance.contracts", "net.corda.finance.schemas"),
portAllocation = portAllocation,
notarySpecs = listOf(
NotarySpec(
DUMMY_NOTARY_NAME,
rpcUsers = listOf(testUser),
cluster = DummyClusterSpec(clusterSize = 1, compositeServiceIdentity = compositeIdentity))
)
) {
bytemanPort = portAllocation.nextPort()
alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(testUser), bytemanPort = bytemanPort).getOrThrow()
raftNotaryIdentity = defaultNotaryIdentity
notaryNodes = defaultNotaryHandle.nodeHandles.getOrThrow().map { it as OutOfProcess }
assertThat(notaryNodes).hasSize(1)
for (notaryNode in notaryNodes) {
assertThat(notaryNode.nodeInfo.legalIdentities).contains(raftNotaryIdentity)
}
// Check that each notary has different identity as a node.
assertThat(notaryNodes.flatMap { it.nodeInfo.legalIdentities - raftNotaryIdentity }.toSet()).hasSameSizeAs(notaryNodes)
// Connect to Alice and the notaries
fun connectRpc(node: NodeHandle): CordaRPCOps {
val client = CordaRPCClient(node.rpcAddress)
return client.start("test", "test").proxy
}
aliceProxy = connectRpc(alice)
testBlock()
}
}
@Test
fun test() {
setup {
val submit = Submit("localhost", bytemanPort)
logger.info("Byteman agent version used: " + submit.agentVersion)
logger.info("Remote system properties: " + submit.listSystemProperties())
val COUNTDOWN_REACHED_STR = "Countdown reached"
val deploymentOutcome = submit.addScripts(listOf(ScriptText("My test script", """
RULE CashIssue invocation logging
CLASS net.corda.finance.flows.CashIssueFlow
METHOD call
AT ENTRY
IF TRUE
DO System.out.println("Installing paymentCounter countdown")
ENDRULE
RULE Create CountDown
CLASS net.corda.finance.flows.CashIssueFlow
METHOD call
AT EXIT
IF TRUE
DO createCountDown("paymentCounter", 10)
ENDRULE
RULE trace CashPaymentFlow.call
CLASS net.corda.finance.flows.CashPaymentFlow
METHOD call
AT ENTRY
IF TRUE
DO debug("CashPaymentFlow invoked")
ENDRULE
RULE Decrement CountDown and throw
CLASS net.corda.finance.flows.CashPaymentFlow
METHOD call
AT EXIT
IF countDown("paymentCounter")
DO throw new java.lang.IllegalStateException("$COUNTDOWN_REACHED_STR")
ENDRULE
""")))
assertThat(deploymentOutcome).contains("install rule Decrement CountDown and throw")
assertThat(submit.listAllRules()).contains(COUNTDOWN_REACHED_STR)
// Issue 100 pounds, then pay ourselves 10x5 pounds
issueCash(100.POUNDS)
// Submit 10 successful payments
for (i in 1..10) {
paySelf(5.POUNDS)
}
// 11th payment should fail as countDown has been reached
assertThatThrownBy { paySelf(5.POUNDS) }.hasMessageContaining(COUNTDOWN_REACHED_STR)
}
}
private fun issueCash(amount: Amount<Currency>) {
aliceProxy.startFlow(::CashIssueFlow, amount, OpaqueBytes.of(0), raftNotaryIdentity).returnValue.getOrThrow()
}
private fun paySelf(amount: Amount<Currency>) {
aliceProxy.startFlow(::CashPaymentFlow, amount, alice.nodeInfo.singleIdentity()).returnValue.getOrThrow()
}
}

View File

@ -28,10 +28,7 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NetworkParameters import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.*
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.node.NodeRegistrationOption import net.corda.node.NodeRegistrationOption
import net.corda.node.VersionInfo import net.corda.node.VersionInfo
import net.corda.node.internal.Node import net.corda.node.internal.Node
@ -138,9 +135,17 @@ class DriverDSLImpl(
private val state = ThreadBox(State()) private val state = ThreadBox(State())
//TODO: remove this once we can bundle quasar properly. //TODO: remove this once we can bundle quasar properly.
private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$") } private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$").getOrThrow() }
private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$") } private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$").getOrThrow() }
private val bytemanJarPath: String? by lazy {
val maybeResolvedJar = resolveJar(".*byteman-\\d.*\\.jar$")
when (maybeResolvedJar) {
is Try.Success -> maybeResolvedJar.getOrThrow()
is Try.Failure -> null
}
}
private fun NodeConfig.checkAndOverrideForInMemoryDB(): NodeConfig = this.run { private fun NodeConfig.checkAndOverrideForInMemoryDB(): NodeConfig = this.run {
if (inMemoryDB && corda.dataSourceProperties.getProperty("dataSource.url").startsWith("jdbc:h2:")) { if (inMemoryDB && corda.dataSourceProperties.getProperty("dataSource.url").startsWith("jdbc:h2:")) {
@ -152,16 +157,16 @@ class DriverDSLImpl(
} }
} }
private fun resolveJar(jarNamePattern: String): String { private fun resolveJar(jarNamePattern: String): Try<String> {
return try { return try {
val cl = ClassLoader.getSystemClassLoader() val cl = ClassLoader.getSystemClassLoader()
val urls = (cl as URLClassLoader).urLs val urls = (cl as URLClassLoader).urLs
val jarPattern = jarNamePattern.toRegex() val jarPattern = jarNamePattern.toRegex()
val jarFileUrl = urls.first { jarPattern.matches(it.path) } val jarFileUrl = urls.first { jarPattern.matches(it.path) }
jarFileUrl.toPath().toString() Try.Success(jarFileUrl.toPath().toString())
} catch (e: Exception) { } catch (e: Exception) {
log.warn("Unable to locate JAR `$jarNamePattern` on classpath: ${e.message}", e) log.warn("Unable to locate JAR `$jarNamePattern` on classpath: ${e.message}", e)
throw e Try.Failure(e)
} }
} }
@ -205,6 +210,28 @@ class DriverDSLImpl(
customOverrides: Map<String, Any?>, customOverrides: Map<String, Any?>,
startInSameProcess: Boolean?, startInSameProcess: Boolean?,
maximumHeapSize: String maximumHeapSize: String
): CordaFuture<NodeHandle> {
return startNode(
defaultParameters,
providedName,
rpcUsers,
verifierType,
customOverrides,
startInSameProcess,
maximumHeapSize,
null
)
}
override fun startNode(
defaultParameters: NodeParameters,
providedName: CordaX500Name?,
rpcUsers: List<User>,
verifierType: VerifierType,
customOverrides: Map<String, Any?>,
startInSameProcess: Boolean?,
maximumHeapSize: String,
bytemanPort: Int?
): CordaFuture<NodeHandle> { ): CordaFuture<NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort() val p2pAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name // TODO: Derive name from the full picked name, don't just wrap the common name
@ -220,7 +247,7 @@ class DriverDSLImpl(
return registrationFuture.flatMap { return registrationFuture.flatMap {
networkMapAvailability.flatMap { networkMapAvailability.flatMap {
// But starting the node proper does require the network map // But starting the node proper does require the network map
startRegisteredNode(name, it, rpcUsers, verifierType, customOverrides, startInSameProcess, maximumHeapSize, p2pAddress) startRegisteredNode(name, it, rpcUsers, verifierType, customOverrides, startInSameProcess, maximumHeapSize, p2pAddress, bytemanPort)
} }
} }
} }
@ -232,7 +259,8 @@ class DriverDSLImpl(
customOverrides: Map<String, Any?>, customOverrides: Map<String, Any?>,
startInSameProcess: Boolean? = null, startInSameProcess: Boolean? = null,
maximumHeapSize: String = "512m", maximumHeapSize: String = "512m",
p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort()): CordaFuture<NodeHandle> { p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort(),
bytemanPort: Int? = null): CordaFuture<NodeHandle> {
val rpcAddress = portAllocation.nextHostAndPort() val rpcAddress = portAllocation.nextHostAndPort()
val rpcAdminAddress = portAllocation.nextHostAndPort() val rpcAdminAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort() val webAddress = portAllocation.nextHostAndPort()
@ -261,7 +289,7 @@ class DriverDSLImpl(
allowMissingConfig = true, allowMissingConfig = true,
configOverrides = if (overrides.hasPath("devMode")) overrides else overrides + mapOf("devMode" to true) configOverrides = if (overrides.hasPath("devMode")) overrides else overrides + mapOf("devMode" to true)
)).checkAndOverrideForInMemoryDB() )).checkAndOverrideForInMemoryDB()
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize, localNetworkMap) return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize, localNetworkMap, bytemanPort)
} }
private fun startNodeRegistration(providedName: CordaX500Name, rootCert: X509Certificate, compatibilityZoneURL: URL): CordaFuture<NodeConfig> { private fun startNodeRegistration(providedName: CordaX500Name, rootCert: X509Certificate, compatibilityZoneURL: URL): CordaFuture<NodeConfig> {
@ -402,7 +430,7 @@ class DriverDSLImpl(
) )
val cordaConfig = typesafe.parseAsNodeConfiguration() val cordaConfig = typesafe.parseAsNodeConfiguration()
val config = NodeConfig(rawConfig, cordaConfig).checkAndOverrideForInMemoryDB() val config = NodeConfig(rawConfig, cordaConfig).checkAndOverrideForInMemoryDB()
return startNodeInternal(config, webAddress, null, "512m", localNetworkMap) return startNodeInternal(config, webAddress, null, "512m", localNetworkMap, null)
} }
@Suppress("DEPRECATION") @Suppress("DEPRECATION")
@ -621,6 +649,8 @@ class DriverDSLImpl(
debugPort, debugPort,
jolokiaJarPath, jolokiaJarPath,
monitorPort, monitorPort,
bytemanJarPath,
null,
systemProperties, systemProperties,
cordappPackages, cordappPackages,
"512m", "512m",
@ -636,7 +666,8 @@ class DriverDSLImpl(
webAddress: NetworkHostAndPort, webAddress: NetworkHostAndPort,
startInProcess: Boolean?, startInProcess: Boolean?,
maximumHeapSize: String, maximumHeapSize: String,
localNetworkMap: LocalNetworkMap?): CordaFuture<NodeHandle> { localNetworkMap: LocalNetworkMap?,
bytemanPort: Int?): CordaFuture<NodeHandle> {
val visibilityHandle = networkVisibilityController.register(config.corda.myLegalName) val visibilityHandle = networkVisibilityController.register(config.corda.myLegalName)
val baseDirectory = config.corda.baseDirectory.createDirectories() val baseDirectory = config.corda.baseDirectory.createDirectories()
localNetworkMap?.networkParametersCopier?.install(baseDirectory) localNetworkMap?.networkParametersCopier?.install(baseDirectory)
@ -677,7 +708,7 @@ class DriverDSLImpl(
} else { } else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val monitorPort = if (jmxPolicy.startJmxHttpServer) jmxPolicy.jmxHttpServerPortAllocation?.nextPort() else null val monitorPort = if (jmxPolicy.startJmxHttpServer) jmxPolicy.jmxHttpServerPortAllocation?.nextPort() else null
val process = startOutOfProcessNode(config, quasarJarPath, debugPort, jolokiaJarPath, monitorPort, systemProperties, cordappPackages, maximumHeapSize) val process = startOutOfProcessNode(config, quasarJarPath, debugPort, jolokiaJarPath, monitorPort, bytemanJarPath, bytemanPort, systemProperties, cordappPackages, maximumHeapSize)
// Destroy the child process when the parent exits.This is needed even when `waitForAllNodesToFinish` is // Destroy the child process when the parent exits.This is needed even when `waitForAllNodesToFinish` is
// true because we don't want orphaned processes in the case that the parent process is terminated by the // true because we don't want orphaned processes in the case that the parent process is terminated by the
@ -800,6 +831,8 @@ class DriverDSLImpl(
debugPort: Int?, debugPort: Int?,
jolokiaJarPath: String, jolokiaJarPath: String,
monitorPort: Int?, monitorPort: Int?,
bytemanJarPath: String?,
bytemanPort: Int?,
overriddenSystemProperties: Map<String, String>, overriddenSystemProperties: Map<String, String>,
cordappPackages: List<String>, cordappPackages: List<String>,
maximumHeapSize: String, maximumHeapSize: String,
@ -807,7 +840,8 @@ class DriverDSLImpl(
): Process { ): Process {
log.info("Starting out-of-process Node ${config.corda.myLegalName.organisation}, " + log.info("Starting out-of-process Node ${config.corda.myLegalName.organisation}, " +
"debug port is " + (debugPort ?: "not enabled") + ", " + "debug port is " + (debugPort ?: "not enabled") + ", " +
"jolokia monitoring port is " + (monitorPort ?: "not enabled")) "jolokia monitoring port is " + (monitorPort ?: "not enabled") + ", " +
"byteMan: " + if (bytemanJarPath == null) "not in classpath" else "port is " + (bytemanPort ?: "not enabled"))
// Write node.conf // Write node.conf
writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe.toNodeOnly()) writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe.toNodeOnly())
@ -845,11 +879,22 @@ class DriverDSLImpl(
it += extraCmdLineFlag it += extraCmdLineFlag
}.toList() }.toList()
val bytemanJvmArgs = {
val bytemanAgent = bytemanJarPath?.let {
bytemanPort?.let {
"-javaagent:$bytemanJarPath=port:$bytemanPort,listener:true"
}
}
listOfNotNull(bytemanAgent) +
if (bytemanAgent != null && debugPort != null) listOf("-Dorg.jboss.byteman.verbose=true", "-Dorg.jboss.byteman.debug=true")
else emptyList()
}.invoke()
return ProcessUtilities.startJavaProcess( return ProcessUtilities.startJavaProcess(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = arguments, arguments = arguments,
jdwpPort = debugPort, jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments + listOfNotNull(jolokiaAgent), extraJvmArguments = extraJvmArguments + listOfNotNull(jolokiaAgent) + bytemanJvmArgs,
workingDirectory = config.corda.baseDirectory, workingDirectory = config.corda.baseDirectory,
maximumHeapSize = maximumHeapSize maximumHeapSize = maximumHeapSize
) )
@ -1017,6 +1062,17 @@ interface InternalDriverDSL : DriverDSL, CordformContext {
fun start() fun start()
fun shutdown() fun shutdown()
fun startNode(
defaultParameters: NodeParameters = NodeParameters(),
providedName: CordaX500Name? = defaultParameters.providedName,
rpcUsers: List<User> = defaultParameters.rpcUsers,
verifierType: VerifierType = defaultParameters.verifierType,
customOverrides: Map<String, Any?> = defaultParameters.customOverrides,
startInSameProcess: Boolean? = defaultParameters.startInSameProcess,
maximumHeapSize: String = defaultParameters.maximumHeapSize,
bytemanPort: Int? = null
): CordaFuture<NodeHandle>
} }
/** /**