diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt index 8e925ec6a1..9f3e0a5b66 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt @@ -4,6 +4,7 @@ import net.corda.core.CordaRuntimeException import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds +import net.corda.node.services.api.CheckpointStorage import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.CHARLIE_NAME import net.corda.testing.core.singleIdentity @@ -12,6 +13,7 @@ import org.junit.Test import java.sql.Connection import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.TimeoutException import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertTrue @@ -444,4 +446,136 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() { charlie2.rpc.assertNumberOfCheckpoints(0) } } + + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state) on a responding node. + * + * The exception is thrown 3 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. + * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit its original checkpoint. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). 
On the final retry the transition + * succeeds and the flow finishes. + */ + @Test(timeout = 300_000) + fun `responding flow - session init can be retried when there is a transient connection error to the database`() { + startDriver { + val (charlie, port) = createBytemanNode(CHARLIE_NAME) + val alice = createNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 3 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + alice.rpc.startFlow( + StateMachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + + charlie.rpc.assertHospitalCounts( + discharged = 3, + observation = 0 + ) + assertEquals(0, alice.rpc.stateMachinesSnapshot().size) + assertEquals(0, charlie.rpc.stateMachinesSnapshot().size) + assertEquals(0, charlie.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state) on a responding node. + * + * The exception is thrown 4 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. 
+ * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit its original checkpoint. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * fails and is kept in for observation. + */ + @Test(timeout = 300_000) + fun `responding flow - session init can be retried when there is a transient connection error to the database goes to observation if error persists`() { + startDriver { + val (charlie, port) = createBytemanNode(CHARLIE_NAME) + val alice = createNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 4 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + assertFailsWith<TimeoutException> { + alice.rpc.startFlow( + StateMachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + } + + charlie.rpc.assertHospitalCounts( + discharged = 3, + observation = 1 + ) + assertEquals(1, alice.rpc.stateMachinesSnapshot().size) + assertEquals(1, charlie.rpc.stateMachinesSnapshot().size) + assertEquals(0, 
charlie.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } } \ No newline at end of file diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt index 62c18d4107..8e3f44597a 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt @@ -4,12 +4,15 @@ import net.corda.core.CordaRuntimeException import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds +import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.statemachine.transitions.TopLevelTransition import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.CHARLIE_NAME import net.corda.testing.core.singleIdentity import org.junit.Test +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import java.util.concurrent.TimeoutException import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -17,6 +20,10 @@ import kotlin.test.assertFailsWith @Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { + private companion object { + val executor: ExecutorService = Executors.newSingleThreadExecutor() + } + /** * Throws an exception when performing an [Action.SendInitial] action. 
* @@ -34,15 +41,15 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { val rules = """ RULE Create Counter CLASS ${ActionExecutorImpl::class.java.name} - METHOD executeSendInitial + METHOD executeSendMultiple AT ENTRY IF createCounter("counter", $counter) DO traceln("Counter created") ENDRULE - RULE Throw exception on executeSendInitial action + RULE Throw exception on executeSendMultiple action CLASS ${ActionExecutorImpl::class.java.name} - METHOD executeSendInitial + METHOD executeSendMultiple AT ENTRY IF readCounter("counter") < 4 DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") @@ -86,15 +93,15 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { val rules = """ RULE Create Counter CLASS ${ActionExecutorImpl::class.java.name} - METHOD executeSendInitial + METHOD executeSendMultiple AT ENTRY IF createCounter("counter", $counter) DO traceln("Counter created") ENDRULE - RULE Throw exception on executeSendInitial action + RULE Throw exception on executeSendMultiple action CLASS ${ActionExecutorImpl::class.java.name} - METHOD executeSendInitial + METHOD executeSendMultiple AT ENTRY IF readCounter("counter") < 3 DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") @@ -464,6 +471,134 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() { } } + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state). + * + * The exception is thrown 3 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. + * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit its original checkpoint. 
+ * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * succeeds and the flow finishes. + */ + @Test(timeout = 300_000) + fun `flow can be retried when there is a transient connection error to the database`() { + startDriver { + val charlie = createNode(CHARLIE_NAME) + val (alice, port) = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 3 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + alice.rpc.startFlow( + StateMachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + + alice.rpc.assertHospitalCounts( + discharged = 3, + observation = 0 + ) + assertEquals(0, alice.rpc.stateMachinesSnapshot().size) + assertEquals(0, alice.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state). + * + * The exception is thrown 4 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. 
+ * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit its original checkpoint. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * fails and is kept in for observation. + */ + @Test(timeout = 300_000) + fun `flow can be retried when there is a transient connection error to the database goes to observation if error persists`() { + startDriver { + val charlie = createNode(CHARLIE_NAME) + val (alice, port) = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 4 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + executor.execute { + alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()) + } + + // the flow is never signaled as started, so calls to [getOrThrow] would hang; sleep instead + Thread.sleep(30.seconds.toMillis()) + + alice.rpc.assertHospitalCounts( + discharged = 3, + observation = 1 + ) + assertEquals(1, alice.rpc.stateMachinesSnapshot().size) + assertEquals(0, 
alice.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + /** * Throws an exception when performing an [Action.CommitTransaction] event when the flow is finishing on a responding node. * diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineKillFlowErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineKillFlowErrorHandlingTest.kt index ae07eb60d4..31f73c526d 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineKillFlowErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineKillFlowErrorHandlingTest.kt @@ -2,6 +2,7 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable import net.corda.core.flows.FlowLogic +import net.corda.core.flows.KilledFlowException import net.corda.core.flows.StartableByRPC import net.corda.core.messaging.startFlow import net.corda.core.messaging.startTrackedFlow @@ -30,21 +31,9 @@ class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() { * No pass through the hospital is recorded. As the flow is marked as `isRemoved`. 
*/ @Test(timeout = 300_000) - fun `error during transition due to an InterruptedException (killFlow) will terminate the flow`() { + fun `error during transition due to killing a flow will terminate the flow`() { startDriver { - val (alice, port) = createBytemanNode(ALICE_NAME) - - val rules = """ - RULE Increment terminal counter - CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} - METHOD consult - AT READ TERMINAL - IF true - DO traceln("Byteman test - terminal") - ENDRULE - """.trimIndent() - - submitBytemanRules(rules, port) + val alice = createNode(ALICE_NAME) val flow = alice.rpc.startTrackedFlow(StateMachineKillFlowErrorHandlingTest::SleepFlow) @@ -56,14 +45,10 @@ class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() { } } - assertFailsWith { flow.returnValue.getOrThrow(20.seconds) } - - val output = getBytemanOutput(alice) + assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(20.seconds) } assertTrue(flowKilled) - val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size - assertEquals(1, numberOfTerminalDiagnoses) - alice.rpc.assertHospitalCounts(propagated = 1) + alice.rpc.assertHospitalCountsAllZero() assertEquals(0, alice.rpc.stateMachinesSnapshot().size) alice.rpc.assertNumberOfCheckpoints(0) } @@ -97,7 +82,7 @@ class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() { } } - assertFailsWith { flow.returnValue.getOrThrow(30.seconds) } + assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(30.seconds) } assertTrue(flowKilled) alice.rpc.assertHospitalCountsAllZero() @@ -124,15 +109,15 @@ class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() { val rules = """ RULE Create Counter CLASS ${ActionExecutorImpl::class.java.name} - METHOD executeSendInitial + METHOD executeSendMultiple AT ENTRY IF createCounter("counter", $counter) DO traceln("Counter created") ENDRULE - RULE Throw 
exception on executeSendMultiple action CLASS ${ActionExecutorImpl::class.java.name} - METHOD executeSendInitial + METHOD executeSendMultiple AT ENTRY IF readCounter("counter") < 4 DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")