Enhance the pluggability of the bridging and messaging code, so that more complex HA and out of process bridges can be written. (#2558)

This commit is contained in:
Matthew Nesbit 2018-02-16 16:13:05 +00:00 committed by GitHub
parent cb7a0229a8
commit fee89c044f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 57 additions and 25 deletions

View File

@ -13,17 +13,25 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
class ArtemisMessagingClient(private val config: SSLConfiguration, private val serverAddress: NetworkHostAndPort, private val maxMessageSize: Int) { interface ArtemisSessionProvider {
fun start(): ArtemisMessagingClient.Started
fun stop()
val started: ArtemisMessagingClient.Started?
}
class ArtemisMessagingClient(private val config: SSLConfiguration,
private val serverAddress: NetworkHostAndPort,
private val maxMessageSize: Int) : ArtemisSessionProvider {
companion object { companion object {
private val log = loggerFor<ArtemisMessagingClient>() private val log = loggerFor<ArtemisMessagingClient>()
} }
class Started(val sessionFactory: ClientSessionFactory, val session: ClientSession, val producer: ClientProducer) class Started(val sessionFactory: ClientSessionFactory, val session: ClientSession, val producer: ClientProducer)
var started: Started? = null override var started: Started? = null
private set private set
fun start(): Started = synchronized(this) { override fun start(): Started = synchronized(this) {
check(started == null) { "start can't be called twice" } check(started == null) { "start can't be called twice" }
log.info("Connecting to message broker: $serverAddress") log.info("Connecting to message broker: $serverAddress")
// TODO Add broker CN to config for host verification in case the embedded broker isn't used // TODO Add broker CN to config for host verification in case the embedded broker isn't used
@ -48,7 +56,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration, private val s
return Started(sessionFactory, session, producer).also { started = it } return Started(sessionFactory, session, producer).also { started = it }
} }
fun stop() = synchronized(this) { override fun stop() = synchronized(this) {
started?.run { started?.run {
producer.close() producer.close()
// Ensure any trailing messages are committed to the journal // Ensure any trailing messages are committed to the journal

View File

@ -12,6 +12,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName import net.corda.nodeapi.internal.bridging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName
import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
@ -35,7 +36,7 @@ import kotlin.concurrent.withLock
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager. * The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
*/ */
@VisibleForTesting @VisibleForTesting
class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager { class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
private val lock = ReentrantLock() private val lock = ReentrantLock()
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>() private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
@ -43,7 +44,9 @@ class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: Networ
private val keyStore = config.loadSslKeyStore().internal private val keyStore = config.loadSslKeyStore().internal
private val keyStorePrivateKeyPassword: String = config.keyStorePassword private val keyStorePrivateKeyPassword: String = config.keyStorePassword
private val trustStore = config.loadTrustStore().internal private val trustStore = config.loadTrustStore().internal
private var artemis: ArtemisMessagingClient? = null private var artemis: ArtemisSessionProvider? = null
constructor(config: NodeSSLConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int) : this(config, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
companion object { companion object {
private const val NUM_BRIDGE_THREADS = 0 // Default sized pool private const val NUM_BRIDGE_THREADS = 0 // Default sized pool
@ -64,7 +67,7 @@ class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: Networ
keyStorePrivateKeyPassword: String, keyStorePrivateKeyPassword: String,
trustStore: KeyStore, trustStore: KeyStore,
sharedEventGroup: EventLoopGroup, sharedEventGroup: EventLoopGroup,
private val artemis: ArtemisMessagingClient) { private val artemis: ArtemisSessionProvider) {
companion object { companion object {
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort" fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
} }
@ -190,7 +193,7 @@ class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: Networ
override fun start() { override fun start() {
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS) sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS)
val artemis = ArtemisMessagingClient(config, p2pAddress, maxMessageSize) val artemis = artemisMessageClientFactory()
this.artemis = artemis this.artemis = artemis
artemis.start() artemis.start()
} }

View File

@ -10,6 +10,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
@ -18,14 +19,17 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage
import java.util.* import java.util.*
class BridgeControlListener(val config: NodeSSLConfiguration, class BridgeControlListener(val config: NodeSSLConfiguration,
val p2pAddress: NetworkHostAndPort, val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable {
val maxMessageSize: Int) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString() private val bridgeId: String = UUID.randomUUID().toString()
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, p2pAddress, maxMessageSize) private val bridgeManager: BridgeManager = AMQPBridgeManager(config, artemisMessageClientFactory)
private val validInboundQueues = mutableSetOf<String>() private val validInboundQueues = mutableSetOf<String>()
private var artemis: ArtemisMessagingClient? = null private var artemis: ArtemisSessionProvider? = null
private var controlConsumer: ClientConsumer? = null private var controlConsumer: ClientConsumer? = null
constructor(config: NodeSSLConfiguration,
p2pAddress: NetworkHostAndPort,
maxMessageSize: Int) : this(config, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
} }
@ -33,7 +37,7 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
fun start() { fun start() {
stop() stop()
bridgeManager.start() bridgeManager.start()
val artemis = ArtemisMessagingClient(config, p2pAddress, maxMessageSize) val artemis = artemisMessageClientFactory()
this.artemis = artemis this.artemis = artemis
artemis.start() artemis.start()
val artemisClient = artemis.started!! val artemisClient = artemis.started!!
@ -56,6 +60,7 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
} }
fun stop() { fun stop() {
validInboundQueues.clear()
controlConsumer?.close() controlConsumer?.close()
controlConsumer = null controlConsumer = null
artemis?.stop() artemis?.stop()
@ -65,6 +70,10 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
override fun close() = stop() override fun close() = stop()
fun validateReceiveTopic(topic: String): Boolean {
return topic in validInboundQueues
}
private fun validateInboxQueueName(queueName: String): Boolean { private fun validateInboxQueueName(queueName: String): Boolean {
return queueName.startsWith(P2P_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists return queueName.startsWith(P2P_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists
} }
@ -90,7 +99,6 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
for (outQueue in controlMessage.sendQueues) { for (outQueue in controlMessage.sendQueues) {
bridgeManager.deployBridge(outQueue.queueName, outQueue.targets.first(), outQueue.legalNames.toSet()) bridgeManager.deployBridge(outQueue.queueName, outQueue.targets.first(), outQueue.legalNames.toSet())
} }
// TODO For now we just record the inboxes, but we don't use the information, but eventually out of process bridges will use this for validating inbound messages.
validInboundQueues.addAll(controlMessage.inboxQueues) validInboundQueues.addAll(controlMessage.inboxQueues)
} }
is BridgeControl.BridgeToNodeSnapshotRequest -> { is BridgeControl.BridgeToNodeSnapshotRequest -> {

View File

@ -38,8 +38,8 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() { private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() {
private val log = LoggerFactory.getLogger(allowedRemoteLegalNames?.firstOrNull()?.toString() ?: "AMQPChannelHandler") private val log = LoggerFactory.getLogger(allowedRemoteLegalNames?.firstOrNull()?.toString() ?: "AMQPChannelHandler")
private lateinit var remoteAddress: InetSocketAddress private lateinit var remoteAddress: InetSocketAddress
private lateinit var localCert: X509Certificate private var localCert: X509Certificate? = null
private lateinit var remoteCert: X509Certificate private var remoteCert: X509Certificate? = null
private var eventProcessor: EventProcessor? = null private var eventProcessor: EventProcessor? = null
override fun channelActive(ctx: ChannelHandlerContext) { override fun channelActive(ctx: ChannelHandlerContext) {
@ -51,7 +51,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
private fun createAMQPEngine(ctx: ChannelHandlerContext) { private fun createAMQPEngine(ctx: ChannelHandlerContext) {
val ch = ctx.channel() val ch = ctx.channel()
eventProcessor = EventProcessor(ch, serverMode, localCert.subjectX500Principal.toString(), remoteCert.subjectX500Principal.toString(), userName, password) eventProcessor = EventProcessor(ch, serverMode, localCert!!.subjectX500Principal.toString(), remoteCert!!.subjectX500Principal.toString(), userName, password)
val connection = eventProcessor!!.connection val connection = eventProcessor!!.connection
val transport = connection.transport as ProtonJTransport val transport = connection.transport as ProtonJTransport
if (trace) { if (trace) {
@ -72,7 +72,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
override fun channelInactive(ctx: ChannelHandlerContext) { override fun channelInactive(ctx: ChannelHandlerContext) {
val ch = ctx.channel() val ch = ctx.channel()
log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}") log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, null, false))) onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false)))
eventProcessor?.close() eventProcessor?.close()
ctx.fireChannelInactive() ctx.fireChannelInactive()
} }
@ -84,7 +84,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
localCert = sslHandler.engine().session.localCertificates[0].x509 localCert = sslHandler.engine().session.localCertificates[0].x509
remoteCert = sslHandler.engine().session.peerCertificates[0].x509 remoteCert = sslHandler.engine().session.peerCertificates[0].x509
try { try {
val remoteX500Name = CordaX500Name.build(remoteCert.subjectX500Principal) val remoteX500Name = CordaX500Name.build(remoteCert!!.subjectX500Principal)
require(allowedRemoteLegalNames == null || remoteX500Name in allowedRemoteLegalNames) require(allowedRemoteLegalNames == null || remoteX500Name in allowedRemoteLegalNames)
log.info("handshake completed subject: $remoteX500Name") log.info("handshake completed subject: $remoteX500Name")
} catch (ex: IllegalArgumentException) { } catch (ex: IllegalArgumentException) {
@ -124,7 +124,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
require(inetAddress == remoteAddress) { require(inetAddress == remoteAddress) {
"Message for incorrect endpoint" "Message for incorrect endpoint"
} }
require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert.subjectX500Principal)) { require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) {
"Message for incorrect legal identity" "Message for incorrect legal identity"
} }
log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" } log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" }

View File

@ -38,7 +38,7 @@ class AMQPServer(val hostName: String,
private val userName: String?, private val userName: String?,
private val password: String?, private val password: String?,
private val keyStore: KeyStore, private val keyStore: KeyStore,
private val keyStorePrivateKeyPassword: String, private val keyStorePrivateKeyPassword: CharArray,
private val trustStore: KeyStore, private val trustStore: KeyStore,
private val trace: Boolean = false) : AutoCloseable { private val trace: Boolean = false) : AutoCloseable {
@ -59,15 +59,21 @@ class AMQPServer(val hostName: String,
private var serverChannel: Channel? = null private var serverChannel: Channel? = null
private val clientChannels = ConcurrentHashMap<InetSocketAddress, SocketChannel>() private val clientChannels = ConcurrentHashMap<InetSocketAddress, SocketChannel>()
init { constructor(hostName: String,
} port: Int,
userName: String?,
password: String?,
keyStore: KeyStore,
keyStorePrivateKeyPassword: String,
trustStore: KeyStore,
trace: Boolean = false) : this(hostName, port, userName, password, keyStore, keyStorePrivateKeyPassword.toCharArray(), trustStore, trace)
private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer<SocketChannel>() { private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer<SocketChannel>() {
private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()) private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
init { init {
keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword.toCharArray()) keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword)
trustManagerFactory.init(parent.trustStore) trustManagerFactory.init(parent.trustStore)
} }
@ -169,6 +175,13 @@ class AMQPServer(val hostName: String,
} }
} }
fun dropConnection(connectionRemoteHost: InetSocketAddress) {
val channel = clientChannels[connectionRemoteHost]
if (channel != null) {
channel.close()
}
}
fun complete(delivery: Delivery, target: InetSocketAddress) { fun complete(delivery: Delivery, target: InetSocketAddress) {
val channel = clientChannels[target] val channel = clientChannels[target]
channel?.apply { channel?.apply {

View File

@ -210,7 +210,7 @@ open class Node(configuration: NodeConfiguration,
} }
override fun myAddresses(): List<NetworkHostAndPort> { override fun myAddresses(): List<NetworkHostAndPort> {
return listOf(configuration.messagingServerAddress ?: getAdvertisedAddress()) return listOf(getAdvertisedAddress())
} }
private fun getAdvertisedAddress(): NetworkHostAndPort { private fun getAdvertisedAddress(): NetworkHostAndPort {