ENT-5672 Know if a paused flow is hospitalized (#6639)

* ENT-5672 Update database query to get paused flows which have previously been hospitalised

* NOTICK Remove unneeded check if a database exception was removed when switching a flow to RUNNABLE since we were to remove it anyway
This commit is contained in:
William Vigor 2020-08-14 20:07:00 +01:00 committed by GitHub
parent c4027e23bf
commit 32cb085a53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 55 additions and 25 deletions

View File

@ -78,7 +78,7 @@ interface CheckpointStorage {
* until the underlying database connection is closed, so any processing should happen before it is closed.
* This method does not fetch [Checkpoint.Serialized.serializedFlowState] to save memory.
*/
fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
fun getPausedCheckpoints(): Stream<Triple<StateMachineRunId, Checkpoint.Serialized, Boolean>>
fun getFinishedFlowsResultsMetadata(): Stream<Pair<StateMachineRunId, FlowResultMetadata>>

View File

@ -555,15 +555,17 @@ class DBCheckpointStorage(
return currentDBSession().find(DBFlowException::class.java, id.uuid.toString())
}
override fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>> {
override fun getPausedCheckpoints(): Stream<Triple<StateMachineRunId, Checkpoint.Serialized, Boolean>> {
val session = currentDBSession()
val jpqlQuery = """select new ${DBPausedFields::class.java.name}(checkpoint.id, blob.checkpoint, checkpoint.status,
checkpoint.progressStep, checkpoint.ioRequestType, checkpoint.compatible) from ${DBFlowCheckpoint::class.java.name}
checkpoint join ${DBFlowCheckpointBlob::class.java.name} blob on checkpoint.blob = blob.id where
checkpoint.status = ${FlowStatus.PAUSED.ordinal}""".trimIndent()
checkpoint.progressStep, checkpoint.ioRequestType, checkpoint.compatible, exception.id)
from ${DBFlowCheckpoint::class.java.name} checkpoint
join ${DBFlowCheckpointBlob::class.java.name} blob on checkpoint.blob = blob.id
left outer join ${DBFlowException::class.java.name} exception on checkpoint.exceptionDetails = exception.id
where checkpoint.status = ${FlowStatus.PAUSED.ordinal}""".trimIndent()
val query = session.createQuery(jpqlQuery, DBPausedFields::class.java)
return query.resultList.stream().map {
StateMachineRunId(UUID.fromString(it.id)) to it.toSerializedCheckpoint()
Triple(StateMachineRunId(UUID.fromString(it.id)), it.toSerializedCheckpoint(), it.wasHospitalized)
}
}
@ -722,8 +724,10 @@ class DBCheckpointStorage(
val status: FlowStatus,
val progressStep: String?,
val ioRequestType: String?,
val compatible: Boolean
val compatible: Boolean,
exception: String?
) {
val wasHospitalized = exception != null
fun toSerializedCheckpoint(): Checkpoint.Serialized {
return Checkpoint.Serialized(
serializedCheckpointState = SerializedBytes(checkpoint),

View File

@ -32,7 +32,8 @@ data class NonResidentFlow(
val runId: StateMachineRunId,
var checkpoint: Checkpoint,
val resultFuture: OpenFuture<Any?> = openFuture(),
val resumable: Boolean = true
val resumable: Boolean = true,
val hospitalized: Boolean = false
) {
val events = mutableListOf<ExternalEvent>()

View File

@ -456,12 +456,8 @@ internal class SingleThreadedStateMachineManager(
innerState.withLock { if (id in flows) return@Checkpoints }
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
if (checkpointStorage.removeFlowException(id)) {
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE)
} else {
logger.error("Unable to remove database exception for flow $id. Something is very wrong. The flow will not be loaded and run.")
return@Checkpoints
}
checkpointStorage.removeFlowException(id)
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE)
}
} ?: return@Checkpoints
val flow = flowCreator.createFlowFromCheckpoint(id, checkpoint)
@ -472,9 +468,9 @@ internal class SingleThreadedStateMachineManager(
flows[id] = flow
}
}
checkpointStorage.getPausedCheckpoints().forEach Checkpoints@{ (id, serializedCheckpoint) ->
checkpointStorage.getPausedCheckpoints().forEach Checkpoints@{ (id, serializedCheckpoint, hospitalised) ->
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@Checkpoints
pausedFlows[id] = NonResidentFlow(id, checkpoint)
pausedFlows[id] = NonResidentFlow(id, checkpoint, hospitalized = hospitalised)
}
return Pair(flows, pausedFlows)
}
@ -504,13 +500,8 @@ internal class SingleThreadedStateMachineManager(
val checkpoint = serializedCheckpoint.let {
tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
if (checkpointStorage.removeFlowException(flowId)) {
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
} else {
logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not be loaded and run.")
// This code branch is being removed in a different PR
return@transaction CheckpointLoadingStatus.CouldNotDeserialize
}
checkpointStorage.removeFlowException(flowId)
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
}
} ?: return@transaction CheckpointLoadingStatus.CouldNotDeserialize
}

View File

@ -47,6 +47,7 @@ import java.time.Clock
import java.util.*
import kotlin.streams.toList
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
internal fun CheckpointStorage.checkpoints(): List<Checkpoint.Serialized> {
@ -803,6 +804,40 @@ class DBCheckpointStorageTests {
assertTrue(Checkpoint.FlowStatus.FAILED in finishedStatuses)
}
@Test(timeout = 300_000)
fun `'getPausedCheckpoints' fetches paused flows with and without database exceptions`() {
val (_, checkpoint) = newCheckpoint(1)
val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val checkpointState = checkpoint.serializeCheckpointState()
val hospitalizedPaused = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE)
val cleanPaused = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE)
database.transaction {
checkpointStorage.addCheckpoint(hospitalizedPaused.id, hospitalizedPaused.checkpoint, serializedFlowState, checkpointState)
checkpointStorage.addCheckpoint(cleanPaused.id, cleanPaused.checkpoint, serializedFlowState, checkpointState)
}
database.transaction {
checkpointStorage.updateCheckpoint(
hospitalizedPaused.id,
hospitalizedPaused.checkpoint.addError(IllegalStateException(), Checkpoint.FlowStatus.HOSPITALIZED),
serializedFlowState,
checkpointState
)
}
database.transaction {
checkpointStorage.updateStatus(hospitalizedPaused.id, Checkpoint.FlowStatus.PAUSED)
checkpointStorage.updateStatus(cleanPaused.id, Checkpoint.FlowStatus.PAUSED)
}
database.transaction {
val checkpoints = checkpointStorage.getPausedCheckpoints().toList()
val dbHospitalizedPaused = checkpoints.single { it.first == hospitalizedPaused.id }
assertEquals(hospitalizedPaused.id, dbHospitalizedPaused.first)
assertTrue(dbHospitalizedPaused.third)
val dbCleanPaused = checkpoints.single { it.first == cleanPaused.id }
assertEquals(cleanPaused.id, dbCleanPaused.first)
assertFalse(dbCleanPaused.third)
}
}
data class IdAndCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint)
private fun changeStatus(oldCheckpoint: Checkpoint, status: Checkpoint.FlowStatus): IdAndCheckpoint {

View File

@ -886,7 +886,6 @@ class FlowFrameworkTests {
Thread.sleep(3000) // wait until flow saves overnight observation state in database
aliceNode = mockNet.restartNode(aliceNode)
waitUntilHospitalized.acquire()
Thread.sleep(3000) // wait until flow saves overnight observation state in database
assertEquals(2, counter)
@ -1268,4 +1267,4 @@ internal class SuspendingFlow : FlowLogic<Unit>() {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, maySkipCheckpoint = false) // flow checkpoints => checkpoint is in DB
stateMachine.hookAfterCheckpoint()
}
}
}