Pass ports instead of hostAndPorts to the message broker. Pass an address for the NodeMessagingClient to advertise to the network map service.

This commit is contained in:
Andrius Dagys
2017-05-31 11:15:26 +01:00
parent 34eb5a3b70
commit f210370885
6 changed files with 59 additions and 43 deletions

View File

@ -5,15 +5,18 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.core.* import net.corda.core.flatMap
import net.corda.core.internal.ShutdownHook import net.corda.core.internal.ShutdownHook
import net.corda.core.internal.addShutdownHook import net.corda.core.internal.addShutdownHook
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.minutes
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.node.VersionInfo import net.corda.core.node.VersionInfo
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.UniquenessProvider import net.corda.core.node.services.UniquenessProvider
import net.corda.core.seconds
import net.corda.core.success
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.node.printBasicNodeInfo import net.corda.node.printBasicNodeInfo
@ -116,7 +119,17 @@ class Node(override val configuration: FullNodeConfiguration,
override fun makeMessagingService(): MessagingService { override fun makeMessagingService(): MessagingService {
userService = RPCUserServiceImpl(configuration.rpcUsers) userService = RPCUserServiceImpl(configuration.rpcUsers)
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
val (serverAddress, advertisedAddress) = with(configuration) {
if (messagingServerAddress != null) {
// External broker
messagingServerAddress to messagingServerAddress
} else {
makeLocalMessageBroker() to getAdvertisedAddress()
}
}
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null
return NodeMessagingClient( return NodeMessagingClient(
@ -128,15 +141,21 @@ class Node(override val configuration: FullNodeConfiguration,
database, database,
networkMapRegistrationFuture, networkMapRegistrationFuture,
services.monitoringService, services.monitoringService,
configuration.messagingServerAddress == null) advertisedAddress)
} }
private fun makeLocalMessageBroker(): HostAndPort { private fun makeLocalMessageBroker(): HostAndPort {
with(configuration) { with(configuration) {
val useHost = tryDetectIfNotPublicHost(p2pAddress.host) messageBroker = ArtemisMessagingServer(this, p2pAddress.port, rpcAddress?.port, services.networkMapCache, userService)
val useAddress = useHost?.let { HostAndPort.fromParts(it, p2pAddress.port) } ?: p2pAddress return HostAndPort.fromParts("localhost", p2pAddress.port)
messageBroker = ArtemisMessagingServer(this, useAddress, rpcAddress, services.networkMapCache, userService) }
return useAddress }
private fun getAdvertisedAddress(): HostAndPort {
return with(configuration) {
val publicHost = tryDetectIfNotPublicHost(p2pAddress.host)
val useHost = publicHost ?: p2pAddress.host
HostAndPort.fromParts(useHost, p2pAddress.port)
} }
} }

View File

@ -85,8 +85,8 @@ import javax.security.cert.X509Certificate
*/ */
@ThreadSafe @ThreadSafe
class ArtemisMessagingServer(override val config: NodeConfiguration, class ArtemisMessagingServer(override val config: NodeConfiguration,
val p2pHostPort: HostAndPort, val p2pPort: Int,
val rpcHostPort: HostAndPort?, val rpcPort: Int?,
val networkMapCache: NetworkMapCache, val networkMapCache: NetworkMapCache,
val userService: RPCUserService) : ArtemisMessagingComponent() { val userService: RPCUserService) : ArtemisMessagingComponent() {
companion object { companion object {
@ -156,9 +156,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } } registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
} }
activeMQServer.start() activeMQServer.start()
printBasicNodeInfo("Listening on address", p2pHostPort.toString()) printBasicNodeInfo("Listening on port", p2pPort.toString())
if (rpcHostPort != null) { if (rpcPort != null) {
printBasicNodeInfo("RPC service listening on address", rpcHostPort.toString()) printBasicNodeInfo("RPC service listening on port", rpcPort.toString())
} }
} }
@ -170,9 +170,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
val connectionDirection = ConnectionDirection.Inbound( val connectionDirection = ConnectionDirection.Inbound(
acceptorFactoryClassName = NettyAcceptorFactory::class.java.name acceptorFactoryClassName = NettyAcceptorFactory::class.java.name
) )
val acceptors = mutableSetOf(createTcpTransport(connectionDirection, "0.0.0.0", p2pHostPort.port)) val acceptors = mutableSetOf(createTcpTransport(connectionDirection, "0.0.0.0", p2pPort))
if (rpcHostPort != null) { if (rpcPort != null) {
acceptors.add(createTcpTransport(connectionDirection, "0.0.0.0", rpcHostPort.port, enableSSL = false)) acceptors.add(createTcpTransport(connectionDirection, "0.0.0.0", rpcPort, enableSSL = false))
} }
acceptorConfigurations = acceptors acceptorConfigurations = acceptors
// Enable built in message deduplication. Note we still have to do our own as the delayed commits // Enable built in message deduplication. Note we still have to do our own as the delayed commits

View File

@ -61,22 +61,23 @@ import javax.annotation.concurrent.ThreadSafe
* invoke methods on the provided implementation. There is more documentation on this in the docsite and the * invoke methods on the provided implementation. There is more documentation on this in the docsite and the
* CordaRPCClient class. * CordaRPCClient class.
* *
* @param serverHostPort The address of the broker instance to connect to (might be running in the same process). * @param serverAddress The address of the broker instance to connect to (might be running in the same process).
* @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate * @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate
* that this is a NetworkMapService node which will be bound globally to the name "networkmap". * that this is a NetworkMapService node which will be bound globally to the name "networkmap".
* @param nodeExecutor An executor to run received message tasks upon. * @param nodeExecutor An executor to run received message tasks upon.
* @param isServerLocal Specify `true` if the provided [serverHostPort] is a locally running broker instance. * @param advertisedAddress The node address for inbound connections, advertised to the network map service and peers.
* If not provided, will default to [serverAddress].
*/ */
@ThreadSafe @ThreadSafe
class NodeMessagingClient(override val config: NodeConfiguration, class NodeMessagingClient(override val config: NodeConfiguration,
val versionInfo: VersionInfo, val versionInfo: VersionInfo,
val serverHostPort: HostAndPort, val serverAddress: HostAndPort,
val myIdentity: PublicKey?, val myIdentity: PublicKey?,
val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
val database: Database, val database: Database,
val networkMapRegistrationFuture: ListenableFuture<Unit>, val networkMapRegistrationFuture: ListenableFuture<Unit>,
val monitoringService: MonitoringService, val monitoringService: MonitoringService,
val isServerLocal: Boolean = true advertisedAddress: HostAndPort = serverAddress
) : ArtemisMessagingComponent(), MessagingService { ) : ArtemisMessagingComponent(), MessagingService {
companion object { companion object {
private val log = loggerFor<NodeMessagingClient>() private val log = loggerFor<NodeMessagingClient>()
@ -132,9 +133,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache. * Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
*/ */
override val myAddress: SingleMessageRecipient = if (myIdentity != null) { override val myAddress: SingleMessageRecipient = if (myIdentity != null) {
NodeAddress.asPeer(myIdentity, serverHostPort) NodeAddress.asPeer(myIdentity, advertisedAddress)
} else { } else {
NetworkMapAddress(serverHostPort) NetworkMapAddress(advertisedAddress)
} }
private val state = ThreadBox(InnerState()) private val state = ThreadBox(InnerState())
@ -158,8 +159,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
check(!started) { "start can't be called twice" } check(!started) { "start can't be called twice" }
started = true started = true
val serverAddress = getBrokerAddress()
log.info("Connecting to server: $serverAddress") log.info("Connecting to server: $serverAddress")
// TODO Add broker CN to config for host verification in case the embedded broker isn't used // TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config) val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config)
@ -216,13 +215,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
resumeMessageRedelivery() resumeMessageRedelivery()
} }
/**
* If the message broker is running locally and [serverHostPort] specifies a public IP, the messaging client will
* fail to connect on nodes under a NAT with no loopback support. As the local message broker is listening on
* all interfaces it is safer to always use `localhost` instead.
*/
private fun getBrokerAddress() = if (isServerLocal) HostAndPort.fromParts("localhost", serverHostPort.port) else serverHostPort
/** /**
* We make the consumer twice, once to filter for just network map messages, and then once that is complete, we close * We make the consumer twice, once to filter for just network map messages, and then once that is complete, we close
* the original and make another without a filter. We do this so that there is a network map in place for all other * the original and make another without a filter. We do this so that there is a network map in place for all other

View File

@ -24,10 +24,10 @@ import net.corda.node.utilities.transaction
import net.corda.testing.MOCK_VERSION_INFO import net.corda.testing.MOCK_VERSION_INFO
import net.corda.testing.TestNodeConfiguration import net.corda.testing.TestNodeConfiguration
import net.corda.testing.freeLocalHostAndPort import net.corda.testing.freeLocalHostAndPort
import net.corda.testing.freePort
import net.corda.testing.node.makeTestDataSourceProperties import net.corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.assertThatThrownBy
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Database
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
@ -46,8 +46,8 @@ import kotlin.test.assertNull
class ArtemisMessagingTests { class ArtemisMessagingTests {
@Rule @JvmField val temporaryFolder = TemporaryFolder() @Rule @JvmField val temporaryFolder = TemporaryFolder()
val hostAndPort = freeLocalHostAndPort() val serverPort = freePort()
val rpcHostAndPort = freeLocalHostAndPort() val rpcPort = freePort()
val topic = "platform.self" val topic = "platform.self"
val identity = generateKeyPair() val identity = generateKeyPair()
@ -93,7 +93,7 @@ class ArtemisMessagingTests {
@Test @Test
fun `server starting with the port already bound should throw`() { fun `server starting with the port already bound should throw`() {
ServerSocket(hostAndPort.port).use { ServerSocket(serverPort).use {
val messagingServer = createMessagingServer() val messagingServer = createMessagingServer()
assertThatThrownBy { messagingServer.start() } assertThatThrownBy { messagingServer.start() }
} }
@ -103,7 +103,7 @@ class ArtemisMessagingTests {
fun `client should connect to remote server`() { fun `client should connect to remote server`() {
val remoteServerAddress = freeLocalHostAndPort() val remoteServerAddress = freeLocalHostAndPort()
createMessagingServer(remoteServerAddress).start() createMessagingServer(remoteServerAddress.port).start()
createMessagingClient(server = remoteServerAddress) createMessagingClient(server = remoteServerAddress)
startNodeMessagingClient() startNodeMessagingClient()
} }
@ -113,7 +113,7 @@ class ArtemisMessagingTests {
val serverAddress = freeLocalHostAndPort() val serverAddress = freeLocalHostAndPort()
val invalidServerAddress = freeLocalHostAndPort() val invalidServerAddress = freeLocalHostAndPort()
createMessagingServer(serverAddress).start() createMessagingServer(serverAddress.port).start()
messagingClient = createMessagingClient(server = invalidServerAddress) messagingClient = createMessagingClient(server = invalidServerAddress)
assertThatThrownBy { startNodeMessagingClient() } assertThatThrownBy { startNodeMessagingClient() }
@ -218,7 +218,7 @@ class ArtemisMessagingTests {
return messagingClient return messagingClient
} }
private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient { private fun createMessagingClient(server: HostAndPort = HostAndPort.fromParts("localhost", serverPort)): NodeMessagingClient {
return database.transaction { return database.transaction {
NodeMessagingClient( NodeMessagingClient(
config, config,
@ -235,7 +235,7 @@ class ArtemisMessagingTests {
} }
} }
private fun createMessagingServer(local: HostAndPort = hostAndPort, rpc: HostAndPort = rpcHostAndPort): ArtemisMessagingServer { private fun createMessagingServer(local: Int = serverPort, rpc: Int = rpcPort): ArtemisMessagingServer {
return ArtemisMessagingServer(config, local, rpc, networkMapCache, userService).apply { return ArtemisMessagingServer(config, local, rpc, networkMapCache, userService).apply {
config.configureWithDevSSLCertificate() config.configureWithDevSSLCertificate()
messagingServer = this messagingServer = this

View File

@ -88,16 +88,21 @@ val MOCK_VERSION_INFO = VersionInfo(1, "Mock release", "Mock revision", "Mock Ve
fun generateStateRef() = StateRef(SecureHash.randomSHA256(), 0) fun generateStateRef() = StateRef(SecureHash.randomSHA256(), 0)
private val freePortCounter = AtomicInteger(30000) private val freePortCounter = AtomicInteger(30000)
/**
* Returns a localhost address with a free port.
*
* Unsafe for getting multiple ports!
* Use [getFreeLocalPorts] for getting multiple ports.
*/
fun freeLocalHostAndPort(): HostAndPort = HostAndPort.fromParts("localhost", freePort())
/** /**
* Returns a free port. * Returns a free port.
* *
* Unsafe for getting multiple ports! * Unsafe for getting multiple ports!
* Use [getFreeLocalPorts] for getting multiple ports. * Use [getFreeLocalPorts] for getting multiple ports.
*/ */
fun freeLocalHostAndPort(): HostAndPort { fun freePort(): Int = freePortCounter.getAndAccumulate(0) { prev, _ -> 30000 + (prev - 30000 + 1) % 10000 }
val freePort = freePortCounter.getAndAccumulate(0) { prev, _ -> 30000 + (prev - 30000 + 1) % 10000 }
return HostAndPort.fromParts("localhost", freePort)
}
/** /**
* Creates a specified number of ports for use by the Node. * Creates a specified number of ports for use by the Node.

View File

@ -43,7 +43,7 @@ class SimpleNode(val config: NodeConfiguration, val address: HostAndPort = freeL
val identityService: IdentityService = InMemoryIdentityService(trustRoot = trustRoot) val identityService: IdentityService = InMemoryIdentityService(trustRoot = trustRoot)
val keyService: KeyManagementService = E2ETestKeyManagementService(identityService, setOf(identity)) val keyService: KeyManagementService = E2ETestKeyManagementService(identityService, setOf(identity))
val executor = ServiceAffinityExecutor(config.myLegalName.commonName, 1) val executor = ServiceAffinityExecutor(config.myLegalName.commonName, 1)
val broker = ArtemisMessagingServer(config, address, rpcAddress, InMemoryNetworkMapCache(), userService) val broker = ArtemisMessagingServer(config, address.port, rpcAddress.port, InMemoryNetworkMapCache(), userService)
val networkMapRegistrationFuture: SettableFuture<Unit> = SettableFuture.create<Unit>() val networkMapRegistrationFuture: SettableFuture<Unit> = SettableFuture.create<Unit>()
val net = database.transaction { val net = database.transaction {
NodeMessagingClient( NodeMessagingClient(