mirror of
https://github.com/corda/corda.git
synced 2024-12-19 04:57:58 +00:00
Fix flaky bridge test and an associated deadlock during rollback in the BridgeManager code. (#2739)
This commit is contained in:
parent
f1856f0146
commit
81f4bbcaf3
@ -126,37 +126,35 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
|
||||
}
|
||||
|
||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
lock.withLock {
|
||||
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
||||
val properties = HashMap<Any?, Any?>()
|
||||
for (key in artemisMessage.propertyNames) {
|
||||
var value = artemisMessage.getObjectProperty(key)
|
||||
if (value is SimpleString) {
|
||||
value = value.toString()
|
||||
}
|
||||
properties[key.toString()] = value
|
||||
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
||||
val properties = HashMap<Any?, Any?>()
|
||||
for (key in artemisMessage.propertyNames) {
|
||||
var value = artemisMessage.getObjectProperty(key)
|
||||
if (value is SimpleString) {
|
||||
value = value.toString()
|
||||
}
|
||||
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||
val sendableMessage = amqpClient.createMessage(data, peerInbox,
|
||||
legalNames.first().toString(),
|
||||
properties)
|
||||
sendableMessage.onComplete.then {
|
||||
log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" }
|
||||
lock.withLock {
|
||||
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
|
||||
artemisMessage.acknowledge()
|
||||
} else {
|
||||
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||
// We need to commit any acknowledged messages before rolling back the failed
|
||||
// (unacknowledged) message.
|
||||
session?.commit()
|
||||
session?.rollback(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
amqpClient.write(sendableMessage)
|
||||
properties[key.toString()] = value
|
||||
}
|
||||
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||
val sendableMessage = amqpClient.createMessage(data, peerInbox,
|
||||
legalNames.first().toString(),
|
||||
properties)
|
||||
sendableMessage.onComplete.then {
|
||||
log.debug { "Bridge ACK ${sendableMessage.onComplete.get()}" }
|
||||
lock.withLock {
|
||||
if (sendableMessage.onComplete.get() == MessageStatus.Acknowledged) {
|
||||
artemisMessage.acknowledge()
|
||||
} else {
|
||||
log.info("Rollback rejected message uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||
// We need to commit any acknowledged messages before rolling back the failed
|
||||
// (unacknowledged) message.
|
||||
session?.commit()
|
||||
session?.rollback(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
amqpClient.write(sendableMessage)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@ import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.config.CertChainPolicyConfig
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
@ -25,13 +26,14 @@ import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNotEquals
|
||||
|
||||
class AMQPBridgeTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val temporaryFolder = TemporaryFolder()
|
||||
|
||||
private val log = loggerFor<AMQPBridgeTest>()
|
||||
|
||||
private val ALICE = TestIdentity(ALICE_NAME)
|
||||
private val BOB = TestIdentity(BOB_NAME)
|
||||
|
||||
@ -64,14 +66,16 @@ class AMQPBridgeTest {
|
||||
|
||||
//Create target server
|
||||
val amqpServer = createAMQPServer()
|
||||
val dedupeSet = mutableSetOf<String>()
|
||||
|
||||
val receive = amqpServer.onReceive.toBlocking().iterator
|
||||
amqpServer.start()
|
||||
|
||||
val receivedSequence = mutableListOf<Int>()
|
||||
val atNodeSequence = mutableListOf<Int>()
|
||||
|
||||
fun formatMessage(expected: String, actual: Int, received: List<Int>): String {
|
||||
return "Expected message with id $expected, got $actual, previous message receive sequence: "
|
||||
return "Expected message with id $expected, got $actual, previous message receive sequence: " +
|
||||
"${received.joinToString(", ", "[", "]")}."
|
||||
}
|
||||
|
||||
@ -79,66 +83,84 @@ class AMQPBridgeTest {
|
||||
val messageID1 = received1.applicationProperties["CountProp"] as Int
|
||||
assertArrayEquals("Test$messageID1".toByteArray(), received1.payload)
|
||||
assertEquals(0, messageID1)
|
||||
dedupeSet += received1.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
||||
received1.complete(true) // Accept first message
|
||||
receivedSequence.add(messageID1)
|
||||
receivedSequence += messageID1
|
||||
atNodeSequence += messageID1
|
||||
|
||||
val received2 = receive.next()
|
||||
val messageID2 = received2.applicationProperties["CountProp"] as Int
|
||||
assertArrayEquals("Test$messageID2".toByteArray(), received2.payload)
|
||||
assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence))
|
||||
received2.complete(false) // Reject message
|
||||
receivedSequence.add(messageID2)
|
||||
received2.complete(false) // Reject message and don't add to dedupe
|
||||
receivedSequence += messageID2 // reflects actual sequence
|
||||
|
||||
// drop things until we get back to the replay
|
||||
while (true) {
|
||||
val received3 = receive.next()
|
||||
val messageID3 = received3.applicationProperties["CountProp"] as Int
|
||||
assertArrayEquals("Test$messageID3".toByteArray(), received3.payload)
|
||||
assertNotEquals(0, messageID3, formatMessage("< 1", messageID3, receivedSequence))
|
||||
receivedSequence.add(messageID3)
|
||||
receivedSequence += messageID3
|
||||
if (messageID3 != 1) { // keep rejecting any batched items following rejection
|
||||
received3.complete(false)
|
||||
} else { // beginnings of replay so accept again
|
||||
received3.complete(true)
|
||||
val messageId = received3.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
||||
if (messageId !in dedupeSet) {
|
||||
dedupeSet += messageId
|
||||
atNodeSequence += messageID3
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// start receiving again, but discarding duplicates
|
||||
while (true) {
|
||||
val received4 = receive.next()
|
||||
val messageID4 = received4.applicationProperties["CountProp"] as Int
|
||||
assertArrayEquals("Test$messageID4".toByteArray(), received4.payload)
|
||||
receivedSequence.add(messageID4)
|
||||
if (messageID4 != 1) { // we may get a duplicate of the rejected message, in which case skip
|
||||
assertEquals(2, messageID4) // next message should be in order though
|
||||
break
|
||||
receivedSequence += messageID4
|
||||
val messageId = received4.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
||||
if (messageId !in dedupeSet) {
|
||||
dedupeSet += messageId
|
||||
atNodeSequence += messageID4
|
||||
}
|
||||
received4.complete(true)
|
||||
if (messageID4 == 2) { // started to replay messages after rejection point
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Send a fresh item and check receive
|
||||
val artemisMessage = artemis.session.createMessage(true).apply {
|
||||
putIntProperty("CountProp", -1)
|
||||
writeBodyBufferBytes("Test_end".toByteArray())
|
||||
putIntProperty("CountProp", 3)
|
||||
writeBodyBufferBytes("Test3".toByteArray())
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
artemis.producer.send(sourceQueueName, artemisMessage)
|
||||
|
||||
|
||||
// start receiving again, discarding duplicates
|
||||
while (true) {
|
||||
val received5 = receive.next()
|
||||
val messageID5 = received5.applicationProperties["CountProp"] as Int
|
||||
if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip
|
||||
assertEquals(-1, messageID5, formatMessage("-1", messageID5, receivedSequence)) // next message should be in order though
|
||||
assertArrayEquals("Test_end".toByteArray(), received5.payload)
|
||||
receivedSequence.add(messageID5)
|
||||
assertArrayEquals("Test$messageID5".toByteArray(), received5.payload)
|
||||
receivedSequence += messageID5
|
||||
val messageId = received5.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
||||
if (messageId !in dedupeSet) {
|
||||
dedupeSet += messageId
|
||||
atNodeSequence += messageID5
|
||||
}
|
||||
received5.complete(true)
|
||||
if (messageID5 == 3) { // reached our fresh message
|
||||
break
|
||||
}
|
||||
receivedSequence.add(messageID5)
|
||||
received5.complete(true)
|
||||
}
|
||||
|
||||
println("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}")
|
||||
log.info("Message sequence: ${receivedSequence.joinToString(", ", "[", "]")}")
|
||||
log.info("Deduped sequence: ${atNodeSequence.joinToString(", ", "[", "]")}")
|
||||
assertEquals(listOf(0, 1, 2, 3), atNodeSequence)
|
||||
bridgeManager.stop()
|
||||
amqpServer.stop()
|
||||
artemisClient.stop()
|
||||
|
Loading…
Reference in New Issue
Block a user