From 759cb6da0497858e8602750e515b36a5c5763e90 Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Wed, 24 May 2017 14:50:12 +0100 Subject: [PATCH] Pass ports instead of hostAndPorts to the message broker. Pass an address for the NodeMessagingClient to advertise to the network map service. --- .../kotlin/net/corda/node/internal/Node.kt | 32 +++++++++++++++---- .../messaging/ArtemisMessagingServer.kt | 16 +++++----- .../services/messaging/NodeMessagingClient.kt | 22 ++++--------- .../messaging/ArtemisMessagingTests.kt | 16 +++++----- .../kotlin/net/corda/testing/CoreTestUtils.kt | 13 +++++--- .../net/corda/testing/node/SimpleNode.kt | 2 +- 6 files changed, 59 insertions(+), 42 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 5795aa8627..dd4a480190 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -116,7 +116,17 @@ class Node(override val configuration: FullNodeConfiguration, override fun makeMessagingService(): MessagingService { userService = RPCUserServiceImpl(configuration.rpcUsers) - val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker() + + val (serverAddress, advertisedAddress) = with(configuration) { + if (messagingServerAddress != null) { + // External broker + messagingServerAddress to messagingServerAddress + } else { + makeLocalMessageBroker() to getAdvertisedAddress() + } + } + + printBasicNodeInfo("Incoming connection address:", advertisedAddress.toString()) val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null return NodeMessagingClient( @@ -128,15 +138,25 @@ class Node(override val configuration: FullNodeConfiguration, database, networkMapRegistrationFuture, services.monitoringService, - configuration.messagingServerAddress == null) + advertisedAddress) } private fun makeLocalMessageBroker(): HostAndPort { with(configuration) { - val useHost = tryDetectIfNotPublicHost(p2pAddress.host) - val useAddress = useHost?.let { HostAndPort.fromParts(it, p2pAddress.port) } ?: p2pAddress - messageBroker = ArtemisMessagingServer(this, useAddress, rpcAddress, services.networkMapCache, userService) - return useAddress + messageBroker = ArtemisMessagingServer(this, p2pAddress.port, rpcAddress?.port, services.networkMapCache, userService) + return HostAndPort.fromParts("localhost", p2pAddress.port) + } + } + + private fun getAdvertisedAddress(): HostAndPort { + return with(configuration) { + if (relay != null) { + HostAndPort.fromParts(relay.relayHost, relay.remoteInboundPort) + } else { + val publicHost = tryDetectIfNotPublicHost(p2pAddress.host) + val useHost = publicHost ?: p2pAddress.host + HostAndPort.fromParts(useHost, p2pAddress.port) + } } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 23c6e1a33a..eb5df6b7e7 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -85,8 +85,8 @@ import javax.security.cert.X509Certificate */ @ThreadSafe class ArtemisMessagingServer(override val config: NodeConfiguration, - val p2pHostPort: HostAndPort, - val rpcHostPort: HostAndPort?, + val p2pPort: Int, + val rpcPort: Int?, val networkMapCache: NetworkMapCache, val userService: RPCUserService) : ArtemisMessagingComponent() { companion object { @@ -156,9 +156,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } } } activeMQServer.start() - printBasicNodeInfo("Listening on address", p2pHostPort.toString()) - if (rpcHostPort != null) { - printBasicNodeInfo("RPC service listening on address", rpcHostPort.toString()) + printBasicNodeInfo("Listening on port", p2pPort.toString()) + if (rpcPort != null) { + printBasicNodeInfo("RPC service listening on port", rpcPort.toString()) } } @@ -170,9 +170,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, val connectionDirection = ConnectionDirection.Inbound( acceptorFactoryClassName = NettyAcceptorFactory::class.java.name ) - val acceptors = mutableSetOf(createTcpTransport(connectionDirection, "0.0.0.0", p2pHostPort.port)) - if (rpcHostPort != null) { - acceptors.add(createTcpTransport(connectionDirection, "0.0.0.0", rpcHostPort.port, enableSSL = false)) + val acceptors = mutableSetOf(createTcpTransport(connectionDirection, "0.0.0.0", p2pPort)) + if (rpcPort != null) { + acceptors.add(createTcpTransport(connectionDirection, "0.0.0.0", rpcPort, enableSSL = false)) } acceptorConfigurations = acceptors // Enable built in message deduplication. Note we still have to do our own as the delayed commits diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index bebae60032..19e1802dc4 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -61,22 +61,23 @@ import javax.annotation.concurrent.ThreadSafe * invoke methods on the provided implementation. There is more documentation on this in the docsite and the * CordaRPCClient class. * - * @param serverHostPort The address of the broker instance to connect to (might be running in the same process). + * @param serverAddress The address of the broker instance to connect to (might be running in the same process). * @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate * that this is a NetworkMapService node which will be bound globally to the name "networkmap". * @param nodeExecutor An executor to run received message tasks upon. - * @param isServerLocal Specify `true` if the provided [serverHostPort] is a locally running broker instance. + * @param advertisedAddress The node address for inbound connections, advertised to the network map service and peers. + * If not provided, will default to [serverAddress]. */ @ThreadSafe class NodeMessagingClient(override val config: NodeConfiguration, val versionInfo: VersionInfo, - val serverHostPort: HostAndPort, + val serverAddress: HostAndPort, val myIdentity: PublicKey?, val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, val database: Database, val networkMapRegistrationFuture: ListenableFuture, val monitoringService: MonitoringService, - val isServerLocal: Boolean = true + advertisedAddress: HostAndPort = serverAddress ) : ArtemisMessagingComponent(), MessagingService { companion object { private val log = loggerFor() @@ -132,9 +133,9 @@ class NodeMessagingClient(override val config: NodeConfiguration, * Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache. */ override val myAddress: SingleMessageRecipient = if (myIdentity != null) { - NodeAddress.asPeer(myIdentity, serverHostPort) + NodeAddress.asPeer(myIdentity, advertisedAddress) } else { - NetworkMapAddress(serverHostPort) + NetworkMapAddress(advertisedAddress) } private val state = ThreadBox(InnerState()) @@ -158,8 +159,6 @@ class NodeMessagingClient(override val config: NodeConfiguration, check(!started) { "start can't be called twice" } started = true - val serverAddress = getBrokerAddress() - log.info("Connecting to server: $serverAddress") // TODO Add broker CN to config for host verification in case the embedded broker isn't used val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config) @@ -216,13 +215,6 @@ class NodeMessagingClient(override val config: NodeConfiguration, resumeMessageRedelivery() } - /** - * If the message broker is running locally and [serverHostPort] specifies a public IP, the messaging client will - * fail to connect on nodes under a NAT with no loopback support. As the local message broker is listening on - * all interfaces it is safer to always use `localhost` instead. - */ - private fun getBrokerAddress() = if (isServerLocal) HostAndPort.fromParts("localhost", serverHostPort.port) else serverHostPort - /** * We make the consumer twice, once to filter for just network map messages, and then once that is complete, we close * the original and make another without a filter. We do this so that there is a network map in place for all other diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index 41184f80f8..41c4386930 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -24,10 +24,10 @@ import net.corda.node.utilities.transaction import net.corda.testing.MOCK_VERSION_INFO import net.corda.testing.TestNodeConfiguration import net.corda.testing.freeLocalHostAndPort +import net.corda.testing.freePort import net.corda.testing.node.makeTestDataSourceProperties import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy -import org.bouncycastle.asn1.x500.X500Name import org.jetbrains.exposed.sql.Database import org.junit.After import org.junit.Before @@ -46,8 +46,8 @@ import kotlin.test.assertNull class ArtemisMessagingTests { @Rule @JvmField val temporaryFolder = TemporaryFolder() - val hostAndPort = freeLocalHostAndPort() - val rpcHostAndPort = freeLocalHostAndPort() + val serverPort = freePort() + val rpcPort = freePort() val topic = "platform.self" val identity = generateKeyPair() @@ -93,7 +93,7 @@ class ArtemisMessagingTests { @Test fun `server starting with the port already bound should throw`() { - ServerSocket(hostAndPort.port).use { + ServerSocket(serverPort).use { val messagingServer = createMessagingServer() assertThatThrownBy { messagingServer.start() } } @@ -103,7 +103,7 @@ class ArtemisMessagingTests { fun `client should connect to remote server`() { val remoteServerAddress = freeLocalHostAndPort() - createMessagingServer(remoteServerAddress).start() + createMessagingServer(remoteServerAddress.port).start() createMessagingClient(server = remoteServerAddress) startNodeMessagingClient() } @@ -113,7 +113,7 @@ class ArtemisMessagingTests { val serverAddress = freeLocalHostAndPort() val invalidServerAddress = freeLocalHostAndPort() - createMessagingServer(serverAddress).start() + createMessagingServer(serverAddress.port).start() messagingClient = createMessagingClient(server = invalidServerAddress) assertThatThrownBy { startNodeMessagingClient() } @@ -218,7 +218,7 @@ class ArtemisMessagingTests { return messagingClient } - private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient { + private fun createMessagingClient(server: HostAndPort = HostAndPort.fromParts("localhost", serverPort)): NodeMessagingClient { return database.transaction { NodeMessagingClient( config, @@ -235,7 +235,7 @@ class ArtemisMessagingTests { } } - private fun createMessagingServer(local: HostAndPort = hostAndPort, rpc: HostAndPort = rpcHostAndPort): ArtemisMessagingServer { + private fun createMessagingServer(local: Int = serverPort, rpc: Int = rpcPort): ArtemisMessagingServer { return ArtemisMessagingServer(config, local, rpc, networkMapCache, userService).apply { config.configureWithDevSSLCertificate() messagingServer = this diff --git a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt index a9911f39ed..895cc0024d 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt @@ -92,16 +92,21 @@ val MOCK_VERSION_INFO = VersionInfo(1, "Mock release", "Mock revision", "Mock Ve fun generateStateRef() = StateRef(SecureHash.randomSHA256(), 0) private val freePortCounter = AtomicInteger(30000) +/** + * Returns a localhost address with a free port. + * + * Unsafe for getting multiple ports! + * Use [getFreeLocalPorts] for getting multiple ports. + */ +fun freeLocalHostAndPort(): HostAndPort = HostAndPort.fromParts("localhost", freePort()) + /** * Returns a free port. * * Unsafe for getting multiple ports! * Use [getFreeLocalPorts] for getting multiple ports. */ -fun freeLocalHostAndPort(): HostAndPort { - val freePort = freePortCounter.getAndAccumulate(0) { prev, _ -> 30000 + (prev - 30000 + 1) % 10000 } - return HostAndPort.fromParts("localhost", freePort) -} +fun freePort(): Int = freePortCounter.getAndAccumulate(0) { prev, _ -> 30000 + (prev - 30000 + 1) % 10000 } /** * Creates a specified number of ports for use by the Node. diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt index 29c4d8042d..485536deee 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt @@ -37,7 +37,7 @@ class SimpleNode(val config: NodeConfiguration, val address: HostAndPort = freeL val identity: KeyPair = generateKeyPair() val keyService: KeyManagementService = E2ETestKeyManagementService(setOf(identity)) val executor = ServiceAffinityExecutor(config.myLegalName.commonName, 1) - val broker = ArtemisMessagingServer(config, address, rpcAddress, InMemoryNetworkMapCache(), userService) + val broker = ArtemisMessagingServer(config, address.port, rpcAddress.port, InMemoryNetworkMapCache(), userService) val networkMapRegistrationFuture: SettableFuture = SettableFuture.create() val net = database.transaction { NodeMessagingClient(