Flows shouldn't have to suspend if just doing a send ()

* CORDA-45 Flows shouldn't have to suspend if just doing a send
This commit is contained in:
Patrick Kuo 2017-01-30 16:21:43 +00:00 committed by GitHub
parent e383752995
commit e8015e689a

View File

@ -89,7 +89,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
_resultFuture?.setException(t) _resultFuture?.setException(t)
throw ExecutionException(t) throw ExecutionException(t)
} }
// Wait for sessions with unconfirmed session state.
openSessions.values.filter { it.state is FlowSessionState.Initiating }.forEach {
it.waitForConfirmation()
}
// This is to prevent actionOnEnd being called twice if it throws an exception // This is to prevent actionOnEnd being called twice if it throws an exception
actionOnEnd() actionOnEnd()
_resultFuture?.set(result) _resultFuture?.set(result)
@ -121,34 +124,48 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
otherParty: Party, otherParty: Party,
payload: Any, payload: Any,
sessionFlow: FlowLogic<*>): UntrustworthyData<T> { sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
val (session, new) = getSession(otherParty, sessionFlow, payload) val session = getConfirmedSession(otherParty, sessionFlow)
val receivedSessionData = if (new) { return if (session == null) {
// Only do a receive here as the session init has carried the payload // Only do a receive here as the session init has carried the payload
receiveInternal<SessionData>(session) receiveInternal<SessionData>(startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = true))
} else { } else {
val sendSessionData = createSessionData(session, payload) sendAndReceiveInternal<SessionData>(session, createSessionData(session, payload))
sendAndReceiveInternal<SessionData>(session, sendSessionData) }.checkPayloadIs(receiveType)
}
return receivedSessionData.checkPayloadIs(receiveType)
} }
@Suspendable @Suspendable
override fun <T : Any> receive(receiveType: Class<T>, override fun <T : Any> receive(receiveType: Class<T>,
otherParty: Party, otherParty: Party,
sessionFlow: FlowLogic<*>): UntrustworthyData<T> { sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
val session = getSession(otherParty, sessionFlow, null).first val session = getConfirmedSession(otherParty, sessionFlow) ?: startNewSession(otherParty, sessionFlow, null, waitForConfirmation = true)
return receiveInternal<SessionData>(session).checkPayloadIs(receiveType) return receiveInternal<SessionData>(session).checkPayloadIs(receiveType)
} }
@Suspendable @Suspendable
override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) { override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) {
val (session, new) = getSession(otherParty, sessionFlow, payload) val session = getConfirmedSession(otherParty, sessionFlow)
if (!new) { if (session == null) {
// Don't send the payload again if it was already piggy-backed on a session init // Don't send the payload again if it was already piggy-backed on a session init
startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = false)
} else {
sendInternal(session, createSessionData(session, payload)) sendInternal(session, createSessionData(session, payload))
} }
} }
/**
* This method will suspend the state machine and wait for incoming session init response from other party.
*/
@Suspendable
private fun FlowSession.waitForConfirmation() {
val (peerParty, sessionInitResponse) = receiveInternal<SessionInitResponse>(this)
if (sessionInitResponse is SessionConfirm) {
state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId)
} else {
sessionInitResponse as SessionReject
throw FlowException("Party ${state.sendToParty} rejected session request: ${sessionInitResponse.errorMessage}")
}
}
private fun createSessionData(session: FlowSession, payload: Any): SessionData { private fun createSessionData(session: FlowSession, payload: Any): SessionData {
val sessionState = session.state val sessionState = session.state
val peerSessionId = when (sessionState) { val peerSessionId = when (sessionState) {
@ -174,12 +191,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
} }
@Suspendable @Suspendable
private fun getSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?): Pair<FlowSession, Boolean> { private fun getConfirmedSession(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSession? {
val session = openSessions[Pair(sessionFlow, otherParty)] return openSessions[Pair(sessionFlow, otherParty)]?.apply {
return if (session != null) { if (state is FlowSessionState.Initiating) {
Pair(session, false) // Session still initiating, try to retrieve the init response.
} else { waitForConfirmation()
Pair(startNewSession(otherParty, sessionFlow, firstPayload), true) }
} }
} }
@ -190,21 +207,17 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
* multiple public keys, but we **don't support multiple nodes advertising the same legal identity**. * multiple public keys, but we **don't support multiple nodes advertising the same legal identity**.
*/ */
@Suspendable @Suspendable
private fun startNewSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?): FlowSession { private fun startNewSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?, waitForConfirmation: Boolean): FlowSession {
logger.trace { "Initiating a new session with $otherParty" } logger.trace { "Initiating a new session with $otherParty" }
val session = FlowSession(sessionFlow, random63BitValue(), FlowSessionState.Initiating(otherParty)) val session = FlowSession(sessionFlow, random63BitValue(), FlowSessionState.Initiating(otherParty))
openSessions[Pair(sessionFlow, otherParty)] = session openSessions[Pair(sessionFlow, otherParty)] = session
val counterpartyFlow = sessionFlow.getCounterpartyMarker(otherParty).name val counterpartyFlow = sessionFlow.getCounterpartyMarker(otherParty).name
val sessionInit = SessionInit(session.ourSessionId, counterpartyFlow, firstPayload) val sessionInit = SessionInit(session.ourSessionId, counterpartyFlow, firstPayload)
val (peerParty, sessionInitResponse) = sendAndReceiveInternal<SessionInitResponse>(session, sessionInit) sendInternal(session, sessionInit)
if (sessionInitResponse is SessionConfirm) { if (waitForConfirmation) {
require(session.state is FlowSessionState.Initiating) session.waitForConfirmation()
session.state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId)
return session
} else {
sessionInitResponse as SessionReject
throw FlowException("Party $otherParty rejected session request: ${sessionInitResponse.errorMessage}")
} }
return session
} }
@Suspendable @Suspendable