diff --git a/experimental/ha-testing/src/main/kotlin/net/corda/haTesting/ScenarioRunner.kt b/experimental/ha-testing/src/main/kotlin/net/corda/haTesting/ScenarioRunner.kt index 29563fbc40..0b8b0a81fd 100644 --- a/experimental/ha-testing/src/main/kotlin/net/corda/haTesting/ScenarioRunner.kt +++ b/experimental/ha-testing/src/main/kotlin/net/corda/haTesting/ScenarioRunner.kt @@ -5,6 +5,7 @@ import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.core.contracts.Amount import net.corda.core.crypto.SecureHash +import net.corda.core.identity.Party import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.messaging.vaultQueryBy @@ -71,10 +72,10 @@ class ScenarioRunner(private val options: OptionSet) : Callable { // Create a unique tag for this issuance round val issuerBankPartyRef = SecureHash.randomSHA256().bytes val currency = GBP - val amount = Amount(iterCount * 100L, currency) - logger.info("Trying: issue to normal, amount: $amount") - val issueOutcome = normalNodeRpcOps.startFlow(::CashIssueFlow, amount, OpaqueBytes(issuerBankPartyRef), notary).returnValue.getOrThrow() - logger.info("Success: issue to normal, amount: $amount, TX ID: ${issueOutcome.stx.tx.id}") + val issueAmount = Amount(iterCount * 100L, currency) + logger.info("Trying: issue to normal, amount: $issueAmount") + val issueOutcome = normalNodeRpcOps.startFlow(::CashIssueFlow, issueAmount, OpaqueBytes(issuerBankPartyRef), notary).returnValue.getOrThrow() + logger.info("Success: issue to normal, amount: $issueAmount, TX ID: ${issueOutcome.stx.id}") // TODO start a daemon thread which will talk to HA Node and installs termination schedule to it // The daemon will monitor availability of HA Node and as soon as it is down and then back-up it will install @@ -86,16 +87,17 @@ class ScenarioRunner(private val options: OptionSet) : Callable { val allPayments = mutableListOf() for(iterNo in 1 .. iterCount) { - val transferQuantity = initialAmount - iterNo + 1 + val transferQuantity = issueAmount.quantity logger.info("#$iterNo.1 - Trying: normal -> ha, amount: ${transferQuantity}p") - val firstPayment = normalNodeRpcOps.startFlow(::CashPaymentFlow, Amount(transferQuantity, currency), haNodeParty, true).returnValue.getOrThrow() - logger.info("#$iterNo.2 - Success: normal -> ha, amount: ${transferQuantity}p, TX ID: ${firstPayment.stx.tx.id}") + val firstPayment = normalNodeRpcOps.startFlow(::CashPaymentFlow, Amount(transferQuantity, currency), haNodeParty, false).returnValue.getOrThrow() + logger.info("#$iterNo.2 - Success: normal -> ha, amount: ${transferQuantity}p, TX ID: ${firstPayment.stx.id}") allPayments.add(firstPayment) - logger.info("#$iterNo.3 - Trying: ha -> normal, amount: ${transferQuantity - 1}p") + val transferBackQuantity = transferQuantity + logger.info("#$iterNo.3 - Trying: ha -> normal, amount: ${transferBackQuantity}p") // TODO: HA node may well have a period of instability, therefore the following RPC posting has to be done in re-try fashion. - val secondPayment = haNodeRpcOps.startFlowWithRetryAndGet(::CashPaymentFlow, Amount(transferQuantity - 1, currency), normalNodeParty, true) - logger.info("#$iterNo.4 - Success: ha -> normal, amount: ${transferQuantity - 1}p, TX ID: ${secondPayment.stx.tx.id}") + val secondPayment = haNodeRpcOps.startFlowWithRetryAndGet(::CashPaymentFlow, Amount(transferBackQuantity, currency), normalNodeParty, false) + logger.info("#$iterNo.4 - Success: ha -> normal, amount: ${transferBackQuantity}p, TX ID: ${secondPayment.stx.id}") allPayments.add(secondPayment) } @@ -106,20 +108,22 @@ class ScenarioRunner(private val options: OptionSet) : Callable { val pageSpecification = PageSpecification(pageNumber = 1, pageSize = Int.MAX_VALUE) val normalStates = normalNodeRpcOps.vaultQueryBy(criteria, pageSpecification) val haStates = haNodeRpcOps.vaultQueryByWithRetry(criteria, pageSpecification) - return verifyPaymentsAndStatesTally(allPayments, normalStates, haStates) + return verifyPaymentsAndStatesTally(allPayments, mapOf(normalNodeParty to normalStates, haNodeParty to haStates)) } - private fun verifyPaymentsAndStatesTally(allPayments: MutableList, normalStates: Vault.Page, haStates: Vault.Page): Boolean { + private fun verifyPaymentsAndStatesTally(allPayments: MutableList, statesByParty: Map>): Boolean { - val normalTxHashes = normalStates.states.map { it.ref.txhash }.toSet() - val haTxHashes = haStates.states.map { it.ref.txhash }.toSet() + val hashesByParty: Map> = statesByParty.mapValues { entry -> entry.value.states.map { state -> state.ref.txhash }.toSet() } // Check that TX reference is present in both set of states. allPayments.forEach { payment -> - val outputStatesHashes = payment.stx.coreTransaction.id - // H2 database is unreliable and may not contain all the states. - //assert(normalTxHashes.contains(outputStatesHashes)) { "Normal states should contain reference: $outputStatesHashes" } - assert(haTxHashes.contains(outputStatesHashes)) { "HA states should contain reference: $outputStatesHashes" } + val transactionId = payment.stx.id + val recipient = payment.recipient + + val allStatesForParty = hashesByParty[recipient] ?: throw IllegalArgumentException("Cannot find states for party: $recipient in transaction: $transactionId") + + // Recipient definitely should have hash of a transaction in its states. + assert(transactionId in allStatesForParty) { "States for party: $recipient should contain reference: $transactionId" } } return true