ENT-1967: Illustration for Byteman library can be used in Node integration test. (#1204)

* ENT-1967: Enable byteman library

* ENT-1967: Add an integration test to experiment with Byteman.

This needs to be running with: `-Dexperimental.test.enable`
As in: `gradlew -Dexperimental.test.enable integrationTest`

* ENT-1967: Modify Node driver to allow for optional instrumentation and use it in the integration test

* ENT-1967: Rely on port allocation

* ENT-1967: Install the rule that works

* ENT-1967: Trying to introduce counter rule (doesn't work)

* ENT-1967: Install rules that make correct use of countdown and also improve debug logging for Byteman

* ENT-1967: Add assertion to validate that exception is indeed thrown as per rules installed.

* ENT-1967: Less logging and more assertions

* ENT-1967: Replace `fun` with `val`

* ENT-1967: Un-break DriverDSL public API.

* ENT-1967: Minor change

* ENT-1967: Remove Byteman settings from NodeParameters and hide them inside InternalDriverDSL.

* ENT-1967: Change the way how Jars resolved and use `Try` construct.
This commit is contained in:
Viktor Kolomeyko 2018-07-10 10:05:07 +01:00 committed by GitHub
parent 13af5e00b6
commit c396b80afe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 265 additions and 43 deletions

View File

@ -8,30 +8,7 @@
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
buildscript {
// For sharing constants between builds
Properties constants = new Properties()
file("$projectDir/../../constants.properties").withInputStream { constants.load(it) }
ext.kotlin_version = constants.getProperty("kotlinVersion")
ext.byteman_version = "4.0.2"
repositories {
mavenLocal()
mavenCentral()
jcenter()
}
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
}
}
repositories {
mavenLocal()
mavenCentral()
jcenter()
}
ext.byteman_version = "4.0.3"
apply plugin: 'kotlin'
apply plugin: 'idea'
@ -39,12 +16,30 @@ apply plugin: 'net.corda.plugins.cordapp'
description 'A set of tools to perform Nodes High Availability testing'
configurations {
integrationTestCompile.extendsFrom testCompile
integrationTestRuntime.extendsFrom testRuntime
}
sourceSets {
integrationTest {
kotlin {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integration-test/kotlin')
}
resources {
srcDir file('src/integration-test/resources')
}
}
}
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
cordaCompile project(":client:rpc")
cordaCompile project(':node-api')
cordaCompile project(":node-api")
cordaCompile project(":finance")
cordaCompile project(":perftestcordapp")
@ -57,7 +52,14 @@ dependencies {
compile "net.sf.jopt-simple:jopt-simple:$jopt_simple_version"
// Byteman for runtime (termination) rules injection on the running node
//compile "org.jboss.byteman:byteman:$byteman_version"
// Submission tool allowing to install rules on running nodes
compile "org.jboss.byteman:byteman-submit:$byteman_version"
// The actual Byteman agent which should only be in the classpath of the out of process nodes
integrationTestCompile "org.jboss.byteman:byteman:$byteman_version"
integrationTestCompile project(":test-utils")
integrationTestCompile project(":node-driver")
}
jar {
@ -71,3 +73,8 @@ jar {
}
from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
}
task integrationTest(type: Test) {
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath
}

View File

@ -0,0 +1,159 @@
package net.corda.instrumentation.byteman
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.contracts.Amount
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.finance.POUNDS
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.testing.core.*
import net.corda.testing.driver.*
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.internal.toDatabaseSchemaName
import net.corda.testing.internal.toDatabaseSchemaNames
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.User
import net.corda.testing.node.internal.DummyClusterSpec
import net.corda.testing.node.internal.internalDriver
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.jboss.byteman.agent.submit.ScriptText
import org.jboss.byteman.agent.submit.Submit
import org.junit.ClassRule
import org.junit.Test
import java.util.*
class InstrumentationTest : IntegrationTest() {
private lateinit var alice: NodeHandle
private lateinit var notaryNodes: List<OutOfProcess>
private lateinit var aliceProxy: CordaRPCOps
private lateinit var raftNotaryIdentity: Party
private var bytemanPort: Int = -1
companion object {
@ClassRule
@JvmField
val databaseSchemas = IntegrationTestSchemas(*DUMMY_NOTARY_NAME.toDatabaseSchemaNames("_0", "_1", "_2").toTypedArray(),
ALICE_NAME.toDatabaseSchemaName())
val logger = contextLogger()
}
private fun setup(compositeIdentity: Boolean = false, testBlock: () -> Unit) {
val testUser = User("test", "test", permissions = setOf(
startFlow<CashIssueFlow>(),
startFlow<CashPaymentFlow>(),
invokeRpc(CordaRPCOps::nodeInfo),
invokeRpc(CordaRPCOps::stateMachinesFeed))
)
val portAllocation = PortAllocation.Incremental(10000)
internalDriver(
extraCordappPackagesToScan = listOf("net.corda.finance.contracts", "net.corda.finance.schemas"),
portAllocation = portAllocation,
notarySpecs = listOf(
NotarySpec(
DUMMY_NOTARY_NAME,
rpcUsers = listOf(testUser),
cluster = DummyClusterSpec(clusterSize = 1, compositeServiceIdentity = compositeIdentity))
)
) {
bytemanPort = portAllocation.nextPort()
alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(testUser), bytemanPort = bytemanPort).getOrThrow()
raftNotaryIdentity = defaultNotaryIdentity
notaryNodes = defaultNotaryHandle.nodeHandles.getOrThrow().map { it as OutOfProcess }
assertThat(notaryNodes).hasSize(1)
for (notaryNode in notaryNodes) {
assertThat(notaryNode.nodeInfo.legalIdentities).contains(raftNotaryIdentity)
}
// Check that each notary has different identity as a node.
assertThat(notaryNodes.flatMap { it.nodeInfo.legalIdentities - raftNotaryIdentity }.toSet()).hasSameSizeAs(notaryNodes)
// Connect to Alice and the notaries
fun connectRpc(node: NodeHandle): CordaRPCOps {
val client = CordaRPCClient(node.rpcAddress)
return client.start("test", "test").proxy
}
aliceProxy = connectRpc(alice)
testBlock()
}
}
@Test
fun test() {
setup {
val submit = Submit("localhost", bytemanPort)
logger.info("Byteman agent version used: " + submit.agentVersion)
logger.info("Remote system properties: " + submit.listSystemProperties())
val COUNTDOWN_REACHED_STR = "Countdown reached"
val deploymentOutcome = submit.addScripts(listOf(ScriptText("My test script", """
RULE CashIssue invocation logging
CLASS net.corda.finance.flows.CashIssueFlow
METHOD call
AT ENTRY
IF TRUE
DO System.out.println("Installing paymentCounter countdown")
ENDRULE
RULE Create CountDown
CLASS net.corda.finance.flows.CashIssueFlow
METHOD call
AT EXIT
IF TRUE
DO createCountDown("paymentCounter", 10)
ENDRULE
RULE trace CashPaymentFlow.call
CLASS net.corda.finance.flows.CashPaymentFlow
METHOD call
AT ENTRY
IF TRUE
DO debug("CashPaymentFlow invoked")
ENDRULE
RULE Decrement CountDown and throw
CLASS net.corda.finance.flows.CashPaymentFlow
METHOD call
AT EXIT
IF countDown("paymentCounter")
DO throw new java.lang.IllegalStateException("$COUNTDOWN_REACHED_STR")
ENDRULE
""")))
assertThat(deploymentOutcome).contains("install rule Decrement CountDown and throw")
assertThat(submit.listAllRules()).contains(COUNTDOWN_REACHED_STR)
// Issue 100 pounds, then pay ourselves 10x5 pounds
issueCash(100.POUNDS)
// Submit 10 successful payments
for (i in 1..10) {
paySelf(5.POUNDS)
}
// 11th payment should fail as countDown has been reached
assertThatThrownBy { paySelf(5.POUNDS) }.hasMessageContaining(COUNTDOWN_REACHED_STR)
}
}
private fun issueCash(amount: Amount<Currency>) {
aliceProxy.startFlow(::CashIssueFlow, amount, OpaqueBytes.of(0), raftNotaryIdentity).returnValue.getOrThrow()
}
private fun paySelf(amount: Amount<Currency>) {
aliceProxy.startFlow(::CashPaymentFlow, amount, alice.nodeInfo.singleIdentity()).returnValue.getOrThrow()
}
}

View File

@ -28,10 +28,7 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.core.utilities.*
import net.corda.node.NodeRegistrationOption
import net.corda.node.VersionInfo
import net.corda.node.internal.Node
@ -138,9 +135,17 @@ class DriverDSLImpl(
private val state = ThreadBox(State())
//TODO: remove this once we can bundle quasar properly.
private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$") }
private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$").getOrThrow() }
private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$") }
private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$").getOrThrow() }
private val bytemanJarPath: String? by lazy {
val maybeResolvedJar = resolveJar(".*byteman-\\d.*\\.jar$")
when (maybeResolvedJar) {
is Try.Success -> maybeResolvedJar.getOrThrow()
is Try.Failure -> null
}
}
private fun NodeConfig.checkAndOverrideForInMemoryDB(): NodeConfig = this.run {
if (inMemoryDB && corda.dataSourceProperties.getProperty("dataSource.url").startsWith("jdbc:h2:")) {
@ -152,16 +157,16 @@ class DriverDSLImpl(
}
}
private fun resolveJar(jarNamePattern: String): String {
private fun resolveJar(jarNamePattern: String): Try<String> {
return try {
val cl = ClassLoader.getSystemClassLoader()
val urls = (cl as URLClassLoader).urLs
val jarPattern = jarNamePattern.toRegex()
val jarFileUrl = urls.first { jarPattern.matches(it.path) }
jarFileUrl.toPath().toString()
Try.Success(jarFileUrl.toPath().toString())
} catch (e: Exception) {
log.warn("Unable to locate JAR `$jarNamePattern` on classpath: ${e.message}", e)
throw e
Try.Failure(e)
}
}
@ -205,6 +210,28 @@ class DriverDSLImpl(
customOverrides: Map<String, Any?>,
startInSameProcess: Boolean?,
maximumHeapSize: String
): CordaFuture<NodeHandle> {
return startNode(
defaultParameters,
providedName,
rpcUsers,
verifierType,
customOverrides,
startInSameProcess,
maximumHeapSize,
null
)
}
override fun startNode(
defaultParameters: NodeParameters,
providedName: CordaX500Name?,
rpcUsers: List<User>,
verifierType: VerifierType,
customOverrides: Map<String, Any?>,
startInSameProcess: Boolean?,
maximumHeapSize: String,
bytemanPort: Int?
): CordaFuture<NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name
@ -220,7 +247,7 @@ class DriverDSLImpl(
return registrationFuture.flatMap {
networkMapAvailability.flatMap {
// But starting the node proper does require the network map
startRegisteredNode(name, it, rpcUsers, verifierType, customOverrides, startInSameProcess, maximumHeapSize, p2pAddress)
startRegisteredNode(name, it, rpcUsers, verifierType, customOverrides, startInSameProcess, maximumHeapSize, p2pAddress, bytemanPort)
}
}
}
@ -232,7 +259,8 @@ class DriverDSLImpl(
customOverrides: Map<String, Any?>,
startInSameProcess: Boolean? = null,
maximumHeapSize: String = "512m",
p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort()): CordaFuture<NodeHandle> {
p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort(),
bytemanPort: Int? = null): CordaFuture<NodeHandle> {
val rpcAddress = portAllocation.nextHostAndPort()
val rpcAdminAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
@ -261,7 +289,7 @@ class DriverDSLImpl(
allowMissingConfig = true,
configOverrides = if (overrides.hasPath("devMode")) overrides else overrides + mapOf("devMode" to true)
)).checkAndOverrideForInMemoryDB()
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize, localNetworkMap)
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize, localNetworkMap, bytemanPort)
}
private fun startNodeRegistration(providedName: CordaX500Name, rootCert: X509Certificate, compatibilityZoneURL: URL): CordaFuture<NodeConfig> {
@ -402,7 +430,7 @@ class DriverDSLImpl(
)
val cordaConfig = typesafe.parseAsNodeConfiguration()
val config = NodeConfig(rawConfig, cordaConfig).checkAndOverrideForInMemoryDB()
return startNodeInternal(config, webAddress, null, "512m", localNetworkMap)
return startNodeInternal(config, webAddress, null, "512m", localNetworkMap, null)
}
@Suppress("DEPRECATION")
@ -621,6 +649,8 @@ class DriverDSLImpl(
debugPort,
jolokiaJarPath,
monitorPort,
bytemanJarPath,
null,
systemProperties,
cordappPackages,
"512m",
@ -636,7 +666,8 @@ class DriverDSLImpl(
webAddress: NetworkHostAndPort,
startInProcess: Boolean?,
maximumHeapSize: String,
localNetworkMap: LocalNetworkMap?): CordaFuture<NodeHandle> {
localNetworkMap: LocalNetworkMap?,
bytemanPort: Int?): CordaFuture<NodeHandle> {
val visibilityHandle = networkVisibilityController.register(config.corda.myLegalName)
val baseDirectory = config.corda.baseDirectory.createDirectories()
localNetworkMap?.networkParametersCopier?.install(baseDirectory)
@ -677,7 +708,7 @@ class DriverDSLImpl(
} else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val monitorPort = if (jmxPolicy.startJmxHttpServer) jmxPolicy.jmxHttpServerPortAllocation?.nextPort() else null
val process = startOutOfProcessNode(config, quasarJarPath, debugPort, jolokiaJarPath, monitorPort, systemProperties, cordappPackages, maximumHeapSize)
val process = startOutOfProcessNode(config, quasarJarPath, debugPort, jolokiaJarPath, monitorPort, bytemanJarPath, bytemanPort, systemProperties, cordappPackages, maximumHeapSize)
// Destroy the child process when the parent exits.This is needed even when `waitForAllNodesToFinish` is
// true because we don't want orphaned processes in the case that the parent process is terminated by the
@ -800,6 +831,8 @@ class DriverDSLImpl(
debugPort: Int?,
jolokiaJarPath: String,
monitorPort: Int?,
bytemanJarPath: String?,
bytemanPort: Int?,
overriddenSystemProperties: Map<String, String>,
cordappPackages: List<String>,
maximumHeapSize: String,
@ -807,7 +840,8 @@ class DriverDSLImpl(
): Process {
log.info("Starting out-of-process Node ${config.corda.myLegalName.organisation}, " +
"debug port is " + (debugPort ?: "not enabled") + ", " +
"jolokia monitoring port is " + (monitorPort ?: "not enabled"))
"jolokia monitoring port is " + (monitorPort ?: "not enabled") + ", " +
"byteMan: " + if (bytemanJarPath == null) "not in classpath" else "port is " + (bytemanPort ?: "not enabled"))
// Write node.conf
writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe.toNodeOnly())
@ -845,11 +879,22 @@ class DriverDSLImpl(
it += extraCmdLineFlag
}.toList()
val bytemanJvmArgs = {
val bytemanAgent = bytemanJarPath?.let {
bytemanPort?.let {
"-javaagent:$bytemanJarPath=port:$bytemanPort,listener:true"
}
}
listOfNotNull(bytemanAgent) +
if (bytemanAgent != null && debugPort != null) listOf("-Dorg.jboss.byteman.verbose=true", "-Dorg.jboss.byteman.debug=true")
else emptyList()
}.invoke()
return ProcessUtilities.startJavaProcess(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = arguments,
jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments + listOfNotNull(jolokiaAgent),
extraJvmArguments = extraJvmArguments + listOfNotNull(jolokiaAgent) + bytemanJvmArgs,
workingDirectory = config.corda.baseDirectory,
maximumHeapSize = maximumHeapSize
)
@ -1017,6 +1062,17 @@ interface InternalDriverDSL : DriverDSL, CordformContext {
fun start()
fun shutdown()
fun startNode(
defaultParameters: NodeParameters = NodeParameters(),
providedName: CordaX500Name? = defaultParameters.providedName,
rpcUsers: List<User> = defaultParameters.rpcUsers,
verifierType: VerifierType = defaultParameters.verifierType,
customOverrides: Map<String, Any?> = defaultParameters.customOverrides,
startInSameProcess: Boolean? = defaultParameters.startInSameProcess,
maximumHeapSize: String = defaultParameters.maximumHeapSize,
bytemanPort: Int? = null
): CordaFuture<NodeHandle>
}
/**