diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index 290e687bc4..39feba07ee 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -81,7 +81,8 @@ path to the node's base directory. .. note:: In practice the ArtemisMQ messaging services bind to all local addresses on the specified port. However, note that the host is the included as the advertised entry in the NetworkMapService. As a result the value listed - here must be externally accessible when running nodes across a cluster of machines. + here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable, + the node will try to auto-discover its public one. :rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisMessagingComponent.kt b/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisMessagingComponent.kt index 0f16c2e056..b56f52239a 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisMessagingComponent.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisMessagingComponent.kt @@ -30,6 +30,7 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() { const val INTERNAL_PREFIX = "internal." const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." const val SERVICES_PREFIX = "${INTERNAL_PREFIX}services." + const val IP_REQUEST_PREFIX = "ip." const val P2P_QUEUE = "p2p.inbound" const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications" const val NETWORK_MAP_QUEUE = "${INTERNAL_PREFIX}networkmap" diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index d20b64f167..c2847fc740 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -7,19 +7,24 @@ import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture import net.corda.core.flatMap import net.corda.core.messaging.RPCOps +import net.corda.core.minutes import net.corda.core.node.ServiceHub import net.corda.core.node.VersionInfo import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType import net.corda.core.node.services.UniquenessProvider +import net.corda.core.seconds import net.corda.core.success import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.trace import net.corda.node.printBasicNodeInfo import net.corda.node.serialization.NodeClock import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserServiceImpl import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDetectRequestProperty +import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDetectResponseProperty import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.transactions.PersistentUniquenessProvider @@ -28,10 +33,19 @@ import net.corda.node.services.transactions.RaftUniquenessProvider import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.utilities.AddressUtils import net.corda.node.utilities.AffinityExecutor +import net.corda.nodeapi.ArtemisMessagingComponent.Companion.IP_REQUEST_PREFIX +import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress +import net.corda.nodeapi.ArtemisTcpTransport +import net.corda.nodeapi.ConnectionDirection +import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException +import org.apache.activemq.artemis.api.core.client.ActiveMQClient +import org.apache.activemq.artemis.api.core.client.ClientMessage import org.bouncycastle.asn1.x500.X500Name import org.slf4j.Logger +import java.io.IOException import java.time.Clock +import java.util.* import javax.management.ObjectName import kotlin.concurrent.thread @@ -105,6 +119,7 @@ class Node(override val configuration: FullNodeConfiguration, override fun makeMessagingService(): MessagingService { userService = RPCUserServiceImpl(configuration.rpcUsers) val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker() + val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null return NodeMessagingClient( configuration, @@ -114,7 +129,8 @@ class Node(override val configuration: FullNodeConfiguration, serverThread, database, networkMapRegistrationFuture, - services.monitoringService) + services.monitoringService, + configuration.messagingServerAddress == null) } private fun makeLocalMessageBroker(): HostAndPort { @@ -128,25 +144,68 @@ class Node(override val configuration: FullNodeConfiguration, /** * Checks whether the specified [host] is a public IP address or hostname. If not, tries to discover the current - * machine's public IP address to be used instead. Note that it will only work if the machine is internet-facing. - * If none found, outputs a warning message. + * machine's public IP address to be used instead. It first looks through the network interfaces, and if no public IP + * is found, asks the network map service to provide it. */ private fun tryDetectIfNotPublicHost(host: String): String? { if (!AddressUtils.isPublic(host)) { val foundPublicIP = AddressUtils.tryDetectPublicIP() + if (foundPublicIP == null) { - val message = "The specified messaging host \"$host\" is private, " + - "this node will not be reachable by any other nodes outside the private network." - println("WARNING: $message") - log.warn(message) + networkMapAddress?.let { return discoverPublicHost(it.hostAndPort) } } else { - log.info("Detected public IP: $foundPublicIP. This will be used instead the provided \"$host\" as the advertised address.") + log.info("Detected public IP: ${foundPublicIP.hostAddress}. This will be used instead of the provided \"$host\" as the advertised address.") + return foundPublicIP.hostAddress } - return foundPublicIP?.hostAddress } return null } + /** + * Asks the network map service to provide this node's public IP address: + * - Connects to the network map service's message broker and creates a special IP request queue with a custom + * request id. Marks the established session with the same request id. + * - On the server side a special post-queue-creation callback is fired. It finds the session matching the request id + * encoded in the queue name. It then extracts the remote IP from the session details and posts a message containing + * it back to the queue. + * - Once the message is received the session is closed and the queue deleted. + */ + private fun discoverPublicHost(serverAddress: HostAndPort): String? { + log.trace { "Trying to detect public hostname through the Network Map Service at $serverAddress" } + val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, configuration) + val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { + initialConnectAttempts = 5 + retryInterval = 5.seconds.toMillis() + retryIntervalMultiplier = 1.5 + maxRetryInterval = 3.minutes.toMillis() + } + val clientFactory = try { + locator.createSessionFactory() + } catch (e: ActiveMQNotConnectedException) { + throw IOException("Unable to connect to the Network Map Service at $serverAddress for IP address discovery", e) + } + + val session = clientFactory.createSession(PEER_USER, PEER_USER, false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) + val requestId = UUID.randomUUID().toString() + session.addMetaData(ipDetectRequestProperty, requestId) + session.start() + + val queueName = "$IP_REQUEST_PREFIX$requestId" + session.createQueue(queueName, queueName, false) + + val consumer = session.createConsumer(queueName) + val artemisMessage: ClientMessage = consumer.receive(10.seconds.toMillis()) ?: + throw IOException("Did not receive a response from the Network Map Service at $serverAddress") + val publicHostAndPort = HostAndPort.fromString(artemisMessage.getStringProperty(ipDetectResponseProperty)) + log.info("Detected public address: $publicHostAndPort") + + consumer.close() + session.deleteQueue(queueName) + clientFactory.close() + + return publicHostAndPort.host.removePrefix("/") + } + override fun startMessagingService(rpcOps: RPCOps) { // Start up the embedded MQ server messageBroker?.apply { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index d8544b5e0e..9df1d8f2ed 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -37,6 +37,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor import org.apache.activemq.artemis.core.security.Role import org.apache.activemq.artemis.core.server.ActiveMQServer import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl +import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl +import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy import org.apache.activemq.artemis.core.settings.impl.AddressSettings import org.apache.activemq.artemis.spi.core.remoting.* @@ -91,6 +93,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, private val log = loggerFor() /** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */ @JvmStatic val MAX_FILE_SIZE = 10485760 + + val ipDetectRequestProperty = "ip-request-id" + val ipDetectResponseProperty = "ip-address" } private class InnerState { @@ -107,6 +112,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, */ val networkMapConnectionFuture: SettableFuture? get() = _networkMapConnectionFuture private var networkChangeHandle: Subscription? = null + private val nodeRunsNetworkMapService = config.networkMapService == null init { config.baseDirectory.expectedOnDefaultFileSystem() @@ -138,14 +144,15 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, // Artemis IO errors @Throws(IOException::class, KeyStoreException::class) private fun configureAndStartServer() { - val config = createArtemisConfig() + val artemisConfig = createArtemisConfig() val securityManager = createArtemisSecurityManager() - activeMQServer = ActiveMQServerImpl(config, securityManager).apply { + activeMQServer = ActiveMQServerImpl(artemisConfig, securityManager).apply { // Throw any exceptions which are detected during startup registerActivationFailureListener { exception -> throw exception } // Some types of queue might need special preparation on our side, like dialling back or preparing // a lazily initialised subsystem. registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) } + if (nodeRunsNetworkMapService) registerPostQueueCreationCallback { handleIpDetectionRequest(it.toString()) } registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } } } activeMQServer.start() @@ -228,6 +235,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true)) // TODO remove the NODE_USER role once the webserver doesn't need it securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$NODE_USER.#"] = setOf(nodeInternalRole) + if (nodeRunsNetworkMapService) { + securityRoles["$IP_REQUEST_PREFIX*"] = setOf( + nodeInternalRole, + restrictedRole(PEER_ROLE, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true) + ) + } for ((username) in userService.users) { securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#"] = setOf( nodeInternalRole, @@ -425,6 +438,31 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, _networkMapConnectionFuture!!.set(Unit) } } + + private fun handleIpDetectionRequest(queueName: String) { + fun getRemoteAddress(requestId: String): String? { + val session = activeMQServer.sessions.first { + it.getMetaData(ipDetectRequestProperty) == requestId + } + return session.remotingConnection.remoteAddress + } + + fun sendResponse(remoteAddress: String?) { + val responseMessage = ServerMessageImpl(random63BitValue(), 0).apply { + putStringProperty(ipDetectResponseProperty, remoteAddress) + } + val routingContext = RoutingContextImpl(null) + val queue = activeMQServer.locateQueue(SimpleString(queueName)) + queue.route(responseMessage, routingContext) + activeMQServer.postOffice.processRoute(responseMessage, routingContext, true) + } + + if (!queueName.startsWith(IP_REQUEST_PREFIX)) return + val requestId = queueName.substringAfter(IP_REQUEST_PREFIX) + val remoteAddress = getRemoteAddress(requestId) + log.debug { "Detected remote address $remoteAddress for request $requestId" } + sendResponse(remoteAddress) + } } class VerifyingNettyConnectorFactory : NettyConnectorFactory() { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index d879e0b8ce..416c4cbaa0 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -24,7 +24,10 @@ import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService import net.corda.node.utilities.* -import net.corda.nodeapi.* +import net.corda.nodeapi.ArtemisMessagingComponent +import net.corda.nodeapi.ArtemisTcpTransport +import net.corda.nodeapi.ConnectionDirection +import net.corda.nodeapi.VerifierApi import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException @@ -58,10 +61,11 @@ import javax.annotation.concurrent.ThreadSafe * invoke methods on the provided implementation. There is more documentation on this in the docsite and the * CordaRPCClient class. * - * @param serverHostPort The address of the broker instance to connect to (might be running in the same process) + * @param serverHostPort The address of the broker instance to connect to (might be running in the same process). * @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate - * that this is a NetworkMapService node which will be bound globally to the name "networkmap" + * that this is a NetworkMapService node which will be bound globally to the name "networkmap". * @param nodeExecutor An executor to run received message tasks upon. + * @param isServerLocal Specify `true` if the provided [serverHostPort] is a locally running broker instance. */ @ThreadSafe class NodeMessagingClient(override val config: NodeConfiguration, @@ -71,7 +75,8 @@ class NodeMessagingClient(override val config: NodeConfiguration, val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, val database: Database, val networkMapRegistrationFuture: ListenableFuture, - val monitoringService: MonitoringService + val monitoringService: MonitoringService, + val isServerLocal: Boolean = true ) : ArtemisMessagingComponent(), MessagingService { companion object { private val log = loggerFor() @@ -153,9 +158,11 @@ class NodeMessagingClient(override val config: NodeConfiguration, check(!started) { "start can't be called twice" } started = true - log.info("Connecting to server: $serverHostPort") + val serverAddress = getBrokerAddress() + + log.info("Connecting to server: $serverAddress") // TODO Add broker CN to config for host verification in case the embedded broker isn't used - val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverHostPort, config) + val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config) val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport) locator.minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE sessionFactory = locator.createSessionFactory() @@ -205,6 +212,13 @@ class NodeMessagingClient(override val config: NodeConfiguration, resumeMessageRedelivery() } + /** + * If the message broker is running locally and [serverHostPort] specifies a public IP, the messaging client will + * fail to connect on nodes under a NAT with no loopback support. As the local message broker is listening on + * all interfaces it is safer to always use `localhost` instead. + */ + private fun getBrokerAddress() = if (isServerLocal) HostAndPort.fromParts("localhost", serverHostPort.port) else serverHostPort + /** * We make the consumer twice, once to filter for just network map messages, and then once that is complete, we close * the original and make another without a filter. We do this so that there is a network map in place for all other