diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt index 7c0ac104fe..c59b0049a1 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt @@ -60,7 +60,6 @@ class BridgeArtemisConnectionServiceImpl(val conf: FirewallConfiguration, val outboundConf = conf.outboundConfig!! log.info("Connecting to message broker: ${outboundConf.artemisBrokerAddress}") val brokerAddresses = listOf(outboundConf.artemisBrokerAddress) + outboundConf.alternateArtemisBrokerAddresses - // TODO Add broker CN to config for host verification in case the embedded broker isn't used val tcpTransports = brokerAddresses.map { ArtemisTcpTransport.p2pConnectorTcpTransport(it, sslConfiguration) } locator = ActiveMQClient.createServerLocatorWithoutHA(*tcpTransports.toTypedArray()).apply { // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt index 4a8912939c..0545eb14af 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt @@ -4,6 +4,8 @@ import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER +import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport +import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransportFromList import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration import org.apache.activemq.artemis.api.core.client.ActiveMQClient @@ -22,7 +24,8 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, private val autoCommitSends: Boolean = true, private val autoCommitAcks: Boolean = true, private val confirmationWindowSize: Int = -1, - private val externalBrokerConnectionConfig: ExternalBrokerConnectionConfiguration? = null + private val externalBrokerConnectionConfig: ExternalBrokerConnectionConfiguration? = null, + private val backupServerAddressPool: List = emptyList() ) : ArtemisSessionProvider { companion object { private val log = loggerFor() @@ -35,10 +38,15 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, override fun start(): Started = synchronized(this) { check(started == null) { "start can't be called twice" } + val tcpTransport = p2pConnectorTcpTransport(serverAddress, config) + val backupTransports = p2pConnectorTcpTransportFromList(backupServerAddressPool, config) + log.info("Connecting to message broker: $serverAddress") - // TODO Add broker CN to config for host verification in case the embedded broker isn't used - val tcpTransport = ArtemisTcpTransport.p2pConnectorTcpTransport(serverAddress, config) - val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { + if (backupTransports.isNotEmpty()) { + log.info("Back-up message broker addresses: $backupServerAddressPool") + } + // If back-up artemis addresses are configured, the locator will be created using HA mode. + val locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply { // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this // would be the default and the two lines below can be deleted. connectionTTL = 60000 @@ -47,6 +55,7 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, isUseGlobalPools = nodeSerializationEnv != null confirmationWindowSize = this@ArtemisMessagingClient.confirmationWindowSize externalBrokerConnectionConfig?.let { + connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName reconnectAttempts = externalBrokerConnectionConfig.reconnectAttempts retryInterval = externalBrokerConnectionConfig.retryInterval.toMillis() retryIntervalMultiplier = externalBrokerConnectionConfig.retryIntervalMultiplier diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt index 0b484e4de4..a1c33fcdd1 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt @@ -130,6 +130,10 @@ class ArtemisTcpTransport { return TransportConfiguration(connectorFactoryClassName, options) } + fun p2pConnectorTcpTransportFromList(hostAndPortList: List, config: MutualSslConfiguration?, enableSSL: Boolean = true): List = hostAndPortList.map { + p2pConnectorTcpTransport(it, config, enableSSL) + } + fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: BrokerRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration { val options = defaultArtemisOptions(hostAndPort).toMutableMap() @@ -160,6 +164,7 @@ class ArtemisTcpTransport { return TransportConfiguration(connectorFactoryClassName, defaultArtemisOptions(hostAndPort) + defaultSSLOptions + config.toTransportOptions()) } + fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration): TransportConfiguration { return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort) + defaultSSLOptions + config.toTransportOptions()) } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisUtils.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisUtils.kt index 97abc0d024..2a39ace1cf 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisUtils.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisUtils.kt @@ -2,6 +2,7 @@ package net.corda.nodeapi.internal +import org.apache.activemq.artemis.api.core.client.loadbalance.ConnectionLoadBalancingPolicy import java.nio.file.FileSystems import java.nio.file.Path @@ -16,3 +17,18 @@ fun Path.requireOnDefaultFileSystem() { fun requireMessageSize(messageSize: Int, limit: Int) { require(messageSize <= limit) { "Message exceeds maxMessageSize network parameter, maxMessageSize: [$limit], message size: [$messageSize]" } } + +/** + * Implementation of an Artemis load balancing policy. It does round-robin always starting from the first position, whereas + * the current [RoundRobinConnectionLoadBalancingPolicy] in Artemis picks the starting position randomly. This can lead to + * attempting to connect to an inactive broker on the first attempt, which can cause start-up delays depending on what connection + * settings are used. + */ +class RoundRobinConnectionPolicy : ConnectionLoadBalancingPolicy { + private var pos = 0 + + override fun select(max: Int): Int { + pos = if (pos >= max) 0 else pos + return pos++ + } +} diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/config/ConfigParsingTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/config/ConfigParsingTest.kt index cb92e9431b..4830759478 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/config/ConfigParsingTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/config/ConfigParsingTest.kt @@ -9,7 +9,6 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort import org.assertj.core.api.Assertions.* -import org.hibernate.exception.DataException import org.junit.Test import java.net.URL import java.nio.file.Path diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index f2b7553d8d..8941bbae30 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -251,7 +251,8 @@ open class Node(configuration: NodeConfiguration, true, true, -1, - configuration.enterpriseConfiguration.externalBrokerConnectionConfiguration) + configuration.enterpriseConfiguration.externalBrokerConnectionConfiguration, + configuration.enterpriseConfiguration.externalBrokerBackupAddresses) } BridgeControlListener(configuration.p2pSslOptions, null, networkParameters.maxMessageSize, artemisClient) } else { @@ -264,7 +265,11 @@ open class Node(configuration: NodeConfiguration, rpcThreadPoolSize = configuration.enterpriseConfiguration.tuning.rpcThreadPoolSize ) rpcServerAddresses?.let { - internalRpcMessagingClient = InternalRPCMessagingClient(configuration.p2pSslOptions, it.admin, MAX_RPC_MESSAGE_SIZE, CordaX500Name.build(configuration.p2pSslOptions.keyStore.get()[X509Utilities.CORDA_CLIENT_TLS].subjectX500Principal), rpcServerConfiguration) + internalRpcMessagingClient = InternalRPCMessagingClient(configuration.p2pSslOptions, + it.admin, + MAX_RPC_MESSAGE_SIZE, + CordaX500Name.build(configuration.p2pSslOptions.keyStore.get()[X509Utilities.CORDA_CLIENT_TLS].subjectX500Principal), + rpcServerConfiguration) printBasicNodeInfo("RPC connection address", it.primary.toString()) printBasicNodeInfo("RPC admin connection address", it.admin.toString()) } diff --git a/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt index 5eec05d1bb..b81d20e45f 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt @@ -1,5 +1,6 @@ package net.corda.node.services.config +import net.corda.core.utilities.NetworkHostAndPort import java.io.File import java.net.InetAddress import java.nio.file.Path @@ -8,6 +9,7 @@ import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration data class EnterpriseConfiguration( val mutualExclusionConfiguration: MutualExclusionConfiguration, val externalBrokerConnectionConfiguration: ExternalBrokerConnectionConfiguration = ExternalBrokerConnectionConfiguration.DEFAULT, + val externalBrokerBackupAddresses: List = emptyList(), val useMultiThreadedSMM: Boolean = true, val tuning: PerformanceTuning = PerformanceTuning.default, val externalBridge: Boolean? = null, diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt index 544744d557..98c1f239ad 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt @@ -7,8 +7,11 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.security.RPCSecurityManager +import net.corda.node.services.config.EnterpriseConfiguration import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_RPC_USER import net.corda.nodeapi.internal.ArtemisTcpTransport +import net.corda.nodeapi.internal.RoundRobinConnectionPolicy +import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ServerLocator @@ -17,7 +20,12 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl /** * Used by the Node to communicate with the RPC broker. */ -class InternalRPCMessagingClient(val sslConfig: MutualSslConfiguration, val serverAddress: NetworkHostAndPort, val maxMessageSize: Int, val nodeName: CordaX500Name, val rpcServerConfiguration: RPCServerConfiguration) : SingletonSerializeAsToken(), AutoCloseable { + +class InternalRPCMessagingClient(val sslConfig: MutualSslConfiguration, + val serverAddress: NetworkHostAndPort, + val maxMessageSize: Int, + val nodeName: CordaX500Name, + val rpcServerConfiguration: RPCServerConfiguration) : SingletonSerializeAsToken(), AutoCloseable { private var locator: ServerLocator? = null private var rpcServer: RPCServer? = null @@ -28,6 +36,7 @@ class InternalRPCMessagingClient(val sslConfig: MutualSslConfigura fun init(rpcOpsRouting: RPCOpsRouting, securityManager: RPCSecurityManager, cacheFactory: NamedCacheFactory) = synchronized(this) { val tcpTransport = ArtemisTcpTransport.rpcInternalClientTcpTransport(serverAddress, sslConfig) + locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this // would be the default and the two lines below can be deleted. diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 8ec46dbaf9..720f3149ec 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -38,6 +38,8 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport +import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransportFromList +import net.corda.nodeapi.internal.RoundRobinConnectionPolicy import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -117,6 +119,7 @@ class P2PMessagingClient(val config: NodeConfiguration, var bridgeNotifyConsumer: ClientConsumer? = null var networkChangeSubscription: Subscription? = null var sessionFactory: ClientSessionFactory? = null + val inboxes = mutableSetOf() fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message) } @@ -150,6 +153,9 @@ class P2PMessagingClient(val config: NodeConfiguration, } FailoverEventType.FAILOVER_COMPLETED -> { log.info("Connection to broker re-established.") + state.locked { + enumerateBridges(bridgeSession!!, inboxes.toList()) + } } FailoverEventType.FAILOVER_FAILED -> state.locked { if (running) { @@ -179,10 +185,14 @@ class P2PMessagingClient(val config: NodeConfiguration, this.maxMessageSize = maxMessageSize state.locked { started = true - log.info("Connecting to message broker: $serverAddress") - // TODO Add broker CN to config for host verification in case the embedded broker isn't used val tcpTransport = p2pConnectorTcpTransport(serverAddress, config.p2pSslOptions) - locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { + val backupTransports = p2pConnectorTcpTransportFromList(config.enterpriseConfiguration.externalBrokerBackupAddresses, config.p2pSslOptions) + log.info("Connecting to message broker: $serverAddress") + if (backupTransports.isNotEmpty()) { + log.info("Back-up message broker addresses: ${config.enterpriseConfiguration.externalBrokerBackupAddresses}") + } + // If back-up artemis addresses are configured, the locator will be created using HA mode. + locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply { // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this // would be the default and the two lines below can be deleted. connectionTTL = 60000 @@ -192,6 +202,7 @@ class P2PMessagingClient(val config: NodeConfiguration, confirmationWindowSize = config.enterpriseConfiguration.tuning.p2pConfirmationWindowSize // Configuration for dealing with external broker failover if (config.messagingServerExternal) { + connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName reconnectAttempts = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts retryInterval = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryInterval.toMillis() retryIntervalMultiplier = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryIntervalMultiplier @@ -214,7 +225,7 @@ class P2PMessagingClient(val config: NodeConfiguration, producerSession!!.start() bridgeSession!!.start() - val inboxes = mutableSetOf() + // Create a queue, consumer and producer for handling P2P network messages. // Create a general purpose producer. producer = producerSession!!.createProducer()