ENT-5196 handle errors during flow initialisation (#6378)

## Summary

This change deals with multiple issues:

* Errors that occur during flow initialisation.

* Errors that occur when handling the outcome of an existing flow error.

* Failures to roll back and close a database transaction when an error
occurs in `TransitionExecutorImpl`.

* Removal of create and commit transaction actions around retrying a flow.

## Errors that occur during flow initialisation

Flow initialisation has been moved into the try/catch that exists inside
`FlowStateMachineImpl.run`. This means that if an error is thrown all the
way out of `initialiseFlow` (which should rarely happen), it will be caught
and moved into the flow's standard error handling path. The flow should
then terminate properly.

`Event.Error` was changed to make rolling back optional. An error during
flow initialisation leaves the flow without an open database transaction,
so there is nothing to roll back.

## Errors that occur when handling the outcome of an existing flow error

When an error occurs, a flow goes to the flow hospital and is given an
outcome event to address the original error. If the transition that is
processing the error outcome event (`StartErrorPropagation` or
`RetryFlowFromSafePoint`) itself hits an error, then the flow aborts and
nothing further happens. This means that the flow is left in a runnable state.

To resolve this, we now retry the original error outcome event whenever
another error occurs while processing it.

This is done by adding a new flow hospital staff member that looks for
`ErrorStateTransitionException` thrown in the error code path of
`TransitionExecutorImpl`. It then takes the last outcome for that flow
and schedules it to run again. This scheduling runs with a backoff.

This means that a flow will continually retry the original error outcome
event until it completes it successfully.

## Failures to roll back and close a database transaction when an error occurs in `TransitionExecutorImpl`
   
Rolling back and closing the database transaction inside of
`TransitionExecutorImpl` are now done inside individual try/catch blocks,
so that a failure in either step does not prevent the flow from continuing.

## Removal of create and commit transaction actions around retrying a flow

The database commit that occurs after retrying a flow can fail, which
required some custom code just for that event to prevent inconsistent
behaviour. The transaction was only needed for reading checkpoints from
the database; therefore the transaction was moved into
`retryFlowFromSafePoint` instead and the commit removed.

If we need to commit data inside of `retryFlowFromSafePoint` in the 
future, a commit should be added directly to `retryFlowFromSafePoint`. 
The commit should occur before the flow is started on a new fiber.
This commit is contained in:
Dan Newton 2020-06-30 11:50:42 +01:00 committed by GitHub
parent 3f03de6fbd
commit f6b5737277
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1636 additions and 2090 deletions

View File

@ -6,6 +6,7 @@ import org.hibernate.Session
import org.hibernate.Transaction import org.hibernate.Transaction
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.sql.Connection import java.sql.Connection
import java.sql.SQLException
import java.util.UUID import java.util.UUID
import javax.persistence.EntityManager import javax.persistence.EntityManager
@ -79,6 +80,7 @@ class DatabaseTransaction(
committed = true committed = true
} }
@Throws(SQLException::class)
fun rollback() { fun rollback() {
if (sessionDelegate.isInitialized() && session.isOpen) { if (sessionDelegate.isInitialized() && session.isOpen) {
session.clear() session.clear()
@ -89,16 +91,20 @@ class DatabaseTransaction(
clearException() clearException()
} }
@Throws(SQLException::class)
fun close() { fun close() {
if (sessionDelegate.isInitialized() && session.isOpen) { try {
session.close() if (sessionDelegate.isInitialized() && session.isOpen) {
session.close()
}
if (database.closeConnection) {
connection.close()
}
} finally {
clearException()
contextTransactionOrNull = outerTransaction
} }
if (database.closeConnection) {
connection.close()
}
clearException()
contextTransactionOrNull = outerTransaction
if (outerTransaction == null) { if (outerTransaction == null) {
synchronized(this) { synchronized(this) {
closed = true closed = true

View File

@ -3,6 +3,7 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession import net.corda.core.flows.FlowSession
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
@ -10,11 +11,14 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.list import net.corda.core.internal.list
import net.corda.core.internal.readAllLines import net.corda.core.internal.readAllLines
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService import net.corda.core.node.services.CordaService
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions import net.corda.node.services.Permissions
import net.corda.testing.core.DUMMY_NOTARY_NAME import net.corda.testing.core.DUMMY_NOTARY_NAME
@ -23,6 +27,7 @@ import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.NodeParameters import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.OutOfProcessImpl
import net.corda.testing.node.NotarySpec import net.corda.testing.node.NotarySpec
import net.corda.testing.node.TestCordapp import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User import net.corda.testing.node.User
@ -30,8 +35,11 @@ import net.corda.testing.node.internal.InternalDriverDSL
import org.jboss.byteman.agent.submit.ScriptText import org.jboss.byteman.agent.submit.ScriptText
import org.jboss.byteman.agent.submit.Submit import org.jboss.byteman.agent.submit.Submit
import org.junit.Before import org.junit.Before
import java.time.Duration
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
abstract class StatemachineErrorHandlingTest { abstract class StateMachineErrorHandlingTest {
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all())) val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
var counter = 0 var counter = 0
@ -57,15 +65,17 @@ abstract class StatemachineErrorHandlingTest {
internal fun DriverDSL.createBytemanNode( internal fun DriverDSL.createBytemanNode(
providedName: CordaX500Name, providedName: CordaX500Name,
additionalCordapps: Collection<TestCordapp> = emptyList() additionalCordapps: Collection<TestCordapp> = emptyList()
): NodeHandle { ): Pair<NodeHandle, Int> {
return (this as InternalDriverDSL).startNode( val port = nextPort()
val nodeHandle = (this as InternalDriverDSL).startNode(
NodeParameters( NodeParameters(
providedName = providedName, providedName = providedName,
rpcUsers = listOf(rpcUser), rpcUsers = listOf(rpcUser),
additionalCordapps = additionalCordapps additionalCordapps = additionalCordapps
), ),
bytemanPort = 12000 bytemanPort = port
).getOrThrow() ).getOrThrow()
return nodeHandle to port
} }
internal fun DriverDSL.createNode(providedName: CordaX500Name, additionalCordapps: Collection<TestCordapp> = emptyList()): NodeHandle { internal fun DriverDSL.createNode(providedName: CordaX500Name, additionalCordapps: Collection<TestCordapp> = emptyList()): NodeHandle {
@ -78,8 +88,8 @@ abstract class StatemachineErrorHandlingTest {
).getOrThrow() ).getOrThrow()
} }
internal fun submitBytemanRules(rules: String) { internal fun submitBytemanRules(rules: String, port: Int) {
val submit = Submit("localhost", 12000) val submit = Submit("localhost", port)
submit.addScripts(listOf(ScriptText("Test script", rules))) submit.addScripts(listOf(ScriptText("Test script", rules)))
} }
@ -90,6 +100,37 @@ abstract class StatemachineErrorHandlingTest {
.readAllLines() .readAllLines()
} }
/**
 * Destroys the node's process and waits up to [timeout] for it to exit, then invokes
 * `onStopCallback`.
 *
 * The wait is expressed in milliseconds so that sub-second timeouts are honoured; reading
 * `Duration.seconds` would silently drop the fractional part (e.g. 500 ms would become 0 s).
 *
 * @param timeout maximum time to wait for the process to exit after `destroy()` has been called.
 * @return the result of `waitFor` — presumably `true` when the process exited within [timeout]
 * (assumes `process` is a `java.lang.Process`; confirm against `OutOfProcessImpl`).
 */
internal fun OutOfProcessImpl.stop(timeout: Duration): Boolean {
    return process.run {
        destroy()
        // Millisecond precision keeps the sub-second part of the timeout.
        waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS)
    }.also { onStopCallback() } // the callback runs once the wait has returned, whatever its result
}
/**
 * Starts [GetHospitalCountersFlow] over this RPC connection, waits up to 20 seconds for its
 * result and asserts that every returned counter equals the expected value.
 *
 * Every expectation defaults to 0, so callers only name the counters they expect to be non-zero.
 *
 * @param discharged expected value of the `discharged` counter.
 * @param observation expected value of the `observation` counter.
 * @param propagated expected value of the `propagated` counter.
 * @param dischargedRetry expected value of the `dischargeRetry` counter.
 * @param observationRetry expected value of the `observationRetry` counter.
 * @param propagatedRetry expected value of the `propagatedRetry` counter.
 */
@Suppress("LongParameterList")
internal fun CordaRPCOps.assertHospitalCounts(
    discharged: Int = 0,
    observation: Int = 0,
    propagated: Int = 0,
    dischargedRetry: Int = 0,
    observationRetry: Int = 0,
    propagatedRetry: Int = 0
) {
    val pendingCounts = startFlow(StateMachineErrorHandlingTest::GetHospitalCountersFlow).returnValue
    val actual = pendingCounts.getOrThrow(20.seconds)

    // Checked in declaration order, so the first mismatching counter is the one reported.
    assertEquals(discharged, actual.discharged)
    assertEquals(observation, actual.observation)
    assertEquals(propagated, actual.propagated)
    assertEquals(dischargedRetry, actual.dischargeRetry)
    assertEquals(observationRetry, actual.observationRetry)
    assertEquals(propagatedRetry, actual.propagatedRetry)
}
/** Asserts that every hospital counter is zero by calling [assertHospitalCounts] with its defaults. */
internal fun CordaRPCOps.assertHospitalCountsAllZero() {
    assertHospitalCounts()
}
/**
 * Starts [GetNumberOfCheckpointsFlow] over this RPC connection and asserts that the count it
 * returns equals [number].
 *
 * The result is awaited with the same 20 second bound used by `assertHospitalCounts`, so a
 * counting flow that never completes fails the assertion quickly instead of blocking indefinitely.
 *
 * @param number the expected number of checkpoints reported by the flow.
 */
internal fun CordaRPCOps.assertNumberOfCheckpoints(number: Long) {
    val actual = startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
    assertEquals(number, actual)
}
@StartableByRPC @StartableByRPC
@InitiatingFlow @InitiatingFlow
class SendAMessageFlow(private val party: Party) : FlowLogic<String>() { class SendAMessageFlow(private val party: Party) : FlowLogic<String>() {
@ -97,6 +138,7 @@ abstract class StatemachineErrorHandlingTest {
override fun call(): String { override fun call(): String {
val session = initiateFlow(party) val session = initiateFlow(party)
session.send("hello there") session.send("hello there")
logger.info("Finished my flow")
return "Finished executing test flow - ${this.runId}" return "Finished executing test flow - ${this.runId}"
} }
} }
@ -106,18 +148,49 @@ abstract class StatemachineErrorHandlingTest {
@Suspendable @Suspendable
override fun call() { override fun call() {
session.receive<String>().unwrap { it } session.receive<String>().unwrap { it }
logger.info("Finished my flow")
}
}
/**
 * Test flow, startable over RPC, whose [call] always fails: it logs a message and then throws an
 * [IllegalStateException].
 */
@StartableByRPC
class ThrowAnErrorFlow : FlowLogic<String>() {
/** Delegates to [throwException]; the return statement below it is never reached. */
@Suspendable
override fun call(): String {
throwException()
return "cant get here"
}
// NOTE(review): the throw lives in its own named method, presumably so that external
// instrumentation can target it — confirm before inlining.
private fun throwException() {
logger.info("Throwing exception in flow")
throw IllegalStateException("throwing exception in flow")
}
}
@StartableByRPC
class ThrowAHospitalizeErrorFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
throwException()
return "cant get here"
}
private fun throwException() {
logger.info("Throwing exception in flow")
throw HospitalizeFlowException("throwing exception in flow")
} }
} }
@StartableByRPC @StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Long>() { class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long { override fun call(): Long {
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints").use { ps -> return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints where checkpoint_id != ?")
ps.executeQuery().use { rs -> .apply { setString(1, runId.uuid.toString()) }
rs.next() .use { ps ->
rs.getLong(1) ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
} }
}
} }
} }
@ -126,26 +199,51 @@ abstract class StatemachineErrorHandlingTest {
class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() { class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
override fun call(): HospitalCounts = override fun call(): HospitalCounts =
HospitalCounts( HospitalCounts(
serviceHub.cordaService(HospitalCounter::class.java).dischargeCounter, serviceHub.cordaService(HospitalCounter::class.java).dischargedCounter,
serviceHub.cordaService(HospitalCounter::class.java).observationCounter serviceHub.cordaService(HospitalCounter::class.java).observationCounter,
serviceHub.cordaService(HospitalCounter::class.java).propagatedCounter,
serviceHub.cordaService(HospitalCounter::class.java).dischargeRetryCounter,
serviceHub.cordaService(HospitalCounter::class.java).observationRetryCounter,
serviceHub.cordaService(HospitalCounter::class.java).propagatedRetryCounter
) )
} }
@CordaSerializable @CordaSerializable
data class HospitalCounts(val discharge: Int, val observation: Int) data class HospitalCounts(
val discharged: Int,
val observation: Int,
val propagated: Int,
val dischargeRetry: Int,
val observationRetry: Int,
val propagatedRetry: Int
)
@Suppress("UNUSED_PARAMETER") @Suppress("UNUSED_PARAMETER")
@CordaService @CordaService
class HospitalCounter(services: AppServiceHub) : SingletonSerializeAsToken() { class HospitalCounter(services: AppServiceHub) : SingletonSerializeAsToken() {
var dischargedCounter: Int = 0
var observationCounter: Int = 0 var observationCounter: Int = 0
var dischargeCounter: Int = 0 var propagatedCounter: Int = 0
var dischargeRetryCounter: Int = 0
var observationRetryCounter: Int = 0
var propagatedRetryCounter: Int = 0
init { init {
StaffedFlowHospital.onFlowDischarged.add { _, _ -> StaffedFlowHospital.onFlowDischarged.add { _, _ ->
++dischargeCounter dischargedCounter++
} }
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observationCounter observationCounter++
}
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
propagatedCounter++
}
StaffedFlowHospital.onFlowResuscitated.add { _, _, outcome ->
when (outcome) {
StaffedFlowHospital.Outcome.DISCHARGE -> dischargeRetryCounter++
StaffedFlowHospital.Outcome.OVERNIGHT_OBSERVATION -> observationRetryCounter++
StaffedFlowHospital.Outcome.UNTREATABLE -> propagatedRetryCounter++
}
} }
} }
} }

View File

@ -1,6 +1,5 @@
package net.corda.node.services.statemachine package net.corda.node.services.statemachine
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.ReceiveFinalityFlow import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
@ -22,7 +21,7 @@ import kotlin.test.assertEquals
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped @Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() { class StateMachineFinalityErrorHandlingTest : StateMachineErrorHandlingTest() {
/** /**
* Throws an exception when recoding a transaction inside of [ReceiveFinalityFlow] on the responding * Throws an exception when recoding a transaction inside of [ReceiveFinalityFlow] on the responding
@ -33,10 +32,10 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Only the responding node keeps a checkpoint. The initiating flow has completed successfully as it has complete its * Only the responding node keeps a checkpoint. The initiating flow has completed successfully as it has complete its
* send to the responding node and the responding node successfully received it. * send to the responding node and the responding node successfully received it.
*/ */
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation`() { fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) { startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS) val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS) val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
// could not get rule for FinalityDoctor + observation counter to work // could not get rule for FinalityDoctor + observation counter to work
@ -67,14 +66,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
ENDRULE ENDRULE
""".trimIndent() """.trimIndent()
submitBytemanRules(rules) submitBytemanRules(rules, port)
val aliceClient = alice.rpc.startFlow(
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
::CashIssueAndPaymentFlow, ::CashIssueAndPaymentFlow,
500.DOLLARS, 500.DOLLARS,
OpaqueBytes.of(0x01), OpaqueBytes.of(0x01),
@ -83,15 +77,11 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
defaultNotaryIdentity defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds) ).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get() charlie.rpc.assertHospitalCounts(observation = 1)
assertEquals(0, discharge) assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, observation) assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
assertEquals(0, aliceClient.stateMachinesSnapshot().size) alice.rpc.assertNumberOfCheckpoints(0)
assertEquals(1, charlieClient.stateMachinesSnapshot().size) charlie.rpc.assertNumberOfCheckpoints(1)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
// 1 ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
} }
} }
@ -104,10 +94,10 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Only the responding node keeps a checkpoint. The initiating flow has completed successfully as it has complete its * Only the responding node keeps a checkpoint. The initiating flow has completed successfully as it has complete its
* send to the responding node and the responding node successfully received it. * send to the responding node and the responding node successfully received it.
*/ */
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `error resolving a transaction's dependencies inside of ReceiveFinalityFlow will keep the flow in for observation`() { fun `error resolving a transaction's dependencies inside of ReceiveFinalityFlow will keep the flow in for observation`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) { startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS) val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS) val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
// could not get rule for FinalityDoctor + observation counter to work // could not get rule for FinalityDoctor + observation counter to work
@ -138,14 +128,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
ENDRULE ENDRULE
""".trimIndent() """.trimIndent()
submitBytemanRules(rules) submitBytemanRules(rules, port)
val aliceClient = alice.rpc.startFlow(
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
::CashIssueAndPaymentFlow, ::CashIssueAndPaymentFlow,
500.DOLLARS, 500.DOLLARS,
OpaqueBytes.of(0x01), OpaqueBytes.of(0x01),
@ -154,15 +139,11 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
defaultNotaryIdentity defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds) ).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get() charlie.rpc.assertHospitalCounts(observation = 1)
assertEquals(0, discharge) assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, observation) assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
assertEquals(0, aliceClient.stateMachinesSnapshot().size) alice.rpc.assertNumberOfCheckpoints(0)
assertEquals(1, charlieClient.stateMachinesSnapshot().size) charlie.rpc.assertNumberOfCheckpoints(1)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
} }
} }
@ -170,17 +151,17 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Throws an exception when executing [Action.CommitTransaction] as part of receiving a transaction to record inside of [ReceiveFinalityFlow] on the responding * Throws an exception when executing [Action.CommitTransaction] as part of receiving a transaction to record inside of [ReceiveFinalityFlow] on the responding
* flow's node. * flow's node.
* *
* The exception is thrown 5 times. * The exception is thrown 3 times.
* *
* The responding flow is retried 3 times and then completes successfully. * The responding flow is retried 3 times and then completes successfully.
* *
* The [StaffedFlowHospital.TransitionErrorGeneralPractitioner] catches these errors instead of the [StaffedFlowHospital.FinalityDoctor]. Due to this, the * The [StaffedFlowHospital.TransitionErrorGeneralPractitioner] catches these errors instead of the [StaffedFlowHospital.FinalityDoctor]. Due to this, the
* flow is retried instead of moving straight to observation. * flow is retried instead of moving straight to observation.
*/ */
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and complete successfully`() { fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and complete successfully`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) { startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS) val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS) val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
val rules = """ val rules = """
@ -204,35 +185,14 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS ${ActionExecutorImpl::class.java.name} CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction METHOD executeCommitTransaction
AT ENTRY AT ENTRY
IF flagged("finality_flag") && readCounter("counter") < 5 IF flagged("finality_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent() """.trimIndent()
submitBytemanRules(rules) submitBytemanRules(rules, port)
val aliceClient = alice.rpc.startFlow(
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
::CashIssueAndPaymentFlow, ::CashIssueAndPaymentFlow,
500.DOLLARS, 500.DOLLARS,
OpaqueBytes.of(0x01), OpaqueBytes.of(0x01),
@ -241,20 +201,11 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
defaultNotaryIdentity defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds) ).returnValue.getOrThrow(30.seconds)
val output = getBytemanOutput(charlie) charlie.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
// Check the stdout for the lines generated by byteman assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) alice.rpc.assertNumberOfCheckpoints(0)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) charlie.rpc.assertNumberOfCheckpoints(0)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
} }
} }
@ -262,7 +213,7 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Throws an exception when executing [Action.CommitTransaction] as part of receiving a transaction to record inside of [ReceiveFinalityFlow] on the responding * Throws an exception when executing [Action.CommitTransaction] as part of receiving a transaction to record inside of [ReceiveFinalityFlow] on the responding
* flow's node. * flow's node.
* *
* The exception is thrown 7 times. * The exception is thrown 4 times.
* *
* The responding flow is retried 3 times and is then kept in for observation. * The responding flow is retried 3 times and is then kept in for observation.
* *
@ -272,10 +223,10 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* The [StaffedFlowHospital.TransitionErrorGeneralPractitioner] catches these errors instead of the [StaffedFlowHospital.FinalityDoctor]. Due to this, the * The [StaffedFlowHospital.TransitionErrorGeneralPractitioner] catches these errors instead of the [StaffedFlowHospital.FinalityDoctor]. Due to this, the
* flow is retried instead of moving straight to observation. * flow is retried instead of moving straight to observation.
*/ */
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and be kept for observation is error persists`() { fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and be kept for observation is error persists`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) { startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS) val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS) val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
val rules = """ val rules = """
@ -299,36 +250,15 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS ${ActionExecutorImpl::class.java.name} CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction METHOD executeCommitTransaction
AT ENTRY AT ENTRY
IF flagged("finality_flag") && readCounter("counter") < 7 IF flagged("finality_flag") && readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent() """.trimIndent()
submitBytemanRules(rules) submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> { assertFailsWith<TimeoutException> {
aliceClient.startFlow( alice.rpc.startFlow(
::CashIssueAndPaymentFlow, ::CashIssueAndPaymentFlow,
500.DOLLARS, 500.DOLLARS,
OpaqueBytes.of(0x01), OpaqueBytes.of(0x01),
@ -338,20 +268,14 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
).returnValue.getOrThrow(30.seconds) ).returnValue.getOrThrow(30.seconds)
} }
val output = getBytemanOutput(charlie) charlie.rpc.assertHospitalCounts(
discharged = 3,
// Check the stdout for the lines generated by byteman observation = 1
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) )
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get() assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
assertEquals(3, discharge) alice.rpc.assertNumberOfCheckpoints(1)
assertEquals(1, observation) charlie.rpc.assertNumberOfCheckpoints(1)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for CashIssueAndPaymentFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
} }
} }
} }

View File

@ -0,0 +1,447 @@
package net.corda.node.services.statemachine
import net.corda.core.CordaRuntimeException
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.internal.OutOfProcessImpl
import org.junit.Test
import java.sql.Connection
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
private companion object {
    /**
     * Single background thread used by the tests in this class that start a flow via `executor.execute { ... }` without waiting
     * on the RPC call.
     *
     * The executor is never shut down, so its worker is created as a daemon thread: a lingering worker must not keep the test JVM
     * alive once the tests have finished.
     */
    val executor: ExecutorService = Executors.newSingleThreadExecutor { task ->
        Thread(task, "flow-init-error-handling-test-executor").apply { isDaemon = true }
    }
}
/**
 * Fails the [Action.CommitTransaction] action while the flow is still initialising, i.e. before it has saved its first
 * checkpoint (the flow is still in an unstarted state).
 *
 * The injected exception fires 3 times.
 *
 * The hospital therefore discharges the flow 3 times (3 retries); the last retry gets through and the flow finishes.
 *
 * As the flow is unstarted, every retry runs it again from the very beginning.
 */
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        val script = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeCommitTransaction action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF readCounter("counter") < 3
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        // The flow must finish within the timeout despite the three injected failures.
        val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        flowHandle.returnValue.getOrThrow(30.seconds)

        alice.rpc.assertHospitalCounts(discharged = 3)
        val remainingFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(0, remainingFlows.size)
        alice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Injects an exception into [FlowStateMachineImpl.processEvent].
 *
 * Exceptions are not expected at this point, but doing so shows what happens when an arbitrary exception reaches
 * [FlowStateMachineImpl.run] while the flow is initialising.
 *
 * Because of where the exception is raised, no transaction has been created, so a "Transaction context is missing" exception is
 * thrown by the finally block when leaving [FlowStateMachineImpl.processEventsUntilFlowIsResumed].
 */
@Test(timeout = 300_000)
fun `unexpected error during flow initialisation throws exception to client`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        val script = """
            RULE Create Counter
            CLASS ${FlowStateMachineImpl::class.java.name}
            METHOD processEvent
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception
            CLASS ${FlowStateMachineImpl::class.java.name}
            METHOD processEvent
            AT ENTRY
            IF readCounter("counter") < 1
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        // The failure is expected to surface to the RPC client.
        assertFailsWith<CordaRuntimeException> {
            val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
            flowHandle.returnValue.getOrThrow(30.seconds)
        }

        alice.rpc.assertHospitalCounts(propagated = 1)
        val remainingFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(0, remainingFlows.size)
        alice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Fails the [Action.CommitTransaction] action while the flow is still initialising, i.e. before it has saved its first
 * checkpoint (the flow is still in an unstarted state).
 *
 * A `java.sql.SQLException` is then injected into [Connection.rollback] when the flow's database transaction is rolled back.
 *
 * That second exception should be suppressed, leaving the flow free to retry and complete successfully.
 */
@Test(timeout = 300_000)
fun `error during initialisation when trying to rollback the flow's database transaction the flow is able to retry and complete successfully`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        // The shared counter sequences the two failures: 0 -> fail the commit, 1 -> fail the rollback, 2 -> no more failures.
        val script = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeCommitTransaction action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF readCounter("counter") == 0
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
            RULE Throw exception when rolling back transaction in transition executor
            INTERFACE ${Connection::class.java.name}
            METHOD rollback
            AT ENTRY
            IF readCounter("counter") == 1
            DO incrementCounter("counter"); traceln("Throwing exception in transition executor"); throw new java.sql.SQLException("could not reach db", "1")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        flowHandle.returnValue.getOrThrow(30.seconds)

        alice.rpc.assertHospitalCounts(discharged = 1)
        val remainingFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(0, remainingFlows.size)
        alice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first
 * checkpoint (remains in an unstarted state).
 *
 * A `java.sql.SQLException` is then thrown from [Connection.close] when trying to close the flow's database transaction.
 *
 * The `SQLException` should be suppressed and the flow should continue to retry and complete successfully.
 */
@Test(timeout = 300_000)
fun `error during initialisation when trying to close the flow's database transaction the flow is able to retry and complete successfully`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, port) = createBytemanNode(ALICE_NAME)
        // The shared counter sequences the two failures: 0 -> fail the commit, 1 -> fail the close, 2 -> no more failures.
        // The third rule intercepts Connection.close, so it is named accordingly (it used to carry the "rolling back" name
        // copied from the rollback test).
        val rules = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeCommitTransaction action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF readCounter("counter") == 0
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
            RULE Throw exception when closing transaction in transition executor
            INTERFACE ${Connection::class.java.name}
            METHOD close
            AT ENTRY
            IF readCounter("counter") == 1
            DO incrementCounter("counter"); traceln("Throwing exception in transition executor"); throw new java.sql.SQLException("could not reach db", "1")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(rules, port)

        alice.rpc.startFlow(
            StateMachineErrorHandlingTest::SendAMessageFlow,
            charlie.nodeInfo.singleIdentity()
        ).returnValue.getOrThrow(30.seconds)

        alice.rpc.assertHospitalCounts(discharged = 1)
        assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
        alice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Fails the [Action.CommitTransaction] action while the flow is still initialising, i.e. before it has saved its first
 * checkpoint (the flow is still in an unstarted state).
 *
 * The injected exception fires 4 times.
 *
 * The hospital discharges the flow 3 times (3 retries) and then keeps it in for observation.
 *
 * As the flow is unstarted, every retry runs it again from the very beginning.
 */
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        val script = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeCommitTransaction action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF readCounter("counter") < 4
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        // The flow is never signalled as started, so waiting on it with getOrThrow would hang.
        // Start it on a background thread and sleep instead.
        executor.execute {
            alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        }
        Thread.sleep(30.seconds.toMillis())

        alice.rpc.assertHospitalCounts(discharged = 3, observation = 1)
        val stuckFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(1, stuckFlows.size)
        alice.rpc.assertNumberOfCheckpoints(0)

        // Restart the node and check that no checkpoint is present afterwards.
        assertTrue((alice as OutOfProcessImpl).stop(60.seconds), "The node must be shutdown before it can be restarted")
        val (restartedAlice, _) = createBytemanNode(ALICE_NAME)
        Thread.sleep(10.seconds.toMillis())
        restartedAlice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Fails the [Action.CommitTransaction] action while the flow is still initialising, i.e. before it has saved its first
 * checkpoint (the flow is still in an unstarted state).
 *
 * The first exception is injected when a database transaction is committed during a transition, which triggers a retry of the
 * flow. A second exception is then injected into the retry itself.
 *
 * The flow then retries the retry, after which it completes successfully.
 */
@Test(timeout = 300_000)
fun `error during retrying a flow that failed when committing its original checkpoint will retry the flow again and complete successfully`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        // Each rule guards itself with a flag so that it fires exactly once.
        val script = """
            RULE Throw exception on executeCommitTransaction action after first suspend + commit
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF !flagged("commit_exception_flag")
            DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
            ENDRULE
            RULE Throw exception on retry
            CLASS ${SingleThreadedStateMachineManager::class.java.name}
            METHOD onExternalStartFlow
            AT ENTRY
            IF flagged("commit_exception_flag") && !flagged("retry_exception_flag")
            DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        flowHandle.returnValue.getOrThrow(30.seconds)

        alice.rpc.assertHospitalCounts(discharged = 1, dischargedRetry = 1)
        val remainingFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(0, remainingFlows.size)
        alice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Fails the [Action.CommitTransaction] action on the responding node while the flow is still initialising, i.e. before it has
 * saved its first checkpoint (the flow is still in an unstarted state).
 *
 * The injected exception fires 3 times.
 *
 * The hospital therefore discharges the flow 3 times (3 retries); the last retry gets through and the flow finishes.
 *
 * As the flow is unstarted, every retry runs it again from the very beginning.
 */
@Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() {
    startDriver {
        // Here the Byteman-instrumented node is the responder (charlie), not the initiator.
        val (charlie, bytemanPort) = createBytemanNode(CHARLIE_NAME)
        val alice = createNode(ALICE_NAME)
        val script = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeCommitTransaction action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF readCounter("counter") < 3
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        flowHandle.returnValue.getOrThrow(30.seconds)

        charlie.rpc.assertHospitalCounts(discharged = 3)
        alice.rpc.assertNumberOfCheckpoints(0)
        charlie.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Fails the [Action.CommitTransaction] action on the responding node while the flow is still initialising, i.e. before it has
 * saved its first checkpoint (the flow is still in an unstarted state).
 *
 * The injected exception fires 4 times.
 *
 * The hospital discharges the flow 3 times (3 retries) and then keeps it in for observation.
 *
 * As the flow is unstarted, every retry runs it again from the very beginning.
 */
@Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
    startDriver {
        // Here the Byteman-instrumented node is the responder (charlie), not the initiator.
        val (charlie, bytemanPort) = createBytemanNode(CHARLIE_NAME)
        val alice = createNode(ALICE_NAME)
        val script = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeCommitTransaction action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF readCounter("counter") < 4
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        // The flow is never signalled as started, so waiting on it with getOrThrow would hang.
        // Start it on a background thread and sleep instead.
        executor.execute {
            alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        }
        Thread.sleep(30.seconds.toMillis())

        charlie.rpc.assertHospitalCounts(discharged = 3, observation = 1)
        val aliceFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(1, aliceFlows.size)
        val charlieFlows = charlie.rpc.stateMachinesSnapshot()
        assertEquals(1, charlieFlows.size)
        alice.rpc.assertNumberOfCheckpoints(1)
        charlie.rpc.assertNumberOfCheckpoints(0)

        // Restart the responder and check that neither node holds a checkpoint afterwards.
        assertTrue((charlie as OutOfProcessImpl).stop(60.seconds), "The node must be shutdown before it can be restarted")
        val (restartedCharlie, _) = createBytemanNode(CHARLIE_NAME)
        Thread.sleep(10.seconds.toMillis())
        alice.rpc.assertNumberOfCheckpoints(0)
        restartedCharlie.rpc.assertNumberOfCheckpoints(0)
    }
}
}

View File

@ -0,0 +1,526 @@
package net.corda.node.services.statemachine
import net.corda.core.CordaRuntimeException
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.transitions.TopLevelTransition
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Test
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
/**
 * Fails the [Action.SendInitial] action.
 *
 * The injected exception fires 4 times.
 *
 * The hospital discharges the flow 3 times (3 retries) and then keeps it in for observation.
 */
@Test(timeout = 300_000)
fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        val script = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeSendInitial
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeSendInitial action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeSendInitial
            AT ENTRY
            IF readCounter("counter") < 4
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        // The flow is expected to still be under observation when the wait times out.
        assertFailsWith<TimeoutException> {
            val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
            flowHandle.returnValue.getOrThrow(30.seconds)
        }

        alice.rpc.assertHospitalCounts(discharged = 3, observation = 1)
        val stuckFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(1, stuckFlows.size)
        alice.rpc.assertNumberOfCheckpoints(1)
    }
}
/**
 * Fails the [Action.SendInitial] action.
 *
 * The injected exception fires 3 times.
 *
 * The hospital therefore discharges the flow 3 times (3 retries); the last retry gets through and the flow finishes.
 */
@Test(timeout = 300_000)
fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        val script = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeSendInitial
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeSendInitial action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeSendInitial
            AT ENTRY
            IF readCounter("counter") < 3
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        flowHandle.returnValue.getOrThrow(30.seconds)

        alice.rpc.assertHospitalCounts(discharged = 3)
        val remainingFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(0, remainingFlows.size)
        alice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Injects an exception into [DeduplicationHandler.afterDatabaseTransaction] when it is called from within an
 * [Action.AcknowledgeMessages] action.
 *
 * The exception fires on every call to [DeduplicationHandler.afterDatabaseTransaction] made inside
 * [ActionExecutorImpl.executeAcknowledgeMessages].
 *
 * These exceptions should be swallowed, so the flow should never visit the hospital and never retry; it should simply
 * complete successfully.
 */
@Test(timeout = 300_000)
fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        // The first rule raises a flag at the call site inside executeAcknowledgeMessages; the second rule throws only while
        // that flag is set and clears it again.
        val script = """
            RULE Set flag when inside executeAcknowledgeMessages
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeAcknowledgeMessages
            AT INVOKE ${DeduplicationHandler::class.java.name}.afterDatabaseTransaction()
            IF !flagged("exception_flag")
            DO flag("exception_flag"); traceln("Setting flag to true")
            ENDRULE
            RULE Throw exception when executing ${DeduplicationHandler::class.java.name}.afterDatabaseTransaction when inside executeAcknowledgeMessages
            INTERFACE ${DeduplicationHandler::class.java.name}
            METHOD afterDatabaseTransaction
            AT ENTRY
            IF flagged("exception_flag")
            DO traceln("Throwing exception"); clear("exception_flag"); traceln("SETTING FLAG TO FALSE"); throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        flowHandle.returnValue.getOrThrow(30.seconds)

        alice.rpc.assertHospitalCountsAllZero()
        val remainingFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(0, remainingFlows.size)
        alice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Throws an exception when performing an [Action.CommitTransaction] event when trying to propagate an error (processing an
 * [Event.StartErrorPropagation] event).
 *
 * The exception is thrown 3 times.
 *
 * This causes the flow to retry the [Event.StartErrorPropagation] event until it succeeds. In this scenario it is retried
 * 3 times; on the final retry the flow successfully propagates the error and completes exceptionally.
 */
@Test(timeout = 300_000)
fun `error during error propagation the flow is able to retry and recover`() {
    startDriver {
        val (alice, port) = createBytemanNode(ALICE_NAME)
        // The first rule only raises "my_flag" once the flow's throwException method is entered (it used to be mislabelled
        // "Create Counter"); commits are failed only after that point.
        // NOTE(review): unlike the sibling tests there is no createCounter rule here, so this relies on Byteman treating a
        // missing counter as 0 in readCounter and creating it in incrementCounter — confirm against the Byteman helper docs.
        val rules = """
            RULE Set flag when executing throwException
            CLASS ${ThrowAnErrorFlow::class.java.name}
            METHOD throwException
            AT ENTRY
            IF !flagged("my_flag")
            DO traceln("SETTING FLAG TO TRUE"); flag("my_flag")
            ENDRULE
            RULE Throw exception on executeCommitTransaction action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF flagged("my_flag") && readCounter("counter") < 3
            DO traceln("Throwing exception"); incrementCounter("counter"); throw new java.sql.SQLException("die dammit die", "1")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(rules, port)

        // The flow's own error must still reach the client once propagation finally succeeds.
        assertFailsWith<CordaRuntimeException> {
            alice.rpc.startFlow(StateMachineErrorHandlingTest::ThrowAnErrorFlow).returnValue.getOrThrow(60.seconds)
        }

        alice.rpc.assertHospitalCounts(
            propagated = 1,
            propagatedRetry = 3
        )
        assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
        alice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Injects an exception while replaying a flow that has already created its initial checkpoint successfully.
 *
 * The first exception is injected when a database transaction is committed during a transition, which triggers a retry of the
 * flow. A second exception is then injected into the retry itself.
 *
 * The hospital discharges the flow and replays it; the exception raised during that replay causes the flow to be retried
 * once more.
 */
@Test(timeout = 300_000)
fun `error during flow retry when executing retryFlowFromSafePoint the flow is able to retry and recover`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        // Rule order is significant: the throwing commit rule is listed before the rule that sets "commit_flag".
        val script = """
            RULE Set flag when executing first suspend
            CLASS ${TopLevelTransition::class.java.name}
            METHOD suspendTransition
            AT ENTRY
            IF !flagged("suspend_flag")
            DO flag("suspend_flag"); traceln("Setting suspend flag to true")
            ENDRULE
            RULE Throw exception on executeCommitTransaction action after first suspend + commit
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("commit_exception_flag")
            DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
            ENDRULE
            RULE Set flag when executing first commit
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF flagged("suspend_flag") && !flagged("commit_flag")
            DO flag("commit_flag"); traceln("Setting commit flag to true")
            ENDRULE
            RULE Throw exception on retry
            CLASS ${SingleThreadedStateMachineManager::class.java.name}
            METHOD addAndStartFlow
            AT ENTRY
            IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("retry_exception_flag")
            DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        flowHandle.returnValue.getOrThrow(40.seconds)

        alice.rpc.assertHospitalCounts(discharged = 1, dischargedRetry = 1)
        val remainingFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(0, remainingFlows.size)
        alice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Fails the [Action.CommitTransaction] action after the flow has suspended (i.e. once it has moved to a started state).
 *
 * The injected exception fires 3 times.
 *
 * The hospital therefore discharges the flow 3 times (3 retries); the last retry gets through and the flow finishes.
 *
 * Every retry resumes from the checkpoint at which the flow last suspended before failing.
 */
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        // NOTE(review): an earlier comment here said the flow "seems to be restarting ... from the beginning every time",
        // which disagrees with the KDoc above — confirm which one is accurate.
        // Rule order is significant: the throwing commit rule is listed before the rule that sets "commit_flag".
        val script = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Set flag when executing first suspend
            CLASS ${TopLevelTransition::class.java.name}
            METHOD suspendTransition
            AT ENTRY
            IF !flagged("suspend_flag")
            DO flag("suspend_flag"); traceln("Setting suspend flag to true")
            ENDRULE
            RULE Throw exception on executeCommitTransaction action after first suspend + commit
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 3
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
            RULE Set flag when executing first commit
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF flagged("suspend_flag") && !flagged("commit_flag")
            DO flag("commit_flag"); traceln("Setting commit flag to true")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        flowHandle.returnValue.getOrThrow(30.seconds)

        alice.rpc.assertHospitalCounts(discharged = 3)
        val remainingFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(0, remainingFlows.size)
        alice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Fails the [Action.CommitTransaction] action while the flow is finishing.
 *
 * The injected exception fires 3 times.
 *
 * The hospital therefore discharges the flow 3 times (3 retries); the last retry gets through and the flow finishes.
 *
 * Every retry resumes from the checkpoint at which the flow last suspended before failing.
 */
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        // NOTE(review): an earlier comment here said the flow "seems to be restarting ... from the beginning every time",
        // which disagrees with the KDoc above — confirm which one is accurate.
        // "remove_checkpoint_flag" is raised on entry to flowFinishTransition and cleared again each time the commit is failed.
        val script = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Set flag when adding action to remove checkpoint
            CLASS ${TopLevelTransition::class.java.name}
            METHOD flowFinishTransition
            AT ENTRY
            IF !flagged("remove_checkpoint_flag")
            DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
            ENDRULE
            RULE Throw exception on executeCommitTransaction when removing checkpoint
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF flagged("remove_checkpoint_flag") && readCounter("counter") < 3
            DO incrementCounter("counter"); clear("remove_checkpoint_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        flowHandle.returnValue.getOrThrow(30.seconds)

        alice.rpc.assertHospitalCounts(discharged = 3)
        val remainingFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(0, remainingFlows.size)
        alice.rpc.assertNumberOfCheckpoints(0)
    }
}
/**
 * Fails the [Action.CommitTransaction] action with an `org.hibernate.exception.ConstraintViolationException` while the flow is
 * finishing.
 *
 * The injected exception fires 4 times.
 *
 * The hospital discharges the flow 3 times (3 retries) and then keeps it in for observation.
 *
 * Every retry resumes from the checkpoint at which the flow last suspended before failing.
 */
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() {
    startDriver {
        val charlie = createNode(CHARLIE_NAME)
        val (alice, bytemanPort) = createBytemanNode(ALICE_NAME)
        // "remove_checkpoint_flag" is raised on entry to flowFinishTransition and cleared again each time the commit is failed.
        val script = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Set flag when adding action to remove checkpoint
            CLASS ${TopLevelTransition::class.java.name}
            METHOD flowFinishTransition
            AT ENTRY
            IF !flagged("remove_checkpoint_flag")
            DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
            ENDRULE
            RULE Throw exception on executeCommitTransaction when removing checkpoint
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF flagged("remove_checkpoint_flag") && readCounter("counter") < 4
            DO incrementCounter("counter");
            clear("remove_checkpoint_flag");
            traceln("Throwing exception");
            throw new org.hibernate.exception.ConstraintViolationException("This flow has a terminal condition", new java.sql.SQLException(), "made up constraint")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        // The flow is expected to still be under observation when the wait times out.
        assertFailsWith<TimeoutException> {
            val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
            flowHandle.returnValue.getOrThrow(30.seconds)
        }

        alice.rpc.assertHospitalCounts(discharged = 3, observation = 1)
        val stuckFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(1, stuckFlows.size)
        alice.rpc.assertNumberOfCheckpoints(1)
    }
}
/**
 * Fails the [Action.CommitTransaction] action while the flow is finishing on the responding node.
 *
 * The injected exception fires 3 times.
 *
 * The hospital therefore discharges the flow 3 times (3 retries); the last retry gets through and the flow finishes.
 */
@Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
    startDriver {
        // Here the Byteman-instrumented node is the responder (charlie), not the initiator.
        val (charlie, bytemanPort) = createBytemanNode(CHARLIE_NAME)
        val alice = createNode(ALICE_NAME)
        // "remove_checkpoint_flag" is raised on entry to flowFinishTransition and cleared again each time the commit is failed.
        val script = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Set flag when adding action to remove checkpoint
            CLASS ${TopLevelTransition::class.java.name}
            METHOD flowFinishTransition
            AT ENTRY
            IF !flagged("remove_checkpoint_flag")
            DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
            ENDRULE
            RULE Throw exception on executeCommitTransaction when removing checkpoint
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeCommitTransaction
            AT ENTRY
            IF flagged("remove_checkpoint_flag") && readCounter("counter") < 3
            DO incrementCounter("counter");
            clear("remove_checkpoint_flag");
            traceln("Throwing exception");
            throw new java.sql.SQLException("die dammit die", "1")
            ENDRULE
        """.trimIndent()
        submitBytemanRules(script, bytemanPort)

        val flowHandle = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
        flowHandle.returnValue.getOrThrow(30.seconds)

        charlie.rpc.assertHospitalCounts(discharged = 3)
        val aliceFlows = alice.rpc.stateMachinesSnapshot()
        assertEquals(0, aliceFlows.size)
        val charlieFlows = charlie.rpc.stateMachinesSnapshot()
        assertEquals(0, charlieFlows.size)
        alice.rpc.assertNumberOfCheckpoints(0)
        charlie.rpc.assertNumberOfCheckpoints(0)
    }
}
}

View File

@ -0,0 +1,196 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Test
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() {

    /**
     * Triggers `killFlow` while the flow is suspended causing an [InterruptedException] to be thrown and passed through the hospital.
     *
     * The flow terminates and is not retried.
     *
     * The test expects [StaffedFlowHospital.TransitionErrorGeneralPractitioner] to read its terminal diagnosis exactly once
     * (observed through the Byteman trace output) and asserts the hospital counters with `propagated = 1`.
     */
    @Test(timeout = 300_000)
    fun `error during transition due to an InterruptedException (killFlow) will terminate the flow`() {
        startDriver {
            val (alice, port) = createBytemanNode(ALICE_NAME)

            // Traces a line every time the staff member reads TERMINAL inside `consult`.
            val rules = """
                RULE Increment terminal counter
                CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
                METHOD consult
                AT READ TERMINAL
                IF true
                DO traceln("Byteman test - terminal")
                ENDRULE
            """.trimIndent()

            submitBytemanRules(rules, port)

            val flow = alice.rpc.startTrackedFlow(StateMachineKillFlowErrorHandlingTest::SleepFlow)

            // Kill the flow 5 seconds after it reports STARTED.
            var flowKilled = false
            flow.progress.subscribe {
                if (it == SleepFlow.STARTED.label) {
                    Thread.sleep(5000)
                    flowKilled = alice.rpc.killFlow(flow.id)
                }
            }

            // The killed flow never delivers a result to the client, so waiting on it times out.
            assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(20.seconds) }

            val output = getBytemanOutput(alice)

            assertTrue(flowKilled)
            val numberOfTerminalDiagnoses = output.count { it.contains("Byteman test - terminal") }
            assertEquals(1, numberOfTerminalDiagnoses)
            alice.rpc.assertHospitalCounts(propagated = 1)
            assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
            alice.rpc.assertNumberOfCheckpoints(0)
        }
    }

    /**
     * Triggers `killFlow` during user application code.
     *
     * The user application code is mimicked by a [Thread.sleep] which is importantly not placed inside the [Suspendable]
     * call function. Placing it inside a [Suspendable] function causes quasar to behave unexpectedly.
     *
     * Although the call to kill the flow is made during user application code, it will not be removed / stop processing
     * until the next suspension point is reached within the flow.
     *
     * The flow terminates and is not retried.
     *
     * No pass through the hospital is recorded, as the flow is marked as `isRemoved`.
     */
    @Test(timeout = 300_000)
    fun `flow killed during user code execution stops and removes the flow correctly`() {
        startDriver {
            val alice = createNode(ALICE_NAME)

            val flow = alice.rpc.startTrackedFlow(StateMachineKillFlowErrorHandlingTest::ThreadSleepFlow)

            // Kill the flow 5 seconds after it reports STARTED.
            var flowKilled = false
            flow.progress.subscribe {
                if (it == ThreadSleepFlow.STARTED.label) {
                    Thread.sleep(5000)
                    flowKilled = alice.rpc.killFlow(flow.id)
                }
            }

            assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(30.seconds) }

            assertTrue(flowKilled)
            alice.rpc.assertHospitalCountsAllZero()
            assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
            alice.rpc.assertNumberOfCheckpoints(0)
        }
    }

    /**
     * Triggers `killFlow` after the flow has already been sent to observation. The flow is not running at this point and
     * all that remains is its checkpoint in the database.
     *
     * The flow terminates and is not retried.
     *
     * Killing the flow does not lead to any passes through the hospital. All the recorded passes through the hospital are
     * from the original flow that was put in for observation.
     */
    @Test(timeout = 300_000)
    fun `flow killed when it is in the flow hospital for observation is removed correctly`() {
        startDriver {
            val (alice, port) = createBytemanNode(ALICE_NAME)
            val charlie = createNode(CHARLIE_NAME)

            // Makes `executeSendInitial` throw a RuntimeException while the counter is below 4.
            val rules = """
                RULE Create Counter
                CLASS ${ActionExecutorImpl::class.java.name}
                METHOD executeSendInitial
                AT ENTRY
                IF createCounter("counter", $counter)
                DO traceln("Counter created")
                ENDRULE
                RULE Throw exception on executeSendInitial action
                CLASS ${ActionExecutorImpl::class.java.name}
                METHOD executeSendInitial
                AT ENTRY
                IF readCounter("counter") < 4
                DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
                ENDRULE
            """.trimIndent()

            submitBytemanRules(rules, port)

            val flow = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())

            // The flow does not complete within the timeout; it is killed only afterwards.
            assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(20.seconds) }

            alice.rpc.killFlow(flow.id)

            alice.rpc.assertHospitalCounts(
                discharged = 3,
                observation = 1
            )
            assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
            alice.rpc.assertNumberOfCheckpoints(0)
        }
    }

    /**
     * Flow that sleeps for one second, moves its progress tracker to [STARTED] and then sleeps for a further two minutes.
     *
     * The tests above use the [STARTED] step as the signal to kill the flow.
     */
    @StartableByRPC
    class SleepFlow : FlowLogic<Unit>() {

        object STARTED : ProgressTracker.Step("I am ready to die")

        override val progressTracker = ProgressTracker(STARTED)

        @Suspendable
        override fun call() {
            sleep(Duration.of(1, ChronoUnit.SECONDS))
            progressTracker.currentStep = STARTED
            sleep(Duration.of(2, ChronoUnit.MINUTES))
        }
    }

    /**
     * Flow that, after reporting [STARTED], blocks its thread for 20 seconds with [Thread.sleep] to mimic user application
     * code, and then sleeps for a further two minutes.
     */
    @StartableByRPC
    class ThreadSleepFlow : FlowLogic<Unit>() {

        object STARTED : ProgressTracker.Step("I am ready to die")

        override val progressTracker = ProgressTracker(STARTED)

        @Suspendable
        override fun call() {
            sleep(Duration.of(1, ChronoUnit.SECONDS))
            progressTracker.currentStep = STARTED
            logger.info("Starting ${ThreadSleepFlow::class.qualifiedName} application sleep")
            sleep()
            logger.info("Finished ${ThreadSleepFlow::class.qualifiedName} application sleep")
            sleep(Duration.of(2, ChronoUnit.MINUTES))
        }

        // Sleep is moved outside of `@Suspendable` function to prevent issues with Quasar
        private fun sleep() {
            Thread.sleep(20000)
        }
    }
}

View File

@ -1,7 +1,6 @@
package net.corda.node.services.statemachine package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatedBy
@ -20,13 +19,14 @@ import org.junit.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped @Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() { class StateMachineSubFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
/** /**
* This test checks that flow calling an initiating subflow will recover correctly. * This test checks that flow calling an initiating subflow will recover correctly.
* *
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first send to a counterparty. * Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first send to a counterparty.
* The exception is thrown 5 times. *
* The exception is thrown 3 times.
* *
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes. * succeeds and the flow finishes.
@ -37,11 +37,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify * if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering. * that 3 retries are attempted before recovering.
*/ */
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() { fun `initiating subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME) val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -72,7 +72,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS ${ActionExecutorImpl::class.java.name} CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction METHOD executeCommitTransaction
AT ENTRY AT ENTRY
IF flagged("subflow_flag") && flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 5 IF flagged("subflow_flag") && flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE ENDRULE
@ -83,52 +83,20 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
IF flagged("subflow_flag") && flagged("suspend_flag") && !flagged("commit_flag") IF flagged("subflow_flag") && flagged("suspend_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true") DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent() """.trimIndent()
submitBytemanRules(rules) submitBytemanRules(rules, port)
val aliceClient = alice.rpc.startFlow(
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow,
charlie.nodeInfo.singleIdentity()
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( ).returnValue.getOrThrow(
30.seconds 30.seconds
) )
val output = getBytemanOutput(alice) alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
// Check the stdout for the lines generated by byteman alice.rpc.assertNumberOfCheckpoints(0)
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
} }
} }
@ -136,7 +104,8 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* This test checks that flow calling an initiating subflow will recover correctly. * This test checks that flow calling an initiating subflow will recover correctly.
* *
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first receive from a counterparty. * Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first receive from a counterparty.
* The exception is thrown 5 times. *
* The exception is thrown 3 times.
* *
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes. * succeeds and the flow finishes.
@ -147,11 +116,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify * if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering. * that 3 retries are attempted before recovering.
*/ */
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs after the first receive will retry and complete successfully`() { fun `initiating subflow - error during transition with CommitTransaction action that occurs after the first receive will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME) val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -182,55 +151,23 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS ${ActionExecutorImpl::class.java.name} CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction METHOD executeCommitTransaction
AT ENTRY AT ENTRY
IF flagged("subflow_flag") && flagged("suspend_flag") && readCounter("counter") < 5 IF flagged("subflow_flag") && flagged("suspend_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent() """.trimIndent()
submitBytemanRules(rules) submitBytemanRules(rules, port)
val aliceClient = alice.rpc.startFlow(
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow,
charlie.nodeInfo.singleIdentity()
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( ).returnValue.getOrThrow(
30.seconds 30.seconds
) )
val output = getBytemanOutput(alice) alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
// Check the stdout for the lines generated by byteman alice.rpc.assertNumberOfCheckpoints(0)
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
} }
} }
@ -238,7 +175,8 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* This test checks that flow calling an inline subflow will recover correctly. * This test checks that flow calling an inline subflow will recover correctly.
* *
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first send to a counterparty. * Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first send to a counterparty.
* The exception is thrown 5 times. *
* The exception is thrown 3 times.
* *
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes. * succeeds and the flow finishes.
@ -249,11 +187,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify * if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering. * that 3 retries are attempted before recovering.
*/ */
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() { fun `inline subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME) val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -276,55 +214,23 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS ${ActionExecutorImpl::class.java.name} CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction METHOD executeCommitTransaction
AT ENTRY AT ENTRY
IF flagged("subflow_flag") && readCounter("counter") < 5 IF flagged("subflow_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent() """.trimIndent()
submitBytemanRules(rules) submitBytemanRules(rules, port)
val aliceClient = alice.rpc.startFlow(
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow,
charlie.nodeInfo.singleIdentity()
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( ).returnValue.getOrThrow(
30.seconds 30.seconds
) )
val output = getBytemanOutput(alice) alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
// Check the stdout for the lines generated by byteman alice.rpc.assertNumberOfCheckpoints(0)
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
} }
} }
@ -332,7 +238,8 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* This test checks that flow calling an inline subflow will recover correctly. * This test checks that flow calling an inline subflow will recover correctly.
* *
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first receive from a counterparty. * Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first receive from a counterparty.
* The exception is thrown 5 times. *
* The exception is thrown 3 times.
* *
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes. * succeeds and the flow finishes.
@ -343,11 +250,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify * if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering. * that 3 retries are attempted before recovering.
*/ */
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first receive will retry and complete successfully`() { fun `inline subflow - error during transition with CommitTransaction action that occurs during the first receive will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME) val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -370,7 +277,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS ${ActionExecutorImpl::class.java.name} CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction METHOD executeCommitTransaction
AT ENTRY AT ENTRY
IF flagged("subflow_flag") && flagged("commit_flag") && readCounter("counter") < 5 IF flagged("subflow_flag") && flagged("commit_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE ENDRULE
@ -381,52 +288,20 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
IF flagged("subflow_flag") && !flagged("commit_flag") IF flagged("subflow_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true") DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent() """.trimIndent()
submitBytemanRules(rules) submitBytemanRules(rules, port)
val aliceClient = alice.rpc.startFlow(
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow,
charlie.nodeInfo.singleIdentity()
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( ).returnValue.getOrThrow(
30.seconds 30.seconds
) )
val output = getBytemanOutput(alice) alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
// Check the stdout for the lines generated by byteman alice.rpc.assertNumberOfCheckpoints(0)
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
} }
} }

View File

@ -1,322 +0,0 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Test
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
/**
 * Triggers `killFlow` while the flow is suspended causing an [InterruptedException] to be thrown and passed through the hospital.
 *
 * The flow terminates and is not retried.
 *
 * The test expects exactly one terminal diagnosis trace in the Byteman output, no discharge or overnight observation
 * traces, and hospital counters of zero for both discharge and observation.
 */
@Test(timeout = 300_000)
fun `error during transition due to an InterruptedException (killFlow) will terminate the flow`() {
    startDriver {
        val alice = createBytemanNode(ALICE_NAME)

        // Each rule traces a line when the staff member's `consult` method is entered or reads the named outcome.
        val rules = """
            RULE Entering internal error staff member
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT ENTRY
            IF true
            DO traceln("Reached internal transition error staff member")
            ENDRULE
            RULE Increment discharge counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ DISCHARGE
            IF true
            DO traceln("Byteman test - discharging")
            ENDRULE
            RULE Increment observation counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ OVERNIGHT_OBSERVATION
            IF true
            DO traceln("Byteman test - overnight observation")
            ENDRULE
            RULE Increment terminal counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ TERMINAL
            IF true
            DO traceln("Byteman test - terminal")
            ENDRULE
        """.trimIndent()

        submitBytemanRules(rules)

        // `use` closes the RPC connection once the checks are done; previously it was never closed.
        CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).use { connection ->
            val aliceClient = connection.proxy

            val flow = aliceClient.startTrackedFlow(StatemachineKillFlowErrorHandlingTest::SleepFlow)

            // Kill the flow 5 seconds after it reports STARTED.
            var flowKilled = false
            flow.progress.subscribe {
                if (it == SleepFlow.STARTED.label) {
                    Thread.sleep(5000)
                    flowKilled = aliceClient.killFlow(flow.id)
                }
            }

            assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(20.seconds) }

            val output = getBytemanOutput(alice)

            assertTrue(flowKilled)
            // Check the stdout for the lines generated by byteman
            assertEquals(0, output.count { it.contains("Byteman test - discharging") })
            assertEquals(0, output.count { it.contains("Byteman test - overnight observation") })
            val numberOfTerminalDiagnoses = output.count { it.contains("Byteman test - terminal") }
            assertEquals(1, numberOfTerminalDiagnoses)
            val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
            assertEquals(0, discharge)
            assertEquals(0, observation)
            assertEquals(0, aliceClient.stateMachinesSnapshot().size)
            // 1 for GetNumberOfCheckpointsFlow
            assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
        }
    }
}
/**
 * Triggers `killFlow` during user application code.
 *
 * The user application code is mimicked by a [Thread.sleep] which is importantly not placed inside the [Suspendable]
 * call function. Placing it inside a [Suspendable] function causes quasar to behave unexpectedly.
 *
 * Although the call to kill the flow is made during user application code, it will not be removed / stop processing
 * until the next suspension point is reached within the flow.
 *
 * The flow terminates and is not retried.
 *
 * No pass through the hospital is recorded, as the flow is marked as `isRemoved`.
 */
@Test(timeout = 300_000)
fun `flow killed during user code execution stops and removes the flow correctly`() {
    startDriver {
        val alice = createBytemanNode(ALICE_NAME)

        // Each rule traces a line when the staff member's `consult` method is entered or reads the named outcome.
        val rules = """
            RULE Entering internal error staff member
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT ENTRY
            IF true
            DO traceln("Reached internal transition error staff member")
            ENDRULE
            RULE Increment discharge counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ DISCHARGE
            IF true
            DO traceln("Byteman test - discharging")
            ENDRULE
            RULE Increment observation counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ OVERNIGHT_OBSERVATION
            IF true
            DO traceln("Byteman test - overnight observation")
            ENDRULE
            RULE Increment terminal counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ TERMINAL
            IF true
            DO traceln("Byteman test - terminal")
            ENDRULE
        """.trimIndent()

        submitBytemanRules(rules)

        // `use` closes the RPC connection once the checks are done; previously it was never closed.
        CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).use { connection ->
            val aliceClient = connection.proxy

            val flow = aliceClient.startTrackedFlow(StatemachineKillFlowErrorHandlingTest::ThreadSleepFlow)

            // Kill the flow 5 seconds after it reports STARTED.
            var flowKilled = false
            flow.progress.subscribe {
                if (it == ThreadSleepFlow.STARTED.label) {
                    Thread.sleep(5000)
                    flowKilled = aliceClient.killFlow(flow.id)
                }
            }

            assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(30.seconds) }

            val output = getBytemanOutput(alice)

            assertTrue(flowKilled)
            // Check the stdout for the lines generated by byteman
            assertEquals(0, output.count { it.contains("Byteman test - discharging") })
            assertEquals(0, output.count { it.contains("Byteman test - overnight observation") })
            val numberOfTerminalDiagnoses = output.count { it.contains("Byteman test - terminal") }
            assertEquals(0, numberOfTerminalDiagnoses)
            val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
            assertEquals(0, discharge)
            assertEquals(0, observation)
            assertEquals(0, aliceClient.stateMachinesSnapshot().size)
            // 1 for GetNumberOfCheckpointsFlow
            assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
        }
    }
}
/**
 * Calls `killFlow` on a flow that has already been kept in the hospital for observation. At that point the flow is
 * no longer running and only its checkpoint remains in the database.
 *
 * The flow terminates and is not retried.
 *
 * The kill itself adds no passes through the hospital; every recorded pass (3 discharges followed by 1 observation)
 * belongs to the original flow that ended up under observation.
 */
@Test(timeout=300_000)
fun `flow killed when it is in the flow hospital for observation is removed correctly`() {
    startDriver {
        val alice = createBytemanNode(ALICE_NAME)
        val charlie = createNode(CHARLIE_NAME)

        val rules = """
            RULE Create Counter
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeSendInitial
            AT ENTRY
            IF createCounter("counter", $counter)
            DO traceln("Counter created")
            ENDRULE
            RULE Throw exception on executeSendInitial action
            CLASS ${ActionExecutorImpl::class.java.name}
            METHOD executeSendInitial
            AT ENTRY
            IF readCounter("counter") < 4
            DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
            ENDRULE
            RULE Entering internal error staff member
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT ENTRY
            IF true
            DO traceln("Reached internal transition error staff member")
            ENDRULE
            RULE Increment discharge counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ DISCHARGE
            IF true
            DO traceln("Byteman test - discharging")
            ENDRULE
            RULE Increment observation counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ OVERNIGHT_OBSERVATION
            IF true
            DO traceln("Byteman test - overnight observation")
            ENDRULE
            RULE Increment terminal counter
            CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
            METHOD consult
            AT READ TERMINAL
            IF true
            DO traceln("Byteman test - terminal")
            ENDRULE
        """.trimIndent()

        submitBytemanRules(rules)

        val rpc = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy

        val sendFlow = rpc.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())

        // The flow is expected to be under observation by now, so its result never arrives
        assertFailsWith<TimeoutException> { sendFlow.returnValue.getOrThrow(20.seconds) }

        rpc.killFlow(sendFlow.id)

        val bytemanLines = getBytemanOutput(alice)
        // Number of stdout lines generated by byteman that mention the given marker
        fun timesTraced(marker: String): Int = bytemanLines.count { it.contains(marker) }

        assertEquals(3, timesTraced("Byteman test - discharging"))
        assertEquals(1, timesTraced("Byteman test - overnight observation"))
        assertEquals(0, timesTraced("Byteman test - terminal"))

        val (discharged, observed) = rpc.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
        assertEquals(3, discharged)
        assertEquals(1, observed)
        assertEquals(0, rpc.stateMachinesSnapshot().size)
        // 1 for GetNumberOfCheckpointsFlow
        assertEquals(1, rpc.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
    }
}
/**
 * Flow that sleeps for one second, reports the [STARTED] progress step and then sleeps for a further two minutes.
 */
@StartableByRPC
class SleepFlow : FlowLogic<Unit>() {

    object STARTED : ProgressTracker.Step("I am ready to die")

    override val progressTracker = ProgressTracker(STARTED)

    @Suspendable
    override fun call() {
        sleep(Duration.ofSeconds(1))
        // Observers of the progress tracker see this step once the first sleep is over
        progressTracker.currentStep = STARTED
        sleep(Duration.ofMinutes(2))
    }
}
/**
 * Flow that reports the [STARTED] progress step and then blocks with [Thread.sleep] for 20 seconds, standing in for
 * user application code, before sleeping for a further two minutes.
 */
@StartableByRPC
class ThreadSleepFlow : FlowLogic<Unit>() {

    object STARTED : ProgressTracker.Step("I am ready to die")

    override val progressTracker = ProgressTracker(STARTED)

    @Suspendable
    override fun call() {
        sleep(Duration.of(1, ChronoUnit.SECONDS))
        progressTracker.currentStep = STARTED
        logger.info("Starting ${ThreadSleepFlow::class.qualifiedName} application sleep")
        blockThread()
        logger.info("Finished ${ThreadSleepFlow::class.qualifiedName} application sleep")
        sleep(Duration.of(2, ChronoUnit.MINUTES))
    }

    // The blocking sleep lives outside of the `@Suspendable` function to prevent issues with Quasar
    private fun blockThread() {
        Thread.sleep(20_000)
    }
}
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services.statemachine package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import java.sql.SQLException
/** /**
* An executor of a single [Action]. * An executor of a single [Action].
@ -10,5 +11,6 @@ interface ActionExecutor {
* Execute [action] by [fiber]. * Execute [action] by [fiber].
*/ */
@Suspendable @Suspendable
@Throws(SQLException::class)
fun executeAction(fiber: FlowFiber, action: Action) fun executeAction(fiber: FlowFiber, action: Action)
} }

View File

@ -19,6 +19,7 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.nodeapi.internal.persistence.contextDatabase import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransaction import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import java.sql.SQLException
import java.time.Duration import java.time.Duration
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
@ -212,6 +213,7 @@ class ActionExecutorImpl(
} }
@Suspendable @Suspendable
@Throws(SQLException::class)
private fun executeCreateTransaction() { private fun executeCreateTransaction() {
if (contextTransactionOrNull != null) { if (contextTransactionOrNull != null) {
throw IllegalStateException("Refusing to create a second transaction") throw IllegalStateException("Refusing to create a second transaction")
@ -225,6 +227,7 @@ class ActionExecutorImpl(
} }
@Suspendable @Suspendable
@Throws(SQLException::class)
private fun executeCommitTransaction() { private fun executeCommitTransaction() {
try { try {
contextTransaction.commit() contextTransaction.commit()

View File

@ -40,7 +40,7 @@ sealed class Event {
* Signal that an error has happened. This may be due to an uncaught exception in the flow or some external error. * Signal that an error has happened. This may be due to an uncaught exception in the flow or some external error.
* @param exception the exception itself. * @param exception the exception itself.
*/ */
data class Error(val exception: Throwable) : Event() data class Error(val exception: Throwable, val rollback: Boolean = true) : Event()
/** /**
* Signal that a ledger transaction has committed. This is an event completing a [FlowIORequest.WaitForLedgerCommit] * Signal that a ledger transaction has committed. This is an event completing a [FlowIORequest.WaitForLedgerCommit]

View File

@ -258,12 +258,15 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
openThreadLocalWormhole() openThreadLocalWormhole()
setLoggingContext() setLoggingContext()
initialiseFlow()
logger.debug { "Calling flow: $logic" } logger.debug { "Calling flow: $logic" }
val startTime = System.nanoTime() val startTime = System.nanoTime()
var initialised = false
val resultOrError = try { val resultOrError = try {
initialiseFlow()
initialised = true
// This sets the Cordapp classloader on the contextClassLoader of the current thread. // This sets the Cordapp classloader on the contextClassLoader of the current thread.
// Needed because in previous versions of the finance app we used Thread.contextClassLoader to resolve services defined in cordapps. // Needed because in previous versions of the finance app we used Thread.contextClassLoader to resolve services defined in cordapps.
Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader
@ -284,14 +287,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
Event.FlowFinish(resultOrError.value, softLocksId) Event.FlowFinish(resultOrError.value, softLocksId)
} }
is Try.Failure -> { is Try.Failure -> {
Event.Error(resultOrError.exception) Event.Error(resultOrError.exception, initialised)
} }
} }
// Immediately process the last event. This is to make sure the transition can assume that it has an open // Immediately process the last event. This is to make sure the transition can assume that it has an open
// database transaction. // database transaction.
val continuation = processEventImmediately( val continuation = processEventImmediately(
finalEvent, finalEvent,
isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnEntry = initialised,
isDbTransactionOpenOnExit = false isDbTransactionOpenOnExit = false
) )
if (continuation == FlowContinuation.ProcessEvents) { if (continuation == FlowContinuation.ProcessEvents) {
@ -309,8 +312,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Suspendable @Suspendable
private fun initialiseFlow() { private fun initialiseFlow() {
processEventsUntilFlowIsResumed( processEventsUntilFlowIsResumed(
isDbTransactionOpenOnEntry = false, isDbTransactionOpenOnEntry = false,
isDbTransactionOpenOnExit = true isDbTransactionOpenOnExit = true
) )
} }

View File

@ -354,67 +354,55 @@ class SingleThreadedStateMachineManager(
override fun retryFlowFromSafePoint(currentState: StateMachineState) { override fun retryFlowFromSafePoint(currentState: StateMachineState) {
// Get set of external events // Get set of external events
val flowId = currentState.flowLogic.runId val flowId = currentState.flowLogic.runId
try { val oldFlowLeftOver = mutex.locked { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
val oldFlowLeftOver = mutex.locked { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue if (oldFlowLeftOver == null) {
if (oldFlowLeftOver == null) { logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.") return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) }
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return return
} }
val flow = if (currentState.isAnyCheckpointPersisted) { // Resurrect flow
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that createFlowFromCheckpoint(
// we mirror exactly what happens when restarting the node. id = flowId,
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId) serializedCheckpoint = serializedCheckpoint,
if (serializedCheckpoint == null) { initialDeduplicationHandler = null,
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.") isAnyCheckpointPersisted = true,
return isStartIdempotent = false
} ) ?: return
// Resurrect flow } else {
createFlowFromCheckpoint( // Just flow initiation message
id = flowId, null
serializedCheckpoint = serializedCheckpoint, }
initialDeduplicationHandler = null, mutex.locked {
isAnyCheckpointPersisted = true, if (stopping) {
isStartIdempotent = false return
) ?: return
} else {
// Just flow initiation message
null
} }
mutex.locked { // Remove any sessions the old flow has.
if (stopping) { for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
return sessionToFlow.remove(sessionId)
} }
// Remove any sessions the old flow has. if (flow != null) {
for (sessionId in getFlowSessionIds(currentState.checkpoint)) { injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
sessionToFlow.remove(sessionId) addAndStartFlow(flowId, flow)
} }
if (flow != null) { // Deliver all the external events from the old flow instance.
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic) val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
addAndStartFlow(flowId, flow) do {
} val event = oldFlowLeftOver.tryReceive()
// Deliver all the external events from the old flow instance. if (event is Event.GeneratedByExternalEvent) {
val unprocessedExternalEvents = mutableListOf<ExternalEvent>() unprocessedExternalEvents += event.deduplicationHandler.externalCause
do { }
val event = oldFlowLeftOver.tryReceive() } while (event != null)
if (event is Event.GeneratedByExternalEvent) { val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
unprocessedExternalEvents += event.deduplicationHandler.externalCause for (externalEvent in externalEvents) {
} deliverExternalEvent(externalEvent)
} while (event != null)
val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
for (externalEvent in externalEvents) {
deliverExternalEvent(externalEvent)
}
} }
} catch (e: Exception) {
// Failed to retry - manually put the flow in for observation rather than
// relying on the [HospitalisingInterceptor] to do so
val exceptions = (currentState.checkpoint.errorState as? ErrorState.Errored)
?.errors
?.map { it.exception }
?.plus(e) ?: emptyList()
logger.info("Failed to retry flow $flowId, keeping in for observation and aborting")
flowHospital.forceIntoOvernightObservation(flowId, exceptions)
throw e
} }
} }
@ -589,7 +577,8 @@ class SingleThreadedStateMachineManager(
// Load the flow's checkpoint // Load the flow's checkpoint
// The checkpoint will be missing if the flow failed before persisting the original checkpoint // The checkpoint will be missing if the flow failed before persisting the original checkpoint
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay) // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
checkpointStorage.getCheckpoint(flowId)?.let { serializedCheckpoint -> val existingCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) }
existingCheckpoint?.let { serializedCheckpoint ->
val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, flowId) val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, flowId)
if (checkpoint == null) { if (checkpoint == null) {
return openFuture<FlowStateMachine<A>>().mapError { return openFuture<FlowStateMachine<A>>().mapError {
@ -919,6 +908,8 @@ class SingleThreadedStateMachineManager(
(exception as? FlowException)?.originalErrorId = flowError.errorId (exception as? FlowException)?.originalErrorId = flowError.errorId
flow.resultFuture.setException(exception) flow.resultFuture.setException(exception)
lastState.flowLogic.progressTracker?.endWithError(exception) lastState.flowLogic.progressTracker?.endWithError(exception)
// Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition])
startedFutures.remove(flow.fiber.id)?.set(Unit)
changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Failure<Nothing>(exception))) changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Failure<Nothing>(exception)))
} }

View File

@ -31,6 +31,7 @@ import java.time.Instant
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import javax.persistence.PersistenceException import javax.persistence.PersistenceException
import kotlin.collections.HashMap
import kotlin.concurrent.timerTask import kotlin.concurrent.timerTask
import kotlin.math.pow import kotlin.math.pow
@ -51,15 +52,24 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
DatabaseEndocrinologist, DatabaseEndocrinologist,
TransitionErrorGeneralPractitioner, TransitionErrorGeneralPractitioner,
SedationNurse, SedationNurse,
NotaryDoctor NotaryDoctor,
ResuscitationSpecialist
) )
private const val MAX_BACKOFF_TIME = 110.0 // Totals to 2 minutes when calculating the backoff time
@VisibleForTesting @VisibleForTesting
val onFlowKeptForOvernightObservation = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>() val onFlowKeptForOvernightObservation = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting @VisibleForTesting
val onFlowDischarged = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>() val onFlowDischarged = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowErrorPropagated = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowResuscitated = mutableListOf<(id: StateMachineRunId, by: List<String>, outcome: Outcome) -> Unit>()
@VisibleForTesting @VisibleForTesting
val onFlowAdmitted = mutableListOf<(id: StateMachineRunId) -> Unit>() val onFlowAdmitted = mutableListOf<(id: StateMachineRunId) -> Unit>()
} }
@ -164,39 +174,11 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
} }
/** /**
* Forces the flow to be kept in for overnight observation by the hospital. A flow must already exist inside the hospital * Request treatment for the [flowFiber].
* and have existing medical records for it to be moved to overnight observation. If it does not meet these criteria then
* an [IllegalArgumentException] will be thrown.
*
* @param id The [StateMachineRunId] of the flow that you are trying to force into observation
* @param errors The errors to include in the new medical record
*/
fun forceIntoOvernightObservation(id: StateMachineRunId, errors: List<Throwable>) {
mutex.locked {
// If a flow does not meet the criteria below, then it has moved into an invalid state or the function is being
// called from an incorrect location. The assertions below should error out the flow if they are not true.
requireNotNull(flowsInHospital[id]) { "Flow must already be in the hospital before forcing into overnight observation" }
val history = requireNotNull(flowPatients[id]) { "Flow must already have history before forcing into overnight observation" }
// Use the last staff member that last discharged the flow as the current staff member
val record = history.records.last().copy(
time = clock.instant(),
errors = errors,
outcome = Outcome.OVERNIGHT_OBSERVATION
)
onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(id, record.by.map { it.toString() }) }
history.records += record
recordsPublisher.onNext(record)
}
}
/**
* Request treatment for the [flowFiber]. A flow can only be added to the hospital if they are not already being
* treated.
*/ */
fun requestTreatment(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) { fun requestTreatment(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
// Only treat flows that are not already in the hospital if (!currentState.isRemoved) {
if (!currentState.isRemoved && flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) == null) { flowsInHospital[flowFiber.id] = flowFiber
admit(flowFiber, currentState, errors) admit(flowFiber, currentState, errors)
} }
} }
@ -216,20 +198,30 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
Diagnosis.DISCHARGE -> { Diagnosis.DISCHARGE -> {
val backOff = calculateBackOffForChronicCondition(report, medicalHistory, currentState) val backOff = calculateBackOffForChronicCondition(report, medicalHistory, currentState)
log.info("Flow error discharged from hospital (delay ${backOff.seconds}s) by ${report.by} (error was ${report.error.message})") log.info("Flow error discharged from hospital (delay ${backOff.seconds}s) by ${report.by} (error was ${report.error.message})")
onFlowDischarged.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) } onFlowDischarged.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }) }
Triple(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint, backOff) Triple(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint, backOff)
} }
Diagnosis.OVERNIGHT_OBSERVATION -> { Diagnosis.OVERNIGHT_OBSERVATION -> {
log.info("Flow error kept for overnight observation by ${report.by} (error was ${report.error.message})") log.info("Flow error kept for overnight observation by ${report.by} (error was ${report.error.message})")
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart // We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) } onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }) }
Triple(Outcome.OVERNIGHT_OBSERVATION, null, 0.seconds) Triple(Outcome.OVERNIGHT_OBSERVATION, null, 0.seconds)
} }
Diagnosis.NOT_MY_SPECIALTY, Diagnosis.TERMINAL -> { Diagnosis.NOT_MY_SPECIALTY, Diagnosis.TERMINAL -> {
// None of the staff care for these errors, or someone decided it is a terminal condition, so we let them propagate // None of the staff care for these errors, or someone decided it is a terminal condition, so we let them propagate
log.info("Flow error allowed to propagate", report.error) log.info("Flow error allowed to propagate", report.error)
onFlowErrorPropagated.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }) }
Triple(Outcome.UNTREATABLE, Event.StartErrorPropagation, 0.seconds) Triple(Outcome.UNTREATABLE, Event.StartErrorPropagation, 0.seconds)
} }
Diagnosis.RESUSCITATE -> {
// reschedule the last outcome as it failed to process it
// do a 0.seconds backoff in dev mode? / when coming from the driver? make it configurable?
val backOff = calculateBackOffForResuscitation(medicalHistory, currentState)
val outcome = medicalHistory.records.last().outcome
log.info("Flow error to be resuscitated, rescheduling previous outcome - $outcome (delay ${backOff.seconds}s) by ${report.by} (error was ${report.error.message})")
onFlowResuscitated.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }, outcome) }
Triple(outcome, outcome.event, backOff)
}
} }
val record = MedicalRecord.Flow(time, flowFiber.id, currentState.checkpoint.numberOfSuspends, errors, report.by, outcome) val record = MedicalRecord.Flow(time, flowFiber.id, currentState.checkpoint.numberOfSuspends, errors, report.by, outcome)
@ -249,18 +241,29 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
} }
} }
private fun calculateBackOffForChronicCondition(report: ConsultationReport, medicalHistory: FlowMedicalHistory, currentState: StateMachineState): Duration { private fun calculateBackOffForChronicCondition(
return report.by.firstOrNull { it is Chronic }?.let { chronicStaff -> report: ConsultationReport,
return medicalHistory.timesDischargedForTheSameThing(chronicStaff, currentState).let { medicalHistory: FlowMedicalHistory,
if (it == 0) { currentState: StateMachineState
0.seconds ): Duration {
} else { return report.by.firstOrNull { it is Chronic }?.let { staff ->
maxOf(10, (10 + (Math.random()) * (10 * 1.5.pow(it)) / 2).toInt()).seconds calculateBackOff(medicalHistory.timesDischargedForTheSameThing(staff, currentState))
}
}
} ?: 0.seconds } ?: 0.seconds
} }
/**
 * Works out the delay to apply before rescheduling the previous outcome of a resuscitated flow, based on how many
 * times [medicalHistory] records a resuscitation at the suspend count of [currentState].
 */
private fun calculateBackOffForResuscitation(
    medicalHistory: FlowMedicalHistory,
    currentState: StateMachineState
): Duration {
    val previousResuscitations = medicalHistory.timesResuscitated(currentState)
    return calculateBackOff(previousResuscitations)
}
/**
 * Converts the number of times a diagnosis has already been given into a delay before the next attempt.
 *
 * A count of zero gives no delay. Otherwise the delay is 10 seconds plus a random share of a term that grows by a
 * factor of 1.5 per repeat and is capped at [MAX_BACKOFF_TIME].
 */
private fun calculateBackOff(timesDiagnosisGiven: Int): Duration {
    if (timesDiagnosisGiven == 0) {
        return 0.seconds
    }
    // The growth term is capped so that the overall delay stays bounded
    val cappedGrowth = minOf(MAX_BACKOFF_TIME, (10 * 1.5.pow(timesDiagnosisGiven)) / 2)
    val delayInSeconds = (10 + Math.random() * cappedGrowth).toInt()
    return maxOf(10, delayInSeconds).seconds
}
private fun consultStaff(flowFiber: FlowFiber, private fun consultStaff(flowFiber: FlowFiber,
currentState: StateMachineState, currentState: StateMachineState,
errors: List<Throwable>, errors: List<Throwable>,
@ -318,6 +321,11 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount } return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount }
} }
/**
 * Counts the records attributed to [ResuscitationSpecialist] whose suspend count equals the number of suspends in
 * the checkpoint of [currentState].
 */
fun timesResuscitated(currentState: StateMachineState): Int {
    val suspendsAtAdmittance = currentState.checkpoint.numberOfSuspends
    return records.count { record ->
        ResuscitationSpecialist in record.by && record.suspendCount == suspendsAtAdmittance
    }
}
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)" override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
} }
@ -351,10 +359,16 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
} }
} }
enum class Outcome { DISCHARGE, OVERNIGHT_OBSERVATION, UNTREATABLE } enum class Outcome(val event: Event?) {
DISCHARGE(Event.RetryFlowFromSafePoint),
OVERNIGHT_OBSERVATION(null),
UNTREATABLE(Event.StartErrorPropagation)
}
/** The order of the enum values are in priority order. */ /** The order of the enum values are in priority order. */
enum class Diagnosis { enum class Diagnosis {
/** Retry the last outcome/diagnosis **/
RESUSCITATE,
/** The flow should not see other staff members */ /** The flow should not see other staff members */
TERMINAL, TERMINAL,
/** Retry from last safe point. */ /** Retry from last safe point. */
@ -369,6 +383,11 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis
} }
/**
* The [Chronic] interface relates to [Staff] whose diagnoses can be given repeatedly if the flow keeps returning to
* the hospital. [Chronic] diagnoses apply a backoff before scheduling a new [Event]; this prevents a flow from constantly retrying
* without a chance for the underlying issue to resolve itself.
*/
interface Chronic interface Chronic
/** /**
@ -539,10 +558,10 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
newError.mentionsThrowable(AsyncOperationTransitionException::class.java) -> Diagnosis.NOT_MY_SPECIALTY newError.mentionsThrowable(AsyncOperationTransitionException::class.java) -> Diagnosis.NOT_MY_SPECIALTY
history.notDischargedForTheSameThingMoreThan(2, this, currentState) -> Diagnosis.DISCHARGE history.notDischargedForTheSameThingMoreThan(2, this, currentState) -> Diagnosis.DISCHARGE
else -> Diagnosis.OVERNIGHT_OBSERVATION else -> Diagnosis.OVERNIGHT_OBSERVATION
} }.also { logDiagnosis(it, newError, flowFiber, history) }
} else { } else {
Diagnosis.NOT_MY_SPECIALTY Diagnosis.NOT_MY_SPECIALTY
}.also { logDiagnosis(it, newError, flowFiber, history) } }
} }
private fun logDiagnosis(diagnosis: Diagnosis, newError: Throwable, flowFiber: FlowFiber, history: FlowMedicalHistory) { private fun logDiagnosis(diagnosis: Diagnosis, newError: Throwable, flowFiber: FlowFiber, history: FlowMedicalHistory) {
@ -593,6 +612,25 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
return Diagnosis.NOT_MY_SPECIALTY return Diagnosis.NOT_MY_SPECIALTY
} }
} }
/**
 * Handles errors coming from the processing of error events ([Event.StartErrorPropagation] and
 * [Event.RetryFlowFromSafePoint]).
 *
 * Gives a [Diagnosis.RESUSCITATE] diagnosis when the new error is an [ErrorStateTransitionException]; any other error
 * is [Diagnosis.NOT_MY_SPECIALTY].
 */
object ResuscitationSpecialist : Staff {
    override fun consult(
        flowFiber: FlowFiber,
        currentState: StateMachineState,
        newError: Throwable,
        history: FlowMedicalHistory
    ): Diagnosis = when (newError) {
        is ErrorStateTransitionException -> Diagnosis.RESUSCITATE
        else -> Diagnosis.NOT_MY_SPECIALTY
    }
}
} }
private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean { private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean {

View File

@ -16,3 +16,5 @@ class StateTransitionException(
} }
class AsyncOperationTransitionException(exception: Exception) : CordaException(exception.message, exception) class AsyncOperationTransitionException(exception: Exception) : CordaException(exception.message, exception)
/**
 * Wraps an [exception], reusing its message and keeping it as the cause.
 *
 * @property exception the wrapped exception.
 */
class ErrorStateTransitionException(
    val exception: Exception
) : CordaException(exception.message, exception)

View File

@ -10,6 +10,7 @@ import net.corda.nodeapi.internal.persistence.DatabaseTransactionException
import net.corda.nodeapi.internal.persistence.contextDatabase import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import java.security.SecureRandom import java.security.SecureRandom
import java.sql.SQLException
import javax.persistence.OptimisticLockException import javax.persistence.OptimisticLockException
/** /**
@ -20,8 +21,8 @@ import javax.persistence.OptimisticLockException
* completely aborted to avoid error loops. * completely aborted to avoid error loops.
*/ */
class TransitionExecutorImpl( class TransitionExecutorImpl(
val secureRandom: SecureRandom, val secureRandom: SecureRandom,
val database: CordaPersistence val database: CordaPersistence
) : TransitionExecutor { ) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {} override fun forceRemoveFlow(id: StateMachineRunId) {}
@ -32,33 +33,44 @@ class TransitionExecutorImpl(
@Suppress("NestedBlockDepth", "ReturnCount") @Suppress("NestedBlockDepth", "ReturnCount")
@Suspendable @Suspendable
override fun executeTransition( override fun executeTransition(
fiber: FlowFiber, fiber: FlowFiber,
previousState: StateMachineState, previousState: StateMachineState,
event: Event, event: Event,
transition: TransitionResult, transition: TransitionResult,
actionExecutor: ActionExecutor actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> { ): Pair<FlowContinuation, StateMachineState> {
contextDatabase = database contextDatabase = database
for (action in transition.actions) { for (action in transition.actions) {
try { try {
actionExecutor.executeAction(fiber, action) actionExecutor.executeAction(fiber, action)
} catch (exception: Exception) { } catch (exception: Exception) {
contextTransactionOrNull?.close() rollbackTransactionOnError()
if (transition.newState.checkpoint.errorState is ErrorState.Errored) { if (transition.newState.checkpoint.errorState is ErrorState.Errored) {
// If we errored while transitioning to an error state then we cannot record the additional log.warn("Error while executing $action, with error event $event, updating errored state", exception)
// error as that may result in an infinite loop, e.g. error propagation fails -> record error -> propagate fails again.
// Instead we just keep around the old error state and wait for a new schedule, perhaps val newState = previousState.copy(
// triggered from a flow hospital checkpoint = previousState.checkpoint.copy(
log.warn("Error while executing $action during transition to errored state, aborting transition", exception) errorState = previousState.checkpoint.errorState.addErrors(
// CORDA-3354 - Go to the hospital with the new error that has occurred listOf(
// while already in a error state (as this error could be for a different reason) FlowError(
return Pair(FlowContinuation.Abort, previousState.copy(isFlowResumed = false)) secureRandom.nextLong(),
ErrorStateTransitionException(exception)
)
)
)
),
isFlowResumed = false
)
return Pair(FlowContinuation.ProcessEvents, newState)
} else { } else {
// Otherwise error the state manually keeping the old flow state and schedule a DoRemainingWork // Otherwise error the state manually keeping the old flow state and schedule a DoRemainingWork
// to trigger error propagation // to trigger error propagation
if(previousState.isRemoved && exception is OptimisticLockException) { if (log.isDebugEnabled && previousState.isRemoved && exception is OptimisticLockException) {
log.debug("Flow has been killed and the following error is likely due to the flow's checkpoint being deleted. " + log.debug(
"Occurred while executing $action, with event $event", exception) "Flow has been killed and the following error is likely due to the flow's checkpoint being deleted. " +
"Occurred while executing $action, with event $event", exception
)
} else { } else {
log.info("Error while executing $action, with event $event, erroring state", exception) log.info("Error while executing $action, with event $event, erroring state", exception)
} }
@ -76,12 +88,12 @@ class TransitionExecutorImpl(
} }
val newState = previousState.copy( val newState = previousState.copy(
checkpoint = previousState.checkpoint.copy( checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors( errorState = previousState.checkpoint.errorState.addErrors(
listOf(FlowError(secureRandom.nextLong(), stateTransitionOrDatabaseTransactionException)) listOf(FlowError(secureRandom.nextLong(), stateTransitionOrDatabaseTransactionException))
) )
), ),
isFlowResumed = false isFlowResumed = false
) )
fiber.scheduleEvent(Event.DoRemainingWork) fiber.scheduleEvent(Event.DoRemainingWork)
return Pair(FlowContinuation.ProcessEvents, newState) return Pair(FlowContinuation.ProcessEvents, newState)
@ -90,4 +102,25 @@ class TransitionExecutorImpl(
} }
return Pair(transition.continuation, transition.newState) return Pair(transition.continuation, transition.newState)
} }
/**
 * Best-effort clean-up of the current context database transaction after an action has failed.
 *
 * Does nothing when there is no context transaction. Otherwise the transaction is rolled back and then
 * closed. A [SQLException] raised by either step is logged at info level and swallowed, so the caller can
 * carry on handling the original error.
 *
 * The close is performed in a `finally` block so that the transaction is still closed when the rollback
 * fails with something other than a [SQLException]; that unexpected exception continues to propagate.
 */
private fun rollbackTransactionOnError() {
    val transaction = contextTransactionOrNull ?: return
    try {
        transaction.rollback()
    } catch (rollbackException: SQLException) {
        log.info(
            "Error rolling back database transaction from a previous error, continuing error handling for the original error",
            rollbackException
        )
    } finally {
        // Always attempt to close, even if the rollback above failed unexpectedly.
        try {
            transaction.close()
        } catch (closeException: SQLException) {
            log.info(
                "Error closing database transaction from a previous error, continuing error handling for the original error",
                closeException
            )
        }
    }
}
} }

View File

@ -17,8 +17,8 @@ import net.corda.node.services.statemachine.transitions.TransitionResult
* transition. * transition.
*/ */
class HospitalisingInterceptor( class HospitalisingInterceptor(
private val flowHospital: StaffedFlowHospital, private val flowHospital: StaffedFlowHospital,
private val delegate: TransitionExecutor private val delegate: TransitionExecutor
) : TransitionExecutor { ) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) { override fun forceRemoveFlow(id: StateMachineRunId) {
removeFlow(id) removeFlow(id)
@ -32,11 +32,11 @@ class HospitalisingInterceptor(
@Suspendable @Suspendable
override fun executeTransition( override fun executeTransition(
fiber: FlowFiber, fiber: FlowFiber,
previousState: StateMachineState, previousState: StateMachineState,
event: Event, event: Event,
transition: TransitionResult, transition: TransitionResult,
actionExecutor: ActionExecutor actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> { ): Pair<FlowContinuation, StateMachineState> {
// If the fiber's previous state was clean then remove it from the hospital // If the fiber's previous state was clean then remove it from the hospital
@ -47,8 +47,8 @@ class HospitalisingInterceptor(
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor) val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
if (nextState.checkpoint.errorState is ErrorState.Errored && previousState.checkpoint.errorState is ErrorState.Clean) { if (canEnterHospital(previousState, nextState)) {
val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception } val exceptionsToHandle = (nextState.checkpoint.errorState as ErrorState.Errored).errors.map { it.exception }
flowHospital.requestTreatment(fiber, previousState, exceptionsToHandle) flowHospital.requestTreatment(fiber, previousState, exceptionsToHandle)
} }
if (nextState.isRemoved) { if (nextState.isRemoved) {
@ -56,4 +56,9 @@ class HospitalisingInterceptor(
} }
return Pair(continuation, nextState) return Pair(continuation, nextState)
} }
/**
 * Decides whether the result of a transition should be sent to the flow hospital.
 *
 * Returns true only when [nextState] is in an errored state and its list of errors differs from the
 * errors held by [previousState]. A [previousState] that is not errored is treated as having no error
 * list, so it always differs from an errored [nextState].
 */
private fun canEnterHospital(previousState: StateMachineState, nextState: StateMachineState): Boolean {
    // A state that is not errored never needs treatment.
    val nextErrored = nextState.checkpoint.errorState as? ErrorState.Errored ?: return false
    val previousErrors = (previousState.checkpoint.errorState as? ErrorState.Errored)?.errors
    return previousErrors != nextErrored.errors
}
} }

View File

@ -1,6 +1,8 @@
package net.corda.node.services.statemachine.transitions package net.corda.node.services.statemachine.transitions
import net.corda.node.services.statemachine.* import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.StateMachineState
/** /**
* This transition checks the current state of the flow and determines whether anything needs to be done. * This transition checks the current state of the flow and determines whether anything needs to be done.

View File

@ -53,7 +53,7 @@ class TopLevelTransition(
private fun errorTransition(event: Event.Error): TransitionResult { private fun errorTransition(event: Event.Error): TransitionResult {
return builder { return builder {
freshErrorTransition(event.exception) freshErrorTransition(event.exception, event.rollback)
FlowContinuation.ProcessEvents FlowContinuation.ProcessEvents
} }
} }
@ -292,9 +292,7 @@ class TopLevelTransition(
private fun retryFlowFromSafePointTransition(startingState: StateMachineState): TransitionResult { private fun retryFlowFromSafePointTransition(startingState: StateMachineState): TransitionResult {
return builder { return builder {
// Need to create a flow from the prior checkpoint or flow initiation. // Need to create a flow from the prior checkpoint or flow initiation.
actions.add(Action.CreateTransaction)
actions.add(Action.RetryFlowFromSafePoint(startingState)) actions.add(Action.RetryFlowFromSafePoint(startingState))
actions.add(Action.CommitTransaction)
FlowContinuation.Abort FlowContinuation.Abort
} }
} }

View File

@ -28,12 +28,12 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
* *
* @param error the error. * @param error the error.
*/ */
fun freshErrorTransition(error: Throwable) { fun freshErrorTransition(error: Throwable, rollback: Boolean = true) {
val flowError = FlowError( val flowError = FlowError(
errorId = (error as? IdentifiableException)?.errorId ?: context.secureRandom.nextLong(), errorId = (error as? IdentifiableException)?.errorId ?: context.secureRandom.nextLong(),
exception = error exception = error
) )
errorTransition(flowError) errorTransition(flowError, rollback)
} }
/** /**
@ -42,7 +42,7 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
* *
* @param error the error. * @param error the error.
*/ */
fun errorsTransition(errors: List<FlowError>) { fun errorsTransition(errors: List<FlowError>, rollback: Boolean) {
currentState = currentState.copy( currentState = currentState.copy(
checkpoint = currentState.checkpoint.copy( checkpoint = currentState.checkpoint.copy(
errorState = currentState.checkpoint.errorState.addErrors(errors) errorState = currentState.checkpoint.errorState.addErrors(errors)
@ -50,10 +50,10 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
isFlowResumed = false isFlowResumed = false
) )
actions.clear() actions.clear()
actions.addAll(arrayOf( if(rollback) {
Action.RollbackTransaction, actions += Action.RollbackTransaction
Action.ScheduleEvent(Event.DoRemainingWork) }
)) actions += Action.ScheduleEvent(Event.DoRemainingWork)
} }
/** /**
@ -62,8 +62,8 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
* *
* @param error the error. * @param error the error.
*/ */
fun errorTransition(error: FlowError) { fun errorTransition(error: FlowError, rollback: Boolean) {
errorsTransition(listOf(error)) errorsTransition(listOf(error), rollback)
} }
fun resumeFlowLogic(result: Any?): FlowContinuation { fun resumeFlowLogic(result: Any?): FlowContinuation {