diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt index 8eadb9bf77..f97800214f 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt @@ -1,15 +1,21 @@ package net.corda.node.services.statemachine +import co.paralleluniverse.fibers.Suspendable import net.corda.core.CordaRuntimeException +import net.corda.core.flows.FlowException +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.messaging.DeduplicationHandler +import net.corda.node.services.statemachine.transitions.StartedFlowTransition import net.corda.node.services.statemachine.transitions.TopLevelTransition import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.CHARLIE_NAME import net.corda.testing.core.singleIdentity +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.Test import java.util.concurrent.ExecutorService import java.util.concurrent.Executors @@ -648,4 +654,50 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { assertEquals(0, charlie.rpc.stateMachinesSnapshot().size) } } + + /** + * Throws an exception when creating a transition. + * + * The exception is thrown back to the flow, who catches it and returns a different exception, showing the exception returns to user + * code and can be caught if needed. + */ + @Test(timeout = 300_000) + fun `error during creation of transition that occurs after the first suspend will throw error into flow`() { + startDriver { + val (alice, port) = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Throw exception when creating transition + CLASS ${StartedFlowTransition::class.java.name} + METHOD sleepTransition + AT ENTRY + IF true + DO traceln("Throwing exception"); throw new java.lang.IllegalStateException("die dammit die") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + assertThatExceptionOfType(FlowException::class.java).isThrownBy { + alice.rpc.startFlow(::SleepCatchAndRethrowFlow).returnValue.getOrThrow(30.seconds) + }.withMessage("java.lang.IllegalStateException: die dammit die") + + alice.rpc.assertNumberOfCheckpointsAllZero() + alice.rpc.assertHospitalCounts(propagated = 1) + assertEquals(0, alice.rpc.stateMachinesSnapshot().size) + } + } + + @StartableByRPC + class SleepCatchAndRethrowFlow : FlowLogic() { + @Suspendable + override fun call(): String { + try { + sleep(5.seconds) + } catch (e: IllegalStateException) { + throw FlowException(e) + } + return "cant get here" + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 5ea2ea6fcc..7d2ac7f62a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -227,6 +227,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, FlowContinuation.Abort -> abortFiber() } } + } catch(t: Throwable) { + logUnexpectedExceptionInFlowEventLoop(isDbTransactionOpenOnExit, t) + throw t } finally { checkDbTransaction(isDbTransactionOpenOnExit) openThreadLocalWormhole() @@ -297,6 +300,14 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } } + private fun logUnexpectedExceptionInFlowEventLoop(isDbTransactionOpenOnExit: Boolean, throwable: Throwable) { + if (isDbTransactionOpenOnExit && contextTransactionOrNull == null) { + logger.error("Unexpected error thrown from flow event loop, transaction context missing", throwable) + } else if (!isDbTransactionOpenOnExit && contextTransactionOrNull != null) { + logger.error("Unexpected error thrown from flow event loop, transaction is marked as not present, but is not null", throwable) + } + } + fun setLoggingContext() { context.pushToLoggingContext() MDC.put("flow-id", id.uuid.toString()) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 7ab0328e86..4cfd564ce9 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -37,33 +37,38 @@ class TopLevelTransition( val event: Event ) : Transition { - @Suppress("ComplexMethod") + @Suppress("ComplexMethod", "TooGenericExceptionCaught") override fun transition(): TransitionResult { + return try { + if (startingState.isKilled) { + return KilledFlowTransition(context, startingState, event).transition() + } - if (startingState.isKilled) { - return KilledFlowTransition(context, startingState, event).transition() - } - - return when (event) { - is Event.DoRemainingWork -> DoRemainingWorkTransition(context, startingState).transition() - is Event.DeliverSessionMessage -> DeliverSessionMessageTransition(context, startingState, event).transition() - is Event.Error -> errorTransition(event) - is Event.TransactionCommitted -> transactionCommittedTransition(event) - is Event.SoftShutdown -> softShutdownTransition() - is Event.StartErrorPropagation -> startErrorPropagationTransition() - is Event.EnterSubFlow -> enterSubFlowTransition(event) - is Event.LeaveSubFlow -> leaveSubFlowTransition() - is Event.Suspend -> suspendTransition(event) - is Event.FlowFinish -> flowFinishTransition(event) - is Event.InitiateFlow -> initiateFlowTransition(event) - is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event) - is Event.AsyncOperationThrows -> asyncOperationThrowsTransition(event) - is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition() - is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition() - is Event.OvernightObservation -> overnightObservationTransition() - is Event.WakeUpFromSleep -> wakeUpFromSleepTransition() - is Event.Pause -> pausedFlowTransition() - is Event.TerminateSessions -> terminateSessionsTransition(event) + when (event) { + is Event.DoRemainingWork -> DoRemainingWorkTransition(context, startingState).transition() + is Event.DeliverSessionMessage -> DeliverSessionMessageTransition(context, startingState, event).transition() + is Event.Error -> errorTransition(event) + is Event.TransactionCommitted -> transactionCommittedTransition(event) + is Event.SoftShutdown -> softShutdownTransition() + is Event.StartErrorPropagation -> startErrorPropagationTransition() + is Event.EnterSubFlow -> enterSubFlowTransition(event) + is Event.LeaveSubFlow -> leaveSubFlowTransition() + is Event.Suspend -> suspendTransition(event) + is Event.FlowFinish -> flowFinishTransition(event) + is Event.InitiateFlow -> initiateFlowTransition(event) + is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event) + is Event.AsyncOperationThrows -> asyncOperationThrowsTransition(event) + is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition() + is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition() + is Event.OvernightObservation -> overnightObservationTransition() + is Event.WakeUpFromSleep -> wakeUpFromSleepTransition() + is Event.Pause -> pausedFlowTransition() + is Event.TerminateSessions -> terminateSessionsTransition(event) + } + } catch (t: Throwable) { + // All errors coming from the transition should be sent back to the flow + // Letting the flow re-enter standard error handling + builder { resumeFlowLogic(t) } } }