diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 65dfc1c49e..15a3cecb2a 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -118,15 +118,15 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: private lateinit var userService: RPCUserService override fun makeMessagingService(): MessagingServiceInternal { + val legalIdentity = obtainLegalIdentity() + val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null userService = RPCUserServiceImpl(configuration.config) val serverAddr = with(configuration) { messagingServerAddress ?: { - messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService) + messageBroker = ArtemisMessagingServer(this, artemisAddress, myIdentityOrNullIfNetworkMapService, services.networkMapCache, userService) artemisAddress }() } - val legalIdentity = obtainLegalIdentity() - val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database, networkMapRegistrationFuture) } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt index 38b324194c..2028583bba 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt @@ -59,6 +59,14 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { return addr.queueName } + + /** + * Convert the identity, host and port of this node into the appropriate [SingleMessageRecipient]. + * + * N.B. Marked as JvmStatic to allow use in the inherited classes. + */ + @JvmStatic + protected fun toMyAddress(myIdentity: CompositeKey?, myHostPort: HostAndPort): SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, myHostPort) else NetworkMapAddress(myHostPort) } protected interface ArtemisAddress { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 9948d215a9..be11255ca1 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -3,6 +3,7 @@ package net.corda.node.services.messaging import com.google.common.net.HostAndPort import net.corda.core.ThreadBox import net.corda.core.crypto.AddressFormatException +import net.corda.core.crypto.CompositeKey import net.corda.core.div import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.services.NetworkMapCache @@ -54,6 +55,7 @@ import javax.security.auth.spi.LoginModule @ThreadSafe class ArtemisMessagingServer(override val config: NodeConfiguration, val myHostPort: HostAndPort, + val myIdentity: CompositeKey?, val networkMapCache: NetworkMapCache, val userService: RPCUserService) : ArtemisMessagingComponent() { @@ -68,6 +70,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, private val mutex = ThreadBox(InnerState()) private lateinit var activeMQServer: ActiveMQServer private var networkChangeHandle: Subscription? = null + private val myQueueName = toQueueName(toMyAddress(myIdentity, myHostPort)) init { config.basedir.expectedOnDefaultFileSystem() @@ -148,7 +151,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, // a lazily initialised subsystem. registerPostQueueCreationCallback { queueName -> log.debug("Queue created: $queueName") - if (queueName.startsWith(PEERS_PREFIX) && queueName != NETWORK_MAP_ADDRESS) { + if (queueName.startsWith(PEERS_PREFIX) && queueName != NETWORK_MAP_ADDRESS && queueName != myQueueName) { try { val identity = parseKeyFromQueueName(queueName.toString()) val nodeInfo = networkMapCache.getNodeByCompositeKey(identity) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index ecb5d7e17e..5b0822ecef 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -94,7 +94,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, /** * Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache. */ - override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort) + override val myAddress: SingleMessageRecipient = toMyAddress(myIdentity, serverHostPort) private val state = ThreadBox(InnerState()) private val handlers = CopyOnWriteArrayList() diff --git a/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt index db3e643c18..b93d45f44b 100644 --- a/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt @@ -230,7 +230,7 @@ class ArtemisMessagingTests { } private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer { - return ArtemisMessagingServer(config, local, networkMapCache, userService).apply { + return ArtemisMessagingServer(config, local, identity.public.composite, networkMapCache, userService).apply { configureWithDevSSLCertificate() messagingServer = this }