Merged in parkri-remove-intermittent-error-at-start (pull request #542)

Remove race condition and intermittent logged error at node start up regarding "Queue created for a peer that we don't know from the network map".
This commit is contained in:
Rick Parker 2016-11-24 16:30:14 +00:00
commit 44e09f3366
5 changed files with 17 additions and 6 deletions

View File

@ -118,15 +118,15 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
private lateinit var userService: RPCUserService private lateinit var userService: RPCUserService
override fun makeMessagingService(): MessagingServiceInternal { override fun makeMessagingService(): MessagingServiceInternal {
val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
userService = RPCUserServiceImpl(configuration.config) userService = RPCUserServiceImpl(configuration.config)
val serverAddr = with(configuration) { val serverAddr = with(configuration) {
messagingServerAddress ?: { messagingServerAddress ?: {
messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService) messageBroker = ArtemisMessagingServer(this, artemisAddress, myIdentityOrNullIfNetworkMapService, services.networkMapCache, userService)
artemisAddress artemisAddress
}() }()
} }
val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database, networkMapRegistrationFuture) return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database, networkMapRegistrationFuture)
} }

View File

@ -59,6 +59,14 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
return addr.queueName return addr.queueName
} }
/**
* Convert the identity, host and port of this node into the appropriate [SingleMessageRecipient].
*
* N.B. Marked as JvmStatic to allow use in the inherited classes.
*/
@JvmStatic
protected fun toMyAddress(myIdentity: CompositeKey?, myHostPort: HostAndPort): SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, myHostPort) else NetworkMapAddress(myHostPort)
} }
protected interface ArtemisAddress { protected interface ArtemisAddress {

View File

@ -3,6 +3,7 @@ package net.corda.node.services.messaging
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import net.corda.core.ThreadBox import net.corda.core.ThreadBox
import net.corda.core.crypto.AddressFormatException import net.corda.core.crypto.AddressFormatException
import net.corda.core.crypto.CompositeKey
import net.corda.core.div import net.corda.core.div
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache
@ -54,6 +55,7 @@ import javax.security.auth.spi.LoginModule
@ThreadSafe @ThreadSafe
class ArtemisMessagingServer(override val config: NodeConfiguration, class ArtemisMessagingServer(override val config: NodeConfiguration,
val myHostPort: HostAndPort, val myHostPort: HostAndPort,
val myIdentity: CompositeKey?,
val networkMapCache: NetworkMapCache, val networkMapCache: NetworkMapCache,
val userService: RPCUserService) : ArtemisMessagingComponent() { val userService: RPCUserService) : ArtemisMessagingComponent() {
@ -68,6 +70,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val mutex = ThreadBox(InnerState()) private val mutex = ThreadBox(InnerState())
private lateinit var activeMQServer: ActiveMQServer private lateinit var activeMQServer: ActiveMQServer
private var networkChangeHandle: Subscription? = null private var networkChangeHandle: Subscription? = null
private val myQueueName = toQueueName(toMyAddress(myIdentity, myHostPort))
init { init {
config.basedir.expectedOnDefaultFileSystem() config.basedir.expectedOnDefaultFileSystem()
@ -148,7 +151,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
// a lazily initialised subsystem. // a lazily initialised subsystem.
registerPostQueueCreationCallback { queueName -> registerPostQueueCreationCallback { queueName ->
log.debug("Queue created: $queueName") log.debug("Queue created: $queueName")
if (queueName.startsWith(PEERS_PREFIX) && queueName != NETWORK_MAP_ADDRESS) { if (queueName.startsWith(PEERS_PREFIX) && queueName != NETWORK_MAP_ADDRESS && queueName != myQueueName) {
try { try {
val identity = parseKeyFromQueueName(queueName.toString()) val identity = parseKeyFromQueueName(queueName.toString())
val nodeInfo = networkMapCache.getNodeByCompositeKey(identity) val nodeInfo = networkMapCache.getNodeByCompositeKey(identity)

View File

@ -94,7 +94,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
/** /**
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache. * Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
*/ */
override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort) override val myAddress: SingleMessageRecipient = toMyAddress(myIdentity, serverHostPort)
private val state = ThreadBox(InnerState()) private val state = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>() private val handlers = CopyOnWriteArrayList<Handler>()

View File

@ -230,7 +230,7 @@ class ArtemisMessagingTests {
} }
private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer { private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer {
return ArtemisMessagingServer(config, local, networkMapCache, userService).apply { return ArtemisMessagingServer(config, local, identity.public.composite, networkMapCache, userService).apply {
configureWithDevSSLCertificate() configureWithDevSSLCertificate()
messagingServer = this messagingServer = this
} }