ENT-5396 Allow Retrying a Hospitalised Flow from the Statemachine (#3499)

Added functionality to the statemachine to enable retrying a Hospitalised flow without a node restart.
This commit is contained in:
Will Vigor 2020-08-05 14:30:29 +01:00
parent a73dad00e2
commit bbf5a93761
3 changed files with 24 additions and 4 deletions

View File

@ -23,6 +23,11 @@ interface CheckpointStorage {
fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>?,
serializedCheckpointState: SerializedBytes<CheckpointState>)
/**
 * Update the status ([Checkpoint.status]) of an existing checkpoint.
 *
 * @param runId the id of the flow whose checkpoint status should be updated.
 * @param flowStatus the new [Checkpoint.FlowStatus] to store against the checkpoint.
 */
fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus)
/**
* Update all persisted checkpoints with status [Checkpoint.FlowStatus.RUNNABLE] or [Checkpoint.FlowStatus.HOSPITALIZED],
* changing the status to [Checkpoint.FlowStatus.PAUSED].
@ -65,6 +70,4 @@ interface CheckpointStorage {
* This method does not fetch [Checkpoint.Serialized.serializedFlowState] to save memory.
*/
fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus)
}

View File

@ -98,7 +98,7 @@ internal class SingleThreadedStateMachineManager(
private val flowTimeoutScheduler = FlowTimeoutScheduler(innerState, scheduledFutureExecutor, serviceHub)
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
private var checkpointSerializationContext: CheckpointSerializationContext? = null
private lateinit var checkpointSerializationContext: CheckpointSerializationContext
private lateinit var flowCreator: FlowCreator
override val flowHospital: StaffedFlowHospital = makeFlowHospital()
@ -404,6 +404,13 @@ internal class SingleThreadedStateMachineManager(
return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
if (currentState.checkpoint.status == Checkpoint.FlowStatus.HOSPITALIZED) {
database.transaction {
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
}
}
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) }
@ -652,7 +659,7 @@ internal class SingleThreadedStateMachineManager(
private fun tryDeserializeCheckpoint(serializedCheckpoint: Checkpoint.Serialized, flowId: StateMachineRunId): Checkpoint? {
return try {
serializedCheckpoint.deserialize(checkpointSerializationContext!!)
serializedCheckpoint.deserialize(checkpointSerializationContext)
} catch (e: Exception) {
if (reloadCheckpointAfterSuspend && currentStateMachine() != null) {
logger.error(

View File

@ -104,6 +104,16 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
*/
private val flowsInHospital = ConcurrentHashMap<StateMachineRunId, FlowFiber>()
/**
 * Reports whether the given flow is currently being treated in the hospital.
 * This differs from flows that have a medical history (which can be accessed via [StaffedFlowHospital.contains]).
 *
 * @param runId the id of the flow to look up.
 * @return true if [runId] is one of the keys of [flowsInHospital], false otherwise.
 */
@VisibleForTesting
internal fun flowInHospital(runId: StateMachineRunId): Boolean {
    // Go through .keys instead of using `in` on the map itself, which avoids https://youtrack.jetbrains.com/issue/KT-18053
    val admittedFlowIds = flowsInHospital.keys
    return admittedFlowIds.contains(runId)
}
private val mutex = ThreadBox(object {
/**
* Contains medical history of every flow (a patient) that has entered the hospital. A flow can leave the hospital,