Remove race condition and intermittent logged error at node start up regarding "Queue created for a peer that we don't know from the network map".

This commit is contained in:
rick.parker 2016-11-24 13:14:07 +00:00
parent b4288cf9dc
commit b20baff54c
5 changed files with 17 additions and 6 deletions

View File

@ -118,15 +118,15 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
private lateinit var userService: RPCUserService
override fun makeMessagingService(): MessagingServiceInternal {
val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
userService = RPCUserServiceImpl(configuration.config)
val serverAddr = with(configuration) {
messagingServerAddress ?: {
messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService)
messageBroker = ArtemisMessagingServer(this, artemisAddress, myIdentityOrNullIfNetworkMapService, services.networkMapCache, userService)
artemisAddress
}()
}
val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database, networkMapRegistrationFuture)
}

View File

@ -59,6 +59,14 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
return addr.queueName
}
/**
* Convert the identity, host and port of this node into the appropriate [SingleMessageRecipient].
*
* N.B. Marked as JvmStatic to allow use in the inherited classes.
*/
@JvmStatic
protected fun toMyAddress(myIdentity: CompositeKey?, myHostPort: HostAndPort): SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, myHostPort) else NetworkMapAddress(myHostPort)
}
protected interface ArtemisAddress {

View File

@ -3,6 +3,7 @@ package net.corda.node.services.messaging
import com.google.common.net.HostAndPort
import net.corda.core.ThreadBox
import net.corda.core.crypto.AddressFormatException
import net.corda.core.crypto.CompositeKey
import net.corda.core.div
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.NetworkMapCache
@ -54,6 +55,7 @@ import javax.security.auth.spi.LoginModule
@ThreadSafe
class ArtemisMessagingServer(override val config: NodeConfiguration,
val myHostPort: HostAndPort,
val myIdentity: CompositeKey?,
val networkMapCache: NetworkMapCache,
val userService: RPCUserService) : ArtemisMessagingComponent() {
@ -68,6 +70,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val mutex = ThreadBox(InnerState())
private lateinit var activeMQServer: ActiveMQServer
private var networkChangeHandle: Subscription? = null
private val myQueueName = toQueueName(toMyAddress(myIdentity, myHostPort))
init {
config.basedir.expectedOnDefaultFileSystem()
@ -148,7 +151,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
// a lazily initialised subsystem.
registerPostQueueCreationCallback { queueName ->
log.debug("Queue created: $queueName")
if (queueName.startsWith(PEERS_PREFIX) && queueName != NETWORK_MAP_ADDRESS) {
if (queueName.startsWith(PEERS_PREFIX) && queueName != NETWORK_MAP_ADDRESS && queueName != myQueueName) {
try {
val identity = parseKeyFromQueueName(queueName.toString())
val nodeInfo = networkMapCache.getNodeByCompositeKey(identity)

View File

@ -94,7 +94,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
/**
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
*/
override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort)
override val myAddress: SingleMessageRecipient = toMyAddress(myIdentity, serverHostPort)
private val state = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>()

View File

@ -230,7 +230,7 @@ class ArtemisMessagingTests {
}
private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer {
return ArtemisMessagingServer(config, local, networkMapCache, userService).apply {
return ArtemisMessagingServer(config, local, identity.public.composite, networkMapCache, userService).apply {
configureWithDevSSLCertificate()
messagingServer = this
}