mirror of
https://github.com/corda/corda.git
synced 2025-04-07 19:34:41 +00:00
ENT-6025 remote artemis channel does not exist resulting in infinite retry loop (#7020)
* ENT-6025 remote artemis channel does not exist resulting in infinite retry loop * ENT-6025 rename test * ENT-6025 fix detekt and add description * ENT-6025 add check on count of connected stack
This commit is contained in:
parent
b8a52bfd21
commit
c2140f727c
@ -47,6 +47,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
companion object {
|
||||
private const val CORDA_AMQP_FRAME_SIZE_PROP_NAME = "net.corda.nodeapi.connectionstatemachine.AmqpMaxFrameSize"
|
||||
private const val CORDA_AMQP_IDLE_TIMEOUT_PROP_NAME = "net.corda.nodeapi.connectionstatemachine.AmqpIdleTimeout"
|
||||
private const val CREATE_ADDRESS_PERMISSION_ERROR = "AMQ119032"
|
||||
|
||||
private val MAX_FRAME_SIZE = Integer.getInteger(CORDA_AMQP_FRAME_SIZE_PROP_NAME, 128 * 1024)
|
||||
private val IDLE_TIMEOUT = Integer.getInteger(CORDA_AMQP_IDLE_TIMEOUT_PROP_NAME, 10 * 1000)
|
||||
@ -350,14 +351,33 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
||||
|
||||
override fun onLinkRemoteClose(e: Event) {
|
||||
val link = e.link
|
||||
if(link.remoteCondition != null) {
|
||||
logWarnWithMDC("Connection closed due to error on remote side: `${link.remoteCondition.description}`")
|
||||
if (link.remoteCondition != null) {
|
||||
val remoteConditionDescription = link.remoteCondition.description
|
||||
logWarnWithMDC("Connection closed due to error on remote side: `$remoteConditionDescription`")
|
||||
if (remoteConditionDescription.contains(CREATE_ADDRESS_PERMISSION_ERROR)) {
|
||||
handleRemoteCreatePermissionError(e)
|
||||
}
|
||||
|
||||
transport.condition = link.condition
|
||||
transport.close_tail()
|
||||
transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If an the artemis channel does not exist on the counterparty, then a create permission error is returned in the [event].
|
||||
* Do not retry messages to this channel as it will result in an infinite loop of retries.
|
||||
* Log the error, mark the messages as acknowledged and clear them from the message queue.
|
||||
*/
|
||||
private fun handleRemoteCreatePermissionError(event: Event) {
|
||||
val remoteP2PAddress = event.sender.source.address
|
||||
logWarnWithMDC("Address does not exist on peer: $remoteP2PAddress. Marking messages sent to this address as Acknowledged.")
|
||||
messageQueues[remoteP2PAddress]?.apply {
|
||||
forEach { it.doComplete(MessageStatus.Acknowledged) }
|
||||
clear()
|
||||
}
|
||||
}
|
||||
|
||||
override fun onLinkFinal(event: Event) {
|
||||
val link = event.link
|
||||
if (link is Sender) {
|
||||
|
@ -392,7 +392,7 @@ class ProtonWrapperTests {
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `Message sent from AMQP to non-existent Artemis inbox is rejected and client disconnects`() {
|
||||
fun `Message sent from AMQP to non-existent Artemis inbox is marked as acknowledged to avoid infinite retries`() {
|
||||
val (server, artemisClient) = createArtemisServerAndClient()
|
||||
val amqpClient = createClient()
|
||||
// AmqpClient is set to auto-reconnect, there might be multiple connect/disconnect rounds
|
||||
@ -412,8 +412,9 @@ class ProtonWrapperTests {
|
||||
testProperty["TestProp"] = "1"
|
||||
val message = amqpClient.createMessage(testData, sendAddress, CHARLIE_NAME.toString(), testProperty)
|
||||
amqpClient.write(message)
|
||||
assertEquals(MessageStatus.Rejected, message.onComplete.get())
|
||||
assertTrue(connectedStack.contains(false))
|
||||
assertEquals(MessageStatus.Acknowledged, message.onComplete.get())
|
||||
assertTrue(connectedStack.contains(true))
|
||||
assertEquals(1, connectedStack.size)
|
||||
amqpClient.stop()
|
||||
artemisClient.stop()
|
||||
server.stop()
|
||||
|
Loading…
x
Reference in New Issue
Block a user