diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 41f5a7d72b..94ddb681ac 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -108,7 +108,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa if (connected) { log.info("Bridge Connected") val sessionFactory = artemis.started!!.sessionFactory - val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, false, false, false, DEFAULT_ACK_BATCH_SIZE) + val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) this.session = session val consumer = session.createConsumer(queueName) this.consumer = consumer @@ -146,9 +146,11 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa lock.withLock { if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) { artemisMessage.acknowledge() - session?.commit() } else { log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") + // We need to commit any acknowledged messages before rolling back the failed + // (unacknowledged) message. + session?.commit() session?.rollback(false) } } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 39c0a8880f..7238069b51 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -68,23 +68,33 @@ class AMQPBridgeTest { val receive = amqpServer.onReceive.toBlocking().iterator amqpServer.start() + val receivedSequence = mutableListOf() + + fun formatMessage(expected: String, actual: Int, received: List): String { + return "Expected message with id $expected, got $actual, previous message receive sequence: " + "${received.joinToString(", ", "[", "]")}." + } + val received1 = receive.next() val messageID1 = received1.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID1".toByteArray(), received1.payload) assertEquals(0, messageID1) received1.complete(true) // Accept first message + receivedSequence.add(messageID1) val received2 = receive.next() val messageID2 = received2.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID2".toByteArray(), received2.payload) - assertEquals(1, messageID2) + assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence)) received2.complete(false) // Reject message + receivedSequence.add(messageID2) while (true) { val received3 = receive.next() val messageID3 = received3.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID3".toByteArray(), received3.payload) - assertNotEquals(0, messageID3) + assertNotEquals(0, messageID3, formatMessage("< 1", messageID3, receivedSequence)) + receivedSequence.add(messageID3) if (messageID3 != 1) { // keep rejecting any batched items following rejection received3.complete(false) } else { // beginnings of replay so accept again @@ -97,6 +107,7 @@ class AMQPBridgeTest { val received4 = receive.next() val messageID4 = received4.applicationProperties["CountProp"] as Int assertArrayEquals("Test$messageID4".toByteArray(), received4.payload) + receivedSequence.add(messageID4) if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip assertEquals(2, messageID4) // next message should be in order though break @@ -118,13 +129,16 @@ class AMQPBridgeTest { val received5 = receive.next() val messageID5 = received5.applicationProperties["CountProp"] as Int if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip - assertEquals(-1, messageID5) // next message should be in order though + assertEquals(-1, messageID5, formatMessage("-1", messageID5, receivedSequence)) // next message should be in order though assertArrayEquals("Test_end".toByteArray(), received5.payload) + receivedSequence.add(messageID5) break } + receivedSequence.add(messageID5) received5.complete(true) } + println("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}") bridgeManager.stop() amqpServer.stop() artemisClient.stop()