diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisConstants.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisConstants.kt new file mode 100644 index 0000000000..f2c887d464 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisConstants.kt @@ -0,0 +1,5 @@ +package net.corda.nodeapi.internal + +object ArtemisConstants { + const val MESSAGE_ID_KEY = "_AMQ_DUPL_ID" +} \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index b79f86cbd1..ce6987e826 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -7,6 +7,8 @@ import io.netty.channel.Channel import io.netty.channel.ChannelHandlerContext import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.toHexString +import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl @@ -26,6 +28,8 @@ import org.slf4j.MDC import java.net.InetSocketAddress import java.nio.ByteBuffer import java.util.* +import kotlin.math.max +import kotlin.math.min /** * This ConnectionStateMachine class handles the events generated by the proton-j library to track @@ -51,6 +55,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, MDC.put("serverMode", serverMode.toString()) MDC.put("localLegalName", localLegalName) MDC.put("remoteLegalName", remoteLegalName) + MDC.put("conn", connection.prettyPrint) block() } finally { MDC.setContextMap(oldMDC) @@ -73,12 +78,22 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, private val transport: Transport private val id = UUID.randomUUID().toString() private var session: Session? = null + /** + * Key is message topic and value is the list of messages + */ private val messageQueues = mutableMapOf>() private val unackedQueue = LinkedList() private val receivers = mutableMapOf() private val senders = mutableMapOf() private var tagId: Int = 0 + private val Connection?.prettyPrint: String + get() = this?.context?.toString() ?: "" + + private val Transport?.prettyPrint: String + // Inside Transport's context - there is Connection, inside Connection's context there is NIO channel that has useful information + get() = (this?.context as? Endpoint)?.context?.toString() ?: "" + init { connection = Engine.connection() connection.container = "CORDA:$id" @@ -116,12 +131,12 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onConnectionInit(event: Event) { val connection = event.connection - logDebugWithMDC { "Connection init $connection" } + logDebugWithMDC { "Connection init ${connection.prettyPrint}" } } override fun onConnectionLocalOpen(event: Event) { val connection = event.connection - logInfoWithMDC("Connection local open $connection") + logInfoWithMDC("Connection local open ${connection.prettyPrint}") val session = connection.session() session.open() this.session = session @@ -132,13 +147,15 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onConnectionLocalClose(event: Event) { val connection = event.connection - logInfoWithMDC("Connection local close $connection") + logInfoWithMDC("Connection local close ${connection.prettyPrint}") connection.close() connection.free() } override fun onConnectionUnbound(event: Event) { - if (event.connection == this.connection) { + val connection = event.connection + logInfoWithMDC("Connection unbound ${connection.prettyPrint}") + if (connection == this.connection) { val channel = connection.context as? Channel if (channel != null) { if (channel.isActive) { @@ -150,12 +167,13 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onConnectionFinal(event: Event) { val connection = event.connection - logDebugWithMDC { "Connection final $connection" } + logDebugWithMDC { "Connection final ${connection.prettyPrint}" } if (connection == this.connection) { this.connection.context = null for (queue in messageQueues.values) { // clear any dead messages while (true) { + logDebugWithMDC { "Queue size: ${queue.size}" } val msg = queue.poll() if (msg != null) { msg.doComplete(MessageStatus.Rejected) @@ -167,6 +185,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, } messageQueues.clear() while (true) { + logDebugWithMDC { "Unacked queue size: ${unackedQueue.size}" } val msg = unackedQueue.poll() if (msg != null) { msg.doComplete(MessageStatus.Rejected) @@ -185,26 +204,28 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, session = null receivers.clear() senders.clear() + } else { + logDebugWithMDC { "Connection from the event: ${connection.prettyPrint} is not the connection owned: ${this.connection.prettyPrint}" } } } override fun onTransportHeadClosed(event: Event) { val transport = event.transport - logDebugWithMDC { "Transport Head Closed $transport" } + logDebugWithMDC { "Transport Head Closed ${transport.prettyPrint}" } transport.close_tail() onTransportInternal(transport) } override fun onTransportTailClosed(event: Event) { val transport = event.transport - logDebugWithMDC { "Transport Tail Closed $transport" } + logDebugWithMDC { "Transport Tail Closed ${transport.prettyPrint}" } transport.close_head() onTransportInternal(transport) } override fun onTransportClosed(event: Event) { val transport = event.transport - logDebugWithMDC { "Transport Closed $transport" } + logDebugWithMDC { "Transport Closed ${transport.prettyPrint}" } if (transport == this.transport) { transport.unbind() transport.free() @@ -214,7 +235,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onTransportError(event: Event) { val transport = event.transport - logInfoWithMDC("Transport Error $transport") + logInfoWithMDC("Transport Error ${transport.prettyPrint}") val condition = event.transport.condition if (condition != null) { logInfoWithMDC("Error: ${condition.description}") @@ -226,7 +247,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onTransport(event: Event) { val transport = event.transport - logDebugWithMDC { "Transport $transport" } + logDebugWithMDC { "Transport ${transport.prettyPrint}" } onTransportInternal(transport) } @@ -361,7 +382,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, delivery.context = nextMessage sender.send(messageBuf.array(), messageBuf.arrayOffset() + messageBuf.readerIndex(), messageBuf.readableBytes()) nextMessage.status = MessageStatus.Sent - logDebugWithMDC { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" } + logDebugWithMDC { "Put tag ${delivery.tag.toHexString()} on wire uuid: ${nextMessage.applicationProperties[MESSAGE_ID_KEY]}" } unackedQueue.offer(nextMessage) sender.advance() } finally { @@ -398,7 +419,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, appProperties, channel, delivery) - logDebugWithMDC { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" } + logDebugWithMDC { "Full message received uuid: ${appProperties[MESSAGE_ID_KEY]}" } channel.writeAndFlush(receivedMessage) if (link.current() == delivery) { link.advance() @@ -409,7 +430,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, } } } else if (link is Sender) { - logDebugWithMDC { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" } + logDebugWithMDC { "Sender delivery confirmed tag ${delivery.tag.toHexString()}" } val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance() val sourceMessage = delivery.context as? SendableMessageImpl unackedQueue.remove(sourceMessage) @@ -462,6 +483,8 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, if (session != null) { val sender = getSender(msg.topic) transmitMessages(sender) + } else { + logInfoWithMDC("Session been closed already") } } @@ -470,7 +493,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, try { do { val buffer = transport.inputBuffer - val limit = Math.min(buffer.remaining(), source.remaining()) + val limit = min(buffer.remaining(), source.remaining()) val duplicate = source.duplicate() duplicate.limit(source.position() + limit) buffer.put(duplicate) @@ -483,7 +506,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, condition.description = ex.message transport.condition = condition transport.close_tail() - transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code) + transport.pop(max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code) } } @@ -508,7 +531,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, condition.description = ex.message transport.condition = condition transport.close_head() - transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code) + transport.pop(max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code) } } } \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt index 8611373d2b..b91642a840 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt @@ -3,6 +3,8 @@ package net.corda.nodeapi.internal.protonwrapper.engine import io.netty.buffer.ByteBuf import io.netty.channel.Channel import io.netty.channel.ChannelHandlerContext +import net.corda.core.internal.declaredField +import net.corda.core.utilities.Try import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl @@ -13,7 +15,6 @@ import org.apache.qpid.proton.amqp.messaging.Rejected import org.apache.qpid.proton.amqp.transport.DeliveryState import org.apache.qpid.proton.amqp.transport.ErrorCondition import org.apache.qpid.proton.engine.* -import org.apache.qpid.proton.engine.impl.CollectorImpl import org.apache.qpid.proton.reactor.FlowController import org.apache.qpid.proton.reactor.Handshaker import org.slf4j.MDC @@ -21,6 +22,7 @@ import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock +import kotlin.math.max /** * The EventProcessor class converts calls on the netty scheduler/pipeline @@ -29,12 +31,12 @@ import kotlin.concurrent.withLock * and simple sliding window flow control, so that these events don't have to live inside ConnectionStateMachine. * Everything here is single threaded, because the proton-j library has to be run that way. */ -internal class EventProcessor(channel: Channel, +internal class EventProcessor(private val channel: Channel, private val serverMode: Boolean, private val localLegalName: String, private val remoteLegalName: String, userName: String?, - password: String?) : BaseHandler() { + password: String?) { companion object { private const val FLOW_WINDOW_SIZE = 10 private val log = contextLogger() @@ -45,7 +47,9 @@ internal class EventProcessor(channel: Channel, try { MDC.put("serverMode", serverMode.toString()) MDC.put("localLegalName", localLegalName) + MDC.put("localAddress", channel.localAddress()?.toString()) MDC.put("remoteLegalName", remoteLegalName) + MDC.put("remoteAddress", channel.remoteAddress()?.toString()) block() } finally { MDC.setContextMap(oldMDC) @@ -59,10 +63,13 @@ internal class EventProcessor(channel: Channel, } private val lock = ReentrantLock() + @Volatile private var pendingExecute: Boolean = false + @Volatile + private var processorClosed: Boolean = false private val executor: ScheduledExecutorService = channel.eventLoop() - private val collector = Proton.collector() as CollectorImpl - private val handlers = mutableListOf() + private val collector = Proton.collector() + private val handlers: List private val stateMachine: ConnectionStateMachine = ConnectionStateMachine(serverMode, collector, localLegalName, @@ -73,15 +80,11 @@ internal class EventProcessor(channel: Channel, val connection: Connection = stateMachine.connection init { - addHandler(Handshaker()) - addHandler(FlowController(FLOW_WINDOW_SIZE)) - addHandler(stateMachine) + handlers = listOf(Handshaker(), FlowController(FLOW_WINDOW_SIZE), stateMachine) connection.context = channel tick(stateMachine.connection) } - fun addHandler(handler: Handler) = handlers.add(handler) - private fun popEvent(): Event? { var ev = collector.peek() if (ev != null) { @@ -93,23 +96,28 @@ internal class EventProcessor(channel: Channel, private fun tick(connection: Connection) { lock.withLock { + logDebugWithMDC { "Tick" } try { if ((connection.localState != EndpointState.CLOSED) && !connection.transport.isClosed) { val now = System.currentTimeMillis() - val tickDelay = Math.max(0L, connection.transport.tick(now) - now) + val tickDelay = max(0L, connection.transport.tick(now) - now) executor.schedule({ tick(connection) processEvents() }, tickDelay, TimeUnit.MILLISECONDS) + logDebugWithMDC {"Tick done. Next tick scheduled in $tickDelay ms"} + } else { + logDebugWithMDC { "Connection closed - no more ticking" } } } catch (ex: Exception) { + withMDC { log.info("Tick failed", ex) } connection.transport.close() connection.condition = ErrorCondition() } } } - fun processEvents() { + private fun processEvents() { lock.withLock { pendingExecute = false logDebugWithMDC { "Process Events" } @@ -135,11 +143,27 @@ internal class EventProcessor(channel: Channel, } fun close() { - if (connection.localState != EndpointState.CLOSED) { - connection.close() - processEvents() - connection.free() - processEvents() + lock.withLock { + if (!processorClosed) { + processorClosed = true + connection.logLocalState("Before close") + connection.close() + processEvents() + logDebugWithMDC { "Freeing-up connection" } + connection.free() + processEvents() + connection.logLocalState("After close") + } else { + logDebugWithMDC { "Processor is already closed" } + } + } + } + + private fun Connection.logLocalState(prefix: String) { + if (log.isDebugEnabled) { + val freedTry = Try.on { declaredField("freed").value } + val refcountTry = Try.on { declaredField("refcount").value } + logDebugWithMDC { "$prefix, local state: $localState, freed: $freedTry, refcount: $refcountTry" } } }