From 66406ba0fbf709cdf330bca2715f9d6a6628e6fb Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Mon, 10 Aug 2020 13:09:43 +0100 Subject: [PATCH] ENT-5450 Resume flow when transition creation errors (#6604) If an error occurs when creating a transition (a.k.a anything inside of `TopLevelTransition`) then resume the flow with the error that occurred. This is needed, because the current code is swallowing all errors thrown at this point and causing the flow to hang. This change will allow better debugging of errors since the real error will be thrown back to the flow and will get handled and logged by the normal error code path. Extra logging has been added to `processEventsUntilFlowIsResumed`, just in case an exception gets thrown out of the normal code path. We do not want this exception to be swallowed as it can make it impossible to debug the original error. --- .../StateMachineGeneralErrorHandlingTest.kt | 52 ++++++++++++++++++ .../statemachine/FlowStateMachineImpl.kt | 11 ++++ .../transitions/TopLevelTransition.kt | 55 ++++++++++--------- 3 files changed, 93 insertions(+), 25 deletions(-) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt index 8eadb9bf77..f97800214f 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt @@ -1,15 +1,21 @@ package net.corda.node.services.statemachine +import co.paralleluniverse.fibers.Suspendable import net.corda.core.CordaRuntimeException +import net.corda.core.flows.FlowException +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.messaging.DeduplicationHandler +import net.corda.node.services.statemachine.transitions.StartedFlowTransition import net.corda.node.services.statemachine.transitions.TopLevelTransition import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.CHARLIE_NAME import net.corda.testing.core.singleIdentity +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.Test import java.util.concurrent.ExecutorService import java.util.concurrent.Executors @@ -648,4 +654,50 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { assertEquals(0, charlie.rpc.stateMachinesSnapshot().size) } } + + /** + * Throws an exception when creating a transition. + * + * The exception is thrown back to the flow, who catches it and returns a different exception, showing the exception returns to user + * code and can be caught if needed. + */ + @Test(timeout = 300_000) + fun `error during creation of transition that occurs after the first suspend will throw error into flow`() { + startDriver { + val (alice, port) = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Throw exception when creating transition + CLASS ${StartedFlowTransition::class.java.name} + METHOD sleepTransition + AT ENTRY + IF true + DO traceln("Throwing exception"); throw new java.lang.IllegalStateException("die dammit die") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + assertThatExceptionOfType(FlowException::class.java).isThrownBy { + alice.rpc.startFlow(::SleepCatchAndRethrowFlow).returnValue.getOrThrow(30.seconds) + }.withMessage("java.lang.IllegalStateException: die dammit die") + + alice.rpc.assertNumberOfCheckpointsAllZero() + alice.rpc.assertHospitalCounts(propagated = 1) + assertEquals(0, alice.rpc.stateMachinesSnapshot().size) + } + } + + @StartableByRPC + class SleepCatchAndRethrowFlow : FlowLogic() { + @Suspendable + override fun call(): String { + try { + sleep(5.seconds) + } catch (e: IllegalStateException) { + throw FlowException(e) + } + return "cant get here" + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 5ea2ea6fcc..7d2ac7f62a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -227,6 +227,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, FlowContinuation.Abort -> abortFiber() } } + } catch(t: Throwable) { + logUnexpectedExceptionInFlowEventLoop(isDbTransactionOpenOnExit, t) + throw t } finally { checkDbTransaction(isDbTransactionOpenOnExit) openThreadLocalWormhole() @@ -297,6 +300,14 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } } + private fun logUnexpectedExceptionInFlowEventLoop(isDbTransactionOpenOnExit: Boolean, throwable: Throwable) { + if (isDbTransactionOpenOnExit && contextTransactionOrNull == null) { + logger.error("Unexpected error thrown from flow event loop, transaction context missing", throwable) + } else if (!isDbTransactionOpenOnExit && contextTransactionOrNull != null) { + logger.error("Unexpected error thrown from flow event loop, transaction is marked as not present, but is not null", throwable) + } + } + fun setLoggingContext() { context.pushToLoggingContext() MDC.put("flow-id", id.uuid.toString()) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 7ab0328e86..4cfd564ce9 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -37,33 +37,38 @@ class TopLevelTransition( val event: Event ) : Transition { - @Suppress("ComplexMethod") + @Suppress("ComplexMethod", "TooGenericExceptionCaught") override fun transition(): TransitionResult { + return try { + if (startingState.isKilled) { + return KilledFlowTransition(context, startingState, event).transition() + } - if (startingState.isKilled) { - return KilledFlowTransition(context, startingState, event).transition() - } - - return when (event) { - is Event.DoRemainingWork -> DoRemainingWorkTransition(context, startingState).transition() - is Event.DeliverSessionMessage -> DeliverSessionMessageTransition(context, startingState, event).transition() - is Event.Error -> errorTransition(event) - is Event.TransactionCommitted -> transactionCommittedTransition(event) - is Event.SoftShutdown -> softShutdownTransition() - is Event.StartErrorPropagation -> startErrorPropagationTransition() - is Event.EnterSubFlow -> enterSubFlowTransition(event) - is Event.LeaveSubFlow -> leaveSubFlowTransition() - is Event.Suspend -> suspendTransition(event) - is Event.FlowFinish -> flowFinishTransition(event) - is Event.InitiateFlow -> initiateFlowTransition(event) - is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event) - is Event.AsyncOperationThrows -> asyncOperationThrowsTransition(event) - is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition() - is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition() - is Event.OvernightObservation -> overnightObservationTransition() - is Event.WakeUpFromSleep -> wakeUpFromSleepTransition() - is Event.Pause -> pausedFlowTransition() - is Event.TerminateSessions -> terminateSessionsTransition(event) + when (event) { + is Event.DoRemainingWork -> DoRemainingWorkTransition(context, startingState).transition() + is Event.DeliverSessionMessage -> DeliverSessionMessageTransition(context, startingState, event).transition() + is Event.Error -> errorTransition(event) + is Event.TransactionCommitted -> transactionCommittedTransition(event) + is Event.SoftShutdown -> softShutdownTransition() + is Event.StartErrorPropagation -> startErrorPropagationTransition() + is Event.EnterSubFlow -> enterSubFlowTransition(event) + is Event.LeaveSubFlow -> leaveSubFlowTransition() + is Event.Suspend -> suspendTransition(event) + is Event.FlowFinish -> flowFinishTransition(event) + is Event.InitiateFlow -> initiateFlowTransition(event) + is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event) + is Event.AsyncOperationThrows -> asyncOperationThrowsTransition(event) + is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition() + is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition() + is Event.OvernightObservation -> overnightObservationTransition() + is Event.WakeUpFromSleep -> wakeUpFromSleepTransition() + is Event.Pause -> pausedFlowTransition() + is Event.TerminateSessions -> terminateSessionsTransition(event) + } + } catch (t: Throwable) { + // All errors coming from the transition should be sent back to the flow + // Letting the flow re-enter standard error handling + builder { resumeFlowLogic(t) } } }