From 845ef8d3d18ba0468975dd004e1942d3002b30b8 Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Fri, 14 Aug 2020 11:13:42 +0100 Subject: [PATCH] CORDA-3989 Terminate sessions instantly (#6634) Terminate sessions that need to be removed instantly in whatever transition is currently executing, rather than scheduling another event and doing so at a later time. To do this, update the transition being created in `TopLevelTransition` to remove the sessions and append the `RemoveSessionBindings` action to it. This achieves the same outcome as the original code but does so with 1 less transition. Doing this also removes the race condition that can occur where another external event is added to the flow's event queue before the terminate event could be added. --- .../corda/node/services/statemachine/Event.kt | 7 ------- .../statemachine/StateMachineState.kt | 11 ++++++----- .../transitions/StartedFlowTransition.kt | 8 ++++++-- .../transitions/TopLevelTransition.kt | 19 ++++++------------- 4 files changed, 18 insertions(+), 27 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt index 5f1f86c671..10659b948e 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt @@ -186,13 +186,6 @@ sealed class Event { override fun toString() = "Pause" } - /** - * Terminate the specified [sessions], removing them from in-memory datastructures. - * - * @param sessions The sessions to terminate - */ - data class TerminateSessions(val sessions: Set) : Event() - /** * Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow, * even if it has not yet been processed and placed on the pending de-duplication handlers list. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index b734810f0b..c53cf81dc0 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -158,16 +158,17 @@ data class Checkpoint( return copy(checkpointState = checkpointState.copy(sessionsToBeClosed = checkpointState.sessionsToBeClosed + sessionIds)) } - fun removeSessionsToBeClosed(sessionIds: Set): Checkpoint { - return copy(checkpointState = checkpointState.copy(sessionsToBeClosed = checkpointState.sessionsToBeClosed - sessionIds)) - } - /** * Returns a copy of the Checkpoint with the specified session removed from the session map. * @param sessionIds the sessions to remove. */ fun removeSessions(sessionIds: Set): Checkpoint { - return copy(checkpointState = checkpointState.copy(sessions = checkpointState.sessions - sessionIds)) + return copy( + checkpointState = checkpointState.copy( + sessions = checkpointState.sessions - sessionIds, + sessionsToBeClosed = checkpointState.sessionsToBeClosed - sessionIds + ) + ) } /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 0911ed18a4..5ac73c799e 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -528,10 +528,14 @@ class StartedFlowTransition( } private fun scheduleTerminateSessionsIfRequired(transition: TransitionResult): TransitionResult { - // If there are sessions to be closed, close them on a following transition + // If there are sessions to be closed, close them on the currently executing transition val sessionsToBeTerminated = findSessionsToBeTerminated(transition.newState) return if (sessionsToBeTerminated.isNotEmpty()) { - transition.copy(actions = transition.actions + Action.ScheduleEvent(Event.TerminateSessions(sessionsToBeTerminated.keys))) + val checkpointWithSessionsRemoved = transition.newState.checkpoint.removeSessions(sessionsToBeTerminated.keys) + transition.copy( + newState = transition.newState.copy(checkpoint = checkpointWithSessionsRemoved), + actions = transition.actions + Action.RemoveSessionBindings(sessionsToBeTerminated.keys) + ) } else { transition } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 4cfd564ce9..a7e408503f 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -5,6 +5,7 @@ import net.corda.core.flows.InitiatingFlow import net.corda.core.internal.FlowIORequest import net.corda.core.serialization.deserialize import net.corda.core.utilities.Try +import net.corda.core.utilities.contextLogger import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.statemachine.Action import net.corda.node.services.statemachine.Checkpoint @@ -37,6 +38,10 @@ class TopLevelTransition( val event: Event ) : Transition { + private companion object { + val log = contextLogger() + } + @Suppress("ComplexMethod", "TooGenericExceptionCaught") override fun transition(): TransitionResult { return try { @@ -63,11 +68,11 @@ class TopLevelTransition( is Event.OvernightObservation -> overnightObservationTransition() is Event.WakeUpFromSleep -> wakeUpFromSleepTransition() is Event.Pause -> pausedFlowTransition() - is Event.TerminateSessions -> terminateSessionsTransition(event) } } catch (t: Throwable) { // All errors coming from the transition should be sent back to the flow // Letting the flow re-enter standard error handling + log.error("Error occurred while creating transition for event: $event", t) builder { resumeFlowLogic(t) } } } @@ -401,16 +406,4 @@ class TopLevelTransition( FlowContinuation.Abort } } - - private fun terminateSessionsTransition(event: Event.TerminateSessions): TransitionResult { - return builder { - val sessions = event.sessions - val newCheckpoint = currentState.checkpoint - .removeSessions(sessions) - .removeSessionsToBeClosed(sessions) - currentState = currentState.copy(checkpoint = newCheckpoint) - actions.add(Action.RemoveSessionBindings(sessions)) - FlowContinuation.ProcessEvents - } - } }