mirror of
https://github.com/corda/corda.git
synced 2025-04-15 15:07:03 +00:00
This reverts commit dd74fd2e2832a525a5aaa1976d7cc84c1ba7f361.
This commit is contained in:
parent
dd74fd2e28
commit
bafe387e93
@ -16,7 +16,7 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
|
||||
@ -34,7 +34,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BA
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.slf4j.MDC
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Subscription
|
||||
import java.security.KeyStore
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
@ -88,28 +88,9 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
||||
private val maxMessageSize: Int) {
|
||||
companion object {
|
||||
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private fun withMDC(block: () -> Unit) {
|
||||
MDC.put("queueName", queueName)
|
||||
MDC.put("target", target.toString())
|
||||
MDC.put("bridgeName", bridgeName)
|
||||
MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() })
|
||||
MDC.put("maxMessageSize", maxMessageSize.toString())
|
||||
block()
|
||||
MDC.clear()
|
||||
}
|
||||
|
||||
private fun logDebugWithMDC(msg: () -> String) {
|
||||
if (log.isDebugEnabled) {
|
||||
withMDC { log.debug(msg()) }
|
||||
}
|
||||
}
|
||||
|
||||
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
|
||||
|
||||
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
|
||||
private val log = LoggerFactory.getLogger("$bridgeName:${legalNames.first()}")
|
||||
|
||||
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail,
|
||||
sharedThreadPool = sharedEventGroup, socksProxyConfig = socksProxyConfig, maxMessageSize = maxMessageSize)
|
||||
@ -120,13 +101,13 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
||||
private var connectedSubscription: Subscription? = null
|
||||
|
||||
fun start() {
|
||||
logInfoWithMDC("Create new AMQP bridge")
|
||||
log.info("Create new AMQP bridge")
|
||||
connectedSubscription = amqpClient.onConnection.subscribe({ x -> onSocketConnected(x.connected) })
|
||||
amqpClient.start()
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
logInfoWithMDC("Stopping AMQP bridge")
|
||||
log.info("Stopping AMQP bridge")
|
||||
lock.withLock {
|
||||
synchronized(artemis) {
|
||||
consumer?.apply {
|
||||
@ -152,7 +133,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
||||
lock.withLock {
|
||||
synchronized(artemis) {
|
||||
if (connected) {
|
||||
logInfoWithMDC("Bridge Connected")
|
||||
log.info("Bridge Connected")
|
||||
val sessionFactory = artemis.started!!.sessionFactory
|
||||
val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
this.session = session
|
||||
@ -161,7 +142,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
||||
consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler)
|
||||
session.start()
|
||||
} else {
|
||||
logInfoWithMDC("Bridge Disconnected")
|
||||
log.info("Bridge Disconnected")
|
||||
consumer?.apply {
|
||||
if (!isClosed) {
|
||||
close()
|
||||
@ -181,7 +162,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
||||
|
||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
if (artemisMessage.bodySize > maxMessageSize) {
|
||||
logWarnWithMDC("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " +
|
||||
log.warn("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " +
|
||||
"dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||
// Ack the message to prevent same message being sent to us again.
|
||||
artemisMessage.acknowledge()
|
||||
@ -198,18 +179,18 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
||||
properties[key] = value
|
||||
}
|
||||
}
|
||||
logDebugWithMDC { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
||||
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||
val sendableMessage = amqpClient.createMessage(data, peerInbox,
|
||||
legalNames.first().toString(),
|
||||
properties)
|
||||
sendableMessage.onComplete.then {
|
||||
logDebugWithMDC { "Bridge ACK ${sendableMessage.onComplete.get()}" }
|
||||
log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" }
|
||||
lock.withLock {
|
||||
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
|
||||
artemisMessage.acknowledge()
|
||||
} else {
|
||||
logInfoWithMDC("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||
// We need to commit any acknowledged messages before rolling back the failed
|
||||
// (unacknowledged) message.
|
||||
session?.commit()
|
||||
|
@ -16,7 +16,7 @@ import io.netty.buffer.Unpooled
|
||||
import io.netty.channel.Channel
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||
@ -33,7 +33,7 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode
|
||||
import org.apache.qpid.proton.engine.*
|
||||
import org.apache.qpid.proton.message.Message
|
||||
import org.apache.qpid.proton.message.ProtonJMessage
|
||||
import org.slf4j.MDC
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.ByteBuffer
|
||||
import java.util.*
|
||||
@ -45,7 +45,7 @@ import java.util.*
|
||||
* but this threading lock is managed by the EventProcessor class that calls this.
|
||||
* It ultimately posts application packets to/from from the netty transport pipeline.
|
||||
*/
|
||||
internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
internal class ConnectionStateMachine(serverMode: Boolean,
|
||||
collector: Collector,
|
||||
private val localLegalName: String,
|
||||
private val remoteLegalName: String,
|
||||
@ -53,28 +53,10 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
password: String?) : BaseHandler() {
|
||||
companion object {
|
||||
private const val IDLE_TIMEOUT = 10000
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private fun withMDC(block: () -> Unit) {
|
||||
MDC.put("serverMode", serverMode.toString())
|
||||
MDC.put("localLegalName", localLegalName)
|
||||
MDC.put("remoteLegalName", remoteLegalName)
|
||||
block()
|
||||
MDC.clear()
|
||||
}
|
||||
|
||||
private fun logDebugWithMDC(msg: () -> String) {
|
||||
if (log.isDebugEnabled) {
|
||||
withMDC { log.debug(msg()) }
|
||||
}
|
||||
}
|
||||
|
||||
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
|
||||
|
||||
private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) }
|
||||
|
||||
val connection: Connection
|
||||
private val log = LoggerFactory.getLogger(localLegalName)
|
||||
private val transport: Transport
|
||||
private val id = UUID.randomUUID().toString()
|
||||
private var session: Session? = null
|
||||
@ -121,12 +103,12 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
|
||||
override fun onConnectionInit(event: Event) {
|
||||
val connection = event.connection
|
||||
logDebugWithMDC { "Connection init $connection" }
|
||||
log.debug { "Connection init $connection" }
|
||||
}
|
||||
|
||||
override fun onConnectionLocalOpen(event: Event) {
|
||||
val connection = event.connection
|
||||
logInfoWithMDC("Connection local open $connection")
|
||||
log.info("Connection local open $connection")
|
||||
val session = connection.session()
|
||||
session.open()
|
||||
this.session = session
|
||||
@ -137,7 +119,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
|
||||
override fun onConnectionLocalClose(event: Event) {
|
||||
val connection = event.connection
|
||||
logInfoWithMDC("Connection local close $connection")
|
||||
log.info("Connection local close $connection")
|
||||
connection.close()
|
||||
connection.free()
|
||||
}
|
||||
@ -155,7 +137,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
|
||||
override fun onConnectionFinal(event: Event) {
|
||||
val connection = event.connection
|
||||
logDebugWithMDC { "Connection final $connection" }
|
||||
log.debug { "Connection final $connection" }
|
||||
if (connection == this.connection) {
|
||||
this.connection.context = null
|
||||
for (queue in messageQueues.values) {
|
||||
@ -195,21 +177,21 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
|
||||
override fun onTransportHeadClosed(event: Event) {
|
||||
val transport = event.transport
|
||||
logDebugWithMDC { "Transport Head Closed $transport" }
|
||||
log.debug { "Transport Head Closed $transport" }
|
||||
transport.close_tail()
|
||||
onTransportInternal(transport)
|
||||
}
|
||||
|
||||
override fun onTransportTailClosed(event: Event) {
|
||||
val transport = event.transport
|
||||
logDebugWithMDC { "Transport Tail Closed $transport" }
|
||||
log.debug { "Transport Tail Closed $transport" }
|
||||
transport.close_head()
|
||||
onTransportInternal(transport)
|
||||
}
|
||||
|
||||
override fun onTransportClosed(event: Event) {
|
||||
val transport = event.transport
|
||||
logDebugWithMDC { "Transport Closed $transport" }
|
||||
log.debug { "Transport Closed $transport" }
|
||||
if (transport == this.transport) {
|
||||
transport.unbind()
|
||||
transport.free()
|
||||
@ -219,19 +201,19 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
|
||||
override fun onTransportError(event: Event) {
|
||||
val transport = event.transport
|
||||
logInfoWithMDC("Transport Error $transport")
|
||||
log.info("Transport Error $transport")
|
||||
val condition = event.transport.condition
|
||||
if (condition != null) {
|
||||
logInfoWithMDC("Error: ${condition.description}")
|
||||
log.info("Error: ${condition.description}")
|
||||
} else {
|
||||
logInfoWithMDC("Error (no description returned).")
|
||||
log.info("Error (no description returned).")
|
||||
}
|
||||
onTransportInternal(transport)
|
||||
}
|
||||
|
||||
override fun onTransport(event: Event) {
|
||||
val transport = event.transport
|
||||
logDebugWithMDC { "Transport $transport" }
|
||||
log.debug { "Transport $transport" }
|
||||
onTransportInternal(transport)
|
||||
}
|
||||
|
||||
@ -248,12 +230,12 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
|
||||
override fun onSessionInit(event: Event) {
|
||||
val session = event.session
|
||||
logDebugWithMDC { "Session init $session" }
|
||||
log.debug { "Session init $session" }
|
||||
}
|
||||
|
||||
override fun onSessionLocalOpen(event: Event) {
|
||||
val session = event.session
|
||||
logDebugWithMDC { "Session local open $session" }
|
||||
log.debug { "Session local open $session" }
|
||||
}
|
||||
|
||||
private fun getSender(target: String): Sender {
|
||||
@ -279,14 +261,14 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
|
||||
override fun onSessionLocalClose(event: Event) {
|
||||
val session = event.session
|
||||
logDebugWithMDC { "Session local close $session" }
|
||||
log.debug { "Session local close $session" }
|
||||
session.close()
|
||||
session.free()
|
||||
}
|
||||
|
||||
override fun onSessionFinal(event: Event) {
|
||||
val session = event.session
|
||||
logDebugWithMDC { "Session final $session" }
|
||||
log.debug { "Session final $session" }
|
||||
if (session == this.session) {
|
||||
this.session = null
|
||||
}
|
||||
@ -295,12 +277,12 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
override fun onLinkLocalOpen(event: Event) {
|
||||
val link = event.link
|
||||
if (link is Sender) {
|
||||
logDebugWithMDC { "Sender Link local open ${link.name} ${link.source} ${link.target}" }
|
||||
log.debug { "Sender Link local open ${link.name} ${link.source} ${link.target}" }
|
||||
senders[link.target.address] = link
|
||||
transmitMessages(link)
|
||||
}
|
||||
if (link is Receiver) {
|
||||
logDebugWithMDC { "Receiver Link local open ${link.name} ${link.source} ${link.target}" }
|
||||
log.debug { "Receiver Link local open ${link.name} ${link.source} ${link.target}" }
|
||||
receivers[link.target.address] = link
|
||||
}
|
||||
}
|
||||
@ -309,7 +291,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
val link = event.link
|
||||
if (link is Receiver) {
|
||||
if (link.remoteTarget is Coordinator) {
|
||||
logDebugWithMDC { "Coordinator link received" }
|
||||
log.debug { "Coordinator link received" }
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -317,11 +299,11 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
override fun onLinkFinal(event: Event) {
|
||||
val link = event.link
|
||||
if (link is Sender) {
|
||||
logDebugWithMDC { "Sender Link final ${link.name} ${link.source} ${link.target}" }
|
||||
log.debug { "Sender Link final ${link.name} ${link.source} ${link.target}" }
|
||||
senders.remove(link.target.address)
|
||||
}
|
||||
if (link is Receiver) {
|
||||
logDebugWithMDC { "Receiver Link final ${link.name} ${link.source} ${link.target}" }
|
||||
log.debug { "Receiver Link final ${link.name} ${link.source} ${link.target}" }
|
||||
receivers.remove(link.target.address)
|
||||
}
|
||||
}
|
||||
@ -329,12 +311,12 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
override fun onLinkFlow(event: Event) {
|
||||
val link = event.link
|
||||
if (link is Sender) {
|
||||
logDebugWithMDC { "Sender Flow event: ${link.name} ${link.source} ${link.target}" }
|
||||
log.debug { "Sender Flow event: ${link.name} ${link.source} ${link.target}" }
|
||||
if (senders.containsKey(link.target.address)) {
|
||||
transmitMessages(link)
|
||||
}
|
||||
} else if (link is Receiver) {
|
||||
logDebugWithMDC { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" }
|
||||
log.debug { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" }
|
||||
}
|
||||
}
|
||||
|
||||
@ -345,7 +327,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
private fun transmitMessages(sender: Sender) {
|
||||
val messageQueue = messageQueues.getOrPut(sender.target.address, { LinkedList() })
|
||||
while (sender.credit > 0) {
|
||||
logDebugWithMDC { "Sender credit: ${sender.credit}" }
|
||||
log.debug { "Sender credit: ${sender.credit}" }
|
||||
val nextMessage = messageQueue.poll()
|
||||
if (nextMessage != null) {
|
||||
try {
|
||||
@ -356,7 +338,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
delivery.context = nextMessage
|
||||
sender.send(messageBuf.array(), messageBuf.arrayOffset() + messageBuf.readerIndex(), messageBuf.readableBytes())
|
||||
nextMessage.status = MessageStatus.Sent
|
||||
logDebugWithMDC { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" }
|
||||
log.debug { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" }
|
||||
unackedQueue.offer(nextMessage)
|
||||
sender.advance()
|
||||
} finally {
|
||||
@ -370,7 +352,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
|
||||
override fun onDelivery(event: Event) {
|
||||
val delivery = event.delivery
|
||||
logDebugWithMDC { "Delivery $delivery" }
|
||||
log.debug { "Delivery $delivery" }
|
||||
val link = delivery.link
|
||||
if (link is Receiver) {
|
||||
if (delivery.isReadable && !delivery.isPartial) {
|
||||
@ -394,7 +376,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
appProperties,
|
||||
channel,
|
||||
delivery)
|
||||
logDebugWithMDC { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" }
|
||||
log.debug { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" }
|
||||
channel.writeAndFlush(receivedMessage)
|
||||
if (link.current() == delivery) {
|
||||
link.advance()
|
||||
@ -405,7 +387,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
}
|
||||
}
|
||||
} else if (link is Sender) {
|
||||
logDebugWithMDC { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" }
|
||||
log.debug { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" }
|
||||
val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance()
|
||||
val sourceMessage = delivery.context as? SendableMessageImpl
|
||||
unackedQueue.remove(sourceMessage)
|
||||
@ -423,7 +405,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
buffer.readBytes(bytes)
|
||||
return Unpooled.wrappedBuffer(bytes)
|
||||
} catch (ex: Exception) {
|
||||
logErrorWithMDC("Unable to encode message as AMQP packet", ex)
|
||||
log.error("Unable to encode message as AMQP packet", ex)
|
||||
throw ex
|
||||
}
|
||||
} finally {
|
||||
|
@ -13,7 +13,7 @@ package net.corda.nodeapi.internal.protonwrapper.engine
|
||||
import io.netty.buffer.ByteBuf
|
||||
import io.netty.channel.Channel
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||
@ -26,7 +26,7 @@ import org.apache.qpid.proton.engine.*
|
||||
import org.apache.qpid.proton.engine.impl.CollectorImpl
|
||||
import org.apache.qpid.proton.reactor.FlowController
|
||||
import org.apache.qpid.proton.reactor.Handshaker
|
||||
import org.slf4j.MDC
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
@ -40,30 +40,16 @@ import kotlin.concurrent.withLock
|
||||
* Everything here is single threaded, because the proton-j library has to be run that way.
|
||||
*/
|
||||
internal class EventProcessor(channel: Channel,
|
||||
private val serverMode: Boolean,
|
||||
private val localLegalName: String,
|
||||
private val remoteLegalName: String,
|
||||
serverMode: Boolean,
|
||||
localLegalName: String,
|
||||
remoteLegalName: String,
|
||||
userName: String?,
|
||||
password: String?) : BaseHandler() {
|
||||
companion object {
|
||||
private const val FLOW_WINDOW_SIZE = 10
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private fun withMDC(block: () -> Unit) {
|
||||
MDC.put("serverMode", serverMode.toString())
|
||||
MDC.put("localLegalName", localLegalName)
|
||||
MDC.put("remoteLegalName", remoteLegalName)
|
||||
block()
|
||||
MDC.clear()
|
||||
}
|
||||
|
||||
private fun logDebugWithMDC(msg: () -> String) {
|
||||
if (log.isDebugEnabled) {
|
||||
withMDC { log.debug(msg()) }
|
||||
}
|
||||
}
|
||||
|
||||
private val log = LoggerFactory.getLogger(localLegalName)
|
||||
private val lock = ReentrantLock()
|
||||
private var pendingExecute: Boolean = false
|
||||
private val executor: ScheduledExecutorService = channel.eventLoop()
|
||||
@ -118,16 +104,16 @@ internal class EventProcessor(channel: Channel,
|
||||
fun processEvents() {
|
||||
lock.withLock {
|
||||
pendingExecute = false
|
||||
logDebugWithMDC { "Process Events" }
|
||||
log.debug { "Process Events" }
|
||||
while (true) {
|
||||
val ev = popEvent() ?: break
|
||||
logDebugWithMDC { "Process event: $ev" }
|
||||
log.debug { "Process event: $ev" }
|
||||
for (handler in handlers) {
|
||||
handler.handle(ev)
|
||||
}
|
||||
}
|
||||
stateMachine.processTransport()
|
||||
logDebugWithMDC { "Process Events Done" }
|
||||
log.debug { "Process Events Done" }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,7 +21,7 @@ import io.netty.handler.ssl.SslHandler
|
||||
import io.netty.handler.ssl.SslHandshakeCompletionEvent
|
||||
import io.netty.util.ReferenceCountUtil
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.nodeapi.internal.crypto.x509
|
||||
import net.corda.nodeapi.internal.protonwrapper.engine.EventProcessor
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||
@ -31,7 +31,7 @@ import org.apache.qpid.proton.engine.ProtonJTransport
|
||||
import org.apache.qpid.proton.engine.Transport
|
||||
import org.apache.qpid.proton.engine.impl.ProtocolTracer
|
||||
import org.apache.qpid.proton.framing.TransportFrame
|
||||
import org.slf4j.MDC
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.channels.ClosedChannelException
|
||||
import java.security.cert.X509Certificate
|
||||
@ -49,10 +49,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
private val onOpen: (Pair<SocketChannel, ConnectionChange>) -> Unit,
|
||||
private val onClose: (Pair<SocketChannel, ConnectionChange>) -> Unit,
|
||||
private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private val log = LoggerFactory.getLogger(allowedRemoteLegalNames?.firstOrNull()?.toString() ?: "AMQPChannelHandler")
|
||||
private lateinit var remoteAddress: InetSocketAddress
|
||||
private var localCert: X509Certificate? = null
|
||||
private var remoteCert: X509Certificate? = null
|
||||
@ -60,34 +57,11 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
private var suppressClose: Boolean = false
|
||||
private var badCert: Boolean = false
|
||||
|
||||
private fun withMDC(block: () -> Unit) {
|
||||
MDC.put("serverMode", serverMode.toString())
|
||||
MDC.put("remoteAddress", remoteAddress.toString())
|
||||
MDC.put("localCert", localCert?.subjectDN?.toString())
|
||||
MDC.put("remoteCert", remoteCert?.subjectDN?.toString())
|
||||
MDC.put("allowedRemoteLegalNames", allowedRemoteLegalNames?.joinToString(separator = ";") { it.toString() })
|
||||
block()
|
||||
MDC.clear()
|
||||
}
|
||||
|
||||
private fun logDebugWithMDC(msg: () -> String) {
|
||||
if (log.isDebugEnabled) {
|
||||
withMDC { log.debug(msg()) }
|
||||
}
|
||||
}
|
||||
|
||||
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
|
||||
|
||||
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
|
||||
|
||||
private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) }
|
||||
|
||||
|
||||
override fun channelActive(ctx: ChannelHandlerContext) {
|
||||
val ch = ctx.channel()
|
||||
remoteAddress = ch.remoteAddress() as InetSocketAddress
|
||||
val localAddress = ch.localAddress() as InetSocketAddress
|
||||
logInfoWithMDC("New client connection ${ch.id()} from $remoteAddress to $localAddress")
|
||||
log.info("New client connection ${ch.id()} from $remoteAddress to $localAddress")
|
||||
}
|
||||
|
||||
private fun createAMQPEngine(ctx: ChannelHandlerContext) {
|
||||
@ -98,11 +72,11 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
if (trace) {
|
||||
transport.protocolTracer = object : ProtocolTracer {
|
||||
override fun sentFrame(transportFrame: TransportFrame) {
|
||||
logInfoWithMDC("${transportFrame.body}")
|
||||
log.info("${transportFrame.body}")
|
||||
}
|
||||
|
||||
override fun receivedFrame(transportFrame: TransportFrame) {
|
||||
logInfoWithMDC("${transportFrame.body}")
|
||||
log.info("${transportFrame.body}")
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -112,7 +86,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
|
||||
override fun channelInactive(ctx: ChannelHandlerContext) {
|
||||
val ch = ctx.channel()
|
||||
logInfoWithMDC("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
|
||||
log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
|
||||
if (!suppressClose) {
|
||||
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false, badCert)))
|
||||
}
|
||||
@ -133,29 +107,29 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
CordaX500Name.build(remoteCert!!.subjectX500Principal)
|
||||
} catch (ex: IllegalArgumentException) {
|
||||
badCert = true
|
||||
logErrorWithMDC("Certificate subject not a valid CordaX500Name", ex)
|
||||
log.error("Certificate subject not a valid CordaX500Name", ex)
|
||||
ctx.close()
|
||||
return
|
||||
}
|
||||
if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) {
|
||||
badCert = true
|
||||
logErrorWithMDC("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames")
|
||||
log.error("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames")
|
||||
ctx.close()
|
||||
return
|
||||
}
|
||||
logInfoWithMDC("Handshake completed with subject: $remoteX500Name")
|
||||
log.info("Handshake completed with subject: $remoteX500Name")
|
||||
createAMQPEngine(ctx)
|
||||
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false)))
|
||||
} else {
|
||||
// This happens when the peer node is closed during SSL establishment.
|
||||
if (evt.cause() is ClosedChannelException) {
|
||||
logWarnWithMDC("SSL Handshake closed early.")
|
||||
log.warn("SSL Handshake closed early.")
|
||||
} else {
|
||||
badCert = true
|
||||
}
|
||||
logErrorWithMDC("Handshake failure ${evt.cause().message}")
|
||||
log.error("Handshake failure ${evt.cause().message}")
|
||||
if (log.isTraceEnabled) {
|
||||
withMDC { log.trace("Handshake failure", evt.cause()) }
|
||||
log.trace("Handshake failure", evt.cause())
|
||||
}
|
||||
ctx.close()
|
||||
}
|
||||
@ -164,9 +138,9 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
|
||||
@Suppress("OverridingDeprecatedMember")
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
logWarnWithMDC("Closing channel due to nonrecoverable exception ${cause.message}")
|
||||
log.warn("Closing channel due to nonrecoverable exception ${cause.message}")
|
||||
if (log.isTraceEnabled) {
|
||||
withMDC { log.trace("Pipeline uncaught exception", cause) }
|
||||
log.trace("Pipeline uncaught exception", cause)
|
||||
}
|
||||
if (cause is ProxyConnectException) {
|
||||
log.warn("Proxy connection failed ${cause.message}")
|
||||
@ -199,7 +173,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) {
|
||||
"Message for incorrect legal identity ${msg.destinationLegalName} expected ${remoteCert!!.subjectX500Principal}"
|
||||
}
|
||||
logDebugWithMDC { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" }
|
||||
log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" }
|
||||
eventProcessor!!.transportWriteMessage(msg)
|
||||
}
|
||||
// A received AMQP packet has been completed and this self-posted packet will be signalled out to the
|
||||
@ -217,7 +191,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
}
|
||||
}
|
||||
} catch (ex: Exception) {
|
||||
logErrorWithMDC("Error in AMQP write processing", ex)
|
||||
log.error("Error in AMQP write processing", ex)
|
||||
throw ex
|
||||
}
|
||||
} finally {
|
||||
|
Loading…
x
Reference in New Issue
Block a user