diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt index 0440680745..6817613f85 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt @@ -2,6 +2,7 @@ package net.corda.node.services.statemachine import net.corda.core.CordaRuntimeException import net.corda.core.messaging.startFlow +import net.corda.core.messaging.startFlowWithClientId import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.node.services.api.CheckpointStorage @@ -387,6 +388,273 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { } } + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint + * (remains in an unstarted state). + * + * The exception is thrown 3 times. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * succeeds and the flow finishes. + * + * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state). + * + */ + @Test(timeout = 300_000) + fun `with client id - error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() { + startDriver { + val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS $actionExecutorClassName + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS $actionExecutorClassName + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 3 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + alice.rpc.startFlowWithClientId( + "here is my client id", + StateMachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + + alice.rpc.assertNumberOfCheckpoints(completed = 1) + alice.rpc.assertHospitalCounts(discharged = 3) + assertEquals(0, alice.rpc.stateMachinesSnapshot().size) + } + } + + /** + * Throws an exception when calling [FlowStateMachineImpl.processEvent]. + * + * This is not an expected place for an exception to occur, but allows us to test what happens when a random exception is propagated + * up to [FlowStateMachineImpl.run] during flow initialisation. + * + * A "Transaction context is missing" exception is thrown due to where the exception is thrown (no transaction is created so this is + * thrown when leaving [FlowStateMachineImpl.processEventsUntilFlowIsResumed] due to the finally block). + */ + @Test(timeout = 300_000) + fun `with client id - unexpected error during flow initialisation throws exception to client`() { + startDriver { + val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME) + val rules = """ + RULE Create Counter + CLASS ${FlowStateMachineImpl::class.java.name} + METHOD processEvent + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception + CLASS ${FlowStateMachineImpl::class.java.name} + METHOD processEvent + AT ENTRY + IF readCounter("counter") < 1 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + assertFailsWith { + alice.rpc.startFlowWithClientId( + "give me all of your client ids, or else", + StateMachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow(30.seconds) + } + + alice.rpc.assertNumberOfCheckpoints(failed = 1) + alice.rpc.assertHospitalCounts(propagated = 1) + assertEquals(0, alice.rpc.stateMachinesSnapshot().size) + } + } + + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint + * (remains in an unstarted state). + * + * The exception is thrown 4 times. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation. + * + * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state). + */ + @Test(timeout = 450_000) + fun `with client id - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() { + startDriver { + val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS $actionExecutorClassName + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS $actionExecutorClassName + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 4 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + executor.execute { + alice.rpc.startFlowWithClientId( + "please sir, can i have a client id?", + StateMachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ) + } + + // flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead + Thread.sleep(30.seconds.toMillis()) + + alice.rpc.assertNumberOfCheckpoints(hospitalized = 1) + alice.rpc.assertHospitalCounts( + discharged = 3, + observation = 1 + ) + assertEquals(1, alice.rpc.stateMachinesSnapshot().size) + val terminated = (alice as OutOfProcessImpl).stop(60.seconds) + assertTrue(terminated, "The node must be shutdown before it can be restarted") + val (alice2, _) = createBytemanNode(ALICE_NAME) + Thread.sleep(20.seconds.toMillis()) + alice2.rpc.assertNumberOfCheckpoints(completed = 1) + } + } + + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint + * (remains in an unstarted state). + * + * An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another + * exception is then thrown during the retry itself. + * + * The flow then retries the retry causing the flow to complete successfully. + */ + @Test(timeout = 300_000) + fun `with client id - error during retrying a flow that failed when committing its original checkpoint will retry the flow again and complete successfully`() { + startDriver { + val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME) + + val rules = """ + RULE Throw exception on executeCommitTransaction action after first suspend + commit + CLASS $actionExecutorClassName + METHOD executeCommitTransaction + AT ENTRY + IF !flagged("commit_exception_flag") + DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1") + ENDRULE + + RULE Throw exception on retry + CLASS $stateMachineManagerClassName + METHOD onExternalStartFlow + AT ENTRY + IF flagged("commit_exception_flag") && !flagged("retry_exception_flag") + DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + alice.rpc.startFlowWithClientId( + "hi, i'd like to be your client id", + StateMachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + + alice.rpc.assertNumberOfCheckpoints(completed = 1) + alice.rpc.assertHospitalCounts( + discharged = 1, + dischargedRetry = 1 + ) + assertEquals(0, alice.rpc.stateMachinesSnapshot().size) + } + } + + /** + * Throws an exception when after the first [Action.CommitTransaction] event before the flow has initialised (remains in an unstarted state). + * This is to cover transient issues, where the transaction committed the checkpoint but failed to respond to the node. + * + * The exception is thrown when performing [Action.SignalFlowHasStarted], the error won't actually appear here but it makes it easier + * to test. + * + * The exception is thrown 3 times. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * succeeds and the flow finishes. + * + * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state). + * + * The first retry will load the checkpoint that the flow doesn't know exists ([StateMachineState.isAnyCheckpointPersisted] is false + * at this point). The flag gets switched to true after this first retry and the flow has now returned to an expected state. + * + */ + @Test(timeout = 300_000) + fun `with client id - error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() { + startDriver { + val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS $actionExecutorClassName + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeSignalFlowHasStarted action + CLASS $actionExecutorClassName + METHOD executeSignalFlowHasStarted + # METHOD executeAcknowledgeMessages + AT ENTRY + IF readCounter("counter") < 3 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("i wish i was a sql exception") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + alice.rpc.startFlowWithClientId( + "hello im a client id", + StateMachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + + alice.rpc.assertNumberOfCheckpoints(completed = 1) + alice.rpc.assertHospitalCounts(discharged = 3) + assertEquals(0, alice.rpc.stateMachinesSnapshot().size) + } + } + /** * Throws an exception when performing an [Action.CommitTransaction] event on a responding node before the flow has initialised and * saved its first checkpoint (remains in an unstarted state). diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt index e4bfe0ebdf..6e750619f4 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt @@ -15,14 +15,20 @@ interface CheckpointStorage { /** * Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id */ - fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes, - serializedCheckpointState: SerializedBytes) + fun addCheckpoint( + id: StateMachineRunId, checkpoint: Checkpoint, + serializedFlowState: SerializedBytes?, + serializedCheckpointState: SerializedBytes + ) /** * Update an existing checkpoint. Will throw if there is not checkpoint for this id. */ - fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes?, - serializedCheckpointState: SerializedBytes) + fun updateCheckpoint( + id: StateMachineRunId, checkpoint: Checkpoint, + serializedFlowState: SerializedBytes?, + serializedCheckpointState: SerializedBytes + ) /** * Update an existing checkpoints status ([Checkpoint.status]). diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index e08181c86b..a088e3bf6e 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -373,7 +373,7 @@ class DBCheckpointStorage( override fun addCheckpoint( id: StateMachineRunId, checkpoint: Checkpoint, - serializedFlowState: SerializedBytes, + serializedFlowState: SerializedBytes?, serializedCheckpointState: SerializedBytes ) { val now = clock.instant() diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index b1162a390b..a5f8c8060f 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -94,10 +94,7 @@ internal class ActionExecutorImpl( if (action.isCheckpointUpdate) { checkpointStorage.updateCheckpoint(action.id, checkpoint, serializedFlowState, serializedCheckpointState) } else { - if (flowState is FlowState.Finished) { - throw IllegalStateException("A new checkpoint cannot be created with a finished flow state.") - } - checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState!!, serializedCheckpointState) + checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState, serializedCheckpointState) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 52697247c5..4066a8f972 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -199,13 +199,14 @@ internal class SingleThreadedStateMachineManager( // - Incompatible checkpoints need to be handled upon implementing CORDA-3897 for (flow in fibers.values) { flow.fiber.clientId?.let { - innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(doneFuture(flow.fiber)) + innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(flow.fiber.id, doneFuture(flow.fiber)) } } for (pausedFlow in pausedFlows) { pausedFlow.value.checkpoint.checkpointState.invocationContext.clientId?.let { innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active( + pausedFlow.key, doneClientIdFuture(pausedFlow.key, pausedFlow.value.resultFuture, it) ) } @@ -311,17 +312,20 @@ internal class SingleThreadedStateMachineManager( status } else { newFuture = openFuture() - FlowWithClientIdStatus.Active(newFuture!!) + FlowWithClientIdStatus.Active(flowId, newFuture!!) } } } // Flow -started with client id- already exists, return the existing's flow future and don't start a new flow. existingStatus?.let { - val existingFuture = activeOrRemovedClientIdFuture(it, clientId) - return@startFlow uncheckedCast(existingFuture) - } - onClientIDNotFound?.invoke() + // If the flow ID is the same as the one recorded in the client ID map, + // then this start flow event has been retried, and we should not de-duplicate. + if (flowId != it.flowId) { + val existingFuture = activeOrRemovedClientIdFuture(it, clientId) + return@startFlow uncheckedCast(existingFuture) + } + } ?: onClientIDNotFound?.invoke() } return try { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index c53cf81dc0..7e8efc4ee3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -417,9 +417,13 @@ sealed class SubFlowVersion { data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion() } -sealed class FlowWithClientIdStatus { - data class Active(val flowStateMachineFuture: CordaFuture>) : FlowWithClientIdStatus() - data class Removed(val flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus() +sealed class FlowWithClientIdStatus(val flowId: StateMachineRunId) { + class Active( + flowId: StateMachineRunId, + val flowStateMachineFuture: CordaFuture> + ) : FlowWithClientIdStatus(flowId) + + class Removed(flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus(flowId) } data class FlowResultMetadata(