From c2140f727c8b11c27068ace7d14228b6b44e1856 Mon Sep 17 00:00:00 2001 From: Lorcan Wogan <69468264+LWogan@users.noreply.github.com> Date: Tue, 18 Jan 2022 10:13:29 +0000 Subject: [PATCH] ENT-6025 remote artemis channel does not exist resulting in infinite retry loop (#7020) * ENT-6025 remote artemis channel does not exist resulting in infinite retry loop * ENT-6025 rename test * ENT-6025 fix detekt and add description * ENT-6025 add check on count of connected stack --- .../engine/ConnectionStateMachine.kt | 24 +++++++++++++++++-- .../net/corda/node/amqp/ProtonWrapperTests.kt | 7 +++--- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index 38062c20b9..1f8836458d 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -47,6 +47,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, companion object { private const val CORDA_AMQP_FRAME_SIZE_PROP_NAME = "net.corda.nodeapi.connectionstatemachine.AmqpMaxFrameSize" private const val CORDA_AMQP_IDLE_TIMEOUT_PROP_NAME = "net.corda.nodeapi.connectionstatemachine.AmqpIdleTimeout" + private const val CREATE_ADDRESS_PERMISSION_ERROR = "AMQ119032" private val MAX_FRAME_SIZE = Integer.getInteger(CORDA_AMQP_FRAME_SIZE_PROP_NAME, 128 * 1024) private val IDLE_TIMEOUT = Integer.getInteger(CORDA_AMQP_IDLE_TIMEOUT_PROP_NAME, 10 * 1000) @@ -350,14 +351,33 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, override fun onLinkRemoteClose(e: Event) { val link = e.link - if(link.remoteCondition != null) { - logWarnWithMDC("Connection closed due to error on remote side: `${link.remoteCondition.description}`") + if (link.remoteCondition != null) { + val remoteConditionDescription = link.remoteCondition.description + logWarnWithMDC("Connection closed due to error on remote side: `$remoteConditionDescription`") + if (remoteConditionDescription.contains(CREATE_ADDRESS_PERMISSION_ERROR)) { + handleRemoteCreatePermissionError(e) + } + transport.condition = link.condition transport.close_tail() transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code) } } + /** + * If an the artemis channel does not exist on the counterparty, then a create permission error is returned in the [event]. + * Do not retry messages to this channel as it will result in an infinite loop of retries. + * Log the error, mark the messages as acknowledged and clear them from the message queue. + */ + private fun handleRemoteCreatePermissionError(event: Event) { + val remoteP2PAddress = event.sender.source.address + logWarnWithMDC("Address does not exist on peer: $remoteP2PAddress. Marking messages sent to this address as Acknowledged.") + messageQueues[remoteP2PAddress]?.apply { + forEach { it.doComplete(MessageStatus.Acknowledged) } + clear() + } + } + override fun onLinkFinal(event: Event) { val link = event.link if (link is Sender) { diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index 1fd59b9704..21c4567e0b 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -392,7 +392,7 @@ class ProtonWrapperTests { } @Test(timeout=300_000) - fun `Message sent from AMQP to non-existent Artemis inbox is rejected and client disconnects`() { + fun `Message sent from AMQP to non-existent Artemis inbox is marked as acknowledged to avoid infinite retries`() { val (server, artemisClient) = createArtemisServerAndClient() val amqpClient = createClient() // AmqpClient is set to auto-reconnect, there might be multiple connect/disconnect rounds @@ -412,8 +412,9 @@ class ProtonWrapperTests { testProperty["TestProp"] = "1" val message = amqpClient.createMessage(testData, sendAddress, CHARLIE_NAME.toString(), testProperty) amqpClient.write(message) - assertEquals(MessageStatus.Rejected, message.onComplete.get()) - assertTrue(connectedStack.contains(false)) + assertEquals(MessageStatus.Acknowledged, message.onComplete.get()) + assertTrue(connectedStack.contains(true)) + assertEquals(1, connectedStack.size) amqpClient.stop() artemisClient.stop() server.stop()