CORDA-3194 Wrap state transition exceptions and add flow hospital error handling for them (#2542)

Wrap exceptions that occur in state machine transitions with a custom exception type which is
then handled inside of the flow hospital. As part of this change, a number of side negative side
effects have been addressed.

General summary:

- `StateTransitionException` wraps exceptions caught in `TransitionExecutorImpl`
- `StateTransitionExceptions` are handled in the flow hospital, retried 3 times and then kept in
    for observation if errors persist (assuming conditions below are false)
- Exceptions that occur in `FlowAsyncOperation` events are wrapped in
  `AsyncOperationTransitionException` and ignored by the flow hospital transition staff member
- `InterruptException`s are given a `TERMINAL` diagnosis by the flow hospital transition staff
   member (can occur due to `killFlow`)
- Allow flows which have not persisted their original checkpoint to still retry by replaying their
   start flow messages
- Swallow exceptions in `AcknowledgeMessages` actions

Detailed summary:

* CORDA-3194 Add state machine transition error handling to flow hospital

Wrap exceptions that are caught in `TransitionExecutorImpl` (coming from
new errors) with `StateTransitionException`. This exception is then
handled explicitly by the flow hospital.

Add `TransitionErrorGeneralPractitioner` to `StaffedFlowHospital`. This
staff member handles errors that mention `StateTransitionException`.
Errors are retried and then kept in the hospital if the errors persist.

* CORDA-3194 Remove a fiber from the `hospitalisedFlows` if its previous state was clean

If the fiber's previous state was clean then remove it from
`HospitalisingInterceptor.hospitalisedFlows`. This allows flows that are
being retried to clean themselves. Doing this allows them to re-enter
the flow hospital after executing the fiber's transition (if an error
occurs).

This is important for retrying a flow that has errored during a
transition.

* CORDA-3194 Set `isAnyCheckpointPersisted` to true when retrying a flow

Added to prevent a single flow from creating multiple checkpoints when
a failure occurs during `Action.AcknowledgeMessages`.

More specifically, to `isAnyCheckpointPersisted` is false when retrying
the flow, even though a checkpoint has actually been saved. Due to this
a brand new flow is started with a new flow id (causing duplication).

Setting `isAnyCheckpointPersisted` to true specifically when retrying a
flow resolves this issue.

* CORDA-3194 Add Byteman test to verify transition error handling

Add `StatemachineErrorHandlingTest` to verify transition error handling.

Byteman allows exceptions to be injected at certain points in the code's
execution. Therefore exceptions can be thrown when needed inside of the
state machine.

The current tests check errors in events:
- `InitiateFlow`
- `AcknowledgeMessages`

* CORDA-3194 Swallow all exceptions in `ActionExecutorImpl.executeAcknowledgeMessages`

Swallow the exceptions that occur in the `DeduplicationHandler`s when
inside of `ActionExecutorImpl.executeAcknowledgeMessages`.

The side effects of the failures that can happen in the handlers are
not serious enough to put the transition into a failure state.
Therefore they are now caught. This allows the transition to continue
as normal, even if an error occurs in one any of the handlers.

* CORDA-3194 Wrap unexpected exceptions thrown in async operation transitions

Exceptions thrown inside of `FlowAsyncOperation.execute` implementations
that are not returned as part of the future, are caught, wrapped and
rethrown. This prevents unexpected exceptions thrown by (most likely)
user code from being handled by the hospital by the transition
staff member.

This handling might change moving forward, but it allows the async
operation to continue working as it was before transition error handling
was added.

* CORDA-3194 Verify that errors inside of `AcknowledgeMessages` work as expected

Update `StatemachineErrorHandlingTest` to correctly test errors that
occur when executing the `AcknowledgeMessages` action.

* CORDA-3194 Retry flows that failed to persist their original checkpoint

Allow a flow that failed when creating their original checkpoint (for
example - failing to commit the db transaction) to retry.

The flow will create a brand new checkpoint (as the original did not
saved).

This required adding `flowId` to `ExternalStartFlowEvent` to allow the
event to keep a record of the flow's id. When the flow is retried, the
events are replayed which trigger a flow to be started that has the
id stored in the event.

To allow this change, code was removed from `retryFlowFromSafePoint` to
allow the function to continue, even if no checkpoint matches the passed
in flow id.

* CORDA-3194 Correct `FlowFrameworkTests` test due to error handling

Test assumed that errors in transitions are not retried, this has now
been updated so the test passes with the flow succeeding after an
exception is thrown.

* CORDA-3194 Remove unneeded import

* CORDA-3194 Make the state transition exceptions extend `CordaException`

`StateTransitionException` and `AsyncOperationTransitionException` now
extend `CordaException` instead of `Exception`.

* CORDA-3194 Improve log messages

* CORDA-3194 Remove unneeded code in `HospitalisingInterceptor`

Due to a previous change, a section of code that removes a flow id
from the `hospitalisedFlows` map is no longer required. This code has
been removed.

* CORDA-3194 Constraint violations are given `TERMINAL` diagnosis

Add `Diagnosis.TERMINAL` to `StaffedFlowHospital` to allow an error
to be ignored and left to die a quick and painful death.

`StateTransitionException` changed so it does not cause serialisation
errors when propagated from a flow.

* CORDA-3194 `InterruptedExceptions` are given `TERMINAL` diagnosis
This commit is contained in:
Dan Newton 2019-09-23 09:31:38 +01:00 committed by LankyDan
parent 938828b52f
commit 9b169df2b8
11 changed files with 1030 additions and 35 deletions

View File

@ -228,6 +228,12 @@ dependencies {
// Required by JVMAgentUtil (x-compatible java 8 & 11 agent lookup mechanism)
compile files("${System.properties['java.home']}/../lib/tools.jar")
// Byteman for runtime (termination) rules injection on the running node
// Submission tool allowing to install rules on running nodes
integrationTestCompile "org.jboss.byteman:byteman-submit:4.0.3"
// The actual Byteman agent which should only be in the classpath of the out of process nodes
integrationTestCompile "org.jboss.byteman:byteman:4.0.3"
testCompile(project(':test-cli'))
testCompile(project(':test-utils'))

View File

@ -0,0 +1,908 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.list
import net.corda.core.internal.readAllLines
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.transitions.TopLevelTransition
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.node.User
import net.corda.testing.node.internal.InternalDriverDSL
import org.jboss.byteman.agent.submit.ScriptText
import org.jboss.byteman.agent.submit.Submit
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class StatemachineErrorHandlingTest : IntegrationTest() {
companion object {
val databaseSchemas = IntegrationTestSchemas(CHARLIE_NAME, ALICE_NAME, DUMMY_NOTARY_NAME)
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
var counter = 0
}
@Before
fun setup() {
counter = 0
}
/**
* Throws an exception when performing an [Action.SendInitial] action.
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and is then kept in
* the hospital for observation.
*/
@Test
fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() {
driver(
DriverParameters(
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
val charlie = startNode(
NodeParameters(
providedName = CHARLIE_NAME,
rpcUsers = listOf(rpcUser)
)
).getOrThrow()
val alice =
(this as InternalDriverDSL).startNode(
NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)),
bytemanPort = 12000
).getOrThrow()
val submit = Submit("localhost", 12000)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendInitial
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSendInitial action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendInitial
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submit.addScripts(listOf(ScriptText("Test script", rules)))
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
}
val output = alice.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when performing an [Action.SendInitial] event.
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*/
@Test
fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() {
driver(
DriverParameters(
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
val charlie = startNode(
NodeParameters(
providedName = CHARLIE_NAME,
rpcUsers = listOf(rpcUser)
)
).getOrThrow()
val alice =
(this as InternalDriverDSL).startNode(
NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)),
bytemanPort = 12000
).getOrThrow()
val submit = Submit("localhost", 12000)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendInitial
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSendInitial action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendInitial
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submit.addScripts(listOf(ScriptText("Test script", rules)))
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
val output = alice.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when executing [DeduplicationHandler.afterDatabaseTransaction] from inside an [Action.AcknowledgeMessages] action.
* The exception is thrown every time [DeduplicationHandler.afterDatabaseTransaction] is executed inside of [ActionExecutorImpl.executeAcknowledgeMessages]
*
* The exceptions should be swallowed. Therefore there should be no trips to the hospital and no retries.
* The flow should complete successfully as the error is swallowed.
*/
@Test
fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() {
driver(
DriverParameters(
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
val charlie = startNode(
NodeParameters(
providedName = CHARLIE_NAME,
rpcUsers = listOf(rpcUser)
)
).getOrThrow()
val alice =
(this as InternalDriverDSL).startNode(
NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)),
bytemanPort = 12000
).getOrThrow()
val submit = Submit("localhost", 12000)
val rules = """
RULE Set flag when inside executeAcknowledgeMessages
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeAcknowledgeMessages
AT INVOKE ${DeduplicationHandler::class.java.name}.afterDatabaseTransaction()
IF !flagged("exception_flag")
DO flag("exception_flag"); traceln("Setting flag to true")
ENDRULE
RULE Throw exception when executing ${DeduplicationHandler::class.java.name}.afterDatabaseTransaction when inside executeAcknowledgeMessages
INTERFACE ${DeduplicationHandler::class.java.name}
METHOD afterDatabaseTransaction
AT ENTRY
IF flagged("exception_flag")
DO traceln("Throwing exception"); clear("exception_flag"); traceln("SETTING FLAG TO FALSE"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submit.addScripts(listOf(ScriptText("Test script", rules)))
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
val output = alice.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted state)..
* The exception is thrown 5 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test
fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() {
driver(
DriverParameters(
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
val charlie = startNode(
NodeParameters(
providedName = CHARLIE_NAME,
rpcUsers = listOf(rpcUser)
)
).getOrThrow()
val alice =
(this as InternalDriverDSL).startNode(
NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)),
bytemanPort = 12000
).getOrThrow()
val submit = Submit("localhost", 12000)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 5
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submit.addScripts(listOf(ScriptText("Test script", rules)))
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
val output = alice.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted state)..
* The exception is thrown 7 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*
* TODO Fix this scenario - it is currently hanging after putting the flow in for observation
*/
@Test
@Ignore
fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() {
driver(
DriverParameters(
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
val charlie = startNode(
NodeParameters(
providedName = CHARLIE_NAME,
rpcUsers = listOf(rpcUser)
)
).getOrThrow()
val alice =
(this as InternalDriverDSL).startNode(
NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)),
bytemanPort = 12000
).getOrThrow()
val submit = Submit("localhost", 12000)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 7
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submit.addScripts(listOf(ScriptText("Test script", rules)))
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
}
val output = alice.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event after the flow has suspended (has moved to a started state).
* The exception is thrown 5 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test
fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() {
driver(
DriverParameters(
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
val charlie = startNode(
NodeParameters(
providedName = CHARLIE_NAME,
rpcUsers = listOf(rpcUser)
)
).getOrThrow()
val alice =
(this as InternalDriverDSL).startNode(
NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)),
bytemanPort = 12000
).getOrThrow()
val submit = Submit("localhost", 12000)
// seems to be restarting the flow from the beginning every time
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when executing first suspend
CLASS ${TopLevelTransition::class.java.name}
METHOD suspendTransition
AT ENTRY
IF !flagged("suspend_flag")
DO flag("suspend_flag"); traceln("Setting suspend flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 5
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Set flag when executing first commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submit.addScripts(listOf(ScriptText("Test script", rules)))
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
val output = alice.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event when the flow is finishing.
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*/
@Test
fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
driver(
DriverParameters(
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
val charlie = startNode(
NodeParameters(
providedName = CHARLIE_NAME,
rpcUsers = listOf(rpcUser)
)
).getOrThrow()
val alice =
(this as InternalDriverDSL).startNode(
NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)),
bytemanPort = 12000
).getOrThrow()
val submit = Submit("localhost", 12000)
// seems to be restarting the flow from the beginning every time
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when adding action to remove checkpoint
CLASS ${TopLevelTransition::class.java.name}
METHOD flowFinishTransition
AT ENTRY
IF !flagged("remove_checkpoint_flag")
DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction when removing checkpoint
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); clear("remove_checkpoint_flag"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submit.addScripts(listOf(ScriptText("Test script", rules)))
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
val output = alice.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws a [ConstraintViolationException] when performing an [Action.CommitTransaction] event when the flow is finishing.
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the error
* propagates and the flow ends.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*/
@Test
fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and fail if error persists`() {
driver(
DriverParameters(
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
val charlie = startNode(
NodeParameters(
providedName = CHARLIE_NAME,
rpcUsers = listOf(rpcUser)
)
).getOrThrow()
val alice =
(this as InternalDriverDSL).startNode(
NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)),
bytemanPort = 12000
).getOrThrow()
val submit = Submit("localhost", 12000)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when adding action to remove checkpoint
CLASS ${TopLevelTransition::class.java.name}
METHOD flowFinishTransition
AT ENTRY
IF !flagged("remove_checkpoint_flag")
DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction when removing checkpoint
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 5
DO incrementCounter("counter");
clear("remove_checkpoint_flag");
traceln("Throwing exception");
throw new org.hibernate.exception.ConstraintViolationException("This flow has a terminal condition", new java.sql.SQLException(), "made up constraint")
ENDRULE
RULE Entering duplicate insert staff member
CLASS ${StaffedFlowHospital.DuplicateInsertSpecialist::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached duplicate insert staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.DuplicateInsertSpecialist::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.DuplicateInsertSpecialist::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.DuplicateInsertSpecialist::class.java.name}
METHOD consult
AT READ TERMINAL
IF true
DO traceln("Byteman test - terminal")
ENDRULE
""".trimIndent()
submit.addScripts(listOf(ScriptText("Test script", rules)))
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<StateTransitionException> {
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
Duration.of(30, ChronoUnit.SECONDS)
)
}
val output = alice.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
// Check the stdout for the lines generated by byteman
assertEquals(4, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - terminal") }.size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
}
@StartableByRPC
@InitiatingFlow
class SendAMessageFlow(private val party: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
val session = initiateFlow(party)
session.send("hello there")
return "Finished executing test flow - ${this.runId}"
}
}
@InitiatedBy(SendAMessageFlow::class)
class SendAMessageResponder(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
}
}
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints").use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}
}
}

View File

@ -1097,6 +1097,7 @@ class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogi
override val deduplicationHandler: DeduplicationHandler
get() = this
override val flowId: StateMachineRunId = StateMachineRunId.createRandom()
override val flowLogic: FlowLogic<T>
get() = logic
override val context: InvocationContext

View File

@ -11,6 +11,7 @@ import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateRef
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture
@ -239,6 +240,7 @@ class NodeSchedulerService(private val clock: CordaClock,
}
private inner class FlowStartDeduplicationHandler(val scheduledState: ScheduledStateRef, override val flowLogic: FlowLogic<Any?>, override val context: InvocationContext) : DeduplicationHandler, ExternalEvent.ExternalStartFlowEvent<Any?> {
override val flowId: StateMachineRunId = StateMachineRunId.createRandom()
override val externalCause: ExternalEvent
get() = this
override val deduplicationHandler: FlowStartDeduplicationHandler

View File

@ -123,7 +123,17 @@ class ActionExecutorImpl(
@Suspendable
private fun executeAcknowledgeMessages(action: Action.AcknowledgeMessages) {
action.deduplicationHandlers.forEach {
it.afterDatabaseTransaction()
try {
it.afterDatabaseTransaction()
} catch (e: Exception) {
// Catch all exceptions that occur in the [DeduplicationHandler]s (although errors should be unlikely)
// It is deemed safe for errors to occur here
// Therefore the current transition should not fail is something does go wrong
log.info(
"An error occurred executing a deduplication post-database commit handler. Continuing, as it is safe to do so.",
e
)
}
}
}
@ -220,15 +230,21 @@ class ActionExecutorImpl(
@Suspendable
private fun executeAsyncOperation(fiber: FlowFiber, action: Action.ExecuteAsyncOperation) {
val operationFuture = action.operation.execute(action.deduplicationId)
operationFuture.thenMatch(
try {
val operationFuture = action.operation.execute(action.deduplicationId)
operationFuture.thenMatch(
success = { result ->
fiber.scheduleEvent(Event.AsyncOperationCompletion(result))
},
failure = { exception ->
fiber.scheduleEvent(Event.Error(exception))
}
)
)
} catch (e: Exception) {
// Catch and wrap any unexpected exceptions from the async operation
// Wrapping the exception allows it to be better handled by the flow hospital
throw AsyncOperationTransitionException(e)
}
}
private fun executeRetryFlowFromSafePoint(action: Action.RetryFlowFromSafePoint) {

View File

@ -10,6 +10,7 @@ import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.messaging.DataFeed
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.seconds
import net.corda.node.services.FinalityHandler
import org.hibernate.exception.ConstraintViolationException
@ -34,7 +35,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
DoctorTimeout,
FinalityDoctor,
TransientConnectionCardiologist,
DatabaseEndocrinologist
DatabaseEndocrinologist,
TransitionErrorGeneralPractitioner
)
}
@ -125,7 +127,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
Triple(Outcome.OVERNIGHT_OBSERVATION, null, 0.seconds)
}
Diagnosis.NOT_MY_SPECIALTY -> {
Diagnosis.NOT_MY_SPECIALTY, Diagnosis.TERMINAL -> {
// None of the staff care for these errors so we let them propagate
log.info("Flow error allowed to propagate", report.error)
Triple(Outcome.UNTREATABLE, Event.StartErrorPropagation, 0.seconds)
@ -250,6 +252,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
/** The order of the enum values are in priority order. */
enum class Diagnosis {
/** The flow should not see other staff members */
TERMINAL,
/** Retry from last safe point. */
DISCHARGE,
/** Park and await intervention. */
@ -286,8 +290,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
*/
object DuplicateInsertSpecialist : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
return if (newError.mentionsThrowable(ConstraintViolationException::class.java) && history.notDischargedForTheSameThingMoreThan(3, this, currentState)) {
Diagnosis.DISCHARGE
return if (newError.mentionsThrowable(ConstraintViolationException::class.java)) {
if(history.notDischargedForTheSameThingMoreThan(3, this, currentState)) {
Diagnosis.DISCHARGE
} else {
Diagnosis.TERMINAL
}
} else {
Diagnosis.NOT_MY_SPECIALTY
}
@ -398,6 +406,47 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
@VisibleForTesting
val customConditions = mutableSetOf<(t: Throwable) -> Boolean>()
}
/**
* Handles exceptions from internal state transitions that are not dealt with by the rest of the staff.
*
* [InterruptedException]s are diagnosed as [Diagnosis.TERMINAL] so they are never retried
* (can occur when a flow is killed - `killFlow`).
* [AsyncOperationTransitionException]s ares ignored as the error is likely to have originated in user async code rather than inside of a
* transition.
* All other exceptions are retried a maximum of 3 times before being kept in for observation.
*/
object TransitionErrorGeneralPractitioner : Staff {
override fun consult(
flowFiber: FlowFiber,
currentState: StateMachineState,
newError: Throwable,
history: FlowMedicalHistory
): Diagnosis {
return if (newError.mentionsThrowable(StateTransitionException::class.java)) {
when {
newError.mentionsThrowable(InterruptedException::class.java) -> Diagnosis.TERMINAL
newError.mentionsThrowable(AsyncOperationTransitionException::class.java) -> Diagnosis.NOT_MY_SPECIALTY
history.notDischargedForTheSameThingMoreThan(2, this, currentState) -> Diagnosis.DISCHARGE
else -> Diagnosis.OVERNIGHT_OBSERVATION
}
} else {
Diagnosis.NOT_MY_SPECIALTY
}.also { diagnosis ->
if (diagnosis != Diagnosis.NOT_MY_SPECIALTY) {
log.debug {
"""
Flow ${flowFiber.id} given $diagnosis diagnosis due to a transition error
- Exception: ${newError.message}
- History: $history
${(newError as? StateTransitionException)?.transitionAction?.let { "- Action: $it" }}
${(newError as? StateTransitionException)?.transitionEvent?.let { "- Event: $it" }}
""".trimIndent()
}
}
}
}
}
}
private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean {

View File

@ -121,6 +121,7 @@ interface ExternalEvent {
* An external request to start a flow, from the scheduler for example.
*/
interface ExternalStartFlowEvent<T> : ExternalEvent {
val flowId: StateMachineRunId
val flowLogic: FlowLogic<T>
val context: InvocationContext

View File

@ -0,0 +1,17 @@
package net.corda.node.services.statemachine
import net.corda.core.CordaException
import net.corda.core.serialization.ConstructorForDeserialization
// TODO This exception should not be propagated up to rpc as it suppresses the real exception
class StateTransitionException(
val transitionAction: Action?,
val transitionEvent: Event?,
val exception: Exception
) : CordaException(exception.message, exception) {
@ConstructorForDeserialization
constructor(exception: Exception): this(null, null, exception)
}
internal class AsyncOperationTransitionException(exception: Exception) : CordaException(exception.message, exception)

View File

@ -51,11 +51,12 @@ class TransitionExecutorImpl(
} else {
// Otherwise error the state manually keeping the old flow state and schedule a DoRemainingWork
// to trigger error propagation
log.info("Error while executing $action, erroring state", exception)
log.info("Error while executing $action, with event $event, erroring state", exception)
val newState = previousState.copy(
checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors(
listOf(FlowError(secureRandom.nextLong(), exception))
// Wrap the exception with [StateTransitionException] for handling by the flow hospital
listOf(FlowError(secureRandom.nextLong(), StateTransitionException(action, event, exception)))
)
),
isFlowResumed = false
@ -67,4 +68,4 @@ class TransitionExecutorImpl(
}
return Pair(transition.continuation, transition.newState)
}
}
}

View File

@ -2,13 +2,7 @@ package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.util.concurrent.ConcurrentHashMap
@ -41,19 +35,21 @@ class HospitalisingInterceptor(
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
// If the fiber's previous state was clean then remove it from the [hospitalisedFlows] map
// This is important for retrying a flow that has errored during a state machine transition
if(previousState.checkpoint.errorState is ErrorState.Clean) {
hospitalisedFlows.remove(fiber.id)
}
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
when (nextState.checkpoint.errorState) {
is ErrorState.Clean -> {
hospitalisedFlows.remove(fiber.id)
}
is ErrorState.Errored -> {
val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception }
if (hospitalisedFlows.putIfAbsent(fiber.id, fiber) == null) {
flowHospital.flowErrored(fiber, previousState, exceptionsToHandle)
}
}
if (nextState.checkpoint.errorState is ErrorState.Errored) {
val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception }
if (hospitalisedFlows.putIfAbsent(fiber.id, fiber) == null) {
flowHospital.flowErrored(fiber, previousState, exceptionsToHandle)
}
}
if (nextState.isRemoved) {
removeFlow(fiber.id)
}

View File

@ -37,7 +37,8 @@ import net.corda.testing.internal.LogHelper
import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
import net.corda.testing.node.internal.*
import org.assertj.core.api.Assertions.*
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
import org.assertj.core.api.Condition
import org.junit.After
@ -115,18 +116,15 @@ class FlowFrameworkTests {
}
@Test
fun `exception while fiber suspended`() {
fun `exception while fiber suspended is retried and completes successfully`() {
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
val flow = ReceiveFlow(bob)
val fiber = aliceNode.services.startFlow(flow) as FlowStateMachineImpl
// Before the flow runs change the suspend action to throw an exception
val exceptionDuringSuspend = Exception("Thrown during suspend")
val throwingActionExecutor = SuspendThrowingActionExecutor(exceptionDuringSuspend, fiber.transientValues!!.value.actionExecutor)
val throwingActionExecutor = SuspendThrowingActionExecutor(Exception("Thrown during suspend"), fiber.transientValues!!.value.actionExecutor)
fiber.transientValues = TransientReference(fiber.transientValues!!.value.copy(actionExecutor = throwingActionExecutor))
mockNet.runNetwork()
assertThatThrownBy {
fiber.resultFuture.getOrThrow()
}.isSameAs(exceptionDuringSuspend)
fiber.resultFuture.getOrThrow()
assertThat(aliceNode.smm.allStateMachines).isEmpty()
// Make sure the fiber does actually terminate
assertThat(fiber.state).isEqualTo(Strand.State.WAITING)