ENT-5277: Reflect node-api changes in OS (#6238)

This commit is contained in:
Viktor Kolomeyko
2020-05-12 13:22:30 +01:00
committed by GitHub
parent d4cb0e2d87
commit c5950d6a3a
2 changed files with 66 additions and 14 deletions

View File

@ -81,7 +81,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
val connection: Connection val connection: Connection
private val transport: Transport private val transport: Transport
private val id = UUID.randomUUID().toString() private val id = UUID.randomUUID().toString()
private var session: Session? = null private val sessionState = SessionState()
/** /**
* Key is message topic and value is the list of messages * Key is message topic and value is the list of messages
*/ */
@ -144,7 +144,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
logInfoWithMDC("Connection local open ${connection.prettyPrint}") logInfoWithMDC("Connection local open ${connection.prettyPrint}")
val session = connection.session() val session = connection.session()
session.open() session.open()
this.session = session sessionState.init(session)
for (target in messageQueues.keys) { for (target in messageQueues.keys) {
getSender(target) getSender(target)
} }
@ -206,7 +206,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
} }
// shouldn't happen, but cleanup any stranded items // shouldn't happen, but cleanup any stranded items
transport.context = null transport.context = null
session = null sessionState.close()
receivers.clear() receivers.clear()
senders.clear() senders.clear()
} else { } else {
@ -287,7 +287,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
private fun getSender(target: String): Sender { private fun getSender(target: String): Sender {
if (!senders.containsKey(target)) { if (!senders.containsKey(target)) {
val sender = session!!.sender(UUID.randomUUID().toString()) val sender = sessionState.session!!.sender(UUID.randomUUID().toString())
sender.source = Source().apply { sender.source = Source().apply {
address = target address = target
dynamic = false dynamic = false
@ -315,9 +315,9 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
override fun onSessionFinal(event: Event) { override fun onSessionFinal(event: Event) {
val session = event.session val session = event.session
logDebugWithMDC { "Session final $session" } logDebugWithMDC { "Session final for: $session" }
if (session == this.session) { if (session == sessionState.session) {
this.session = null sessionState.close()
// If TRANSPORT_CLOSED event was already processed, the 'transport' in all subsequent events is set to null. // If TRANSPORT_CLOSED event was already processed, the 'transport' in all subsequent events is set to null.
// There is, however, a chance of missing TRANSPORT_CLOSED event, e.g. when disconnect occurs before opening remote session. // There is, however, a chance of missing TRANSPORT_CLOSED event, e.g. when disconnect occurs before opening remote session.
@ -451,8 +451,15 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
logDebugWithMDC { "Sender delivery confirmed tag ${delivery.tag.toHexString()}" } logDebugWithMDC { "Sender delivery confirmed tag ${delivery.tag.toHexString()}" }
val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance() val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance()
val sourceMessage = delivery.context as? SendableMessageImpl val sourceMessage = delivery.context as? SendableMessageImpl
unackedQueue.remove(sourceMessage) if (sourceMessage != null) {
sourceMessage?.doComplete(if (ok) MessageStatus.Acknowledged else MessageStatus.Rejected) unackedQueue.remove(sourceMessage)
val status = if (ok) MessageStatus.Acknowledged else MessageStatus.Rejected
logDebugWithMDC { "Setting status as: $status to message with wire uuid: " +
"${sourceMessage.applicationProperties[MESSAGE_ID_KEY]}" }
sourceMessage.doComplete(status)
} else {
logDebugWithMDC { "Source message not found on delivery context" }
}
delivery.settle() delivery.settle()
} }
} }
@ -500,11 +507,22 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
msg.buf = encoded msg.buf = encoded
val messageQueue = messageQueues.getOrPut(msg.topic, { LinkedList() }) val messageQueue = messageQueues.getOrPut(msg.topic, { LinkedList() })
messageQueue.offer(msg) messageQueue.offer(msg)
if (session != null) { when (sessionState.value) {
val sender = getSender(msg.topic) SessionState.Value.ACTIVE -> {
transmitMessages(sender) val sender = getSender(msg.topic)
} else { transmitMessages(sender)
logInfoWithMDC("Session been closed already") }
SessionState.Value.UNINITIALIZED -> {
// This is pretty normal as on Connection Local may not have been received yet
// Once it will be received the messages will be sent from the `messageQueues`
logDebugWithMDC { "Session has not been open yet" }
}
SessionState.Value.CLOSED -> {
logInfoWithMDC("Session been closed already")
// If session been closed then it is too late to send a message, so we flag it as rejected.
logDebugWithMDC { "Setting Rejected status to message with wire uuid: ${msg.applicationProperties[MESSAGE_ID_KEY]}" }
msg.doComplete(MessageStatus.Rejected)
}
} }
} }

View File

@ -0,0 +1,34 @@
package net.corda.nodeapi.internal.protonwrapper.engine
import org.apache.qpid.proton.engine.Session
/**
* In addition to holding the `Session` also tracks the state of it.
*/
internal class SessionState {
enum class Value {
UNINITIALIZED,
ACTIVE,
CLOSED
}
private var _value: Value = Value.UNINITIALIZED
private var _session: Session? = null
val value: Value get() = _value
val session: Session? get() = _session
fun init(session: Session) {
require(value == Value.UNINITIALIZED)
_value = Value.ACTIVE
_session = session
}
fun close() {
_value = Value.CLOSED
_session = null
}
}