ENT-1967: Extend integration test to cover Node process termination scenario. (#1254)

* ENT-1967: Enable byteman library

* ENT-1967: Add an integration test to experiment with Byteman.

This needs to be running with: `-Dexperimental.test.enable`
As in: `gradlew -Dexperimental.test.enable integrationTest`

* ENT-1967: Modify Node driver to allow for optional instrumentation and use it in the integration test

* ENT-1967: Rely on port allocation

* ENT-1967: Install the rule that works

* ENT-1967: Trying to introduce counter rule (doesn't work)

* ENT-1967: Install rules that make correct use of countdown and also improve debug logging for Byteman

* ENT-1967: Add assertion to validate that exception is indeed thrown as per rules installed.

* ENT-1967: Less logging and more assertions

* ENT-1967: Replace `fun` with `val`

* ENT-1967: Un-break DriverDSL public API.

* ENT-1967: Minor change

* ENT-1967: Remove Byteman settings from NodeParameters and hide them inside InternalDriverDSL.

* ENT-1967: Introduce node restart scenario

* ENT-1967: Change the place where JVM is terminated and ensure that StdOut and StdErr not lost during restarts.

* ENT-1967: Changes after rebase.

* ENT-1967: Do not use InMemory DB for test that involves Node restart.

* ENT-1967: Minor fixes post merge from `master`.

* ENT-1967: Tighten-up test checks.
This commit is contained in:
Viktor Kolomeyko 2018-07-11 12:54:56 +01:00 committed by GitHub
parent 0fee4aa1d3
commit a4e325a494
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 28 deletions

View File

@ -1,6 +1,7 @@
package net.corda.instrumentation.byteman
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.RPCException
import net.corda.core.contracts.Amount
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
@ -9,6 +10,7 @@ import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.finance.POUNDS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.node.services.Permissions.Companion.invokeRpc
@ -22,7 +24,9 @@ import net.corda.testing.internal.toDatabaseSchemaNames
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.User
import net.corda.testing.node.internal.DummyClusterSpec
import net.corda.testing.node.internal.InternalDriverDSL
import net.corda.testing.node.internal.internalDriver
import org.assertj.core.api.Assertions
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.jboss.byteman.agent.submit.ScriptText
@ -30,6 +34,8 @@ import org.jboss.byteman.agent.submit.Submit
import org.junit.ClassRule
import org.junit.Test
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFalse
class InstrumentationTest : IntegrationTest() {
private lateinit var alice: NodeHandle
@ -44,15 +50,22 @@ class InstrumentationTest : IntegrationTest() {
val databaseSchemas = IntegrationTestSchemas(*DUMMY_NOTARY_NAME.toDatabaseSchemaNames("_0", "_1", "_2").toTypedArray(),
ALICE_NAME.toDatabaseSchemaName())
val logger = contextLogger()
}
private fun setup(compositeIdentity: Boolean = false, testBlock: () -> Unit) {
val testUser = User("test", "test", permissions = setOf(
private val logger = contextLogger()
private val testUser = User("test", "test", permissions = setOf(
startFlow<CashIssueFlow>(),
startFlow<CashPaymentFlow>(),
invokeRpc(CordaRPCOps::nodeInfo),
invokeRpc(CordaRPCOps::stateMachinesFeed))
invokeRpc(CordaRPCOps::stateMachineRecordedTransactionMappingSnapshot))
)
private fun connectRpc(node: NodeHandle): CordaRPCOps {
val client = CordaRPCClient(node.rpcAddress)
return client.start(testUser.username, testUser.password).proxy
}
}
private fun setup(inMemoryDB: Boolean = true, testBlock: InternalDriverDSL.() -> Unit) {
val portAllocation = PortAllocation.Incremental(10000)
internalDriver(
@ -62,8 +75,9 @@ class InstrumentationTest : IntegrationTest() {
NotarySpec(
DUMMY_NOTARY_NAME,
rpcUsers = listOf(testUser),
cluster = DummyClusterSpec(clusterSize = 1, compositeServiceIdentity = compositeIdentity))
)
cluster = DummyClusterSpec(clusterSize = 1))
),
inMemoryDB = inMemoryDB
) {
bytemanPort = portAllocation.nextPort()
@ -73,18 +87,6 @@ class InstrumentationTest : IntegrationTest() {
assertThat(notaryNodes).hasSize(1)
for (notaryNode in notaryNodes) {
assertThat(notaryNode.nodeInfo.legalIdentities).contains(raftNotaryIdentity)
}
// Check that each notary has different identity as a node.
assertThat(notaryNodes.flatMap { it.nodeInfo.legalIdentities - raftNotaryIdentity }.toSet()).hasSameSizeAs(notaryNodes)
// Connect to Alice and the notaries
fun connectRpc(node: NodeHandle): CordaRPCOps {
val client = CordaRPCClient(node.rpcAddress)
return client.start("test", "test").proxy
}
aliceProxy = connectRpc(alice)
testBlock()
@ -92,14 +94,14 @@ class InstrumentationTest : IntegrationTest() {
}
@Test
fun test() {
fun testRulesInstall() {
setup {
val submit = Submit("localhost", bytemanPort)
logger.info("Byteman agent version used: " + submit.agentVersion)
logger.info("Remote system properties: " + submit.listSystemProperties())
val COUNTDOWN_REACHED_STR = "Countdown reached"
val countDownReached = "Countdown reached"
val deploymentOutcome = submit.addScripts(listOf(ScriptText("My test script", """
RULE CashIssue invocation logging
CLASS net.corda.finance.flows.CashIssueFlow
@ -130,11 +132,11 @@ CLASS net.corda.finance.flows.CashPaymentFlow
METHOD call
AT EXIT
IF countDown("paymentCounter")
DO throw new java.lang.IllegalStateException("$COUNTDOWN_REACHED_STR")
DO throw new java.lang.IllegalStateException("$countDownReached")
ENDRULE
""")))
assertThat(deploymentOutcome).contains("install rule Decrement CountDown and throw")
assertThat(submit.listAllRules()).contains(COUNTDOWN_REACHED_STR)
assertThat(submit.listAllRules()).contains(countDownReached)
// Issue 100 pounds, then pay ourselves 10x5 pounds
issueCash(100.POUNDS)
@ -145,7 +147,7 @@ ENDRULE
}
// 11th payment should fail as countDown has been reached
assertThatThrownBy { paySelf(5.POUNDS) }.hasMessageContaining(COUNTDOWN_REACHED_STR)
assertThatThrownBy { paySelf(5.POUNDS) }.hasMessageContaining(countDownReached)
}
}
@ -153,7 +155,60 @@ ENDRULE
aliceProxy.startFlow(::CashIssueFlow, amount, OpaqueBytes.of(0), raftNotaryIdentity).returnValue.getOrThrow()
}
private fun paySelf(amount: Amount<Currency>) {
aliceProxy.startFlow(::CashPaymentFlow, amount, alice.nodeInfo.singleIdentity()).returnValue.getOrThrow()
private fun paySelf(amount: Amount<Currency>) = aliceProxy.startFlow(::CashPaymentFlow, amount, alice.nodeInfo.singleIdentity()).returnValue.getOrThrow()
@Test
fun testNodeRestart() {
setup(inMemoryDB = false) {
val submit = Submit("localhost", bytemanPort)
val deploymentOutcome = submit.addScripts(listOf(ScriptText("My restart script", """
RULE Create CountDown
CLASS net.corda.finance.flows.CashIssueFlow
METHOD call
AT EXIT
IF TRUE
DO createCountDown("paymentCounter", 10)
ENDRULE
RULE Decrement CountDown and kill
CLASS net.corda.finance.flows.CashPaymentFlow
METHOD call
AT INVOKE net.corda.core.node.ServiceHub.signInitialTransaction
IF countDown("paymentCounter")
DO debug("Killing JVM now!"); killJVM()
ENDRULE
""")))
assertThat(deploymentOutcome).contains("install rule Decrement CountDown and kill")
assertThat(submit.listAllRules()).contains("killJVM")
// Issue 100 pounds, then pay ourselves 10x5 pounds
issueCash(100.POUNDS)
// Submit 10 successful payments
val successfulPayments = (1..10).map { paySelf(5.POUNDS) }
// 11th payment should be done against killed JVM
assertThatThrownBy { paySelf(5.POUNDS) }.isInstanceOf(RPCException::class.java).hasMessageContaining("Connection failure detected")
// Alice node should no longer be responsive or alive
assertFalse((alice as OutOfProcess).process.isAlive)
assertThatThrownBy { aliceProxy.nodeInfo() }.isInstanceOf(RPCException::class.java).hasMessageContaining("RPC server is not available")
// Restart node
alice.stop() // this should perform un-registration in the NetworkMap
alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(testUser), bytemanPort = bytemanPort).getOrThrow()
aliceProxy = connectRpc(alice)
// Check that all 11 transactions are present
val snapshot = aliceProxy.stateMachineRecordedTransactionMappingSnapshot()
assertEquals(11, snapshot.size)
Assertions.assertThat(snapshot.map { it.transactionId }.toSet()).containsAll(successfulPayments.map { it.stx.id })
// Make an extra payment to ensure that node is operational
val anotherPaymentOutcome = paySelf(5.POUNDS)
assertEquals(500, (anotherPaymentOutcome.stx.tx.outputStates.first() as Cash.State).amount.quantity)
}
}
}

View File

@ -13,6 +13,8 @@ package net.corda.testing.node.internal
import net.corda.core.internal.div
import java.io.File
import java.nio.file.Path
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
object ProcessUtilities {
inline fun <reified C : Any> startJavaProcess(
@ -48,8 +50,11 @@ object ProcessUtilities {
inheritIO()
environment()["CLASSPATH"] = classPath.joinToString(File.pathSeparator)
if (workingDirectory != null) {
redirectError((workingDirectory / "$className.stderr.log").toFile())
redirectOutput((workingDirectory / "$className.stdout.log").toFile())
// Timestamp may be handy if the same process started, killed and then re-started. Without timestamp
// StdOut and StdErr will be overwritten.
val timestamp = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HHmmss"))
redirectError((workingDirectory / "$className.stderr.$timestamp.log").toFile())
redirectOutput((workingDirectory / "$className.stdout.$timestamp.log").toFile())
directory(workingDirectory.toFile())
}
}.start()