CORDA-1122 Switch AMQP bridge to use auto-commit sends and acks (#2661)

* CORDA-1122 Switch AMQP bridge to use auto-commit sends and acks

* CORDA-1122 Codereview: Add comment for extra commit to rollback and add better message sequence logging to tricky test
This commit is contained in:
Christian Sailer 2018-02-27 15:25:58 +00:00 committed by GitHub
parent 4695cb0810
commit 0ff37c0437
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 5 deletions

View File

@ -108,7 +108,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
if (connected) { if (connected) {
log.info("Bridge Connected") log.info("Bridge Connected")
val sessionFactory = artemis.started!!.sessionFactory val sessionFactory = artemis.started!!.sessionFactory
val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, false, false, false, DEFAULT_ACK_BATCH_SIZE) val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
this.session = session this.session = session
val consumer = session.createConsumer(queueName) val consumer = session.createConsumer(queueName)
this.consumer = consumer this.consumer = consumer
@ -146,9 +146,11 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
lock.withLock { lock.withLock {
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
artemisMessage.acknowledge() artemisMessage.acknowledge()
session?.commit()
} else { } else {
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
// We need to commit any acknowledged messages before rolling back the failed
// (unacknowledged) message.
session?.commit()
session?.rollback(false) session?.rollback(false)
} }
} }

View File

@ -68,23 +68,33 @@ class AMQPBridgeTest {
val receive = amqpServer.onReceive.toBlocking().iterator val receive = amqpServer.onReceive.toBlocking().iterator
amqpServer.start() amqpServer.start()
val receivedSequence = mutableListOf<Int>()
fun formatMessage(expected: String, actual: Int, received: List<Int>): String {
return "Expected message with id $expected, got $actual, previous message receive sequence: "
"${received.joinToString(", ", "[", "]")}."
}
val received1 = receive.next() val received1 = receive.next()
val messageID1 = received1.applicationProperties["CountProp"] as Int val messageID1 = received1.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID1".toByteArray(), received1.payload) assertArrayEquals("Test$messageID1".toByteArray(), received1.payload)
assertEquals(0, messageID1) assertEquals(0, messageID1)
received1.complete(true) // Accept first message received1.complete(true) // Accept first message
receivedSequence.add(messageID1)
val received2 = receive.next() val received2 = receive.next()
val messageID2 = received2.applicationProperties["CountProp"] as Int val messageID2 = received2.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID2".toByteArray(), received2.payload) assertArrayEquals("Test$messageID2".toByteArray(), received2.payload)
assertEquals(1, messageID2) assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence))
received2.complete(false) // Reject message received2.complete(false) // Reject message
receivedSequence.add(messageID2)
while (true) { while (true) {
val received3 = receive.next() val received3 = receive.next()
val messageID3 = received3.applicationProperties["CountProp"] as Int val messageID3 = received3.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID3".toByteArray(), received3.payload) assertArrayEquals("Test$messageID3".toByteArray(), received3.payload)
assertNotEquals(0, messageID3) assertNotEquals(0, messageID3, formatMessage("< 1", messageID3, receivedSequence))
receivedSequence.add(messageID3)
if (messageID3 != 1) { // keep rejecting any batched items following rejection if (messageID3 != 1) { // keep rejecting any batched items following rejection
received3.complete(false) received3.complete(false)
} else { // beginnings of replay so accept again } else { // beginnings of replay so accept again
@ -97,6 +107,7 @@ class AMQPBridgeTest {
val received4 = receive.next() val received4 = receive.next()
val messageID4 = received4.applicationProperties["CountProp"] as Int val messageID4 = received4.applicationProperties["CountProp"] as Int
assertArrayEquals("Test$messageID4".toByteArray(), received4.payload) assertArrayEquals("Test$messageID4".toByteArray(), received4.payload)
receivedSequence.add(messageID4)
if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip
assertEquals(2, messageID4) // next message should be in order though assertEquals(2, messageID4) // next message should be in order though
break break
@ -118,13 +129,16 @@ class AMQPBridgeTest {
val received5 = receive.next() val received5 = receive.next()
val messageID5 = received5.applicationProperties["CountProp"] as Int val messageID5 = received5.applicationProperties["CountProp"] as Int
if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip
assertEquals(-1, messageID5) // next message should be in order though assertEquals(-1, messageID5, formatMessage("-1", messageID5, receivedSequence)) // next message should be in order though
assertArrayEquals("Test_end".toByteArray(), received5.payload) assertArrayEquals("Test_end".toByteArray(), received5.payload)
receivedSequence.add(messageID5)
break break
} }
receivedSequence.add(messageID5)
received5.complete(true) received5.complete(true)
} }
println("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}")
bridgeManager.stop() bridgeManager.stop()
amqpServer.stop() amqpServer.stop()
artemisClient.stop() artemisClient.stop()