Add support for different internal p2p artemis bind address/port (#2951)

* Add support for different internal p2p artemis bind address/port and externally advertised p2pAddress and port.

* Fix formatting
This commit is contained in:
Matthew Nesbit 2018-04-11 10:33:17 +01:00 committed by GitHub
parent ecce64ba03
commit 70beffac48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 19 additions and 39 deletions

View File

@ -179,7 +179,7 @@ class AMQPBridgeTest {
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
} }
artemisConfig.configureWithDevSSLCertificate() artemisConfig.configureWithDevSSLCertificate()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE) val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE) val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
artemisServer.start() artemisServer.start()
artemisClient.start() artemisClient.start()

View File

@ -227,7 +227,7 @@ class ProtonWrapperTests {
} }
artemisConfig.configureWithDevSSLCertificate() artemisConfig.configureWithDevSSLCertificate()
val server = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE) val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), MAX_MESSAGE_SIZE)
val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE) val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE)
server.start() server.start()
client.start() client.start()

View File

@ -23,8 +23,8 @@ import net.corda.node.VersionInfo
import net.corda.node.internal.artemis.ArtemisBroker import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.internal.artemis.BrokerAddresses import net.corda.node.internal.artemis.BrokerAddresses
import net.corda.node.internal.cordapp.CordappLoader import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser
import net.corda.node.internal.security.RPCSecurityManagerImpl import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser
import net.corda.node.serialization.KryoServerSerializationScheme import net.corda.node.serialization.KryoServerSerializationScheme
import net.corda.node.services.api.NodePropertiesStore import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchemaService import net.corda.node.services.api.SchemaService
@ -160,14 +160,18 @@ open class Node(configuration: NodeConfiguration,
networkParameters: NetworkParameters): MessagingService { networkParameters: NetworkParameters): MessagingService {
// Construct security manager reading users data either from the 'security' config section // Construct security manager reading users data either from the 'security' config section
// if present or from rpcUsers list if the former is missing from config. // if present or from rpcUsers list if the former is missing from config.
val securityManagerConfig = configuration.security?.authService ?: val securityManagerConfig = configuration.security?.authService ?: SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers)
SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers)
securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) { securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) {
if (configuration.shouldInitCrashShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this if (configuration.shouldInitCrashShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this
} }
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker(networkParameters) if (!configuration.messagingServerExternal) {
val brokerBindAddress = configuration.messagingServerAddress ?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port)
messageBroker = ArtemisMessagingServer(configuration, brokerBindAddress, MAX_FILE_SIZE)
}
val serverAddress = configuration.messagingServerAddress ?: NetworkHostAndPort("localhost", configuration.p2pAddress.port)
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) { val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) {
BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress)
} else { } else {
@ -183,7 +187,7 @@ open class Node(configuration: NodeConfiguration,
printBasicNodeInfo("RPC admin connection address", it.admin.toString()) printBasicNodeInfo("RPC admin connection address", it.admin.toString())
} }
verifierMessagingClient = when (configuration.verifierType) { verifierMessagingClient = when (configuration.verifierType) {
VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE) VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE)
VerifierType.InMemory -> null VerifierType.InMemory -> null
} }
require(info.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" } require(info.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" }
@ -220,13 +224,6 @@ open class Node(configuration: NodeConfiguration,
} }
} }
private fun makeLocalMessageBroker(networkParameters: NetworkParameters): NetworkHostAndPort {
with(configuration) {
messageBroker = ArtemisMessagingServer(this, p2pAddress.port, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE)
return NetworkHostAndPort("localhost", p2pAddress.port)
}
}
override fun myAddresses(): List<NetworkHostAndPort> = listOf(getAdvertisedAddress()) override fun myAddresses(): List<NetworkHostAndPort> = listOf(getAdvertisedAddress())
private fun getAdvertisedAddress(): NetworkHostAndPort { private fun getAdvertisedAddress(): NetworkHostAndPort {

View File

@ -37,11 +37,11 @@ interface NodeConfiguration : NodeSSLConfiguration {
val verifierType: VerifierType val verifierType: VerifierType
val messageRedeliveryDelaySeconds: Int val messageRedeliveryDelaySeconds: Int
val notary: NotaryConfig? val notary: NotaryConfig?
val activeMQServer: ActiveMqServerConfiguration
val additionalNodeInfoPollingFrequencyMsec: Long val additionalNodeInfoPollingFrequencyMsec: Long
val p2pAddress: NetworkHostAndPort val p2pAddress: NetworkHostAndPort
val rpcOptions: NodeRpcOptions val rpcOptions: NodeRpcOptions
val messagingServerAddress: NetworkHostAndPort? val messagingServerAddress: NetworkHostAndPort?
val messagingServerExternal: Boolean
// TODO Move into DevModeOptions // TODO Move into DevModeOptions
val useTestClock: Boolean get() = false val useTestClock: Boolean get() = false
val detectPublicIp: Boolean get() = true val detectPublicIp: Boolean get() = true
@ -107,12 +107,6 @@ data class BFTSMaRtConfiguration(
} }
} }
data class BridgeConfiguration(val retryIntervalMs: Long,
val maxRetryIntervalMin: Long,
val retryIntervalMultiplier: Double)
data class ActiveMqServerConfiguration(val bridge: BridgeConfiguration)
fun Config.parseAsNodeConfiguration(): NodeConfiguration = parseAs<NodeConfigurationImpl>() fun Config.parseAsNodeConfiguration(): NodeConfiguration = parseAs<NodeConfigurationImpl>()
data class NodeConfigurationImpl( data class NodeConfigurationImpl(
@ -134,9 +128,8 @@ data class NodeConfigurationImpl(
override val p2pAddress: NetworkHostAndPort, override val p2pAddress: NetworkHostAndPort,
private val rpcAddress: NetworkHostAndPort? = null, private val rpcAddress: NetworkHostAndPort? = null,
private val rpcSettings: NodeRpcSettings, private val rpcSettings: NodeRpcSettings,
// TODO This field is slightly redundant as p2pAddress is sufficient to hold the address of the node's MQ broker.
// Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one
override val messagingServerAddress: NetworkHostAndPort?, override val messagingServerAddress: NetworkHostAndPort?,
override val messagingServerExternal: Boolean = (messagingServerAddress != null),
override val notary: NotaryConfig?, override val notary: NotaryConfig?,
override val certificateChainCheckPolicies: List<CertChainPolicyConfig>, override val certificateChainCheckPolicies: List<CertChainPolicyConfig>,
override val devMode: Boolean = false, override val devMode: Boolean = false,
@ -144,7 +137,6 @@ data class NodeConfigurationImpl(
override val devModeOptions: DevModeOptions? = null, override val devModeOptions: DevModeOptions? = null,
override val useTestClock: Boolean = false, override val useTestClock: Boolean = false,
override val detectPublicIp: Boolean = true, override val detectPublicIp: Boolean = true,
override val activeMQServer: ActiveMqServerConfiguration,
// TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration // TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(), override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),
override val sshd: SSHDConfiguration? = null, override val sshd: SSHDConfiguration? = null,

View File

@ -8,7 +8,6 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.node.internal.Node
import net.corda.node.internal.artemis.ArtemisBroker import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.internal.artemis.BrokerAddresses import net.corda.node.internal.artemis.BrokerAddresses
import net.corda.node.internal.artemis.CertificateChainCheckPolicy import net.corda.node.internal.artemis.CertificateChainCheckPolicy
@ -25,7 +24,6 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
import net.corda.nodeapi.internal.requireOnDefaultFileSystem import net.corda.nodeapi.internal.requireOnDefaultFileSystem
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
@ -70,11 +68,12 @@ import javax.security.auth.spi.LoginModule
*/ */
@ThreadSafe @ThreadSafe
class ArtemisMessagingServer(private val config: NodeConfiguration, class ArtemisMessagingServer(private val config: NodeConfiguration,
private val p2pPort: Int, private val messagingServerAddress: NetworkHostAndPort,
val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() { val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() {
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
} }
private class InnerState { private class InnerState {
var running = false var running = false
} }
@ -120,7 +119,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
} }
// Config driven switch between legacy CORE bridges and the newer AMQP protocol bridges. // Config driven switch between legacy CORE bridges and the newer AMQP protocol bridges.
activeMQServer.start() activeMQServer.start()
log.info("P2P messaging server listening on port $p2pPort") log.info("P2P messaging server listening on $messagingServerAddress")
} }
private fun createArtemisConfig() = SecureArtemisConfiguration().apply { private fun createArtemisConfig() = SecureArtemisConfiguration().apply {
@ -131,7 +130,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
val connectionDirection = ConnectionDirection.Inbound( val connectionDirection = ConnectionDirection.Inbound(
acceptorFactoryClassName = NettyAcceptorFactory::class.java.name acceptorFactoryClassName = NettyAcceptorFactory::class.java.name
) )
val acceptors = mutableSetOf(createTcpTransport(connectionDirection, "0.0.0.0", p2pPort)) val acceptors = mutableSetOf(createTcpTransport(connectionDirection, messagingServerAddress.host, messagingServerAddress.port))
acceptorConfigurations = acceptors acceptorConfigurations = acceptors
// Enable built in message deduplication. Note we still have to do our own as the delayed commits // Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates. // and our own definition of commit mean that the built in deduplication cannot remove all duplicates.

View File

@ -16,13 +16,6 @@ devMode = true
h2port = 0 h2port = 0
useTestClock = false useTestClock = false
verifierType = InMemory verifierType = InMemory
activeMQServer = {
bridge = {
retryIntervalMs = 5000
retryIntervalMultiplier = 1.5
maxRetryIntervalMin = 3
}
}
rpcSettings = { rpcSettings = {
useSsl = false useSsl = false
standAloneBroker = false standAloneBroker = false

View File

@ -2,9 +2,9 @@ package net.corda.node.services.config
import net.corda.core.internal.div import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.tools.shell.SSHDConfiguration
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.tools.shell.SSHDConfiguration
import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test import org.junit.Test
import java.nio.file.Paths import java.nio.file.Paths
@ -77,7 +77,6 @@ class NodeConfigurationImplTest {
certificateChainCheckPolicies = emptyList(), certificateChainCheckPolicies = emptyList(),
devMode = true, devMode = true,
noLocalShell = false, noLocalShell = false,
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)),
rpcSettings = rpcSettings rpcSettings = rpcSettings
) )
} }

View File

@ -183,7 +183,7 @@ class ArtemisMessagingTest {
} }
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer { private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
return ArtemisMessagingServer(config, local, maxMessageSize).apply { return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize).apply {
config.configureWithDevSSLCertificate() config.configureWithDevSSLCertificate()
messagingServer = this messagingServer = this
} }