CORDA-3995 Redeliver external events if number of suspends differs (#6646)

* CORDA-3995 Redeliver external events in number of suspends differs

When retrying a flow, only redeliver external events held in a flow's
pending deduplication handlers if there is a difference in the
`numberOfSuspends` on the `currentState`'s checkpoint or the checkpoint
in the database.

If the checkpoint committed, but the flow retried, then the external
events would have been persisted to the database as part of the same
transaction. Therefore there is no need to replay them, as they have
already been processed as saved as part of the checkpoint.

This change is only relevant when the checkpoint persists, but the flow
still needs to retry after this occurs (within the same
transition/event).

* CORDA-3995 Redeliver external events in number of commits differs

When retrying a flow, only redeliver external events held in a flow's
pending deduplication handlers if there is a difference in the
`numberOfCommits` on the `currentState`'s checkpoint or the checkpoint
in the database.

If the checkpoint committed, but the flow retried, then the external
events would have been persisted to the database as part of the same
transaction. Therefore there is no need to replay them, as they have
already been processed as saved as part of the checkpoint.

This change is only relevant when the checkpoint persists, but the flow
still needs to retry after this occurs (within the same
transition/event).

* CORDA-3995 Redeliver external events if number of commits differs

When retrying a flow, only redeliver external events held in a flow's
pending deduplication handlers if there is a difference in the
`currentState`'s `numberOfCommits` or the `numberOfCommits`
the checkpoint has recorded in the database.

If the checkpoint committed, but the flow retried, then the external
events would have been persisted to the database as part of the same
transaction. Therefore there is no need to replay them, as they have
already been processed as saved as part of the checkpoint.

This change is only relevant when the checkpoint persists, but the flow
still needs to retry after this occurs (within the same
transition/event).

* Add @Suspendable to a test flow.

I am surprised this worked at all.

* Fix a few minor things based on review.

Co-authored-by: Will Vigor <william.vigor@r3.com>
This commit is contained in:
Dan Newton 2020-08-25 11:54:55 +01:00 committed by GitHub
parent 133e6fe39a
commit 99f835bb4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 371 additions and 172 deletions

View File

@ -73,6 +73,7 @@ class DatabaseTransaction(
firstExceptionInDatabaseTransaction = null
}
@Throws(SQLException::class)
fun commit() {
firstExceptionInDatabaseTransaction?.let {
throw DatabaseTransactionException(it)

View File

@ -112,11 +112,10 @@ abstract class StateMachineErrorHandlingTest {
submit.addScripts(listOf(ScriptText("Test script", rules)))
}
internal fun getBytemanOutput(nodeHandle: NodeHandle): List<String> {
return nodeHandle.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
private fun NodeHandle.getBytemanOutput(): List<String> {
return baseDirectory.list()
.filter { "net.corda.node.Corda" in it.toString() && "stdout.log" in it.toString() }
.flatMap { it.readAllLines() }
}
internal fun OutOfProcessImpl.stop(timeout: Duration): Boolean {
@ -126,6 +125,10 @@ abstract class StateMachineErrorHandlingTest {
}.also { onStopCallback() }
}
internal fun NodeHandle.assertBytemanOutput(string: String, count: Int) {
assertEquals(count, getBytemanOutput().filter { string in it }.size)
}
@Suppress("LongParameterList")
internal fun CordaRPCOps.assertHospitalCounts(
discharged: Int = 0,
@ -246,6 +249,7 @@ abstract class StateMachineErrorHandlingTest {
// Internal use for testing only!!
@StartableByRPC
class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
@Suspendable
override fun call(): HospitalCounts =
HospitalCounts(
serviceHub.cordaService(HospitalCounter::class.java).dischargedCounter,

View File

@ -6,6 +6,7 @@ import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.api.CheckpointStorage
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
@ -59,6 +60,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
@ -70,6 +79,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
30.seconds
)
alice.assertBytemanOutput("External start flow event", 4)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
@ -257,6 +267,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
@ -268,6 +286,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
// flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead
Thread.sleep(30.seconds.toMillis())
alice.assertBytemanOutput("External start flow event", 4)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
alice.rpc.assertHospitalCounts(
discharged = 3,
@ -364,12 +383,28 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSignalFlowHasStarted action
RULE Flag when commit transaction reached
CLASS $actionExecutorClassName
METHOD executeSignalFlowHasStarted
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("i wish i was a sql exception")
IF true
DO flag("commit")
ENDRULE
RULE Throw exception on executeSignalFlowHasStarted action
CLASS ${DatabaseTransaction::class.java.name}
METHOD commit
AT EXIT
IF readCounter("counter") < 3 && flagged("commit")
DO incrementCounter("counter"); clear("commit"); traceln("Throwing exception"); throw new java.sql.SQLException("you thought it worked didnt you!", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
@ -382,6 +417,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
30.seconds
)
alice.assertBytemanOutput("External start flow event", 1)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
@ -421,6 +457,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
@ -433,6 +477,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
30.seconds
)
alice.assertBytemanOutput("External start flow event", 4)
alice.rpc.assertNumberOfCheckpoints(completed = 1)
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
@ -517,6 +562,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
@ -532,6 +585,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
// flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead
Thread.sleep(30.seconds.toMillis())
alice.assertBytemanOutput("External start flow event", 4)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
alice.rpc.assertHospitalCounts(
discharged = 3,
@ -629,13 +683,28 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSignalFlowHasStarted action
RULE Flag when commit transaction reached
CLASS $actionExecutorClassName
METHOD executeSignalFlowHasStarted
# METHOD executeAcknowledgeMessages
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("i wish i was a sql exception")
IF true
DO flag("commit")
ENDRULE
RULE Throw exception on executeSignalFlowHasStarted action
CLASS ${DatabaseTransaction::class.java.name}
METHOD commit
AT EXIT
IF readCounter("counter") < 3 && flagged("commit")
DO incrementCounter("counter"); clear("commit"); traceln("Throwing exception"); throw new java.sql.SQLException("you thought it worked didnt you!", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
@ -649,6 +718,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
30.seconds
)
alice.assertBytemanOutput("External start flow event", 1)
alice.rpc.assertNumberOfCheckpoints(completed = 1)
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
@ -687,6 +757,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log session init event
CLASS $stateMachineManagerClassName
METHOD onSessionInit
AT ENTRY
IF true
DO traceln("On session init event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
@ -699,6 +777,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
)
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.assertBytemanOutput("On session init event", 4)
charlie.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertHospitalCounts(discharged = 3)
}
@ -735,6 +814,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log session init event
CLASS $stateMachineManagerClassName
METHOD onSessionInit
AT ENTRY
IF true
DO traceln("On session init event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
@ -747,6 +834,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
Thread.sleep(30.seconds.toMillis())
alice.rpc.assertNumberOfCheckpoints(runnable = 1)
charlie.assertBytemanOutput("On session init event", 4)
charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1)
charlie.rpc.assertHospitalCounts(
discharged = 3,
@ -893,4 +981,77 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when after the first [Action.CommitTransaction] event before the flow has initialised (remains in an unstarted state).
* This is to cover transient issues, where the transaction committed the checkpoint but failed to respond to the node.
*
* The exception is thrown when performing [Action.SignalFlowHasStarted], the error won't actually appear here but it makes it easier
* to test.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*
* The first retry will load the checkpoint that the flow doesn't know exists ([StateMachineState.isAnyCheckpointPersisted] is false
* at this point). The flag gets switched to true after this first retry and the flow has now returned to an expected state.
*
*/
@Test(timeout = 300_000)
fun `responding flow - error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() {
startDriver {
val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Flag when commit transaction reached
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF true
DO flag("commit")
ENDRULE
RULE Throw exception on executeSignalFlowHasStarted action
CLASS ${DatabaseTransaction::class.java.name}
METHOD commit
AT EXIT
IF readCounter("counter") < 3 && flagged("commit")
DO incrementCounter("counter"); clear("commit"); traceln("Throwing exception"); throw new java.sql.SQLException("you thought it worked didnt you!", "1")
ENDRULE
RULE Log session init event
CLASS $stateMachineManagerClassName
METHOD onSessionInit
AT ENTRY
IF true
DO traceln("On session init event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.assertBytemanOutput("On session init event", 1)
charlie.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertHospitalCounts(discharged = 3)
}
}
}

View File

@ -513,6 +513,14 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
IF readCounter("counter_2") < 3
DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)

View File

@ -145,7 +145,7 @@ sealed class Action {
/**
* Commit the current database transaction.
*/
object CommitTransaction : Action() {
data class CommitTransaction(val currentState: StateMachineState) : Action() {
override fun toString() = "CommitTransaction"
}

View File

@ -61,7 +61,7 @@ internal class ActionExecutorImpl(
is Action.RemoveFlow -> executeRemoveFlow(action)
is Action.CreateTransaction -> executeCreateTransaction()
is Action.RollbackTransaction -> executeRollbackTransaction()
is Action.CommitTransaction -> executeCommitTransaction()
is Action.CommitTransaction -> executeCommitTransaction(action)
is Action.ExecuteAsyncOperation -> executeAsyncOperation(fiber, action)
is Action.ReleaseSoftLocks -> executeReleaseSoftLocks(action)
is Action.RetryFlowFromSafePoint -> executeRetryFlowFromSafePoint(action)
@ -219,13 +219,14 @@ internal class ActionExecutorImpl(
@Suspendable
@Throws(SQLException::class)
private fun executeCommitTransaction() {
private fun executeCommitTransaction(action: Action.CommitTransaction) {
try {
contextTransaction.commit()
} finally {
contextTransaction.close()
contextTransactionOrNull = null
}
action.currentState.run { numberOfCommits = checkpoint.checkpointState.numberOfCommits }
}
@Suppress("TooGenericExceptionCaught")

View File

@ -103,6 +103,7 @@ class FlowCreator(
updateCompatibleInDb(runId, true)
checkpoint = checkpoint.copy(compatible = true)
}
checkpoint = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)
fiber.logic.stateMachine = fiber
@ -114,6 +115,7 @@ class FlowCreator(
anyCheckpointPersisted = true,
reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount
?: if (reloadCheckpointAfterSuspend) checkpoint.checkpointState.numberOfSuspends else null,
numberOfCommits = checkpoint.checkpointState.numberOfCommits,
lock = lock
)
injectOldProgressTracker(progressTracker, fiber.logic)
@ -161,6 +163,7 @@ class FlowCreator(
fiber = flowStateMachineImpl,
anyCheckpointPersisted = existingCheckpoint != null,
reloadCheckpointAfterSuspendCount = if (reloadCheckpointAfterSuspend) 0 else null,
numberOfCommits = existingCheckpoint?.checkpointState?.numberOfCommits ?: 0,
lock = Semaphore(1),
deduplicationHandler = deduplicationHandler,
senderUUID = senderUUID
@ -242,6 +245,7 @@ class FlowCreator(
fiber: FlowStateMachineImpl<*>,
anyCheckpointPersisted: Boolean,
reloadCheckpointAfterSuspendCount: Int?,
numberOfCommits: Int,
lock: Semaphore,
deduplicationHandler: DeduplicationHandler? = null,
senderUUID: String? = null
@ -259,6 +263,7 @@ class FlowCreator(
flowLogic = fiber.logic,
senderUUID = senderUUID,
reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount,
numberOfCommits = numberOfCommits,
lock = lock
)
}

View File

@ -511,10 +511,11 @@ internal class SingleThreadedStateMachineManager(
CheckpointLoadingStatus.Success(checkpoint)
}
val flow = when {
val (flow, numberOfCommitsFromCheckpoint) = when {
// Resurrect flow
checkpointLoadingStatus is CheckpointLoadingStatus.Success -> {
flowCreator.createFlowFromCheckpoint(
val numberOfCommitsFromCheckpoint = checkpointLoadingStatus.checkpoint.checkpointState.numberOfCommits
val flow = flowCreator.createFlowFromCheckpoint(
flowId,
checkpointLoadingStatus.checkpoint,
currentState.reloadCheckpointAfterSuspendCount,
@ -522,6 +523,7 @@ internal class SingleThreadedStateMachineManager(
firstRestore = false,
progressTracker = currentState.flowLogic.progressTracker
) ?: return
flow to numberOfCommitsFromCheckpoint
}
checkpointLoadingStatus is CheckpointLoadingStatus.NotFound && currentState.isAnyCheckpointPersisted -> {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
@ -530,7 +532,7 @@ internal class SingleThreadedStateMachineManager(
checkpointLoadingStatus is CheckpointLoadingStatus.CouldNotDeserialize -> return
else -> {
// Just flow initiation message
null
null to -1
}
}
@ -545,7 +547,8 @@ internal class SingleThreadedStateMachineManager(
if (flow != null) {
addAndStartFlow(flowId, flow)
}
extractAndScheduleEventsForRetry(oldFlowLeftOver, currentState)
extractAndScheduleEventsForRetry(oldFlowLeftOver, currentState, numberOfCommitsFromCheckpoint)
}
}
@ -571,13 +574,27 @@ internal class SingleThreadedStateMachineManager(
/**
* Extract all the incomplete deduplication handlers as well as the [ExternalEvent] and [Event.Pause] events from this flows event queue
* [oldEventQueue]. Then schedule them (in the same order) for the new flow. This means that if a retried flow has a pause event
* scheduled then the retried flow will eventually pause. The new flow will not retry again if future retry events have been scheduled.
* When this method is called this flow must have been replaced by the new flow in [StateMachineInnerState.flows]. This method differs
* from [extractAndQueueExternalEventsForPausedFlow] where (only) [externalEvents] are extracted and scheduled straight away.
* Extract all the (unpersisted) incomplete deduplication handlers [currentState.pendingDeduplicationHandlers], as well as the
* [ExternalEvent] and [Event.Pause] events from this flows event queue [oldEventQueue]. Then schedule them (in the same order) for the
* new flow. This means that if a retried flow has a pause event scheduled then the retried flow will eventually pause. The new flow
* will not retry again if future retry events have been scheduled. When this method is called this flow must have been replaced by the
* new flow in [StateMachineInnerState.flows].
*
* This method differs from [extractAndQueueExternalEventsForPausedFlow] where (only) [externalEvents] are extracted and scheduled
* straight away.
*
* @param oldEventQueue The old event queue of the flow/fiber to unprocessed extract events from
*
* @param currentState The current state of the flow, used to extract processed events (held in [StateMachineState.pendingDeduplicationHandlers])
*
* @param numberOfCommitsFromCheckpoint The number of commits that the checkpoint loaded from the database has, to compare to the
* commits the flow has currently reached
*/
private fun extractAndScheduleEventsForRetry(oldEventQueue: Channel<Event>, currentState: StateMachineState) {
private fun extractAndScheduleEventsForRetry(
oldEventQueue: Channel<Event>,
currentState: StateMachineState,
numberOfCommitsFromCheckpoint: Int
) {
val flow = innerState.withLock {
flows[currentState.flowLogic.runId]
}
@ -587,9 +604,13 @@ internal class SingleThreadedStateMachineManager(
if (event is Event.Pause || event is Event.GeneratedByExternalEvent) events.add(event)
} while (event != null)
// Only redeliver events if they were not persisted to the database
if (currentState.numberOfCommits >= numberOfCommitsFromCheckpoint) {
for (externalEvent in currentState.pendingDeduplicationHandlers) {
deliverExternalEvent(externalEvent.externalCause)
}
}
for (event in events) {
if (event is Event.GeneratedByExternalEvent) {
deliverExternalEvent(event.deduplicationHandler.externalCause)

View File

@ -49,6 +49,8 @@ import java.util.concurrent.Semaphore
* @param senderUUID the identifier of the sending state machine or null if this flow is resumed from a checkpoint so that it does not participate in de-duplication high-water-marking.
* @param reloadCheckpointAfterSuspendCount The number of times a flow has been reloaded (not retried). This is [null] when
* [NodeConfiguration.reloadCheckpointAfterSuspendCount] is not enabled.
* @param numberOfCommits The number of times the flow's checkpoint has been successfully committed. This field is a var so that it can be
* updated after committing a database transaction that contained a checkpoint insert/update.
* @param lock The flow's lock, used to prevent the flow performing a transition while being interacted with from external threads, and
* vise-versa.
*/
@ -67,6 +69,7 @@ data class StateMachineState(
val isKilled: Boolean,
val senderUUID: String?,
val reloadCheckpointAfterSuspendCount: Int?,
var numberOfCommits: Int,
val lock: Semaphore
) : KryoSerializable {
override fun write(kryo: Kryo?, output: Output?) {
@ -129,7 +132,9 @@ data class Checkpoint(
emptyMap(),
emptySet(),
listOf(topLevelSubFlow),
numberOfSuspends = 0
numberOfSuspends = 0,
// We set this to 1 here to avoid an extra copy and increment in UnstartedFlowTransition.createInitialCheckpoint
numberOfCommits = 1
),
flowState = FlowState.Unstarted(flowStart, frozenFlowLogic),
errorState = ErrorState.Clean
@ -229,12 +234,13 @@ data class Checkpoint(
}
/**
* @param invocationContext the initiator of the flow.
* @param ourIdentity the identity the flow is run as.
* @param sessions map of source session ID to session state.
* @param sessionsToBeClosed the sessions that have pending session end messages and need to be closed. This is available to avoid scanning all the sessions.
* @param subFlowStack the stack of currently executing subflows.
* @param numberOfSuspends the number of flow suspends due to IO API calls.
* @param invocationContext The initiator of the flow.
* @param ourIdentity The identity the flow is run as.
* @param sessions Map of source session ID to session state.
* @param sessionsToBeClosed The sessions that have pending session end messages and need to be closed. This is available to avoid scanning all the sessions.
* @param subFlowStack The stack of currently executing subflows.
* @param numberOfSuspends The number of flow suspends due to IO API calls.
* @param numberOfCommits The number of times this checkpoint has been persisted.
*/
@CordaSerializable
data class CheckpointState(
@ -243,7 +249,8 @@ data class CheckpointState(
val sessions: SessionMap, // This must preserve the insertion order!
val sessionsToBeClosed: Set<SessionId>,
val subFlowStack: List<SubFlow>,
val numberOfSuspends: Int
val numberOfSuspends: Int,
val numberOfCommits: Int
)
/**

View File

@ -49,7 +49,7 @@ class ErrorFlowTransition(
checkpointState = startingState.checkpoint.checkpointState.copy(sessions = newSessions)
)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(Action.PropagateErrors(errorMessages, initiatedSessions, startingState.senderUUID))
actions += Action.PropagateErrors(errorMessages, initiatedSessions, startingState.senderUUID)
}
// If we're errored but not propagating keep processing events.
@ -59,32 +59,38 @@ class ErrorFlowTransition(
// If we haven't been removed yet remove the flow.
if (!currentState.isRemoved) {
val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
val removeOrPersistCheckpoint = if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
Action.RemoveCheckpoint(context.id)
} else {
Action.PersistCheckpoint(context.id, newCheckpoint.copy(flowState = FlowState.Finished), isCheckpointUpdate = currentState.isAnyCheckpointPersisted)
}
actions.addAll(arrayOf(
Action.CreateTransaction,
removeOrPersistCheckpoint,
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(context.id.uuid),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
Action.RemoveSessionBindings(currentState.checkpoint.checkpointState.sessions.keys)
))
val newCheckpoint = startingState.checkpoint.copy(
status = Checkpoint.FlowStatus.FAILED,
flowState = FlowState.Finished,
checkpointState = startingState.checkpoint.checkpointState.copy(
numberOfCommits = startingState.checkpoint.checkpointState.numberOfCommits + 1
)
)
currentState = currentState.copy(
checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(),
isRemoved = true
)
val removalReason = FlowRemovalReason.ErrorFinish(allErrors)
actions.add(Action.RemoveFlow(context.id, removalReason, currentState))
val removeOrPersistCheckpoint = if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
Action.RemoveCheckpoint(context.id)
} else {
Action.PersistCheckpoint(
context.id,
newCheckpoint,
isCheckpointUpdate = currentState.isAnyCheckpointPersisted
)
}
actions += Action.CreateTransaction
actions += removeOrPersistCheckpoint
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.ReleaseSoftLocks(context.id.uuid)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
actions += Action.RemoveSessionBindings(startingState.checkpoint.checkpointState.sessions.keys)
actions += Action.RemoveFlow(context.id, FlowRemovalReason.ErrorFinish(allErrors), currentState)
FlowContinuation.Abort
} else {
// Otherwise keep processing events. This branch happens when there are some outstanding initiating

View File

@ -29,39 +29,31 @@ class KilledFlowTransition(
startingState.checkpoint.checkpointState.sessions,
errorMessages
)
val newCheckpoint = startingState.checkpoint.setSessions(sessions = newSessions)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(
Action.PropagateErrors(
currentState = currentState.copy(
checkpoint = startingState.checkpoint.setSessions(sessions = newSessions),
pendingDeduplicationHandlers = emptyList(),
isRemoved = true
)
actions += Action.PropagateErrors(
errorMessages,
initiatedSessions,
startingState.senderUUID
)
)
if (!startingState.isFlowResumed) {
actions.add(Action.CreateTransaction)
actions += Action.CreateTransaction
}
// The checkpoint and soft locks are also removed directly in [StateMachineManager.killFlow]
if (startingState.isAnyCheckpointPersisted) {
actions.add(Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true))
actions += Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true)
}
actions.addAll(
arrayOf(
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(context.id.uuid),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
Action.RemoveSessionBindings(currentState.checkpoint.checkpointState.sessions.keys)
)
)
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.ReleaseSoftLocks(context.id.uuid)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
actions += Action.RemoveSessionBindings(startingState.checkpoint.checkpointState.sessions.keys)
actions += Action.RemoveFlow(context.id, createKilledRemovalReason(killedFlowError), currentState)
currentState = currentState.copy(
pendingDeduplicationHandlers = emptyList(),
isRemoved = true
)
actions.add(Action.RemoveFlow(context.id, createKilledRemovalReason(killedFlowError), currentState))
FlowContinuation.Abort
}
}

View File

@ -121,7 +121,7 @@ class StartedFlowTransition(
actions = listOf(
Action.CreateTransaction,
Action.TrackTransaction(flowIORequest.hash, state),
Action.CommitTransaction
Action.CommitTransaction(state)
)
)
} else {

View File

@ -189,15 +189,16 @@ class TopLevelTransition(
private fun suspendTransition(event: Event.Suspend): TransitionResult {
return builder {
val newCheckpoint = currentState.checkpoint.run {
val newCheckpointState = if (checkpointState.invocationContext.arguments!!.isNotEmpty()) {
checkpointState.copy(
invocationContext = checkpointState.invocationContext.copy(arguments = emptyList()),
numberOfSuspends = checkpointState.numberOfSuspends + 1
)
val newCheckpoint = startingState.checkpoint.run {
val newCheckpointState = checkpointState.copy(
invocationContext = if (checkpointState.invocationContext.arguments!!.isNotEmpty()) {
checkpointState.invocationContext.copy(arguments = emptyList())
} else {
checkpointState.copy(numberOfSuspends = checkpointState.numberOfSuspends + 1)
}
checkpointState.invocationContext
},
numberOfSuspends = checkpointState.numberOfSuspends + 1,
numberOfCommits = checkpointState.numberOfCommits + 1
)
copy(
flowState = FlowState.Started(event.ioRequest, event.fiber),
checkpointState = newCheckpointState,
@ -206,29 +207,26 @@ class TopLevelTransition(
)
}
if (event.maySkipCheckpoint) {
actions.addAll(arrayOf(
Action.CommitTransaction,
Action.ScheduleEvent(Event.DoRemainingWork)
))
currentState = currentState.copy(
currentState = startingState.copy(
checkpoint = newCheckpoint,
isFlowResumed = false
)
actions += Action.CommitTransaction(currentState)
actions += Action.ScheduleEvent(Event.DoRemainingWork)
} else {
actions.addAll(arrayOf(
Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted),
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
Action.ScheduleEvent(Event.DoRemainingWork)
))
currentState = currentState.copy(
currentState = startingState.copy(
checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(),
isFlowResumed = false,
isAnyCheckpointPersisted = true
)
actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = startingState.isAnyCheckpointPersisted)
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
actions += Action.ScheduleEvent(Event.DoRemainingWork)
}
FlowContinuation.ProcessEvents
}
}
@ -238,11 +236,11 @@ class TopLevelTransition(
val checkpoint = currentState.checkpoint
when (checkpoint.errorState) {
ErrorState.Clean -> {
val pendingDeduplicationHandlers = currentState.pendingDeduplicationHandlers
currentState = currentState.copy(
currentState = startingState.copy(
checkpoint = checkpoint.copy(
checkpointState = checkpoint.checkpointState.copy(
numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1
numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1,
numberOfCommits = checkpoint.checkpointState.numberOfCommits + 1
),
flowState = FlowState.Finished,
result = event.returnValue,
@ -253,29 +251,25 @@ class TopLevelTransition(
isRemoved = true
)
if (currentState.isAnyCheckpointPersisted) {
if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
actions.add(Action.RemoveCheckpoint(context.id))
if (startingState.checkpoint.checkpointState.invocationContext.clientId == null) {
if (startingState.isAnyCheckpointPersisted) {
actions += Action.RemoveCheckpoint(context.id)
}
} else {
actions.add(
Action.PersistCheckpoint(
actions += Action.PersistCheckpoint(
context.id,
currentState.checkpoint,
isCheckpointUpdate = currentState.isAnyCheckpointPersisted
isCheckpointUpdate = startingState.isAnyCheckpointPersisted
)
)
}
}
val allSourceSessionIds = currentState.checkpoint.checkpointState.sessions.keys
actions.addAll(arrayOf(
Action.PersistDeduplicationFacts(pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(event.softLocksId),
Action.CommitTransaction,
Action.AcknowledgeMessages(pendingDeduplicationHandlers),
Action.RemoveSessionBindings(allSourceSessionIds),
Action.RemoveFlow(context.id, FlowRemovalReason.OrderlyFinish(event.returnValue), currentState)
))
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.ReleaseSoftLocks(event.softLocksId)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
actions += Action.RemoveSessionBindings(startingState.checkpoint.checkpointState.sessions.keys)
actions += Action.RemoveFlow(context.id, FlowRemovalReason.OrderlyFinish(event.returnValue), currentState)
sendEndMessages()
// Resume to end fiber
FlowContinuation.Resume(null)
@ -358,17 +352,22 @@ class TopLevelTransition(
private fun overnightObservationTransition(): TransitionResult {
return builder {
val flowStartEvents = currentState.pendingDeduplicationHandlers.filter(::isFlowStartEvent)
val flowStartEvents = startingState.pendingDeduplicationHandlers.filter(::isFlowStartEvent)
val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED)
currentState = startingState.copy(
checkpoint = startingState.checkpoint.copy(
status = Checkpoint.FlowStatus.HOSPITALIZED,
checkpointState = startingState.checkpoint.checkpointState.copy(
numberOfCommits = startingState.checkpoint.checkpointState.numberOfCommits + 1
)
),
pendingDeduplicationHandlers = startingState.pendingDeduplicationHandlers - flowStartEvents
)
actions += Action.CreateTransaction
actions += Action.PersistDeduplicationFacts(flowStartEvents)
actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted)
actions += Action.CommitTransaction
actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = startingState.isAnyCheckpointPersisted)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(flowStartEvents)
currentState = currentState.copy(
checkpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED),
pendingDeduplicationHandlers = currentState.pendingDeduplicationHandlers - flowStartEvents
)
FlowContinuation.ProcessEvents
}
}
@ -394,15 +393,11 @@ class TopLevelTransition(
private fun pausedFlowTransition(): TransitionResult {
return builder {
if (!startingState.isFlowResumed) {
actions.add(Action.CreateTransaction)
actions += Action.CreateTransaction
}
actions.addAll(
arrayOf(
Action.UpdateFlowStatus(context.id, Checkpoint.FlowStatus.PAUSED),
Action.CommitTransaction,
Action.MoveFlowToPaused(currentState)
)
)
actions += Action.UpdateFlowStatus(context.id, Checkpoint.FlowStatus.PAUSED)
actions += Action.CommitTransaction(currentState)
actions += Action.MoveFlowToPaused(currentState)
FlowContinuation.Abort
}
}

View File

@ -27,14 +27,14 @@ class UnstartedFlowTransition(
createInitialCheckpoint()
}
actions.add(Action.SignalFlowHasStarted(context.id))
actions += Action.SignalFlowHasStarted(context.id)
if (unstarted.flowStart is FlowStart.Initiated) {
initialiseInitiatedSession(unstarted.flowStart)
}
currentState = currentState.copy(isFlowResumed = true)
actions.add(Action.CreateTransaction)
actions += Action.CreateTransaction
FlowContinuation.Resume(null)
}
}
@ -73,16 +73,14 @@ class UnstartedFlowTransition(
// Create initial checkpoint and acknowledge triggering messages.
private fun TransitionBuilder.createInitialCheckpoint() {
actions.addAll(arrayOf(
Action.CreateTransaction,
Action.PersistCheckpoint(context.id, currentState.checkpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted),
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers)
))
currentState = currentState.copy(
currentState = startingState.copy(
pendingDeduplicationHandlers = emptyList(),
isAnyCheckpointPersisted = true
)
actions += Action.CreateTransaction
actions += Action.PersistCheckpoint(context.id, startingState.checkpoint, isCheckpointUpdate = startingState.isAnyCheckpointPersisted)
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
}
}