CORDA-3491 - Do not keep flow state in memory after a flow has finished (#6573)

Do not retain in database `checkpoint.flowState` for flows that have completed 
or failed and have started with a client id, after their lifetime.
This commit is contained in:
Kyriakos Tharrouniatis 2020-08-06 09:46:04 +01:00 committed by GitHub
parent 5192a9a2dd
commit 5ba8477733
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 60 additions and 15 deletions

View File

@ -160,7 +160,7 @@ class DBCheckpointStorage(
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY,
@Type(type = "corda-blob")
@Column(name = "flow_state")
@Column(name = "flow_state", nullable = true)
var flowStack: ByteArray?,
@Type(type = "corda-wrapper-binary")
@ -419,8 +419,15 @@ class DBCheckpointStorage(
val now = clock.instant()
val flowId = id.uuid.toString()
// Do not update in DB [Checkpoint.checkpointState] or [Checkpoint.flowState] if flow failed or got hospitalized
val blob = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) {
val blob = if (checkpoint.status == FlowStatus.HOSPITALIZED) {
// Do not update 'checkpointState' or 'flowState' if flow hospitalized
null
} else if (checkpoint.status == FlowStatus.FAILED) {
// We need to update only the 'flowState' to null, and we don't want to update the checkpoint state
// because we want to retain the last clean checkpoint state, therefore just use a query for that update.
val sqlQuery = "Update ${NODE_DATABASE_PREFIX}checkpoint_blobs set flow_state = null where flow_id = '$flowId'"
val query = currentDBSession().createNativeQuery(sqlQuery)
query.executeUpdate()
null
} else {
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)

View File

@ -83,7 +83,7 @@ internal class ActionExecutorImpl(
val checkpoint = action.checkpoint
val flowState = checkpoint.flowState
val serializedFlowState = when(flowState) {
FlowState.Completed -> null
FlowState.Finished -> null
// upon implementing CORDA-3816: If we have errored or hospitalized then we don't need to serialize the flowState as it will not get saved in the DB
else -> flowState.checkpointSerialize(checkpointSerializationContext)
}
@ -92,8 +92,8 @@ internal class ActionExecutorImpl(
if (action.isCheckpointUpdate) {
checkpointStorage.updateCheckpoint(action.id, checkpoint, serializedFlowState, serializedCheckpointState)
} else {
if (flowState is FlowState.Completed) {
throw IllegalStateException("A new checkpoint cannot be created with a Completed FlowState.")
if (flowState is FlowState.Finished) {
throw IllegalStateException("A new checkpoint cannot be created with a finished flow state.")
}
checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState!!, serializedCheckpointState)
}

View File

@ -767,7 +767,7 @@ internal class SingleThreadedStateMachineManager(
is FlowState.Started -> {
Fiber.unparkDeserialized(flow.fiber, scheduler)
}
is FlowState.Completed -> throw IllegalStateException("Cannot start (or resume) a completed flow.")
is FlowState.Finished -> throw IllegalStateException("Cannot start (or resume) a finished flow.")
}
}

View File

@ -206,7 +206,7 @@ data class Checkpoint(
fun deserialize(checkpointSerializationContext: CheckpointSerializationContext): Checkpoint {
val flowState = when(status) {
FlowStatus.PAUSED -> FlowState.Paused
FlowStatus.COMPLETED -> FlowState.Completed
FlowStatus.COMPLETED, FlowStatus.FAILED -> FlowState.Finished
else -> serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext)
}
return Checkpoint(
@ -349,9 +349,9 @@ sealed class FlowState {
object Paused: FlowState()
/**
* The flow has completed. It does not have a running fiber that needs to be serialized and checkpointed.
* The flow has finished. It does not have a running fiber that needs to be serialized and checkpointed.
*/
object Completed : FlowState()
object Finished : FlowState()
}

View File

@ -25,12 +25,13 @@ class DoRemainingWorkTransition(
}
// If the flow is clean check the FlowState
@Suppress("ThrowsCount")
private fun cleanTransition(): TransitionResult {
val flowState = startingState.checkpoint.flowState
return when (flowState) {
is FlowState.Unstarted -> UnstartedFlowTransition(context, startingState, flowState).transition()
is FlowState.Started -> StartedFlowTransition(context, startingState, flowState).transition()
is FlowState.Completed -> throw IllegalStateException("Cannot transition a state with completed flow state.")
is FlowState.Finished -> throw IllegalStateException("Cannot transition a state with finished flow state.")
is FlowState.Paused -> throw IllegalStateException("Cannot transition a state with paused flow state.")
}
}

View File

@ -64,7 +64,7 @@ class ErrorFlowTransition(
val removeOrPersistCheckpoint = if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
Action.RemoveCheckpoint(context.id)
} else {
Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted)
Action.PersistCheckpoint(context.id, newCheckpoint.copy(flowState = FlowState.Finished), isCheckpointUpdate = currentState.isAnyCheckpointPersisted)
}
actions.addAll(arrayOf(

View File

@ -232,7 +232,7 @@ class TopLevelTransition(
checkpointState = checkpoint.checkpointState.copy(
numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1
),
flowState = FlowState.Completed,
flowState = FlowState.Finished,
result = event.returnValue,
status = Checkpoint.FlowStatus.COMPLETED
),

View File

@ -157,7 +157,7 @@ class DBCheckpointStorageTests {
}
database.transaction {
assertEquals(
completedCheckpoint.copy(flowState = FlowState.Completed),
completedCheckpoint.copy(flowState = FlowState.Finished),
checkpointStorage.checkpoints().single().deserialize()
)
}

View File

@ -126,7 +126,7 @@ class CheckpointDumperImplTest {
checkpointStorage.addCheckpoint(id, checkpoint, serializeFlowState(checkpoint), serializeCheckpointState(checkpoint))
}
val newCheckpoint = checkpoint.copy(
flowState = FlowState.Completed,
flowState = FlowState.Finished,
status = Checkpoint.FlowStatus.COMPLETED
)
database.transaction {

View File

@ -672,6 +672,34 @@ class FlowClientIdTests {
assertEquals("Flow's ${flowHandle0!!.id} exception was not found in the database. Something is very wrong.", e.message)
}
@Test(timeout=300_000)
fun `completed flow started with a client id nulls its flow state in database after its lifetime`() {
val clientId = UUID.randomUUID().toString()
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle.resultFuture.getOrThrow()
aliceNode.services.database.transaction {
val dbFlowCheckpoint = aliceNode.internals.checkpointStorage.getDBCheckpoint(flowHandle.id)
assertNull(dbFlowCheckpoint!!.blob!!.flowStack)
}
}
@Test(timeout=300_000)
fun `failed flow started with a client id nulls its flow state in database after its lifetime`() {
val clientId = UUID.randomUUID().toString()
ResultFlow.hook = { throw IllegalStateException() }
var flowHandle: FlowStateMachineHandle<Int>? = null
assertFailsWith<IllegalStateException> {
flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle!!.resultFuture.getOrThrow()
}
aliceNode.services.database.transaction {
val dbFlowCheckpoint = aliceNode.internals.checkpointStorage.getDBCheckpoint(flowHandle!!.id)
assertNull(dbFlowCheckpoint!!.blob!!.flowStack)
}
}
}
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {

View File

@ -688,6 +688,9 @@ class FlowFrameworkTests {
firstExecution = false
throw HospitalizeFlowException()
} else {
// the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
// and doesn't commit before flow starts running.
Thread.sleep(3000)
dbCheckpointStatusBeforeSuspension = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status
currentDBSession().clear() // clear session as Hibernate with fails with 'org.hibernate.NonUniqueObjectException' once it tries to save a DBFlowCheckpoint upon checkpoint
inMemoryCheckpointStatusBeforeSuspension = flowFiber.transientState.checkpoint.status
@ -736,6 +739,9 @@ class FlowFrameworkTests {
firstExecution = false
throw HospitalizeFlowException()
} else {
// the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
// and doesn't commit before flow starts running.
Thread.sleep(3000)
dbCheckpointStatus = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status
inMemoryCheckpointStatus = flowFiber.transientState.checkpoint.status
@ -850,6 +856,9 @@ class FlowFrameworkTests {
var secondRun = false
SuspendingFlow.hookBeforeCheckpoint = {
if(secondRun) {
// the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
// and doesn't commit before flow starts running.
Thread.sleep(3000)
aliceNode.database.transaction {
checkpointStatusAfterRestart = findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().single().status
dbExceptionAfterRestart = findRecordsFromDatabase()