NOTICK Save exception for hospitalized session init errors (#6587)

Save the exception for flows that fail during session init when they are
kept for observation.

Change the exception tidy up logic to only update the flow's status if
the exception was removed.
This commit is contained in:
Dan Newton 2020-08-06 22:35:05 +01:00 committed by GitHub
parent ef27dbfdbb
commit 0d5ee8b0fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 42 additions and 18 deletions

View File

@ -115,7 +115,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
).returnValue.getOrThrow(30.seconds)
}
alice.rpc.assertNumberOfCheckpoints(failed = 1)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(propagated = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
@ -235,7 +235,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*/
@Test(timeout = 300_000)
@Test(timeout = 450_000)
fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
@ -390,7 +390,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*/
@Test(timeout = 300_000)
@Test(timeout = 450_000)
fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
startDriver {
val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME)

View File

@ -206,7 +206,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
alice.rpc.startFlow(StateMachineErrorHandlingTest::ThrowAnErrorFlow).returnValue.getOrThrow(60.seconds)
}
alice.rpc.assertNumberOfCheckpoints(failed = 1)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(
propagated = 1,
propagatedRetry = 3

View File

@ -390,12 +390,20 @@ class DBCheckpointStorage(
val metadata = createDBFlowMetadata(flowId, checkpoint, now)
val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) {
val errored = checkpoint.errorState as? ErrorState.Errored
errored?.let { createDBFlowException(flowId, it, now) }
?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}")
} else {
null
}
// Most fields are null as they cannot have been set when creating the initial checkpoint
val dbFlowCheckpoint = DBFlowCheckpoint(
flowId = flowId,
blob = blob,
result = null,
exceptionDetails = null,
exceptionDetails = dbFlowException,
flowMetadata = metadata,
status = checkpoint.status,
compatible = checkpoint.compatible,
@ -407,6 +415,7 @@ class DBCheckpointStorage(
currentDBSession().save(dbFlowCheckpoint)
currentDBSession().save(blob)
currentDBSession().save(metadata)
dbFlowException?.let { currentDBSession().save(it) }
}
@Suppress("ComplexMethod")

View File

@ -447,6 +447,7 @@ internal class SingleThreadedStateMachineManager(
liveFibers.countUp()
}
@Suppress("ComplexMethod")
private fun restoreFlowsFromCheckpoints(): Pair<MutableMap<StateMachineRunId, Flow<*>>, MutableMap<StateMachineRunId, NonResidentFlow>> {
val flows = mutableMapOf<StateMachineRunId, Flow<*>>()
val pausedFlows = mutableMapOf<StateMachineRunId, NonResidentFlow>()
@ -455,8 +456,9 @@ internal class SingleThreadedStateMachineManager(
innerState.withLock { if (id in flows) return@Checkpoints }
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE)
if (!checkpointStorage.removeFlowException(id)) {
if (checkpointStorage.removeFlowException(id)) {
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE)
} else {
logger.error("Unable to remove database exception for flow $id. Something is very wrong. The flow will not be loaded and run.")
return@Checkpoints
}
@ -505,9 +507,10 @@ internal class SingleThreadedStateMachineManager(
tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
if (!checkpointStorage.removeFlowException(flowId)) {
logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not retry.")
if (checkpointStorage.removeFlowException(flowId)) {
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
} else {
logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not be loaded and run.")
return@transaction null
}
}

View File

@ -516,10 +516,10 @@ class DBCheckpointStorageTests {
val (_, checkpoint) = newCheckpoint(1)
// runnables
val runnable = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)
val hospitalized = checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED)
val hospitalized = checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.HOSPITALIZED)
// not runnables
val completed = checkpoint.copy(status = Checkpoint.FlowStatus.COMPLETED)
val failed = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
val failed = checkpoint.addError(IllegalStateException("bla bla"),status = Checkpoint.FlowStatus.FAILED)
val killed = checkpoint.copy(status = Checkpoint.FlowStatus.KILLED)
// paused
val paused = checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED)
@ -663,10 +663,16 @@ class DBCheckpointStorageTests {
val (_, checkpoint) = newCheckpoint(1)
// runnables
val runnable = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE)
val hospitalized = changeStatus(checkpoint, Checkpoint.FlowStatus.HOSPITALIZED)
val hospitalized = IdAndCheckpoint(
StateMachineRunId.createRandom(),
checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.HOSPITALIZED)
)
// not runnables
val completed = changeStatus(checkpoint, Checkpoint.FlowStatus.COMPLETED)
val failed = changeStatus(checkpoint, Checkpoint.FlowStatus.FAILED)
val failed = IdAndCheckpoint(
StateMachineRunId.createRandom(),
checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.FAILED)
)
val killed = changeStatus(checkpoint, Checkpoint.FlowStatus.KILLED)
// paused
val paused = changeStatus(checkpoint, Checkpoint.FlowStatus.PAUSED)
@ -758,9 +764,15 @@ class DBCheckpointStorageTests {
fun `'getFinishedFlowsResultsMetadata' fetches flows results metadata for finished flows only`() {
val (_, checkpoint) = newCheckpoint(1)
val runnable = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE)
val hospitalized = changeStatus(checkpoint, Checkpoint.FlowStatus.HOSPITALIZED)
val hospitalized = IdAndCheckpoint(
StateMachineRunId.createRandom(),
checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.HOSPITALIZED)
)
val completed = changeStatus(checkpoint, Checkpoint.FlowStatus.COMPLETED)
val failed = changeStatus(checkpoint, Checkpoint.FlowStatus.FAILED)
val failed = IdAndCheckpoint(
StateMachineRunId.createRandom(),
checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.FAILED)
)
val killed = changeStatus(checkpoint, Checkpoint.FlowStatus.KILLED)
val paused = changeStatus(checkpoint, Checkpoint.FlowStatus.PAUSED)
@ -844,7 +856,7 @@ class DBCheckpointStorageTests {
return deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
}
private fun Checkpoint.addError(exception: Exception): Checkpoint {
private fun Checkpoint.addError(exception: Exception, status: Checkpoint.FlowStatus = Checkpoint.FlowStatus.FAILED): Checkpoint {
return copy(
errorState = ErrorState.Errored(
listOf(
@ -854,7 +866,7 @@ class DBCheckpointStorageTests {
)
), 0, false
),
status = Checkpoint.FlowStatus.FAILED
status = status
)
}