mirror of
https://github.com/corda/corda.git
synced 2025-06-23 09:25:36 +00:00
OS->Ent merge
This commit is contained in:
@ -35,7 +35,9 @@ import kotlin.concurrent.withLock
|
||||
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig? = null, maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
|
||||
class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig? = null, maxMessageSize: Int,
|
||||
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
|
||||
private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager {
|
||||
|
||||
private val lock = ReentrantLock()
|
||||
private val queueNamesToBridgesMap = mutableMapOf<String, MutableList<AMQPBridge>>()
|
||||
@ -72,10 +74,11 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
*/
|
||||
private class AMQPBridge(val queueName: String,
|
||||
val targets: List<NetworkHostAndPort>,
|
||||
private val legalNames: Set<CordaX500Name>,
|
||||
val legalNames: Set<CordaX500Name>,
|
||||
private val amqpConfig: AMQPConfiguration,
|
||||
sharedEventGroup: EventLoopGroup,
|
||||
private val artemis: ArtemisSessionProvider) {
|
||||
private val artemis: ArtemisSessionProvider,
|
||||
private val bridgeMetricsService: BridgeMetricsService?) {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
@ -111,7 +114,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
|
||||
fun start() {
|
||||
logInfoWithMDC("Create new AMQP bridge")
|
||||
connectedSubscription = amqpClient.onConnection.subscribe({ x -> onSocketConnected(x.connected) })
|
||||
connectedSubscription = amqpClient.onConnection.subscribe { x -> onSocketConnected(x.connected) }
|
||||
amqpClient.start()
|
||||
}
|
||||
|
||||
@ -143,6 +146,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
synchronized(artemis) {
|
||||
if (connected) {
|
||||
logInfoWithMDC("Bridge Connected")
|
||||
bridgeMetricsService?.bridgeConnected(targets, legalNames)
|
||||
val sessionFactory = artemis.started!!.sessionFactory
|
||||
val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
this.session = session
|
||||
@ -152,6 +156,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
session.start()
|
||||
} else {
|
||||
logInfoWithMDC("Bridge Disconnected")
|
||||
bridgeMetricsService?.bridgeDisconnected(targets, legalNames)
|
||||
consumer?.apply {
|
||||
if (!isClosed) {
|
||||
close()
|
||||
@ -171,8 +176,10 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
|
||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
if (artemisMessage.bodySize > amqpConfig.maxMessageSize) {
|
||||
logWarnWithMDC("Message exceeds maxMessageSize network parameter, maxMessageSize: [${amqpConfig.maxMessageSize}], message size: [${artemisMessage.bodySize}], " +
|
||||
"dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||
val msg = "Message exceeds maxMessageSize network parameter, maxMessageSize: [${amqpConfig.maxMessageSize}], message size: [${artemisMessage.bodySize}], " +
|
||||
"dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}"
|
||||
logWarnWithMDC(msg)
|
||||
bridgeMetricsService?.packetDropEvent(artemisMessage, msg)
|
||||
// Ack the message to prevent same message being sent to us again.
|
||||
artemisMessage.acknowledge()
|
||||
return
|
||||
@ -208,6 +215,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
}
|
||||
}
|
||||
amqpClient.write(sendableMessage)
|
||||
bridgeMetricsService?.packetAcceptedEvent(sendableMessage)
|
||||
}
|
||||
}
|
||||
|
||||
@ -219,8 +227,9 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
return
|
||||
}
|
||||
}
|
||||
val newBridge = AMQPBridge(queueName, targets, legalNames, amqpConfig, sharedEventLoopGroup!!, artemis!!)
|
||||
val newBridge = AMQPBridge(queueName, targets, legalNames, amqpConfig, sharedEventLoopGroup!!, artemis!!, bridgeMetricsService)
|
||||
bridges += newBridge
|
||||
bridgeMetricsService?.bridgeCreated(targets, legalNames)
|
||||
newBridge
|
||||
}
|
||||
newBridge.start()
|
||||
@ -237,6 +246,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
queueNamesToBridgesMap.remove(queueName)
|
||||
}
|
||||
bridge.stop()
|
||||
bridgeMetricsService?.bridgeDestroyed(bridge.targets, bridge.legalNames)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -26,12 +26,13 @@ import java.util.*
|
||||
class BridgeControlListener(val config: MutualSslConfiguration,
|
||||
socksProxyConfig: SocksProxyConfig? = null,
|
||||
maxMessageSize: Int,
|
||||
val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable {
|
||||
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
|
||||
bridgeMetricsService: BridgeMetricsService? = null) : AutoCloseable {
|
||||
private val bridgeId: String = UUID.randomUUID().toString()
|
||||
private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
|
||||
private val bridgeNotifyQueue = "$BRIDGE_NOTIFY.$bridgeId"
|
||||
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, maxMessageSize,
|
||||
artemisMessageClientFactory)
|
||||
artemisMessageClientFactory, bridgeMetricsService)
|
||||
private val validInboundQueues = mutableSetOf<String>()
|
||||
private var artemis: ArtemisSessionProvider? = null
|
||||
private var controlConsumer: ClientConsumer? = null
|
||||
|
@ -0,0 +1,15 @@
|
||||
package net.corda.nodeapi.internal.bridging
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
|
||||
interface BridgeMetricsService {
|
||||
fun bridgeCreated(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>)
|
||||
fun bridgeConnected(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>)
|
||||
fun packetDropEvent(artemisMessage: ClientMessage, msg: String)
|
||||
fun packetAcceptedEvent(sendableMessage: SendableMessage)
|
||||
fun bridgeDisconnected(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>)
|
||||
fun bridgeDestroyed(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>)
|
||||
}
|
@ -15,6 +15,7 @@ import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.cordapp.CordappLoader
|
||||
import net.corda.node.internal.cordapp.CordappProviderImpl
|
||||
import net.corda.node.internal.cordapp.JarScanningCordappLoader
|
||||
@ -110,7 +111,7 @@ class AttachmentsClassLoaderStaticContractTests {
|
||||
val cordapps = cordappsForPackages(packages)
|
||||
return testDirectory().let { directory ->
|
||||
cordapps.packageInDirectory(directory)
|
||||
JarScanningCordappLoader.fromDirectories(listOf(directory))
|
||||
JarScanningCordappLoader.fromDirectories(listOf(directory), VersionInfo.UNKNOWN)
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user