From 2204f443324babb68155627a62156e1bc5a51af7 Mon Sep 17 00:00:00 2001 From: LankyDan Date: Wed, 8 Jul 2020 14:41:29 +0100 Subject: [PATCH] ENT-5196 handle errors during flow initialisation (#6378) Changes to `TopLevelTransition` after merging from earlier releases. When a flow is kept for observation and its checkpoint is saved as HOSPITALIZED in the database, we must acknowledge the session init and flow start events so that they are not replayed on node startup. Otherwise the same flow will be ran twice when the node is restarted, one from the checkpoint and one from artemis. --- .../transitions/TopLevelTransition.kt | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 887041b82c..1b7d79dfec 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -3,7 +3,9 @@ package net.corda.node.services.statemachine.transitions import net.corda.core.crypto.SecureHash import net.corda.core.flows.InitiatingFlow import net.corda.core.internal.FlowIORequest +import net.corda.core.serialization.deserialize import net.corda.core.utilities.Try +import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.statemachine.Action import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.DeduplicationId @@ -11,12 +13,15 @@ import net.corda.node.services.statemachine.EndSessionMessage import net.corda.node.services.statemachine.ErrorState import net.corda.node.services.statemachine.Event import net.corda.node.services.statemachine.ExistingSessionMessage +import net.corda.node.services.statemachine.ExternalEvent import net.corda.node.services.statemachine.FlowRemovalReason import net.corda.node.services.statemachine.FlowSessionImpl import net.corda.node.services.statemachine.FlowState +import net.corda.node.services.statemachine.InitialSessionMessage import net.corda.node.services.statemachine.InitiatedSessionState import net.corda.node.services.statemachine.SenderDeduplicationId import net.corda.node.services.statemachine.SessionId +import net.corda.node.services.statemachine.SessionMessage import net.corda.node.services.statemachine.SessionState import net.corda.node.services.statemachine.StateMachineState import net.corda.node.services.statemachine.SubFlow @@ -321,15 +326,33 @@ class TopLevelTransition( private fun overnightObservationTransition(): TransitionResult { return builder { + val flowStartEvents = currentState.pendingDeduplicationHandlers.filter(::isFlowStartEvent) val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED) - actions.add(Action.CreateTransaction) - actions.add(Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted)) - actions.add(Action.CommitTransaction) - currentState = currentState.copy(checkpoint = newCheckpoint) + actions += Action.CreateTransaction + actions += Action.PersistDeduplicationFacts(flowStartEvents) + actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted) + actions += Action.CommitTransaction + actions += Action.AcknowledgeMessages(flowStartEvents) + currentState = currentState.copy( + checkpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED), + pendingDeduplicationHandlers = currentState.pendingDeduplicationHandlers - flowStartEvents + ) FlowContinuation.ProcessEvents } } + private fun isFlowStartEvent(handler: DeduplicationHandler): Boolean { + return handler.externalCause.run { isSessionInit() || isFlowStart() } + } + + private fun ExternalEvent.isSessionInit(): Boolean { + return this is ExternalEvent.ExternalMessageEvent && this.receivedMessage.data.deserialize() is InitialSessionMessage + } + + private fun ExternalEvent.isFlowStart(): Boolean { + return this is ExternalEvent.ExternalStartFlowEvent<*> + } + private fun wakeUpFromSleepTransition(): TransitionResult { return builder { resumeFlowLogic(Unit)