diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 8dcdaa54d5..de247599a1 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -160,7 +160,7 @@ class DBCheckpointStorage( var checkpoint: ByteArray = EMPTY_BYTE_ARRAY, @Type(type = "corda-blob") - @Column(name = "flow_state") + @Column(name = "flow_state", nullable = true) var flowStack: ByteArray?, @Type(type = "corda-wrapper-binary") @@ -419,8 +419,15 @@ class DBCheckpointStorage( val now = clock.instant() val flowId = id.uuid.toString() - // Do not update in DB [Checkpoint.checkpointState] or [Checkpoint.flowState] if flow failed or got hospitalized - val blob = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) { + val blob = if (checkpoint.status == FlowStatus.HOSPITALIZED) { + // Do not update 'checkpointState' or 'flowState' if flow hospitalized + null + } else if (checkpoint.status == FlowStatus.FAILED) { + // We need to update only the 'flowState' to null, and we don't want to update the checkpoint state + // because we want to retain the last clean checkpoint state, therefore just use a query for that update. + val sqlQuery = "Update ${NODE_DATABASE_PREFIX}checkpoint_blobs set flow_state = null where flow_id = '$flowId'" + val query = currentDBSession().createNativeQuery(sqlQuery) + query.executeUpdate() null } else { checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index ccb9cb2656..6f9a6c15e5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -83,7 +83,7 @@ internal class ActionExecutorImpl( val checkpoint = action.checkpoint val flowState = checkpoint.flowState val serializedFlowState = when(flowState) { - FlowState.Completed -> null + FlowState.Finished -> null // upon implementing CORDA-3816: If we have errored or hospitalized then we don't need to serialize the flowState as it will not get saved in the DB else -> flowState.checkpointSerialize(checkpointSerializationContext) } @@ -92,8 +92,8 @@ internal class ActionExecutorImpl( if (action.isCheckpointUpdate) { checkpointStorage.updateCheckpoint(action.id, checkpoint, serializedFlowState, serializedCheckpointState) } else { - if (flowState is FlowState.Completed) { - throw IllegalStateException("A new checkpoint cannot be created with a Completed FlowState.") + if (flowState is FlowState.Finished) { + throw IllegalStateException("A new checkpoint cannot be created with a finished flow state.") } checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState!!, serializedCheckpointState) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index ae39cad3bf..ae53f5a191 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -767,7 +767,7 @@ internal class SingleThreadedStateMachineManager( is FlowState.Started -> { Fiber.unparkDeserialized(flow.fiber, scheduler) } - is FlowState.Completed -> throw IllegalStateException("Cannot start (or resume) a completed flow.") + is FlowState.Finished -> throw IllegalStateException("Cannot start (or resume) a finished flow.") } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 222b20c251..aeb04d729b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -206,7 +206,7 @@ data class Checkpoint( fun deserialize(checkpointSerializationContext: CheckpointSerializationContext): Checkpoint { val flowState = when(status) { FlowStatus.PAUSED -> FlowState.Paused - FlowStatus.COMPLETED -> FlowState.Completed + FlowStatus.COMPLETED, FlowStatus.FAILED -> FlowState.Finished else -> serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext) } return Checkpoint( @@ -349,9 +349,9 @@ sealed class FlowState { object Paused: FlowState() /** - * The flow has completed. It does not have a running fiber that needs to be serialized and checkpointed. + * The flow has finished. It does not have a running fiber that needs to be serialized and checkpointed. */ - object Completed : FlowState() + object Finished : FlowState() } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DoRemainingWorkTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DoRemainingWorkTransition.kt index 7d56967c24..05872aea7f 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DoRemainingWorkTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DoRemainingWorkTransition.kt @@ -25,12 +25,13 @@ class DoRemainingWorkTransition( } // If the flow is clean check the FlowState + @Suppress("ThrowsCount") private fun cleanTransition(): TransitionResult { val flowState = startingState.checkpoint.flowState return when (flowState) { is FlowState.Unstarted -> UnstartedFlowTransition(context, startingState, flowState).transition() is FlowState.Started -> StartedFlowTransition(context, startingState, flowState).transition() - is FlowState.Completed -> throw IllegalStateException("Cannot transition a state with completed flow state.") + is FlowState.Finished -> throw IllegalStateException("Cannot transition a state with finished flow state.") is FlowState.Paused -> throw IllegalStateException("Cannot transition a state with paused flow state.") } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt index 0ce6a5faa2..4f4e6cd51e 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt @@ -64,7 +64,7 @@ class ErrorFlowTransition( val removeOrPersistCheckpoint = if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) { Action.RemoveCheckpoint(context.id) } else { - Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted) + Action.PersistCheckpoint(context.id, newCheckpoint.copy(flowState = FlowState.Finished), isCheckpointUpdate = currentState.isAnyCheckpointPersisted) } actions.addAll(arrayOf( diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 4068069c3c..67526340d1 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -232,7 +232,7 @@ class TopLevelTransition( checkpointState = checkpoint.checkpointState.copy( numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1 ), - flowState = FlowState.Completed, + flowState = FlowState.Finished, result = event.returnValue, status = Checkpoint.FlowStatus.COMPLETED ), diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index 12a2e4f640..6a15008e1a 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -157,7 +157,7 @@ class DBCheckpointStorageTests { } database.transaction { assertEquals( - completedCheckpoint.copy(flowState = FlowState.Completed), + completedCheckpoint.copy(flowState = FlowState.Finished), checkpointStorage.checkpoints().single().deserialize() ) } diff --git a/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt b/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt index 4037bd80f0..d8644ccede 100644 --- a/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt @@ -126,7 +126,7 @@ class CheckpointDumperImplTest { checkpointStorage.addCheckpoint(id, checkpoint, serializeFlowState(checkpoint), serializeCheckpointState(checkpoint)) } val newCheckpoint = checkpoint.copy( - flowState = FlowState.Completed, + flowState = FlowState.Finished, status = Checkpoint.FlowStatus.COMPLETED ) database.transaction { diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt index efdea60199..712fb48473 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt @@ -672,6 +672,34 @@ class FlowClientIdTests { assertEquals("Flow's ${flowHandle0!!.id} exception was not found in the database. Something is very wrong.", e.message) } + @Test(timeout=300_000) + fun `completed flow started with a client id nulls its flow state in database after its lifetime`() { + val clientId = UUID.randomUUID().toString() + val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) + flowHandle.resultFuture.getOrThrow() + + aliceNode.services.database.transaction { + val dbFlowCheckpoint = aliceNode.internals.checkpointStorage.getDBCheckpoint(flowHandle.id) + assertNull(dbFlowCheckpoint!!.blob!!.flowStack) + } + } + + @Test(timeout=300_000) + fun `failed flow started with a client id nulls its flow state in database after its lifetime`() { + val clientId = UUID.randomUUID().toString() + ResultFlow.hook = { throw IllegalStateException() } + + var flowHandle: FlowStateMachineHandle<Int>? = null + assertFailsWith<IllegalStateException> { + flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) + flowHandle!!.resultFuture.getOrThrow() + } + + aliceNode.services.database.transaction { + val dbFlowCheckpoint = aliceNode.internals.checkpointStorage.getDBCheckpoint(flowHandle!!.id) + assertNull(dbFlowCheckpoint!!.blob!!.flowStack) + } + } } internal class ResultFlow<A>(private val result: A): FlowLogic<A>() { diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index e6f1484064..fd7c926d0d 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -688,6 +688,9 @@ class FlowFrameworkTests { firstExecution = false throw HospitalizeFlowException() } else { + // the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long + // and doesn't commit before flow starts running. + Thread.sleep(3000) dbCheckpointStatusBeforeSuspension = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status currentDBSession().clear() // clear session as Hibernate with fails with 'org.hibernate.NonUniqueObjectException' once it tries to save a DBFlowCheckpoint upon checkpoint inMemoryCheckpointStatusBeforeSuspension = flowFiber.transientState.checkpoint.status @@ -736,6 +739,9 @@ class FlowFrameworkTests { firstExecution = false throw HospitalizeFlowException() } else { + // the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long + // and doesn't commit before flow starts running. + Thread.sleep(3000) dbCheckpointStatus = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status inMemoryCheckpointStatus = flowFiber.transientState.checkpoint.status @@ -850,6 +856,9 @@ class FlowFrameworkTests { var secondRun = false SuspendingFlow.hookBeforeCheckpoint = { if(secondRun) { + // the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long + // and doesn't commit before flow starts running. + Thread.sleep(3000) aliceNode.database.transaction { checkpointStatusAfterRestart = findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().single().status dbExceptionAfterRestart = findRecordsFromDatabase()