Add simulation with broken flows. Refactor of simulation code.

This commit is contained in:
Katarzyna Streich 2017-05-11 19:00:23 +01:00
parent ab0bc8b8d0
commit e402f4d5af
7 changed files with 300 additions and 138 deletions

View File

@ -2,7 +2,7 @@
<configuration default="false" name="Explorer - demo nodes" type="JetRunConfigurationType" factoryName="Kotlin" singleton="true">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<option name="MAIN_CLASS_NAME" value="net.corda.explorer.MainKt" />
<option name="VM_PARAMETERS" value="" />
<option name="VM_PARAMETERS" value="-DAMQ_DELIVERY_DELAY_MS=15000" />
<option name="PROGRAM_PARAMETERS" value="" />
<option name="WORKING_DIRECTORY" value="" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />

View File

@ -0,0 +1,15 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Explorer - demo nodes (flow triage)" type="JetRunConfigurationType" factoryName="Kotlin" singleton="true">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<option name="MAIN_CLASS_NAME" value="net.corda.explorer.MainKt" />
<option name="VM_PARAMETERS" value="-DAMQ_DELIVERY_DELAY_MS=15000" />
<option name="PROGRAM_PARAMETERS" value="-F" />
<option name="WORKING_DIRECTORY" value="" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
<option name="ALTERNATIVE_JRE_PATH" value="1.8" />
<option name="PASS_PARENT_ENVS" value="true" />
<module name="explorer_main" />
<envs />
<method />
</configuration>
</component>

View File

@ -2,7 +2,7 @@
<configuration default="false" name="Explorer - demo nodes (simulation)" type="JetRunConfigurationType" factoryName="Kotlin" singleton="true">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<option name="MAIN_CLASS_NAME" value="net.corda.explorer.MainKt" />
<option name="VM_PARAMETERS" value="-DAMQ_DELIVERY_DELAY_MS=5000" />
<option name="VM_PARAMETERS" value="-DAMQ_DELIVERY_DELAY_MS=15000" />
<option name="PROGRAM_PARAMETERS" value="-S" />
<option name="WORKING_DIRECTORY" value="" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />

View File

@ -1,36 +1,92 @@
package net.corda.client.mock
import net.corda.core.contracts.Amount
import net.corda.core.contracts.GBP
import net.corda.core.contracts.USD
import net.corda.core.identity.Party
import net.corda.core.serialization.OpaqueBytes
import net.corda.flows.CashFlowCommand
import java.util.*
/**
* [Generator]s for incoming/outgoing events to/from the [WalletMonitorService]. Internally it keeps track of owned
* state/ref pairs, but it doesn't necessarily generate "correct" events!
* [Generator]s for incoming/outgoing cash flow events between parties. It doesn't necessarily generate correct events!
* Especially at the beginning of simulation there might be few insufficient spend errors.
*/
class EventGenerator(val parties: List<Party>, val currencies: List<Currency>, val notary: Party) {
private val partyGenerator = Generator.pickOne(parties)
private val issueRefGenerator = Generator.intRange(0, 1).map { number -> OpaqueBytes(ByteArray(1, { number.toByte() })) }
private val amountGenerator = Generator.longRange(10000, 1000000)
private val currencyGenerator = Generator.pickOne(currencies)
open class EventGenerator(val parties: List<Party>, val currencies: List<Currency>, val notary: Party) {
protected val partyGenerator = Generator.pickOne(parties)
protected val issueRefGenerator = Generator.intRange(0, 1).map { number -> OpaqueBytes(ByteArray(1, { number.toByte() })) }
protected val amountGenerator = Generator.longRange(10000, 1000000)
protected val currencyGenerator = Generator.pickOne(currencies)
protected val currencyMap: MutableMap<Currency, Long> = mutableMapOf(USD to 0L, GBP to 0L) // Used for rough estimation of how much money we have in general.
private val issueCashGenerator = amountGenerator.combine(partyGenerator, issueRefGenerator, currencyGenerator) { amount, to, issueRef, ccy ->
protected fun addToMap(ccy: Currency, amount: Long) {
val value = currencyMap[ccy]
if (value != null)
currencyMap[ccy] = Math.max(0L, value + amount)
}
protected val issueCashGenerator = amountGenerator.combine(partyGenerator, issueRefGenerator, currencyGenerator) { amount, to, issueRef, ccy ->
addToMap(ccy, amount)
CashFlowCommand.IssueCash(Amount(amount, ccy), issueRef, to, notary)
}
private val exitCashGenerator = amountGenerator.combine(issueRefGenerator, currencyGenerator) { amount, issueRef, ccy ->
protected val exitCashGenerator = amountGenerator.combine(issueRefGenerator, currencyGenerator) { amount, issueRef, ccy ->
addToMap(ccy, -amount)
CashFlowCommand.ExitCash(Amount(amount, ccy), issueRef)
}
val moveCashGenerator = amountGenerator.combine(partyGenerator, currencyGenerator) { amountIssued, recipient, currency ->
open val moveCashGenerator = amountGenerator.combine(partyGenerator, currencyGenerator) { amountIssued, recipient, currency ->
CashFlowCommand.PayCash(Amount(amountIssued, currency), recipient)
}
val issuerGenerator = Generator.frequency(listOf(
open val issuerGenerator = Generator.frequency(listOf(
0.1 to exitCashGenerator,
0.9 to issueCashGenerator
))
}
/**
* [Generator]s for incoming/outgoing events of starting different [Cash] flows. It invokes flows that throw exceptions
* for use in explorer flow triage. Exceptions are of kind spending/exiting too much cash.
*/
class ErrorFlowsEventGenerator(parties: List<Party>, currencies: List<Currency>, notary: Party): EventGenerator(parties, currencies, notary) {
enum class IssuerEvents {
NORMAL_EXIT,
EXIT_ERROR
}
val errorGenerator = Generator.pickOne(IssuerEvents.values().toList())
val errorExitCashGenerator = amountGenerator.combine(issueRefGenerator, currencyGenerator, errorGenerator) { amount, issueRef, ccy, errorType ->
when (errorType) {
IssuerEvents.NORMAL_EXIT -> {
println("Normal exit")
if (currencyMap[ccy]!! <= amount) addToMap(ccy, -amount)
CashFlowCommand.ExitCash(Amount(amount, ccy), issueRef) // It may fail at the beginning, but we don't care.
}
IssuerEvents.EXIT_ERROR -> {
println("Exit error")
CashFlowCommand.ExitCash(Amount(currencyMap[ccy]!! * 2, ccy), issueRef)
}
}
}
val normalMoveGenerator = amountGenerator.combine(partyGenerator, currencyGenerator) { amountIssued, recipient, currency ->
CashFlowCommand.PayCash(Amount(amountIssued, currency), recipient)
}
val errorMoveGenerator = partyGenerator.combine(currencyGenerator) { recipient, currency ->
CashFlowCommand.PayCash(Amount(currencyMap[currency]!! * 2, currency), recipient)
}
override val moveCashGenerator = Generator.frequency(listOf(
0.2 to errorMoveGenerator,
0.8 to normalMoveGenerator
))
override val issuerGenerator = Generator.frequency(listOf(
0.3 to errorExitCashGenerator,
0.7 to issueCashGenerator
))
}

View File

@ -68,3 +68,9 @@ task(runSimulationNodes, dependsOn: 'classes', type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
args '-S'
}
task(runFlowTriageNodes, dependsOn: 'classes', type: JavaExec) {
main = 'net.corda.explorer.MainKt'
classpath = sourceSets.main.runtimeClasspath
args '-F'
}

View File

@ -0,0 +1,207 @@
package net.corda.explorer
import joptsimple.OptionSet
import net.corda.client.mock.ErrorFlowsEventGenerator
import net.corda.client.mock.EventGenerator
import net.corda.client.mock.Generator
import net.corda.client.mock.pickOne
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Amount
import net.corda.core.contracts.GBP
import net.corda.core.contracts.USD
import net.corda.core.crypto.Party
import net.corda.core.failure
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.FlowHandle
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.success
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ALICE
import net.corda.core.utilities.BOB
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.flows.CashExitFlow
import net.corda.flows.CashFlowCommand
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.flows.IssuerFlow
import net.corda.node.driver.NodeHandle
import net.corda.node.driver.PortAllocation
import net.corda.node.driver.driver
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.nodeapi.User
import org.bouncycastle.asn1.x500.X500Name
import java.time.Instant
import java.util.*
class ExplorerSimulation(val options: OptionSet) {
val user = User("user1", "test", permissions = setOf(
startFlowPermission<CashPaymentFlow>()
))
val manager = User("manager", "test", permissions = setOf(
startFlowPermission<CashIssueFlow>(),
startFlowPermission<CashPaymentFlow>(),
startFlowPermission<CashExitFlow>(),
startFlowPermission<IssuerFlow.IssuanceRequester>())
)
lateinit var notaryNode: NodeHandle
lateinit var aliceNode: NodeHandle
lateinit var bobNode: NodeHandle
lateinit var issuerNodeGBP: NodeHandle
lateinit var issuerNodeUSD: NodeHandle
val RPCConnections = ArrayList<CordaRPCConnection>()
val issuers = HashMap<Currency, CordaRPCOps>()
val parties = ArrayList<Pair<Party, CordaRPCOps>>()
init {
startDemoNodes()
}
private fun onEnd() {
println("Closing RPC connections")
RPCConnections.forEach { it.close() }
}
private fun startDemoNodes() {
val portAllocation = PortAllocation.Incremental(20000)
driver(portAllocation = portAllocation) {
// TODO : Supported flow should be exposed somehow from the node instead of set of ServiceInfo.
val notary = startNode(DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)),
customOverrides = mapOf("nearestCity" to "Zurich"))
val alice = startNode(ALICE.name, rpcUsers = arrayListOf(user),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("cash"))),
customOverrides = mapOf("nearestCity" to "Milan"))
val bob = startNode(BOB.name, rpcUsers = arrayListOf(user),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("cash"))),
customOverrides = mapOf("nearestCity" to "Madrid"))
val ukBankName = X500Name("CN=UK Bank Plc,O=UK Bank Plc,L=London,C=UK")
val usaBankName = X500Name("CN=USA Bank Corp,O=USA Bank Corp,L=New York,C=USA")
val issuerGBP = startNode(ukBankName, rpcUsers = arrayListOf(manager),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("issuer.GBP"))),
customOverrides = mapOf("nearestCity" to "London"))
val issuerUSD = startNode(usaBankName, rpcUsers = arrayListOf(manager),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("issuer.USD"))),
customOverrides = mapOf("nearestCity" to "New York"))
notaryNode = notary.get()
aliceNode = alice.get()
bobNode = bob.get()
issuerNodeGBP = issuerGBP.get()
issuerNodeUSD = issuerUSD.get()
arrayOf(notaryNode, aliceNode, bobNode, issuerNodeGBP, issuerNodeUSD).forEach {
println("${it.nodeInfo.legalIdentity} started on ${it.configuration.rpcAddress}")
}
when {
options.has("S") -> startNormalSimulation()
options.has("F") -> starErrorFlowsSimulation()
}
waitForAllNodesToFinish()
}
}
private fun setUpRPC() {
// Register with alice to use alice's RPC proxy to create random events.
val aliceClient = aliceNode.rpcClientToNode()
val aliceConnection = aliceClient.start(user.username, user.password)
val aliceRPC = aliceConnection.proxy
val bobClient = bobNode.rpcClientToNode()
val bobConnection = bobClient.start(user.username, user.password)
val bobRPC = bobConnection.proxy
val issuerClientGBP = issuerNodeGBP.rpcClientToNode()
val issuerGBPConnection = issuerClientGBP.start(manager.username, manager.password)
val issuerRPCGBP = issuerGBPConnection.proxy
val issuerClientUSD = issuerNodeUSD.rpcClientToNode()
val issuerUSDConnection =issuerClientUSD.start(manager.username, manager.password)
val issuerRPCUSD = issuerUSDConnection.proxy
RPCConnections.addAll(listOf(aliceConnection, bobConnection, issuerGBPConnection, issuerUSDConnection))
issuers.putAll(mapOf(USD to issuerRPCUSD, GBP to issuerRPCGBP))
parties.addAll(listOf(aliceNode.nodeInfo.legalIdentity to aliceRPC,
bobNode.nodeInfo.legalIdentity to bobRPC,
issuerNodeGBP.nodeInfo.legalIdentity to issuerRPCGBP,
issuerNodeUSD.nodeInfo.legalIdentity to issuerRPCUSD))
}
private fun startSimulation(eventGenerator: EventGenerator, maxIterations: Int) {
// Log to logger when flow finish.
fun FlowHandle<SignedTransaction>.log(seq: Int, name: String) {
val out = "[$seq] $name $id :"
returnValue.success {
Main.log.info("$out ${it.id} ${(it.tx.outputs.first().data as Cash.State).amount}")
}.failure {
Main.log.info("$out ${it.message}")
}
}
for (i in 0..maxIterations) {
Thread.sleep(300)
// Issuer requests.
eventGenerator.issuerGenerator.map { command ->
when (command) {
is CashFlowCommand.IssueCash -> issuers[command.amount.token]?.let {
println("${Instant.now()} [$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}")
command.startFlow(it).log(i, "${command.amount.token}Issuer")
} ?: command.startFlow(issuers[USD]!!).log(i, "${command.amount.token}Issuer") // TODO workaround
is CashFlowCommand.ExitCash -> issuers[command.amount.token]?.let {
println("${Instant.now()} [$i] EXITING ${command.amount} with ref ${command.issueRef}")
command.startFlow(it).log(i, "${command.amount.token}Exit")
}
else -> throw IllegalArgumentException("Unsupported command: $command")
}
}.generate(SplittableRandom())
// Party pay requests.
eventGenerator.moveCashGenerator.combine(Generator.pickOne(parties)) { command, (party, rpc) ->
println("${Instant.now()} [$i] SENDING ${command.amount} from $party to ${command.recipient}")
command.startFlow(rpc).log(i, party.name.toString())
}.generate(SplittableRandom())
}
println("Simulation completed")
}
private fun startNormalSimulation() {
println("Running simulation mode ...")
setUpRPC()
val eventGenerator = EventGenerator(
parties = parties.map { it.first },
notary = notaryNode.nodeInfo.notaryIdentity,
currencies = listOf(GBP, USD)
)
val maxIterations = 100_000
// Pre allocate some money to each party.
eventGenerator.parties.forEach {
for (ref in 0..1) {
for ((currency, issuer) in issuers) {
CashFlowCommand.IssueCash(Amount(1_000_000, currency), OpaqueBytes(ByteArray(1, { ref.toByte() })), it, notaryNode.nodeInfo.notaryIdentity).startFlow(issuer)
}
}
}
startSimulation(eventGenerator, maxIterations)
onEnd()
}
private fun starErrorFlowsSimulation() {
println("Running flows with errors simulation mode ...")
setUpRPC()
val eventGenerator = ErrorFlowsEventGenerator(
parties = parties.map { it.first },
notary = notaryNode.nodeInfo.notaryIdentity,
currencies = listOf(GBP, USD)
)
val maxIterations = 10_000
startSimulation(eventGenerator, maxIterations)
onEnd()
}
}

View File

@ -156,129 +156,7 @@ class Main : App(MainView::class) {
* On each iteration, the issuers will execute a Cash Issue or Cash Exit flow (at a 9:1 ratio) and a random party will execute a move of cash to another random party.
*/
fun main(args: Array<String>) {
val portAllocation = PortAllocation.Incremental(20000)
driver(portAllocation = portAllocation) {
val user = User("user1", "test", permissions = setOf(
startFlowPermission<CashPaymentFlow>()
))
val manager = User("manager", "test", permissions = setOf(
startFlowPermission<CashIssueFlow>(),
startFlowPermission<CashPaymentFlow>(),
startFlowPermission<CashExitFlow>(),
startFlowPermission<IssuanceRequester>())
)
// TODO : Supported flow should be exposed somehow from the node instead of set of ServiceInfo.
val notary = startNode(DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)),
customOverrides = mapOf("nearestCity" to "Zurich"))
val alice = startNode(ALICE.name, rpcUsers = arrayListOf(user),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("cash"))),
customOverrides = mapOf("nearestCity" to "Milan"))
val bob = startNode(BOB.name, rpcUsers = arrayListOf(user),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("cash"))),
customOverrides = mapOf("nearestCity" to "Madrid"))
val issuerGBP = startNode(X500Name("CN=UK Bank Plc,O=UK Bank Plc,L=London,C=UK"), rpcUsers = arrayListOf(manager),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("issuer.GBP"))),
customOverrides = mapOf("nearestCity" to "London"))
val issuerUSD = startNode(X500Name("CN=USA Bank Corp,O=USA Bank Corp,L=New York,C=US"), rpcUsers = arrayListOf(manager),
advertisedServices = setOf(ServiceInfo(ServiceType.corda.getSubType("issuer.USD"))),
customOverrides = mapOf("nearestCity" to "New York"))
val notaryNode = notary.get()
val aliceNode = alice.get()
val bobNode = bob.get()
val issuerNodeGBP = issuerGBP.get()
val issuerNodeUSD = issuerUSD.get()
arrayOf(notaryNode, aliceNode, bobNode, issuerNodeGBP, issuerNodeUSD).forEach {
println("${it.nodeInfo.legalIdentity} started on ${it.configuration.rpcAddress}")
}
val parser = OptionParser("S")
val options = parser.parse(*args)
if (options.has("S")) {
println("Running simulation mode ...")
// Register with alice to use alice's RPC proxy to create random events.
val aliceClient = aliceNode.rpcClientToNode()
val aliceConnection = aliceClient.start(user.username, user.password)
val aliceRPC = aliceConnection.proxy
val bobClient = bobNode.rpcClientToNode()
val bobConnection = bobClient.start(user.username, user.password)
val bobRPC = bobConnection.proxy
val issuerClientGBP = issuerNodeGBP.rpcClientToNode()
val issuerGBPConnection = issuerClientGBP.start(manager.username, manager.password)
val issuerRPCGBP = issuerGBPConnection.proxy
val issuerClientUSD = issuerNodeUSD.rpcClientToNode()
val issuerUSDConnection = issuerClientUSD.start(manager.username, manager.password)
val issuerRPCUSD = issuerUSDConnection.proxy
val issuers = mapOf(USD to issuerRPCUSD, GBP to issuerRPCGBP)
val parties = listOf(aliceNode.nodeInfo.legalIdentity to aliceRPC,
bobNode.nodeInfo.legalIdentity to bobRPC,
issuerNodeGBP.nodeInfo.legalIdentity to issuerRPCGBP,
issuerNodeUSD.nodeInfo.legalIdentity to issuerRPCUSD)
val eventGenerator = EventGenerator(
parties = parties.map { it.first },
notary = notaryNode.nodeInfo.notaryIdentity,
currencies = listOf(GBP, USD)
)
val maxIterations = 100_000
// Log to logger when flow finish.
fun FlowHandle<SignedTransaction>.log(seq: Int, name: String) {
val out = "[$seq] $name $id :"
returnValue.success {
Main.log.info("$out ${it.id} ${(it.tx.outputs.first().data as Cash.State).amount}")
}.failure {
Main.log.info("$out ${it.message}")
}
}
// Pre allocate some money to each party.
eventGenerator.parties.forEach {
for (ref in 0..1) {
for ((currency, issuer) in issuers) {
CashFlowCommand.IssueCash(Amount(1_000_000, currency), OpaqueBytes(ByteArray(1, { ref.toByte() })), it, notaryNode.nodeInfo.notaryIdentity).startFlow(issuer)
}
}
}
for (i in 0..maxIterations) {
Thread.sleep(300) //Thread.sleep(5000) -> todo rebase
// Issuer requests.
eventGenerator.issuerGenerator.map { command ->
when (command) {
is CashFlowCommand.IssueCash -> issuers[command.amount.token]?.let {
println("${Instant.now()} [$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}")
command.startFlow(it).log(i, "${command.amount.token}Issuer")
}
is CashFlowCommand.ExitCash -> issuers[command.amount.token]?.let {
println("${Instant.now()} [$i] EXITING ${command.amount} with ref ${command.issueRef}")
command.startFlow(it).log(i, "${command.amount.token}Exit")
}
else -> throw IllegalArgumentException("Unsupported command: $command")
}
}.generate(SplittableRandom())
// Party pay requests.
eventGenerator.moveCashGenerator.combine(Generator.pickOne(parties)) { command, (party, rpc) ->
println("${Instant.now()} [$i] SENDING ${command.amount} from $party to ${command.recipient}")
command.startFlow(rpc).log(i, party.name.toString())
}.generate(SplittableRandom())
}
println("Simulation completed")
aliceConnection.close()
bobConnection.close()
issuerGBPConnection.close()
issuerUSDConnection.close()
}
waitForAllNodesToFinish()
}
val parser = OptionParser("SF")
val options = parser.parse(*args)
ExplorerSimulation(options)
}