NOTICK Resume flow when wrong message received

When an incorrect message is received, the flow should resume to allow
it to throw the error back to user code and possibly cause the flow to
fail.

For now, if an `EndSessionMessage` is received instead of a
`DataSessionMessage`, then an `UnexpectedFlowEndException` is thrown
back to user code. Allowing it to correctly re-enter normal flow error
handling.

Without this change, the flow will hang due to it failing while creating
a transition which exists outside of the general state machine error
handling code path.
This commit is contained in:
LankyDan 2020-07-31 14:28:57 +01:00
parent 39dbe22c9d
commit 82bcde573b
2 changed files with 35 additions and 17 deletions

View File

@ -129,6 +129,7 @@ class StartedFlowTransition(
}
}
@Suppress("TooGenericExceptionCaught")
private fun sendAndReceiveTransition(flowIORequest: FlowIORequest.SendAndReceive): TransitionResult {
val sessionIdToMessage = LinkedHashMap<SessionId, SerializedBytes<Any>>()
val sessionIdToSession = LinkedHashMap<SessionId, FlowSessionImpl>()
@ -142,18 +143,23 @@ class StartedFlowTransition(
if (isErrored()) {
FlowContinuation.ProcessEvents
} else {
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
// We don't yet have the messages, change the suspension to be on Receive
val newIoRequest = FlowIORequest.Receive(flowIORequest.sessionToMessage.keys.toNonEmptySet())
currentState = currentState.copy(
try {
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
// We don't yet have the messages, change the suspension to be on Receive
val newIoRequest = FlowIORequest.Receive(flowIORequest.sessionToMessage.keys.toNonEmptySet())
currentState = currentState.copy(
checkpoint = currentState.checkpoint.copy(
flowState = FlowState.Started(newIoRequest, started.frozenFiber)
flowState = FlowState.Started(newIoRequest, started.frozenFiber)
)
)
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
)
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
}
} catch (t: Throwable) {
// E.g. A session end message received while expecting a data session message
resumeFlowLogic(t)
}
}
}
@ -187,6 +193,7 @@ class StartedFlowTransition(
}
}
@Suppress("TooGenericExceptionCaught")
private fun receiveTransition(flowIORequest: FlowIORequest.Receive): TransitionResult {
return builder {
val sessionIdToSession = LinkedHashMap<SessionId, FlowSessionImpl>()
@ -195,11 +202,16 @@ class StartedFlowTransition(
}
// send initialises to uninitialised sessions
sendInitialSessionMessagesIfNeeded(sessionIdToSession.keys)
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
try {
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
FlowContinuation.ProcessEvents
} else {
resumeFlowLogic(receivedMap)
}
} catch (t: Throwable) {
// E.g. A session end message received while expecting a data session message
resumeFlowLogic(t)
}
}
}
@ -224,6 +236,8 @@ class StartedFlowTransition(
val messages: Map<SessionId, SerializedBytes<Any>>,
val newSessionMap: SessionMap
)
@Suppress("ComplexMethod", "NestedBlockDepth")
private fun pollSessionMessages(sessions: SessionMap, sessionIds: Set<SessionId>): PollResult? {
val newSessionMessages = LinkedHashMap(sessions)
val resultMessages = LinkedHashMap<SessionId, SerializedBytes<Any>>()
@ -238,7 +252,11 @@ class StartedFlowTransition(
} else {
newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toList())
// at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message.
resultMessages[sessionId] = (messages[0] as DataSessionMessage).payload
resultMessages[sessionId] = if (messages[0] is EndSessionMessage) {
throw UnexpectedFlowEndException("Received session end message instead of a data session message. Mismatched send and receive?")
} else {
(messages[0] as DataSessionMessage).payload
}
}
}
else -> {

View File

@ -97,7 +97,7 @@ class RetryFlowMockTest {
}
@Test(timeout=300_000)
fun `Restart does not set senderUUID`() {
fun `Restart does not set senderUUID and early end session message does not hang receiving flow`() {
val messagesSent = Collections.synchronizedList(mutableListOf<Message>())
val partyB = nodeB.info.legalIdentities.first()
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {