ENT-1986: Extend node HA test scenario to support LinearState. (#893)

* ENT-1986: Introduce new parameter `scenarioType`

* ENT-1986: Introduce `AbstractScenarioRunner` and refactor all the reusable bit of functionality into this class.

* ENT-1986: Create `LinearStateScenarioRunner` to use `perftestcordapp` flows.

* ENT-1986: More improvements to make Linear scenario runner work.
This commit is contained in:
Viktor Kolomeyko 2018-05-31 11:14:59 +01:00 committed by GitHub
parent 27e688e2c8
commit 63d357f62b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 249 additions and 133 deletions

View File

@ -2,7 +2,7 @@
<configuration default="false" name="HA Testing" type="JetRunConfigurationType" factoryName="Kotlin">
<module name="ha-testing_main" />
<option name="VM_PARAMETERS" value="-ea -Dlog4j2.debug=true" />
<option name="PROGRAM_PARAMETERS" value="--haNodeRpcAddress 52.174.253.60:10003 --haNodeRpcUserName corda --haNodeRpcPassword corda_is_awesome --normalNodeRpcAddress ha-testing-vm-d.westeurope.cloudapp.azure.com:10013 --normalNodeRpcUserName corda --normalNodeRpcPassword corda_is_awesome --iterationsCount 12" />
<option name="PROGRAM_PARAMETERS" value="--haNodeRpcAddress 52.174.253.60:10003 --haNodeRpcUserName corda --haNodeRpcPassword corda_is_awesome --normalNodeRpcAddress ha-testing-vm-d.westeurope.cloudapp.azure.com:10013 --normalNodeRpcUserName corda --normalNodeRpcPassword corda_is_awesome --scenarioType LinearState --iterationsCount 200" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
<option name="ALTERNATIVE_JRE_PATH" />
<option name="PASS_PARENT_ENVS" value="true" />

View File

@ -38,4 +38,5 @@ Main method has the following parameters:
| --haNodeRpcAddress <<host:port>> | High Available Node RPC address |
| --haNodeRpcUserName <free_form_text> | High Available Node RPC user name |
| --haNodeRpcPassword <free_form_text> | High Available Node RPC password |
| --scenarioType <free_form_text> | Type of scenario to run. Currently supported values: `Cash`, `LinearState` |
| --iterationsCount [positive_integer] | Number of iteration to execute |

View File

@ -46,6 +46,7 @@ dependencies {
cordaCompile project(":client:rpc")
cordaCompile project(":finance")
cordaCompile project(":perftestcordapp")
// Logging
compile "org.slf4j:log4j-over-slf4j:$slf4j_version"

View File

@ -0,0 +1,73 @@
package net.corda.haTesting
import joptsimple.OptionSet
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
abstract class AbstractScenarioRunner(options: OptionSet) {
companion object {
private val logger = contextLogger()
@JvmStatic
protected fun establishRpcConnection(endpoint: NetworkHostAndPort, user: String, password: String,
onError: (Throwable) -> CordaRPCOps =
{
logger.error("establishRpcConnection", it)
throw it
}): CordaRPCOps {
try {
val retryInterval = 5.seconds
val client = CordaRPCClient(endpoint,
object : CordaRPCClientConfiguration {
override val connectionMaxRetryInterval = retryInterval
}
)
val connection = client.start(user, password)
return connection.proxy
} catch (th: Throwable) {
return onError(th)
}
}
}
protected val haNodeRpcOps: CordaRPCOps
protected val normalNodeRpcOps: CordaRPCOps
protected val haNodeParty: Party
protected val normalNodeParty: Party
protected val notary: Party
protected val iterCount: Int
init {
haNodeRpcOps = establishRpcConnection(
options.valueOf(MandatoryCommandLineArguments.haNodeRpcAddress.name) as NetworkHostAndPort,
options.valueOf(MandatoryCommandLineArguments.haNodeRpcUserName.name) as String,
options.valueOf(MandatoryCommandLineArguments.haNodeRpcPassword.name) as String
)
haNodeParty = haNodeRpcOps.nodeInfo().legalIdentities.first()
normalNodeRpcOps = establishRpcConnection(
options.valueOf(MandatoryCommandLineArguments.normalNodeRpcAddress.name) as NetworkHostAndPort,
options.valueOf(MandatoryCommandLineArguments.normalNodeRpcUserName.name) as String,
options.valueOf(MandatoryCommandLineArguments.normalNodeRpcPassword.name) as String
)
normalNodeParty = normalNodeRpcOps.nodeInfo().legalIdentities.first()
notary = normalNodeRpcOps.notaryIdentities().first()
iterCount = options.valueOf(OptionalCommandLineArguments.iterationsCount.name) as Int? ?: 10
logger.info("Total number of iterations to run: $iterCount")
}
protected fun scenarioInitialized() {
// TODO: start a daemon thread which will talk to HA Node and installs termination schedule to it
// The daemon will monitor availability of HA Node and as soon as it is down and then back-up it will install
// the next termination schedule.
}
protected fun scenarioCompleted() {
// TODO: stop the daemon and dispose any other resources
}
}

View File

@ -0,0 +1,91 @@
package net.corda.haTesting
import joptsimple.OptionSet
import net.corda.core.contracts.Amount
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.Party
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.vaultQueryBy
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.finance.GBP
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.AbstractCashFlow
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import java.util.concurrent.Callable
// Responsible for executing test scenario for 2 nodes and verifying the outcome
class CashScenarioRunner(options: OptionSet) : AbstractScenarioRunner(options), Callable<Boolean> {
companion object {
private val logger = contextLogger()
}
override fun call(): Boolean {
// It is assumed that normal Node is capable of issuing.
// Create a unique tag for this issuance round
val issuerBankPartyRef = SecureHash.randomSHA256().bytes
val currency = GBP
val issueAmount = Amount(iterCount * 100L, currency)
logger.info("Trying: issue to normal, amount: $issueAmount")
val issueOutcome = normalNodeRpcOps.startFlow(::CashIssueFlow, issueAmount, OpaqueBytes(issuerBankPartyRef), notary).returnValue.getOrThrow()
logger.info("Success: issue to normal, amount: $issueAmount, TX ID: ${issueOutcome.stx.id}")
scenarioInitialized()
try {
val initialAmount: Long = iterCount * 10L
require(initialAmount > iterCount)
val allPayments = mutableListOf<AbstractCashFlow.Result>()
for (iterNo in 1..iterCount) {
val transferQuantity = issueAmount.quantity
logger.info("#$iterNo.1 - Trying: normal -> ha, amount: ${transferQuantity}p")
val firstPayment = normalNodeRpcOps.startFlow(::CashPaymentFlow, Amount(transferQuantity, currency), haNodeParty, false).returnValue.getOrThrow()
logger.info("#$iterNo.2 - Success: normal -> ha, amount: ${transferQuantity}p, TX ID: ${firstPayment.stx.id}")
allPayments.add(firstPayment)
val transferBackQuantity = transferQuantity
logger.info("#$iterNo.3 - Trying: ha -> normal, amount: ${transferBackQuantity}p")
val secondPayment = haNodeRpcOps.startFlowWithRetryAndGet(::CashPaymentFlow, Amount(transferBackQuantity, currency), normalNodeParty, false)
logger.info("#$iterNo.4 - Success: ha -> normal, amount: ${transferBackQuantity}p, TX ID: ${secondPayment.stx.id}")
allPayments.add(secondPayment)
}
// Verify
assert(allPayments.size == (iterCount * 2)) { "Expected number of payments is ${iterCount * 2}, actual number of payments: ${allPayments.size}" }
val criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)
// TODO: Potentially implement paging validation logic for bigger data sets.
val pageSpecification = PageSpecification(pageNumber = 1, pageSize = Int.MAX_VALUE)
val normalStates = normalNodeRpcOps.vaultQueryBy<Cash.State>(criteria, pageSpecification)
val haStates = haNodeRpcOps.vaultQueryByWithRetry<Cash.State>(criteria, pageSpecification)
return verifyPaymentsAndStatesTally(allPayments, mapOf(normalNodeParty to normalStates, haNodeParty to haStates))
} finally {
scenarioCompleted()
}
}
private fun verifyPaymentsAndStatesTally(allPayments: MutableList<AbstractCashFlow.Result>, statesByParty: Map<Party, Vault.Page<Cash.State>>): Boolean {
val hashesByParty: Map<Party, Set<SecureHash>> = statesByParty.mapValues { entry -> entry.value.states.map { state -> state.ref.txhash }.toSet() }
// Check that TX reference is present in both set of states.
allPayments.forEach { payment ->
val transactionId = payment.stx.id
val recipient = payment.recipient
val allStatesForParty = hashesByParty[recipient] ?: throw IllegalArgumentException("Cannot find states for party: $recipient in transaction: $transactionId")
// Recipient definitely should have hash of a transaction in its states.
assert(transactionId in allStatesForParty) { "States for party: $recipient should contain reference: $transactionId" }
}
return true
}
}

View File

@ -0,0 +1,49 @@
package net.corda.haTesting
import com.r3.corda.enterprise.perftestcordapp.contracts.LinearStateBatchNotariseContract
import com.r3.corda.enterprise.perftestcordapp.flows.LinearStateBatchNotariseFlow
import joptsimple.OptionSet
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.utilities.contextLogger
import java.util.concurrent.Callable
// Responsible for executing test scenario for a single node executing `LinearStateBatchNotariseFlow` and verifying the results
class LinearStateScenarioRunner(options: OptionSet) : AbstractScenarioRunner(options), Callable<Boolean> {
companion object {
private val logger = contextLogger()
}
override fun call(): Boolean {
scenarioInitialized()
try {
val results = mutableListOf<LinearStateBatchNotariseFlow.Result>()
for (iterNo in 1..iterCount) {
logger.info("#$iterNo.1 - Trying: Linear state on HA")
val result = haNodeRpcOps.startFlowWithRetryAndGet(::LinearStateBatchNotariseFlow, notary, 1, 1, true, 1000.0)
logger.info("#$iterNo.2 - Done: Linear state on HA")
results.add(result)
}
// Verify
assert(results.size == iterCount) { "Expected number of results is $iterCount, actual number of payments: ${results.size}" }
val criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)
// TODO: Potentially implement paging validation logic for bigger data sets.
val pageSpecification = PageSpecification(pageNumber = 1, pageSize = Int.MAX_VALUE)
val haStates: Vault.Page<LinearStateBatchNotariseContract.State> = haNodeRpcOps.vaultQueryByWithRetry(criteria, pageSpecification)
return verifyResultsAndStatesTally(results, haStates)
} finally {
scenarioCompleted()
}
}
private fun verifyResultsAndStatesTally(results: MutableList<LinearStateBatchNotariseFlow.Result>, states: Vault.Page<LinearStateBatchNotariseContract.State>): Boolean {
// Unfortunately, there is absolutely nothing in `LinearStateBatchNotariseFlow.Result` which can link it to the original transaction
return true
}
}

View File

@ -4,6 +4,7 @@ import joptsimple.OptionParser
import joptsimple.ValueConverter
import net.corda.core.utilities.NetworkHostAndPort
import org.slf4j.LoggerFactory
import java.util.concurrent.Callable
fun main(args: Array<String>) {
@ -20,8 +21,15 @@ fun main(args: Array<String>) {
parser.printHelpOn(System.err)
throw th
}
val scenarioType = ScenarioType.valueOf(options.valueOf(MandatoryCommandLineArguments.scenarioType.name) as String)
val scenarioRunner: Callable<Boolean> = when(scenarioType) {
ScenarioType.Cash -> CashScenarioRunner(options)
ScenarioType.LinearState -> LinearStateScenarioRunner(options)
}
try {
require(ScenarioRunner(options).call()) { "Scenario should pass" }
require(scenarioRunner.call()) { "Scenario should pass" }
System.exit(0)
} catch (th: Throwable) {
logger.error("Exception in main()", th)
@ -42,12 +50,18 @@ enum class MandatoryCommandLineArguments(override val valueConverter: ValueConve
normalNodeRpcAddress(NetworkHostAndPortValueConverter, "Normal Node RPC address"),
normalNodeRpcUserName(StringValueConverter, "Normal Node RPC user name"),
normalNodeRpcPassword(StringValueConverter, "Normal Node RPC password"),
scenarioType(StringValueConverter, "Type of scenario to run"),
}
enum class OptionalCommandLineArguments(override val valueConverter: ValueConverter<out Any>, override val description: String) : CommandLineArguments{
iterationsCount(PositiveIntValueConverter, "Number of iteration to execute"),
}
private enum class ScenarioType {
Cash,
LinearState,
}
private object PositiveIntValueConverter : ValueConverter<Int> {
override fun convert(value: String): Int {
val result = value.toInt()

View File

@ -35,6 +35,24 @@ inline fun <T, A, B, C, reified R : FlowLogic<T>> CordaRPCOps.startFlowWithRetry
}
inline fun <T, A, B, C, D, E, reified R : FlowLogic<T>> CordaRPCOps.startFlowWithRetryAndGet(
@Suppress("UNUSED_PARAMETER") crossinline
flowConstructor: (A, B, C, D, E) -> R,
arg0: A,
arg1: B,
arg2: C,
arg3: D,
arg4: E,
retryInterval: Duration = 5.seconds,
giveUpInterval: Duration = 5.minutes
): T {
return arithmeticBackoff(retryInterval, giveUpInterval, "startFlowWithRetryAndGet") {
this.startFlow(flowConstructor, arg0, arg1, arg2, arg3, arg4).returnValue.getOrThrow()
}
}
fun <T> arithmeticBackoff(retryInterval: Duration, giveUpInterval: Duration, meaningfulDescription: String, op: () -> T): T {
val start = System.currentTimeMillis()
var iterCount = 0

View File

@ -1,131 +0,0 @@
package net.corda.haTesting
import joptsimple.OptionSet
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.contracts.Amount
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.vaultQueryBy
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.utilities.*
import net.corda.finance.GBP
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.AbstractCashFlow
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import java.util.concurrent.Callable
// Responsible for executing test scenario for 2 nodes and verifying the outcome
class ScenarioRunner(private val options: OptionSet) : Callable<Boolean> {
companion object {
private val logger = contextLogger()
private fun establishRpcConnection(endpoint: NetworkHostAndPort, user: String, password: String,
onError: (Throwable) -> CordaRPCOps =
{
logger.error("establishRpcConnection", it)
throw it
}): CordaRPCOps {
try {
val retryInterval = 5.seconds
val client = CordaRPCClient(endpoint,
object : CordaRPCClientConfiguration {
override val connectionMaxRetryInterval = retryInterval
}
)
val connection = client.start(user, password)
return connection.proxy
} catch (th: Throwable) {
return onError(th)
}
}
}
override fun call(): Boolean {
val haNodeRpcOps = establishRpcConnection(
options.valueOf(MandatoryCommandLineArguments.haNodeRpcAddress.name) as NetworkHostAndPort,
options.valueOf(MandatoryCommandLineArguments.haNodeRpcUserName.name) as String,
options.valueOf(MandatoryCommandLineArguments.haNodeRpcPassword.name) as String
)
val haNodeParty = haNodeRpcOps.nodeInfo().legalIdentities.first()
val normalNodeRpcOps = establishRpcConnection(
options.valueOf(MandatoryCommandLineArguments.normalNodeRpcAddress.name) as NetworkHostAndPort,
options.valueOf(MandatoryCommandLineArguments.normalNodeRpcUserName.name) as String,
options.valueOf(MandatoryCommandLineArguments.normalNodeRpcPassword.name) as String
)
val normalNodeParty = normalNodeRpcOps.nodeInfo().legalIdentities.first()
val notary = normalNodeRpcOps.notaryIdentities().first()
val iterCount = options.valueOf(OptionalCommandLineArguments.iterationsCount.name) as Int? ?: 10
logger.info("Total number of iterations to run: $iterCount")
// It is assumed that normal Node is capable of issuing.
// Create a unique tag for this issuance round
val issuerBankPartyRef = SecureHash.randomSHA256().bytes
val currency = GBP
val issueAmount = Amount(iterCount * 100L, currency)
logger.info("Trying: issue to normal, amount: $issueAmount")
val issueOutcome = normalNodeRpcOps.startFlow(::CashIssueFlow, issueAmount, OpaqueBytes(issuerBankPartyRef), notary).returnValue.getOrThrow()
logger.info("Success: issue to normal, amount: $issueAmount, TX ID: ${issueOutcome.stx.id}")
// TODO start a daemon thread which will talk to HA Node and installs termination schedule to it
// The daemon will monitor availability of HA Node and as soon as it is down and then back-up it will install
// the next termination schedule.
val initialAmount: Long = iterCount * 10L
require(initialAmount > iterCount)
val allPayments = mutableListOf<AbstractCashFlow.Result>()
for(iterNo in 1 .. iterCount) {
val transferQuantity = issueAmount.quantity
logger.info("#$iterNo.1 - Trying: normal -> ha, amount: ${transferQuantity}p")
val firstPayment = normalNodeRpcOps.startFlow(::CashPaymentFlow, Amount(transferQuantity, currency), haNodeParty, false).returnValue.getOrThrow()
logger.info("#$iterNo.2 - Success: normal -> ha, amount: ${transferQuantity}p, TX ID: ${firstPayment.stx.id}")
allPayments.add(firstPayment)
val transferBackQuantity = transferQuantity
logger.info("#$iterNo.3 - Trying: ha -> normal, amount: ${transferBackQuantity}p")
// TODO: HA node may well have a period of instability, therefore the following RPC posting has to be done in re-try fashion.
val secondPayment = haNodeRpcOps.startFlowWithRetryAndGet(::CashPaymentFlow, Amount(transferBackQuantity, currency), normalNodeParty, false)
logger.info("#$iterNo.4 - Success: ha -> normal, amount: ${transferBackQuantity}p, TX ID: ${secondPayment.stx.id}")
allPayments.add(secondPayment)
}
// Verify
assert(allPayments.size == (iterCount * 2)) { "Expected number of payments is ${iterCount * 2}, actual number of payments: ${allPayments.size}" }
val criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)
// TODO: Potentially implement paging validation logic for bigger data sets.
val pageSpecification = PageSpecification(pageNumber = 1, pageSize = Int.MAX_VALUE)
val normalStates = normalNodeRpcOps.vaultQueryBy<Cash.State>(criteria, pageSpecification)
val haStates = haNodeRpcOps.vaultQueryByWithRetry<Cash.State>(criteria, pageSpecification)
return verifyPaymentsAndStatesTally(allPayments, mapOf(normalNodeParty to normalStates, haNodeParty to haStates))
}
private fun verifyPaymentsAndStatesTally(allPayments: MutableList<AbstractCashFlow.Result>, statesByParty: Map<Party, Vault.Page<Cash.State>>): Boolean {
val hashesByParty: Map<Party, Set<SecureHash>> = statesByParty.mapValues { entry -> entry.value.states.map { state -> state.ref.txhash }.toSet() }
// Check that TX reference is present in both set of states.
allPayments.forEach { payment ->
val transactionId = payment.stx.id
val recipient = payment.recipient
val allStatesForParty = hashesByParty[recipient] ?: throw IllegalArgumentException("Cannot find states for party: $recipient in transaction: $transactionId")
// Recipient definitely should have hash of a transaction in its states.
assert(transactionId in allStatesForParty) { "States for party: $recipient should contain reference: $transactionId" }
}
return true
}
}