Node IP auto-detection (#666)

* If the p2pAddress provided by the configuration is not public, the node tries to discover the public IP:
- First by checking the network interfaces
- If not found, sends a request to the network map service

* Enable initial connection retry

* Improve error handling

* Update docs
This commit is contained in:
Andrius Dagys
2017-05-12 14:09:02 +01:00
committed by GitHub
parent abe568a7c8
commit 885f951dc3
5 changed files with 131 additions and 18 deletions

View File

@ -81,7 +81,8 @@ path to the node's base directory.
.. note:: In practice the ArtemisMQ messaging services bind to all local addresses on the specified port. However, .. note:: In practice the ArtemisMQ messaging services bind to all local addresses on the specified port. However,
note that the host is included as the advertised entry in the NetworkMapService. As a result the value listed note that the host is included as the advertised entry in the NetworkMapService. As a result the value listed
here must be externally accessible when running nodes across a cluster of machines. here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable,
the node will try to auto-discover its public address.
:rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. :rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC.

View File

@ -30,6 +30,7 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
const val INTERNAL_PREFIX = "internal." const val INTERNAL_PREFIX = "internal."
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers."
const val SERVICES_PREFIX = "${INTERNAL_PREFIX}services." const val SERVICES_PREFIX = "${INTERNAL_PREFIX}services."
const val IP_REQUEST_PREFIX = "ip."
const val P2P_QUEUE = "p2p.inbound" const val P2P_QUEUE = "p2p.inbound"
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications" const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
const val NETWORK_MAP_QUEUE = "${INTERNAL_PREFIX}networkmap" const val NETWORK_MAP_QUEUE = "${INTERNAL_PREFIX}networkmap"

View File

@ -7,19 +7,24 @@ import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.core.flatMap import net.corda.core.flatMap
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.minutes
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.node.VersionInfo import net.corda.core.node.VersionInfo
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.UniquenessProvider import net.corda.core.node.services.UniquenessProvider
import net.corda.core.seconds
import net.corda.core.success import net.corda.core.success
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.printBasicNodeInfo import net.corda.node.printBasicNodeInfo
import net.corda.node.serialization.NodeClock import net.corda.node.serialization.NodeClock
import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDetectRequestProperty
import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDetectResponseProperty
import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.PersistentUniquenessProvider
@ -28,10 +33,19 @@ import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.AddressUtils import net.corda.node.utilities.AddressUtils
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.IP_REQUEST_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import org.slf4j.Logger import org.slf4j.Logger
import java.io.IOException
import java.time.Clock import java.time.Clock
import java.util.*
import javax.management.ObjectName import javax.management.ObjectName
import kotlin.concurrent.thread import kotlin.concurrent.thread
@ -105,6 +119,7 @@ class Node(override val configuration: FullNodeConfiguration,
override fun makeMessagingService(): MessagingService { override fun makeMessagingService(): MessagingService {
userService = RPCUserServiceImpl(configuration.rpcUsers) userService = RPCUserServiceImpl(configuration.rpcUsers)
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker() val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null
return NodeMessagingClient( return NodeMessagingClient(
configuration, configuration,
@ -114,7 +129,8 @@ class Node(override val configuration: FullNodeConfiguration,
serverThread, serverThread,
database, database,
networkMapRegistrationFuture, networkMapRegistrationFuture,
services.monitoringService) services.monitoringService,
configuration.messagingServerAddress == null)
} }
private fun makeLocalMessageBroker(): HostAndPort { private fun makeLocalMessageBroker(): HostAndPort {
@ -128,25 +144,68 @@ class Node(override val configuration: FullNodeConfiguration,
/** /**
* Checks whether the specified [host] is a public IP address or hostname. If not, tries to discover the current * Checks whether the specified [host] is a public IP address or hostname. If not, tries to discover the current
* machine's public IP address to be used instead. Note that it will only work if the machine is internet-facing. * machine's public IP address to be used instead. It first looks through the network interfaces, and if no public IP
* If none found, outputs a warning message. * is found, asks the network map service to provide it.
*/ */
private fun tryDetectIfNotPublicHost(host: String): String? { private fun tryDetectIfNotPublicHost(host: String): String? {
if (!AddressUtils.isPublic(host)) { if (!AddressUtils.isPublic(host)) {
val foundPublicIP = AddressUtils.tryDetectPublicIP() val foundPublicIP = AddressUtils.tryDetectPublicIP()
if (foundPublicIP == null) { if (foundPublicIP == null) {
val message = "The specified messaging host \"$host\" is private, " + networkMapAddress?.let { return discoverPublicHost(it.hostAndPort) }
"this node will not be reachable by any other nodes outside the private network."
println("WARNING: $message")
log.warn(message)
} else { } else {
log.info("Detected public IP: $foundPublicIP. This will be used instead the provided \"$host\" as the advertised address.") log.info("Detected public IP: ${foundPublicIP.hostAddress}. This will be used instead of the provided \"$host\" as the advertised address.")
return foundPublicIP.hostAddress
} }
return foundPublicIP?.hostAddress
} }
return null return null
} }
/**
 * Asks the network map service to provide this node's public IP address:
 * - Connects to the network map service's message broker and creates a special IP request queue with a custom
 *   request id. Marks the established session with the same request id.
 * - On the server side a special post-queue-creation callback is fired. It finds the session matching the request id
 *   encoded in the queue name. It then extracts the remote IP from the session details and posts a message containing
 *   it back to the queue.
 * - Once the message is received, or the wait times out, the queue is deleted and the connection is closed.
 *
 * @param serverAddress The address of the network map service's message broker.
 * @return The host part of the address reported back by the network map service.
 * @throws IOException If the broker cannot be reached, no response arrives within 10 seconds, or the response
 * does not carry an address.
 */
private fun discoverPublicHost(serverAddress: HostAndPort): String? {
    log.trace { "Trying to detect public hostname through the Network Map Service at $serverAddress" }
    val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, configuration)
    val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
        initialConnectAttempts = 5
        retryInterval = 5.seconds.toMillis()
        retryIntervalMultiplier = 1.5
        maxRetryInterval = 3.minutes.toMillis()
    }
    try {
        val clientFactory = try {
            locator.createSessionFactory()
        } catch (e: ActiveMQNotConnectedException) {
            throw IOException("Unable to connect to the Network Map Service at $serverAddress for IP address discovery", e)
        }
        try {
            val session = clientFactory.createSession(PEER_USER, PEER_USER, false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
            val requestId = UUID.randomUUID().toString()
            // The server matches the queue name against this session property to find our connection.
            session.addMetaData(ipDetectRequestProperty, requestId)
            session.start()

            val queueName = "$IP_REQUEST_PREFIX$requestId"
            session.createQueue(queueName, queueName, false)
            val consumer = session.createConsumer(queueName)
            val reportedAddress: String? = try {
                val artemisMessage: ClientMessage = consumer.receive(10.seconds.toMillis()) ?:
                        throw IOException("Did not receive a response from the Network Map Service at $serverAddress")
                artemisMessage.getStringProperty(ipDetectResponseProperty)
            } finally {
                // Best-effort cleanup, also on timeout: a failure here must not mask the outcome of the request.
                try {
                    consumer.close()
                    session.deleteQueue(queueName)
                } catch (e: org.apache.activemq.artemis.api.core.ActiveMQException) {
                    log.warn("Unable to clean up the IP request queue $queueName at $serverAddress", e)
                }
            }

            // A response without the address property would otherwise surface as an NPE from HostAndPort.fromString.
            val publicHostAndPort = HostAndPort.fromString(reportedAddress
                    ?: throw IOException("The Network Map Service at $serverAddress did not report an address"))
            log.info("Detected public address: $publicHostAndPort")
            // The reported value may carry a leading '/', which is stripped from the host.
            return publicHostAndPort.host.removePrefix("/")
        } finally {
            clientFactory.close()
        }
    } finally {
        locator.close()
    }
}
override fun startMessagingService(rpcOps: RPCOps) { override fun startMessagingService(rpcOps: RPCOps) {
// Start up the embedded MQ server // Start up the embedded MQ server
messageBroker?.apply { messageBroker?.apply {

View File

@ -37,6 +37,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor
import org.apache.activemq.artemis.core.security.Role import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
import org.apache.activemq.artemis.core.settings.impl.AddressSettings import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.apache.activemq.artemis.spi.core.remoting.* import org.apache.activemq.artemis.spi.core.remoting.*
@ -91,6 +93,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val log = loggerFor<ArtemisMessagingServer>() private val log = loggerFor<ArtemisMessagingServer>()
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */ /** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
@JvmStatic val MAX_FILE_SIZE = 10485760 @JvmStatic val MAX_FILE_SIZE = 10485760
val ipDetectRequestProperty = "ip-request-id"
val ipDetectResponseProperty = "ip-address"
} }
private class InnerState { private class InnerState {
@ -107,6 +112,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
*/ */
val networkMapConnectionFuture: SettableFuture<Unit>? get() = _networkMapConnectionFuture val networkMapConnectionFuture: SettableFuture<Unit>? get() = _networkMapConnectionFuture
private var networkChangeHandle: Subscription? = null private var networkChangeHandle: Subscription? = null
private val nodeRunsNetworkMapService = config.networkMapService == null
init { init {
config.baseDirectory.expectedOnDefaultFileSystem() config.baseDirectory.expectedOnDefaultFileSystem()
@ -138,14 +144,15 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
// Artemis IO errors // Artemis IO errors
@Throws(IOException::class, KeyStoreException::class) @Throws(IOException::class, KeyStoreException::class)
private fun configureAndStartServer() { private fun configureAndStartServer() {
val config = createArtemisConfig() val artemisConfig = createArtemisConfig()
val securityManager = createArtemisSecurityManager() val securityManager = createArtemisSecurityManager()
activeMQServer = ActiveMQServerImpl(config, securityManager).apply { activeMQServer = ActiveMQServerImpl(artemisConfig, securityManager).apply {
// Throw any exceptions which are detected during startup // Throw any exceptions which are detected during startup
registerActivationFailureListener { exception -> throw exception } registerActivationFailureListener { exception -> throw exception }
// Some types of queue might need special preparation on our side, like dialling back or preparing // Some types of queue might need special preparation on our side, like dialling back or preparing
// a lazily initialised subsystem. // a lazily initialised subsystem.
registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) } registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) }
if (nodeRunsNetworkMapService) registerPostQueueCreationCallback { handleIpDetectionRequest(it.toString()) }
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } } registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
} }
activeMQServer.start() activeMQServer.start()
@ -228,6 +235,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true)) securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
// TODO remove the NODE_USER role once the webserver doesn't need it // TODO remove the NODE_USER role once the webserver doesn't need it
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$NODE_USER.#"] = setOf(nodeInternalRole) securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$NODE_USER.#"] = setOf(nodeInternalRole)
if (nodeRunsNetworkMapService) {
securityRoles["$IP_REQUEST_PREFIX*"] = setOf(
nodeInternalRole,
restrictedRole(PEER_ROLE, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)
)
}
for ((username) in userService.users) { for ((username) in userService.users) {
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#"] = setOf( securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#"] = setOf(
nodeInternalRole, nodeInternalRole,
@ -425,6 +438,31 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
_networkMapConnectionFuture!!.set(Unit) _networkMapConnectionFuture!!.set(Unit)
} }
} }
/**
 * Post-queue-creation callback that answers IP detection requests.
 *
 * Queues whose name does not start with [IP_REQUEST_PREFIX] are ignored. For a matching queue the request id is taken
 * from the remainder of the queue name, the session carrying the same id under [ipDetectRequestProperty] is looked up,
 * and that session's remote address is routed back to the queue in the [ipDetectResponseProperty] message property.
 *
 * If no session matches, the remote address is unknown, or the queue can no longer be located, a warning is logged
 * and no response is sent rather than throwing from inside the broker callback.
 *
 * @param queueName Name of the queue that has just been created.
 */
private fun handleIpDetectionRequest(queueName: String) {
    if (!queueName.startsWith(IP_REQUEST_PREFIX)) return
    val requestId = queueName.substringAfter(IP_REQUEST_PREFIX)

    // firstOrNull: a peer may create an "ip." queue without tagging its session, which must not throw here.
    val requestingSession = activeMQServer.sessions.firstOrNull {
        it.getMetaData(ipDetectRequestProperty) == requestId
    }
    val remoteAddress: String? = requestingSession?.remotingConnection?.remoteAddress
    if (remoteAddress == null) {
        log.warn("Unable to determine the remote address for IP detection request $requestId, no response will be sent")
        return
    }
    log.debug { "Detected remote address $remoteAddress for request $requestId" }

    val queue = activeMQServer.locateQueue(SimpleString(queueName))
    if (queue == null) {
        log.warn("IP detection request queue $queueName could not be located, no response will be sent")
        return
    }
    val responseMessage = ServerMessageImpl(random63BitValue(), 0).apply {
        putStringProperty(ipDetectResponseProperty, remoteAddress)
    }
    val routingContext = RoutingContextImpl(null)
    queue.route(responseMessage, routingContext)
    activeMQServer.postOffice.processRoute(responseMessage, routingContext, true)
}
} }
class VerifyingNettyConnectorFactory : NettyConnectorFactory() { class VerifyingNettyConnectorFactory : NettyConnectorFactory() {

View File

@ -24,7 +24,10 @@ import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
import net.corda.node.utilities.* import net.corda.node.utilities.*
import net.corda.nodeapi.* import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
@ -58,10 +61,11 @@ import javax.annotation.concurrent.ThreadSafe
* invoke methods on the provided implementation. There is more documentation on this in the docsite and the * invoke methods on the provided implementation. There is more documentation on this in the docsite and the
* CordaRPCClient class. * CordaRPCClient class.
* *
* @param serverHostPort The address of the broker instance to connect to (might be running in the same process) * @param serverHostPort The address of the broker instance to connect to (might be running in the same process).
* @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate * @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate
* that this is a NetworkMapService node which will be bound globally to the name "networkmap" * that this is a NetworkMapService node which will be bound globally to the name "networkmap".
* @param nodeExecutor An executor to run received message tasks upon. * @param nodeExecutor An executor to run received message tasks upon.
* @param isServerLocal Specify `true` if the provided [serverHostPort] is a locally running broker instance.
*/ */
@ThreadSafe @ThreadSafe
class NodeMessagingClient(override val config: NodeConfiguration, class NodeMessagingClient(override val config: NodeConfiguration,
@ -71,7 +75,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
val database: Database, val database: Database,
val networkMapRegistrationFuture: ListenableFuture<Unit>, val networkMapRegistrationFuture: ListenableFuture<Unit>,
val monitoringService: MonitoringService val monitoringService: MonitoringService,
val isServerLocal: Boolean = true
) : ArtemisMessagingComponent(), MessagingService { ) : ArtemisMessagingComponent(), MessagingService {
companion object { companion object {
private val log = loggerFor<NodeMessagingClient>() private val log = loggerFor<NodeMessagingClient>()
@ -153,9 +158,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
check(!started) { "start can't be called twice" } check(!started) { "start can't be called twice" }
started = true started = true
log.info("Connecting to server: $serverHostPort") val serverAddress = getBrokerAddress()
log.info("Connecting to server: $serverAddress")
// TODO Add broker CN to config for host verification in case the embedded broker isn't used // TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverHostPort, config) val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport) val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
locator.minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE locator.minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE
sessionFactory = locator.createSessionFactory() sessionFactory = locator.createSessionFactory()
@ -205,6 +212,13 @@ class NodeMessagingClient(override val config: NodeConfiguration,
resumeMessageRedelivery() resumeMessageRedelivery()
} }
/**
 * Picks the address this client uses to reach the message broker.
 *
 * When the broker runs locally and [serverHostPort] specifies a public IP, nodes under a NAT with no loopback
 * support would fail to connect through that address. The local broker listens on all interfaces, so `localhost`
 * with the configured port is always used in that case; otherwise [serverHostPort] is returned unchanged.
 */
private fun getBrokerAddress(): HostAndPort {
    if (!isServerLocal) return serverHostPort
    return HostAndPort.fromParts("localhost", serverHostPort.port)
}
/** /**
* We make the consumer twice, once to filter for just network map messages, and then once that is complete, we close * We make the consumer twice, once to filter for just network map messages, and then once that is complete, we close
* the original and make another without a filter. We do this so that there is a network map in place for all other * the original and make another without a filter. We do this so that there is a network map in place for all other