CORDA-3356 Subflow ledger consistency tests + move statemachine tests to slow integration tests (#5722)

* CORDA-3356 Subflow ledger consistency tests + move statemachine tests to slow integration tests

Add tests for subflows that fail during transitions.

Split out `StatemachineErrorHandlingTest` into a series of smaller tests.

Move these tests into the `integration-test-slow` category so they are
not run against every PR.

* CORDA-3356 Fix detekt issue

* CORDA-3356 Tidy test names
This commit is contained in:
Dan Newton 2019-11-25 17:11:54 +00:00 committed by Matthew Nesbit
parent 10e9340871
commit 45d6d3ead4
6 changed files with 1441 additions and 865 deletions

View File

@ -246,9 +246,9 @@ dependencies {
// Byteman for runtime (termination) rules injection on the running node
// Submission tool allowing to install rules on running nodes
integrationTestCompile "org.jboss.byteman:byteman-submit:4.0.3"
slowIntegrationTestCompile "org.jboss.byteman:byteman-submit:4.0.3"
// The actual Byteman agent which should only be in the classpath of the out of process nodes
integrationTestCompile "org.jboss.byteman:byteman:4.0.3"
slowIntegrationTestCompile "org.jboss.byteman:byteman:4.0.3"
testCompile(project(':test-cli'))
testCompile(project(':test-utils'))

View File

@ -0,0 +1,151 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.list
import net.corda.core.internal.readAllLines
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
import net.corda.testing.node.internal.InternalDriverDSL
import org.jboss.byteman.agent.submit.ScriptText
import org.jboss.byteman.agent.submit.Submit
import org.junit.Before
abstract class StatemachineErrorHandlingTest {
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
var counter = 0
@Before
fun setup() {
counter = 0
}
internal fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME), dsl: DriverDSL.() -> Unit) {
driver(
DriverParameters(
notarySpecs = listOf(notarySpec),
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
dsl()
}
}
internal fun DriverDSL.createBytemanNode(
providedName: CordaX500Name,
additionalCordapps: Collection<TestCordapp> = emptyList()
): NodeHandle {
return (this as InternalDriverDSL).startNode(
NodeParameters(
providedName = providedName,
rpcUsers = listOf(rpcUser),
additionalCordapps = additionalCordapps
),
bytemanPort = 12000
).getOrThrow()
}
internal fun DriverDSL.createNode(providedName: CordaX500Name, additionalCordapps: Collection<TestCordapp> = emptyList()): NodeHandle {
return startNode(
NodeParameters(
providedName = providedName,
rpcUsers = listOf(rpcUser),
additionalCordapps = additionalCordapps
)
).getOrThrow()
}
internal fun submitBytemanRules(rules: String) {
val submit = Submit("localhost", 12000)
submit.addScripts(listOf(ScriptText("Test script", rules)))
}
internal fun getBytemanOutput(nodeHandle: NodeHandle): List<String> {
return nodeHandle.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
}
@StartableByRPC
@InitiatingFlow
class SendAMessageFlow(private val party: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
val session = initiateFlow(party)
session.send("hello there")
return "Finished executing test flow - ${this.runId}"
}
}
@InitiatedBy(SendAMessageFlow::class)
class SendAMessageResponder(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
}
}
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints").use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}
}
}
@StartableByRPC
class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
override fun call(): HospitalCounts =
HospitalCounts(
serviceHub.cordaService(HospitalCounter::class.java).dischargeCounter,
serviceHub.cordaService(HospitalCounter::class.java).observationCounter
)
}
@CordaSerializable
data class HospitalCounts(val discharge: Int, val observation: Int)
@Suppress("UNUSED_PARAMETER")
@CordaService
class HospitalCounter(services: AppServiceHub) : SingletonSerializeAsToken() {
var observationCounter: Int = 0
var dischargeCounter: Int = 0
init {
StaffedFlowHospital.onFlowDischarged.add { _, _ ->
++dischargeCounter
}
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observationCounter
}
}
}
}

View File

@ -0,0 +1,357 @@
package net.corda.node.services.statemachine
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.node.services.api.ServiceHubInternal
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import org.junit.Test
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
/**
* Throws an exception when recoding a transaction inside of [ReceiveFinalityFlow] on the responding
* flow's node.
*
* The flow is kept in for observation.
*
* Only the responding node keeps a checkpoint. The initiating flow has completed successfully as it has complete its
* send to the responding node and the responding node successfully received it.
*/
@Test
fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
// could not get rule for FinalityDoctor + observation counter to work
val rules = """
RULE Set flag when entering receive finality flow
CLASS ${ReceiveFinalityFlow::class.java.name}
METHOD call
AT ENTRY
IF !flagged("finality_flag")
DO flag("finality_flag"); traceln("Setting finality flag")
ENDRULE
RULE Set flag when leaving resolve transactions flow
CLASS ${ResolveTransactionsFlow::class.java.name}
METHOD call
AT EXIT
IF !flagged("resolve_tx_flag")
DO flag("resolve_tx_flag"); traceln("Setting resolve tx flag")
ENDRULE
RULE Throw exception when recording transaction
INTERFACE ${ServiceHubInternal::class.java.name}
METHOD recordTransactions
AT ENTRY
IF flagged("finality_flag") && flagged("resolve_tx_flag")
DO traceln("Throwing exception");
throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
charlie.nodeInfo.singleIdentity(),
false,
defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
// 1 ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when resolving a transaction's dependencies inside of [ReceiveFinalityFlow] on the responding
* flow's node.
*
* The flow is kept in for observation.
*
* Only the responding node keeps a checkpoint. The initiating flow has completed successfully as it has complete its
* send to the responding node and the responding node successfully received it.
*/
@Test
fun `error resolving a transaction's dependencies inside of ReceiveFinalityFlow will keep the flow in for observation`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
// could not get rule for FinalityDoctor + observation counter to work
val rules = """
RULE Set flag when entering receive finality flow
CLASS ${ReceiveFinalityFlow::class.java.name}
METHOD call
AT ENTRY
IF !flagged("finality_flag")
DO flag("finality_flag"); traceln("Setting finality flag")
ENDRULE
RULE Set flag when entering resolve transactions flow
CLASS ${ResolveTransactionsFlow::class.java.name}
METHOD call
AT ENTRY
IF !flagged("resolve_tx_flag")
DO flag("resolve_tx_flag"); traceln("Setting resolve tx flag")
ENDRULE
RULE Throw exception when recording transaction
INTERFACE ${ServiceHubInternal::class.java.name}
METHOD recordTransactions
AT ENTRY
IF flagged("finality_flag") && flagged("resolve_tx_flag")
DO traceln("Throwing exception");
throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
charlie.nodeInfo.singleIdentity(),
false,
defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when executing [Action.CommitTransaction] as part of receiving a transaction to record inside of [ReceiveFinalityFlow] on the responding
* flow's node.
*
* The exception is thrown 5 times.
*
* The responding flow is retried 3 times and then completes successfully.
*
* The [StaffedFlowHospital.TransitionErrorGeneralPractitioner] catches these errors instead of the [StaffedFlowHospital.FinalityDoctor]. Due to this, the
* flow is retried instead of moving straight to observation.
*/
@Test
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and complete successfully`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when entering receive finality flow
CLASS ${ReceiveFinalityFlow::class.java.name}
METHOD call
AT ENTRY
IF !flagged("finality_flag")
DO flag("finality_flag"); traceln("Setting finality flag")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("finality_flag") && readCounter("counter") < 5
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
charlie.nodeInfo.singleIdentity(),
false,
defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds)
val output = getBytemanOutput(charlie)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when executing [Action.CommitTransaction] as part of receiving a transaction to record inside of [ReceiveFinalityFlow] on the responding
* flow's node.
*
* The exception is thrown 7 times.
*
* The responding flow is retried 3 times and is then kept in for observation.
*
* Both the initiating node and the responding node keep checkpoints for their flows. The initiating node keeps a checkpoint for the original flow that is
* waiting for the responding flow's receive to complete. The responding flow's checkpoint is kept due to it failing the commit as part of receive.
*
* The [StaffedFlowHospital.TransitionErrorGeneralPractitioner] catches these errors instead of the [StaffedFlowHospital.FinalityDoctor]. Due to this, the
* flow is retried instead of moving straight to observation.
*/
@Test
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and be kept for observation is error persists`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when entering receive finality flow
CLASS ${ReceiveFinalityFlow::class.java.name}
METHOD call
AT ENTRY
IF !flagged("finality_flag")
DO flag("finality_flag"); traceln("Setting finality flag")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("finality_flag") && readCounter("counter") < 7
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> {
aliceClient.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
charlie.nodeInfo.singleIdentity(),
false,
defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds)
}
val output = getBytemanOutput(charlie)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for CashIssueAndPaymentFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
}

View File

@ -0,0 +1,322 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Test
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
/**
* Triggers `killFlow` while the flow is suspended causing a [InterruptedException] to be thrown and passed through the hospital.
*
* The flow terminates and is not retried.
*
* No pass through the hospital is recorded. As the flow is marked as `isRemoved`.
*/
@Test
fun `error during transition due to an InterruptedException (killFlow) will terminate the flow`() {
startDriver {
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment terminal counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ TERMINAL
IF true
DO traceln("Byteman test - terminal")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val flow = aliceClient.startTrackedFlow(StatemachineKillFlowErrorHandlingTest::SleepFlow)
var flowKilled = false
flow.progress.subscribe {
if (it == SleepFlow.STARTED.label) {
Thread.sleep(5000)
flowKilled = aliceClient.killFlow(flow.id)
}
}
assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(20.seconds) }
val output = getBytemanOutput(alice)
assertTrue(flowKilled)
// Check the stdout for the lines generated by byteman
assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size
assertEquals(1, numberOfTerminalDiagnoses)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Triggers `killFlow` during user application code.
*
* The user application code is mimicked by a [Thread.sleep] which is importantly not placed inside the [Suspendable]
* call function. Placing it inside a [Suspendable] function causes quasar to behave unexpectedly.
*
* Although the call to kill the flow is made during user application code. It will not be removed / stop processing
* until the next suspension point is reached within the flow.
*
* The flow terminates and is not retried.
*
* No pass through the hospital is recorded. As the flow is marked as `isRemoved`.
*/
@Test
fun `flow killed during user code execution stops and removes the flow correctly`() {
startDriver {
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment terminal counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ TERMINAL
IF true
DO traceln("Byteman test - terminal")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val flow = aliceClient.startTrackedFlow(StatemachineKillFlowErrorHandlingTest::ThreadSleepFlow)
var flowKilled = false
flow.progress.subscribe {
if (it == ThreadSleepFlow.STARTED.label) {
Thread.sleep(5000)
flowKilled = aliceClient.killFlow(flow.id)
}
}
assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(30.seconds) }
val output = getBytemanOutput(alice)
assertTrue(flowKilled)
// Check the stdout for the lines generated by byteman
assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size
println(numberOfTerminalDiagnoses)
assertEquals(0, numberOfTerminalDiagnoses)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Triggers `killFlow` after the flow has already been sent to observation. The flow is not running at this point and
* all that remains is its checkpoint in the database.
*
* The flow terminates and is not retried.
*
* Killing the flow does not lead to any passes through the hospital. All the recorded passes through the hospital are
* from the original flow that was put in for observation.
*/
@Test
fun `flow killed when it is in the flow hospital for observation is removed correctly`() {
startDriver {
val alice = createBytemanNode(ALICE_NAME)
val charlie = createNode(CHARLIE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendInitial
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSendInitial action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendInitial
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment terminal counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ TERMINAL
IF true
DO traceln("Byteman test - terminal")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val flow = aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(20.seconds) }
aliceClient.killFlow(flow.id)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size
assertEquals(0, numberOfTerminalDiagnoses)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
@StartableByRPC
class SleepFlow : FlowLogic<Unit>() {
object STARTED : ProgressTracker.Step("I am ready to die")
override val progressTracker = ProgressTracker(STARTED)
@Suspendable
override fun call() {
sleep(Duration.of(1, ChronoUnit.SECONDS))
progressTracker.currentStep = STARTED
sleep(Duration.of(2, ChronoUnit.MINUTES))
}
}
@StartableByRPC
class ThreadSleepFlow : FlowLogic<Unit>() {
object STARTED : ProgressTracker.Step("I am ready to die")
override val progressTracker = ProgressTracker(STARTED)
@Suspendable
override fun call() {
sleep(Duration.of(1, ChronoUnit.SECONDS))
progressTracker.currentStep = STARTED
logger.info("Starting ${ThreadSleepFlow::class.qualifiedName} application sleep")
sleep()
logger.info("Finished ${ThreadSleepFlow::class.qualifiedName} application sleep")
sleep(Duration.of(2, ChronoUnit.MINUTES))
}
// Sleep is moved outside of `@Suspendable` function to prevent issues with Quasar
private fun sleep() {
Thread.sleep(20000)
}
}
}

View File

@ -0,0 +1,532 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.node.services.statemachine.transitions.TopLevelTransition
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Test
import kotlin.test.assertEquals
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
/**
* This test checks that flow calling an initiating subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first send to a counterparty.
* The exception is thrown 5 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test
fun `initiating subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when entering subflow
CLASS ${SendAMessageInAnInitiatingSubflowFlow::class.java.name}
METHOD flag
AT ENTRY
IF !flagged("subflow_flag")
DO flag("subflow_flag"); traceln("Setting subflow flag to true")
ENDRULE
RULE Set flag when executing first suspend
CLASS ${TopLevelTransition::class.java.name}
METHOD suspendTransition
AT ENTRY
IF flagged("subflow_flag") && !flagged("suspend_flag")
DO flag("suspend_flag"); traceln("Setting suspend flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 5
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Set flag when executing first commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && flagged("suspend_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* This test checks that flow calling an initiating subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first receive from a counterparty.
* The exception is thrown 5 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test
fun `initiating subflow - error during transition with CommitTransaction action that occurs after the first receive will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when entering subflow
CLASS ${SendAMessageInAnInitiatingSubflowFlow::class.java.name}
METHOD flag
AT ENTRY
IF !flagged("subflow_flag")
DO flag("subflow_flag"); traceln("Setting subflow flag to true")
ENDRULE
RULE Set flag when executing first suspend
CLASS ${FlowSessionImpl::class.java.name}
METHOD receive
AT ENTRY
IF flagged("subflow_flag") && !flagged("suspend_flag")
DO flag("suspend_flag"); traceln("Setting suspend flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && flagged("suspend_flag") && readCounter("counter") < 5
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* This test checks that flow calling an inline subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first send to a counterparty.
* The exception is thrown 5 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when entering subflow
CLASS ${SendAMessageInAnInlineSubflowFlow::class.java.name}
METHOD flag
AT ENTRY
IF !flagged("subflow_flag")
DO flag("subflow_flag"); traceln("Setting subflow flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && readCounter("counter") < 5
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* This test checks that flow calling an inline subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first receive from a counterparty.
* The exception is thrown 5 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first receive will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when entering subflow
CLASS ${SendAMessageInAnInlineSubflowFlow::class.java.name}
METHOD flag
AT ENTRY
IF !flagged("subflow_flag")
DO flag("subflow_flag"); traceln("Setting subflow flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && flagged("commit_flag") && readCounter("counter") < 5
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Set flag when executing first commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
@StartableByRPC
@InitiatingFlow
class SendAMessageInAnInitiatingSubflowFlow(private val party: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
val session = initiateFlow(party)
session.send("hello there from top level flow")
session.receive<String>().unwrap { it }
logger.info("entering subflow")
flag()
val result = subFlow(InitiatingSendAMessageFlow(party))
logger.info("Finished sub flow and receive result - $result")
session.send("another hello there from top level flow")
session.receive<String>().unwrap { it }
logger.info("Finished top level flow")
return "Finished executing test flow - ${this.runId}"
}
private fun flag() {
logger.info("for byteman")
}
}
@InitiatedBy(SendAMessageInAnInitiatingSubflowFlow::class)
class SendAMessageInAnInitiatingSubflowResponder(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
session.send("reply 1")
session.receive<String>().unwrap { it }
session.send("reply 2")
}
}
@StartableByRPC
@InitiatingFlow
class InitiatingSendAMessageFlow(private val party: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
val session = initiateFlow(party)
session.send("hello there")
session.receive<String>().unwrap { it }
return "Finished executing test flow - ${this.runId}"
}
}
@InitiatedBy(InitiatingSendAMessageFlow::class)
class InitiatingSendAMessageResponder(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
session.send("reply 1")
}
}
@StartableByRPC
@InitiatingFlow
class SendAMessageInAnInlineSubflowFlow(private val party: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
val session = initiateFlow(party)
session.send("hello there from top level flow")
session.receive<String>().unwrap { it }
logger.info("entering subflow")
flag()
val result = subFlow(InlineSendAMessageSubflow(session))
logger.info("Finished sub flow and receive result - $result")
session.send("another hello there from top level flow")
session.receive<String>().unwrap { it }
logger.info("Finished top level flow")
return "Finished executing test flow - ${this.runId}"
}
private fun flag() {
logger.info("for byteman")
}
}
@InitiatedBy(SendAMessageInAnInlineSubflowFlow::class)
class SendAMessageInAnInlineSubflowResponder(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
session.send("reply 1")
session.receive<String>().unwrap { it }
session.send("reply 2")
session.receive<String>().unwrap { it }
session.send("reply 3")
}
}
class InlineSendAMessageSubflow(private val session: FlowSession) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
session.send("hello there")
session.receive<String>().unwrap { it }
return "Finished executing the inline subflow - ${this.runId}"
}
}
}