CORDA-2861 - give the message executor its own Artemis session and producer (#5036)

This commit is contained in:
bpaunescu 2019-04-17 19:05:06 +01:00 committed by Katelyn Baker
parent 4e10f09016
commit 95a3631628

View File

@ -108,6 +108,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
var eventsSubscription: Subscription? = null
var p2pConsumer: P2PMessagingConsumer? = null
var locator: ServerLocator? = null
var executorProducer: ClientProducer? = null
var executorSession: ClientSession? = null
var producer: ClientProducer? = null
var producerSession: ClientSession? = null
var bridgeSession: ClientSession? = null
@ -170,8 +172,10 @@ class P2PMessagingClient(val config: NodeConfiguration,
// size of 1MB is acknowledged.
val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) }
executorSession = createNewSession()
producerSession = createNewSession()
bridgeSession = createNewSession()
executorSession!!.start()
producerSession!!.start()
bridgeSession!!.start()
@ -179,6 +183,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
// Create a queue, consumer and producer for handling P2P network messages.
// Create a general purpose producer.
producer = producerSession!!.createProducer()
executorProducer = executorSession!!.createProducer()
inboxes += RemoteInboxAddress(myIdentity).queueName
serviceIdentity?.let {
@ -190,8 +195,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents)
messagingExecutor = MessagingExecutor(
producerSession!!,
producer!!,
executorSession!!,
executorProducer!!,
versionInfo,
this@P2PMessagingClient,
ourSenderUUID = ourSenderUUID
@ -435,6 +440,10 @@ class P2PMessagingClient(val config: NodeConfiguration,
producer = null
producerSession!!.commit()
close(executorProducer)
executorProducer = null
executorSession!!.commit()
close(bridgeNotifyConsumer)
knownQueues.clear()
eventsSubscription?.unsubscribe()