ENT-5532 Terminate sessions after original io request

Sessions are now terminated after performing the original
`FlowIORequest` passed into `StartedFlowTransition`, instead of before.
This is done by scheduling an `Event.TerminateSessions` if there are
sessions to terminate when performing a suspending event.

Originally this was done by hijacking a transition that is trying to
perform a `StartedFlowTransition`, terminating the sessions and then
scheduling another `Event.DoRemainingWork` to perform the original
transition. This introduced a bug where, another event (from a external
message) could be placed onto the queue before the
`Event.DoRemainingWork` could be added. In most scenarios, that should
be ok. But, if a flow is retrying (while in an uninitiated state) and
this occurs the flow could fail due to being in an unexpected state.

Terminating the sessions after performing the original transition
removes this possibility. Meaning that a restarting flow will always
perform the transition they supposed to do (based on the called
suspending event).
This commit is contained in:
LankyDan 2020-07-31 12:37:44 +01:00
parent c91dba83d3
commit 39dbe22c9d
3 changed files with 54 additions and 42 deletions

View File

@ -139,7 +139,7 @@ sealed class Event {
data class AsyncOperationCompletion(val returnValue: Any?) : Event()
/**
* Signals the faiure of a [FlowAsyncOperation].
* Signals the failure of a [FlowAsyncOperation].
*
* Scheduling is triggered by the service that completes the future returned by the async operation.
*
@ -179,6 +179,13 @@ sealed class Event {
override fun toString() = "WakeUpSleepyFlow"
}
/**
* Terminate the specified [sessions], removing them from in-memory datastructures.
*
* @param sessions The sessions to terminate
*/
data class TerminateSessions(val sessions: Set<SessionId>) : Event()
/**
* Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow,
* even if it has not yet been processed and placed on the pending de-duplication handlers list.

View File

@ -41,47 +41,18 @@ class StartedFlowTransition(
continuation = FlowContinuation.Throw(errorsToThrow[0])
)
}
val sessionsToBeTerminated = findSessionsToBeTerminated(startingState)
// if there are sessions to be closed, we close them as part of this transition and normal processing will continue on the next transition.
return if (sessionsToBeTerminated.isNotEmpty()) {
terminateSessions(sessionsToBeTerminated)
} else {
when (flowIORequest) {
is FlowIORequest.Send -> sendTransition(flowIORequest)
is FlowIORequest.Receive -> receiveTransition(flowIORequest)
is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest)
is FlowIORequest.CloseSessions -> closeSessionTransition(flowIORequest)
is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest)
is FlowIORequest.Sleep -> sleepTransition(flowIORequest)
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
}
}
}
private fun findSessionsToBeTerminated(startingState: StateMachineState): SessionMap {
return startingState.checkpoint.checkpointState.sessionsToBeClosed.mapNotNull { sessionId ->
val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!! as SessionState.Initiated
if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is EndSessionMessage) {
sessionId to sessionState
} else {
null
}
}.toMap()
}
private fun terminateSessions(sessionsToBeTerminated: SessionMap): TransitionResult {
return builder {
val sessionsToRemove = sessionsToBeTerminated.keys
val newCheckpoint = currentState.checkpoint.removeSessions(sessionsToRemove)
.removeSessionsToBeClosed(sessionsToRemove)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(Action.RemoveSessionBindings(sessionsToRemove))
actions.add(Action.ScheduleEvent(Event.DoRemainingWork))
FlowContinuation.ProcessEvents
}
return when (flowIORequest) {
is FlowIORequest.Send -> sendTransition(flowIORequest)
is FlowIORequest.Receive -> receiveTransition(flowIORequest)
is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest)
is FlowIORequest.CloseSessions -> closeSessionTransition(flowIORequest)
is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest)
is FlowIORequest.Sleep -> sleepTransition(flowIORequest)
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
}.let { scheduleTerminateSessionsIfRequired(it) }
}
private fun waitForSessionConfirmationsTransition(): TransitionResult {
@ -537,4 +508,25 @@ class StartedFlowTransition(
private fun executeForceCheckpoint(): TransitionResult {
return builder { resumeFlowLogic(Unit) }
}
private fun scheduleTerminateSessionsIfRequired(transition: TransitionResult): TransitionResult {
// If there are sessions to be closed, close them on a following transition
val sessionsToBeTerminated = findSessionsToBeTerminated(transition.newState)
return if (sessionsToBeTerminated.isNotEmpty()) {
transition.copy(actions = transition.actions + Action.ScheduleEvent(Event.TerminateSessions(sessionsToBeTerminated.keys)))
} else {
transition
}
}
private fun findSessionsToBeTerminated(startingState: StateMachineState): SessionMap {
return startingState.checkpoint.checkpointState.sessionsToBeClosed.mapNotNull { sessionId ->
val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!! as SessionState.Initiated
if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is EndSessionMessage) {
sessionId to sessionState
} else {
null
}
}.toMap()
}
}

View File

@ -62,6 +62,7 @@ class TopLevelTransition(
is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition()
is Event.OvernightObservation -> overnightObservationTransition()
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
is Event.TerminateSessions -> terminateSessionsTransition(event)
}
}
@ -366,4 +367,16 @@ class TopLevelTransition(
resumeFlowLogic(Unit)
}
}
private fun terminateSessionsTransition(event: Event.TerminateSessions): TransitionResult {
return builder {
val sessions = event.sessions
val newCheckpoint = currentState.checkpoint
.removeSessions(sessions)
.removeSessionsToBeClosed(sessions)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(Action.RemoveSessionBindings(sessions))
FlowContinuation.ProcessEvents
}
}
}