mirror of
https://github.com/corda/corda.git
synced 2024-12-21 22:07:55 +00:00
CORDA-2861: give the message executor its own artemis session and producer (#5031)
This commit is contained in:
parent
5821ad5f5c
commit
33e9f125db
@ -108,6 +108,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
var eventsSubscription: Subscription? = null
|
var eventsSubscription: Subscription? = null
|
||||||
var p2pConsumer: P2PMessagingConsumer? = null
|
var p2pConsumer: P2PMessagingConsumer? = null
|
||||||
var locator: ServerLocator? = null
|
var locator: ServerLocator? = null
|
||||||
|
var executorProducer: ClientProducer? = null
|
||||||
|
var executorSession: ClientSession? = null
|
||||||
var producer: ClientProducer? = null
|
var producer: ClientProducer? = null
|
||||||
var producerSession: ClientSession? = null
|
var producerSession: ClientSession? = null
|
||||||
var bridgeSession: ClientSession? = null
|
var bridgeSession: ClientSession? = null
|
||||||
@ -171,8 +173,10 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
// size of 1MB is acknowledged.
|
// size of 1MB is acknowledged.
|
||||||
val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) }
|
val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) }
|
||||||
|
|
||||||
|
executorSession = createNewSession()
|
||||||
producerSession = createNewSession()
|
producerSession = createNewSession()
|
||||||
bridgeSession = createNewSession()
|
bridgeSession = createNewSession()
|
||||||
|
executorSession!!.start()
|
||||||
producerSession!!.start()
|
producerSession!!.start()
|
||||||
bridgeSession!!.start()
|
bridgeSession!!.start()
|
||||||
|
|
||||||
@ -180,6 +184,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
// Create a queue, consumer and producer for handling P2P network messages.
|
// Create a queue, consumer and producer for handling P2P network messages.
|
||||||
// Create a general purpose producer.
|
// Create a general purpose producer.
|
||||||
producer = producerSession!!.createProducer()
|
producer = producerSession!!.createProducer()
|
||||||
|
executorProducer = executorSession!!.createProducer()
|
||||||
|
|
||||||
inboxes += RemoteInboxAddress(myIdentity).queueName
|
inboxes += RemoteInboxAddress(myIdentity).queueName
|
||||||
serviceIdentity?.let {
|
serviceIdentity?.let {
|
||||||
@ -191,8 +196,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents)
|
p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents)
|
||||||
|
|
||||||
messagingExecutor = MessagingExecutor(
|
messagingExecutor = MessagingExecutor(
|
||||||
producerSession!!,
|
executorSession!!,
|
||||||
producer!!,
|
executorProducer!!,
|
||||||
versionInfo,
|
versionInfo,
|
||||||
this@P2PMessagingClient,
|
this@P2PMessagingClient,
|
||||||
ourSenderUUID = ourSenderUUID
|
ourSenderUUID = ourSenderUUID
|
||||||
@ -441,6 +446,10 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
producer = null
|
producer = null
|
||||||
producerSession!!.commit()
|
producerSession!!.commit()
|
||||||
|
|
||||||
|
close(executorProducer)
|
||||||
|
executorProducer = null
|
||||||
|
executorSession!!.commit()
|
||||||
|
|
||||||
close(bridgeNotifyConsumer)
|
close(bridgeNotifyConsumer)
|
||||||
knownQueues.clear()
|
knownQueues.clear()
|
||||||
eventsSubscription?.unsubscribe()
|
eventsSubscription?.unsubscribe()
|
||||||
|
Loading…
Reference in New Issue
Block a user