From 70beffac48c943af69c51eb85593addbb230d81c Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 11 Apr 2018 10:33:17 +0100 Subject: [PATCH] Add support for different internal p2p artemis bind address/port (#2951) * Add support for different internal p2p artemis bind address/port and externally advertised p2pAddress and port. * Fix formatting --- .../net/corda/node/amqp/AMQPBridgeTest.kt | 2 +- .../net/corda/node/amqp/ProtonWrapperTests.kt | 2 +- .../kotlin/net/corda/node/internal/Node.kt | 21 ++++++++----------- .../node/services/config/NodeConfiguration.kt | 12 ++--------- .../messaging/ArtemisMessagingServer.kt | 9 ++++---- node/src/main/resources/reference.conf | 7 ------- .../config/NodeConfigurationImplTest.kt | 3 +-- .../messaging/ArtemisMessagingTest.kt | 2 +- 8 files changed, 19 insertions(+), 39 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 1b5dd8f5e0..93b18d7983 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -179,7 +179,7 @@ class AMQPBridgeTest { doReturn(emptyList()).whenever(it).certificateChainCheckPolicies } artemisConfig.configureWithDevSSLCertificate() - val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE) + val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), MAX_MESSAGE_SIZE) val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE) artemisServer.start() artemisClient.start() diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index 8262f0b30b..bbac2dcb48 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -227,7 +227,7 @@ class ProtonWrapperTests { } artemisConfig.configureWithDevSSLCertificate() - val server = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE) + val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), MAX_MESSAGE_SIZE) val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE) server.start() client.start() diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 66d4308215..0c05f24bc7 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -23,8 +23,8 @@ import net.corda.node.VersionInfo import net.corda.node.internal.artemis.ArtemisBroker import net.corda.node.internal.artemis.BrokerAddresses import net.corda.node.internal.cordapp.CordappLoader -import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser import net.corda.node.internal.security.RPCSecurityManagerImpl +import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser import net.corda.node.serialization.KryoServerSerializationScheme import net.corda.node.services.api.NodePropertiesStore import net.corda.node.services.api.SchemaService @@ -160,14 +160,18 @@ open class Node(configuration: NodeConfiguration, networkParameters: NetworkParameters): MessagingService { // Construct security manager reading users data either from the 'security' config section // if present or from rpcUsers list if the former is missing from config. - val securityManagerConfig = configuration.security?.authService ?: - SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers) + val securityManagerConfig = configuration.security?.authService ?: SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers) securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) { if (configuration.shouldInitCrashShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this } - val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker(networkParameters) + if (!configuration.messagingServerExternal) { + val brokerBindAddress = configuration.messagingServerAddress ?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port) + messageBroker = ArtemisMessagingServer(configuration, brokerBindAddress, MAX_FILE_SIZE) + } + + val serverAddress = configuration.messagingServerAddress ?: NetworkHostAndPort("localhost", configuration.p2pAddress.port) val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) { BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) } else { @@ -183,7 +187,7 @@ open class Node(configuration: NodeConfiguration, printBasicNodeInfo("RPC admin connection address", it.admin.toString()) } verifierMessagingClient = when (configuration.verifierType) { - VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE) + VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE) VerifierType.InMemory -> null } require(info.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" } @@ -220,13 +224,6 @@ open class Node(configuration: NodeConfiguration, } } - private fun makeLocalMessageBroker(networkParameters: NetworkParameters): NetworkHostAndPort { - with(configuration) { - messageBroker = ArtemisMessagingServer(this, p2pAddress.port, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE) - return NetworkHostAndPort("localhost", p2pAddress.port) - } - } - override fun myAddresses(): List = listOf(getAdvertisedAddress()) private fun getAdvertisedAddress(): NetworkHostAndPort { diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 858b243aeb..83c6688595 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -37,11 +37,11 @@ interface NodeConfiguration : NodeSSLConfiguration { val verifierType: VerifierType val messageRedeliveryDelaySeconds: Int val notary: NotaryConfig? - val activeMQServer: ActiveMqServerConfiguration val additionalNodeInfoPollingFrequencyMsec: Long val p2pAddress: NetworkHostAndPort val rpcOptions: NodeRpcOptions val messagingServerAddress: NetworkHostAndPort? + val messagingServerExternal: Boolean // TODO Move into DevModeOptions val useTestClock: Boolean get() = false val detectPublicIp: Boolean get() = true @@ -107,12 +107,6 @@ data class BFTSMaRtConfiguration( } } -data class BridgeConfiguration(val retryIntervalMs: Long, - val maxRetryIntervalMin: Long, - val retryIntervalMultiplier: Double) - -data class ActiveMqServerConfiguration(val bridge: BridgeConfiguration) - fun Config.parseAsNodeConfiguration(): NodeConfiguration = parseAs() data class NodeConfigurationImpl( @@ -134,9 +128,8 @@ data class NodeConfigurationImpl( override val p2pAddress: NetworkHostAndPort, private val rpcAddress: NetworkHostAndPort? = null, private val rpcSettings: NodeRpcSettings, - // TODO This field is slightly redundant as p2pAddress is sufficient to hold the address of the node's MQ broker. - // Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one override val messagingServerAddress: NetworkHostAndPort?, + override val messagingServerExternal: Boolean = (messagingServerAddress != null), override val notary: NotaryConfig?, override val certificateChainCheckPolicies: List, override val devMode: Boolean = false, @@ -144,7 +137,6 @@ data class NodeConfigurationImpl( override val devModeOptions: DevModeOptions? = null, override val useTestClock: Boolean = false, override val detectPublicIp: Boolean = true, - override val activeMQServer: ActiveMqServerConfiguration, // TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(), override val sshd: SSHDConfiguration? = null, diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 326f5713aa..08e040fbea 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -8,7 +8,6 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug -import net.corda.node.internal.Node import net.corda.node.internal.artemis.ArtemisBroker import net.corda.node.internal.artemis.BrokerAddresses import net.corda.node.internal.artemis.CertificateChainCheckPolicy @@ -25,7 +24,6 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER -import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress import net.corda.nodeapi.internal.requireOnDefaultFileSystem import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl @@ -70,11 +68,12 @@ import javax.security.auth.spi.LoginModule */ @ThreadSafe class ArtemisMessagingServer(private val config: NodeConfiguration, - private val p2pPort: Int, + private val messagingServerAddress: NetworkHostAndPort, val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() { companion object { private val log = contextLogger() } + private class InnerState { var running = false } @@ -120,7 +119,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, } // Config driven switch between legacy CORE bridges and the newer AMQP protocol bridges. activeMQServer.start() - log.info("P2P messaging server listening on port $p2pPort") + log.info("P2P messaging server listening on $messagingServerAddress") } private fun createArtemisConfig() = SecureArtemisConfiguration().apply { @@ -131,7 +130,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, val connectionDirection = ConnectionDirection.Inbound( acceptorFactoryClassName = NettyAcceptorFactory::class.java.name ) - val acceptors = mutableSetOf(createTcpTransport(connectionDirection, "0.0.0.0", p2pPort)) + val acceptors = mutableSetOf(createTcpTransport(connectionDirection, messagingServerAddress.host, messagingServerAddress.port)) acceptorConfigurations = acceptors // Enable built in message deduplication. Note we still have to do our own as the delayed commits // and our own definition of commit mean that the built in deduplication cannot remove all duplicates. diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index f3c83ef9f7..a949a45a39 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -16,13 +16,6 @@ devMode = true h2port = 0 useTestClock = false verifierType = InMemory -activeMQServer = { - bridge = { - retryIntervalMs = 5000 - retryIntervalMultiplier = 1.5 - maxRetryIntervalMin = 3 - } -} rpcSettings = { useSsl = false standAloneBroker = false diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index 394ef4a303..c0c3b16bf1 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -2,9 +2,9 @@ package net.corda.node.services.config import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort -import net.corda.tools.shell.SSHDConfiguration import net.corda.testing.core.ALICE_NAME import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import net.corda.tools.shell.SSHDConfiguration import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.Test import java.nio.file.Paths @@ -77,7 +77,6 @@ class NodeConfigurationImplTest { certificateChainCheckPolicies = emptyList(), devMode = true, noLocalShell = false, - activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)), rpcSettings = rpcSettings ) } diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 266db1c0cd..5dadb85eac 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -183,7 +183,7 @@ class ArtemisMessagingTest { } private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer { - return ArtemisMessagingServer(config, local, maxMessageSize).apply { + return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize).apply { config.configureWithDevSSLCertificate() messagingServer = this }