From bafe387e93dd345af9af2989004b9011683e2a36 Mon Sep 17 00:00:00 2001 From: Katelyn Baker Date: Thu, 21 Jun 2018 13:57:44 +0100 Subject: [PATCH] Revert "Change to use MDC logic in bridge/AMQP protocol logging (#3398) (#1034)" This reverts commit dd74fd2e2832a525a5aaa1976d7cc84c1ba7f361. --- .../internal/bridging/AMQPBridgeManager.kt | 41 +++------- .../engine/ConnectionStateMachine.kt | 82 ++++++++----------- .../protonwrapper/engine/EventProcessor.kt | 32 ++------ .../protonwrapper/netty/AMQPChannelHandler.kt | 60 ++++---------- 4 files changed, 69 insertions(+), 146 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index d942cb9ec8..6629914e31 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -16,7 +16,7 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.internal.VisibleForTesting import net.corda.core.node.NodeInfo import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER @@ -34,7 +34,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BA import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientMessage import org.apache.activemq.artemis.api.core.client.ClientSession -import org.slf4j.MDC +import org.slf4j.LoggerFactory import rx.Subscription import java.security.KeyStore import java.util.concurrent.locks.ReentrantLock @@ -88,28 +88,9 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf private val maxMessageSize: Int) { companion object { fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort" - private val log = contextLogger() } - private fun withMDC(block: () -> Unit) { - MDC.put("queueName", queueName) - MDC.put("target", target.toString()) - MDC.put("bridgeName", bridgeName) - MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() }) - MDC.put("maxMessageSize", maxMessageSize.toString()) - block() - MDC.clear() - } - - private fun logDebugWithMDC(msg: () -> String) { - if (log.isDebugEnabled) { - withMDC { log.debug(msg()) } - } - } - - private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) } - - private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) } + private val log = LoggerFactory.getLogger("$bridgeName:${legalNames.first()}") val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedThreadPool = sharedEventGroup, socksProxyConfig = socksProxyConfig, maxMessageSize = maxMessageSize) @@ -120,13 +101,13 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf private var connectedSubscription: Subscription? = null fun start() { - logInfoWithMDC("Create new AMQP bridge") + log.info("Create new AMQP bridge") connectedSubscription = amqpClient.onConnection.subscribe({ x -> onSocketConnected(x.connected) }) amqpClient.start() } fun stop() { - logInfoWithMDC("Stopping AMQP bridge") + log.info("Stopping AMQP bridge") lock.withLock { synchronized(artemis) { consumer?.apply { @@ -152,7 +133,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf lock.withLock { synchronized(artemis) { if (connected) { - logInfoWithMDC("Bridge Connected") + log.info("Bridge Connected") val sessionFactory = artemis.started!!.sessionFactory val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) this.session = session @@ -161,7 +142,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler) session.start() } else { - logInfoWithMDC("Bridge Disconnected") + log.info("Bridge Disconnected") consumer?.apply { if (!isClosed) { close() @@ -181,7 +162,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { if (artemisMessage.bodySize > maxMessageSize) { - logWarnWithMDC("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " + + log.warn("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " + "dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") // Ack the message to prevent same message being sent to us again. artemisMessage.acknowledge() @@ -198,18 +179,18 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf properties[key] = value } } - logDebugWithMDC { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } + log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } val peerInbox = translateLocalQueueToInboxAddress(queueName) val sendableMessage = amqpClient.createMessage(data, peerInbox, legalNames.first().toString(), properties) sendableMessage.onComplete.then { - logDebugWithMDC { "Bridge ACK ${sendableMessage.onComplete.get()}" } + log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" } lock.withLock { if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { artemisMessage.acknowledge() } else { - logInfoWithMDC("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") + log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") // We need to commit any acknowledged messages before rolling back the failed // (unacknowledged) message. session?.commit() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index 5f848c616c..77f64fc002 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -16,7 +16,7 @@ import io.netty.buffer.Unpooled import io.netty.channel.Channel import io.netty.channel.ChannelHandlerContext import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl @@ -33,7 +33,7 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode import org.apache.qpid.proton.engine.* import org.apache.qpid.proton.message.Message import org.apache.qpid.proton.message.ProtonJMessage -import org.slf4j.MDC +import org.slf4j.LoggerFactory import java.net.InetSocketAddress import java.nio.ByteBuffer import java.util.* @@ -45,7 +45,7 @@ import java.util.* * but this threading lock is managed by the EventProcessor class that calls this. * It ultimately posts application packets to/from from the netty transport pipeline. */ -internal class ConnectionStateMachine(private val serverMode: Boolean, +internal class ConnectionStateMachine(serverMode: Boolean, collector: Collector, private val localLegalName: String, private val remoteLegalName: String, @@ -53,28 +53,10 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, password: String?) : BaseHandler() { companion object { private const val IDLE_TIMEOUT = 10000 - private val log = contextLogger() } - private fun withMDC(block: () -> Unit) { - MDC.put("serverMode", serverMode.toString()) - MDC.put("localLegalName", localLegalName) - MDC.put("remoteLegalName", remoteLegalName) - block() - MDC.clear() - } - - private fun logDebugWithMDC(msg: () -> String) { - if (log.isDebugEnabled) { - withMDC { log.debug(msg()) } - } - } - - private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) } - - private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) } - val connection: Connection + private val log = LoggerFactory.getLogger(localLegalName) private val transport: Transport private val id = UUID.randomUUID().toString() private var session: Session? = null @@ -121,12 +103,12 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onConnectionInit(event: Event) { val connection = event.connection - logDebugWithMDC { "Connection init $connection" } + log.debug { "Connection init $connection" } } override fun onConnectionLocalOpen(event: Event) { val connection = event.connection - logInfoWithMDC("Connection local open $connection") + log.info("Connection local open $connection") val session = connection.session() session.open() this.session = session @@ -137,7 +119,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onConnectionLocalClose(event: Event) { val connection = event.connection - logInfoWithMDC("Connection local close $connection") + log.info("Connection local close $connection") connection.close() connection.free() } @@ -155,7 +137,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onConnectionFinal(event: Event) { val connection = event.connection - logDebugWithMDC { "Connection final $connection" } + log.debug { "Connection final $connection" } if (connection == this.connection) { this.connection.context = null for (queue in messageQueues.values) { @@ -195,21 +177,21 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onTransportHeadClosed(event: Event) { val transport = event.transport - logDebugWithMDC { "Transport Head Closed $transport" } + log.debug { "Transport Head Closed $transport" } transport.close_tail() onTransportInternal(transport) } override fun onTransportTailClosed(event: Event) { val transport = event.transport - logDebugWithMDC { "Transport Tail Closed $transport" } + log.debug { "Transport Tail Closed $transport" } transport.close_head() onTransportInternal(transport) } override fun onTransportClosed(event: Event) { val transport = event.transport - logDebugWithMDC { "Transport Closed $transport" } + log.debug { "Transport Closed $transport" } if (transport == this.transport) { transport.unbind() transport.free() @@ -219,19 +201,19 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onTransportError(event: Event) { val transport = event.transport - logInfoWithMDC("Transport Error $transport") + log.info("Transport Error $transport") val condition = event.transport.condition if (condition != null) { - logInfoWithMDC("Error: ${condition.description}") + log.info("Error: ${condition.description}") } else { - logInfoWithMDC("Error (no description returned).") + log.info("Error (no description returned).") } onTransportInternal(transport) } override fun onTransport(event: Event) { val transport = event.transport - logDebugWithMDC { "Transport $transport" } + log.debug { "Transport $transport" } onTransportInternal(transport) } @@ -248,12 +230,12 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onSessionInit(event: Event) { val session = event.session - logDebugWithMDC { "Session init $session" } + log.debug { "Session init $session" } } override fun onSessionLocalOpen(event: Event) { val session = event.session - logDebugWithMDC { "Session local open $session" } + log.debug { "Session local open $session" } } private fun getSender(target: String): Sender { @@ -279,14 +261,14 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onSessionLocalClose(event: Event) { val session = event.session - logDebugWithMDC { "Session local close $session" } + log.debug { "Session local close $session" } session.close() session.free() } override fun onSessionFinal(event: Event) { val session = event.session - logDebugWithMDC { "Session final $session" } + log.debug { "Session final $session" } if (session == this.session) { this.session = null } @@ -295,12 +277,12 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onLinkLocalOpen(event: Event) { val link = event.link if (link is Sender) { - logDebugWithMDC { "Sender Link local open ${link.name} ${link.source} ${link.target}" } + log.debug { "Sender Link local open ${link.name} ${link.source} ${link.target}" } senders[link.target.address] = link transmitMessages(link) } if (link is Receiver) { - logDebugWithMDC { "Receiver Link local open ${link.name} ${link.source} ${link.target}" } + log.debug { "Receiver Link local open ${link.name} ${link.source} ${link.target}" } receivers[link.target.address] = link } } @@ -309,7 +291,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, val link = event.link if (link is Receiver) { if (link.remoteTarget is Coordinator) { - logDebugWithMDC { "Coordinator link received" } + log.debug { "Coordinator link received" } } } } @@ -317,11 +299,11 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onLinkFinal(event: Event) { val link = event.link if (link is Sender) { - logDebugWithMDC { "Sender Link final ${link.name} ${link.source} ${link.target}" } + log.debug { "Sender Link final ${link.name} ${link.source} ${link.target}" } senders.remove(link.target.address) } if (link is Receiver) { - logDebugWithMDC { "Receiver Link final ${link.name} ${link.source} ${link.target}" } + log.debug { "Receiver Link final ${link.name} ${link.source} ${link.target}" } receivers.remove(link.target.address) } } @@ -329,12 +311,12 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onLinkFlow(event: Event) { val link = event.link if (link is Sender) { - logDebugWithMDC { "Sender Flow event: ${link.name} ${link.source} ${link.target}" } + log.debug { "Sender Flow event: ${link.name} ${link.source} ${link.target}" } if (senders.containsKey(link.target.address)) { transmitMessages(link) } } else if (link is Receiver) { - logDebugWithMDC { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" } + log.debug { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" } } } @@ -345,7 +327,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, private fun transmitMessages(sender: Sender) { val messageQueue = messageQueues.getOrPut(sender.target.address, { LinkedList() }) while (sender.credit > 0) { - logDebugWithMDC { "Sender credit: ${sender.credit}" } + log.debug { "Sender credit: ${sender.credit}" } val nextMessage = messageQueue.poll() if (nextMessage != null) { try { @@ -356,7 +338,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, delivery.context = nextMessage sender.send(messageBuf.array(), messageBuf.arrayOffset() + messageBuf.readerIndex(), messageBuf.readableBytes()) nextMessage.status = MessageStatus.Sent - logDebugWithMDC { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" } + log.debug { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" } unackedQueue.offer(nextMessage) sender.advance() } finally { @@ -370,7 +352,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onDelivery(event: Event) { val delivery = event.delivery - logDebugWithMDC { "Delivery $delivery" } + log.debug { "Delivery $delivery" } val link = delivery.link if (link is Receiver) { if (delivery.isReadable && !delivery.isPartial) { @@ -394,7 +376,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, appProperties, channel, delivery) - logDebugWithMDC { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" } + log.debug { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" } channel.writeAndFlush(receivedMessage) if (link.current() == delivery) { link.advance() @@ -405,7 +387,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, } } } else if (link is Sender) { - logDebugWithMDC { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" } + log.debug { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" } val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance() val sourceMessage = delivery.context as? SendableMessageImpl unackedQueue.remove(sourceMessage) @@ -423,7 +405,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, buffer.readBytes(bytes) return Unpooled.wrappedBuffer(bytes) } catch (ex: Exception) { - logErrorWithMDC("Unable to encode message as AMQP packet", ex) + log.error("Unable to encode message as AMQP packet", ex) throw ex } } finally { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt index fc41b54eec..4ad1858a43 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt @@ -13,7 +13,7 @@ package net.corda.nodeapi.internal.protonwrapper.engine import io.netty.buffer.ByteBuf import io.netty.channel.Channel import io.netty.channel.ChannelHandlerContext -import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl @@ -26,7 +26,7 @@ import org.apache.qpid.proton.engine.* import org.apache.qpid.proton.engine.impl.CollectorImpl import org.apache.qpid.proton.reactor.FlowController import org.apache.qpid.proton.reactor.Handshaker -import org.slf4j.MDC +import org.slf4j.LoggerFactory import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock @@ -40,30 +40,16 @@ import kotlin.concurrent.withLock * Everything here is single threaded, because the proton-j library has to be run that way. */ internal class EventProcessor(channel: Channel, - private val serverMode: Boolean, - private val localLegalName: String, - private val remoteLegalName: String, + serverMode: Boolean, + localLegalName: String, + remoteLegalName: String, userName: String?, password: String?) : BaseHandler() { companion object { private const val FLOW_WINDOW_SIZE = 10 - private val log = contextLogger() - } - - private fun withMDC(block: () -> Unit) { - MDC.put("serverMode", serverMode.toString()) - MDC.put("localLegalName", localLegalName) - MDC.put("remoteLegalName", remoteLegalName) - block() - MDC.clear() - } - - private fun logDebugWithMDC(msg: () -> String) { - if (log.isDebugEnabled) { - withMDC { log.debug(msg()) } - } } + private val log = LoggerFactory.getLogger(localLegalName) private val lock = ReentrantLock() private var pendingExecute: Boolean = false private val executor: ScheduledExecutorService = channel.eventLoop() @@ -118,16 +104,16 @@ internal class EventProcessor(channel: Channel, fun processEvents() { lock.withLock { pendingExecute = false - logDebugWithMDC { "Process Events" } + log.debug { "Process Events" } while (true) { val ev = popEvent() ?: break - logDebugWithMDC { "Process event: $ev" } + log.debug { "Process event: $ev" } for (handler in handlers) { handler.handle(ev) } } stateMachine.processTransport() - logDebugWithMDC { "Process Events Done" } + log.debug { "Process Events Done" } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt index 2b43dcdd51..fd56a9d37d 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt @@ -21,7 +21,7 @@ import io.netty.handler.ssl.SslHandler import io.netty.handler.ssl.SslHandshakeCompletionEvent import io.netty.util.ReferenceCountUtil import net.corda.core.identity.CordaX500Name -import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.nodeapi.internal.crypto.x509 import net.corda.nodeapi.internal.protonwrapper.engine.EventProcessor import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage @@ -31,7 +31,7 @@ import org.apache.qpid.proton.engine.ProtonJTransport import org.apache.qpid.proton.engine.Transport import org.apache.qpid.proton.engine.impl.ProtocolTracer import org.apache.qpid.proton.framing.TransportFrame -import org.slf4j.MDC +import org.slf4j.LoggerFactory import java.net.InetSocketAddress import java.nio.channels.ClosedChannelException import java.security.cert.X509Certificate @@ -49,10 +49,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, private val onOpen: (Pair) -> Unit, private val onClose: (Pair) -> Unit, private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() { - companion object { - private val log = contextLogger() - } - + private val log = LoggerFactory.getLogger(allowedRemoteLegalNames?.firstOrNull()?.toString() ?: "AMQPChannelHandler") private lateinit var remoteAddress: InetSocketAddress private var localCert: X509Certificate? = null private var remoteCert: X509Certificate? = null @@ -60,34 +57,11 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, private var suppressClose: Boolean = false private var badCert: Boolean = false - private fun withMDC(block: () -> Unit) { - MDC.put("serverMode", serverMode.toString()) - MDC.put("remoteAddress", remoteAddress.toString()) - MDC.put("localCert", localCert?.subjectDN?.toString()) - MDC.put("remoteCert", remoteCert?.subjectDN?.toString()) - MDC.put("allowedRemoteLegalNames", allowedRemoteLegalNames?.joinToString(separator = ";") { it.toString() }) - block() - MDC.clear() - } - - private fun logDebugWithMDC(msg: () -> String) { - if (log.isDebugEnabled) { - withMDC { log.debug(msg()) } - } - } - - private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) } - - private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) } - - private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) } - - override fun channelActive(ctx: ChannelHandlerContext) { val ch = ctx.channel() remoteAddress = ch.remoteAddress() as InetSocketAddress val localAddress = ch.localAddress() as InetSocketAddress - logInfoWithMDC("New client connection ${ch.id()} from $remoteAddress to $localAddress") + log.info("New client connection ${ch.id()} from $remoteAddress to $localAddress") } private fun createAMQPEngine(ctx: ChannelHandlerContext) { @@ -98,11 +72,11 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, if (trace) { transport.protocolTracer = object : ProtocolTracer { override fun sentFrame(transportFrame: TransportFrame) { - logInfoWithMDC("${transportFrame.body}") + log.info("${transportFrame.body}") } override fun receivedFrame(transportFrame: TransportFrame) { - logInfoWithMDC("${transportFrame.body}") + log.info("${transportFrame.body}") } } } @@ -112,7 +86,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, override fun channelInactive(ctx: ChannelHandlerContext) { val ch = ctx.channel() - logInfoWithMDC("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}") + log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}") if (!suppressClose) { onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false, badCert))) } @@ -133,29 +107,29 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, CordaX500Name.build(remoteCert!!.subjectX500Principal) } catch (ex: IllegalArgumentException) { badCert = true - logErrorWithMDC("Certificate subject not a valid CordaX500Name", ex) + log.error("Certificate subject not a valid CordaX500Name", ex) ctx.close() return } if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) { badCert = true - logErrorWithMDC("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames") + log.error("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames") ctx.close() return } - logInfoWithMDC("Handshake completed with subject: $remoteX500Name") + log.info("Handshake completed with subject: $remoteX500Name") createAMQPEngine(ctx) onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false))) } else { // This happens when the peer node is closed during SSL establishment. if (evt.cause() is ClosedChannelException) { - logWarnWithMDC("SSL Handshake closed early.") + log.warn("SSL Handshake closed early.") } else { badCert = true } - logErrorWithMDC("Handshake failure ${evt.cause().message}") + log.error("Handshake failure ${evt.cause().message}") if (log.isTraceEnabled) { - withMDC { log.trace("Handshake failure", evt.cause()) } + log.trace("Handshake failure", evt.cause()) } ctx.close() } @@ -164,9 +138,9 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, @Suppress("OverridingDeprecatedMember") override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - logWarnWithMDC("Closing channel due to nonrecoverable exception ${cause.message}") + log.warn("Closing channel due to nonrecoverable exception ${cause.message}") if (log.isTraceEnabled) { - withMDC { log.trace("Pipeline uncaught exception", cause) } + log.trace("Pipeline uncaught exception", cause) } if (cause is ProxyConnectException) { log.warn("Proxy connection failed ${cause.message}") @@ -199,7 +173,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) { "Message for incorrect legal identity ${msg.destinationLegalName} expected ${remoteCert!!.subjectX500Principal}" } - logDebugWithMDC { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" } + log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" } eventProcessor!!.transportWriteMessage(msg) } // A received AMQP packet has been completed and this self-posted packet will be signalled out to the @@ -217,7 +191,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, } } } catch (ex: Exception) { - logErrorWithMDC("Error in AMQP write processing", ex) + log.error("Error in AMQP write processing", ex) throw ex } } finally {