From c4027e23bf7e7cb82ba0d90007870da798c7a601 Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Fri, 14 Aug 2020 17:42:19 +0100 Subject: [PATCH] ENT-5649 Always load from db when flow retries (#6637) Always attempt to load a checkpoint from the database when a flow retries. This is to prevent transient errors where the checkpoint is committed to the database but throws an error back to the node. When the node tries to retry in this scenario, `isAnyCheckpointPersisted` is false, meaning that it will try to insert when it tries to save its initial checkpoint again. By loading from the existing checkpoint, even though it doesn't really use it because it is `Unstarted`, the flag gets put into the right state and will update rather than insert later on. --- .../StateMachineFlowInitErrorHandlingTest.kt | 73 +++++++++++++++++-- .../StateMachineGeneralErrorHandlingTest.kt | 17 +++-- .../SingleThreadedStateMachineManager.kt | 66 ++++++++++------- 3 files changed, 115 insertions(+), 41 deletions(-) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt index 07550ef709..0440680745 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt @@ -331,6 +331,62 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { } } + /** + * Throws an exception when after the first [Action.CommitTransaction] event before the flow has initialised (remains in an unstarted state). + * This is to cover transient issues, where the transaction committed the checkpoint but failed to respond to the node. + * + * The exception is thrown when performing [Action.SignalFlowHasStarted], the error won't actually appear here but it makes it easier + * to test. + * + * The exception is thrown 3 times. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * succeeds and the flow finishes. + * + * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state). + * + * The first retry will load the checkpoint that the flow doesn't know exists ([StateMachineState.isAnyCheckpointPersisted] is false + * at this point). The flag gets switched to true after this first retry and the flow has now returned to an expected state. + * + */ + @Test(timeout = 300_000) + fun `error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() { + startDriver { + val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS $actionExecutorClassName + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeSignalFlowHasStarted action + CLASS $actionExecutorClassName + METHOD executeSignalFlowHasStarted + AT ENTRY + IF readCounter("counter") < 3 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("i wish i was a sql exception") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + alice.rpc.startFlow( + StateMachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + + alice.rpc.assertNumberOfCheckpointsAllZero() + alice.rpc.assertHospitalCounts(discharged = 3) + assertEquals(0, alice.rpc.stateMachinesSnapshot().size) + } + } + /** * Throws an exception when performing an [Action.CommitTransaction] event on a responding node before the flow has initialised and * saved its first checkpoint (remains in an unstarted state). @@ -463,7 +519,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { CLASS $actionExecutorClassName METHOD executeCommitTransaction AT ENTRY - IF createCounter("counter", $counter) + IF createCounter("counter", $counter) && createCounter("counter_2", $counter) DO traceln("Counter created") ENDRULE @@ -479,8 +535,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { INTERFACE ${CheckpointStorage::class.java.name} METHOD getCheckpoint AT ENTRY - IF true - DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + IF readCounter("counter_2") < 3 + DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") ENDRULE """.trimIndent() @@ -496,7 +552,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { alice.rpc.assertNumberOfCheckpointsAllZero() charlie.rpc.assertHospitalCounts( discharged = 3, - observation = 0 + dischargedRetry = 1 ) assertEquals(0, alice.rpc.stateMachinesSnapshot().size) assertEquals(0, charlie.rpc.stateMachinesSnapshot().size) @@ -527,7 +583,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { CLASS $actionExecutorClassName METHOD executeCommitTransaction AT ENTRY - IF createCounter("counter", $counter) + IF createCounter("counter", $counter) && createCounter("counter_2", $counter) DO traceln("Counter created") ENDRULE @@ -543,8 +599,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { INTERFACE ${CheckpointStorage::class.java.name} METHOD getCheckpoint AT ENTRY - IF true - DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + IF readCounter("counter_2") < 3 + DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") ENDRULE """.trimIndent() @@ -562,7 +618,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1) charlie.rpc.assertHospitalCounts( discharged = 3, - observation = 1 + observation = 1, + dischargedRetry = 1 ) assertEquals(1, alice.rpc.stateMachinesSnapshot().size) assertEquals(1, charlie.rpc.stateMachinesSnapshot().size) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt index f97800214f..ad4f388ff5 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt @@ -494,7 +494,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { CLASS $actionExecutorClassName METHOD executeCommitTransaction AT ENTRY - IF createCounter("counter", $counter) + IF createCounter("counter", $counter) && createCounter("counter_2", $counter) DO traceln("Counter created") ENDRULE @@ -510,8 +510,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { INTERFACE ${CheckpointStorage::class.java.name} METHOD getCheckpoint AT ENTRY - IF true - DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + IF readCounter("counter_2") < 3 + DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") ENDRULE """.trimIndent() @@ -527,7 +527,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { alice.rpc.assertNumberOfCheckpointsAllZero() alice.rpc.assertHospitalCounts( discharged = 3, - observation = 0 + dischargedRetry = 1 ) assertEquals(0, alice.rpc.stateMachinesSnapshot().size) } @@ -557,7 +557,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { CLASS $actionExecutorClassName METHOD executeCommitTransaction AT ENTRY - IF createCounter("counter", $counter) + IF createCounter("counter", $counter) && createCounter("counter_2", $counter) DO traceln("Counter created") ENDRULE @@ -573,8 +573,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { INTERFACE ${CheckpointStorage::class.java.name} METHOD getCheckpoint AT ENTRY - IF true - DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + IF readCounter("counter_2") < 3 + DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") ENDRULE """.trimIndent() @@ -590,7 +590,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { alice.rpc.assertNumberOfCheckpoints(hospitalized = 1) alice.rpc.assertHospitalCounts( discharged = 3, - observation = 1 + observation = 1, + dischargedRetry = 1 ) assertEquals(1, alice.rpc.stateMachinesSnapshot().size) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 27f9d36bf5..09c7e0f889 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -495,40 +495,51 @@ internal class SingleThreadedStateMachineManager( logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.") return } - val flow = if (currentState.isAnyCheckpointPersisted) { - // We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that - // we mirror exactly what happens when restarting the node. - val checkpoint = database.transaction { - val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId) - if (serializedCheckpoint == null) { - logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.") - return@transaction null - } + // We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that + // we mirror exactly what happens when restarting the node. + // Ignore [isAnyCheckpointPersisted] as the checkpoint could be committed but the flag remains un-updated + val checkpointLoadingStatus = database.transaction { + val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId) ?: return@transaction CheckpointLoadingStatus.NotFound + val checkpoint = serializedCheckpoint.let { tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also { if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) { if (checkpointStorage.removeFlowException(flowId)) { checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE) } else { logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not be loaded and run.") - return@transaction null + // This code branch is being removed in a different PR + return@transaction CheckpointLoadingStatus.CouldNotDeserialize } } - } ?: return@transaction null - } ?: return + } ?: return@transaction CheckpointLoadingStatus.CouldNotDeserialize + } - // Resurrect flow - flowCreator.createFlowFromCheckpoint( - flowId, - checkpoint, - currentState.reloadCheckpointAfterSuspendCount, - currentState.lock, - firstRestore = false - ) ?: return - } else { - // Just flow initiation message - null + CheckpointLoadingStatus.Success(checkpoint) } + + val flow = when { + // Resurrect flow + checkpointLoadingStatus is CheckpointLoadingStatus.Success -> { + flowCreator.createFlowFromCheckpoint( + flowId, + checkpointLoadingStatus.checkpoint, + currentState.reloadCheckpointAfterSuspendCount, + currentState.lock, + firstRestore = false + ) ?: return + } + checkpointLoadingStatus is CheckpointLoadingStatus.NotFound && currentState.isAnyCheckpointPersisted -> { + logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.") + return + } + checkpointLoadingStatus is CheckpointLoadingStatus.CouldNotDeserialize -> return + else -> { + // Just flow initiation message + null + } + } + innerState.withLock { if (stopping) { return @@ -758,8 +769,7 @@ internal class SingleThreadedStateMachineManager( ): CordaFuture> { onCallingStartFlowInternal?.invoke() - val existingFlow = innerState.withLock { flows[flowId] } - val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState.isAnyCheckpointPersisted) { + val existingCheckpoint = if (innerState.withLock { flows[flowId] != null }) { // Load the flow's checkpoint // The checkpoint will be missing if the flow failed before persisting the original checkpoint // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay) @@ -1098,4 +1108,10 @@ internal class SingleThreadedStateMachineManager( } return false } + + private sealed class CheckpointLoadingStatus { + class Success(val checkpoint: Checkpoint) : CheckpointLoadingStatus() + object NotFound : CheckpointLoadingStatus() + object CouldNotDeserialize : CheckpointLoadingStatus() + } }