Reduce occurrence of flow exception in explorer demo (#467)

* changed event generator to reduce flow exception due to wrongly generated event
This commit is contained in:
Patrick Kuo 2017-03-31 14:19:02 +01:00 committed by GitHub
parent 0bbc330a04
commit d8370a41b5
3 changed files with 76 additions and 176 deletions

View File

@ -1,10 +1,8 @@
package net.corda.client.mock package net.corda.client.mock
import net.corda.contracts.asset.Cash import net.corda.core.contracts.Amount
import net.corda.core.contracts.*
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.serialization.OpaqueBytes import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.TransactionBuilder
import net.corda.flows.CashFlowCommand import net.corda.flows.CashFlowCommand
import java.util.* import java.util.*
@ -12,91 +10,27 @@ import java.util.*
* [Generator]s for incoming/outgoing events to/from the [WalletMonitorService]. Internally it keeps track of owned * [Generator]s for incoming/outgoing events to/from the [WalletMonitorService]. Internally it keeps track of owned
* state/ref pairs, but it doesn't necessarily generate "correct" events! * state/ref pairs, but it doesn't necessarily generate "correct" events!
*/ */
class EventGenerator(
val parties: List<Party>,
val notary: Party,
val currencies: List<Currency> = listOf(USD, GBP, CHF),
val issuers: List<Party> = parties
) {
private var vault = listOf<StateAndRef<Cash.State>>()
val issuerGenerator = class EventGenerator(val parties: List<Party>, val currencies: List<Currency>, val notary: Party) {
Generator.pickOne(issuers).combine(Generator.intRange(0, 1)) { party, ref -> party.ref(ref.toByte()) } private val partyGenerator = Generator.pickOne(parties)
private val issueRefGenerator = Generator.intRange(0, 1).map { number -> OpaqueBytes(ByteArray(1, { number.toByte() })) }
private val amountGenerator = Generator.longRange(10000, 1000000)
private val currencyGenerator = Generator.pickOne(currencies)
val currencyGenerator = Generator.pickOne(currencies) private val issueCashGenerator = amountGenerator.combine(partyGenerator, issueRefGenerator, currencyGenerator) { amount, to, issueRef, ccy ->
CashFlowCommand.IssueCash(Amount(amount, ccy), issueRef, to, notary)
val issuedGenerator = issuerGenerator.combine(currencyGenerator) { issuer, currency -> Issued(issuer, currency) }
val amountIssuedGenerator = generateAmount(1, 10000, issuedGenerator)
val publicKeyGenerator = Generator.pickOne(parties.map { it.owningKey })
val partyGenerator = Generator.pickOne(parties)
val cashStateGenerator = amountIssuedGenerator.combine(publicKeyGenerator) { amount, from ->
val builder = TransactionBuilder(notary = notary)
builder.addOutputState(Cash.State(amount, from))
builder.addCommand(Command(Cash.Commands.Issue(), amount.token.issuer.party.owningKey))
builder.toWireTransaction().outRef<Cash.State>(0)
} }
val consumedGenerator: Generator<Set<StateRef>> = Generator.frequency( private val exitCashGenerator = amountGenerator.combine(issueRefGenerator, currencyGenerator) { amount, issueRef, ccy ->
0.7 to Generator.pure(setOf()), CashFlowCommand.ExitCash(Amount(amount, ccy), issueRef)
0.3 to Generator.impure { vault }.bind { states -> }
Generator.sampleBernoulli(states, 0.2).map { someStates ->
val consumedSet = someStates.map { it.ref }.toSet()
vault = vault.filter { it.ref !in consumedSet }
consumedSet
}
}
)
val producedGenerator: Generator<Set<StateAndRef<ContractState>>> = Generator.frequency(
// 0.1 to Generator.pure(setOf())
0.9 to Generator.impure { vault }.bind { states ->
Generator.replicate(2, cashStateGenerator).map {
vault = states + it
it.toSet()
}
}
)
val issueRefGenerator = Generator.intRange(0, 1).map { number -> OpaqueBytes(ByteArray(1, { number.toByte() })) } val moveCashGenerator = amountGenerator.combine(partyGenerator, currencyGenerator) { amountIssued, recipient, currency ->
CashFlowCommand.PayCash(Amount(amountIssued, currency), recipient)
}
val amountToIssueGenerator = Generator.intRange(10000, 1000000).combine(currencyGenerator) { quantity, currency -> Amount(quantity.toLong(), currency) } val issuerGenerator = Generator.frequency(listOf(
0.1 to exitCashGenerator,
val issueCashGenerator = 0.9 to issueCashGenerator
amountToIssueGenerator.combine(partyGenerator, issueRefGenerator) { amount, to, issueRef -> ))
CashFlowCommand.IssueCash(
amount,
issueRef,
to,
notary
)
}
val moveCashGenerator =
amountIssuedGenerator.combine(partyGenerator) { amountIssued, recipient ->
CashFlowCommand.PayCash(
amount = amountIssued.withoutIssuer(),
recipient = recipient
)
}
val exitCashGenerator =
amountToIssueGenerator.combine(partyGenerator, issueRefGenerator) { amount, _, issueRef ->
CashFlowCommand.ExitCash(
amount,
issueRef
)
}
val clientCommandGenerator = Generator.frequency(
1.0 to moveCashGenerator
)
val bankOfCordaExitGenerator = Generator.frequency(
0.4 to exitCashGenerator
)
val bankOfCordaIssueGenerator = Generator.frequency(
0.6 to issueCashGenerator
)
} }

View File

@ -49,8 +49,8 @@ The Demo Nodes can be started in one of two modes:
2. Simulation 2. Simulation
In this mode Nodes will automatically commence executing commands as part of a random generation process. In this mode Nodes will automatically commence executing commands as part of a random generation process.
Issuer nodes will randomly issue, move and exit cash. The simulation start with pre-allocating chunks of cash to each of the party in 2 currencies (USD, GBP), then it enter a loop to generate random events.
Participant nodes will randomly generate spends (eg. move cash to other nodes, including issuers) In each iteration, the issuers will execute a Cash Issue or Cash Exit command (at a 9:1 ratio) and a random party will execute a move of cash to another random party.
**Windows**:: **Windows**::

View File

@ -12,13 +12,19 @@ import joptsimple.OptionParser
import net.corda.client.jfx.model.Models import net.corda.client.jfx.model.Models
import net.corda.client.jfx.model.observableValue import net.corda.client.jfx.model.observableValue
import net.corda.client.mock.EventGenerator import net.corda.client.mock.EventGenerator
import net.corda.client.mock.Generator
import net.corda.client.mock.pickOne
import net.corda.client.rpc.notUsed import net.corda.client.rpc.notUsed
import net.corda.contracts.asset.Cash import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Amount
import net.corda.core.contracts.GBP import net.corda.core.contracts.GBP
import net.corda.core.contracts.USD import net.corda.core.contracts.USD
import net.corda.core.failure
import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.FlowHandle
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType import net.corda.core.node.services.ServiceType
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.success
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.explorer.model.CordaViewModel import net.corda.explorer.model.CordaViewModel
@ -26,6 +32,7 @@ import net.corda.explorer.model.SettingsModel
import net.corda.explorer.views.* import net.corda.explorer.views.*
import net.corda.explorer.views.cordapps.cash.CashViewer import net.corda.explorer.views.cordapps.cash.CashViewer
import net.corda.flows.CashExitFlow import net.corda.flows.CashExitFlow
import net.corda.flows.CashFlowCommand
import net.corda.flows.CashIssueFlow import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow import net.corda.flows.CashPaymentFlow
import net.corda.flows.IssuerFlow.IssuanceRequester import net.corda.flows.IssuerFlow.IssuanceRequester
@ -37,10 +44,8 @@ import net.corda.nodeapi.User
import org.apache.commons.lang.SystemUtils import org.apache.commons.lang.SystemUtils
import org.controlsfx.dialog.ExceptionDialog import org.controlsfx.dialog.ExceptionDialog
import tornadofx.* import tornadofx.*
import java.time.Instant
import java.util.* import java.util.*
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ExecutionException
import kotlin.concurrent.thread
/** /**
* Main class for Explorer, you will need Tornado FX to run the explorer. * Main class for Explorer, you will need Tornado FX to run the explorer.
@ -140,6 +145,9 @@ class Main : App(MainView::class) {
/** /**
* This main method will starts 5 nodes (Notary, Alice, Bob, UK Bank and USA Bank) locally for UI testing, * This main method will starts 5 nodes (Notary, Alice, Bob, UK Bank and USA Bank) locally for UI testing,
* they will be on localhost ports 20003, 20006, 20009, 20012 and 20015 respectively. * they will be on localhost ports 20003, 20006, 20009, 20012 and 20015 respectively.
*
* The simulation start with pre-allocating chunks of cash to each of the party in 2 currencies (USD, GBP), then it enter a loop to generate random events.
* On each iteration, the issuers will execute a Cash Issue or Cash Exit flow (at a 9:1 ratio) and a random party will execute a move of cash to another random party.
*/ */
fun main(args: Array<String>) { fun main(args: Array<String>) {
val portAllocation = PortAllocation.Incremental(20000) val portAllocation = PortAllocation.Incremental(20000)
@ -201,107 +209,65 @@ fun main(args: Array<String>) {
issuerClientUSD.start(manager.username, manager.password) issuerClientUSD.start(manager.username, manager.password)
val issuerRPCUSD = issuerClientUSD.proxy() val issuerRPCUSD = issuerClientUSD.proxy()
val issuers = mapOf(USD to issuerRPCUSD, GBP to issuerRPCGBP)
val parties = listOf(aliceNode.nodeInfo.legalIdentity to aliceRPC,
bobNode.nodeInfo.legalIdentity to bobRPC,
issuerNodeGBP.nodeInfo.legalIdentity to issuerRPCGBP,
issuerNodeUSD.nodeInfo.legalIdentity to issuerRPCUSD)
val eventGenerator = EventGenerator( val eventGenerator = EventGenerator(
parties = listOf(aliceNode.nodeInfo.legalIdentity, bobNode.nodeInfo.legalIdentity), parties = parties.map { it.first },
notary = notaryNode.nodeInfo.notaryIdentity, notary = notaryNode.nodeInfo.notaryIdentity,
issuers = listOf(issuerNodeGBP.nodeInfo.legalIdentity, issuerNodeUSD.nodeInfo.legalIdentity) currencies = listOf(GBP, USD)
)
val issuerGBPEventGenerator = EventGenerator(
parties = listOf(issuerNodeGBP.nodeInfo.legalIdentity, aliceNode.nodeInfo.legalIdentity, bobNode.nodeInfo.legalIdentity),
notary = notaryNode.nodeInfo.notaryIdentity,
currencies = listOf(GBP)
)
val issuerUSDEventGenerator = EventGenerator(
parties = listOf(issuerNodeUSD.nodeInfo.legalIdentity, aliceNode.nodeInfo.legalIdentity, bobNode.nodeInfo.legalIdentity),
notary = notaryNode.nodeInfo.notaryIdentity,
currencies = listOf(USD)
) )
val maxIterations = 100000 val maxIterations = 100_000
val flowHandles = mapOf( // Log to logger when flow finish.
"GBPIssuer" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1), fun FlowHandle<SignedTransaction>.log(seq: Int, name: String) {
"USDIssuer" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1), val out = "[$seq] $name $id :"
"Alice" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1), progress.notUsed()
"Bob" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1), returnValue.success {
"GBPExit" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1), Main.log.info("$out ${it.id} ${(it.tx.outputs.first().data as Cash.State).amount}")
"USDExit" to ArrayBlockingQueue<FlowHandle<SignedTransaction>>(maxIterations + 1) }.failure {
) Main.log.info("$out ${it.message}")
}
}
flowHandles.forEach { // Pre allocate some money to each party.
thread { eventGenerator.parties.forEach {
for (i in 0..maxIterations) { for (ref in 0..1) {
val item = it.value.take() for ((currency, issuer) in issuers) {
val out = "[$i] ${it.key} ${item.id} :" CashFlowCommand.IssueCash(Amount(1_000_000, currency), OpaqueBytes(ByteArray(1, { ref.toByte() })), it, notaryNode.nodeInfo.notaryIdentity).startFlow(issuer)
try { .progress.notUsed()
val result = item.returnValue.get()
Main.log.info("$out ${result.id} ${(result.tx.outputs.first().data as Cash.State).amount}")
} catch(e: ExecutionException) {
Main.log.info("$out ${e.cause!!.message}")
}
} }
} }
} }
for (i in 0..maxIterations) { for (i in 0..maxIterations) {
Thread.sleep(500) Thread.sleep(300)
// Issuer requests.
// Issuer requests eventGenerator.issuerGenerator.map { command ->
if ((i % 5) == 0) { when (command) {
issuerGBPEventGenerator.bankOfCordaIssueGenerator.map { command -> is CashFlowCommand.IssueCash -> issuers[command.amount.token]?.let {
println("[$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}") println("${Instant.now()} [$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}")
val cmd = command.startFlow(issuerRPCGBP) command.startFlow(it).log(i, "${command.amount.token}Issuer")
flowHandles["GBPIssuer"]?.add(cmd) }
cmd.progress.notUsed() is CashFlowCommand.ExitCash -> issuers[command.amount.token]?.let {
Unit println("${Instant.now()} [$i] EXITING ${command.amount} with ref ${command.issueRef}")
}.generate(SplittableRandom()) command.startFlow(it).log(i, "${command.amount.token}Exit")
issuerUSDEventGenerator.bankOfCordaIssueGenerator.map { command -> }
println("[$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}") else -> throw IllegalArgumentException("Unsupported command: $command")
val cmd = command.startFlow(issuerRPCUSD) }
flowHandles["USDIssuer"]?.add(cmd)
cmd.progress.notUsed()
Unit
}.generate(SplittableRandom())
}
// Exit requests
if ((i % 10) == 0) {
issuerGBPEventGenerator.bankOfCordaExitGenerator.map { command ->
println("[$i] EXITING ${command.amount} with ref ${command.issueRef}")
val cmd = command.startFlow(issuerRPCGBP)
flowHandles["GBPExit"]?.add(cmd)
cmd.progress.notUsed()
Unit
}.generate(SplittableRandom())
issuerUSDEventGenerator.bankOfCordaExitGenerator.map { command ->
println("[$i] EXITING ${command.amount} with ref ${command.issueRef}")
val cmd = command.startFlow(issuerRPCUSD)
flowHandles["USDExit"]?.add(cmd)
cmd.progress.notUsed()
Unit
}.generate(SplittableRandom())
}
// Party pay requests
// Alice
eventGenerator.clientCommandGenerator.map { command ->
println("[$i] SENDING ${command.amount} from ${aliceRPC.nodeIdentity().legalIdentity} to ${command.recipient}")
val cmd = command.startFlow(aliceRPC)
flowHandles["Alice"]?.add(cmd)
cmd.progress.notUsed()
Unit
}.generate(SplittableRandom()) }.generate(SplittableRandom())
// Bob // Party pay requests.
eventGenerator.clientCommandGenerator.map { command -> eventGenerator.moveCashGenerator.combine(Generator.pickOne(parties)) { command, (party, rpc) ->
println("[$i] SENDING ${command.amount} from ${bobRPC.nodeIdentity().legalIdentity} to ${command.recipient}") println("${Instant.now()} [$i] SENDING ${command.amount} from $party to ${command.recipient}")
val cmd = command.startFlow(bobRPC) command.startFlow(rpc).log(i, party.name)
flowHandles["Bob"]?.add(cmd)
cmd.progress.notUsed()
Unit
}.generate(SplittableRandom()) }.generate(SplittableRandom())
} }
println("Simulation completed") println("Simulation completed")
aliceClient.close() aliceClient.close()