Node memory leak fix (#152)

Perform Artemis message sending in a separate thread pool to avoid memory leaks
This commit is contained in:
Andrius Dagys 2017-01-13 18:29:56 +00:00 committed by GitHub
parent d757429043
commit ca1b08ad37

View File

@ -49,13 +49,13 @@ import javax.annotation.concurrent.ThreadSafe
* @param serverHostPort The address of the broker instance to connect to (might be running in the same process)
* @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate
* that this is a NetworkMapService node which will be bound globally to the name "networkmap"
* @param executor An executor to run received message tasks upon.
* @param nodeExecutor An executor to run received message tasks upon.
*/
@ThreadSafe
class NodeMessagingClient(override val config: NodeConfiguration,
val serverHostPort: HostAndPort,
val myIdentity: CompositeKey?,
val executor: AffinityExecutor,
val nodeExecutor: AffinityExecutor,
val database: Database,
val networkMapRegistrationFuture: ListenableFuture<Unit>) : ArtemisMessagingComponent(), MessagingServiceInternal {
companion object {
@ -86,6 +86,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
data class Handler(val topicSession: TopicSession,
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
/** An executor for sending messages */
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("${config.myLegalName} Messaging", 1)
/**
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
*/
@ -205,7 +208,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
check(started) { "start must be called first" }
check(!running) { "run can't be called twice" }
running = true
rpcDispatcher!!.start(rpcConsumer!!, rpcNotificationConsumer!!, executor)
rpcDispatcher!!.start(rpcConsumer!!, rpcNotificationConsumer!!, nodeExecutor)
p2pConsumer!!
}
@ -287,7 +290,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
//
// Note that handlers may re-enter this class. We aren't holding any locks and methods like
// start/run/stop have re-entrancy assertions at the top, so it is OK.
executor.fetchFrom {
nodeExecutor.fetchFrom {
databaseTransaction(database) {
if (msg.uniqueMessageId in processedMessages) {
log.trace { "Discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" }
@ -330,7 +333,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
p2pConsumer = null
prevRunning
}
if (running && !executor.isOnThread) {
if (running && !nodeExecutor.isOnThread) {
// Wait for the main loop to notice the consumer has gone and finish up.
shutdownLatch.await()
}
@ -353,6 +356,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
override fun send(message: Message, target: MessageRecipients) {
// We have to perform sending on a different thread pool, since using the same pool for messaging and
// fibers leads to Netty buffer memory leaks, caused by both Netty and Quasar fiddling with thread-locals.
messagingExecutor.fetchFrom {
state.locked {
val mqAddress = getMQAddress(target)
val artemisMessage = session!!.createMessage(true).apply {
@ -363,12 +369,12 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString()))
}
log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} " +
"uuid: ${message.uniqueMessageId}")
producer!!.send(mqAddress, artemisMessage)
}
}
}
private fun getMQAddress(target: MessageRecipients): String {
return if (target == myAddress) {
@ -426,6 +432,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
private fun createRPCDispatcher(ops: RPCOps, userService: RPCUserService, nodeLegalName: String) =
object : RPCDispatcher(ops, userService, nodeLegalName) {
override fun send(data: SerializedBytes<*>, toAddress: String) {
messagingExecutor.fetchFrom {
state.locked {
val msg = session!!.createMessage(false).apply {
writeBodyBufferBytes(data.bytes)
@ -436,6 +443,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
}
}
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
return when (partyInfo) {