From 4ff5aa34b6419f52392167cabe7079f4c2c4edfd Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 20 Jun 2018 11:55:59 +0100 Subject: [PATCH 1/5] Change to use MDC logic in bridge/AMQP protocol logging (#3398) --- .../internal/bridging/AMQPBridgeManager.kt | 41 +++++++--- .../engine/ConnectionStateMachine.kt | 82 +++++++++++-------- .../protonwrapper/engine/EventProcessor.kt | 32 ++++++-- .../protonwrapper/netty/AMQPChannelHandler.kt | 60 ++++++++++---- 4 files changed, 146 insertions(+), 69 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 7bdb466b85..e8956fff5a 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -6,7 +6,7 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.internal.VisibleForTesting import net.corda.core.node.NodeInfo import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.debug +import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER @@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BA import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientMessage import org.apache.activemq.artemis.api.core.client.ClientSession -import org.slf4j.LoggerFactory +import org.slf4j.MDC import rx.Subscription import java.security.KeyStore import java.util.concurrent.locks.ReentrantLock @@ -74,9 +74,28 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize private val maxMessageSize: Int) { companion object { fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort" + private val log = contextLogger() } - private val log = LoggerFactory.getLogger("$bridgeName:${legalNames.first()}") + private fun withMDC(block: () -> Unit) { + MDC.put("queueName", queueName) + MDC.put("target", target.toString()) + MDC.put("bridgeName", bridgeName) + MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() }) + MDC.put("maxMessageSize", maxMessageSize.toString()) + block() + MDC.clear() + } + + private fun logDebugWithMDC(msg: () -> String) { + if (log.isDebugEnabled) { + withMDC { log.debug(msg()) } + } + } + + private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) } + + private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) } val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedThreadPool = sharedEventGroup, maxMessageSize = maxMessageSize) val bridgeName: String get() = getBridgeName(queueName, target) @@ -86,13 +105,13 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize private var connectedSubscription: Subscription? = null fun start() { - log.info("Create new AMQP bridge") + logInfoWithMDC("Create new AMQP bridge") connectedSubscription = amqpClient.onConnection.subscribe({ x -> onSocketConnected(x.connected) }) amqpClient.start() } fun stop() { - log.info("Stopping AMQP bridge") + logInfoWithMDC("Stopping AMQP bridge") lock.withLock { synchronized(artemis) { consumer?.close() @@ -110,7 +129,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize lock.withLock { synchronized(artemis) { if (connected) { - log.info("Bridge Connected") + logInfoWithMDC("Bridge Connected") val sessionFactory = artemis.started!!.sessionFactory val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) this.session = session @@ -119,7 +138,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize consumer.setMessageHandler(this@AMQPBridge::clientArtemisMessageHandler) session.start() } else { - log.info("Bridge Disconnected") + logInfoWithMDC("Bridge Disconnected") consumer?.close() consumer = null session?.stop() @@ -131,7 +150,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { if (artemisMessage.bodySize > maxMessageSize) { - log.warn("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " + + logWarnWithMDC("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " + "dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") // Ack the message to prevent same message being sent to us again. artemisMessage.acknowledge() @@ -148,18 +167,18 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize properties[key] = value } } - log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } + logDebugWithMDC { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } val peerInbox = translateLocalQueueToInboxAddress(queueName) val sendableMessage = amqpClient.createMessage(data, peerInbox, legalNames.first().toString(), properties) sendableMessage.onComplete.then { - log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" } + logDebugWithMDC { "Bridge ACK ${sendableMessage.onComplete.get()}" } lock.withLock { if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { artemisMessage.acknowledge() } else { - log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") + logInfoWithMDC("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") // We need to commit any acknowledged messages before rolling back the failed // (unacknowledged) message. session?.commit() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index bb079ce7e4..b5fec7e22e 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -6,7 +6,7 @@ import io.netty.buffer.Unpooled import io.netty.channel.Channel import io.netty.channel.ChannelHandlerContext import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.debug +import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl @@ -23,7 +23,7 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode import org.apache.qpid.proton.engine.* import org.apache.qpid.proton.message.Message import org.apache.qpid.proton.message.ProtonJMessage -import org.slf4j.LoggerFactory +import org.slf4j.MDC import java.net.InetSocketAddress import java.nio.ByteBuffer import java.util.* @@ -35,7 +35,7 @@ import java.util.* * but this threading lock is managed by the EventProcessor class that calls this. * It ultimately posts application packets to/from from the netty transport pipeline. */ -internal class ConnectionStateMachine(serverMode: Boolean, +internal class ConnectionStateMachine(private val serverMode: Boolean, collector: Collector, private val localLegalName: String, private val remoteLegalName: String, @@ -43,10 +43,28 @@ internal class ConnectionStateMachine(serverMode: Boolean, password: String?) : BaseHandler() { companion object { private const val IDLE_TIMEOUT = 10000 + private val log = contextLogger() } + private fun withMDC(block: () -> Unit) { + MDC.put("serverMode", serverMode.toString()) + MDC.put("localLegalName", localLegalName) + MDC.put("remoteLegalName", remoteLegalName) + block() + MDC.clear() + } + + private fun logDebugWithMDC(msg: () -> String) { + if (log.isDebugEnabled) { + withMDC { log.debug(msg()) } + } + } + + private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) } + + private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) } + val connection: Connection - private val log = LoggerFactory.getLogger(localLegalName) private val transport: Transport private val id = UUID.randomUUID().toString() private var session: Session? = null @@ -93,12 +111,12 @@ internal class ConnectionStateMachine(serverMode: Boolean, override fun onConnectionInit(event: Event) { val connection = event.connection - log.debug { "Connection init $connection" } + logDebugWithMDC { "Connection init $connection" } } override fun onConnectionLocalOpen(event: Event) { val connection = event.connection - log.info("Connection local open $connection") + logInfoWithMDC("Connection local open $connection") val session = connection.session() session.open() this.session = session @@ -109,7 +127,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, override fun onConnectionLocalClose(event: Event) { val connection = event.connection - log.info("Connection local close $connection") + logInfoWithMDC("Connection local close $connection") connection.close() connection.free() } @@ -127,7 +145,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, override fun onConnectionFinal(event: Event) { val connection = event.connection - log.debug { "Connection final $connection" } + logDebugWithMDC { "Connection final $connection" } if (connection == this.connection) { this.connection.context = null for (queue in messageQueues.values) { @@ -167,21 +185,21 @@ internal class ConnectionStateMachine(serverMode: Boolean, override fun onTransportHeadClosed(event: Event) { val transport = event.transport - log.debug { "Transport Head Closed $transport" } + logDebugWithMDC { "Transport Head Closed $transport" } transport.close_tail() onTransportInternal(transport) } override fun onTransportTailClosed(event: Event) { val transport = event.transport - log.debug { "Transport Tail Closed $transport" } + logDebugWithMDC { "Transport Tail Closed $transport" } transport.close_head() onTransportInternal(transport) } override fun onTransportClosed(event: Event) { val transport = event.transport - log.debug { "Transport Closed $transport" } + logDebugWithMDC { "Transport Closed $transport" } if (transport == this.transport) { transport.unbind() transport.free() @@ -191,19 +209,19 @@ internal class ConnectionStateMachine(serverMode: Boolean, override fun onTransportError(event: Event) { val transport = event.transport - log.info("Transport Error $transport") + logInfoWithMDC("Transport Error $transport") val condition = event.transport.condition if (condition != null) { - log.info("Error: ${condition.description}") + logInfoWithMDC("Error: ${condition.description}") } else { - log.info("Error (no description returned).") + logInfoWithMDC("Error (no description returned).") } onTransportInternal(transport) } override fun onTransport(event: Event) { val transport = event.transport - log.debug { "Transport $transport" } + logDebugWithMDC { "Transport $transport" } onTransportInternal(transport) } @@ -220,12 +238,12 @@ internal class ConnectionStateMachine(serverMode: Boolean, override fun onSessionInit(event: Event) { val session = event.session - log.debug { "Session init $session" } + logDebugWithMDC { "Session init $session" } } override fun onSessionLocalOpen(event: Event) { val session = event.session - log.debug { "Session local open $session" } + logDebugWithMDC { "Session local open $session" } } private fun getSender(target: String): Sender { @@ -251,14 +269,14 @@ internal class ConnectionStateMachine(serverMode: Boolean, override fun onSessionLocalClose(event: Event) { val session = event.session - log.debug { "Session local close $session" } + logDebugWithMDC { "Session local close $session" } session.close() session.free() } override fun onSessionFinal(event: Event) { val session = event.session - log.debug { "Session final $session" } + logDebugWithMDC { "Session final $session" } if (session == this.session) { this.session = null } @@ -267,12 +285,12 @@ internal class ConnectionStateMachine(serverMode: Boolean, override fun onLinkLocalOpen(event: Event) { val link = event.link if (link is Sender) { - log.debug { "Sender Link local open ${link.name} ${link.source} ${link.target}" } + logDebugWithMDC { "Sender Link local open ${link.name} ${link.source} ${link.target}" } senders[link.target.address] = link transmitMessages(link) } if (link is Receiver) { - log.debug { "Receiver Link local open ${link.name} ${link.source} ${link.target}" } + logDebugWithMDC { "Receiver Link local open ${link.name} ${link.source} ${link.target}" } receivers[link.target.address] = link } } @@ -281,7 +299,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, val link = event.link if (link is Receiver) { if (link.remoteTarget is Coordinator) { - log.debug { "Coordinator link received" } + logDebugWithMDC { "Coordinator link received" } } } } @@ -289,11 +307,11 @@ internal class ConnectionStateMachine(serverMode: Boolean, override fun onLinkFinal(event: Event) { val link = event.link if (link is Sender) { - log.debug { "Sender Link final ${link.name} ${link.source} ${link.target}" } + logDebugWithMDC { "Sender Link final ${link.name} ${link.source} ${link.target}" } senders.remove(link.target.address) } if (link is Receiver) { - log.debug { "Receiver Link final ${link.name} ${link.source} ${link.target}" } + logDebugWithMDC { "Receiver Link final ${link.name} ${link.source} ${link.target}" } receivers.remove(link.target.address) } } @@ -301,12 +319,12 @@ internal class ConnectionStateMachine(serverMode: Boolean, override fun onLinkFlow(event: Event) { val link = event.link if (link is Sender) { - log.debug { "Sender Flow event: ${link.name} ${link.source} ${link.target}" } + logDebugWithMDC { "Sender Flow event: ${link.name} ${link.source} ${link.target}" } if (senders.containsKey(link.target.address)) { transmitMessages(link) } } else if (link is Receiver) { - log.debug { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" } + logDebugWithMDC { "Receiver Flow event: ${link.name} ${link.source} ${link.target}" } } } @@ -317,7 +335,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, private fun transmitMessages(sender: Sender) { val messageQueue = messageQueues.getOrPut(sender.target.address, { LinkedList() }) while (sender.credit > 0) { - log.debug { "Sender credit: ${sender.credit}" } + logDebugWithMDC { "Sender credit: ${sender.credit}" } val nextMessage = messageQueue.poll() if (nextMessage != null) { try { @@ -328,7 +346,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, delivery.context = nextMessage sender.send(messageBuf.array(), messageBuf.arrayOffset() + messageBuf.readerIndex(), messageBuf.readableBytes()) nextMessage.status = MessageStatus.Sent - log.debug { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" } + logDebugWithMDC { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" } unackedQueue.offer(nextMessage) sender.advance() } finally { @@ -342,7 +360,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, override fun onDelivery(event: Event) { val delivery = event.delivery - log.debug { "Delivery $delivery" } + logDebugWithMDC { "Delivery $delivery" } val link = delivery.link if (link is Receiver) { if (delivery.isReadable && !delivery.isPartial) { @@ -366,7 +384,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, appProperties, channel, delivery) - log.debug { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" } + logDebugWithMDC { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" } channel.writeAndFlush(receivedMessage) if (link.current() == delivery) { link.advance() @@ -377,7 +395,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, } } } else if (link is Sender) { - log.debug { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" } + logDebugWithMDC { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" } val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance() val sourceMessage = delivery.context as? SendableMessageImpl unackedQueue.remove(sourceMessage) @@ -395,7 +413,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, buffer.readBytes(bytes) return Unpooled.wrappedBuffer(bytes) } catch (ex: Exception) { - log.error("Unable to encode message as AMQP packet", ex) + logErrorWithMDC("Unable to encode message as AMQP packet", ex) throw ex } } finally { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt index c02c6f2541..4364564b5b 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt @@ -3,7 +3,7 @@ package net.corda.nodeapi.internal.protonwrapper.engine import io.netty.buffer.ByteBuf import io.netty.channel.Channel import io.netty.channel.ChannelHandlerContext -import net.corda.core.utilities.debug +import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl @@ -16,7 +16,7 @@ import org.apache.qpid.proton.engine.* import org.apache.qpid.proton.engine.impl.CollectorImpl import org.apache.qpid.proton.reactor.FlowController import org.apache.qpid.proton.reactor.Handshaker -import org.slf4j.LoggerFactory +import org.slf4j.MDC import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock @@ -30,16 +30,30 @@ import kotlin.concurrent.withLock * Everything here is single threaded, because the proton-j library has to be run that way. */ internal class EventProcessor(channel: Channel, - serverMode: Boolean, - localLegalName: String, - remoteLegalName: String, + private val serverMode: Boolean, + private val localLegalName: String, + private val remoteLegalName: String, userName: String?, password: String?) : BaseHandler() { companion object { private const val FLOW_WINDOW_SIZE = 10 + private val log = contextLogger() + } + + private fun withMDC(block: () -> Unit) { + MDC.put("serverMode", serverMode.toString()) + MDC.put("localLegalName", localLegalName) + MDC.put("remoteLegalName", remoteLegalName) + block() + MDC.clear() + } + + private fun logDebugWithMDC(msg: () -> String) { + if (log.isDebugEnabled) { + withMDC { log.debug(msg()) } + } } - private val log = LoggerFactory.getLogger(localLegalName) private val lock = ReentrantLock() private var pendingExecute: Boolean = false private val executor: ScheduledExecutorService = channel.eventLoop() @@ -94,16 +108,16 @@ internal class EventProcessor(channel: Channel, fun processEvents() { lock.withLock { pendingExecute = false - log.debug { "Process Events" } + logDebugWithMDC { "Process Events" } while (true) { val ev = popEvent() ?: break - log.debug { "Process event: $ev" } + logDebugWithMDC { "Process event: $ev" } for (handler in handlers) { handler.handle(ev) } } stateMachine.processTransport() - log.debug { "Process Events Done" } + logDebugWithMDC { "Process Events Done" } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt index d7d1375c0c..360b86ec0e 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt @@ -9,7 +9,7 @@ import io.netty.handler.ssl.SslHandler import io.netty.handler.ssl.SslHandshakeCompletionEvent import io.netty.util.ReferenceCountUtil import net.corda.core.identity.CordaX500Name -import net.corda.core.utilities.debug +import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.crypto.x509 import net.corda.nodeapi.internal.protonwrapper.engine.EventProcessor import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage @@ -19,7 +19,7 @@ import org.apache.qpid.proton.engine.ProtonJTransport import org.apache.qpid.proton.engine.Transport import org.apache.qpid.proton.engine.impl.ProtocolTracer import org.apache.qpid.proton.framing.TransportFrame -import org.slf4j.LoggerFactory +import org.slf4j.MDC import java.net.InetSocketAddress import java.nio.channels.ClosedChannelException import java.security.cert.X509Certificate @@ -37,18 +37,44 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, private val onOpen: (Pair) -> Unit, private val onClose: (Pair) -> Unit, private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() { - private val log = LoggerFactory.getLogger(allowedRemoteLegalNames?.firstOrNull()?.toString() ?: "AMQPChannelHandler") + companion object { + private val log = contextLogger() + } + private lateinit var remoteAddress: InetSocketAddress private var localCert: X509Certificate? = null private var remoteCert: X509Certificate? = null private var eventProcessor: EventProcessor? = null private var badCert: Boolean = false + private fun withMDC(block: () -> Unit) { + MDC.put("serverMode", serverMode.toString()) + MDC.put("remoteAddress", remoteAddress.toString()) + MDC.put("localCert", localCert?.subjectDN?.toString()) + MDC.put("remoteCert", remoteCert?.subjectDN?.toString()) + MDC.put("allowedRemoteLegalNames", allowedRemoteLegalNames?.joinToString(separator = ";") { it.toString() }) + block() + MDC.clear() + } + + private fun logDebugWithMDC(msg: () -> String) { + if (log.isDebugEnabled) { + withMDC { log.debug(msg()) } + } + } + + private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) } + + private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) } + + private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) } + + override fun channelActive(ctx: ChannelHandlerContext) { val ch = ctx.channel() remoteAddress = ch.remoteAddress() as InetSocketAddress val localAddress = ch.localAddress() as InetSocketAddress - log.info("New client connection ${ch.id()} from $remoteAddress to $localAddress") + logInfoWithMDC("New client connection ${ch.id()} from $remoteAddress to $localAddress") } private fun createAMQPEngine(ctx: ChannelHandlerContext) { @@ -59,11 +85,11 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, if (trace) { transport.protocolTracer = object : ProtocolTracer { override fun sentFrame(transportFrame: TransportFrame) { - log.info("${transportFrame.body}") + logInfoWithMDC("${transportFrame.body}") } override fun receivedFrame(transportFrame: TransportFrame) { - log.info("${transportFrame.body}") + logInfoWithMDC("${transportFrame.body}") } } } @@ -73,7 +99,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, override fun channelInactive(ctx: ChannelHandlerContext) { val ch = ctx.channel() - log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}") + logInfoWithMDC("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}") onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false, badCert))) eventProcessor?.close() ctx.fireChannelInactive() @@ -89,29 +115,29 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, CordaX500Name.build(remoteCert!!.subjectX500Principal) } catch (ex: IllegalArgumentException) { badCert = true - log.error("Certificate subject not a valid CordaX500Name", ex) + logErrorWithMDC("Certificate subject not a valid CordaX500Name", ex) ctx.close() return } if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) { badCert = true - log.error("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames") + logErrorWithMDC("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames") ctx.close() return } - log.info("Handshake completed with subject: $remoteX500Name") + logInfoWithMDC("Handshake completed with subject: $remoteX500Name") createAMQPEngine(ctx) onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false))) } else { // This happens when the peer node is closed during SSL establishment. if (evt.cause() is ClosedChannelException) { - log.warn("SSL Handshake closed early.") + logWarnWithMDC("SSL Handshake closed early.") } else { badCert = true } - log.error("Handshake failure ${evt.cause().message}") + logErrorWithMDC("Handshake failure ${evt.cause().message}") if (log.isTraceEnabled) { - log.trace("Handshake failure", evt.cause()) + withMDC { log.trace("Handshake failure", evt.cause()) } } ctx.close() } @@ -120,9 +146,9 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, @Suppress("OverridingDeprecatedMember") override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - log.warn("Closing channel due to nonrecoverable exception ${cause.message}") + logWarnWithMDC("Closing channel due to nonrecoverable exception ${cause.message}") if (log.isTraceEnabled) { - log.trace("Pipeline uncaught exception", cause) + withMDC { log.trace("Pipeline uncaught exception", cause) } } ctx.close() } @@ -151,7 +177,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) { "Message for incorrect legal identity ${msg.destinationLegalName} expected ${remoteCert!!.subjectX500Principal}" } - log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" } + logDebugWithMDC { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" } eventProcessor!!.transportWriteMessage(msg) } // A received AMQP packet has been completed and this self-posted packet will be signalled out to the @@ -169,7 +195,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, } } } catch (ex: Exception) { - log.error("Error in AMQP write processing", ex) + logErrorWithMDC("Error in AMQP write processing", ex) throw ex } } finally { From ed3944c54606e876abd6483ff5d66a764f9cf577 Mon Sep 17 00:00:00 2001 From: Thomas Schroeter Date: Wed, 20 Jun 2018 16:24:30 +0100 Subject: [PATCH 2/5] Create Artemis p2p.inbound addresses before starting the broker (#3407) --- .../kotlin/net/corda/node/internal/Node.kt | 2 +- .../messaging/ArtemisMessagingServer.kt | 68 +++++++++++++------ 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index b78a5d1e68..626afc2364 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -192,7 +192,7 @@ open class Node(configuration: NodeConfiguration, if (!configuration.messagingServerExternal) { val brokerBindAddress = configuration.messagingServerAddress ?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port) - messageBroker = ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize) + messageBroker = ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize, info.legalIdentities.map { it.owningKey }) } val serverAddress = configuration.messagingServerAddress diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 7c071a8b7b..88c4ff0962 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -13,14 +13,18 @@ import net.corda.node.services.config.NodeConfiguration import net.corda.nodeapi.ArtemisTcpTransport.Companion.p2pAcceptorTcpTransport import net.corda.nodeapi.internal.AmqpMessageSizeChecksInterceptor import net.corda.nodeapi.internal.ArtemisMessageSizeChecksInterceptor +import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.requireOnDefaultFileSystem +import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import org.apache.activemq.artemis.core.config.Configuration +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration import org.apache.activemq.artemis.core.security.Role @@ -29,6 +33,7 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager import java.io.IOException import java.security.KeyStoreException +import java.security.PublicKey import javax.annotation.concurrent.ThreadSafe import javax.security.auth.login.AppConfigurationEntry import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED @@ -49,7 +54,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE @ThreadSafe class ArtemisMessagingServer(private val config: NodeConfiguration, private val messagingServerAddress: NetworkHostAndPort, - private val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() { + private val maxMessageSize: Int, + private val identities: List = emptyList()) : ArtemisBroker, SingletonSerializeAsToken() { companion object { private val log = contextLogger() } @@ -105,29 +111,47 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, log.info("P2P messaging server listening on $messagingServerAddress") } - private fun createArtemisConfig() = SecureArtemisConfiguration().apply { - val artemisDir = config.baseDirectory / "artemis" - bindingsDirectory = (artemisDir / "bindings").toString() - journalDirectory = (artemisDir / "journal").toString() - largeMessagesDirectory = (artemisDir / "large-messages").toString() - acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config)) - // Enable built in message deduplication. Note we still have to do our own as the delayed commits - // and our own definition of commit mean that the built in deduplication cannot remove all duplicates. - idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess - isPersistIDCache = true - isPopulateValidatedUser = true - journalBufferSize_NIO = maxMessageSize + JOURNAL_HEADER_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store. - journalBufferSize_AIO = maxMessageSize + JOURNAL_HEADER_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store. - journalFileSize = maxMessageSize + JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB. - managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS) - - // JMX enablement - if (config.jmxMonitoringHttpPort != null) { - isJMXManagementEnabled = true - isJMXUseBrokerName = true + private fun createArtemisConfig(): Configuration { + val addressConfigs = identities.map { + val queueName = ArtemisMessagingComponent.RemoteInboxAddress(it).queueName + log.info("Configuring address $queueName") + val queueConfig = CoreQueueConfiguration().apply { + address = queueName + name = queueName + routingType = RoutingType.ANYCAST + isExclusive = true + } + CoreAddressConfiguration().apply { + name = queueName + queueConfigurations = listOf(queueConfig) + addRoutingType(RoutingType.ANYCAST) + } } + return SecureArtemisConfiguration().apply { + val artemisDir = config.baseDirectory / "artemis" + bindingsDirectory = (artemisDir / "bindings").toString() + journalDirectory = (artemisDir / "journal").toString() + largeMessagesDirectory = (artemisDir / "large-messages").toString() + acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config)) + // Enable built in message deduplication. Note we still have to do our own as the delayed commits + // and our own definition of commit mean that the built in deduplication cannot remove all duplicates. + idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess + isPersistIDCache = true + isPopulateValidatedUser = true + journalBufferSize_NIO = maxMessageSize + JOURNAL_HEADER_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store. + journalBufferSize_AIO = maxMessageSize + JOURNAL_HEADER_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store. + journalFileSize = maxMessageSize + JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB. + managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS) + addressConfigurations = addressConfigs - }.configureAddressSecurity() + // JMX enablement + if (config.jmxMonitoringHttpPort != null) { + isJMXManagementEnabled = true + isJMXUseBrokerName = true + } + + }.configureAddressSecurity() + } /** * Authenticated clients connecting to us fall in one of the following groups: From 2f34b16b07e6fef84350b3642e2379602e65d7e1 Mon Sep 17 00:00:00 2001 From: gaugfather Date: Thu, 21 Jun 2018 03:03:32 -0500 Subject: [PATCH 3/5] Fix to allow equality of hostname (#3381) * Fix to allow equality of hostname * Remove unreliable require test per pull 3381 * Remove unreliable require test per pull 3381 --- .../internal/protonwrapper/netty/AMQPChannelHandler.kt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt index 360b86ec0e..ee224b3dd2 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt @@ -171,9 +171,8 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, // Transfers application packet into the AMQP engine. is SendableMessageImpl -> { val inetAddress = InetSocketAddress(msg.destinationLink.host, msg.destinationLink.port) - require(inetAddress == remoteAddress) { - "Message for incorrect endpoint $inetAddress expected $remoteAddress" - } + logDebugWithMDC { "Message for endpoint $inetAddress , expected $remoteAddress "} + require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) { "Message for incorrect legal identity ${msg.destinationLegalName} expected ${remoteCert!!.subjectX500Principal}" } From 3af5412d40ef59270fe2551313080837f099dd61 Mon Sep 17 00:00:00 2001 From: szymonsztuka Date: Thu, 21 Jun 2018 10:32:36 +0100 Subject: [PATCH 4/5] ENT-1741 Build standalone shell JAR distribution (#3409) --- build.gradle | 1 + settings.gradle | 1 + tools/shell-cli/build.gradle | 103 ++++++++++++++++++ .../net/corda/tools/shell/StandaloneShell.kt | 0 .../tools/shell/StandaloneShellArgsParser.kt | 0 .../shell/StandaloneShellArgsParserTest.kt | 0 .../shell-cli/src/test/resources/config.conf | 29 +++++ tools/shell/build.gradle | 15 +-- 8 files changed, 138 insertions(+), 11 deletions(-) create mode 100644 tools/shell-cli/build.gradle rename tools/{shell => shell-cli}/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt (100%) rename tools/{shell => shell-cli}/src/main/kotlin/net/corda/tools/shell/StandaloneShellArgsParser.kt (100%) rename tools/{shell => shell-cli}/src/test/kotlin/net/corda/tools/shell/StandaloneShellArgsParserTest.kt (100%) create mode 100644 tools/shell-cli/src/test/resources/config.conf diff --git a/build.gradle b/build.gradle index 02f3e87385..198880a657 100644 --- a/build.gradle +++ b/build.gradle @@ -351,6 +351,7 @@ bintrayConfig { 'corda-node-driver', 'corda-confidential-identities', 'corda-shell', + 'corda-shell-cli', 'corda-serialization', 'corda-serialization-deterministic', 'corda-tools-blob-inspector', diff --git a/settings.gradle b/settings.gradle index 14684e8962..9f1701e383 100644 --- a/settings.gradle +++ b/settings.gradle @@ -43,6 +43,7 @@ include 'tools:graphs' include 'tools:bootstrapper' include 'tools:blobinspector' include 'tools:shell' +include 'tools:shell-cli' include 'tools:network-bootstrapper' include 'example-code' project(':example-code').projectDir = file("$settingsDir/docs/source/example-code") diff --git a/tools/shell-cli/build.gradle b/tools/shell-cli/build.gradle new file mode 100644 index 0000000000..c1848107ab --- /dev/null +++ b/tools/shell-cli/build.gradle @@ -0,0 +1,103 @@ +buildscript { + + repositories { + mavenLocal() + mavenCentral() + jcenter() + } + + dependencies { + classpath "org.jetbrains.kotlin:kotlin-noarg:$kotlin_version" + classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" + classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.1' + } +} + +repositories { + mavenLocal() + mavenCentral() + jcenter() +} + +apply plugin: 'kotlin' +apply plugin: 'java' +apply plugin: 'application' +apply plugin: 'net.corda.plugins.publish-utils' +apply plugin: 'com.jfrog.artifactory' +apply plugin: 'com.github.johnrengelman.shadow' +apply plugin: 'maven-publish' + +description 'Corda Shell CLI' + +configurations { + integrationTestCompile.extendsFrom testCompile + integrationTestRuntime.extendsFrom testRuntime +} + +sourceSets { + integrationTest { + kotlin { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integration-test/kotlin') + } + resources { + srcDir file('src/integration-test/resources') + } + } + test { + resources { + srcDir file('src/test/resources') + } + } +} + +dependencies { + compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" + testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version" + + compile project(':tools:shell') + + // Unit testing helpers. + testCompile "junit:junit:$junit_version" + testCompile "org.assertj:assertj-core:${assertj_version}" + testCompile project(':test-utils') + testCompile project(':finance') + +} + +mainClassName = 'net.corda.tools.shell.StandaloneShellKt' + +jar { + baseName 'corda-shell-cli' +} + +processResources { + from file("$rootDir/config/dev/log4j2.xml") +} + +task integrationTest(type: Test) { + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath +} + +publishing { + publications { + shadow(MavenPublication) { publication -> + project.shadow.component(publication) + artifactId = "corda-tools-shell-cli" + } + } +} + +shadowJar { + //transform(de.sebastianboegl.gradle.plugins.shadow.transformers.Log4j2PluginsFileTransformer) + archiveName = "corda-shell-cli-${version}.jar" + + baseName = 'corda-shell-cli' + classifier = null + mainClassName = 'net.corda.tools.shell.StandaloneShellKt' + mergeServiceFiles() +} + +task buildShellCli(dependsOn: shadowJar) diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt b/tools/shell-cli/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt similarity index 100% rename from tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt rename to tools/shell-cli/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShellArgsParser.kt b/tools/shell-cli/src/main/kotlin/net/corda/tools/shell/StandaloneShellArgsParser.kt similarity index 100% rename from tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShellArgsParser.kt rename to tools/shell-cli/src/main/kotlin/net/corda/tools/shell/StandaloneShellArgsParser.kt diff --git a/tools/shell/src/test/kotlin/net/corda/tools/shell/StandaloneShellArgsParserTest.kt b/tools/shell-cli/src/test/kotlin/net/corda/tools/shell/StandaloneShellArgsParserTest.kt similarity index 100% rename from tools/shell/src/test/kotlin/net/corda/tools/shell/StandaloneShellArgsParserTest.kt rename to tools/shell-cli/src/test/kotlin/net/corda/tools/shell/StandaloneShellArgsParserTest.kt diff --git a/tools/shell-cli/src/test/resources/config.conf b/tools/shell-cli/src/test/resources/config.conf new file mode 100644 index 0000000000..e4446f1ec9 --- /dev/null +++ b/tools/shell-cli/src/test/resources/config.conf @@ -0,0 +1,29 @@ +node { + addresses { + rpc { + host : "alocalhost" + port : 1234 + } + } + user : demo + password : abcd1234 +} +extensions { + cordapps { + path : "/x/y/cordapps" + } + sshd { + enabled : "true" + port : 2223 + } + commands { + path : /x/y/commands + } +} +ssl { + truststore { + path : "/x/y/truststore.jks" + type : "JKS" + password : "pass2" + } + } \ No newline at end of file diff --git a/tools/shell/build.gradle b/tools/shell/build.gradle index a2d0303273..a17ff17380 100644 --- a/tools/shell/build.gradle +++ b/tools/shell/build.gradle @@ -1,6 +1,5 @@ apply plugin: 'kotlin' apply plugin: 'java' -apply plugin: 'application' apply plugin: 'net.corda.plugins.quasar-utils' apply plugin: 'net.corda.plugins.publish-utils' apply plugin: 'com.jfrog.artifactory' @@ -80,21 +79,15 @@ dependencies { integrationTestCompile project(':node-driver') } -mainClassName = 'net.corda.tools.shell.StandaloneShellKt' - -jar { - baseName 'corda-shell' -} - -processResources { - from file("$rootDir/config/dev/log4j2.xml") -} - task integrationTest(type: Test) { testClassesDirs = sourceSets.integrationTest.output.classesDirs classpath = sourceSets.integrationTest.runtimeClasspath } +jar { + baseName 'corda-shell' +} + publish { name jar.baseName } From 227ca3b65bcf737f9b52d135e2df47923e4b1ea8 Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Wed, 20 Jun 2018 16:00:21 +0100 Subject: [PATCH 5/5] CORDA-1610: Retain progress tracker during flow retry. Make sure the same progress tracker object is re-used in the restarted flow so subscribers can keep receiving progress updates. --- .../net/corda/node/services/TimedFlowTests.kt | 17 ++++++++++++++ .../SingleThreadedStateMachineManager.kt | 6 ++++- .../utilities/StateMachineManagerUtils.kt | 22 +++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt diff --git a/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt index bc6bc655e7..9a81913f41 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt @@ -19,6 +19,7 @@ import net.corda.core.node.NotaryInfo import net.corda.core.node.services.CordaService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.seconds import net.corda.node.internal.StartedNode import net.corda.node.services.config.NodeConfiguration @@ -43,6 +44,8 @@ import org.junit.Test import org.slf4j.MDC import java.security.PublicKey import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.assertEquals +import kotlin.test.assertNotEquals class TimedFlowTests { companion object { @@ -131,8 +134,15 @@ class TimedFlowTests { addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) } val flow = NotaryFlow.Client(issueTx) + val progressTracker = flow.progressTracker + assertNotEquals(ProgressTracker.DONE, progressTracker.currentStep) val notarySignatures = services.startFlow(flow).resultFuture.get() (issueTx + notarySignatures).verifyRequiredSignatures() + assertEquals( + ProgressTracker.DONE, + progressTracker.currentStep, + "Ensure the same progress tracker object is re-used after flow restart" + ) } } @@ -144,8 +154,15 @@ class TimedFlowTests { addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) } val flow = FinalityFlow(issueTx) + val progressTracker = flow.progressTracker + val stx = services.startFlow(flow).resultFuture.get() stx.verifyRequiredSignatures() + assertEquals( + ProgressTracker.DONE, + progressTracker.currentStep, + "Ensure the same progress tracker object is re-used after flow restart" + ) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 77b10dd575..51ad2460c4 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -33,6 +33,7 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.creat import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.AffinityExecutor +import net.corda.node.utilities.injectOldProgressTracker import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction import net.corda.serialization.internal.SerializeAsTokenContextImpl @@ -361,7 +362,10 @@ class SingleThreadedStateMachineManager( for (sessionId in getFlowSessionIds(currentState.checkpoint)) { sessionToFlow.remove(sessionId) } - if (flow != null) addAndStartFlow(flowId, flow) + if (flow != null) { + injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic) + addAndStartFlow(flowId, flow) + } // Deliver all the external events from the old flow instance. val unprocessedExternalEvents = mutableListOf() do { diff --git a/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt b/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt new file mode 100644 index 0000000000..f8b4294a8a --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt @@ -0,0 +1,22 @@ +package net.corda.node.utilities + +import net.corda.core.flows.FlowLogic +import net.corda.core.utilities.ProgressTracker +import net.corda.node.services.statemachine.StateMachineManagerInternal + +/** + * The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that + * any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress + * updates are correctly sent out after the flow is retried. + */ +fun StateMachineManagerInternal.injectOldProgressTracker(oldProgressTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) { + if (oldProgressTracker != null) { + try { + val field = newFlowLogic::class.java.getDeclaredField("progressTracker") + field.isAccessible = true + field.set(newFlowLogic, oldProgressTracker) + } catch (e: NoSuchFieldException) { + // The flow does not use a progress tracker. + } + } +} \ No newline at end of file