ENT-5196 handle errors during flow initialisation (#6378)

Changes to `TopLevelTransition` after merging from earlier releases.

When a flow is kept for observation and its checkpoint is saved as
HOSPITALIZED in the database, we must acknowledge the session init and
flow start events so that they are not replayed on node startup.

Otherwise the same flow will be ran twice when the node is restarted,
one from the checkpoint and one from artemis.
This commit is contained in:
LankyDan 2020-07-08 14:41:29 +01:00
parent fdae04fc28
commit 2204f44332

View File

@ -3,7 +3,9 @@ package net.corda.node.services.statemachine.transitions
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.InitiatingFlow
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.Try
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.DeduplicationId
@ -11,12 +13,15 @@ import net.corda.node.services.statemachine.EndSessionMessage
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.ExistingSessionMessage
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.FlowRemovalReason
import net.corda.node.services.statemachine.FlowSessionImpl
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.InitialSessionMessage
import net.corda.node.services.statemachine.InitiatedSessionState
import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.SessionMessage
import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.SubFlow
@ -321,15 +326,33 @@ class TopLevelTransition(
private fun overnightObservationTransition(): TransitionResult {
return builder {
val flowStartEvents = currentState.pendingDeduplicationHandlers.filter(::isFlowStartEvent)
val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED)
actions.add(Action.CreateTransaction)
actions.add(Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted))
actions.add(Action.CommitTransaction)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions += Action.CreateTransaction
actions += Action.PersistDeduplicationFacts(flowStartEvents)
actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted)
actions += Action.CommitTransaction
actions += Action.AcknowledgeMessages(flowStartEvents)
currentState = currentState.copy(
checkpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED),
pendingDeduplicationHandlers = currentState.pendingDeduplicationHandlers - flowStartEvents
)
FlowContinuation.ProcessEvents
}
}
private fun isFlowStartEvent(handler: DeduplicationHandler): Boolean {
return handler.externalCause.run { isSessionInit() || isFlowStart() }
}
private fun ExternalEvent.isSessionInit(): Boolean {
return this is ExternalEvent.ExternalMessageEvent && this.receivedMessage.data.deserialize<SessionMessage>() is InitialSessionMessage
}
private fun ExternalEvent.isFlowStart(): Boolean {
return this is ExternalEvent.ExternalStartFlowEvent<*>
}
private fun wakeUpFromSleepTransition(): TransitionResult {
return builder {
resumeFlowLogic(Unit)