Merge pull request #6524 from corda/ENT-5532-retrying-flow-with-sessions-to-close

ENT-5532 Terminate sessions after original io request
NOTICK Resume flow when wrong message received
This commit is contained in:
Dan Newton 2020-07-31 17:08:43 +01:00 committed by GitHub
commit c288073e7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 89 additions and 59 deletions

View File

@ -139,7 +139,7 @@ sealed class Event {
data class AsyncOperationCompletion(val returnValue: Any?) : Event()
/**
* Signals the faiure of a [FlowAsyncOperation].
* Signals the failure of a [FlowAsyncOperation].
*
* Scheduling is triggered by the service that completes the future returned by the async operation.
*
@ -179,6 +179,13 @@ sealed class Event {
override fun toString() = "WakeUpSleepyFlow"
}
/**
* Terminate the specified [sessions], removing them from in-memory datastructures.
*
* @param sessions The sessions to terminate
*/
data class TerminateSessions(val sessions: Set<SessionId>) : Event()
/**
* Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow,
* even if it has not yet been processed and placed on the pending de-duplication handlers list.

View File

@ -41,47 +41,18 @@ class StartedFlowTransition(
continuation = FlowContinuation.Throw(errorsToThrow[0])
)
}
val sessionsToBeTerminated = findSessionsToBeTerminated(startingState)
// if there are sessions to be closed, we close them as part of this transition and normal processing will continue on the next transition.
return if (sessionsToBeTerminated.isNotEmpty()) {
terminateSessions(sessionsToBeTerminated)
} else {
when (flowIORequest) {
is FlowIORequest.Send -> sendTransition(flowIORequest)
is FlowIORequest.Receive -> receiveTransition(flowIORequest)
is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest)
is FlowIORequest.CloseSessions -> closeSessionTransition(flowIORequest)
is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest)
is FlowIORequest.Sleep -> sleepTransition(flowIORequest)
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
}
}
}
private fun findSessionsToBeTerminated(startingState: StateMachineState): SessionMap {
return startingState.checkpoint.checkpointState.sessionsToBeClosed.mapNotNull { sessionId ->
val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!! as SessionState.Initiated
if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is EndSessionMessage) {
sessionId to sessionState
} else {
null
}
}.toMap()
}
private fun terminateSessions(sessionsToBeTerminated: SessionMap): TransitionResult {
return builder {
val sessionsToRemove = sessionsToBeTerminated.keys
val newCheckpoint = currentState.checkpoint.removeSessions(sessionsToRemove)
.removeSessionsToBeClosed(sessionsToRemove)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(Action.RemoveSessionBindings(sessionsToRemove))
actions.add(Action.ScheduleEvent(Event.DoRemainingWork))
FlowContinuation.ProcessEvents
}
return when (flowIORequest) {
is FlowIORequest.Send -> sendTransition(flowIORequest)
is FlowIORequest.Receive -> receiveTransition(flowIORequest)
is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest)
is FlowIORequest.CloseSessions -> closeSessionTransition(flowIORequest)
is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest)
is FlowIORequest.Sleep -> sleepTransition(flowIORequest)
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
}.let { scheduleTerminateSessionsIfRequired(it) }
}
private fun waitForSessionConfirmationsTransition(): TransitionResult {
@ -158,6 +129,7 @@ class StartedFlowTransition(
}
}
@Suppress("TooGenericExceptionCaught")
private fun sendAndReceiveTransition(flowIORequest: FlowIORequest.SendAndReceive): TransitionResult {
val sessionIdToMessage = LinkedHashMap<SessionId, SerializedBytes<Any>>()
val sessionIdToSession = LinkedHashMap<SessionId, FlowSessionImpl>()
@ -171,18 +143,23 @@ class StartedFlowTransition(
if (isErrored()) {
FlowContinuation.ProcessEvents
} else {
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
// We don't yet have the messages, change the suspension to be on Receive
val newIoRequest = FlowIORequest.Receive(flowIORequest.sessionToMessage.keys.toNonEmptySet())
currentState = currentState.copy(
try {
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
// We don't yet have the messages, change the suspension to be on Receive
val newIoRequest = FlowIORequest.Receive(flowIORequest.sessionToMessage.keys.toNonEmptySet())
currentState = currentState.copy(
checkpoint = currentState.checkpoint.copy(
flowState = FlowState.Started(newIoRequest, started.frozenFiber)
flowState = FlowState.Started(newIoRequest, started.frozenFiber)
)
)
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
)
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
}
} catch (t: Throwable) {
// E.g. A session end message received while expecting a data session message
resumeFlowLogic(t)
}
}
}
@ -216,6 +193,7 @@ class StartedFlowTransition(
}
}
@Suppress("TooGenericExceptionCaught")
private fun receiveTransition(flowIORequest: FlowIORequest.Receive): TransitionResult {
return builder {
val sessionIdToSession = LinkedHashMap<SessionId, FlowSessionImpl>()
@ -224,11 +202,16 @@ class StartedFlowTransition(
}
// send initialises to uninitialised sessions
sendInitialSessionMessagesIfNeeded(sessionIdToSession.keys)
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
try {
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
}
} catch (t: Throwable) {
// E.g. A session end message received while expecting a data session message
resumeFlowLogic(t)
}
}
}
@ -253,6 +236,8 @@ class StartedFlowTransition(
val messages: Map<SessionId, SerializedBytes<Any>>,
val newSessionMap: SessionMap
)
@Suppress("ComplexMethod", "NestedBlockDepth")
private fun pollSessionMessages(sessions: SessionMap, sessionIds: Set<SessionId>): PollResult? {
val newSessionMessages = LinkedHashMap(sessions)
val resultMessages = LinkedHashMap<SessionId, SerializedBytes<Any>>()
@ -267,7 +252,11 @@ class StartedFlowTransition(
} else {
newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toList())
// at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message.
resultMessages[sessionId] = (messages[0] as DataSessionMessage).payload
resultMessages[sessionId] = if (messages[0] is EndSessionMessage) {
throw UnexpectedFlowEndException("Received session end message instead of a data session message. Mismatched send and receive?")
} else {
(messages[0] as DataSessionMessage).payload
}
}
}
else -> {
@ -537,4 +526,25 @@ class StartedFlowTransition(
private fun executeForceCheckpoint(): TransitionResult {
return builder { resumeFlowLogic(Unit) }
}
private fun scheduleTerminateSessionsIfRequired(transition: TransitionResult): TransitionResult {
// If there are sessions to be closed, close them on a following transition
val sessionsToBeTerminated = findSessionsToBeTerminated(transition.newState)
return if (sessionsToBeTerminated.isNotEmpty()) {
transition.copy(actions = transition.actions + Action.ScheduleEvent(Event.TerminateSessions(sessionsToBeTerminated.keys)))
} else {
transition
}
}
private fun findSessionsToBeTerminated(startingState: StateMachineState): SessionMap {
return startingState.checkpoint.checkpointState.sessionsToBeClosed.mapNotNull { sessionId ->
val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!! as SessionState.Initiated
if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is EndSessionMessage) {
sessionId to sessionState
} else {
null
}
}.toMap()
}
}

View File

@ -62,6 +62,7 @@ class TopLevelTransition(
is Event.ReloadFlowFromCheckpointAfterSuspend -> reloadFlowFromCheckpointAfterSuspendTransition()
is Event.OvernightObservation -> overnightObservationTransition()
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
is Event.TerminateSessions -> terminateSessionsTransition(event)
}
}
@ -366,4 +367,16 @@ class TopLevelTransition(
resumeFlowLogic(Unit)
}
}
private fun terminateSessionsTransition(event: Event.TerminateSessions): TransitionResult {
return builder {
val sessions = event.sessions
val newCheckpoint = currentState.checkpoint
.removeSessions(sessions)
.removeSessionsToBeClosed(sessions)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(Action.RemoveSessionBindings(sessions))
FlowContinuation.ProcessEvents
}
}
}

View File

@ -97,7 +97,7 @@ class RetryFlowMockTest {
}
@Test(timeout=300_000)
fun `Restart does not set senderUUID`() {
fun `Restart does not set senderUUID and early end session message does not hang receiving flow`() {
val messagesSent = Collections.synchronizedList(mutableListOf<Message>())
val partyB = nodeB.info.legalIdentities.first()
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {