CORDA-3848 Uncaught exception hospitalises flow (#6377)

When an uncaught exception propagates all the way to the flow exception
handler, the flow will be forced into observation/hospitalised.

The updating of the checkpoints status is done on a separate thread as
the fiber cannot be relied on anymore. The new thread is needed to allow
database transaction to be created and committed. Failures to the status
update will be rescheduled to ensure that this information is eventually
reflected in the database.
This commit is contained in:
Dan Newton 2020-07-06 22:43:48 +01:00 committed by GitHub
parent 0d5bed5243
commit a1e1bf4e6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 321 additions and 22 deletions

View File

@ -109,6 +109,20 @@ abstract class StatemachineErrorHandlingTest {
}
}
@StartableByRPC
class ThrowAnErrorFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
throwException()
return "cant get here"
}
private fun throwException() {
logger.info("Throwing exception in flow")
throw IllegalStateException("throwing exception in flow")
}
}
@StartableByRPC
class GetNumberOfUncompletedCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {

View File

@ -1715,4 +1715,192 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.recordDuration] to cause an unexpected error during flow initialisation.
*
* The hospital has the flow's medical history updated with the new failure added to it. As the failure occurred before the original
* checkpoint was persisted, there is no checkpoint to update in the database.
*/
@Test(timeout = 300_000)
fun `unexpected error during flow initialisation that gets caught by default exception handler puts flow into in-memory overnight observation`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD recordDuration
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD openThreadLocalWormhole
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
executor.execute {
alice.rpc.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
)
}
Thread.sleep(10.seconds.toMillis())
val (discharge, observation) = alice.rpc.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
// The flow failed during flow initialisation before committing the original checkpoint
// therefore there is no checkpoint to update the status of
assertEquals(0, alice.rpc.startFlow(StatemachineErrorHandlingTest::GetNumberOfHospitalizedCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised.
*
* The hospital has the flow's medical history updated with the new failure added to it. The status of the checkpoint is also set to
* [Checkpoint.FlowStatus.HOSPITALIZED] to reflect this information in the database.
*/
@Test(timeout = 300_000)
fun `unexpected error after flow initialisation that gets caught by default exception handler puts flow into overnight observation and reflected in database`() {
startDriver {
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD recordDuration
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(StatemachineErrorHandlingTest::ThrowAnErrorFlow).returnValue.getOrThrow(30.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, alice.rpc.startFlow(StatemachineErrorHandlingTest::GetNumberOfHospitalizedCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised. When updating the status of the flow to [Checkpoint.FlowStatus.HOSPITALIZED] an error occurs.
*
* The update is rescheduled and tried again. This is done separate from the fiber.
*/
@Test(timeout = 300_000)
fun `unexpected error after flow initialisation that gets caught by default exception handler retries the status update if it fails`() {
startDriver {
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD recordDuration
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception when updating status
INTERFACE ${CheckpointStorage::class.java.name}
METHOD updateStatus
AT ENTRY
IF readCounter("counter") < 2
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("should be a sql exception")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(StatemachineErrorHandlingTest::ThrowAnErrorFlow).returnValue.getOrThrow(50.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, alice.rpc.startFlow(StatemachineErrorHandlingTest::GetNumberOfHospitalizedCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.recordDuration] to cause an unexpected error after a flow has returned its
* result to the client.
*
* As the flow has already returned its result to the client, then the status of the flow has already been updated correctly and now the
* flow has experienced an unexpected error. There is no need to change the status as the flow has already finished.
*/
@Test(timeout = 300_000)
fun `unexpected error after flow has returned result to client that gets caught by default exception handler does nothing except log`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD recordDuration
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD recordDuration
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
alice.rpc.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = alice.rpc.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(0, observation)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(0, alice.rpc.startFlow(StatemachineErrorHandlingTest::GetNumberOfHospitalizedCheckpointsFlow).returnValue.get())
}
}
}

View File

@ -65,4 +65,6 @@ interface CheckpointStorage {
* This method does not fetch [Checkpoint.Serialized.serializedFlowState] to save memory.
*/
fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus)
}

View File

@ -499,6 +499,11 @@ class DBCheckpointStorage(
}
}
override fun updateStatus(runId: StateMachineRunId, flowStatus: FlowStatus) {
val update = "Update ${NODE_DATABASE_PREFIX}checkpoints set status = ${flowStatus.ordinal} where flow_id = '${runId.uuid}'"
currentDBSession().createNativeQuery(update).executeUpdate()
}
private fun createDBFlowMetadata(flowId: String, checkpoint: Checkpoint): DBFlowMetadata {
val context = checkpoint.checkpointState.invocationContext
val flowInfo = checkpoint.checkpointState.subFlowStack.first()

View File

@ -0,0 +1,67 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.strands.Strand
import net.corda.core.flows.StateMachineRunId
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.utilities.errorAndTerminate
import net.corda.nodeapi.internal.persistence.CordaPersistence
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
class FlowDefaultUncaughtExceptionHandler(
private val flowHospital: StaffedFlowHospital,
private val checkpointStorage: CheckpointStorage,
private val database: CordaPersistence,
private val scheduledExecutor: ScheduledExecutorService
) : Strand.UncaughtExceptionHandler {
private companion object {
val log = contextLogger()
const val RESCHEDULE_DELAY = 30L
}
override fun uncaughtException(fiber: Strand, throwable: Throwable) {
val id = (fiber as FlowStateMachineImpl<*>).id
if (throwable is VirtualMachineError) {
errorAndTerminate(
"Caught unrecoverable error from flow $id. Forcibly terminating the JVM, this might leave resources open, and most likely will.",
throwable
)
} else {
fiber.logger.warn("Caught exception from flow $id", throwable)
setFlowToHospitalized(fiber, throwable)
}
}
private fun setFlowToHospitalized(fiber: FlowStateMachineImpl<*>, throwable: Throwable) {
val id = fiber.id
if (!fiber.resultFuture.isDone) {
fiber.transientState.let { state ->
if (state != null) {
fiber.logger.warn("Forcing flow $id into overnight observation")
flowHospital.forceIntoOvernightObservation(state.value, listOf(throwable))
val hospitalizedCheckpoint = state.value.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED)
val hospitalizedState = state.value.copy(checkpoint = hospitalizedCheckpoint)
fiber.transientState = TransientReference(hospitalizedState)
} else {
fiber.logger.warn("The fiber's transient state is not set, cannot force flow $id into in-memory overnight observation, status will still be updated in database")
}
}
scheduledExecutor.schedule({ setFlowToHospitalizedRescheduleOnFailure(id) }, 0, TimeUnit.SECONDS)
}
}
@Suppress("TooGenericExceptionCaught")
private fun setFlowToHospitalizedRescheduleOnFailure(id: StateMachineRunId) {
try {
log.debug { "Updating the status of flow $id to hospitalized after uncaught exception" }
database.transaction { checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.HOSPITALIZED) }
log.debug { "Updated the status of flow $id to hospitalized after uncaught exception" }
} catch (e: Exception) {
log.info("Failed to update the status of flow $id to hospitalized after uncaught exception, rescheduling", e)
scheduledExecutor.schedule({ setFlowToHospitalizedRescheduleOnFailure(id) }, RESCHEDULE_DELAY, TimeUnit.SECONDS)
}
}
}

View File

@ -38,7 +38,6 @@ import net.corda.node.services.statemachine.interceptors.FiberDeserializationChe
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.errorAndTerminate
import net.corda.node.utilities.injectOldProgressTracker
import net.corda.node.utilities.isEnabledTimedFlow
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -83,7 +82,7 @@ internal class SingleThreadedStateMachineManager(
private val scheduledFutureExecutor = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder().setNameFormat("flow-scheduled-future-thread").setDaemon(true).build()
)
// How many Fibers are running and not suspended. If zero and stopping is true, then we are halted.
// How many Fibers are running (this includes suspended flows). If zero and stopping is true, then we are halted.
private val liveFibers = ReusableLatch()
// Monitoring support.
private val metrics = serviceHub.monitoringService.metrics
@ -146,13 +145,8 @@ internal class SingleThreadedStateMachineManager(
val fibers = restoreFlowsFromCheckpoints()
metrics.register("Flows.InFlight", Gauge<Int> { innerState.flows.size })
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
if (throwable is VirtualMachineError) {
errorAndTerminate("Caught unrecoverable error from flow. Forcibly terminating the JVM, this might leave resources open, and most likely will.", throwable)
} else {
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
}
}
setFlowDefaultUncaughtExceptionHandler()
val pausedFlows = restoreNonResidentFlowsFromPausedCheckpoints()
innerState.withLock {
@ -175,6 +169,17 @@ internal class SingleThreadedStateMachineManager(
}
}
private fun setFlowDefaultUncaughtExceptionHandler() {
Fiber.setDefaultUncaughtExceptionHandler(
FlowDefaultUncaughtExceptionHandler(
flowHospital,
checkpointStorage,
database,
scheduledFutureExecutor
)
)
}
override fun snapshot(): Set<FlowStateMachineImpl<*>> = innerState.flows.values.map { it.fiber }.toSet()
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
@ -437,7 +442,7 @@ internal class SingleThreadedStateMachineManager(
?.map { it.exception }
?.plus(e) ?: emptyList()
logger.info("Failed to retry flow $flowId, keeping in for observation and aborting")
flowHospital.forceIntoOvernightObservation(flowId, exceptions)
flowHospital.forceIntoOvernightObservation(currentState, exceptions)
throw e
}
}

View File

@ -167,27 +167,27 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
}
/**
* Forces the flow to be kept in for overnight observation by the hospital. A flow must already exist inside the hospital
* and have existing medical records for it to be moved to overnight observation. If it does not meet these criteria then
* an [IllegalArgumentException] will be thrown.
* Forces the flow to be kept in for overnight observation by the hospital.
*
* @param id The [StateMachineRunId] of the flow that you are trying to force into observation
* @param currentState The [StateMachineState] of the flow that is being forced into observation
* @param errors The errors to include in the new medical record
*/
fun forceIntoOvernightObservation(id: StateMachineRunId, errors: List<Throwable>) {
fun forceIntoOvernightObservation(currentState: StateMachineState, errors: List<Throwable>) {
mutex.locked {
// If a flow does not meet the criteria below, then it has moved into an invalid state or the function is being
// called from an incorrect location. The assertions below should error out the flow if they are not true.
requireNotNull(flowsInHospital[id]) { "Flow must already be in the hospital before forcing into overnight observation" }
val history = requireNotNull(flowPatients[id]) { "Flow must already have history before forcing into overnight observation" }
// Use the last staff member that last discharged the flow as the current staff member
val record = history.records.last().copy(
val id = currentState.flowLogic.runId
val medicalHistory = flowPatients.computeIfAbsent(id) { FlowMedicalHistory() }
val record = MedicalRecord.Flow(
time = clock.instant(),
flowId = id,
suspendCount = currentState.checkpoint.checkpointState.numberOfSuspends,
errors = errors,
by = listOf(TransitionErrorGeneralPractitioner),
outcome = Outcome.OVERNIGHT_OBSERVATION
)
medicalHistory.records += record
onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(id, record.by.map { it.toString() }) }
history.records += record
recordsPublisher.onNext(record)
}
}

View File

@ -872,6 +872,24 @@ class DBCheckpointStorageTests {
}
}
@Test(timeout = 300_000)
fun `update only the flow status`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
}
database.transaction {
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.HOSPITALIZED)
}
database.transaction {
assertEquals(
checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED),
checkpointStorage.checkpoints().single().deserialize()
)
}
}
data class IdAndCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint)
private fun changeStatus(oldCheckpoint: Checkpoint, status: Checkpoint.FlowStatus): IdAndCheckpoint {