Acknowledge all artemis messages

This commit is contained in:
Matthew Nesbit 2018-11-12 14:01:45 +00:00
parent fab0c5cd2c
commit 2600376773
4 changed files with 12 additions and 2 deletions

View File

@ -56,7 +56,10 @@ class FlowWorker(flowWorkerId: String, private val flowWorkerServiceHub: FlowWor
val consumer = session.createConsumer(queueName)
val producer = session.createProducer()
consumer.setMessageHandler { message -> handleFlowWorkerMessage(message, session, producer) }
consumer.setMessageHandler { message ->
handleFlowWorkerMessage(message, session, producer)
message.acknowledge()
}
thread {
(flowWorkerServiceHub.networkService as P2PMessagingClient).run()

View File

@ -84,7 +84,10 @@ class CordaRpcWorkerOps(
}
val consumer = session.createConsumer(rpcWorkerQueueName)
consumer.setMessageHandler { message -> handleFlowWorkerMessage(message) }
consumer.setMessageHandler { message ->
handleFlowWorkerMessage(message)
message.acknowledge()
}
networkMapFeed().updates.subscribe { mapChange: NetworkMapCache.MapChange? ->
val networkMapUpdateMessage = NetworkMapUpdate(services.myInfo.legalIdentities.first().name, mapChange!!)

View File

@ -91,6 +91,7 @@ class BridgeControlListener(val config: MutualSslConfiguration,
} catch (ex: Exception) {
log.error("Unable to process bridge control message", ex)
}
msg.acknowledge()
}
}
@ -113,6 +114,7 @@ class BridgeControlListener(val config: MutualSslConfiguration,
} catch (ex: Exception) {
log.error("Unable to process bridge notification message", ex)
}
msg.acknowledge()
}
}

View File

@ -292,6 +292,7 @@ class RPCServer<OPS : RPCOps>(
val clientAddress = artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME)
log.warn("Detected RPC client disconnect on address $clientAddress, scheduling for reaping")
invalidateClient(SimpleString(clientAddress))
artemisMessage.acknowledge()
}
private fun bindingAdditionArtemisMessageHandler(artemisMessage: ClientMessage) {
@ -303,6 +304,7 @@ class RPCServer<OPS : RPCOps>(
val buffer = stopBuffering(clientAddress)
buffer?.let { drainBuffer(it) }
artemisMessage.acknowledge()
}
/**