diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index e08c11927a..6cb39ecfd2 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -66,6 +66,8 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) } + private fun logWarnWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.warn(msg, ex) } + private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) } val connection: Connection @@ -308,6 +310,16 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, } } + override fun onLinkRemoteClose(e: Event) { + val link = e.link + if(link.remoteCondition != null) { + logWarnWithMDC("Connection closed due to error on remote side: `${link.remoteCondition.description}`") + transport.condition = link.condition + transport.close_tail() + transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code) + } + } + override fun onLinkFinal(event: Event) { val link = event.link if (link is Sender) { diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index d295b7ff06..93016870ce 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -362,6 +362,31 @@ class ProtonWrapperTests { } } + @Test + fun `Message sent from AMQP to non-existent Artemis inbox is rejected and client disconnects`() { + val (server, artemisClient) = createArtemisServerAndClient() + val amqpClient = createClient() + var connected = false + amqpClient.onConnection.subscribe { change -> + connected = change.connected + } + val clientConnected = amqpClient.onConnection.toFuture() + amqpClient.start() + assertEquals(true, clientConnected.get().connected) + assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal)) + val sendAddress = P2P_PREFIX + "Test" + val testData = "Test".toByteArray() + val testProperty = mutableMapOf() + testProperty["TestProp"] = "1" + val message = amqpClient.createMessage(testData, sendAddress, CHARLIE_NAME.toString(), testProperty) + amqpClient.write(message) + assertEquals(MessageStatus.Rejected, message.onComplete.get()) + assertEquals(false, connected) + amqpClient.stop() + artemisClient.stop() + server.stop() + } + private fun createArtemisServerAndClient(maxMessageSize: Int = MAX_MESSAGE_SIZE): Pair { val artemisConfig = rigorousMock().also { doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory