From e8015e689a6bbdf09d8fff8cf7b214fe3777be3f Mon Sep 17 00:00:00 2001 From: Patrick Kuo Date: Mon, 30 Jan 2017 16:21:43 +0000 Subject: [PATCH] Flows shouldn't have to suspend if just doing a send (#187) * CORDA-45 Flows shouldn't have to suspend if just doing a send --- .../statemachine/FlowStateMachineImpl.kt | 65 +++++++++++-------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 4675001e6a..2ed5e921e3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -89,7 +89,10 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, _resultFuture?.setException(t) throw ExecutionException(t) } - + // Wait for sessions with unconfirmed session state. + openSessions.values.filter { it.state is FlowSessionState.Initiating }.forEach { + it.waitForConfirmation() + } // This is to prevent actionOnEnd being called twice if it throws an exception actionOnEnd() _resultFuture?.set(result) @@ -121,34 +124,48 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>): UntrustworthyData { - val (session, new) = getSession(otherParty, sessionFlow, payload) - val receivedSessionData = if (new) { + val session = getConfirmedSession(otherParty, sessionFlow) + return if (session == null) { // Only do a receive here as the session init has carried the payload - receiveInternal(session) + receiveInternal(startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = true)) } else { - val sendSessionData = createSessionData(session, payload) - sendAndReceiveInternal(session, sendSessionData) - } - return receivedSessionData.checkPayloadIs(receiveType) + sendAndReceiveInternal(session, createSessionData(session, payload)) + }.checkPayloadIs(receiveType) } @Suspendable override fun receive(receiveType: Class, otherParty: Party, sessionFlow: FlowLogic<*>): UntrustworthyData { - val session = getSession(otherParty, sessionFlow, null).first + val session = getConfirmedSession(otherParty, sessionFlow) ?: startNewSession(otherParty, sessionFlow, null, waitForConfirmation = true) return receiveInternal(session).checkPayloadIs(receiveType) } @Suspendable override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) { - val (session, new) = getSession(otherParty, sessionFlow, payload) - if (!new) { + val session = getConfirmedSession(otherParty, sessionFlow) + if (session == null) { // Don't send the payload again if it was already piggy-backed on a session init + startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = false) + } else { sendInternal(session, createSessionData(session, payload)) } } + /** + * This method will suspend the state machine and wait for incoming session init response from other party. + */ + @Suspendable + private fun FlowSession.waitForConfirmation() { + val (peerParty, sessionInitResponse) = receiveInternal(this) + if (sessionInitResponse is SessionConfirm) { + state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId) + } else { + sessionInitResponse as SessionReject + throw FlowException("Party ${state.sendToParty} rejected session request: ${sessionInitResponse.errorMessage}") + } + } + private fun createSessionData(session: FlowSession, payload: Any): SessionData { val sessionState = session.state val peerSessionId = when (sessionState) { @@ -174,12 +191,12 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } @Suspendable - private fun getSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?): Pair { - val session = openSessions[Pair(sessionFlow, otherParty)] - return if (session != null) { - Pair(session, false) - } else { - Pair(startNewSession(otherParty, sessionFlow, firstPayload), true) + private fun getConfirmedSession(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSession? { + return openSessions[Pair(sessionFlow, otherParty)]?.apply { + if (state is FlowSessionState.Initiating) { + // Session still initiating, try to retrieve the init response. + waitForConfirmation() + } } } @@ -190,21 +207,17 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, * multiple public keys, but we **don't support multiple nodes advertising the same legal identity**. */ @Suspendable - private fun startNewSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?): FlowSession { + private fun startNewSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?, waitForConfirmation: Boolean): FlowSession { logger.trace { "Initiating a new session with $otherParty" } val session = FlowSession(sessionFlow, random63BitValue(), FlowSessionState.Initiating(otherParty)) openSessions[Pair(sessionFlow, otherParty)] = session val counterpartyFlow = sessionFlow.getCounterpartyMarker(otherParty).name val sessionInit = SessionInit(session.ourSessionId, counterpartyFlow, firstPayload) - val (peerParty, sessionInitResponse) = sendAndReceiveInternal(session, sessionInit) - if (sessionInitResponse is SessionConfirm) { - require(session.state is FlowSessionState.Initiating) - session.state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId) - return session - } else { - sessionInitResponse as SessionReject - throw FlowException("Party $otherParty rejected session request: ${sessionInitResponse.errorMessage}") + sendInternal(session, sessionInit) + if (waitForConfirmation) { + session.waitForConfirmation() } + return session } @Suspendable