diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt index 304bf0818d..07550ef709 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt @@ -115,7 +115,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { ).returnValue.getOrThrow(30.seconds) } - alice.rpc.assertNumberOfCheckpoints(failed = 1) + alice.rpc.assertNumberOfCheckpointsAllZero() alice.rpc.assertHospitalCounts(propagated = 1) assertEquals(0, alice.rpc.stateMachinesSnapshot().size) } @@ -235,7 +235,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { * * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state). */ - @Test(timeout = 300_000) + @Test(timeout = 450_000) fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() { startDriver { val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME) @@ -390,7 +390,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { * * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state). */ - @Test(timeout = 300_000) + @Test(timeout = 450_000) fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() { startDriver { val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt index 79067ffae8..8eadb9bf77 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt @@ -206,7 +206,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { alice.rpc.startFlow(StateMachineErrorHandlingTest::ThrowAnErrorFlow).returnValue.getOrThrow(60.seconds) } - alice.rpc.assertNumberOfCheckpoints(failed = 1) + alice.rpc.assertNumberOfCheckpointsAllZero() alice.rpc.assertHospitalCounts( propagated = 1, propagatedRetry = 3 diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 0b96ccfb7c..cf2976f662 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -390,12 +390,20 @@ class DBCheckpointStorage( val metadata = createDBFlowMetadata(flowId, checkpoint, now) + val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) { + val errored = checkpoint.errorState as? ErrorState.Errored + errored?.let { createDBFlowException(flowId, it, now) } + ?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}") + } else { + null + } + // Most fields are null as they cannot have been set when creating the initial checkpoint val dbFlowCheckpoint = DBFlowCheckpoint( flowId = flowId, blob = blob, result = null, - exceptionDetails = null, + exceptionDetails = dbFlowException, flowMetadata = metadata, status = checkpoint.status, compatible = checkpoint.compatible, @@ -407,6 +415,7 @@ class DBCheckpointStorage( currentDBSession().save(dbFlowCheckpoint) currentDBSession().save(blob) currentDBSession().save(metadata) + dbFlowException?.let { currentDBSession().save(it) } } @Suppress("ComplexMethod") diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 00cb28d0da..27f9d36bf5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -447,6 +447,7 @@ internal class SingleThreadedStateMachineManager( liveFibers.countUp() } + @Suppress("ComplexMethod") private fun restoreFlowsFromCheckpoints(): Pair>, MutableMap> { val flows = mutableMapOf>() val pausedFlows = mutableMapOf() @@ -455,8 +456,9 @@ internal class SingleThreadedStateMachineManager( innerState.withLock { if (id in flows) return@Checkpoints } val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id)?.also { if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) { - checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE) - if (!checkpointStorage.removeFlowException(id)) { + if (checkpointStorage.removeFlowException(id)) { + checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE) + } else { logger.error("Unable to remove database exception for flow $id. Something is very wrong. The flow will not be loaded and run.") return@Checkpoints } @@ -505,9 +507,10 @@ internal class SingleThreadedStateMachineManager( tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also { if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) { - checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE) - if (!checkpointStorage.removeFlowException(flowId)) { - logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not retry.") + if (checkpointStorage.removeFlowException(flowId)) { + checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE) + } else { + logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not be loaded and run.") return@transaction null } } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index a7420977b9..96f5535a1c 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -516,10 +516,10 @@ class DBCheckpointStorageTests { val (_, checkpoint) = newCheckpoint(1) // runnables val runnable = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE) - val hospitalized = checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED) + val hospitalized = checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.HOSPITALIZED) // not runnables val completed = checkpoint.copy(status = Checkpoint.FlowStatus.COMPLETED) - val failed = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED) + val failed = checkpoint.addError(IllegalStateException("bla bla"),status = Checkpoint.FlowStatus.FAILED) val killed = checkpoint.copy(status = Checkpoint.FlowStatus.KILLED) // paused val paused = checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED) @@ -663,10 +663,16 @@ class DBCheckpointStorageTests { val (_, checkpoint) = newCheckpoint(1) // runnables val runnable = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE) - val hospitalized = changeStatus(checkpoint, Checkpoint.FlowStatus.HOSPITALIZED) + val hospitalized = IdAndCheckpoint( + StateMachineRunId.createRandom(), + checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.HOSPITALIZED) + ) // not runnables val completed = changeStatus(checkpoint, Checkpoint.FlowStatus.COMPLETED) - val failed = changeStatus(checkpoint, Checkpoint.FlowStatus.FAILED) + val failed = IdAndCheckpoint( + StateMachineRunId.createRandom(), + checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.FAILED) + ) val killed = changeStatus(checkpoint, Checkpoint.FlowStatus.KILLED) // paused val paused = changeStatus(checkpoint, Checkpoint.FlowStatus.PAUSED) @@ -758,9 +764,15 @@ class DBCheckpointStorageTests { fun `'getFinishedFlowsResultsMetadata' fetches flows results metadata for finished flows only`() { val (_, checkpoint) = newCheckpoint(1) val runnable = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE) - val hospitalized = changeStatus(checkpoint, Checkpoint.FlowStatus.HOSPITALIZED) + val hospitalized = IdAndCheckpoint( + StateMachineRunId.createRandom(), + checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.HOSPITALIZED) + ) val completed = changeStatus(checkpoint, Checkpoint.FlowStatus.COMPLETED) - val failed = changeStatus(checkpoint, Checkpoint.FlowStatus.FAILED) + val failed = IdAndCheckpoint( + StateMachineRunId.createRandom(), + checkpoint.addError(IllegalStateException("bla bla"), status = Checkpoint.FlowStatus.FAILED) + ) val killed = changeStatus(checkpoint, Checkpoint.FlowStatus.KILLED) val paused = changeStatus(checkpoint, Checkpoint.FlowStatus.PAUSED) @@ -844,7 +856,7 @@ class DBCheckpointStorageTests { return deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) } - private fun Checkpoint.addError(exception: Exception): Checkpoint { + private fun Checkpoint.addError(exception: Exception, status: Checkpoint.FlowStatus = Checkpoint.FlowStatus.FAILED): Checkpoint { return copy( errorState = ErrorState.Errored( listOf( @@ -854,7 +866,7 @@ class DBCheckpointStorageTests { ) ), 0, false ), - status = Checkpoint.FlowStatus.FAILED + status = status ) }