From f4e9b9d5d290744132c3d5eb38cd8ec279a655e6 Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Thu, 17 Oct 2019 16:09:27 +0100 Subject: [PATCH] CORDA-3194 Stop killed flows from re-entering hospital (#2664) * CORDA-3194 Do not allow killed flows back into the hospital This change has been made to prevent killed flows from being added back to the hospital after being forcibly removed by `killFlow`. Not doing so, could leave references to a flow inside of the hospital, which is not the correct behaviour. `killFlow` now sets a flow's `StatemachineState.isRemoved` to true. This check is then used in `StaffedFlowHospital` and the `DumpHistoryOnErrorInterceptor`. * CORDA-3194 Log different message for transition error due to killed flow When a flow is killed, its checkpoint is deleted. Currently, the statemachine will still try a process the next event even if it has been killed. This can lead to an error when trying to update the deleted checkpoint. The exception thrown from this is logged out. An if statement has been added to log a different message at debug level if it is due to an update error for a killed flow. This is done to not alarm node operators of the exception. --- .../StatemachineErrorHandlingTest.kt | 337 +++++++++++++++++- .../statemachine/StaffedFlowHospital.kt | 2 +- .../statemachine/TransitionExecutorImpl.kt | 7 + .../DumpHistoryOnErrorInterceptor.kt | 24 +- 4 files changed, 341 insertions(+), 29 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt index c05543f0b1..1f8b1835f2 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt @@ -14,12 +14,15 @@ import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.list import net.corda.core.internal.readAllLines import net.corda.core.messaging.startFlow +import net.corda.core.messaging.startTrackedFlow import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds import net.corda.core.utilities.unwrap import net.corda.finance.DOLLARS import net.corda.finance.flows.CashIssueAndPaymentFlow @@ -53,6 +56,7 @@ import java.time.temporal.ChronoUnit import java.util.concurrent.TimeoutException import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.test.assertTrue class StatemachineErrorHandlingTest : IntegrationTest() { @@ -129,7 +133,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -208,7 +212,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -286,7 +290,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -370,7 +374,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -457,7 +461,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -559,7 +563,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -648,7 +652,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -745,7 +749,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -844,7 +848,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(alice) @@ -929,7 +933,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -1021,7 +1025,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -1110,7 +1114,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(charlie) @@ -1206,7 +1210,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { assertFailsWith { aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) } @@ -1304,7 +1308,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( - Duration.of(30, ChronoUnit.SECONDS) + 30.seconds ) val output = getBytemanOutput(charlie) @@ -1381,7 +1385,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { charlie.nodeInfo.singleIdentity(), false, defaultNotaryIdentity - ).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS)) + ).returnValue.getOrThrow(30.seconds) val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get() assertEquals(0, discharge) @@ -1452,7 +1456,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { charlie.nodeInfo.singleIdentity(), false, defaultNotaryIdentity - ).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS)) + ).returnValue.getOrThrow(30.seconds) val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get() assertEquals(0, discharge) @@ -1539,7 +1543,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { charlie.nodeInfo.singleIdentity(), false, defaultNotaryIdentity - ).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS)) + ).returnValue.getOrThrow(30.seconds) val output = getBytemanOutput(charlie) @@ -1635,7 +1639,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() { charlie.nodeInfo.singleIdentity(), false, defaultNotaryIdentity - ).returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS)) + ).returnValue.getOrThrow(30.seconds) } val output = getBytemanOutput(charlie) @@ -1655,6 +1659,265 @@ class StatemachineErrorHandlingTest : IntegrationTest() { } } + /** + * Triggers `killFlow` while the flow is suspended causing a [InterruptedException] to be thrown and passed through the hospital. + * + * The flow terminates and is not retried. + * + * No pass through the hospital is recorded. As the flow is marked as `isRemoved`. + */ + @Test + fun `error during transition due to an InterruptedException (killFlow) will terminate the flow`() { + startDriver { + val alice = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + + RULE Increment terminal counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ TERMINAL + IF true + DO traceln("Byteman test - terminal") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + val flow = aliceClient.startTrackedFlow(::SleepFlow) + + var flowKilled = false + flow.progress.subscribe { + if (it == SleepFlow.STARTED.label) { + Thread.sleep(5000) + flowKilled = aliceClient.killFlow(flow.id) + } + } + + assertFailsWith { flow.returnValue.getOrThrow(20.seconds) } + + val output = getBytemanOutput(alice) + + assertTrue(flowKilled) + // Check the stdout for the lines generated by byteman + assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size + assertEquals(1, numberOfTerminalDiagnoses) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(0, discharge) + assertEquals(0, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + // 1 for GetNumberOfCheckpointsFlow + assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Triggers `killFlow` during user application code. + * + * The user application code is mimicked by a [Thread.sleep] which is importantly not placed inside the [Suspendable] + * call function. Placing it inside a [Suspendable] function causes quasar to behave unexpectedly. + * + * Although the call to kill the flow is made during user application code. It will not be removed / stop processing + * until the next suspension point is reached within the flow. + * + * The flow terminates and is not retried. + * + * No pass through the hospital is recorded. As the flow is marked as `isRemoved`. + */ + @Test + fun `flow killed during user code execution stops and removes the flow correctly`() { + startDriver { + val alice = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + + RULE Increment terminal counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ TERMINAL + IF true + DO traceln("Byteman test - terminal") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + val flow = aliceClient.startTrackedFlow(::ThreadSleepFlow) + + var flowKilled = false + flow.progress.subscribe { + if (it == ThreadSleepFlow.STARTED.label) { + Thread.sleep(5000) + flowKilled = aliceClient.killFlow(flow.id) + } + } + + assertFailsWith { flow.returnValue.getOrThrow(30.seconds) } + + val output = getBytemanOutput(alice) + + assertTrue(flowKilled) + // Check the stdout for the lines generated by byteman + assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size + println(numberOfTerminalDiagnoses) + assertEquals(0, numberOfTerminalDiagnoses) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(0, discharge) + assertEquals(0, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + // 1 for GetNumberOfCheckpointsFlow + assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Triggers `killFlow` after the flow has already been sent to observation. The flow is not running at this point and + * all that remains is its checkpoint in the database. + * + * The flow terminates and is not retried. + * + * Killing the flow does not lead to any passes through the hospital. All the recorded passes through the hospital are + * from the original flow that was put in for observation. + */ + @Test + fun `flow killed when it is in the flow hospital for observation is removed correctly`() { + startDriver { + val alice = createBytemanNode(ALICE_NAME) + val charlie = createNode(CHARLIE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeSendInitial + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeSendInitial action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeSendInitial + AT ENTRY + IF readCounter("counter") < 4 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + + RULE Increment terminal counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ TERMINAL + IF true + DO traceln("Byteman test - terminal") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + val flow = aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()) + + assertFailsWith { flow.returnValue.getOrThrow(20.seconds) } + + aliceClient.killFlow(flow.id) + + val output = getBytemanOutput(alice) + + // Check the stdout for the lines generated by byteman + assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) + val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size + assertEquals(0, numberOfTerminalDiagnoses) + val (discharge, observation) = aliceClient.startFlow(::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + // 1 for GetNumberOfCheckpointsFlow + assertEquals(1, aliceClient.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + private fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME), dsl: DriverDSL.() -> Unit) { driver( DriverParameters( @@ -1724,6 +1987,44 @@ class SendAMessageResponder(private val session: FlowSession) : FlowLogic( } } +@StartableByRPC +class SleepFlow : FlowLogic() { + + object STARTED : ProgressTracker.Step("I am ready to die") + + override val progressTracker = ProgressTracker(STARTED) + + @Suspendable + override fun call() { + sleep(Duration.of(1, ChronoUnit.SECONDS)) + progressTracker.currentStep = STARTED + sleep(Duration.of(2, ChronoUnit.MINUTES)) + } +} + +@StartableByRPC +class ThreadSleepFlow : FlowLogic() { + + object STARTED : ProgressTracker.Step("I am ready to die") + + override val progressTracker = ProgressTracker(STARTED) + + @Suspendable + override fun call() { + sleep(Duration.of(1, ChronoUnit.SECONDS)) + progressTracker.currentStep = STARTED + logger.info("Starting ${ThreadSleepFlow::class.qualifiedName} application sleep") + sleep() + logger.info("Finished ${ThreadSleepFlow::class.qualifiedName} application sleep") + sleep(Duration.of(2, ChronoUnit.MINUTES)) + } + + // Sleep is moved outside of `@Suspendable` function to prevent issues with Quasar + private fun sleep() { + Thread.sleep(20000) + } +} + @StartableByRPC class GetNumberOfCheckpointsFlow : FlowLogic() { override fun call(): Long { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index b02e718daf..023f541504 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -160,7 +160,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, */ fun requestTreatment(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { // Only treat flows that are not already in the hospital - if (flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) == null) { + if (!currentState.isRemoved && flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) == null) { admit(flowFiber, currentState, errors) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt index 6a95665097..85a9f57f60 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt @@ -9,6 +9,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.contextDatabase import net.corda.nodeapi.internal.persistence.contextTransactionOrNull import java.security.SecureRandom +import javax.persistence.OptimisticLockException /** * This [TransitionExecutor] runs the transition actions using the passed in [ActionExecutor] and manually dirties the @@ -52,6 +53,12 @@ class TransitionExecutorImpl( // Otherwise error the state manually keeping the old flow state and schedule a DoRemainingWork // to trigger error propagation log.info("Error while executing $action, with event $event, erroring state", exception) + if(previousState.isRemoved && exception is OptimisticLockException) { + log.debug("Flow has been killed and the following error is likely due to the flow's checkpoint being deleted. " + + "Occurred while executing $action, with event $event", exception) + } else { + log.info("Error while executing $action, with event $event, erroring state", exception) + } val newState = previousState.copy( checkpoint = previousState.checkpoint.copy( errorState = previousState.checkpoint.errorState.addErrors( diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt index ecef21157b..6b5b8132dd 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt @@ -35,17 +35,21 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti actionExecutor: ActionExecutor ): Pair { val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor) - val transitionRecord = TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation) - val record = records.compute(fiber.id) { _, record -> - (record ?: ArrayList()).apply { add(transitionRecord) } - } - // Just if we decide to propagate, and not if just on the way to the hospital. Only log at debug level here - the flow transition - // information is often unhelpful in the logs, and the actual cause of the problem will be logged elsewhere. - if (nextState.checkpoint.errorState is ErrorState.Errored && nextState.checkpoint.errorState.propagating) { - log.warn("Flow ${fiber.id} errored, dumping all transitions:\n${record!!.joinToString("\n")}") - for (error in nextState.checkpoint.errorState.errors) { - log.warn("Flow ${fiber.id} error", error.exception) + if (!previousState.isRemoved) { + val transitionRecord = + TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation) + val record = records.compute(fiber.id) { _, record -> + (record ?: ArrayList()).apply { add(transitionRecord) } + } + + // Just if we decide to propagate, and not if just on the way to the hospital. Only log at debug level here - the flow transition + // information is often unhelpful in the logs, and the actual cause of the problem will be logged elsewhere. + if (nextState.checkpoint.errorState is ErrorState.Errored && nextState.checkpoint.errorState.propagating) { + log.warn("Flow ${fiber.id} errored, dumping all transitions:\n${record!!.joinToString("\n")}") + for (error in nextState.checkpoint.errorState.errors) { + log.warn("Flow ${fiber.id} error", error.exception) + } } }