Ent 1394 ha artemis (#1428)

* ENT-1394: artemis locators for HA bases on configuration

* ENT-1394: bridge artemis connection service with HA impl

* ENT-1394: added own round robin logic for artemis client connections

* ENT-1394: added support for external clustered artemis to internal RPC client

* ENT-1394: added failover listener to internal rpc client, some cleanup

* ENT-1394: remove unused import

* ENT-1394: refactored after rebasing

* ENT-1394: refactored after rebasing, addressed PR comments

* ENT-1394: got rid of HA connection service in favor of a simple check inside current one

* ENT-1394: ha locator gets its own thread to handle failover that happens behind the scenes

* ENT-1394: move ha artemis flag in the outboundConfig

* ENT-1394: haArtemis flag has default value in constructor

* ENT-1394: address PR comment, handle status change during failover

* ENT-1394: reverted usage of ha locator

* ENT-1394: ensure that on failover the p2pclient sends a fresh snapshot to the bridge
This commit is contained in:
bpaunescu 2018-10-25 15:21:26 +01:00 committed by GitHub
parent 63f80d1fb5
commit c8b65c933f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 68 additions and 13 deletions

View File

@ -60,7 +60,6 @@ class BridgeArtemisConnectionServiceImpl(val conf: FirewallConfiguration,
val outboundConf = conf.outboundConfig!!
log.info("Connecting to message broker: ${outboundConf.artemisBrokerAddress}")
val brokerAddresses = listOf(outboundConf.artemisBrokerAddress) + outboundConf.alternateArtemisBrokerAddresses
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransports = brokerAddresses.map { ArtemisTcpTransport.p2pConnectorTcpTransport(it, sslConfiguration) }
locator = ActiveMQClient.createServerLocatorWithoutHA(*tcpTransports.toTypedArray()).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this

View File

@ -4,6 +4,8 @@ import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransportFromList
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
@ -22,7 +24,8 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
private val autoCommitSends: Boolean = true,
private val autoCommitAcks: Boolean = true,
private val confirmationWindowSize: Int = -1,
private val externalBrokerConnectionConfig: ExternalBrokerConnectionConfiguration? = null
private val externalBrokerConnectionConfig: ExternalBrokerConnectionConfiguration? = null,
private val backupServerAddressPool: List<NetworkHostAndPort> = emptyList()
) : ArtemisSessionProvider {
companion object {
private val log = loggerFor<ArtemisMessagingClient>()
@ -35,10 +38,15 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
override fun start(): Started = synchronized(this) {
check(started == null) { "start can't be called twice" }
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config)
val backupTransports = p2pConnectorTcpTransportFromList(backupServerAddressPool, config)
log.info("Connecting to message broker: $serverAddress")
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = ArtemisTcpTransport.p2pConnectorTcpTransport(serverAddress, config)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
if (backupTransports.isNotEmpty()) {
log.info("Back-up message broker addresses: $backupServerAddressPool")
}
// If back-up artemis addresses are configured, the locator will be created using HA mode.
val locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted.
connectionTTL = 60000
@ -47,6 +55,7 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
isUseGlobalPools = nodeSerializationEnv != null
confirmationWindowSize = this@ArtemisMessagingClient.confirmationWindowSize
externalBrokerConnectionConfig?.let {
connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName
reconnectAttempts = externalBrokerConnectionConfig.reconnectAttempts
retryInterval = externalBrokerConnectionConfig.retryInterval.toMillis()
retryIntervalMultiplier = externalBrokerConnectionConfig.retryIntervalMultiplier

View File

@ -130,6 +130,10 @@ class ArtemisTcpTransport {
return TransportConfiguration(connectorFactoryClassName, options)
}
fun p2pConnectorTcpTransportFromList(hostAndPortList: List<NetworkHostAndPort>, config: MutualSslConfiguration?, enableSSL: Boolean = true): List<TransportConfiguration> = hostAndPortList.map {
p2pConnectorTcpTransport(it, config, enableSSL)
}
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: BrokerRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort).toMutableMap()
@ -160,6 +164,7 @@ class ArtemisTcpTransport {
return TransportConfiguration(connectorFactoryClassName, defaultArtemisOptions(hostAndPort) + defaultSSLOptions + config.toTransportOptions())
}
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration): TransportConfiguration {
return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort) + defaultSSLOptions + config.toTransportOptions())
}

View File

@ -2,6 +2,7 @@
package net.corda.nodeapi.internal
import org.apache.activemq.artemis.api.core.client.loadbalance.ConnectionLoadBalancingPolicy
import java.nio.file.FileSystems
import java.nio.file.Path
@ -16,3 +17,18 @@ fun Path.requireOnDefaultFileSystem() {
fun requireMessageSize(messageSize: Int, limit: Int) {
require(messageSize <= limit) { "Message exceeds maxMessageSize network parameter, maxMessageSize: [$limit], message size: [$messageSize]" }
}
/**
* Implementation of an Artemis load balancing policy. It does round-robin always starting from the first position, whereas
* the current [RoundRobinConnectionLoadBalancingPolicy] in Artemis picks the starting position randomly. This can lead to
* attempting to connect to an inactive broker on the first attempt, which can cause start-up delays depending on what connection
* settings are used.
*/
class RoundRobinConnectionPolicy : ConnectionLoadBalancingPolicy {
private var pos = 0
override fun select(max: Int): Int {
pos = if (pos >= max) 0 else pos
return pos++
}
}

View File

@ -9,7 +9,6 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import org.assertj.core.api.Assertions.*
import org.hibernate.exception.DataException
import org.junit.Test
import java.net.URL
import java.nio.file.Path

View File

@ -251,7 +251,8 @@ open class Node(configuration: NodeConfiguration,
true,
true,
-1,
configuration.enterpriseConfiguration.externalBrokerConnectionConfiguration)
configuration.enterpriseConfiguration.externalBrokerConnectionConfiguration,
configuration.enterpriseConfiguration.externalBrokerBackupAddresses)
}
BridgeControlListener(configuration.p2pSslOptions, null, networkParameters.maxMessageSize, artemisClient)
} else {
@ -264,7 +265,11 @@ open class Node(configuration: NodeConfiguration,
rpcThreadPoolSize = configuration.enterpriseConfiguration.tuning.rpcThreadPoolSize
)
rpcServerAddresses?.let {
internalRpcMessagingClient = InternalRPCMessagingClient(configuration.p2pSslOptions, it.admin, MAX_RPC_MESSAGE_SIZE, CordaX500Name.build(configuration.p2pSslOptions.keyStore.get()[X509Utilities.CORDA_CLIENT_TLS].subjectX500Principal), rpcServerConfiguration)
internalRpcMessagingClient = InternalRPCMessagingClient(configuration.p2pSslOptions,
it.admin,
MAX_RPC_MESSAGE_SIZE,
CordaX500Name.build(configuration.p2pSslOptions.keyStore.get()[X509Utilities.CORDA_CLIENT_TLS].subjectX500Principal),
rpcServerConfiguration)
printBasicNodeInfo("RPC connection address", it.primary.toString())
printBasicNodeInfo("RPC admin connection address", it.admin.toString())
}

View File

@ -1,5 +1,6 @@
package net.corda.node.services.config
import net.corda.core.utilities.NetworkHostAndPort
import java.io.File
import java.net.InetAddress
import java.nio.file.Path
@ -8,6 +9,7 @@ import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
data class EnterpriseConfiguration(
val mutualExclusionConfiguration: MutualExclusionConfiguration,
val externalBrokerConnectionConfiguration: ExternalBrokerConnectionConfiguration = ExternalBrokerConnectionConfiguration.DEFAULT,
val externalBrokerBackupAddresses: List<NetworkHostAndPort> = emptyList(),
val useMultiThreadedSMM: Boolean = true,
val tuning: PerformanceTuning = PerformanceTuning.default,
val externalBridge: Boolean? = null,

View File

@ -7,8 +7,11 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.config.EnterpriseConfiguration
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_RPC_USER
import net.corda.nodeapi.internal.ArtemisTcpTransport
import net.corda.nodeapi.internal.RoundRobinConnectionPolicy
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ServerLocator
@ -17,7 +20,12 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
/**
* Used by the Node to communicate with the RPC broker.
*/
class InternalRPCMessagingClient<OPS : RPCOps>(val sslConfig: MutualSslConfiguration, val serverAddress: NetworkHostAndPort, val maxMessageSize: Int, val nodeName: CordaX500Name, val rpcServerConfiguration: RPCServerConfiguration) : SingletonSerializeAsToken(), AutoCloseable {
class InternalRPCMessagingClient<OPS : RPCOps>(val sslConfig: MutualSslConfiguration,
val serverAddress: NetworkHostAndPort,
val maxMessageSize: Int,
val nodeName: CordaX500Name,
val rpcServerConfiguration: RPCServerConfiguration) : SingletonSerializeAsToken(), AutoCloseable {
private var locator: ServerLocator? = null
private var rpcServer: RPCServer<OPS>? = null
@ -28,6 +36,7 @@ class InternalRPCMessagingClient<OPS : RPCOps>(val sslConfig: MutualSslConfigura
fun init(rpcOpsRouting: RPCOpsRouting<OPS>, securityManager: RPCSecurityManager, cacheFactory: NamedCacheFactory) = synchronized(this) {
val tcpTransport = ArtemisTcpTransport.rpcInternalClientTcpTransport(serverAddress, sslConfig)
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted.

View File

@ -38,6 +38,8 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransportFromList
import net.corda.nodeapi.internal.RoundRobinConnectionPolicy
import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.bridging.BridgeEntry
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -117,6 +119,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
var bridgeNotifyConsumer: ClientConsumer? = null
var networkChangeSubscription: Subscription? = null
var sessionFactory: ClientSessionFactory? = null
val inboxes = mutableSetOf<String>()
fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message)
}
@ -150,6 +153,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
}
FailoverEventType.FAILOVER_COMPLETED -> {
log.info("Connection to broker re-established.")
state.locked {
enumerateBridges(bridgeSession!!, inboxes.toList())
}
}
FailoverEventType.FAILOVER_FAILED -> state.locked {
if (running) {
@ -179,10 +185,14 @@ class P2PMessagingClient(val config: NodeConfiguration,
this.maxMessageSize = maxMessageSize
state.locked {
started = true
log.info("Connecting to message broker: $serverAddress")
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions)
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
val backupTransports = p2pConnectorTcpTransportFromList(config.enterpriseConfiguration.externalBrokerBackupAddresses, config.p2pSslOptions)
log.info("Connecting to message broker: $serverAddress")
if (backupTransports.isNotEmpty()) {
log.info("Back-up message broker addresses: ${config.enterpriseConfiguration.externalBrokerBackupAddresses}")
}
// If back-up artemis addresses are configured, the locator will be created using HA mode.
locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted.
connectionTTL = 60000
@ -192,6 +202,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
confirmationWindowSize = config.enterpriseConfiguration.tuning.p2pConfirmationWindowSize
// Configuration for dealing with external broker failover
if (config.messagingServerExternal) {
connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName
reconnectAttempts = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts
retryInterval = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryInterval.toMillis()
retryIntervalMultiplier = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryIntervalMultiplier
@ -214,7 +225,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
producerSession!!.start()
bridgeSession!!.start()
val inboxes = mutableSetOf<String>()
// Create a queue, consumer and producer for handling P2P network messages.
// Create a general purpose producer.
producer = producerSession!!.createProducer()