From 7acc51053475a86dda844fe126e9ee0c52235ffa Mon Sep 17 00:00:00 2001 From: Will Vigor Date: Wed, 5 Aug 2020 16:25:18 +0100 Subject: [PATCH] CORDA-3602 Set a Checkpoint as incompatible if it can't be deserialised (#3653) Update the compatible flag in the DB if the flowstate cannot be deserialised. The most common cause of this problem is a CorDapp being upgraded without draining flows from the node. `RUNNABLE` and `HOSPITALIZED` flows are restored on node startup, so the flag is set for them at that point. The flag can also be set when a flow retries for some reason (see retryFlowFromSafePoint); in that case the problem has some other cause. --- .../node/services/api/CheckpointStorage.kt | 5 ++ .../persistence/DBCheckpointStorage.kt | 5 ++ .../corda/node/services/statemachine/Event.kt | 14 ++-- .../node/services/statemachine/FlowCreator.kt | 80 +++++++++++++------ .../SingleThreadedStateMachineManager.kt | 51 ++++++------ .../transitions/TopLevelTransition.kt | 2 +- .../persistence/DBCheckpointStorageTests.kt | 18 +++++ 7 files changed, 116 insertions(+), 59 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt index 9bfd53023d..c7624266e7 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt @@ -28,6 +28,11 @@ interface CheckpointStorage { */ fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus) + /** + * Update an existing checkpoints compatibility flag ([Checkpoint.compatible]). + */ + fun updateCompatible(runId: StateMachineRunId, compatible: Boolean) + /** * Update all persisted checkpoints with status [Checkpoint.FlowStatus.RUNNABLE] or [Checkpoint.FlowStatus.HOSPITALIZED], * changing the status to [Checkpoint.FlowStatus.PAUSED]. 
diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 38cbf1d833..2a815fba94 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -504,6 +504,11 @@ class DBCheckpointStorage( currentDBSession().createNativeQuery(update).executeUpdate() } + override fun updateCompatible(runId: StateMachineRunId, compatible: Boolean) { + val update = "Update ${NODE_DATABASE_PREFIX}checkpoints set compatible = $compatible where flow_id = '${runId.uuid}'" + currentDBSession().createNativeQuery(update).executeUpdate() + } + private fun createDBFlowMetadata(flowId: String, checkpoint: Checkpoint): DBFlowMetadata { val context = checkpoint.checkpointState.invocationContext val flowInfo = checkpoint.checkpointState.subFlowStack.first() diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt index d528830fa1..5f1f86c671 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt @@ -179,13 +179,6 @@ sealed class Event { override fun toString() = "WakeUpSleepyFlow" } - /** - * Terminate the specified [sessions], removing them from in-memory datastructures. - * - * @param sessions The sessions to terminate - */ - data class TerminateSessions(val sessions: Set) : Event() - /** * Pause the flow. */ @@ -193,6 +186,13 @@ sealed class Event { override fun toString() = "Pause" } + /** + * Terminate the specified [sessions], removing them from in-memory datastructures. 
+ * + * @param sessions The sessions to terminate + */ + data class TerminateSessions(val sessions: Set) : Event() + /** * Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow, * even if it has not yet been processed and placed on the pending de-duplication handlers list. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt index 50ac051860..aa340c582a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt @@ -19,7 +19,6 @@ import net.corda.core.utilities.contextLogger import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.DeduplicationHandler -import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.currentStateMachine import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.isEnabledTimedFlow import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -29,7 +28,12 @@ import java.util.concurrent.Semaphore class Flow(val fiber: FlowStateMachineImpl, val resultFuture: OpenFuture) -class NonResidentFlow(val runId: StateMachineRunId, val checkpoint: Checkpoint, val resultFuture: OpenFuture = openFuture()) { +data class NonResidentFlow( + val runId: StateMachineRunId, + var checkpoint: Checkpoint, + val resultFuture: OpenFuture = openFuture(), + val resumable: Boolean = true +) { val events = mutableListOf() fun addExternalEvent(message: ExternalEvent) { @@ -69,15 +73,26 @@ class FlowCreator( return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint, resultFuture = nonResidentFlow.resultFuture) } + @Suppress("LongParameterList") fun createFlowFromCheckpoint( runId: StateMachineRunId, oldCheckpoint: Checkpoint, 
reloadCheckpointAfterSuspendCount: Int? = null, lock: Semaphore = Semaphore(1), - resultFuture: OpenFuture = openFuture() + resultFuture: OpenFuture = openFuture(), + firstRestore: Boolean = true ): Flow<*>? { - val checkpoint = oldCheckpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE) - val fiber = checkpoint.getFiberFromCheckpoint(runId) ?: return null + val fiber = oldCheckpoint.getFiberFromCheckpoint(runId, firstRestore) + var checkpoint = oldCheckpoint + if (fiber == null) { + updateCompatibleInDb(runId, false) + return null + } else if (!oldCheckpoint.compatible) { + updateCompatibleInDb(runId, true) + checkpoint = checkpoint.copy(compatible = true) + } + checkpoint = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE) + fiber.logic.stateMachine = fiber verifyFlowLogicIsSuspendable(fiber.logic) fiber.transientValues = createTransientValues(runId, resultFuture) @@ -92,6 +107,12 @@ class FlowCreator( return Flow(fiber, resultFuture) } + private fun updateCompatibleInDb(runId: StateMachineRunId, compatible: Boolean) { + database.transaction { + checkpointStorage.updateCompatible(runId, compatible) + } + } + @Suppress("LongParameterList") fun createFlowFromLogic( flowId: StateMachineRunId, @@ -135,36 +156,45 @@ class FlowCreator( return Flow(flowStateMachineImpl, resultFuture) } - private fun Checkpoint.getFiberFromCheckpoint(runId: StateMachineRunId): FlowStateMachineImpl<*>? { - return when (this.flowState) { - is FlowState.Unstarted -> { - val logic = tryCheckpointDeserialize(this.flowState.frozenFlowLogic, runId) ?: return null - FlowStateMachineImpl(runId, logic, scheduler) - } - is FlowState.Started -> tryCheckpointDeserialize(this.flowState.frozenFiber, runId) ?: return null - // Places calling this function is rely on it to return null if the flow cannot be created from the checkpoint. - else -> null - } - } - @Suppress("TooGenericExceptionCaught") - private inline fun tryCheckpointDeserialize(bytes: SerializedBytes, flowId: StateMachineRunId): T? 
{ - return try { - bytes.checkpointDeserialize(context = checkpointSerializationContext) + private fun Checkpoint.getFiberFromCheckpoint(runId: StateMachineRunId, firstRestore: Boolean): FlowStateMachineImpl<*>? { + try { + return when(flowState) { + is FlowState.Unstarted -> { + val logic = deserializeFlowState(flowState.frozenFlowLogic) + FlowStateMachineImpl(runId, logic, scheduler) + } + is FlowState.Started -> deserializeFlowState(flowState.frozenFiber) + // Places calling this function is rely on it to return null if the flow cannot be created from the checkpoint. + else -> return null + } } catch (e: Exception) { - if (reloadCheckpointAfterSuspend && currentStateMachine() != null) { + if (reloadCheckpointAfterSuspend && FlowStateMachineImpl.currentStateMachine() != null) { logger.error( - "Unable to deserialize checkpoint for flow $flowId. [reloadCheckpointAfterSuspend] is turned on, throwing exception", - e + "Unable to deserialize checkpoint for flow $runId. [reloadCheckpointAfterSuspend] is turned on, throwing exception", + e ) throw ReloadFlowFromCheckpointException(e) } else { - logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e) - null + logSerializationError(firstRestore, runId, e) + return null } } } + private inline fun deserializeFlowState(bytes: SerializedBytes): T { + return bytes.checkpointDeserialize(context = checkpointSerializationContext) + } + + private fun logSerializationError(firstRestore: Boolean, flowId: StateMachineRunId, exception: Exception) { + if (firstRestore) { + logger.warn("Flow with id $flowId could not be restored from its checkpoint. Normally this means that a CorDapp has been" + + " upgraded without draining the node. To run this flow restart the node after downgrading the CorDapp.", exception) + } else { + logger.error("Unable to deserialize fiber for flow $flowId. 
Something is very wrong and this flow will be ignored.", exception) + } + } + private fun verifyFlowLogicIsSuspendable(logic: FlowLogic) { // Quasar requires (in Java 8) that at least the call method be annotated suspendable. Unfortunately, it's // easy to forget to add this when creating a new flow, so we check here to give the user a better error. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index fe09e2b1e5..71fda8c194 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -19,7 +19,6 @@ import net.corda.core.internal.castIfPossible import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.mapError import net.corda.core.internal.concurrent.openFuture -import net.corda.core.internal.mapNotNull import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.DataFeed import net.corda.core.serialization.deserialize @@ -56,7 +55,6 @@ import javax.annotation.concurrent.ThreadSafe import kotlin.collections.component1 import kotlin.collections.component2 import kotlin.collections.set -import kotlin.streams.toList /** * The StateMachineManagerImpl will always invoke the flow fibers on the given [AffinityExecutor], regardless of which @@ -169,12 +167,11 @@ internal class SingleThreadedStateMachineManager( flowTimeoutScheduler::resetCustomTimeout ) - val fibers = restoreFlowsFromCheckpoints() + val (fibers, pausedFlows) = restoreFlowsFromCheckpoints() metrics.register("Flows.InFlight", Gauge { innerState.flows.size }) setFlowDefaultUncaughtExceptionHandler() - val pausedFlows = restoreNonResidentFlowsFromPausedCheckpoints() innerState.withLock { this.pausedFlows.putAll(pausedFlows) for ((id, flow) in pausedFlows) { @@ 
-367,30 +364,31 @@ internal class SingleThreadedStateMachineManager( liveFibers.countUp() } - private fun restoreFlowsFromCheckpoints(): List> { - return checkpointStorage.getCheckpointsToRun().use { - it.mapNotNull { (id, serializedCheckpoint) -> - // If a flow is added before start() then don't attempt to restore it - innerState.withLock { if (id in flows) return@mapNotNull null } - val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@mapNotNull null - flowCreator.createFlowFromCheckpoint(id, checkpoint) - }.toList() + private fun restoreFlowsFromCheckpoints(): Pair>, MutableMap> { + val flows = mutableMapOf>() + val pausedFlows = mutableMapOf() + checkpointStorage.getCheckpointsToRun().forEach Checkpoints@{(id, serializedCheckpoint) -> + // If a flow is added before start() then don't attempt to restore it + innerState.withLock { if (id in flows) return@Checkpoints } + val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@Checkpoints + val flow = flowCreator.createFlowFromCheckpoint(id, checkpoint) + if (flow == null) { + // Set the flowState to paused so we don't waste memory storing it anymore. 
+ pausedFlows[id] = NonResidentFlow(id, checkpoint.copy(flowState = FlowState.Paused), resumable = false) + } else { + flows[id] = flow + } } + checkpointStorage.getPausedCheckpoints().forEach Checkpoints@{ (id, serializedCheckpoint) -> + val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@Checkpoints + pausedFlows[id] = NonResidentFlow(id, checkpoint) + } + return Pair(flows, pausedFlows) } - private fun restoreNonResidentFlowsFromPausedCheckpoints(): Map { - return checkpointStorage.getPausedCheckpoints().use { - it.mapNotNull { (id, serializedCheckpoint) -> - // If a flow is added before start() then don't attempt to restore it - val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@mapNotNull null - id to NonResidentFlow(id, checkpoint) - }.toList().toMap() - } - } - - private fun resumeRestoredFlows(flows: List>) { - for (flow in flows) { - addAndStartFlow(flow.fiber.id, flow) + private fun resumeRestoredFlows(flows: Map>) { + for ((id, flow) in flows.entries) { + addAndStartFlow(id, flow) } } @@ -426,7 +424,8 @@ internal class SingleThreadedStateMachineManager( flowId, checkpoint, currentState.reloadCheckpointAfterSuspendCount, - currentState.lock + currentState.lock, + firstRestore = false ) ?: return } else { // Just flow initiation message diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 8f41f7e2cc..511eca00d5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -62,8 +62,8 @@ class TopLevelTransition( is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition() is Event.OvernightObservation -> overnightObservationTransition() is Event.WakeUpFromSleep -> 
wakeUpFromSleepTransition() - is Event.TerminateSessions -> terminateSessionsTransition(event) is Event.Pause -> pausedFlowTransition() + is Event.TerminateSessions -> terminateSessionsTransition(event) } } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index c8b1301c15..7017a19e65 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -890,6 +890,24 @@ class DBCheckpointStorageTests { } } + @Test(timeout = 300_000) + fun `update only compatible`() { + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = checkpoint.serializeFlowState() + database.transaction { + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState()) + } + database.transaction { + checkpointStorage.updateCompatible(id, !checkpoint.compatible) + } + database.transaction { + assertEquals( + checkpoint.copy(compatible = !checkpoint.compatible), + checkpointStorage.checkpoints().single().deserialize() + ) + } + } + data class IdAndCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint) private fun changeStatus(oldCheckpoint: Checkpoint, status: Checkpoint.FlowStatus): IdAndCheckpoint {