mirror of
https://github.com/corda/corda.git
synced 2025-03-15 00:36:49 +00:00
renamed external artemis config properties for consistency (#1555)
This commit is contained in:
parent
32ff24ddc7
commit
7fbe8e7de7
@ -255,7 +255,7 @@ absolute path to the node's base directory.
|
||||
:port: Port the graphite instance is listening at.
|
||||
:prefix: Optional prefix string to identify metrics from this node, will default to a string made up
|
||||
from Organisation Name and ip address.
|
||||
:sampleIntervallSeconds: optional wait time between pushing metrics. This will default to 60 seconds.
|
||||
:sampleIntervalSeconds: optional wait time between pushing metrics. This will default to 60 seconds.
|
||||
|
||||
:extraNetworkMapKeys: An optional list of private network map UUIDs. Your node will fetch the public network and private network maps based on
|
||||
these keys. Private network UUID should be provided by network operator and lets you see nodes not visible on public network.
|
||||
|
@ -6,7 +6,7 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
|
||||
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
|
||||
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransportFromList
|
||||
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
|
||||
import net.corda.nodeapi.internal.config.MessagingServerConnectionConfiguration
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
@ -23,7 +23,7 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
|
||||
private val autoCommitSends: Boolean = true,
|
||||
private val autoCommitAcks: Boolean = true,
|
||||
private val confirmationWindowSize: Int = -1,
|
||||
private val externalBrokerConnectionConfig: ExternalBrokerConnectionConfiguration? = null,
|
||||
private val messagingServerConnectionConfig: MessagingServerConnectionConfiguration? = null,
|
||||
private val backupServerAddressPool: List<NetworkHostAndPort> = emptyList()
|
||||
) : ArtemisSessionProvider {
|
||||
companion object {
|
||||
@ -55,14 +55,14 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
|
||||
minLargeMessageSize = maxMessageSize
|
||||
isUseGlobalPools = nodeSerializationEnv != null
|
||||
confirmationWindowSize = this@ArtemisMessagingClient.confirmationWindowSize
|
||||
externalBrokerConnectionConfig?.let {
|
||||
messagingServerConnectionConfig?.let {
|
||||
connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName
|
||||
reconnectAttempts = externalBrokerConnectionConfig.reconnectAttempts
|
||||
retryInterval = externalBrokerConnectionConfig.retryInterval.toMillis()
|
||||
retryIntervalMultiplier = externalBrokerConnectionConfig.retryIntervalMultiplier
|
||||
maxRetryInterval = externalBrokerConnectionConfig.maxRetryInterval.toMillis()
|
||||
isFailoverOnInitialConnection = externalBrokerConnectionConfig.failoverOnInitialAttempt
|
||||
initialConnectAttempts = externalBrokerConnectionConfig.initialConnectAttempts
|
||||
reconnectAttempts = messagingServerConnectionConfig.reconnectAttempts
|
||||
retryInterval = messagingServerConnectionConfig.retryInterval.toMillis()
|
||||
retryIntervalMultiplier = messagingServerConnectionConfig.retryIntervalMultiplier
|
||||
maxRetryInterval = messagingServerConnectionConfig.maxRetryInterval.toMillis()
|
||||
isFailoverOnInitialConnection = messagingServerConnectionConfig.failoverOnInitialAttempt
|
||||
initialConnectAttempts = messagingServerConnectionConfig.initialConnectAttempts
|
||||
}
|
||||
addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize))
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ import java.time.Duration
|
||||
* @param retryIntervalMultiplier Value used in the reconnection back-off process.
|
||||
* @param maxRetryInterval Determines the maximum duration between reconnection attempts. Useful when using infinite retries.
|
||||
*/
|
||||
enum class ExternalBrokerConnectionConfiguration(
|
||||
enum class MessagingServerConnectionConfiguration(
|
||||
val failoverOnInitialAttempt: Boolean,
|
||||
val initialConnectAttempts: Int,
|
||||
val reconnectAttempts: Int,
|
@ -9,7 +9,7 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
|
||||
import net.corda.nodeapi.internal.config.MessagingServerConnectionConfiguration
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.MAX_MESSAGE_SIZE
|
||||
@ -114,7 +114,7 @@ class ExternalBrokertests : IntegrationTest() {
|
||||
"devMode" to false, "messagingServerExternal" to true,
|
||||
"messagingServerAddress" to NetworkHostAndPort("localhost", p2pPort).toString(),
|
||||
"enterpriseConfiguration" to mapOf(
|
||||
"externalBrokerConnectionConfiguration" to "FAIL_FAST",
|
||||
"messagingServerConnectionConfiguration" to "FAIL_FAST",
|
||||
"messagingServerSslConfiguration" to mapOf(
|
||||
"sslKeystore" to "${nodeBaseDir}/certificates/sslkeystore.jks",
|
||||
"keyStorePassword" to "cordacadevpass",
|
||||
@ -137,7 +137,7 @@ class ExternalBrokertests : IntegrationTest() {
|
||||
}
|
||||
|
||||
broker.stop()
|
||||
val defaultConfig = ExternalBrokerConnectionConfiguration.FAIL_FAST
|
||||
val defaultConfig = MessagingServerConnectionConfiguration.FAIL_FAST
|
||||
var reconnectTimeout = 0.0
|
||||
(1..defaultConfig.reconnectAttempts).forEach {
|
||||
reconnectTimeout += defaultConfig.retryInterval.toMillis() * defaultConfig.retryIntervalMultiplier.pow(it - 1)
|
||||
|
@ -262,8 +262,8 @@ open class Node(configuration: NodeConfiguration,
|
||||
true,
|
||||
true,
|
||||
-1,
|
||||
configuration.enterpriseConfiguration.externalBrokerConnectionConfiguration,
|
||||
configuration.enterpriseConfiguration.externalBrokerBackupAddresses)
|
||||
configuration.enterpriseConfiguration.messagingServerConnectionConfiguration,
|
||||
configuration.enterpriseConfiguration.messagingServerBackupAddresses)
|
||||
}
|
||||
BridgeControlListener(configuration.p2pSslOptions, null, networkParameters.maxMessageSize, configuration.enableSNI, artemisClient)
|
||||
} else {
|
||||
|
@ -4,14 +4,14 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
import java.io.File
|
||||
import java.net.InetAddress
|
||||
import java.nio.file.Path
|
||||
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
|
||||
import net.corda.nodeapi.internal.config.MessagingServerConnectionConfiguration
|
||||
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
|
||||
data class EnterpriseConfiguration(
|
||||
val mutualExclusionConfiguration: MutualExclusionConfiguration,
|
||||
val externalBrokerConnectionConfiguration: ExternalBrokerConnectionConfiguration = Defaults.externalBrokerConnectionConfiguration,
|
||||
val externalBrokerBackupAddresses: List<NetworkHostAndPort> = Defaults.externalBrokerBackupAddresses,
|
||||
val messagingServerConnectionConfiguration: MessagingServerConnectionConfiguration = Defaults.messagingServerConnectionConfiguration,
|
||||
val messagingServerBackupAddresses: List<NetworkHostAndPort> = Defaults.messagingServerBackupAddresses,
|
||||
val messagingServerSslConfiguration: MessagingServerSslConfiguration? = null,
|
||||
val useMultiThreadedSMM: Boolean = Defaults.useMultiThreadedSMM,
|
||||
val tuning: PerformanceTuning = Defaults.tuning,
|
||||
@ -20,8 +20,8 @@ data class EnterpriseConfiguration(
|
||||
val traceTargetDirectory: Path = Defaults.traceTargetDirectory
|
||||
) {
|
||||
internal object Defaults {
|
||||
val externalBrokerConnectionConfiguration: ExternalBrokerConnectionConfiguration = ExternalBrokerConnectionConfiguration.DEFAULT
|
||||
val externalBrokerBackupAddresses: List<NetworkHostAndPort> = emptyList()
|
||||
val messagingServerConnectionConfiguration: MessagingServerConnectionConfiguration = MessagingServerConnectionConfiguration.DEFAULT
|
||||
val messagingServerBackupAddresses: List<NetworkHostAndPort> = emptyList()
|
||||
val useMultiThreadedSMM: Boolean = true
|
||||
val tuning: PerformanceTuning = PerformanceTuning.default
|
||||
val enableCacheTracing: Boolean = false
|
||||
|
@ -44,7 +44,7 @@ import net.corda.node.services.config.schema.parsers.toURL
|
||||
import net.corda.node.services.config.schema.parsers.toUUID
|
||||
import net.corda.node.services.config.schema.parsers.validValue
|
||||
import net.corda.nodeapi.BrokerRpcSslOptions
|
||||
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
|
||||
import net.corda.nodeapi.internal.config.MessagingServerConnectionConfiguration
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.TransactionIsolationLevel
|
||||
@ -272,8 +272,8 @@ internal object GraphiteOptionsSpec : Configuration.Specification<GraphiteOption
|
||||
|
||||
internal object EnterpriseConfigurationSpec : Configuration.Specification<EnterpriseConfiguration>("EnterpriseConfiguration") {
|
||||
private val mutualExclusionConfiguration by nested(MutualExclusionConfigurationSpec)
|
||||
private val externalBrokerConnectionConfiguration by enum(ExternalBrokerConnectionConfiguration::class).optional().withDefaultValue(EnterpriseConfiguration.Defaults.externalBrokerConnectionConfiguration)
|
||||
private val externalBrokerBackupAddresses by string().mapValid(::toNetworkHostAndPort).list().optional().withDefaultValue(EnterpriseConfiguration.Defaults.externalBrokerBackupAddresses)
|
||||
private val messagingServerConnectionConfiguration by enum(MessagingServerConnectionConfiguration::class).optional().withDefaultValue(EnterpriseConfiguration.Defaults.messagingServerConnectionConfiguration)
|
||||
private val messagingServerBackupAddresses by string().mapValid(::toNetworkHostAndPort).list().optional().withDefaultValue(EnterpriseConfiguration.Defaults.messagingServerBackupAddresses)
|
||||
private val useMultiThreadedSMM by boolean().optional().withDefaultValue(EnterpriseConfiguration.Defaults.useMultiThreadedSMM)
|
||||
private val tuning by nested(PerformanceTuningSpec).optional().withDefaultValue(EnterpriseConfiguration.Defaults.tuning)
|
||||
private val externalBridge by boolean().optional()
|
||||
@ -284,8 +284,8 @@ internal object EnterpriseConfigurationSpec : Configuration.Specification<Enterp
|
||||
override fun parseValid(configuration: Config): Valid<EnterpriseConfiguration> {
|
||||
return valid(EnterpriseConfiguration(
|
||||
configuration[mutualExclusionConfiguration],
|
||||
configuration[externalBrokerConnectionConfiguration],
|
||||
configuration[externalBrokerBackupAddresses],
|
||||
configuration[messagingServerConnectionConfiguration],
|
||||
configuration[messagingServerBackupAddresses],
|
||||
configuration[messagingServerSslConfiguration],
|
||||
configuration[useMultiThreadedSMM],
|
||||
configuration[tuning],
|
||||
|
@ -7,11 +7,8 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.config.EnterpriseConfiguration
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_RPC_USER
|
||||
import net.corda.nodeapi.internal.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.internal.RoundRobinConnectionPolicy
|
||||
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||
|
@ -150,7 +150,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
private fun failoverCallback(event: FailoverEventType) {
|
||||
when (event) {
|
||||
FailoverEventType.FAILURE_DETECTED -> {
|
||||
log.warn("Connection to the broker was lost. Starting ${config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts} reconnect attempts.")
|
||||
log.warn("Connection to the broker was lost. Trying to reconnect.")
|
||||
}
|
||||
FailoverEventType.FAILOVER_COMPLETED -> {
|
||||
log.info("Connection to broker re-established.")
|
||||
@ -160,8 +160,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
}
|
||||
FailoverEventType.FAILOVER_FAILED -> state.locked {
|
||||
if (running) {
|
||||
log.error("Could not reconnect to the broker after ${config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts} attempts. Node is shutting down.")
|
||||
Thread.sleep(config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryInterval.toMillis())
|
||||
log.error("Could not reconnect to the broker after ${config.enterpriseConfiguration.messagingServerConnectionConfiguration.reconnectAttempts} attempts. Node is shutting down.")
|
||||
Thread.sleep(config.enterpriseConfiguration.messagingServerConnectionConfiguration.retryInterval.toMillis())
|
||||
Runtime.getRuntime().halt(1)
|
||||
}
|
||||
}
|
||||
@ -192,10 +192,10 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
config.p2pSslOptions
|
||||
}
|
||||
val tcpTransport = p2pConnectorTcpTransport(serverAddress, sslOptions)
|
||||
val backupTransports = p2pConnectorTcpTransportFromList(config.enterpriseConfiguration.externalBrokerBackupAddresses, sslOptions)
|
||||
val backupTransports = p2pConnectorTcpTransportFromList(config.enterpriseConfiguration.messagingServerBackupAddresses, sslOptions)
|
||||
log.info("Connecting to message broker: $serverAddress")
|
||||
if (backupTransports.isNotEmpty()) {
|
||||
log.info("Back-up message broker addresses: ${config.enterpriseConfiguration.externalBrokerBackupAddresses}")
|
||||
log.info("Back-up message broker addresses: ${config.enterpriseConfiguration.messagingServerBackupAddresses}")
|
||||
}
|
||||
// If back-up artemis addresses are configured, the locator will be created using HA mode.
|
||||
locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply {
|
||||
@ -211,12 +211,12 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
// Configuration for dealing with external broker failover
|
||||
if (config.messagingServerExternal) {
|
||||
connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName
|
||||
reconnectAttempts = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts
|
||||
retryInterval = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryInterval.toMillis()
|
||||
retryIntervalMultiplier = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryIntervalMultiplier
|
||||
maxRetryInterval = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.maxRetryInterval.toMillis()
|
||||
isFailoverOnInitialConnection = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.failoverOnInitialAttempt
|
||||
initialConnectAttempts = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.initialConnectAttempts
|
||||
reconnectAttempts = config.enterpriseConfiguration.messagingServerConnectionConfiguration.reconnectAttempts
|
||||
retryInterval = config.enterpriseConfiguration.messagingServerConnectionConfiguration.retryInterval.toMillis()
|
||||
retryIntervalMultiplier = config.enterpriseConfiguration.messagingServerConnectionConfiguration.retryIntervalMultiplier
|
||||
maxRetryInterval = config.enterpriseConfiguration.messagingServerConnectionConfiguration.maxRetryInterval.toMillis()
|
||||
isFailoverOnInitialConnection = config.enterpriseConfiguration.messagingServerConnectionConfiguration.failoverOnInitialAttempt
|
||||
initialConnectAttempts = config.enterpriseConfiguration.messagingServerConnectionConfiguration.initialConnectAttempts
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user