ENT-2500: Introduce optional BridgeMetricsService in nodeApi module (#4001)

* ENT-2500: Introduce optional BridgeAuditService in `nodeApi` module

* ENT-2500: Rename audit to metrics service and add more traps to capture stats.
This commit is contained in:
Viktor Kolomeyko 2018-10-01 14:27:01 +01:00 committed by GitHub
parent 6980835e8c
commit 5a79f439db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 36 additions and 9 deletions

View File

@ -34,7 +34,9 @@ import kotlin.concurrent.withLock
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager. * The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
*/ */
@VisibleForTesting @VisibleForTesting
class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager { class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager {
private val lock = ReentrantLock() private val lock = ReentrantLock()
private val queueNamesToBridgesMap = mutableMapOf<String, MutableList<AMQPBridge>>() private val queueNamesToBridgesMap = mutableMapOf<String, MutableList<AMQPBridge>>()
@ -65,10 +67,11 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
*/ */
private class AMQPBridge(val queueName: String, private class AMQPBridge(val queueName: String,
val targets: List<NetworkHostAndPort>, val targets: List<NetworkHostAndPort>,
private val legalNames: Set<CordaX500Name>, val legalNames: Set<CordaX500Name>,
private val amqpConfig: AMQPConfiguration, private val amqpConfig: AMQPConfiguration,
sharedEventGroup: EventLoopGroup, sharedEventGroup: EventLoopGroup,
private val artemis: ArtemisSessionProvider) { private val artemis: ArtemisSessionProvider,
private val bridgeMetricsService: BridgeMetricsService?) {
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
} }
@ -104,7 +107,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
fun start() { fun start() {
logInfoWithMDC("Create new AMQP bridge") logInfoWithMDC("Create new AMQP bridge")
connectedSubscription = amqpClient.onConnection.subscribe({ x -> onSocketConnected(x.connected) }) connectedSubscription = amqpClient.onConnection.subscribe { x -> onSocketConnected(x.connected) }
amqpClient.start() amqpClient.start()
} }
@ -128,6 +131,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
synchronized(artemis) { synchronized(artemis) {
if (connected) { if (connected) {
logInfoWithMDC("Bridge Connected") logInfoWithMDC("Bridge Connected")
bridgeMetricsService?.bridgeConnected(targets, legalNames)
val sessionFactory = artemis.started!!.sessionFactory val sessionFactory = artemis.started!!.sessionFactory
val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
this.session = session this.session = session
@ -137,6 +141,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
session.start() session.start()
} else { } else {
logInfoWithMDC("Bridge Disconnected") logInfoWithMDC("Bridge Disconnected")
bridgeMetricsService?.bridgeDisconnected(targets, legalNames)
consumer?.close() consumer?.close()
consumer = null consumer = null
session?.stop() session?.stop()
@ -148,8 +153,10 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
if (artemisMessage.bodySize > amqpConfig.maxMessageSize) { if (artemisMessage.bodySize > amqpConfig.maxMessageSize) {
logWarnWithMDC("Message exceeds maxMessageSize network parameter, maxMessageSize: [${amqpConfig.maxMessageSize}], message size: [${artemisMessage.bodySize}], " + val msg = "Message exceeds maxMessageSize network parameter, maxMessageSize: [${amqpConfig.maxMessageSize}], message size: [${artemisMessage.bodySize}], " +
"dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") "dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}"
logWarnWithMDC(msg)
bridgeMetricsService?.packetDropEvent(artemisMessage, msg)
// Ack the message to prevent same message being sent to us again. // Ack the message to prevent same message being sent to us again.
artemisMessage.acknowledge() artemisMessage.acknowledge()
return return
@ -185,6 +192,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
} }
} }
amqpClient.write(sendableMessage) amqpClient.write(sendableMessage)
bridgeMetricsService?.packetAcceptedEvent(sendableMessage)
} }
} }
@ -196,8 +204,9 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
return return
} }
} }
val newBridge = AMQPBridge(queueName, targets, legalNames, amqpConfig, sharedEventLoopGroup!!, artemis!!) val newBridge = AMQPBridge(queueName, targets, legalNames, amqpConfig, sharedEventLoopGroup!!, artemis!!, bridgeMetricsService)
bridges += newBridge bridges += newBridge
bridgeMetricsService?.bridgeCreated(targets, legalNames)
newBridge newBridge
} }
newBridge.start() newBridge.start()
@ -214,6 +223,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
queueNamesToBridgesMap.remove(queueName) queueNamesToBridgesMap.remove(queueName)
} }
bridge.stop() bridge.stop()
bridgeMetricsService?.bridgeDestroyed(bridge.targets, bridge.legalNames)
} }
} }
} }

View File

@ -20,9 +20,11 @@ import java.util.*
class BridgeControlListener(val config: MutualSslConfiguration, class BridgeControlListener(val config: MutualSslConfiguration,
maxMessageSize: Int, maxMessageSize: Int,
val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable { private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
bridgeMetricsService: BridgeMetricsService? = null) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString() private val bridgeId: String = UUID.randomUUID().toString()
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, maxMessageSize, artemisMessageClientFactory) private val bridgeManager: BridgeManager = AMQPBridgeManager(config, maxMessageSize,
artemisMessageClientFactory, bridgeMetricsService)
private val validInboundQueues = mutableSetOf<String>() private val validInboundQueues = mutableSetOf<String>()
private var artemis: ArtemisSessionProvider? = null private var artemis: ArtemisSessionProvider? = null
private var controlConsumer: ClientConsumer? = null private var controlConsumer: ClientConsumer? = null

View File

@ -0,0 +1,15 @@
package net.corda.nodeapi.internal.bridging
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
import org.apache.activemq.artemis.api.core.client.ClientMessage
interface BridgeMetricsService {
fun bridgeCreated(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>)
fun bridgeConnected(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>)
fun packetDropEvent(artemisMessage: ClientMessage, msg: String)
fun packetAcceptedEvent(sendableMessage: SendableMessage)
fun bridgeDisconnected(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>)
fun bridgeDestroyed(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>)
}