From ca1b08ad3778a47529d37220f7ebeda88f241079 Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Fri, 13 Jan 2017 18:29:56 +0000 Subject: [PATCH] Node memory leak fix (#152) Perform Artemis message sending in a separate thread pool to avoid memory leaks --- .../services/messaging/NodeMessagingClient.kt | 56 +++++++++++-------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index 8f1fa97981..d7290474b7 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -49,13 +49,13 @@ import javax.annotation.concurrent.ThreadSafe * @param serverHostPort The address of the broker instance to connect to (might be running in the same process) * @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate * that this is a NetworkMapService node which will be bound globally to the name "networkmap" - * @param executor An executor to run received message tasks upon. + * @param nodeExecutor An executor to run received message tasks upon. */ @ThreadSafe class NodeMessagingClient(override val config: NodeConfiguration, val serverHostPort: HostAndPort, val myIdentity: CompositeKey?, - val executor: AffinityExecutor, + val nodeExecutor: AffinityExecutor, val database: Database, val networkMapRegistrationFuture: ListenableFuture) : ArtemisMessagingComponent(), MessagingServiceInternal { companion object { @@ -86,6 +86,9 @@ class NodeMessagingClient(override val config: NodeConfiguration, data class Handler(val topicSession: TopicSession, val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration + /** An executor for sending messages */ + private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("${config.myLegalName} Messaging", 1) + /** * Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache. */ @@ -205,7 +208,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, check(started) { "start must be called first" } check(!running) { "run can't be called twice" } running = true - rpcDispatcher!!.start(rpcConsumer!!, rpcNotificationConsumer!!, executor) + rpcDispatcher!!.start(rpcConsumer!!, rpcNotificationConsumer!!, nodeExecutor) p2pConsumer!! } @@ -287,7 +290,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, // // Note that handlers may re-enter this class. We aren't holding any locks and methods like // start/run/stop have re-entrancy assertions at the top, so it is OK. - executor.fetchFrom { + nodeExecutor.fetchFrom { databaseTransaction(database) { if (msg.uniqueMessageId in processedMessages) { log.trace { "Discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" } @@ -330,7 +333,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, p2pConsumer = null prevRunning } - if (running && !executor.isOnThread) { + if (running && !nodeExecutor.isOnThread) { // Wait for the main loop to notice the consumer has gone and finish up. shutdownLatch.await() } @@ -353,20 +356,23 @@ class NodeMessagingClient(override val config: NodeConfiguration, } override fun send(message: Message, target: MessageRecipients) { - state.locked { - val mqAddress = getMQAddress(target) - val artemisMessage = session!!.createMessage(true).apply { - val sessionID = message.topicSession.sessionID - putStringProperty(TOPIC_PROPERTY, message.topicSession.topic) - putLongProperty(SESSION_ID_PROPERTY, sessionID) - writeBodyBufferBytes(message.data) - // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString())) + // We have to perform sending on a different thread pool, since using the same pool for messaging and + // fibers leads to Netty buffer memory leaks, caused by both Netty and Quasar fiddling with thread-locals. + messagingExecutor.fetchFrom { + state.locked { + val mqAddress = getMQAddress(target) + val artemisMessage = session!!.createMessage(true).apply { + val sessionID = message.topicSession.sessionID + putStringProperty(TOPIC_PROPERTY, message.topicSession.topic) + putLongProperty(SESSION_ID_PROPERTY, sessionID) + writeBodyBufferBytes(message.data) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString())) + } + log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} " + + "uuid: ${message.uniqueMessageId}") + producer!!.send(mqAddress, artemisMessage) } - - log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} " + - "uuid: ${message.uniqueMessageId}") - producer!!.send(mqAddress, artemisMessage) } } @@ -426,13 +432,15 @@ class NodeMessagingClient(override val config: NodeConfiguration, private fun createRPCDispatcher(ops: RPCOps, userService: RPCUserService, nodeLegalName: String) = object : RPCDispatcher(ops, userService, nodeLegalName) { override fun send(data: SerializedBytes<*>, toAddress: String) { - state.locked { - val msg = session!!.createMessage(false).apply { - writeBodyBufferBytes(data.bytes) - // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + messagingExecutor.fetchFrom { + state.locked { + val msg = session!!.createMessage(false).apply { + writeBodyBufferBytes(data.bytes) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + } + producer!!.send(toAddress, msg) } - producer!!.send(toAddress, msg) } } }