mirror of
https://github.com/corda/corda.git
synced 2025-02-21 09:51:57 +00:00
CORDA-3841 Update session init flow error handling tests (#6431)
These tests were removed after doing a merge from 4.4. They needed updating after the changes from 4.4 anyway. These have been included in this change. Also fix kill flow tests and send initial tests.
This commit is contained in:
parent
f8d86c0617
commit
9a8ae0fd32
@ -4,6 +4,7 @@ import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.CHARLIE_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
@ -12,6 +13,7 @@ import org.junit.Test
|
||||
import java.sql.Connection
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeoutException
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
@ -444,4 +446,136 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
charlie2.rpc.assertNumberOfCheckpoints(0)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
|
||||
* state) on a responding node.
|
||||
*
|
||||
* The exception is thrown 3 times.
|
||||
*
|
||||
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
|
||||
*
|
||||
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
|
||||
* retrying the flow after it failed to commit it's original checkpoint.
|
||||
*
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* succeeds and the flow finishes.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `responding flow - session init can be retried when there is a transient connection error to the database`() {
|
||||
startDriver {
|
||||
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
|
||||
val alice = createNode(ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 3
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on getCheckpoint
|
||||
INTERFACE ${CheckpointStorage::class.java.name}
|
||||
METHOD getCheckpoint
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules, port)
|
||||
|
||||
alice.rpc.startFlow(
|
||||
StateMachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
charlie.rpc.assertHospitalCounts(
|
||||
discharged = 3,
|
||||
observation = 0
|
||||
)
|
||||
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
|
||||
assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
|
||||
assertEquals(0, charlie.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
|
||||
* state) on a responding node.
|
||||
*
|
||||
* The exception is thrown 4 times.
|
||||
*
|
||||
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
|
||||
*
|
||||
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
|
||||
* retrying the flow after it failed to commit it's original checkpoint.
|
||||
*
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* fails and is kept for in for observation.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `responding flow - session init can be retried when there is a transient connection error to the database goes to observation if error persists`() {
|
||||
startDriver {
|
||||
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
|
||||
val alice = createNode(ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 4
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on getCheckpoint
|
||||
INTERFACE ${CheckpointStorage::class.java.name}
|
||||
METHOD getCheckpoint
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules, port)
|
||||
|
||||
assertFailsWith<TimeoutException> {
|
||||
alice.rpc.startFlow(
|
||||
StateMachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
}
|
||||
|
||||
charlie.rpc.assertHospitalCounts(
|
||||
discharged = 3,
|
||||
observation = 1
|
||||
)
|
||||
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
|
||||
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
|
||||
assertEquals(0, charlie.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
|
||||
}
|
||||
}
|
||||
}
|
@ -4,12 +4,15 @@ import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.statemachine.transitions.TopLevelTransition
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.CHARLIE_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeoutException
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
@ -17,6 +20,10 @@ import kotlin.test.assertFailsWith
|
||||
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
|
||||
class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
|
||||
private companion object {
|
||||
val executor: ExecutorService = Executors.newSingleThreadExecutor()
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.SendInitial] action.
|
||||
*
|
||||
@ -34,15 +41,15 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeSendInitial
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeSendInitial action
|
||||
RULE Throw exception on executeSendMultiple action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeSendInitial
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 4
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
@ -86,15 +93,15 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeSendInitial
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeSendInitial action
|
||||
RULE Throw exception on executeSendMultiple action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeSendInitial
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 3
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
@ -464,6 +471,134 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
|
||||
* state).
|
||||
*
|
||||
* The exception is thrown 3 times.
|
||||
*
|
||||
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
|
||||
*
|
||||
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
|
||||
* retrying the flow after it failed to commit it's original checkpoint.
|
||||
*
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* succeeds and the flow finishes.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `flow can be retried when there is a transient connection error to the database`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val (alice, port) = createBytemanNode(ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 3
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on getCheckpoint
|
||||
INTERFACE ${CheckpointStorage::class.java.name}
|
||||
METHOD getCheckpoint
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules, port)
|
||||
|
||||
alice.rpc.startFlow(
|
||||
StateMachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
alice.rpc.assertHospitalCounts(
|
||||
discharged = 3,
|
||||
observation = 0
|
||||
)
|
||||
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
|
||||
assertEquals(0, alice.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
|
||||
* state).
|
||||
*
|
||||
* The exception is thrown 4 times.
|
||||
*
|
||||
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
|
||||
*
|
||||
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
|
||||
* retrying the flow after it failed to commit it's original checkpoint.
|
||||
*
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* fails and is kept for in for observation.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `flow can be retried when there is a transient connection error to the database goes to observation if error persists`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val (alice, port) = createBytemanNode(ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 4
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on getCheckpoint
|
||||
INTERFACE ${CheckpointStorage::class.java.name}
|
||||
METHOD getCheckpoint
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules, port)
|
||||
|
||||
executor.execute {
|
||||
alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
|
||||
}
|
||||
|
||||
// flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead
|
||||
Thread.sleep(30.seconds.toMillis())
|
||||
|
||||
alice.rpc.assertHospitalCounts(
|
||||
discharged = 3,
|
||||
observation = 1
|
||||
)
|
||||
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
|
||||
assertEquals(0, alice.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event when the flow is finishing on a responding node.
|
||||
*
|
||||
|
@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.KilledFlowException
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.messaging.startTrackedFlow
|
||||
@ -30,21 +31,9 @@ class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
* No pass through the hospital is recorded. As the flow is marked as `isRemoved`.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `error during transition due to an InterruptedException (killFlow) will terminate the flow`() {
|
||||
fun `error during transition due to killing a flow will terminate the flow`() {
|
||||
startDriver {
|
||||
val (alice, port) = createBytemanNode(ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Increment terminal counter
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT READ TERMINAL
|
||||
IF true
|
||||
DO traceln("Byteman test - terminal")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules, port)
|
||||
val alice = createNode(ALICE_NAME)
|
||||
|
||||
val flow = alice.rpc.startTrackedFlow(StateMachineKillFlowErrorHandlingTest::SleepFlow)
|
||||
|
||||
@ -56,14 +45,10 @@ class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
}
|
||||
}
|
||||
|
||||
assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(20.seconds) }
|
||||
|
||||
val output = getBytemanOutput(alice)
|
||||
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(20.seconds) }
|
||||
|
||||
assertTrue(flowKilled)
|
||||
val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size
|
||||
assertEquals(1, numberOfTerminalDiagnoses)
|
||||
alice.rpc.assertHospitalCounts(propagated = 1)
|
||||
alice.rpc.assertHospitalCountsAllZero()
|
||||
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
|
||||
alice.rpc.assertNumberOfCheckpoints(0)
|
||||
}
|
||||
@ -97,7 +82,7 @@ class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
}
|
||||
}
|
||||
|
||||
assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(30.seconds) }
|
||||
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(30.seconds) }
|
||||
|
||||
assertTrue(flowKilled)
|
||||
alice.rpc.assertHospitalCountsAllZero()
|
||||
@ -124,15 +109,15 @@ class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeSendInitial
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeSendInitial action
|
||||
RULE Throw exception on executeSendMultiple action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeSendInitial
|
||||
METHOD executeSendMultiple
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 4
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
|
Loading…
x
Reference in New Issue
Block a user