mirror of
https://github.com/corda/corda.git
synced 2025-04-08 03:44:49 +00:00
CORDA-3994 Retry errors in flow init started with client ids (#6643)
Flows that were started with a client id would hang because it would retrieve the existing flow's future and wait for it to finish. But, because the flow has failed its flow init and not saved its initial checkpoint, it is relying on `startFlow` to start the flow again (by redelivering the start flow external event). `FlowWithClientIdStatus` now holds the flow id that it is related to. This is then checked in `startFlow`. If a matching client id is found for a flow start, it then checks the flow id as well. If the flow id matches, then it lets the `startFlow` call continue, allowing it to actually start the flow again (how a flow without a client id would retry in this situation).
This commit is contained in:
parent
854e6638ff
commit
949489a117
@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.messaging.startFlowWithClientId
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
@ -387,6 +388,273 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
|
||||
* (remains in an unstarted state).
|
||||
*
|
||||
* The exception is thrown 3 times.
|
||||
*
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* succeeds and the flow finishes.
|
||||
*
|
||||
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
|
||||
*
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `with client id - error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() {
|
||||
startDriver {
|
||||
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 3
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules, port)
|
||||
|
||||
alice.rpc.startFlowWithClientId(
|
||||
"here is my client id",
|
||||
StateMachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
alice.rpc.assertNumberOfCheckpoints(completed = 1)
|
||||
alice.rpc.assertHospitalCounts(discharged = 3)
|
||||
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when calling [FlowStateMachineImpl.processEvent].
|
||||
*
|
||||
* This is not an expected place for an exception to occur, but allows us to test what happens when a random exception is propagated
|
||||
* up to [FlowStateMachineImpl.run] during flow initialisation.
|
||||
*
|
||||
* A "Transaction context is missing" exception is thrown due to where the exception is thrown (no transaction is created so this is
|
||||
* thrown when leaving [FlowStateMachineImpl.processEventsUntilFlowIsResumed] due to the finally block).
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `with client id - unexpected error during flow initialisation throws exception to client`() {
|
||||
startDriver {
|
||||
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${FlowStateMachineImpl::class.java.name}
|
||||
METHOD processEvent
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception
|
||||
CLASS ${FlowStateMachineImpl::class.java.name}
|
||||
METHOD processEvent
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 1
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules, port)
|
||||
|
||||
assertFailsWith<CordaRuntimeException> {
|
||||
alice.rpc.startFlowWithClientId(
|
||||
"give me all of your client ids, or else",
|
||||
StateMachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(30.seconds)
|
||||
}
|
||||
|
||||
alice.rpc.assertNumberOfCheckpoints(failed = 1)
|
||||
alice.rpc.assertHospitalCounts(propagated = 1)
|
||||
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
|
||||
* (remains in an unstarted state).
|
||||
*
|
||||
* The exception is thrown 4 times.
|
||||
*
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
|
||||
*
|
||||
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
|
||||
*/
|
||||
@Test(timeout = 450_000)
|
||||
fun `with client id - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
|
||||
startDriver {
|
||||
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 4
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules, port)
|
||||
|
||||
executor.execute {
|
||||
alice.rpc.startFlowWithClientId(
|
||||
"please sir, can i have a client id?",
|
||||
StateMachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
)
|
||||
}
|
||||
|
||||
// flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead
|
||||
Thread.sleep(30.seconds.toMillis())
|
||||
|
||||
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
|
||||
alice.rpc.assertHospitalCounts(
|
||||
discharged = 3,
|
||||
observation = 1
|
||||
)
|
||||
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
|
||||
val terminated = (alice as OutOfProcessImpl).stop(60.seconds)
|
||||
assertTrue(terminated, "The node must be shutdown before it can be restarted")
|
||||
val (alice2, _) = createBytemanNode(ALICE_NAME)
|
||||
Thread.sleep(20.seconds.toMillis())
|
||||
alice2.rpc.assertNumberOfCheckpoints(completed = 1)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
|
||||
* (remains in an unstarted state).
|
||||
*
|
||||
* An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another
|
||||
* exception is then thrown during the retry itself.
|
||||
*
|
||||
* The flow then retries the retry causing the flow to complete successfully.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `with client id - error during retrying a flow that failed when committing its original checkpoint will retry the flow again and complete successfully`() {
|
||||
startDriver {
|
||||
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Throw exception on executeCommitTransaction action after first suspend + commit
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF !flagged("commit_exception_flag")
|
||||
DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on retry
|
||||
CLASS $stateMachineManagerClassName
|
||||
METHOD onExternalStartFlow
|
||||
AT ENTRY
|
||||
IF flagged("commit_exception_flag") && !flagged("retry_exception_flag")
|
||||
DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules, port)
|
||||
|
||||
alice.rpc.startFlowWithClientId(
|
||||
"hi, i'd like to be your client id",
|
||||
StateMachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
alice.rpc.assertNumberOfCheckpoints(completed = 1)
|
||||
alice.rpc.assertHospitalCounts(
|
||||
discharged = 1,
|
||||
dischargedRetry = 1
|
||||
)
|
||||
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when after the first [Action.CommitTransaction] event before the flow has initialised (remains in an unstarted state).
|
||||
* This is to cover transient issues, where the transaction committed the checkpoint but failed to respond to the node.
|
||||
*
|
||||
* The exception is thrown when performing [Action.SignalFlowHasStarted], the error won't actually appear here but it makes it easier
|
||||
* to test.
|
||||
*
|
||||
* The exception is thrown 3 times.
|
||||
*
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* succeeds and the flow finishes.
|
||||
*
|
||||
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
|
||||
*
|
||||
* The first retry will load the checkpoint that the flow doesn't know exists ([StateMachineState.isAnyCheckpointPersisted] is false
|
||||
* at this point). The flag gets switched to true after this first retry and the flow has now returned to an expected state.
|
||||
*
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `with client id - error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() {
|
||||
startDriver {
|
||||
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeSignalFlowHasStarted action
|
||||
CLASS $actionExecutorClassName
|
||||
METHOD executeSignalFlowHasStarted
|
||||
# METHOD executeAcknowledgeMessages
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 3
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("i wish i was a sql exception")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules, port)
|
||||
|
||||
alice.rpc.startFlowWithClientId(
|
||||
"hello im a client id",
|
||||
StateMachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
alice.rpc.assertNumberOfCheckpoints(completed = 1)
|
||||
alice.rpc.assertHospitalCounts(discharged = 3)
|
||||
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event on a responding node before the flow has initialised and
|
||||
* saved its first checkpoint (remains in an unstarted state).
|
||||
|
@ -15,14 +15,20 @@ interface CheckpointStorage {
|
||||
/**
|
||||
* Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id
|
||||
*/
|
||||
fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>,
|
||||
serializedCheckpointState: SerializedBytes<CheckpointState>)
|
||||
fun addCheckpoint(
|
||||
id: StateMachineRunId, checkpoint: Checkpoint,
|
||||
serializedFlowState: SerializedBytes<FlowState>?,
|
||||
serializedCheckpointState: SerializedBytes<CheckpointState>
|
||||
)
|
||||
|
||||
/**
|
||||
* Update an existing checkpoint. Will throw if there is not checkpoint for this id.
|
||||
*/
|
||||
fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>?,
|
||||
serializedCheckpointState: SerializedBytes<CheckpointState>)
|
||||
fun updateCheckpoint(
|
||||
id: StateMachineRunId, checkpoint: Checkpoint,
|
||||
serializedFlowState: SerializedBytes<FlowState>?,
|
||||
serializedCheckpointState: SerializedBytes<CheckpointState>
|
||||
)
|
||||
|
||||
/**
|
||||
* Update an existing checkpoints status ([Checkpoint.status]).
|
||||
|
@ -373,7 +373,7 @@ class DBCheckpointStorage(
|
||||
override fun addCheckpoint(
|
||||
id: StateMachineRunId,
|
||||
checkpoint: Checkpoint,
|
||||
serializedFlowState: SerializedBytes<FlowState>,
|
||||
serializedFlowState: SerializedBytes<FlowState>?,
|
||||
serializedCheckpointState: SerializedBytes<CheckpointState>
|
||||
) {
|
||||
val now = clock.instant()
|
||||
|
@ -94,10 +94,7 @@ internal class ActionExecutorImpl(
|
||||
if (action.isCheckpointUpdate) {
|
||||
checkpointStorage.updateCheckpoint(action.id, checkpoint, serializedFlowState, serializedCheckpointState)
|
||||
} else {
|
||||
if (flowState is FlowState.Finished) {
|
||||
throw IllegalStateException("A new checkpoint cannot be created with a finished flow state.")
|
||||
}
|
||||
checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState!!, serializedCheckpointState)
|
||||
checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState, serializedCheckpointState)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -199,13 +199,14 @@ internal class SingleThreadedStateMachineManager(
|
||||
// - Incompatible checkpoints need to be handled upon implementing CORDA-3897
|
||||
for (flow in fibers.values) {
|
||||
flow.fiber.clientId?.let {
|
||||
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(doneFuture(flow.fiber))
|
||||
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(flow.fiber.id, doneFuture(flow.fiber))
|
||||
}
|
||||
}
|
||||
|
||||
for (pausedFlow in pausedFlows) {
|
||||
pausedFlow.value.checkpoint.checkpointState.invocationContext.clientId?.let {
|
||||
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(
|
||||
pausedFlow.key,
|
||||
doneClientIdFuture(pausedFlow.key, pausedFlow.value.resultFuture, it)
|
||||
)
|
||||
}
|
||||
@ -311,17 +312,20 @@ internal class SingleThreadedStateMachineManager(
|
||||
status
|
||||
} else {
|
||||
newFuture = openFuture()
|
||||
FlowWithClientIdStatus.Active(newFuture!!)
|
||||
FlowWithClientIdStatus.Active(flowId, newFuture!!)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flow -started with client id- already exists, return the existing's flow future and don't start a new flow.
|
||||
existingStatus?.let {
|
||||
val existingFuture = activeOrRemovedClientIdFuture(it, clientId)
|
||||
return@startFlow uncheckedCast(existingFuture)
|
||||
}
|
||||
onClientIDNotFound?.invoke()
|
||||
// If the flow ID is the same as the one recorded in the client ID map,
|
||||
// then this start flow event has been retried, and we should not de-duplicate.
|
||||
if (flowId != it.flowId) {
|
||||
val existingFuture = activeOrRemovedClientIdFuture(it, clientId)
|
||||
return@startFlow uncheckedCast(existingFuture)
|
||||
}
|
||||
} ?: onClientIDNotFound?.invoke()
|
||||
}
|
||||
|
||||
return try {
|
||||
|
@ -417,9 +417,13 @@ sealed class SubFlowVersion {
|
||||
data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion()
|
||||
}
|
||||
|
||||
sealed class FlowWithClientIdStatus {
|
||||
data class Active(val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>) : FlowWithClientIdStatus()
|
||||
data class Removed(val flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus()
|
||||
sealed class FlowWithClientIdStatus(val flowId: StateMachineRunId) {
|
||||
class Active(
|
||||
flowId: StateMachineRunId,
|
||||
val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>
|
||||
) : FlowWithClientIdStatus(flowId)
|
||||
|
||||
class Removed(flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus(flowId)
|
||||
}
|
||||
|
||||
data class FlowResultMetadata(
|
||||
|
Loading…
x
Reference in New Issue
Block a user