CORDA-1836: Checkpoints which cannot be deserialised no longer prevent the node from starting up (#5367)

Instead they are logged with the flow ID. The kill flow RPC has been updated to delete such checkpoints.
This commit is contained in:
Shams Asari 2019-09-13 17:27:20 +01:00 committed by GitHub
parent e2fcc1edd5
commit 39adfbb9ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -133,7 +133,12 @@ class SingleThreadedStateMachineManager(
override fun start(tokenizableServices: List<Any>) { override fun start(tokenizableServices: List<Any>) {
checkQuasarJavaAgentPresence() checkQuasarJavaAgentPresence()
val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext( val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
CheckpointSerializeAsTokenContextImpl(tokenizableServices, CheckpointSerializationDefaults.CHECKPOINT_SERIALIZER, CheckpointSerializationDefaults.CHECKPOINT_CONTEXT, serviceHub) CheckpointSerializeAsTokenContextImpl(
tokenizableServices,
CheckpointSerializationDefaults.CHECKPOINT_SERIALIZER,
CheckpointSerializationDefaults.CHECKPOINT_CONTEXT,
serviceHub
)
) )
this.checkpointSerializationContext = checkpointSerializationContext this.checkpointSerializationContext = checkpointSerializationContext
this.actionExecutor = makeActionExecutor(checkpointSerializationContext) this.actionExecutor = makeActionExecutor(checkpointSerializationContext)
@ -239,9 +244,10 @@ class SingleThreadedStateMachineManager(
unfinishedFibers.countDown() unfinishedFibers.countDown()
} }
} else { } else {
// TODO replace with a clustered delete after we'll support clustered nodes // It may be that the id refers to a checkpoint that couldn't be deserialised into a flow, so we delete it if it exists.
logger.debug("Unable to kill a flow unknown to physical node. Might be processed by another physical node.") database.transaction {
false checkpointStorage.removeCheckpoint(id)
}
} }
} }
} }
@ -320,11 +326,9 @@ class SingleThreadedStateMachineManager(
it.mapNotNull { (id, serializedCheckpoint) -> it.mapNotNull { (id, serializedCheckpoint) ->
// If a flow is added before start() then don't attempt to restore it // If a flow is added before start() then don't attempt to restore it
mutex.locked { if (id in flows) return@mapNotNull null } mutex.locked { if (id in flows) return@mapNotNull null }
val checkpoint = deserializeCheckpoint(serializedCheckpoint) ?: return@mapNotNull null
logger.debug { "Restored $checkpoint" }
createFlowFromCheckpoint( createFlowFromCheckpoint(
id = id, id = id,
checkpoint = checkpoint, serializedCheckpoint = serializedCheckpoint,
initialDeduplicationHandler = null, initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true, isAnyCheckpointPersisted = true,
isStartIdempotent = false isStartIdempotent = false
@ -348,24 +352,21 @@ class SingleThreadedStateMachineManager(
return return
} }
val flow = if (currentState.isAnyCheckpointPersisted) { val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId) val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) { if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.") logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return return
} }
val checkpoint = deserializeCheckpoint(serializedCheckpoint)
if (checkpoint == null) {
logger.error("Unable to deserialize database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
// Resurrect flow // Resurrect flow
createFlowFromCheckpoint( createFlowFromCheckpoint(
id = flowId, id = flowId,
checkpoint = checkpoint, serializedCheckpoint = serializedCheckpoint,
initialDeduplicationHandler = null, initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true, isAnyCheckpointPersisted = true,
isStartIdempotent = false isStartIdempotent = false
) ) ?: return
} else { } else {
// Just flow initiation message // Just flow initiation message
null null
@ -656,15 +657,6 @@ class SingleThreadedStateMachineManager(
} }
} }
private fun deserializeCheckpoint(serializedCheckpoint: SerializedBytes<Checkpoint>): Checkpoint? {
return try {
serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext!!)
} catch (exception: Exception) {
logger.error("Encountered unrestorable checkpoint!", exception)
null
}
}
private fun verifyFlowLogicIsSuspendable(logic: FlowLogic<Any?>) { private fun verifyFlowLogicIsSuspendable(logic: FlowLogic<Any?>) {
// Quasar requires (in Java 8) that at least the call method be annotated suspendable. Unfortunately, it's // Quasar requires (in Java 8) that at least the call method be annotated suspendable. Unfortunately, it's
// easy to forget to add this when creating a new flow, so we check here to give the user a better error. // easy to forget to add this when creating a new flow, so we check here to give the user a better error.
@ -693,18 +685,28 @@ class SingleThreadedStateMachineManager(
) )
} }
private inline fun <reified T : Any> tryCheckpointDeserialize(bytes: SerializedBytes<T>, flowId: StateMachineRunId): T? {
return try {
bytes.checkpointDeserialize(context = checkpointSerializationContext!!)
} catch (e: Exception) {
logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e)
null
}
}
private fun createFlowFromCheckpoint( private fun createFlowFromCheckpoint(
id: StateMachineRunId, id: StateMachineRunId,
checkpoint: Checkpoint, serializedCheckpoint: SerializedBytes<Checkpoint>,
isAnyCheckpointPersisted: Boolean, isAnyCheckpointPersisted: Boolean,
isStartIdempotent: Boolean, isStartIdempotent: Boolean,
initialDeduplicationHandler: DeduplicationHandler? initialDeduplicationHandler: DeduplicationHandler?
): Flow { ): Flow? {
val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, id) ?: return null
val flowState = checkpoint.flowState val flowState = checkpoint.flowState
val resultFuture = openFuture<Any?>() val resultFuture = openFuture<Any?>()
val fiber = when (flowState) { val fiber = when (flowState) {
is FlowState.Unstarted -> { is FlowState.Unstarted -> {
val logic = flowState.frozenFlowLogic.checkpointDeserialize(context = checkpointSerializationContext!!) val logic = tryCheckpointDeserialize(flowState.frozenFlowLogic, id) ?: return null
val state = StateMachineState( val state = StateMachineState(
checkpoint = checkpoint, checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
@ -723,7 +725,7 @@ class SingleThreadedStateMachineManager(
fiber fiber
} }
is FlowState.Started -> { is FlowState.Started -> {
val fiber = flowState.frozenFiber.checkpointDeserialize(context = checkpointSerializationContext!!) val fiber = tryCheckpointDeserialize(flowState.frozenFiber, id) ?: return null
val state = StateMachineState( val state = StateMachineState(
checkpoint = checkpoint, checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),