From 0d5ee8b0fa70d22bbc6e85e119749d15a6c1f2fe Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Thu, 6 Aug 2020 22:35:05 +0100 Subject: [PATCH] NOTICK Save exception for hospitalized session init errors (#6587) Save the exception for flows that fail during session init when they are kept for observation. Change the exception tidy up logic to only update the flow's status if the exception was removed. --- .../StateMachineFlowInitErrorHandlingTest.kt | 6 ++-- .../StateMachineGeneralErrorHandlingTest.kt | 2 +- .../persistence/DBCheckpointStorage.kt | 11 +++++++- .../SingleThreadedStateMachineManager.kt | 13 +++++---- .../persistence/DBCheckpointStorageTests.kt | 28 +++++++++++++------ 5 files changed, 42 insertions(+), 18 deletions(-) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt index 304bf0818d..07550ef709 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt @@ -115,7 +115,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { ).returnValue.getOrThrow(30.seconds) } - alice.rpc.assertNumberOfCheckpoints(failed = 1) + alice.rpc.assertNumberOfCheckpointsAllZero() alice.rpc.assertHospitalCounts(propagated = 1) assertEquals(0, alice.rpc.stateMachinesSnapshot().size) } @@ -235,7 +235,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { * * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state). */ - @Test(timeout = 300_000) + @Test(timeout = 450_000) fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() { startDriver { val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME) @@ -390,7 +390,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { * * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state). */ - @Test(timeout = 300_000) + @Test(timeout = 450_000) fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() { startDriver { val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt index 79067ffae8..8eadb9bf77 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt @@ -206,7 +206,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { alice.rpc.startFlow(StateMachineErrorHandlingTest::ThrowAnErrorFlow).returnValue.getOrThrow(60.seconds) } - alice.rpc.assertNumberOfCheckpoints(failed = 1) + alice.rpc.assertNumberOfCheckpointsAllZero() alice.rpc.assertHospitalCounts( propagated = 1, propagatedRetry = 3 diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 0b96ccfb7c..cf2976f662 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -390,12 +390,20 @@ class DBCheckpointStorage( val metadata = createDBFlowMetadata(flowId, checkpoint, now) + val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) { + val errored = checkpoint.errorState as? ErrorState.Errored + errored?.let { createDBFlowException(flowId, it, now) } + ?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}") + } else { + null + } + // Most fields are null as they cannot have been set when creating the initial checkpoint val dbFlowCheckpoint = DBFlowCheckpoint( flowId = flowId, blob = blob, result = null, - exceptionDetails = null, + exceptionDetails = dbFlowException, flowMetadata = metadata, status = checkpoint.status, compatible = checkpoint.compatible, @@ -407,6 +415,7 @@ class DBCheckpointStorage( currentDBSession().save(dbFlowCheckpoint) currentDBSession().save(blob) currentDBSession().save(metadata) + dbFlowException?.let { currentDBSession().save(it) } } @Suppress("ComplexMethod") diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 00cb28d0da..27f9d36bf5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -447,6 +447,7 @@ internal class SingleThreadedStateMachineManager( liveFibers.countUp() } + @Suppress("ComplexMethod") private fun restoreFlowsFromCheckpoints(): Pair>, MutableMap> { val flows = mutableMapOf>() val pausedFlows = mutableMapOf() @@ -455,8 +456,9 @@ internal class SingleThreadedStateMachineManager( innerState.withLock { if (id in flows) return@Checkpoints } val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id)?.also { if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) { - checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE) - if (!checkpointStorage.removeFlowException(id)) { + if (checkpointStorage.removeFlowException(id)) { + checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE) + } else { logger.error("Unable to remove database exception for flow $id. Something is very wrong. The flow will not be loaded and run.") return@Checkpoints } @@ -505,9 +507,10 @@ internal class SingleThreadedStateMachineManager( tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also { if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) { - checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE) - if (!checkpointStorage.removeFlowException(flowId)) { - logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not retry.") + if (checkpointStorage.removeFlowException(flowId)) { + checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE) + } else { + logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not be loaded and run.") return@transaction null } } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index a7420977b9..96f5535a1c 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -516,10 +516,10 @@ class DBCheckpointStorageTests { val (_, checkpoint) = newCheckpoint(1) // runnables val runnable = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE) - val hospitalized = checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED) + val hospitalized = checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.HOSPITALIZED) // not runnables val completed = checkpoint.copy(status = Checkpoint.FlowStatus.COMPLETED) - val failed = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED) + val failed = checkpoint.addError(IllegalStateException("bla bla"),status = Checkpoint.FlowStatus.FAILED) val killed = checkpoint.copy(status = Checkpoint.FlowStatus.KILLED) // paused val paused = checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED) @@ -663,10 +663,16 @@ class DBCheckpointStorageTests { val (_, checkpoint) = newCheckpoint(1) // runnables val runnable = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE) - val hospitalized = changeStatus(checkpoint, Checkpoint.FlowStatus.HOSPITALIZED) + val hospitalized = IdAndCheckpoint( + StateMachineRunId.createRandom(), + checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.HOSPITALIZED) + ) // not runnables val completed = changeStatus(checkpoint, Checkpoint.FlowStatus.COMPLETED) - val failed = changeStatus(checkpoint, Checkpoint.FlowStatus.FAILED) + val failed = IdAndCheckpoint( + StateMachineRunId.createRandom(), + checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.FAILED) + ) val killed = changeStatus(checkpoint, Checkpoint.FlowStatus.KILLED) // paused val paused = changeStatus(checkpoint, Checkpoint.FlowStatus.PAUSED) @@ -758,9 +764,15 @@ class DBCheckpointStorageTests { fun `'getFinishedFlowsResultsMetadata' fetches flows results metadata for finished flows only`() { val (_, checkpoint) = newCheckpoint(1) val runnable = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE) - val hospitalized = changeStatus(checkpoint, Checkpoint.FlowStatus.HOSPITALIZED) + val hospitalized = IdAndCheckpoint( + StateMachineRunId.createRandom(), + checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.HOSPITALIZED) + ) val completed = changeStatus(checkpoint, Checkpoint.FlowStatus.COMPLETED) - val failed = changeStatus(checkpoint, Checkpoint.FlowStatus.FAILED) + val failed = IdAndCheckpoint( + StateMachineRunId.createRandom(), + checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.FAILED) + ) val killed = changeStatus(checkpoint, Checkpoint.FlowStatus.KILLED) val paused = changeStatus(checkpoint, Checkpoint.FlowStatus.PAUSED) @@ -844,7 +856,7 @@ class DBCheckpointStorageTests { return deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) } - private fun Checkpoint.addError(exception: Exception): Checkpoint { + private fun Checkpoint.addError(exception: Exception, status: Checkpoint.FlowStatus = Checkpoint.FlowStatus.FAILED): Checkpoint { return copy( errorState = ErrorState.Errored( listOf( @@ -854,7 +866,7 @@ class DBCheckpointStorageTests { ) ), 0, false ), - status = Checkpoint.FlowStatus.FAILED + status = status ) }