ENT-5450 Resume flow when transition creation errors (#6604)

If an error occurs when creating a transition (a.k.a anything inside of
`TopLevelTransition`) then resume the flow with the error that occurred.

This is needed, because the current code is swallowing all errors thrown
at this point and causing the flow to hang.

This change will allow better debugging of errors since the real error
will be thrown back to the flow and will get handled and logged by the
normal error code path.

Extra logging has been added to `processEventsUntilFlowIsResumed`, just
in case an exception gets thrown out of the normal code path. We do not
want this exception to be swallowed as it can make it impossible to
debug the original error.
This commit is contained in:
Dan Newton 2020-08-10 13:09:43 +01:00 committed by GitHub
parent e234bd9c96
commit 66406ba0fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 93 additions and 25 deletions

View File

@ -1,15 +1,21 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.transitions.StartedFlowTransition
import net.corda.node.services.statemachine.transitions.TopLevelTransition
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Test
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
@ -648,4 +654,50 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when creating a transition.
*
* The exception is thrown back to the flow, who catches it and returns a different exception, showing the exception returns to user
* code and can be caught if needed.
*/
@Test(timeout = 300_000)
fun `error during creation of transition that occurs after the first suspend will throw error into flow`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception when creating transition
CLASS ${StartedFlowTransition::class.java.name}
METHOD sleepTransition
AT ENTRY
IF true
DO traceln("Throwing exception"); throw new java.lang.IllegalStateException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertThatExceptionOfType(FlowException::class.java).isThrownBy {
alice.rpc.startFlow(::SleepCatchAndRethrowFlow).returnValue.getOrThrow(30.seconds)
}.withMessage("java.lang.IllegalStateException: die dammit die")
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(propagated = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
@StartableByRPC
class SleepCatchAndRethrowFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
try {
sleep(5.seconds)
} catch (e: IllegalStateException) {
throw FlowException(e)
}
return "cant get here"
}
}
}

View File

@ -227,6 +227,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
FlowContinuation.Abort -> abortFiber()
}
}
} catch(t: Throwable) {
logUnexpectedExceptionInFlowEventLoop(isDbTransactionOpenOnExit, t)
throw t
} finally {
checkDbTransaction(isDbTransactionOpenOnExit)
openThreadLocalWormhole()
@ -297,6 +300,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
}
private fun logUnexpectedExceptionInFlowEventLoop(isDbTransactionOpenOnExit: Boolean, throwable: Throwable) {
if (isDbTransactionOpenOnExit && contextTransactionOrNull == null) {
logger.error("Unexpected error thrown from flow event loop, transaction context missing", throwable)
} else if (!isDbTransactionOpenOnExit && contextTransactionOrNull != null) {
logger.error("Unexpected error thrown from flow event loop, transaction is marked as not present, but is not null", throwable)
}
}
fun setLoggingContext() {
context.pushToLoggingContext()
MDC.put("flow-id", id.uuid.toString())

View File

@ -37,33 +37,38 @@ class TopLevelTransition(
val event: Event
) : Transition {
@Suppress("ComplexMethod")
@Suppress("ComplexMethod", "TooGenericExceptionCaught")
override fun transition(): TransitionResult {
return try {
if (startingState.isKilled) {
return KilledFlowTransition(context, startingState, event).transition()
}
if (startingState.isKilled) {
return KilledFlowTransition(context, startingState, event).transition()
}
return when (event) {
is Event.DoRemainingWork -> DoRemainingWorkTransition(context, startingState).transition()
is Event.DeliverSessionMessage -> DeliverSessionMessageTransition(context, startingState, event).transition()
is Event.Error -> errorTransition(event)
is Event.TransactionCommitted -> transactionCommittedTransition(event)
is Event.SoftShutdown -> softShutdownTransition()
is Event.StartErrorPropagation -> startErrorPropagationTransition()
is Event.EnterSubFlow -> enterSubFlowTransition(event)
is Event.LeaveSubFlow -> leaveSubFlowTransition()
is Event.Suspend -> suspendTransition(event)
is Event.FlowFinish -> flowFinishTransition(event)
is Event.InitiateFlow -> initiateFlowTransition(event)
is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event)
is Event.AsyncOperationThrows -> asyncOperationThrowsTransition(event)
is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition()
is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition()
is Event.OvernightObservation -> overnightObservationTransition()
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
is Event.Pause -> pausedFlowTransition()
is Event.TerminateSessions -> terminateSessionsTransition(event)
when (event) {
is Event.DoRemainingWork -> DoRemainingWorkTransition(context, startingState).transition()
is Event.DeliverSessionMessage -> DeliverSessionMessageTransition(context, startingState, event).transition()
is Event.Error -> errorTransition(event)
is Event.TransactionCommitted -> transactionCommittedTransition(event)
is Event.SoftShutdown -> softShutdownTransition()
is Event.StartErrorPropagation -> startErrorPropagationTransition()
is Event.EnterSubFlow -> enterSubFlowTransition(event)
is Event.LeaveSubFlow -> leaveSubFlowTransition()
is Event.Suspend -> suspendTransition(event)
is Event.FlowFinish -> flowFinishTransition(event)
is Event.InitiateFlow -> initiateFlowTransition(event)
is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event)
is Event.AsyncOperationThrows -> asyncOperationThrowsTransition(event)
is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition()
is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition()
is Event.OvernightObservation -> overnightObservationTransition()
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
is Event.Pause -> pausedFlowTransition()
is Event.TerminateSessions -> terminateSessionsTransition(event)
}
} catch (t: Throwable) {
// All errors coming from the transition should be sent back to the flow
// Letting the flow re-enter standard error handling
builder { resumeFlowLogic(t) }
}
}