ENT-1396: Further improvements to Node HA Testing script. (#873)

* CORDA-1506: Cherry-pick fix from Corda OS.

* ENT-1396: Add verification logic to ensure that transaction reflected on both sides.

* ENT-1396: First stub on HA re-connect logic.

* ENT-1396: Ensure we re-connect correctly.

* ENT-1396: Improve the robustness of the scenario runner.

* ENT-1396: Perform Vault query in re-tryable fashion.

* ENT-1396: Larger iterations count.

* ENT-1396: Introduce "iterationsCount" as parameter.

* ENT-1396: ReadMe document.
This commit is contained in:
Viktor Kolomeyko 2018-05-24 14:31:38 +01:00 committed by GitHub
parent e0bb00eebb
commit ee094ef129
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 178 additions and 20 deletions

View File

@ -1,8 +1,8 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="HA Testing" type="JetRunConfigurationType" factoryName="Kotlin">
<module name="ha-testing_main" />
<option name="VM_PARAMETERS" value="-Dlog4j2.debug=true" />
<option name="PROGRAM_PARAMETERS" value="--haNodeRpcAddress 52.174.253.60:10003 --haNodeRpcUserName corda --haNodeRpcPassword corda_is_awesome --normalNodeRpcAddress ha-testing-vm-d.westeurope.cloudapp.azure.com:10013 --normalNodeRpcUserName corda --normalNodeRpcPassword corda_is_awesome" />
<option name="VM_PARAMETERS" value="-ea -Dlog4j2.debug=true" />
<option name="PROGRAM_PARAMETERS" value="--haNodeRpcAddress 52.174.253.60:10003 --haNodeRpcUserName corda --haNodeRpcPassword corda_is_awesome --normalNodeRpcAddress ha-testing-vm-d.westeurope.cloudapp.azure.com:10013 --normalNodeRpcUserName corda --normalNodeRpcPassword corda_is_awesome --iterationsCount 12" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
<option name="ALTERNATIVE_JRE_PATH" />
<option name="PASS_PARENT_ENVS" value="true" />

View File

@ -0,0 +1,41 @@
# Introduction
This module provides a facility to perform High Availability(HA) testing of Corda Nodes.
It assumes that there is an environment running on remote hosts to which it is possible
communicate via RPC. Also, assumption is made that Finance CorDapp is installed on the nodes.
# Testing principle
The test script running on a local machine and communicates to remote nodes.
There are two nodes in scope:
* Normal node - a stable node that is meant to remain available 100% of the time during test script execution;
* HA node - which may have windows of instability through scenario execution, but meant to recover
and continue serving RPC requests and interact with its peers via P2P.
Main method of this module performs the following actions:
1. Parses incoming parameters;
2. Establishes RPC connection to Normal and HA nodes;
3. Performs a series of flow postings;
4. Whilst flow postings running, triggers termination of the HA node cluster to simulate disaster scenario
(**Note:** This is a target/future state, subject to: https://r3-cev.atlassian.net/browse/ENT-1967 being completed first.
At the time of writing disaster scenario will have to be simulated by the operator manually.);
5. When RPC connection to HA Node is lost, re-establishes RPC communication and attempts to drive test scenario to the end;
6. Once all the postings successfully processed, performs validation of the transactions/states to ensure consistency;
6. Returns exit code of `0` in case success or `1` in case of a failure.
At the moment multi-threaded aspect of testing is not in scope and all the testing performed from a single thread from
test script side. This means that at any one time there can only be a **single** transaction "in flight".
# Running test script
Main method has the following parameters:
| Option | Description |
| ------ | ----------- |
| --normalNodeRpcAddress <<host:port>> | Normal Node RPC address |
| --normalNodeRpcUserName <free_form_text> | Normal Node RPC user name |
| --normalNodeRpcPassword <free_form_text> | Normal Node RPC password |
| --haNodeRpcAddress <<host:port>> | High Available Node RPC address |
| --haNodeRpcUserName <free_form_text> | High Available Node RPC user name |
| --haNodeRpcPassword <free_form_text> | High Available Node RPC password |
| --iterationsCount [positive_integer] | Number of iteration to execute |

View File

@ -11,6 +11,7 @@ fun main(args: Array<String>) {
val parser = OptionParser()
MandatoryCommandLineArguments.values().forEach { argSpec -> parser.accepts(argSpec.name).withRequiredArg().withValuesConvertedBy(argSpec.valueConverter).describedAs(argSpec.description) }
OptionalCommandLineArguments.values().forEach { argSpec -> parser.accepts(argSpec.name).withOptionalArg().withValuesConvertedBy(argSpec.valueConverter).describedAs(argSpec.description) }
val options = parser.parse(*args)
try {
@ -28,7 +29,13 @@ fun main(args: Array<String>) {
}
}
enum class MandatoryCommandLineArguments(val valueConverter: ValueConverter<out Any>, val description: String) {
interface CommandLineArguments {
val name: String
val valueConverter: ValueConverter<out Any>
val description: String
}
enum class MandatoryCommandLineArguments(override val valueConverter: ValueConverter<out Any>, override val description: String) : CommandLineArguments {
haNodeRpcAddress(NetworkHostAndPortValueConverter, "High Available Node RPC address"),
haNodeRpcUserName(StringValueConverter, "High Available Node RPC user name"),
haNodeRpcPassword(StringValueConverter, "High Available Node RPC password"),
@ -37,12 +44,28 @@ enum class MandatoryCommandLineArguments(val valueConverter: ValueConverter<out
normalNodeRpcPassword(StringValueConverter, "Normal Node RPC password"),
}
enum class OptionalCommandLineArguments(override val valueConverter: ValueConverter<out Any>, override val description: String) : CommandLineArguments{
iterationsCount(PositiveIntValueConverter, "Number of iteration to execute"),
}
private object PositiveIntValueConverter : ValueConverter<Int> {
override fun convert(value: String): Int {
val result = value.toInt()
require(result > 0) { "Positive value is expected" }
return result
}
override fun valueType(): Class<out Int> = Int::class.java
override fun valuePattern(): String = "positive_integer"
}
private object StringValueConverter : ValueConverter<String> {
override fun convert(value: String) = value
override fun valueType(): Class<out String> = String::class.java
override fun valuePattern(): String = "<free_form_text>"
override fun valuePattern(): String = "free_form_text"
}
private object NetworkHostAndPortValueConverter : ValueConverter<NetworkHostAndPort> {
@ -50,5 +73,5 @@ private object NetworkHostAndPortValueConverter : ValueConverter<NetworkHostAndP
override fun valueType(): Class<out NetworkHostAndPort> = NetworkHostAndPort::class.java
override fun valuePattern(): String = "<host>:<port>"
override fun valuePattern(): String = "host:port"
}

View File

@ -0,0 +1,63 @@
package net.corda.haTesting
import net.corda.client.rpc.RPCException
import net.corda.core.contracts.ContractState
import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
private val logger: Logger = LoggerFactory.getLogger("RpcHelper")
inline fun <T, A, B, C, reified R : FlowLogic<T>> CordaRPCOps.startFlowWithRetryAndGet(
@Suppress("UNUSED_PARAMETER") crossinline
flowConstructor: (A, B, C) -> R,
arg0: A,
arg1: B,
arg2: C,
retryInterval: Duration = 5.seconds,
giveUpInterval: Duration = 5.minutes
): T {
return arithmeticBackoff(retryInterval, giveUpInterval, "startFlowWithRetryAndGet") {
this.startFlow(flowConstructor, arg0, arg1, arg2).returnValue.getOrThrow()
}
}
fun <T> arithmeticBackoff(retryInterval: Duration, giveUpInterval: Duration, meaningfulDescription: String, op: () -> T): T {
val start = System.currentTimeMillis()
var iterCount = 0
do {
try {
iterCount++
return op()
} catch (ex: RPCException) {
logger.warn("Exception $meaningfulDescription, iteration #$iterCount", ex)
Thread.sleep(iterCount * retryInterval.toMillis())
}
} while ((System.currentTimeMillis() - start) < giveUpInterval.toMillis())
throw IllegalStateException("$meaningfulDescription - failed, total number of times tried: $iterCount")
}
inline fun <reified T : ContractState> CordaRPCOps.vaultQueryByWithRetry(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(),
paging: PageSpecification = PageSpecification(),
sorting: Sort = Sort(emptySet()),
retryInterval: Duration = 5.seconds,
giveUpInterval: Duration = 5.minutes): Vault.Page<T> {
return arithmeticBackoff(retryInterval, giveUpInterval, "vaultQueryByWithRetry") {
this.vaultQueryBy(criteria, paging, sorting, T::class.java)
}
}

View File

@ -7,8 +7,14 @@ import net.corda.core.contracts.Amount
import net.corda.core.crypto.SecureHash
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.vaultQueryBy
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.utilities.*
import net.corda.finance.GBP
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.AbstractCashFlow
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import java.util.concurrent.Callable
@ -58,11 +64,14 @@ class ScenarioRunner(private val options: OptionSet) : Callable<Boolean> {
val notary = normalNodeRpcOps.notaryIdentities().first()
val iterCount = options.valueOf(OptionalCommandLineArguments.iterationsCount.name) as Int? ?: 10
logger.info("Total number of iterations to run: $iterCount")
// It is assumed that normal Node is capable of issuing.
// Create a unique tag for this issuance round
val issuerBankPartyRef = SecureHash.randomSHA256().bytes
val currency = GBP
val amount = Amount(1_000_000, currency)
val amount = Amount(iterCount * 100L, currency)
logger.info("Trying: issue to normal, amount: $amount")
val issueOutcome = normalNodeRpcOps.startFlow(::CashIssueFlow, amount, OpaqueBytes(issuerBankPartyRef), notary).returnValue.getOrThrow()
logger.info("Success: issue to normal, amount: $amount, TX ID: ${issueOutcome.stx.tx.id}")
@ -71,26 +80,48 @@ class ScenarioRunner(private val options: OptionSet) : Callable<Boolean> {
// The daemon will monitor availability of HA Node and as soon as it is down and then back-up it will install
// the next termination schedule.
val iterCount = 10
val initialAmount: Long = 1000
val initialAmount: Long = iterCount * 10L
require(initialAmount > iterCount)
for(iterNo in 0 until iterCount) {
val transferQuantity = initialAmount - iterNo
logger.info("Trying: normal -> ha, amount: ${transferQuantity}p")
val firstPayment = normalNodeRpcOps.startFlow(::CashPaymentFlow, Amount(transferQuantity, currency), haNodeParty, true).returnValue.getOrThrow()
logger.info("Success: normal -> ha, amount: ${transferQuantity}p, TX ID: ${firstPayment.stx.tx.id}")
val allPayments = mutableListOf<AbstractCashFlow.Result>()
logger.info("Trying: ha -> normal, amount: ${transferQuantity - 1}p")
for(iterNo in 1 .. iterCount) {
val transferQuantity = initialAmount - iterNo + 1
logger.info("#$iterNo.1 - Trying: normal -> ha, amount: ${transferQuantity}p")
val firstPayment = normalNodeRpcOps.startFlow(::CashPaymentFlow, Amount(transferQuantity, currency), haNodeParty, true).returnValue.getOrThrow()
logger.info("#$iterNo.2 - Success: normal -> ha, amount: ${transferQuantity}p, TX ID: ${firstPayment.stx.tx.id}")
allPayments.add(firstPayment)
logger.info("#$iterNo.3 - Trying: ha -> normal, amount: ${transferQuantity - 1}p")
// TODO: HA node may well have a period of instability, therefore the following RPC posting has to be done in re-try fashion.
val secondPayment = haNodeRpcOps.startFlow(::CashPaymentFlow, Amount(transferQuantity - 1, currency), normalNodeParty, true).returnValue.getOrThrow()
logger.info("Success: ha -> normal, amount: ${transferQuantity - 1}p, TX ID: ${secondPayment.stx.tx.id}")
val secondPayment = haNodeRpcOps.startFlowWithRetryAndGet(::CashPaymentFlow, Amount(transferQuantity - 1, currency), normalNodeParty, true)
logger.info("#$iterNo.4 - Success: ha -> normal, amount: ${transferQuantity - 1}p, TX ID: ${secondPayment.stx.tx.id}")
allPayments.add(secondPayment)
}
// TODO: Verify
// Only then we confirm all the checks have passed.
return true
// Verify
assert(allPayments.size == (iterCount * 2)) { "Expected number of payments is ${iterCount * 2}, actual number of payments: ${allPayments.size}" }
val criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)
// TODO: Potentially implement paging validation logic for bigger data sets.
val pageSpecification = PageSpecification(pageNumber = 1, pageSize = Int.MAX_VALUE)
val normalStates = normalNodeRpcOps.vaultQueryBy<Cash.State>(criteria, pageSpecification)
val haStates = haNodeRpcOps.vaultQueryByWithRetry<Cash.State>(criteria, pageSpecification)
return verifyPaymentsAndStatesTally(allPayments, normalStates, haStates)
}
private fun verifyPaymentsAndStatesTally(allPayments: MutableList<AbstractCashFlow.Result>, normalStates: Vault.Page<Cash.State>, haStates: Vault.Page<Cash.State>): Boolean {
val normalTxHashes = normalStates.states.map { it.ref.txhash }.toSet()
val haTxHashes = haStates.states.map { it.ref.txhash }.toSet()
// Check that TX reference is present in both set of states.
allPayments.forEach { payment ->
val outputStatesHashes = payment.stx.coreTransaction.id
// H2 database is unreliable and may not contain all the states.
//assert(normalTxHashes.contains(outputStatesHashes)) { "Normal states should contain reference: $outputStatesHashes" }
assert(haTxHashes.contains(outputStatesHashes)) { "HA states should contain reference: $outputStatesHashes" }
}
return true
}
}