diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt index 07550ef709..0440680745 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt @@ -331,6 +331,62 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { } } + /** + * Throws an exception after the first [Action.CommitTransaction] event before the flow has initialised (remains in an unstarted state). + * This is to cover transient issues, where the transaction committed the checkpoint but failed to respond to the node. + * + * The exception is thrown when performing [Action.SignalFlowHasStarted]; the error won't actually appear here, but it makes it easier + * to test. + * + * The exception is thrown 3 times. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * succeeds and the flow finishes. + * + * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state). + * + * The first retry will load the checkpoint that the flow doesn't know exists ([StateMachineState.isAnyCheckpointPersisted] is false + * at this point). The flag gets switched to true after this first retry and the flow has now returned to an expected state. 
+ * + */ + @Test(timeout = 300_000) + fun `error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() { + startDriver { + val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS $actionExecutorClassName + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeSignalFlowHasStarted action + CLASS $actionExecutorClassName + METHOD executeSignalFlowHasStarted + AT ENTRY + IF readCounter("counter") < 3 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("i wish i was a sql exception") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + alice.rpc.startFlow( + StateMachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + + alice.rpc.assertNumberOfCheckpointsAllZero() + alice.rpc.assertHospitalCounts(discharged = 3) + assertEquals(0, alice.rpc.stateMachinesSnapshot().size) + } + } + /** * Throws an exception when performing an [Action.CommitTransaction] event on a responding node before the flow has initialised and * saved its first checkpoint (remains in an unstarted state). 
@@ -463,7 +519,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { CLASS $actionExecutorClassName METHOD executeCommitTransaction AT ENTRY - IF createCounter("counter", $counter) + IF createCounter("counter", $counter) && createCounter("counter_2", $counter) DO traceln("Counter created") ENDRULE @@ -479,8 +535,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { INTERFACE ${CheckpointStorage::class.java.name} METHOD getCheckpoint AT ENTRY - IF true - DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + IF readCounter("counter_2") < 3 + DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") ENDRULE """.trimIndent() @@ -496,7 +552,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { alice.rpc.assertNumberOfCheckpointsAllZero() charlie.rpc.assertHospitalCounts( discharged = 3, - observation = 0 + dischargedRetry = 1 ) assertEquals(0, alice.rpc.stateMachinesSnapshot().size) assertEquals(0, charlie.rpc.stateMachinesSnapshot().size) @@ -527,7 +583,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { CLASS $actionExecutorClassName METHOD executeCommitTransaction AT ENTRY - IF createCounter("counter", $counter) + IF createCounter("counter", $counter) && createCounter("counter_2", $counter) DO traceln("Counter created") ENDRULE @@ -543,8 +599,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { INTERFACE ${CheckpointStorage::class.java.name} METHOD getCheckpoint AT ENTRY - IF true - DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + IF readCounter("counter_2") < 3 + DO incrementCounter("counter_2"); traceln("Throwing exception getting 
checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") ENDRULE """.trimIndent() @@ -562,7 +618,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1) charlie.rpc.assertHospitalCounts( discharged = 3, - observation = 1 + observation = 1, + dischargedRetry = 1 ) assertEquals(1, alice.rpc.stateMachinesSnapshot().size) assertEquals(1, charlie.rpc.stateMachinesSnapshot().size) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt index f97800214f..ad4f388ff5 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt @@ -494,7 +494,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { CLASS $actionExecutorClassName METHOD executeCommitTransaction AT ENTRY - IF createCounter("counter", $counter) + IF createCounter("counter", $counter) && createCounter("counter_2", $counter) DO traceln("Counter created") ENDRULE @@ -510,8 +510,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { INTERFACE ${CheckpointStorage::class.java.name} METHOD getCheckpoint AT ENTRY - IF true - DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + IF readCounter("counter_2") < 3 + DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") ENDRULE """.trimIndent() @@ -527,7 +527,7 @@ class StateMachineGeneralErrorHandlingTest : 
StateMachineErrorHandlingTest() { alice.rpc.assertNumberOfCheckpointsAllZero() alice.rpc.assertHospitalCounts( discharged = 3, - observation = 0 + dischargedRetry = 1 ) assertEquals(0, alice.rpc.stateMachinesSnapshot().size) } @@ -557,7 +557,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { CLASS $actionExecutorClassName METHOD executeCommitTransaction AT ENTRY - IF createCounter("counter", $counter) + IF createCounter("counter", $counter) && createCounter("counter_2", $counter) DO traceln("Counter created") ENDRULE @@ -573,8 +573,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { INTERFACE ${CheckpointStorage::class.java.name} METHOD getCheckpoint AT ENTRY - IF true - DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + IF readCounter("counter_2") < 3 + DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") ENDRULE """.trimIndent() @@ -590,7 +590,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { alice.rpc.assertNumberOfCheckpoints(hospitalized = 1) alice.rpc.assertHospitalCounts( discharged = 3, - observation = 1 + observation = 1, + dischargedRetry = 1 ) assertEquals(1, alice.rpc.stateMachinesSnapshot().size) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 27f9d36bf5..09c7e0f889 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -495,40 +495,51 @@ internal class SingleThreadedStateMachineManager( logger.error("Unable to find flow for flow $flowId. 
Something is very wrong. The flow will not retry.") return } - val flow = if (currentState.isAnyCheckpointPersisted) { - // We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that - // we mirror exactly what happens when restarting the node. - val checkpoint = database.transaction { - val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId) - if (serializedCheckpoint == null) { - logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.") - return@transaction null - } + // We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that + // we mirror exactly what happens when restarting the node. + // Ignore [isAnyCheckpointPersisted] as the checkpoint could be committed but the flag remains un-updated + val checkpointLoadingStatus = database.transaction { + val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId) ?: return@transaction CheckpointLoadingStatus.NotFound + val checkpoint = serializedCheckpoint.let { tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also { if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) { if (checkpointStorage.removeFlowException(flowId)) { checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE) } else { logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. 
The flow will not be loaded and run.") - return@transaction null + // This code branch is being removed in a different PR + return@transaction CheckpointLoadingStatus.CouldNotDeserialize } } - } ?: return@transaction null - } ?: return + } ?: return@transaction CheckpointLoadingStatus.CouldNotDeserialize + } - // Resurrect flow - flowCreator.createFlowFromCheckpoint( - flowId, - checkpoint, - currentState.reloadCheckpointAfterSuspendCount, - currentState.lock, - firstRestore = false - ) ?: return - } else { - // Just flow initiation message - null + CheckpointLoadingStatus.Success(checkpoint) } + + val flow = when { + // Resurrect flow + checkpointLoadingStatus is CheckpointLoadingStatus.Success -> { + flowCreator.createFlowFromCheckpoint( + flowId, + checkpointLoadingStatus.checkpoint, + currentState.reloadCheckpointAfterSuspendCount, + currentState.lock, + firstRestore = false + ) ?: return + } + checkpointLoadingStatus is CheckpointLoadingStatus.NotFound && currentState.isAnyCheckpointPersisted -> { + logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. 
The flow will not retry.") + return + } + checkpointLoadingStatus is CheckpointLoadingStatus.CouldNotDeserialize -> return + else -> { + // Just flow initiation message + null + } + } + innerState.withLock { if (stopping) { return @@ -758,8 +769,7 @@ internal class SingleThreadedStateMachineManager( ): CordaFuture> { onCallingStartFlowInternal?.invoke() - val existingFlow = innerState.withLock { flows[flowId] } - val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState.isAnyCheckpointPersisted) { + val existingCheckpoint = if (innerState.withLock { flows[flowId] != null }) { // Load the flow's checkpoint // The checkpoint will be missing if the flow failed before persisting the original checkpoint // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay) @@ -1098,4 +1108,10 @@ internal class SingleThreadedStateMachineManager( } return false } + + private sealed class CheckpointLoadingStatus { + class Success(val checkpoint: Checkpoint) : CheckpointLoadingStatus() + object NotFound : CheckpointLoadingStatus() + object CouldNotDeserialize : CheckpointLoadingStatus() + } }