From bbf5a93761ba537678b77d5224217d2e2f73fadc Mon Sep 17 00:00:00 2001 From: Will Vigor Date: Wed, 5 Aug 2020 14:30:29 +0100 Subject: [PATCH] ENT-5396 Allow Retrying a Hospitalised Flow from the Statemachine (#3499) Added functionality to the statemachine to enable retrying a Hospitalised flow without a node restart. --- .../net/corda/node/services/api/CheckpointStorage.kt | 7 +++++-- .../statemachine/SingleThreadedStateMachineManager.kt | 11 +++++++++-- .../node/services/statemachine/StaffedFlowHospital.kt | 10 ++++++++++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt index 1edc2491a8..9bfd53023d 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt @@ -23,6 +23,11 @@ interface CheckpointStorage { fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes?, serializedCheckpointState: SerializedBytes) + /** + * Update an existing checkpoint's status ([Checkpoint.status]). + */ + fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus) + /** * Update all persisted checkpoints with status [Checkpoint.FlowStatus.RUNNABLE] or [Checkpoint.FlowStatus.HOSPITALIZED], * changing the status to [Checkpoint.FlowStatus.PAUSED]. @@ -65,6 +70,4 @@ interface CheckpointStorage { * This method does not fetch [Checkpoint.Serialized.serializedFlowState] to save memory. 
*/ fun getPausedCheckpoints(): Stream> - - fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 738b94e1ab..107e4571df 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -98,7 +98,7 @@ internal class SingleThreadedStateMachineManager( private val flowTimeoutScheduler = FlowTimeoutScheduler(innerState, scheduledFutureExecutor, serviceHub) private val ourSenderUUID = serviceHub.networkService.ourSenderUUID - private var checkpointSerializationContext: CheckpointSerializationContext? = null + private lateinit var checkpointSerializationContext: CheckpointSerializationContext private lateinit var flowCreator: FlowCreator override val flowHospital: StaffedFlowHospital = makeFlowHospital() @@ -404,6 +404,13 @@ internal class SingleThreadedStateMachineManager( return } val flow = if (currentState.isAnyCheckpointPersisted) { + + if (currentState.checkpoint.status == Checkpoint.FlowStatus.HOSPITALIZED) { + database.transaction { + checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE) + } + } + // We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that // we mirror exactly what happens when restarting the node. val serializedCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) } @@ -652,7 +659,7 @@ internal class SingleThreadedStateMachineManager( private fun tryDeserializeCheckpoint(serializedCheckpoint: Checkpoint.Serialized, flowId: StateMachineRunId): Checkpoint? { return try { - serializedCheckpoint.deserialize(checkpointSerializationContext!!) 
+ serializedCheckpoint.deserialize(checkpointSerializationContext) } catch (e: Exception) { if (reloadCheckpointAfterSuspend && currentStateMachine() != null) { logger.error( diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index f74497d48a..519b2bd3d5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -104,6 +104,16 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, */ private val flowsInHospital = ConcurrentHashMap() + /** + * Returns true if the flow is currently being treated in the hospital. + * This differs from flows with a medical history (which can be accessed via [StaffedFlowHospital.contains]). + */ + @VisibleForTesting + internal fun flowInHospital(runId: StateMachineRunId): Boolean { + // The .keys avoids https://youtrack.jetbrains.com/issue/KT-18053 + return runId in flowsInHospital.keys + } + private val mutex = ThreadBox(object { /** * Contains medical history of every flow (a patient) that has entered the hospital. A flow can leave the hospital,