CORDA-3602 Set a Checkpoint as incompatible if it can't be deserialised (#3653)

Update the compatible flag in the DB if the flowstate cannot be deserialised.

The most common cause of this problem is if a CorDapp has been upgraded
without draining flows from the node.

`RUNNABLE` and `HOSPITALISED` flows are restored on node startup so
the flag is set for these then. The flag can also be set when a flow
retries for some reason (see retryFlowFromSafePoint) in this case the
problem has been caused by another reason.
This commit is contained in:
Will Vigor 2020-08-05 16:25:18 +01:00
parent 4a828fcb99
commit 7acc510534
7 changed files with 116 additions and 59 deletions

View File

@ -28,6 +28,11 @@ interface CheckpointStorage {
*/ */
fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus) fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus)
/**
* Update an existing checkpoints compatibility flag ([Checkpoint.compatible]).
*/
fun updateCompatible(runId: StateMachineRunId, compatible: Boolean)
/** /**
* Update all persisted checkpoints with status [Checkpoint.FlowStatus.RUNNABLE] or [Checkpoint.FlowStatus.HOSPITALIZED], * Update all persisted checkpoints with status [Checkpoint.FlowStatus.RUNNABLE] or [Checkpoint.FlowStatus.HOSPITALIZED],
* changing the status to [Checkpoint.FlowStatus.PAUSED]. * changing the status to [Checkpoint.FlowStatus.PAUSED].

View File

@ -504,6 +504,11 @@ class DBCheckpointStorage(
currentDBSession().createNativeQuery(update).executeUpdate() currentDBSession().createNativeQuery(update).executeUpdate()
} }
override fun updateCompatible(runId: StateMachineRunId, compatible: Boolean) {
val update = "Update ${NODE_DATABASE_PREFIX}checkpoints set compatible = $compatible where flow_id = '${runId.uuid}'"
currentDBSession().createNativeQuery(update).executeUpdate()
}
private fun createDBFlowMetadata(flowId: String, checkpoint: Checkpoint): DBFlowMetadata { private fun createDBFlowMetadata(flowId: String, checkpoint: Checkpoint): DBFlowMetadata {
val context = checkpoint.checkpointState.invocationContext val context = checkpoint.checkpointState.invocationContext
val flowInfo = checkpoint.checkpointState.subFlowStack.first() val flowInfo = checkpoint.checkpointState.subFlowStack.first()

View File

@ -179,13 +179,6 @@ sealed class Event {
override fun toString() = "WakeUpSleepyFlow" override fun toString() = "WakeUpSleepyFlow"
} }
/**
* Terminate the specified [sessions], removing them from in-memory datastructures.
*
* @param sessions The sessions to terminate
*/
data class TerminateSessions(val sessions: Set<SessionId>) : Event()
/** /**
* Pause the flow. * Pause the flow.
*/ */
@ -193,6 +186,13 @@ sealed class Event {
override fun toString() = "Pause" override fun toString() = "Pause"
} }
/**
* Terminate the specified [sessions], removing them from in-memory datastructures.
*
* @param sessions The sessions to terminate
*/
data class TerminateSessions(val sessions: Set<SessionId>) : Event()
/** /**
* Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow, * Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow,
* even if it has not yet been processed and placed on the pending de-duplication handlers list. * even if it has not yet been processed and placed on the pending de-duplication handlers list.

View File

@ -19,7 +19,6 @@ import net.corda.core.utilities.contextLogger
import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.currentStateMachine
import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.isEnabledTimedFlow import net.corda.node.utilities.isEnabledTimedFlow
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -29,7 +28,12 @@ import java.util.concurrent.Semaphore
class Flow<A>(val fiber: FlowStateMachineImpl<A>, val resultFuture: OpenFuture<Any?>) class Flow<A>(val fiber: FlowStateMachineImpl<A>, val resultFuture: OpenFuture<Any?>)
class NonResidentFlow(val runId: StateMachineRunId, val checkpoint: Checkpoint, val resultFuture: OpenFuture<Any?> = openFuture()) { data class NonResidentFlow(
val runId: StateMachineRunId,
var checkpoint: Checkpoint,
val resultFuture: OpenFuture<Any?> = openFuture(),
val resumable: Boolean = true
) {
val events = mutableListOf<ExternalEvent>() val events = mutableListOf<ExternalEvent>()
fun addExternalEvent(message: ExternalEvent) { fun addExternalEvent(message: ExternalEvent) {
@ -69,15 +73,26 @@ class FlowCreator(
return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint, resultFuture = nonResidentFlow.resultFuture) return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint, resultFuture = nonResidentFlow.resultFuture)
} }
@Suppress("LongParameterList")
fun createFlowFromCheckpoint( fun createFlowFromCheckpoint(
runId: StateMachineRunId, runId: StateMachineRunId,
oldCheckpoint: Checkpoint, oldCheckpoint: Checkpoint,
reloadCheckpointAfterSuspendCount: Int? = null, reloadCheckpointAfterSuspendCount: Int? = null,
lock: Semaphore = Semaphore(1), lock: Semaphore = Semaphore(1),
resultFuture: OpenFuture<Any?> = openFuture() resultFuture: OpenFuture<Any?> = openFuture(),
firstRestore: Boolean = true
): Flow<*>? { ): Flow<*>? {
val checkpoint = oldCheckpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE) val fiber = oldCheckpoint.getFiberFromCheckpoint(runId, firstRestore)
val fiber = checkpoint.getFiberFromCheckpoint(runId) ?: return null var checkpoint = oldCheckpoint
if (fiber == null) {
updateCompatibleInDb(runId, false)
return null
} else if (!oldCheckpoint.compatible) {
updateCompatibleInDb(runId, true)
checkpoint = checkpoint.copy(compatible = true)
}
checkpoint = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)
fiber.logic.stateMachine = fiber fiber.logic.stateMachine = fiber
verifyFlowLogicIsSuspendable(fiber.logic) verifyFlowLogicIsSuspendable(fiber.logic)
fiber.transientValues = createTransientValues(runId, resultFuture) fiber.transientValues = createTransientValues(runId, resultFuture)
@ -92,6 +107,12 @@ class FlowCreator(
return Flow(fiber, resultFuture) return Flow(fiber, resultFuture)
} }
private fun updateCompatibleInDb(runId: StateMachineRunId, compatible: Boolean) {
database.transaction {
checkpointStorage.updateCompatible(runId, compatible)
}
}
@Suppress("LongParameterList") @Suppress("LongParameterList")
fun <A> createFlowFromLogic( fun <A> createFlowFromLogic(
flowId: StateMachineRunId, flowId: StateMachineRunId,
@ -135,36 +156,45 @@ class FlowCreator(
return Flow(flowStateMachineImpl, resultFuture) return Flow(flowStateMachineImpl, resultFuture)
} }
private fun Checkpoint.getFiberFromCheckpoint(runId: StateMachineRunId): FlowStateMachineImpl<*>? {
return when (this.flowState) {
is FlowState.Unstarted -> {
val logic = tryCheckpointDeserialize(this.flowState.frozenFlowLogic, runId) ?: return null
FlowStateMachineImpl(runId, logic, scheduler)
}
is FlowState.Started -> tryCheckpointDeserialize(this.flowState.frozenFiber, runId) ?: return null
// Places calling this function is rely on it to return null if the flow cannot be created from the checkpoint.
else -> null
}
}
@Suppress("TooGenericExceptionCaught") @Suppress("TooGenericExceptionCaught")
private inline fun <reified T : Any> tryCheckpointDeserialize(bytes: SerializedBytes<T>, flowId: StateMachineRunId): T? { private fun Checkpoint.getFiberFromCheckpoint(runId: StateMachineRunId, firstRestore: Boolean): FlowStateMachineImpl<*>? {
return try { try {
bytes.checkpointDeserialize(context = checkpointSerializationContext) return when(flowState) {
is FlowState.Unstarted -> {
val logic = deserializeFlowState(flowState.frozenFlowLogic)
FlowStateMachineImpl(runId, logic, scheduler)
}
is FlowState.Started -> deserializeFlowState(flowState.frozenFiber)
// Places calling this function is rely on it to return null if the flow cannot be created from the checkpoint.
else -> return null
}
} catch (e: Exception) { } catch (e: Exception) {
if (reloadCheckpointAfterSuspend && currentStateMachine() != null) { if (reloadCheckpointAfterSuspend && FlowStateMachineImpl.currentStateMachine() != null) {
logger.error( logger.error(
"Unable to deserialize checkpoint for flow $flowId. [reloadCheckpointAfterSuspend] is turned on, throwing exception", "Unable to deserialize checkpoint for flow $runId. [reloadCheckpointAfterSuspend] is turned on, throwing exception",
e e
) )
throw ReloadFlowFromCheckpointException(e) throw ReloadFlowFromCheckpointException(e)
} else { } else {
logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e) logSerializationError(firstRestore, runId, e)
null return null
} }
} }
} }
private inline fun <reified T : Any> deserializeFlowState(bytes: SerializedBytes<T>): T {
return bytes.checkpointDeserialize(context = checkpointSerializationContext)
}
private fun logSerializationError(firstRestore: Boolean, flowId: StateMachineRunId, exception: Exception) {
if (firstRestore) {
logger.warn("Flow with id $flowId could not be restored from its checkpoint. Normally this means that a CorDapp has been" +
" upgraded without draining the node. To run this flow restart the node after downgrading the CorDapp.", exception)
} else {
logger.error("Unable to deserialize fiber for flow $flowId. Something is very wrong and this flow will be ignored.", exception)
}
}
private fun verifyFlowLogicIsSuspendable(logic: FlowLogic<Any?>) { private fun verifyFlowLogicIsSuspendable(logic: FlowLogic<Any?>) {
// Quasar requires (in Java 8) that at least the call method be annotated suspendable. Unfortunately, it's // Quasar requires (in Java 8) that at least the call method be annotated suspendable. Unfortunately, it's
// easy to forget to add this when creating a new flow, so we check here to give the user a better error. // easy to forget to add this when creating a new flow, so we check here to give the user a better error.

View File

@ -19,7 +19,6 @@ import net.corda.core.internal.castIfPossible
import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.mapError import net.corda.core.internal.concurrent.mapError
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.mapNotNull
import net.corda.core.internal.uncheckedCast import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
@ -56,7 +55,6 @@ import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.component1 import kotlin.collections.component1
import kotlin.collections.component2 import kotlin.collections.component2
import kotlin.collections.set import kotlin.collections.set
import kotlin.streams.toList
/** /**
* The StateMachineManagerImpl will always invoke the flow fibers on the given [AffinityExecutor], regardless of which * The StateMachineManagerImpl will always invoke the flow fibers on the given [AffinityExecutor], regardless of which
@ -169,12 +167,11 @@ internal class SingleThreadedStateMachineManager(
flowTimeoutScheduler::resetCustomTimeout flowTimeoutScheduler::resetCustomTimeout
) )
val fibers = restoreFlowsFromCheckpoints() val (fibers, pausedFlows) = restoreFlowsFromCheckpoints()
metrics.register("Flows.InFlight", Gauge<Int> { innerState.flows.size }) metrics.register("Flows.InFlight", Gauge<Int> { innerState.flows.size })
setFlowDefaultUncaughtExceptionHandler() setFlowDefaultUncaughtExceptionHandler()
val pausedFlows = restoreNonResidentFlowsFromPausedCheckpoints()
innerState.withLock { innerState.withLock {
this.pausedFlows.putAll(pausedFlows) this.pausedFlows.putAll(pausedFlows)
for ((id, flow) in pausedFlows) { for ((id, flow) in pausedFlows) {
@ -367,30 +364,31 @@ internal class SingleThreadedStateMachineManager(
liveFibers.countUp() liveFibers.countUp()
} }
private fun restoreFlowsFromCheckpoints(): List<Flow<*>> { private fun restoreFlowsFromCheckpoints(): Pair<MutableMap<StateMachineRunId, Flow<*>>, MutableMap<StateMachineRunId, NonResidentFlow>> {
return checkpointStorage.getCheckpointsToRun().use { val flows = mutableMapOf<StateMachineRunId, Flow<*>>()
it.mapNotNull { (id, serializedCheckpoint) -> val pausedFlows = mutableMapOf<StateMachineRunId, NonResidentFlow>()
// If a flow is added before start() then don't attempt to restore it checkpointStorage.getCheckpointsToRun().forEach Checkpoints@{(id, serializedCheckpoint) ->
innerState.withLock { if (id in flows) return@mapNotNull null } // If a flow is added before start() then don't attempt to restore it
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@mapNotNull null innerState.withLock { if (id in flows) return@Checkpoints }
flowCreator.createFlowFromCheckpoint(id, checkpoint) val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@Checkpoints
}.toList() val flow = flowCreator.createFlowFromCheckpoint(id, checkpoint)
if (flow == null) {
// Set the flowState to paused so we don't waste memory storing it anymore.
pausedFlows[id] = NonResidentFlow(id, checkpoint.copy(flowState = FlowState.Paused), resumable = false)
} else {
flows[id] = flow
}
} }
checkpointStorage.getPausedCheckpoints().forEach Checkpoints@{ (id, serializedCheckpoint) ->
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@Checkpoints
pausedFlows[id] = NonResidentFlow(id, checkpoint)
}
return Pair(flows, pausedFlows)
} }
private fun restoreNonResidentFlowsFromPausedCheckpoints(): Map<StateMachineRunId, NonResidentFlow> { private fun resumeRestoredFlows(flows: Map<StateMachineRunId, Flow<*>>) {
return checkpointStorage.getPausedCheckpoints().use { for ((id, flow) in flows.entries) {
it.mapNotNull { (id, serializedCheckpoint) -> addAndStartFlow(id, flow)
// If a flow is added before start() then don't attempt to restore it
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@mapNotNull null
id to NonResidentFlow(id, checkpoint)
}.toList().toMap()
}
}
private fun resumeRestoredFlows(flows: List<Flow<*>>) {
for (flow in flows) {
addAndStartFlow(flow.fiber.id, flow)
} }
} }
@ -426,7 +424,8 @@ internal class SingleThreadedStateMachineManager(
flowId, flowId,
checkpoint, checkpoint,
currentState.reloadCheckpointAfterSuspendCount, currentState.reloadCheckpointAfterSuspendCount,
currentState.lock currentState.lock,
firstRestore = false
) ?: return ) ?: return
} else { } else {
// Just flow initiation message // Just flow initiation message

View File

@ -62,8 +62,8 @@ class TopLevelTransition(
is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition() is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition()
is Event.OvernightObservation -> overnightObservationTransition() is Event.OvernightObservation -> overnightObservationTransition()
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition() is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
is Event.TerminateSessions -> terminateSessionsTransition(event)
is Event.Pause -> pausedFlowTransition() is Event.Pause -> pausedFlowTransition()
is Event.TerminateSessions -> terminateSessionsTransition(event)
} }
} }

View File

@ -890,6 +890,24 @@ class DBCheckpointStorageTests {
} }
} }
@Test(timeout = 300_000)
fun `update only compatible`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
}
database.transaction {
checkpointStorage.updateCompatible(id, !checkpoint.compatible)
}
database.transaction {
assertEquals(
checkpoint.copy(compatible = !checkpoint.compatible),
checkpointStorage.checkpoints().single().deserialize()
)
}
}
data class IdAndCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint) data class IdAndCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint)
private fun changeStatus(oldCheckpoint: Checkpoint, status: Checkpoint.FlowStatus): IdAndCheckpoint { private fun changeStatus(oldCheckpoint: Checkpoint, status: Checkpoint.FlowStatus): IdAndCheckpoint {