From 82bcde573b15650e477bb7df822288da7e985c8e Mon Sep 17 00:00:00 2001 From: LankyDan Date: Fri, 31 Jul 2020 14:28:57 +0100 Subject: [PATCH] NOTICK Resume flow when wrong message received When an incorrect message is received, the flow should resume to allow it to throw the error back to user code and possibly cause the flow to fail. For now, if an `EndSessionMessage` is received instead of a `DataSessionMessage`, then an `UnexpectedFlowEndException` is thrown back to user code. Allowing it to correctly re-enter normal flow error handling. Without this change, the flow will hang due to it failing while creating a transition which exists outside of the general state machine error handling code path. --- .../transitions/StartedFlowTransition.kt | 50 +++++++++++++------ .../statemachine/RetryFlowMockTest.kt | 2 +- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 9b2469face..0911ed18a4 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -129,6 +129,7 @@ class StartedFlowTransition( } } + @Suppress("TooGenericExceptionCaught") private fun sendAndReceiveTransition(flowIORequest: FlowIORequest.SendAndReceive): TransitionResult { val sessionIdToMessage = LinkedHashMap>() val sessionIdToSession = LinkedHashMap() @@ -142,18 +143,23 @@ class StartedFlowTransition( if (isErrored()) { FlowContinuation.ProcessEvents } else { - val receivedMap = receiveFromSessionsTransition(sessionIdToSession) - if (receivedMap == null) { - // We don't yet have the messages, change the suspension to be on Receive - val newIoRequest = FlowIORequest.Receive(flowIORequest.sessionToMessage.keys.toNonEmptySet()) - currentState = currentState.copy( + try { + val receivedMap = receiveFromSessionsTransition(sessionIdToSession) + if (receivedMap == null) { + // We don't yet have the messages, change the suspension to be on Receive + val newIoRequest = FlowIORequest.Receive(flowIORequest.sessionToMessage.keys.toNonEmptySet()) + currentState = currentState.copy( checkpoint = currentState.checkpoint.copy( - flowState = FlowState.Started(newIoRequest, started.frozenFiber) + flowState = FlowState.Started(newIoRequest, started.frozenFiber) ) - ) - FlowContinuation.ProcessEvents - } else { - resumeFlowLogic(receivedMap) + ) + FlowContinuation.ProcessEvents + } else { + resumeFlowLogic(receivedMap) + } + } catch (t: Throwable) { + // E.g. A session end message received while expecting a data session message + resumeFlowLogic(t) } } } @@ -187,6 +193,7 @@ class StartedFlowTransition( } } + @Suppress("TooGenericExceptionCaught") private fun receiveTransition(flowIORequest: FlowIORequest.Receive): TransitionResult { return builder { val sessionIdToSession = LinkedHashMap() @@ -195,11 +202,16 @@ class StartedFlowTransition( } // send initialises to uninitialised sessions sendInitialSessionMessagesIfNeeded(sessionIdToSession.keys) - val receivedMap = receiveFromSessionsTransition(sessionIdToSession) - if (receivedMap == null) { - FlowContinuation.ProcessEvents - } else { - resumeFlowLogic(receivedMap) + try { + val receivedMap = receiveFromSessionsTransition(sessionIdToSession) + if (receivedMap == null) { + FlowContinuation.ProcessEvents + } else { + resumeFlowLogic(receivedMap) + } + } catch (t: Throwable) { + // E.g. A session end message received while expecting a data session message + resumeFlowLogic(t) } } } @@ -224,6 +236,8 @@ class StartedFlowTransition( val messages: Map>, val newSessionMap: SessionMap ) + + @Suppress("ComplexMethod", "NestedBlockDepth") private fun pollSessionMessages(sessions: SessionMap, sessionIds: Set): PollResult? { val newSessionMessages = LinkedHashMap(sessions) val resultMessages = LinkedHashMap>() @@ -238,7 +252,11 @@ class StartedFlowTransition( } else { newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toList()) // at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message. - resultMessages[sessionId] = (messages[0] as DataSessionMessage).payload + resultMessages[sessionId] = if (messages[0] is EndSessionMessage) { + throw UnexpectedFlowEndException("Received session end message instead of a data session message. Mismatched send and receive?") + } else { + (messages[0] as DataSessionMessage).payload + } } } else -> { diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index ee93d937d2..061345efe7 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -97,7 +97,7 @@ class RetryFlowMockTest { } @Test(timeout=300_000) - fun `Restart does not set senderUUID`() { + fun `Restart does not set senderUUID and early end session message does not hang receiving flow`() { val messagesSent = Collections.synchronizedList(mutableListOf()) val partyB = nodeB.info.legalIdentities.first() nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {