CORDA-3841 Check isAnyCheckpointPersisted in startFlowInternal (#6351)

Only hit the database if `StateMachineState.isAnyCheckpointPersisted`
returns true. Otherwise, there will be no checkpoint to retrieve from the
database anyway. This can prevent errors due to a transient loss of
connection to the database.
This commit is contained in:
Dan Newton
2020-06-16 09:22:26 +01:00
committed by GitHub
parent 4b63bab4fc
commit 7ab6a8f600
4 changed files with 489 additions and 47 deletions

View File

@ -4,6 +4,7 @@ import net.corda.client.rpc.CordaRPCClient
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.transitions.TopLevelTransition import net.corda.node.services.statemachine.transitions.TopLevelTransition
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
@ -11,6 +12,8 @@ import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity import net.corda.testing.core.singleIdentity
import org.junit.Ignore import org.junit.Ignore
import org.junit.Test import org.junit.Test
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
@ -18,6 +21,10 @@ import kotlin.test.assertFailsWith
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped @Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
private companion object {
val executor: ExecutorService = Executors.newSingleThreadExecutor()
}
/** /**
* Throws an exception when performing an [Action.SendInitial] action. * Throws an exception when performing an [Action.SendInitial] action.
* The exception is thrown 4 times. * The exception is thrown 4 times.
@ -79,7 +86,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> { assertFailsWith<TimeoutException> {
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
} }
@ -158,7 +168,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
val aliceClient = val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
@ -238,7 +251,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
val aliceClient = val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
@ -323,7 +339,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
val aliceClient = val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
@ -411,7 +430,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> { assertFailsWith<TimeoutException> {
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
} }
@ -513,7 +535,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
val aliceClient = val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
@ -602,7 +627,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
val aliceClient = val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
@ -699,7 +727,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> { assertFailsWith<TimeoutException> {
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
} }
@ -798,7 +829,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
val aliceClient = val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
@ -883,7 +917,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> { assertFailsWith<TimeoutException> {
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
} }
@ -975,7 +1012,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> { assertFailsWith<TimeoutException> {
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
} }
@ -994,6 +1034,196 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
} }
} }
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state).
*
* The exception is thrown 5 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
* retrying the flow after it failed to commit it's original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*/
@Test(timeout = 300_000)
fun `flow can be retried when there is a transient connection error to the database`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 5
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state).
*
* The exception is thrown 7 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
* retrying the flow after it failed to commit it's original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* fails and is kept for in for observation.
*/
@Test(timeout = 300_000)
fun `flow can be retried when there is a transient connection error to the database goes to observation if error persists`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 7
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
executor.execute {
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
}
// flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead
Thread.sleep(30.seconds.toMillis())
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/** /**
* Throws an exception when performing an [Action.CommitTransaction] event on a responding flow. The failure prevents the node from saving * Throws an exception when performing an [Action.CommitTransaction] event on a responding flow. The failure prevents the node from saving
* its original checkpoint. * its original checkpoint.
@ -1064,7 +1294,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
val charlieClient = val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
@ -1160,7 +1393,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> { assertFailsWith<TimeoutException> {
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
} }
@ -1258,7 +1494,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
val charlieClient = val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds 30.seconds
) )
@ -1278,4 +1517,202 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
} }
} }
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state) on a responding node.
*
* The exception is thrown 5 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
* retrying the flow after it failed to commit it's original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*/
@Test(timeout = 300_000)
fun `responding flow - session init can be retried when there is a transient connection error to the database`() {
startDriver {
val charlie = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 5
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(charlie)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state) on a responding node.
*
* The exception is thrown 7 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
* retrying the flow after it failed to commit it's original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* fails and is kept for in for observation.
*/
@Test(timeout = 300_000)
fun `responding flow - session init can be retried when there is a transient connection error to the database goes to observation if error persists`() {
startDriver {
val charlie = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 7
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> {
aliceClient.startFlow(
StatemachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
}
val output = getBytemanOutput(charlie)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
} }

View File

@ -66,6 +66,7 @@ class DBCheckpointStorage : CheckpointStorage {
return session.createQuery(delete).executeUpdate() > 0 return session.createQuery(delete).executeUpdate() > 0
} }
@Throws(SQLException::class)
override fun getCheckpoint(id: StateMachineRunId): SerializedBytes<Checkpoint>? { override fun getCheckpoint(id: StateMachineRunId): SerializedBytes<Checkpoint>? {
val bytes = currentDBSession().get(DBCheckpoint::class.java, id.uuid.toString())?.checkpoint ?: return null val bytes = currentDBSession().get(DBCheckpoint::class.java, id.uuid.toString())?.checkpoint ?: return null
return SerializedBytes(bytes) return SerializedBytes(bytes)

View File

@ -607,9 +607,8 @@ class SingleThreadedStateMachineManager(
val flowCorDappVersion = createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion) val flowCorDappVersion = createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion)
val flowAlreadyExists = mutex.locked { flows[flowId] != null } val existingFlow = mutex.locked { flows[flowId] }
val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState?.value?.isAnyCheckpointPersisted == true) {
val existingCheckpoint = if (flowAlreadyExists) {
// Load the flow's checkpoint // Load the flow's checkpoint
// The checkpoint will be missing if the flow failed before persisting the original checkpoint // The checkpoint will be missing if the flow failed before persisting the original checkpoint
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay) // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
@ -617,8 +616,10 @@ class SingleThreadedStateMachineManager(
val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, flowId) val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, flowId)
if (checkpoint == null) { if (checkpoint == null) {
return openFuture<FlowStateMachine<A>>().mapError { return openFuture<FlowStateMachine<A>>().mapError {
IllegalStateException("Unable to deserialize database checkpoint for flow $flowId. " + IllegalStateException(
"Something is very wrong. The flow will not retry.") "Unable to deserialize database checkpoint for flow $flowId. " +
"Something is very wrong. The flow will not retry."
)
} }
} else { } else {
checkpoint checkpoint
@ -628,6 +629,7 @@ class SingleThreadedStateMachineManager(
// This is a brand new flow // This is a brand new flow
null null
} }
val checkpoint = existingCheckpoint ?: Checkpoint.create( val checkpoint = existingCheckpoint ?: Checkpoint.create(
invocationContext, invocationContext,
flowStart, flowStart,

View File

@ -146,6 +146,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
val payload = RejectSessionMessage(message, secureRandom.nextLong()) val payload = RejectSessionMessage(message, secureRandom.nextLong())
val replyError = ExistingSessionMessage(sessionMessage.initiatorSessionId, payload) val replyError = ExistingSessionMessage(sessionMessage.initiatorSessionId, payload)
log.info("Sending session initiation error back to $sender", error)
flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID)) flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID))
event.deduplicationHandler.afterDatabaseTransaction() event.deduplicationHandler.afterDatabaseTransaction()
} }