diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index 7bd2181641..898e4c4c52 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -255,7 +255,7 @@ absolute path to the node's base directory. :port: Port the graphite instance is listening at. :prefix: Optional prefix string to identify metrics from this node, will default to a string made up from Organisation Name and ip address. - :sampleIntervallSeconds: optional wait time between pushing metrics. This will default to 60 seconds. + :sampleIntervalSeconds: optional wait time between pushing metrics. This will default to 60 seconds. :extraNetworkMapKeys: An optional list of private network map UUIDs. Your node will fetch the public network and private network maps based on these keys. Private network UUID should be provided by network operator and lets you see nodes not visible on public network. diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt index b51051a08e..c17b6a3efe 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt @@ -6,7 +6,7 @@ import net.corda.core.utilities.loggerFor import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransportFromList -import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration +import net.corda.nodeapi.internal.config.MessagingServerConnectionConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration import org.apache.activemq.artemis.api.core.client.* import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE @@ -23,7 +23,7 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, private val autoCommitSends: Boolean = true, private val autoCommitAcks: Boolean = true, private val confirmationWindowSize: Int = -1, - private val externalBrokerConnectionConfig: ExternalBrokerConnectionConfiguration? = null, + private val messagingServerConnectionConfig: MessagingServerConnectionConfiguration? = null, private val backupServerAddressPool: List = emptyList() ) : ArtemisSessionProvider { companion object { @@ -55,14 +55,14 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, minLargeMessageSize = maxMessageSize isUseGlobalPools = nodeSerializationEnv != null confirmationWindowSize = this@ArtemisMessagingClient.confirmationWindowSize - externalBrokerConnectionConfig?.let { + messagingServerConnectionConfig?.let { connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName - reconnectAttempts = externalBrokerConnectionConfig.reconnectAttempts - retryInterval = externalBrokerConnectionConfig.retryInterval.toMillis() - retryIntervalMultiplier = externalBrokerConnectionConfig.retryIntervalMultiplier - maxRetryInterval = externalBrokerConnectionConfig.maxRetryInterval.toMillis() - isFailoverOnInitialConnection = externalBrokerConnectionConfig.failoverOnInitialAttempt - initialConnectAttempts = externalBrokerConnectionConfig.initialConnectAttempts + reconnectAttempts = messagingServerConnectionConfig.reconnectAttempts + retryInterval = messagingServerConnectionConfig.retryInterval.toMillis() + retryIntervalMultiplier = messagingServerConnectionConfig.retryIntervalMultiplier + maxRetryInterval = messagingServerConnectionConfig.maxRetryInterval.toMillis() + isFailoverOnInitialConnection = messagingServerConnectionConfig.failoverOnInitialAttempt + initialConnectAttempts = messagingServerConnectionConfig.initialConnectAttempts } addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize)) } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/ExternalBrokerConnectionConfiguration.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/MessagingServerConnectionConfiguration.kt similarity index 98% rename from node-api/src/main/kotlin/net/corda/nodeapi/internal/config/ExternalBrokerConnectionConfiguration.kt rename to node-api/src/main/kotlin/net/corda/nodeapi/internal/config/MessagingServerConnectionConfiguration.kt index 2ec2f7f3ca..2cbe6211f5 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/ExternalBrokerConnectionConfiguration.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/MessagingServerConnectionConfiguration.kt @@ -24,7 +24,7 @@ import java.time.Duration * @param retryIntervalMultiplier Value used in the reconnection back-off process. * @param maxRetryInterval Determines the maximum duration between reconnection attempts. Useful when using infinite retries. */ -enum class ExternalBrokerConnectionConfiguration( +enum class MessagingServerConnectionConfiguration( val failoverOnInitialAttempt: Boolean, val initialConnectAttempts: Int, val reconnectAttempts: Int, diff --git a/node/src/integration-test/kotlin/net/corda/node/ExternalBrokerTests.kt b/node/src/integration-test/kotlin/net/corda/node/ExternalBrokerTests.kt index 538df50361..dcc416e571 100644 --- a/node/src/integration-test/kotlin/net/corda/node/ExternalBrokerTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/ExternalBrokerTests.kt @@ -9,7 +9,7 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.getOrThrow import net.corda.node.services.config.* import net.corda.node.services.messaging.ArtemisMessagingServer -import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration +import net.corda.nodeapi.internal.config.MessagingServerConnectionConfiguration import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.DUMMY_NOTARY_NAME import net.corda.testing.core.MAX_MESSAGE_SIZE @@ -114,7 +114,7 @@ class ExternalBrokertests : IntegrationTest() { "devMode" to false, "messagingServerExternal" to true, "messagingServerAddress" to NetworkHostAndPort("localhost", p2pPort).toString(), "enterpriseConfiguration" to mapOf( - "externalBrokerConnectionConfiguration" to "FAIL_FAST", + "messagingServerConnectionConfiguration" to "FAIL_FAST", "messagingServerSslConfiguration" to mapOf( "sslKeystore" to "${nodeBaseDir}/certificates/sslkeystore.jks", "keyStorePassword" to "cordacadevpass", @@ -137,7 +137,7 @@ class ExternalBrokertests : IntegrationTest() { } broker.stop() - val defaultConfig = ExternalBrokerConnectionConfiguration.FAIL_FAST + val defaultConfig = MessagingServerConnectionConfiguration.FAIL_FAST var reconnectTimeout = 0.0 (1..defaultConfig.reconnectAttempts).forEach { reconnectTimeout += defaultConfig.retryInterval.toMillis() * defaultConfig.retryIntervalMultiplier.pow(it - 1) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 7280b41ee9..6120029596 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -262,8 +262,8 @@ open class Node(configuration: NodeConfiguration, true, true, -1, - configuration.enterpriseConfiguration.externalBrokerConnectionConfiguration, - configuration.enterpriseConfiguration.externalBrokerBackupAddresses) + configuration.enterpriseConfiguration.messagingServerConnectionConfiguration, + configuration.enterpriseConfiguration.messagingServerBackupAddresses) } BridgeControlListener(configuration.p2pSslOptions, null, networkParameters.maxMessageSize, configuration.enableSNI, artemisClient) } else { diff --git a/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt index 970938d527..8f9cdc3728 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt @@ -4,14 +4,14 @@ import net.corda.core.utilities.NetworkHostAndPort import java.io.File import java.net.InetAddress import java.nio.file.Path -import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration +import net.corda.nodeapi.internal.config.MessagingServerConnectionConfiguration import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier import net.corda.nodeapi.internal.config.MutualSslConfiguration data class EnterpriseConfiguration( val mutualExclusionConfiguration: MutualExclusionConfiguration, - val externalBrokerConnectionConfiguration: ExternalBrokerConnectionConfiguration = Defaults.externalBrokerConnectionConfiguration, - val externalBrokerBackupAddresses: List = Defaults.externalBrokerBackupAddresses, + val messagingServerConnectionConfiguration: MessagingServerConnectionConfiguration = Defaults.messagingServerConnectionConfiguration, + val messagingServerBackupAddresses: List = Defaults.messagingServerBackupAddresses, val messagingServerSslConfiguration: MessagingServerSslConfiguration? = null, val useMultiThreadedSMM: Boolean = Defaults.useMultiThreadedSMM, val tuning: PerformanceTuning = Defaults.tuning, @@ -20,8 +20,8 @@ data class EnterpriseConfiguration( val traceTargetDirectory: Path = Defaults.traceTargetDirectory ) { internal object Defaults { - val externalBrokerConnectionConfiguration: ExternalBrokerConnectionConfiguration = ExternalBrokerConnectionConfiguration.DEFAULT - val externalBrokerBackupAddresses: List = emptyList() + val messagingServerConnectionConfiguration: MessagingServerConnectionConfiguration = MessagingServerConnectionConfiguration.DEFAULT + val messagingServerBackupAddresses: List = emptyList() val useMultiThreadedSMM: Boolean = true val tuning: PerformanceTuning = PerformanceTuning.default val enableCacheTracing: Boolean = false diff --git a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt index 721aef7c9b..f5396c70d3 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt @@ -44,7 +44,7 @@ import net.corda.node.services.config.schema.parsers.toURL import net.corda.node.services.config.schema.parsers.toUUID import net.corda.node.services.config.schema.parsers.validValue import net.corda.nodeapi.BrokerRpcSslOptions -import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration +import net.corda.nodeapi.internal.config.MessagingServerConnectionConfiguration import net.corda.nodeapi.internal.config.User import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.TransactionIsolationLevel @@ -272,8 +272,8 @@ internal object GraphiteOptionsSpec : Configuration.Specification("EnterpriseConfiguration") { private val mutualExclusionConfiguration by nested(MutualExclusionConfigurationSpec) - private val externalBrokerConnectionConfiguration by enum(ExternalBrokerConnectionConfiguration::class).optional().withDefaultValue(EnterpriseConfiguration.Defaults.externalBrokerConnectionConfiguration) - private val externalBrokerBackupAddresses by string().mapValid(::toNetworkHostAndPort).list().optional().withDefaultValue(EnterpriseConfiguration.Defaults.externalBrokerBackupAddresses) + private val messagingServerConnectionConfiguration by enum(MessagingServerConnectionConfiguration::class).optional().withDefaultValue(EnterpriseConfiguration.Defaults.messagingServerConnectionConfiguration) + private val messagingServerBackupAddresses by string().mapValid(::toNetworkHostAndPort).list().optional().withDefaultValue(EnterpriseConfiguration.Defaults.messagingServerBackupAddresses) private val useMultiThreadedSMM by boolean().optional().withDefaultValue(EnterpriseConfiguration.Defaults.useMultiThreadedSMM) private val tuning by nested(PerformanceTuningSpec).optional().withDefaultValue(EnterpriseConfiguration.Defaults.tuning) private val externalBridge by boolean().optional() @@ -284,8 +284,8 @@ internal object EnterpriseConfigurationSpec : Configuration.Specification { return valid(EnterpriseConfiguration( configuration[mutualExclusionConfiguration], - configuration[externalBrokerConnectionConfiguration], - configuration[externalBrokerBackupAddresses], + configuration[messagingServerConnectionConfiguration], + configuration[messagingServerBackupAddresses], configuration[messagingServerSslConfiguration], configuration[useMultiThreadedSMM], configuration[tuning], diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt index 98c1f239ad..e6106c6681 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/InternalRPCMessagingClient.kt @@ -7,11 +7,8 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.security.RPCSecurityManager -import net.corda.node.services.config.EnterpriseConfiguration import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_RPC_USER import net.corda.nodeapi.internal.ArtemisTcpTransport -import net.corda.nodeapi.internal.RoundRobinConnectionPolicy -import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ServerLocator diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 0a52e4d77b..5545daa88b 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -150,7 +150,7 @@ class P2PMessagingClient(val config: NodeConfiguration, private fun failoverCallback(event: FailoverEventType) { when (event) { FailoverEventType.FAILURE_DETECTED -> { - log.warn("Connection to the broker was lost. Starting ${config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts} reconnect attempts.") + log.warn("Connection to the broker was lost. Trying to reconnect.") } FailoverEventType.FAILOVER_COMPLETED -> { log.info("Connection to broker re-established.") @@ -160,8 +160,8 @@ class P2PMessagingClient(val config: NodeConfiguration, } FailoverEventType.FAILOVER_FAILED -> state.locked { if (running) { - log.error("Could not reconnect to the broker after ${config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts} attempts. Node is shutting down.") - Thread.sleep(config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryInterval.toMillis()) + log.error("Could not reconnect to the broker after ${config.enterpriseConfiguration.messagingServerConnectionConfiguration.reconnectAttempts} attempts. Node is shutting down.") + Thread.sleep(config.enterpriseConfiguration.messagingServerConnectionConfiguration.retryInterval.toMillis()) Runtime.getRuntime().halt(1) } } @@ -192,10 +192,10 @@ class P2PMessagingClient(val config: NodeConfiguration, config.p2pSslOptions } val tcpTransport = p2pConnectorTcpTransport(serverAddress, sslOptions) - val backupTransports = p2pConnectorTcpTransportFromList(config.enterpriseConfiguration.externalBrokerBackupAddresses, sslOptions) + val backupTransports = p2pConnectorTcpTransportFromList(config.enterpriseConfiguration.messagingServerBackupAddresses, sslOptions) log.info("Connecting to message broker: $serverAddress") if (backupTransports.isNotEmpty()) { - log.info("Back-up message broker addresses: ${config.enterpriseConfiguration.externalBrokerBackupAddresses}") + log.info("Back-up message broker addresses: ${config.enterpriseConfiguration.messagingServerBackupAddresses}") } // If back-up artemis addresses are configured, the locator will be created using HA mode. locator = ActiveMQClient.createServerLocator(backupTransports.isNotEmpty(), *(listOf(tcpTransport) + backupTransports).toTypedArray()).apply { @@ -211,12 +211,12 @@ class P2PMessagingClient(val config: NodeConfiguration, // Configuration for dealing with external broker failover if (config.messagingServerExternal) { connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName - reconnectAttempts = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts - retryInterval = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryInterval.toMillis() - retryIntervalMultiplier = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryIntervalMultiplier - maxRetryInterval = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.maxRetryInterval.toMillis() - isFailoverOnInitialConnection = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.failoverOnInitialAttempt - initialConnectAttempts = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.initialConnectAttempts + reconnectAttempts = config.enterpriseConfiguration.messagingServerConnectionConfiguration.reconnectAttempts + retryInterval = config.enterpriseConfiguration.messagingServerConnectionConfiguration.retryInterval.toMillis() + retryIntervalMultiplier = config.enterpriseConfiguration.messagingServerConnectionConfiguration.retryIntervalMultiplier + maxRetryInterval = config.enterpriseConfiguration.messagingServerConnectionConfiguration.maxRetryInterval.toMillis() + isFailoverOnInitialConnection = config.enterpriseConfiguration.messagingServerConnectionConfiguration.failoverOnInitialAttempt + initialConnectAttempts = config.enterpriseConfiguration.messagingServerConnectionConfiguration.initialConnectAttempts } }