diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index bec83e4d75..ed72cfaaaf 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -38,6 +38,7 @@ import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.requireMessageSize +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER @@ -168,7 +169,8 @@ class P2PMessagingClient(val config: NodeConfiguration, inboxes += RemoteInboxAddress(it).queueName } - inboxes.forEach { createQueueIfAbsent(it, producerSession!!) } + inboxes.forEach { createQueueIfAbsent(it, producerSession!!, exclusive = true) } + p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents) messagingExecutor = MessagingExecutor( @@ -469,14 +471,14 @@ class P2PMessagingClient(val config: NodeConfiguration, val internalTargetQueue = (address as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address") state.locked { - createQueueIfAbsent(internalTargetQueue, producerSession!!) + createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = address !is ServiceAddress) } internalTargetQueue } } /** Attempts to create a durable queue on the broker which is bound to an address of the same name. */ - private fun createQueueIfAbsent(queueName: String, session: ClientSession) { + private fun createQueueIfAbsent(queueName: String, session: ClientSession, exclusive: Boolean) { fun sendBridgeCreateMessage() { val keyHash = queueName.substring(PEERS_PREFIX.length) val peers = networkMap.getNodesByOwningKeyIndex(keyHash) @@ -495,7 +497,9 @@ class P2PMessagingClient(val config: NodeConfiguration, val queueQuery = session.queueQuery(SimpleString(queueName)) if (!queueQuery.isExists) { log.info("Create fresh queue $queueName bound on same address") - session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) + session.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), exclusive, null) sendBridgeCreateMessage() } }