CORDA-3989 Terminate sessions instantly (#6634)

Terminate sessions that need to be removed instantly in whatever transition is currently executing, rather than scheduling another event and doing so at a later time.

To do this, update the transition being created in `TopLevelTransition` to remove the sessions and append the `RemoveSessionBindings` action to it.

This achieves the same outcome as the original code but does so with 1 less transition. Doing this also removes the race condition that can occur where another external event is added to the flow's event queue before the terminate event could be added.
This commit is contained in:
Dan Newton 2020-08-14 11:13:42 +01:00 committed by GitHub
parent 1cbfb74022
commit 845ef8d3d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 18 additions and 27 deletions

View File

@ -186,13 +186,6 @@ sealed class Event {
override fun toString() = "Pause"
}
/**
* Terminate the specified [sessions], removing them from in-memory datastructures.
*
* @param sessions The sessions to terminate
*/
data class TerminateSessions(val sessions: Set<SessionId>) : Event()
/**
* Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow,
* even if it has not yet been processed and placed on the pending de-duplication handlers list.

View File

@ -158,16 +158,17 @@ data class Checkpoint(
return copy(checkpointState = checkpointState.copy(sessionsToBeClosed = checkpointState.sessionsToBeClosed + sessionIds))
}
fun removeSessionsToBeClosed(sessionIds: Set<SessionId>): Checkpoint {
return copy(checkpointState = checkpointState.copy(sessionsToBeClosed = checkpointState.sessionsToBeClosed - sessionIds))
}
/**
* Returns a copy of the Checkpoint with the specified session removed from the session map.
* @param sessionIds the sessions to remove.
*/
fun removeSessions(sessionIds: Set<SessionId>): Checkpoint {
return copy(checkpointState = checkpointState.copy(sessions = checkpointState.sessions - sessionIds))
return copy(
checkpointState = checkpointState.copy(
sessions = checkpointState.sessions - sessionIds,
sessionsToBeClosed = checkpointState.sessionsToBeClosed - sessionIds
)
)
}
/**

View File

@ -528,10 +528,14 @@ class StartedFlowTransition(
}
private fun scheduleTerminateSessionsIfRequired(transition: TransitionResult): TransitionResult {
// If there are sessions to be closed, close them on a following transition
// If there are sessions to be closed, close them on the currently executing transition
val sessionsToBeTerminated = findSessionsToBeTerminated(transition.newState)
return if (sessionsToBeTerminated.isNotEmpty()) {
transition.copy(actions = transition.actions + Action.ScheduleEvent(Event.TerminateSessions(sessionsToBeTerminated.keys)))
val checkpointWithSessionsRemoved = transition.newState.checkpoint.removeSessions(sessionsToBeTerminated.keys)
transition.copy(
newState = transition.newState.copy(checkpoint = checkpointWithSessionsRemoved),
actions = transition.actions + Action.RemoveSessionBindings(sessionsToBeTerminated.keys)
)
} else {
transition
}

View File

@ -5,6 +5,7 @@ import net.corda.core.flows.InitiatingFlow
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.Checkpoint
@ -37,6 +38,10 @@ class TopLevelTransition(
val event: Event
) : Transition {
private companion object {
val log = contextLogger()
}
@Suppress("ComplexMethod", "TooGenericExceptionCaught")
override fun transition(): TransitionResult {
return try {
@ -63,11 +68,11 @@ class TopLevelTransition(
is Event.OvernightObservation -> overnightObservationTransition()
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
is Event.Pause -> pausedFlowTransition()
is Event.TerminateSessions -> terminateSessionsTransition(event)
}
} catch (t: Throwable) {
// All errors coming from the transition should be sent back to the flow
// Letting the flow re-enter standard error handling
log.error("Error occurred while creating transition for event: $event", t)
builder { resumeFlowLogic(t) }
}
}
@ -401,16 +406,4 @@ class TopLevelTransition(
FlowContinuation.Abort
}
}
private fun terminateSessionsTransition(event: Event.TerminateSessions): TransitionResult {
return builder {
val sessions = event.sessions
val newCheckpoint = currentState.checkpoint
.removeSessions(sessions)
.removeSessionsToBeClosed(sessions)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(Action.RemoveSessionBindings(sessions))
FlowContinuation.ProcessEvents
}
}
}