ENT-4990: Port AMQP state machine logging and stability fixes from Enterprise to OS

This commit is contained in:
Denis Rekalov 2020-02-26 10:38:08 +00:00
parent fe625d0f37
commit 054563e40c
3 changed files with 85 additions and 33 deletions

View File

@ -0,0 +1,5 @@
package net.corda.nodeapi.internal
/**
 * Constants shared by the Artemis/AMQP messaging code.
 */
object ArtemisConstants {
    /**
     * Key used to look up a message's id in its application properties; the AMQP state machine
     * logs the value found under this key as the message "uuid".
     *
     * NOTE(review): the value looks like the Artemis duplicate-detection header name — confirm
     * against the Artemis `Message` constants before relying on that.
     */
    const val MESSAGE_ID_KEY: String = "_AMQ_DUPL_ID"
}

View File

@ -7,6 +7,8 @@ import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.toHexString
import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
@ -26,6 +28,8 @@ import org.slf4j.MDC
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.*
import kotlin.math.max
import kotlin.math.min
/**
* This ConnectionStateMachine class handles the events generated by the proton-j library to track
@ -51,6 +55,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
MDC.put("serverMode", serverMode.toString())
MDC.put("localLegalName", localLegalName)
MDC.put("remoteLegalName", remoteLegalName)
MDC.put("conn", connection.prettyPrint)
block()
} finally {
MDC.setContextMap(oldMDC)
@ -73,12 +78,22 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
private val transport: Transport
private val id = UUID.randomUUID().toString()
private var session: Session? = null
/**
* Key is message topic and value is the list of messages
*/
private val messageQueues = mutableMapOf<String, LinkedList<SendableMessageImpl>>()
private val unackedQueue = LinkedList<SendableMessageImpl>()
private val receivers = mutableMapOf<String, Receiver>()
private val senders = mutableMapOf<String, Sender>()
private var tagId: Int = 0
/**
 * Short description of a connection for log output: the string form of the connection's
 * `context`, or `"<n/a>"` when the connection is null, has no context, or the context
 * stringifies to null.
 */
private val Connection?.prettyPrint: String
    get() {
        val description = this?.context?.toString()
        return description ?: "<n/a>"
    }
/**
 * Short description of a transport for log output.
 *
 * Per the original author's note, the transport's context holds the Connection, and the
 * Connection's context holds the NIO channel, which carries the useful information. The result
 * is `"<n/a>"` when any link in that chain is missing or the transport's context is not an
 * [Endpoint].
 */
private val Transport?.prettyPrint: String
    get() {
        // Step one level down: transport context -> endpoint (expected to be the Connection).
        val owner = this?.context as? Endpoint
        // Step a second level down: endpoint context -> object whose toString() is logged.
        return owner?.context?.toString() ?: "<n/a>"
    }
init {
connection = Engine.connection()
connection.container = "CORDA:$id"
@ -116,12 +131,12 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
/**
 * Event callback for connection initialisation; it only writes a debug log entry
 * describing the event's connection.
 */
override fun onConnectionInit(event: Event) {
val connection = event.connection
// NOTE(review): the two log statements below appear to be the removed and added sides of this
// diff hunk. The first logs the raw connection object, the second logs its `prettyPrint` text.
logDebugWithMDC { "Connection init $connection" }
logDebugWithMDC { "Connection init ${connection.prettyPrint}" }
}
override fun onConnectionLocalOpen(event: Event) {
val connection = event.connection
logInfoWithMDC("Connection local open $connection")
logInfoWithMDC("Connection local open ${connection.prettyPrint}")
val session = connection.session()
session.open()
this.session = session
@ -132,13 +147,15 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
/**
 * Event callback for a local close of the connection: logs the event at info level,
 * then closes and frees the connection carried by the event.
 */
override fun onConnectionLocalClose(event: Event) {
val connection = event.connection
// NOTE(review): the two log statements below appear to be the removed and added sides of this
// diff hunk. The first logs the raw connection object, the second logs its `prettyPrint` text.
logInfoWithMDC("Connection local close $connection")
logInfoWithMDC("Connection local close ${connection.prettyPrint}")
connection.close()
connection.free()
}
override fun onConnectionUnbound(event: Event) {
if (event.connection == this.connection) {
val connection = event.connection
logInfoWithMDC("Connection unbound ${connection.prettyPrint}")
if (connection == this.connection) {
val channel = connection.context as? Channel
if (channel != null) {
if (channel.isActive) {
@ -150,12 +167,13 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
override fun onConnectionFinal(event: Event) {
val connection = event.connection
logDebugWithMDC { "Connection final $connection" }
logDebugWithMDC { "Connection final ${connection.prettyPrint}" }
if (connection == this.connection) {
this.connection.context = null
for (queue in messageQueues.values) {
// clear any dead messages
while (true) {
logDebugWithMDC { "Queue size: ${queue.size}" }
val msg = queue.poll()
if (msg != null) {
msg.doComplete(MessageStatus.Rejected)
@ -167,6 +185,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
}
messageQueues.clear()
while (true) {
logDebugWithMDC { "Unacked queue size: ${unackedQueue.size}" }
val msg = unackedQueue.poll()
if (msg != null) {
msg.doComplete(MessageStatus.Rejected)
@ -185,26 +204,28 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
session = null
receivers.clear()
senders.clear()
} else {
logDebugWithMDC { "Connection from the event: ${connection.prettyPrint} is not the connection owned: ${this.connection.prettyPrint}" }
}
}
/**
 * Event callback fired when the transport head is closed: logs the event, closes the
 * transport tail as well, then hands the transport to `onTransportInternal`.
 */
override fun onTransportHeadClosed(event: Event) {
val transport = event.transport
// NOTE(review): the two log statements below appear to be the removed and added sides of this
// diff hunk. The first logs the raw transport object, the second logs its `prettyPrint` text.
logDebugWithMDC { "Transport Head Closed $transport" }
logDebugWithMDC { "Transport Head Closed ${transport.prettyPrint}" }
transport.close_tail()
onTransportInternal(transport)
}
/**
 * Event callback fired when the transport tail is closed: logs the event, closes the
 * transport head as well, then hands the transport to `onTransportInternal`.
 */
override fun onTransportTailClosed(event: Event) {
val transport = event.transport
// NOTE(review): the two log statements below appear to be the removed and added sides of this
// diff hunk. The first logs the raw transport object, the second logs its `prettyPrint` text.
logDebugWithMDC { "Transport Tail Closed $transport" }
logDebugWithMDC { "Transport Tail Closed ${transport.prettyPrint}" }
transport.close_head()
onTransportInternal(transport)
}
override fun onTransportClosed(event: Event) {
val transport = event.transport
logDebugWithMDC { "Transport Closed $transport" }
logDebugWithMDC { "Transport Closed ${transport.prettyPrint}" }
if (transport == this.transport) {
transport.unbind()
transport.free()
@ -214,7 +235,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
override fun onTransportError(event: Event) {
val transport = event.transport
logInfoWithMDC("Transport Error $transport")
logInfoWithMDC("Transport Error ${transport.prettyPrint}")
val condition = event.transport.condition
if (condition != null) {
logInfoWithMDC("Error: ${condition.description}")
@ -226,7 +247,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
/**
 * Generic transport event callback: logs the event at debug level and hands the
 * transport to `onTransportInternal`.
 */
override fun onTransport(event: Event) {
val transport = event.transport
// NOTE(review): the two log statements below appear to be the removed and added sides of this
// diff hunk. The first logs the raw transport object, the second logs its `prettyPrint` text.
logDebugWithMDC { "Transport $transport" }
logDebugWithMDC { "Transport ${transport.prettyPrint}" }
onTransportInternal(transport)
}
@ -361,7 +382,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
delivery.context = nextMessage
sender.send(messageBuf.array(), messageBuf.arrayOffset() + messageBuf.readerIndex(), messageBuf.readableBytes())
nextMessage.status = MessageStatus.Sent
logDebugWithMDC { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" }
logDebugWithMDC { "Put tag ${delivery.tag.toHexString()} on wire uuid: ${nextMessage.applicationProperties[MESSAGE_ID_KEY]}" }
unackedQueue.offer(nextMessage)
sender.advance()
} finally {
@ -398,7 +419,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
appProperties,
channel,
delivery)
logDebugWithMDC { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" }
logDebugWithMDC { "Full message received uuid: ${appProperties[MESSAGE_ID_KEY]}" }
channel.writeAndFlush(receivedMessage)
if (link.current() == delivery) {
link.advance()
@ -409,7 +430,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
}
}
} else if (link is Sender) {
logDebugWithMDC { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" }
logDebugWithMDC { "Sender delivery confirmed tag ${delivery.tag.toHexString()}" }
val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance()
val sourceMessage = delivery.context as? SendableMessageImpl
unackedQueue.remove(sourceMessage)
@ -462,6 +483,8 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
if (session != null) {
val sender = getSender(msg.topic)
transmitMessages(sender)
} else {
logInfoWithMDC("Session been closed already")
}
}
@ -470,7 +493,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
try {
do {
val buffer = transport.inputBuffer
val limit = Math.min(buffer.remaining(), source.remaining())
val limit = min(buffer.remaining(), source.remaining())
val duplicate = source.duplicate()
duplicate.limit(source.position() + limit)
buffer.put(duplicate)
@ -483,7 +506,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
condition.description = ex.message
transport.condition = condition
transport.close_tail()
transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
transport.pop(max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
}
}
@ -508,7 +531,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
condition.description = ex.message
transport.condition = condition
transport.close_head()
transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
transport.pop(max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
}
}
}

View File

@ -3,6 +3,8 @@ package net.corda.nodeapi.internal.protonwrapper.engine
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import net.corda.core.internal.declaredField
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
@ -13,7 +15,6 @@ import org.apache.qpid.proton.amqp.messaging.Rejected
import org.apache.qpid.proton.amqp.transport.DeliveryState
import org.apache.qpid.proton.amqp.transport.ErrorCondition
import org.apache.qpid.proton.engine.*
import org.apache.qpid.proton.engine.impl.CollectorImpl
import org.apache.qpid.proton.reactor.FlowController
import org.apache.qpid.proton.reactor.Handshaker
import org.slf4j.MDC
@ -21,6 +22,7 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.math.max
/**
* The EventProcessor class converts calls on the netty scheduler/pipeline
@ -29,12 +31,12 @@ import kotlin.concurrent.withLock
* and simple sliding window flow control, so that these events don't have to live inside ConnectionStateMachine.
* Everything here is single threaded, because the proton-j library has to be run that way.
*/
internal class EventProcessor(channel: Channel,
internal class EventProcessor(private val channel: Channel,
private val serverMode: Boolean,
private val localLegalName: String,
private val remoteLegalName: String,
userName: String?,
password: String?) : BaseHandler() {
password: String?) {
companion object {
private const val FLOW_WINDOW_SIZE = 10
private val log = contextLogger()
@ -45,7 +47,9 @@ internal class EventProcessor(channel: Channel,
try {
MDC.put("serverMode", serverMode.toString())
MDC.put("localLegalName", localLegalName)
MDC.put("localAddress", channel.localAddress()?.toString())
MDC.put("remoteLegalName", remoteLegalName)
MDC.put("remoteAddress", channel.remoteAddress()?.toString())
block()
} finally {
MDC.setContextMap(oldMDC)
@ -59,10 +63,13 @@ internal class EventProcessor(channel: Channel,
}
private val lock = ReentrantLock()
@Volatile
private var pendingExecute: Boolean = false
@Volatile
private var processorClosed: Boolean = false
private val executor: ScheduledExecutorService = channel.eventLoop()
private val collector = Proton.collector() as CollectorImpl
private val handlers = mutableListOf<Handler>()
private val collector = Proton.collector()
private val handlers: List<Handler>
private val stateMachine: ConnectionStateMachine = ConnectionStateMachine(serverMode,
collector,
localLegalName,
@ -73,15 +80,11 @@ internal class EventProcessor(channel: Channel,
val connection: Connection = stateMachine.connection
init {
addHandler(Handshaker())
addHandler(FlowController(FLOW_WINDOW_SIZE))
addHandler(stateMachine)
handlers = listOf(Handshaker(), FlowController(FLOW_WINDOW_SIZE), stateMachine)
connection.context = channel
tick(stateMachine.connection)
}
fun addHandler(handler: Handler) = handlers.add(handler)
private fun popEvent(): Event? {
var ev = collector.peek()
if (ev != null) {
@ -93,23 +96,28 @@ internal class EventProcessor(channel: Channel,
/**
 * Runs one transport tick for [connection] and schedules the next one on [executor].
 *
 * All work happens while holding [lock]. If the connection is not locally closed and its
 * transport is not closed, `transport.tick(now)` is called and a follow-up task (another `tick`
 * followed by `processEvents`) is scheduled after `transport.tick(now) - now` milliseconds,
 * floored at zero. Otherwise no further tick is scheduled. Any exception is logged, the
 * transport is closed and an empty [ErrorCondition] is set on the connection.
 */
private fun tick(connection: Connection) {
lock.withLock {
logDebugWithMDC { "Tick" }
try {
if ((connection.localState != EndpointState.CLOSED) && !connection.transport.isClosed) {
val now = System.currentTimeMillis()
// NOTE(review): the two `tickDelay` declarations below appear to be the removed (`Math.max`)
// and added (`kotlin.math.max`) sides of this diff hunk; they compute the same value.
val tickDelay = Math.max(0L, connection.transport.tick(now) - now)
val tickDelay = max(0L, connection.transport.tick(now) - now)
// Re-arm: the scheduled task ticks again and then processes the collected events.
executor.schedule({
tick(connection)
processEvents()
}, tickDelay, TimeUnit.MILLISECONDS)
logDebugWithMDC {"Tick done. Next tick scheduled in $tickDelay ms"}
} else {
logDebugWithMDC { "Connection closed - no more ticking" }
}
} catch (ex: Exception) {
// A failure here is logged at info level and the transport is closed.
withMDC { log.info("Tick failed", ex) }
connection.transport.close()
connection.condition = ErrorCondition()
}
}
}
fun processEvents() {
private fun processEvents() {
lock.withLock {
pendingExecute = false
logDebugWithMDC { "Process Events" }
@ -135,11 +143,27 @@ internal class EventProcessor(channel: Channel,
}
/**
 * Closes and frees the connection, calling `processEvents` after each of the two steps.
 *
 * NOTE(review): this hunk appears to interleave both sides of the diff. The `localState`-guarded
 * statements look like the removed implementation. The `lock.withLock` section looks like the
 * added one: it runs under [lock], sets `processorClosed` on the first call so the close
 * sequence executes once, and only logs "Processor is already closed" on later calls.
 */
fun close() {
if (connection.localState != EndpointState.CLOSED) {
connection.close()
processEvents()
connection.free()
processEvents()
lock.withLock {
if (!processorClosed) {
// Mark closed before doing the work so a repeated call takes the else branch.
processorClosed = true
connection.logLocalState("Before close")
connection.close()
processEvents()
logDebugWithMDC { "Freeing-up connection" }
connection.free()
processEvents()
connection.logLocalState("After close")
} else {
logDebugWithMDC { "Processor is already closed" }
}
}
}
/**
 * Writes a debug log entry describing this connection's local state, prefixed with [prefix].
 *
 * Besides `localState`, the entry includes the fields named `freed` and `refcount`, read via
 * `declaredField`. Each read is wrapped in [Try], so a failed lookup is reported in the log
 * entry instead of throwing. Nothing is read or logged unless debug logging is enabled.
 */
private fun Connection.logLocalState(prefix: String) {
    // Skip the field lookups entirely when the entry would not be logged.
    if (!log.isDebugEnabled) return

    val freedTry = Try.on { declaredField<Boolean>("freed").value }
    val refcountTry = Try.on { declaredField<Int>("refcount").value }
    logDebugWithMDC { "$prefix, local state: $localState, freed: $freedTry, refcount: $refcountTry" }
}