From 3ba2ef3092e81c82d6a379588d33ed1ed504954e Mon Sep 17 00:00:00 2001 From: Christian Sailer Date: Mon, 14 Jan 2019 15:32:08 +0000 Subject: [PATCH] Make the AMQP bridge individually ACK artemis messages (so that we don't have to wait for an ack flush or end up with ack'd messages queued up). (#4568) --- .../net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 56f48f6bb4..d60a50bf76 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -158,7 +158,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, logWarnWithMDC(msg) bridgeMetricsService?.packetDropEvent(artemisMessage, msg) // Ack the message to prevent same message being sent to us again. - artemisMessage.acknowledge() + artemisMessage.individualAcknowledge() return } val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) } @@ -181,7 +181,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, logDebugWithMDC { "Bridge ACK ${sendableMessage.onComplete.get()}" } lock.withLock { if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { - artemisMessage.acknowledge() + artemisMessage.individualAcknowledge() } else { logInfoWithMDC("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") // We need to commit any acknowledged messages before rolling back the failed