ENT-5649 Always load from db when flow retries ()

Always attempt to load a checkpoint from the database when a flow
retries.

This guards against transient errors where the checkpoint is committed to
the database but an error is still thrown back to the node. When the node
retries in this scenario, `isAnyCheckpointPersisted` is false, so the flow
attempts an insert when it saves its initial checkpoint again, even though
a row already exists.

By loading the existing checkpoint, even though it is not actually used
(the flow is still `Unstarted`), the flag is put into the right state, so
the later save performs an update rather than an insert.
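
To make the failure mode concrete, here is a minimal, self-contained sketch of the save-then-retry sequence. All names below are illustrative stand-ins, not the real Corda API:

// Illustrative sketch of the failure mode this commit fixes; not the real Corda API.
interface CheckpointStore {
    fun insert(flowId: String, checkpoint: ByteArray) // INSERT: fails if a row already exists
    fun update(flowId: String, checkpoint: ByteArray) // UPDATE: requires an existing row
    fun load(flowId: String): ByteArray?              // null when no row exists
}

class FlowState(var isAnyCheckpointPersisted: Boolean = false)

fun saveInitialCheckpoint(store: CheckpointStore, state: FlowState, flowId: String, cp: ByteArray) {
    if (state.isAnyCheckpointPersisted) store.update(flowId, cp) else store.insert(flowId, cp)
    // If the commit succeeds but a transient error is thrown back to the node,
    // this line is never reached and the flag stays false:
    state.isAnyCheckpointPersisted = true
}

// The fix: on every retry, try to load the checkpoint first. A row in the
// database proves a checkpoint was persisted, so the flag can be corrected
// and the retried save takes the update branch instead of a doomed insert.
fun prepareRetry(store: CheckpointStore, state: FlowState, flowId: String) {
    if (store.load(flowId) != null) state.isAnyCheckpointPersisted = true
}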
Dan Newton 2020-08-14 17:42:19 +01:00 committed by GitHub
parent 55133b02b9
commit c4027e23bf
3 changed files with 115 additions and 41 deletions
node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine
node/src/main/kotlin/net/corda/node/services/statemachine

@@ -331,6 +331,62 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
}
}
/**
* Throws an exception after the first [Action.CommitTransaction] event, before the flow has initialised (it remains in an
* unstarted state). This covers transient issues where the transaction commits the checkpoint but fails to respond to the node.
*
* The exception is thrown when performing [Action.SignalFlowHasStarted]; the error would not actually surface at this point in
* practice, but throwing it here makes the scenario easier to test.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*
* The first retry loads the checkpoint that the flow does not know exists ([StateMachineState.isAnyCheckpointPersisted] is false
* at this point). The flag is switched to true after this first retry, returning the flow to an expected state.
*
*/
@Test(timeout = 300_000)
fun `error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSignalFlowHasStarted action
CLASS $actionExecutorClassName
METHOD executeSignalFlowHasStarted
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("i wish i was a sql exception")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
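
For readers unfamiliar with Byteman, the counter built-ins used in the rules above (createCounter, readCounter, incrementCounter) simply gate the injected fault to the first three invocations. The equivalent logic in plain Kotlin, purely for illustration:

// Plain-Kotlin stand-in for the Byteman counter rules; not Byteman's implementation.
class FaultInjector(private val maxFailures: Int = 3) {
    private var counter = 0 // createCounter("counter", 0)

    fun onSignalFlowHasStarted() {
        if (counter < maxFailures) { // readCounter("counter") < 3
            counter++                // incrementCounter("counter")
            throw RuntimeException("i wish i was a sql exception")
        }
        // From the fourth invocation onwards the action proceeds normally,
        // so the flow completes and the test expects exactly 3 discharges.
    }
}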
/**
* Throws an exception when performing an [Action.CommitTransaction] event on a responding node before the flow has initialised and
* saved its first checkpoint (remains in an unstarted state).
@@ -463,7 +519,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
IF createCounter("counter", $counter) && createCounter("counter_2", $counter)
DO traceln("Counter created")
ENDRULE
@@ -479,8 +535,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
IF readCounter("counter_2") < 3
DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
@@ -496,7 +552,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 0
dischargedRetry = 1
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
@@ -527,7 +583,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
IF createCounter("counter", $counter) && createCounter("counter_2", $counter)
DO traceln("Counter created")
ENDRULE
@@ -543,8 +599,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
IF readCounter("counter_2") < 3
DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
@@ -562,7 +618,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1)
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
observation = 1,
dischargedRetry = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)

@@ -494,7 +494,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
IF createCounter("counter", $counter) && createCounter("counter_2", $counter)
DO traceln("Counter created")
ENDRULE
@@ -510,8 +510,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
IF readCounter("counter_2") < 3
DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
@@ -527,7 +527,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 0
dischargedRetry = 1
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
@@ -557,7 +557,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
IF createCounter("counter", $counter) && createCounter("counter_2", $counter)
DO traceln("Counter created")
ENDRULE
@@ -573,8 +573,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
IF readCounter("counter_2") < 3
DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
@@ -590,7 +590,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
observation = 1,
dischargedRetry = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
}

@@ -495,40 +495,51 @@ internal class SingleThreadedStateMachineManager(
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val checkpoint = database.transaction {
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return@transaction null
}
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
// Ignore [isAnyCheckpointPersisted] as the checkpoint could be committed but the flag remains un-updated
val checkpointLoadingStatus = database.transaction {
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId) ?: return@transaction CheckpointLoadingStatus.NotFound
val checkpoint = serializedCheckpoint.let {
tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
if (checkpointStorage.removeFlowException(flowId)) {
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
} else {
logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not be loaded and run.")
return@transaction null
// This code branch is being removed in a different PR
return@transaction CheckpointLoadingStatus.CouldNotDeserialize
}
}
} ?: return@transaction null
} ?: return
} ?: return@transaction CheckpointLoadingStatus.CouldNotDeserialize
}
// Resurrect flow
flowCreator.createFlowFromCheckpoint(
flowId,
checkpoint,
currentState.reloadCheckpointAfterSuspendCount,
currentState.lock,
firstRestore = false
) ?: return
} else {
// Just flow initiation message
null
CheckpointLoadingStatus.Success(checkpoint)
}
val flow = when {
// Resurrect flow
checkpointLoadingStatus is CheckpointLoadingStatus.Success -> {
flowCreator.createFlowFromCheckpoint(
flowId,
checkpointLoadingStatus.checkpoint,
currentState.reloadCheckpointAfterSuspendCount,
currentState.lock,
firstRestore = false
) ?: return
}
checkpointLoadingStatus is CheckpointLoadingStatus.NotFound && currentState.isAnyCheckpointPersisted -> {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
checkpointLoadingStatus is CheckpointLoadingStatus.CouldNotDeserialize -> return
else -> {
// Just flow initiation message
null
}
}
innerState.withLock {
if (stopping) {
return
@@ -758,8 +769,7 @@ internal class SingleThreadedStateMachineManager(
): CordaFuture<FlowStateMachine<A>> {
onCallingStartFlowInternal?.invoke()
val existingFlow = innerState.withLock { flows[flowId] }
val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState.isAnyCheckpointPersisted) {
val existingCheckpoint = if (innerState.withLock { flows[flowId] != null }) {
// Load the flow's checkpoint
// The checkpoint will be missing if the flow failed before persisting the original checkpoint
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
@@ -1098,4 +1108,10 @@ internal class SingleThreadedStateMachineManager(
}
return false
}
private sealed class CheckpointLoadingStatus {
class Success(val checkpoint: Checkpoint) : CheckpointLoadingStatus()
object NotFound : CheckpointLoadingStatus()
object CouldNotDeserialize : CheckpointLoadingStatus()
}
}
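
A design note on the new sealed class: returning a small sealed hierarchy instead of a nullable checkpoint lets the caller tell "no row in the database" apart from "row present but undeserializable", and the compiler enforces that a `when` handles every case. A hypothetical caller, mirroring the shape of the `when` block in retryFlowFromSafePoint above (illustrative only, since CheckpointLoadingStatus is private to the manager):

// Hypothetical usage sketch, not part of the commit.
fun describe(status: CheckpointLoadingStatus): String = when (status) {
    is CheckpointLoadingStatus.Success -> "checkpoint loaded: ${status.checkpoint}"
    CheckpointLoadingStatus.NotFound -> "no checkpoint row in the database"
    CheckpointLoadingStatus.CouldNotDeserialize -> "row exists but could not be deserialized"
}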