Change to use MDC logic in bridge/AMQP protocol logging (#3398)

This commit is contained in:
Matthew Nesbit 2018-06-20 11:55:59 +01:00 committed by bpaunescu
parent 0bd09ff8a3
commit 4ff5aa34b6
4 changed files with 146 additions and 69 deletions

View File

@ -6,7 +6,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.VisibleForTesting
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BA
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import rx.Subscription
import java.security.KeyStore
import java.util.concurrent.locks.ReentrantLock
@ -74,9 +74,28 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize
private val maxMessageSize: Int) {
companion object {
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
private val log = contextLogger()
}
private val log = LoggerFactory.getLogger("$bridgeName:${legalNames.first()}")
private fun withMDC(block: () -> Unit) {
MDC.put("queueName", queueName)
MDC.put("target", target.toString())
MDC.put("bridgeName", bridgeName)
MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() })
MDC.put("maxMessageSize", maxMessageSize.toString())
block()
MDC.clear()
}
private fun logDebugWithMDC(msg: () -> String) {
if (log.isDebugEnabled) {
withMDC { log.debug(msg()) }
}
}
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedThreadPool = sharedEventGroup, maxMessageSize = maxMessageSize)
val bridgeName: String get() = getBridgeName(queueName, target)
@ -86,13 +105,13 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize
private var connectedSubscription: Subscription? = null
fun start() {
log.info("Create new AMQP bridge")
logInfoWithMDC("Create new AMQP bridge")
connectedSubscription = amqpClient.onConnection.subscribe({ x -> onSocketConnected(x.connected) })
amqpClient.start()
}
fun stop() {
log.info("Stopping AMQP bridge")
logInfoWithMDC("Stopping AMQP bridge")
lock.withLock {
synchronized(artemis) {
consumer?.close()
@ -110,7 +129,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize
lock.withLock {
synchronized(artemis) {
if (connected) {
log.info("Bridge Connected")
logInfoWithMDC("Bridge Connected")
val sessionFactory = artemis.started!!.sessionFactory
val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
this.session = session
@ -119,7 +138,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize
consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler)
session.start()
} else {
log.info("Bridge Disconnected")
logInfoWithMDC("Bridge Disconnected")
consumer?.close()
consumer = null
session?.stop()
@ -131,7 +150,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
if (artemisMessage.bodySize > maxMessageSize) {
log.warn("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " +
logWarnWithMDC("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " +
"dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
// Ack the message to prevent same message being sent to us again.
artemisMessage.acknowledge()
@ -148,18 +167,18 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize
properties[key] = value
}
}
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
logDebugWithMDC { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
val peerInbox = translateLocalQueueToInboxAddress(queueName)
val sendableMessage = amqpClient.createMessage(data, peerInbox,
legalNames.first().toString(),
properties)
sendableMessage.onComplete.then {
log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" }
logDebugWithMDC { "Bridge ACK ${sendableMessage.onComplete.get()}" }
lock.withLock {
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
artemisMessage.acknowledge()
} else {
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
logInfoWithMDC("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
// We need to commit any acknowledged messages before rolling back the failed
// (unacknowledged) message.
session?.commit()

View File

@ -6,7 +6,7 @@ import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
@ -23,7 +23,7 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode
import org.apache.qpid.proton.engine.*
import org.apache.qpid.proton.message.Message
import org.apache.qpid.proton.message.ProtonJMessage
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.*
@ -35,7 +35,7 @@ import java.util.*
* but this threading lock is managed by the EventProcessor class that calls this.
* It ultimately posts application packets to/from from the netty transport pipeline.
*/
internal class ConnectionStateMachine(serverMode: Boolean,
internal class ConnectionStateMachine(private val serverMode: Boolean,
collector: Collector,
private val localLegalName: String,
private val remoteLegalName: String,
@ -43,10 +43,28 @@ internal class ConnectionStateMachine(serverMode: Boolean,
password: String?) : BaseHandler() {
companion object {
private const val IDLE_TIMEOUT = 10000
private val log = contextLogger()
}
private fun withMDC(block: () -> Unit) {
MDC.put("serverMode", serverMode.toString())
MDC.put("localLegalName", localLegalName)
MDC.put("remoteLegalName", remoteLegalName)
block()
MDC.clear()
}
private fun logDebugWithMDC(msg: () -> String) {
if (log.isDebugEnabled) {
withMDC { log.debug(msg()) }
}
}
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) }
val connection: Connection
private val log = LoggerFactory.getLogger(localLegalName)
private val transport: Transport
private val id = UUID.randomUUID().toString()
private var session: Session? = null
@ -93,12 +111,12 @@ internal class ConnectionStateMachine(serverMode: Boolean,
override fun onConnectionInit(event: Event) {
val connection = event.connection
log.debug { "Connection init $connection" }
logDebugWithMDC { "Connection init $connection" }
}
override fun onConnectionLocalOpen(event: Event) {
val connection = event.connection
log.info("Connection local open $connection")
logInfoWithMDC("Connection local open $connection")
val session = connection.session()
session.open()
this.session = session
@ -109,7 +127,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
override fun onConnectionLocalClose(event: Event) {
val connection = event.connection
log.info("Connection local close $connection")
logInfoWithMDC("Connection local close $connection")
connection.close()
connection.free()
}
@ -127,7 +145,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
override fun onConnectionFinal(event: Event) {
val connection = event.connection
log.debug { "Connection final $connection" }
logDebugWithMDC { "Connection final $connection" }
if (connection == this.connection) {
this.connection.context = null
for (queue in messageQueues.values) {
@ -167,21 +185,21 @@ internal class ConnectionStateMachine(serverMode: Boolean,
override fun onTransportHeadClosed(event: Event) {
val transport = event.transport
log.debug { "Transport Head Closed $transport" }
logDebugWithMDC { "Transport Head Closed $transport" }
transport.close_tail()
onTransportInternal(transport)
}
override fun onTransportTailClosed(event: Event) {
val transport = event.transport
log.debug { "Transport Tail Closed $transport" }
logDebugWithMDC { "Transport Tail Closed $transport" }
transport.close_head()
onTransportInternal(transport)
}
override fun onTransportClosed(event: Event) {
val transport = event.transport
log.debug { "Transport Closed $transport" }
logDebugWithMDC { "Transport Closed $transport" }
if (transport == this.transport) {
transport.unbind()
transport.free()
@ -191,19 +209,19 @@ internal class ConnectionStateMachine(serverMode: Boolean,
override fun onTransportError(event: Event) {
val transport = event.transport
log.info("Transport Error $transport")
logInfoWithMDC("Transport Error $transport")
val condition = event.transport.condition
if (condition != null) {
log.info("Error: ${condition.description}")
logInfoWithMDC("Error: ${condition.description}")
} else {
log.info("Error (no description returned).")
logInfoWithMDC("Error (no description returned).")
}
onTransportInternal(transport)
}
override fun onTransport(event: Event) {
val transport = event.transport
log.debug { "Transport $transport" }
logDebugWithMDC { "Transport $transport" }
onTransportInternal(transport)
}
@ -220,12 +238,12 @@ internal class ConnectionStateMachine(serverMode: Boolean,
override fun onSessionInit(event: Event) {
val session = event.session
log.debug { "Session init $session" }
logDebugWithMDC { "Session init $session" }
}
override fun onSessionLocalOpen(event: Event) {
val session = event.session
log.debug { "Session local open $session" }
logDebugWithMDC { "Session local open $session" }
}
private fun getSender(target: String): Sender {
@ -251,14 +269,14 @@ internal class ConnectionStateMachine(serverMode: Boolean,
override fun onSessionLocalClose(event: Event) {
val session = event.session
log.debug { "Session local close $session" }
logDebugWithMDC { "Session local close $session" }
session.close()
session.free()
}
override fun onSessionFinal(event: Event) {
val session = event.session
log.debug { "Session final $session" }
logDebugWithMDC { "Session final $session" }
if (session == this.session) {
this.session = null
}
@ -267,12 +285,12 @@ internal class ConnectionStateMachine(serverMode: Boolean,
override fun onLinkLocalOpen(event: Event) {
val link = event.link
if (link is Sender) {
log.debug { "Sender Link local open ${link.name} ${link.source} ${link.target}" }
logDebugWithMDC { "Sender Link local open ${link.name} ${link.source} ${link.target}" }
senders[link.target.address] = link
transmitMessages(link)
}
if (link is Receiver) {
log.debug { "Receiver Link local open ${link.name} ${link.source} ${link.target}" }
logDebugWithMDC { "Receiver Link local open ${link.name} ${link.source} ${link.target}" }
receivers[link.target.address] = link
}
}
@ -281,7 +299,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
val link = event.link
if (link is Receiver) {
if (link.remoteTarget is Coordinator) {
log.debug { "Coordinator link received" }
logDebugWithMDC { "Coordinator link received" }
}
}
}
@ -289,11 +307,11 @@ internal class ConnectionStateMachine(serverMode: Boolean,
override fun onLinkFinal(event: Event) {
val link = event.link
if (link is Sender) {
log.debug { "Sender Link final ${link.name} ${link.source} ${link.target}" }
logDebugWithMDC { "Sender Link final ${link.name} ${link.source} ${link.target}" }
senders.remove(link.target.address)
}
if (link is Receiver) {
log.debug { "Receiver Link final ${link.name} ${link.source} ${link.target}" }
logDebugWithMDC { "Receiver Link final ${link.name} ${link.source} ${link.target}" }
receivers.remove(link.target.address)
}
}
@ -301,12 +319,12 @@ internal class ConnectionStateMachine(serverMode: Boolean,
override fun onLinkFlow(event: Event) {
val link = event.link
if (link is Sender) {
log.debug { "Sender Flow event: ${link.name} ${link.source} ${link.target}" }
logDebugWithMDC { "Sender Flow event: ${link.name} ${link.source} ${link.target}" }
if (senders.containsKey(link.target.address)) {
transmitMessages(link)
}
} else if (link is Receiver) {
log.debug { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" }
logDebugWithMDC { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" }
}
}
@ -317,7 +335,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
private fun transmitMessages(sender: Sender) {
val messageQueue = messageQueues.getOrPut(sender.target.address, { LinkedList() })
while (sender.credit > 0) {
log.debug { "Sender credit: ${sender.credit}" }
logDebugWithMDC { "Sender credit: ${sender.credit}" }
val nextMessage = messageQueue.poll()
if (nextMessage != null) {
try {
@ -328,7 +346,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
delivery.context = nextMessage
sender.send(messageBuf.array(), messageBuf.arrayOffset() + messageBuf.readerIndex(), messageBuf.readableBytes())
nextMessage.status = MessageStatus.Sent
log.debug { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" }
logDebugWithMDC { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" }
unackedQueue.offer(nextMessage)
sender.advance()
} finally {
@ -342,7 +360,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
override fun onDelivery(event: Event) {
val delivery = event.delivery
log.debug { "Delivery $delivery" }
logDebugWithMDC { "Delivery $delivery" }
val link = delivery.link
if (link is Receiver) {
if (delivery.isReadable && !delivery.isPartial) {
@ -366,7 +384,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
appProperties,
channel,
delivery)
log.debug { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" }
logDebugWithMDC { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" }
channel.writeAndFlush(receivedMessage)
if (link.current() == delivery) {
link.advance()
@ -377,7 +395,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
}
}
} else if (link is Sender) {
log.debug { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" }
logDebugWithMDC { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" }
val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance()
val sourceMessage = delivery.context as? SendableMessageImpl
unackedQueue.remove(sourceMessage)
@ -395,7 +413,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
buffer.readBytes(bytes)
return Unpooled.wrappedBuffer(bytes)
} catch (ex: Exception) {
log.error("Unable to encode message as AMQP packet", ex)
logErrorWithMDC("Unable to encode message as AMQP packet", ex)
throw ex
}
} finally {

View File

@ -3,7 +3,7 @@ package net.corda.nodeapi.internal.protonwrapper.engine
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import net.corda.core.utilities.debug
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
@ -16,7 +16,7 @@ import org.apache.qpid.proton.engine.*
import org.apache.qpid.proton.engine.impl.CollectorImpl
import org.apache.qpid.proton.reactor.FlowController
import org.apache.qpid.proton.reactor.Handshaker
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
@ -30,16 +30,30 @@ import kotlin.concurrent.withLock
* Everything here is single threaded, because the proton-j library has to be run that way.
*/
internal class EventProcessor(channel: Channel,
serverMode: Boolean,
localLegalName: String,
remoteLegalName: String,
private val serverMode: Boolean,
private val localLegalName: String,
private val remoteLegalName: String,
userName: String?,
password: String?) : BaseHandler() {
companion object {
private const val FLOW_WINDOW_SIZE = 10
private val log = contextLogger()
}
private fun withMDC(block: () -> Unit) {
MDC.put("serverMode", serverMode.toString())
MDC.put("localLegalName", localLegalName)
MDC.put("remoteLegalName", remoteLegalName)
block()
MDC.clear()
}
private fun logDebugWithMDC(msg: () -> String) {
if (log.isDebugEnabled) {
withMDC { log.debug(msg()) }
}
}
private val log = LoggerFactory.getLogger(localLegalName)
private val lock = ReentrantLock()
private var pendingExecute: Boolean = false
private val executor: ScheduledExecutorService = channel.eventLoop()
@ -94,16 +108,16 @@ internal class EventProcessor(channel: Channel,
fun processEvents() {
lock.withLock {
pendingExecute = false
log.debug { "Process Events" }
logDebugWithMDC { "Process Events" }
while (true) {
val ev = popEvent() ?: break
log.debug { "Process event: $ev" }
logDebugWithMDC { "Process event: $ev" }
for (handler in handlers) {
handler.handle(ev)
}
}
stateMachine.processTransport()
log.debug { "Process Events Done" }
logDebugWithMDC { "Process Events Done" }
}
}

View File

@ -9,7 +9,7 @@ import io.netty.handler.ssl.SslHandler
import io.netty.handler.ssl.SslHandshakeCompletionEvent
import io.netty.util.ReferenceCountUtil
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.debug
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.crypto.x509
import net.corda.nodeapi.internal.protonwrapper.engine.EventProcessor
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
@ -19,7 +19,7 @@ import org.apache.qpid.proton.engine.ProtonJTransport
import org.apache.qpid.proton.engine.Transport
import org.apache.qpid.proton.engine.impl.ProtocolTracer
import org.apache.qpid.proton.framing.TransportFrame
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import java.net.InetSocketAddress
import java.nio.channels.ClosedChannelException
import java.security.cert.X509Certificate
@ -37,18 +37,44 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
private val onOpen: (Pair<SocketChannel, ConnectionChange>) -> Unit,
private val onClose: (Pair<SocketChannel, ConnectionChange>) -> Unit,
private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() {
private val log = LoggerFactory.getLogger(allowedRemoteLegalNames?.firstOrNull()?.toString() ?: "AMQPChannelHandler")
companion object {
private val log = contextLogger()
}
private lateinit var remoteAddress: InetSocketAddress
private var localCert: X509Certificate? = null
private var remoteCert: X509Certificate? = null
private var eventProcessor: EventProcessor? = null
private var badCert: Boolean = false
private fun withMDC(block: () -> Unit) {
MDC.put("serverMode", serverMode.toString())
MDC.put("remoteAddress", remoteAddress.toString())
MDC.put("localCert", localCert?.subjectDN?.toString())
MDC.put("remoteCert", remoteCert?.subjectDN?.toString())
MDC.put("allowedRemoteLegalNames", allowedRemoteLegalNames?.joinToString(separator = ";") { it.toString() })
block()
MDC.clear()
}
private fun logDebugWithMDC(msg: () -> String) {
if (log.isDebugEnabled) {
withMDC { log.debug(msg()) }
}
}
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) }
override fun channelActive(ctx: ChannelHandlerContext) {
val ch = ctx.channel()
remoteAddress = ch.remoteAddress() as InetSocketAddress
val localAddress = ch.localAddress() as InetSocketAddress
log.info("New client connection ${ch.id()} from $remoteAddress to $localAddress")
logInfoWithMDC("New client connection ${ch.id()} from $remoteAddress to $localAddress")
}
private fun createAMQPEngine(ctx: ChannelHandlerContext) {
@ -59,11 +85,11 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
if (trace) {
transport.protocolTracer = object : ProtocolTracer {
override fun sentFrame(transportFrame: TransportFrame) {
log.info("${transportFrame.body}")
logInfoWithMDC("${transportFrame.body}")
}
override fun receivedFrame(transportFrame: TransportFrame) {
log.info("${transportFrame.body}")
logInfoWithMDC("${transportFrame.body}")
}
}
}
@ -73,7 +99,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
override fun channelInactive(ctx: ChannelHandlerContext) {
val ch = ctx.channel()
log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
logInfoWithMDC("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false, badCert)))
eventProcessor?.close()
ctx.fireChannelInactive()
@ -89,29 +115,29 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
CordaX500Name.build(remoteCert!!.subjectX500Principal)
} catch (ex: IllegalArgumentException) {
badCert = true
log.error("Certificate subject not a valid CordaX500Name", ex)
logErrorWithMDC("Certificate subject not a valid CordaX500Name", ex)
ctx.close()
return
}
if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) {
badCert = true
log.error("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames")
logErrorWithMDC("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames")
ctx.close()
return
}
log.info("Handshake completed with subject: $remoteX500Name")
logInfoWithMDC("Handshake completed with subject: $remoteX500Name")
createAMQPEngine(ctx)
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false)))
} else {
// This happens when the peer node is closed during SSL establishment.
if (evt.cause() is ClosedChannelException) {
log.warn("SSL Handshake closed early.")
logWarnWithMDC("SSL Handshake closed early.")
} else {
badCert = true
}
log.error("Handshake failure ${evt.cause().message}")
logErrorWithMDC("Handshake failure ${evt.cause().message}")
if (log.isTraceEnabled) {
log.trace("Handshake failure", evt.cause())
withMDC { log.trace("Handshake failure", evt.cause()) }
}
ctx.close()
}
@ -120,9 +146,9 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
@Suppress("OverridingDeprecatedMember")
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
log.warn("Closing channel due to nonrecoverable exception ${cause.message}")
logWarnWithMDC("Closing channel due to nonrecoverable exception ${cause.message}")
if (log.isTraceEnabled) {
log.trace("Pipeline uncaught exception", cause)
withMDC { log.trace("Pipeline uncaught exception", cause) }
}
ctx.close()
}
@ -151,7 +177,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) {
"Message for incorrect legal identity ${msg.destinationLegalName} expected ${remoteCert!!.subjectX500Principal}"
}
log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" }
logDebugWithMDC { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" }
eventProcessor!!.transportWriteMessage(msg)
}
// A received AMQP packet has been completed and this self-posted packet will be signalled out to the
@ -169,7 +195,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
}
}
} catch (ex: Exception) {
log.error("Error in AMQP write processing", ex)
logErrorWithMDC("Error in AMQP write processing", ex)
throw ex
}
} finally {