diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index 5d542e1964..38062c20b9 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -81,7 +81,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, val connection: Connection private val transport: Transport private val id = UUID.randomUUID().toString() - private var session: Session? = null + private val sessionState = SessionState() /** * Key is message topic and value is the list of messages */ @@ -144,7 +144,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, logInfoWithMDC("Connection local open ${connection.prettyPrint}") val session = connection.session() session.open() - this.session = session + sessionState.init(session) for (target in messageQueues.keys) { getSender(target) } @@ -206,7 +206,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, } // shouldn't happen, but cleanup any stranded items transport.context = null - session = null + sessionState.close() receivers.clear() senders.clear() } else { @@ -287,7 +287,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, private fun getSender(target: String): Sender { if (!senders.containsKey(target)) { - val sender = session!!.sender(UUID.randomUUID().toString()) + val sender = sessionState.session!!.sender(UUID.randomUUID().toString()) sender.source = Source().apply { address = target dynamic = false @@ -315,9 +315,9 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onSessionFinal(event: Event) { val session = event.session - logDebugWithMDC { "Session final $session" } - if (session == this.session) { - this.session = null + logDebugWithMDC { "Session final for: $session" } + if (session == sessionState.session) { + sessionState.close() // If TRANSPORT_CLOSED event was already processed, the 'transport' in all subsequent events is set to null. // There is, however, a chance of missing TRANSPORT_CLOSED event, e.g. when disconnect occurs before opening remote session. @@ -451,8 +451,15 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, logDebugWithMDC { "Sender delivery confirmed tag ${delivery.tag.toHexString()}" } val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance() val sourceMessage = delivery.context as? SendableMessageImpl - unackedQueue.remove(sourceMessage) - sourceMessage?.doComplete(if (ok) MessageStatus.Acknowledged else MessageStatus.Rejected) + if (sourceMessage != null) { + unackedQueue.remove(sourceMessage) + val status = if (ok) MessageStatus.Acknowledged else MessageStatus.Rejected + logDebugWithMDC { "Setting status as: $status to message with wire uuid: " + + "${sourceMessage.applicationProperties[MESSAGE_ID_KEY]}" } + sourceMessage.doComplete(status) + } else { + logDebugWithMDC { "Source message not found on delivery context" } + } delivery.settle() } } @@ -500,11 +507,22 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, msg.buf = encoded val messageQueue = messageQueues.getOrPut(msg.topic, { LinkedList() }) messageQueue.offer(msg) - if (session != null) { - val sender = getSender(msg.topic) - transmitMessages(sender) - } else { - logInfoWithMDC("Session been closed already") + when (sessionState.value) { + SessionState.Value.ACTIVE -> { + val sender = getSender(msg.topic) + transmitMessages(sender) + } + SessionState.Value.UNINITIALIZED -> { + // This is pretty normal as on Connection Local may not have been received yet + // Once it will be received the messages will be sent from the `messageQueues` + logDebugWithMDC { "Session has not been open yet" } + } + SessionState.Value.CLOSED -> { + logInfoWithMDC("Session been closed already") + // If session been closed then it is too late to send a message, so we flag it as rejected. + logDebugWithMDC { "Setting Rejected status to message with wire uuid: ${msg.applicationProperties[MESSAGE_ID_KEY]}" } + msg.doComplete(MessageStatus.Rejected) + } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/SessionState.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/SessionState.kt new file mode 100644 index 0000000000..7c81e43d62 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/SessionState.kt @@ -0,0 +1,34 @@ +package net.corda.nodeapi.internal.protonwrapper.engine + +import org.apache.qpid.proton.engine.Session + +/** + * In addition to holding the `Session` also tracks the state of it. + */ +internal class SessionState { + + enum class Value { + UNINITIALIZED, + ACTIVE, + CLOSED + } + + private var _value: Value = Value.UNINITIALIZED + + private var _session: Session? = null + + val value: Value get() = _value + + val session: Session? get() = _session + + fun init(session: Session) { + require(value == Value.UNINITIALIZED) + _value = Value.ACTIVE + _session = session + } + + fun close() { + _value = Value.CLOSED + _session = null + } +} \ No newline at end of file