mirror of
https://github.com/corda/corda.git
synced 2024-12-20 05:28:21 +00:00
Make the AMQP bridge individually ACK artemis messages (so that we don't have to wait for an ack flush or end up with ack'd messages queued up). (#4568)
This commit is contained in:
parent
b505f770c1
commit
3ba2ef3092
@ -158,7 +158,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int,
|
|||||||
logWarnWithMDC(msg)
|
logWarnWithMDC(msg)
|
||||||
bridgeMetricsService?.packetDropEvent(artemisMessage, msg)
|
bridgeMetricsService?.packetDropEvent(artemisMessage, msg)
|
||||||
// Ack the message to prevent same message being sent to us again.
|
// Ack the message to prevent same message being sent to us again.
|
||||||
artemisMessage.acknowledge()
|
artemisMessage.individualAcknowledge()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
||||||
@ -181,7 +181,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int,
|
|||||||
logDebugWithMDC { "Bridge ACK ${sendableMessage.onComplete.get()}" }
|
logDebugWithMDC { "Bridge ACK ${sendableMessage.onComplete.get()}" }
|
||||||
lock.withLock {
|
lock.withLock {
|
||||||
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
|
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
|
||||||
artemisMessage.acknowledge()
|
artemisMessage.individualAcknowledge()
|
||||||
} else {
|
} else {
|
||||||
logInfoWithMDC("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
logInfoWithMDC("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||||
// We need to commit any acknowledged messages before rolling back the failed
|
// We need to commit any acknowledged messages before rolling back the failed
|
||||||
|
Loading…
Reference in New Issue
Block a user