mirror of
https://github.com/corda/corda.git
synced 2025-02-22 10:10:59 +00:00
ENT-2116: handle amqp client remote error, added test (#3636)
* ENT-2116: handle amqp client remote error, added test * ENT: 2116 rename test, added logging * ENT-2116: rename test to indicate its purpose
This commit is contained in:
parent
08b5cb6d5f
commit
53b398a460
@ -66,6 +66,8 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
|||||||
|
|
||||||
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
|
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
|
||||||
|
|
||||||
|
private fun logWarnWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.warn(msg, ex) }
|
||||||
|
|
||||||
private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) }
|
private fun logErrorWithMDC(msg: String, ex: Throwable? = null) = withMDC { log.error(msg, ex) }
|
||||||
|
|
||||||
val connection: Connection
|
val connection: Connection
|
||||||
@ -308,6 +310,16 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun onLinkRemoteClose(e: Event) {
|
||||||
|
val link = e.link
|
||||||
|
if(link.remoteCondition != null) {
|
||||||
|
logWarnWithMDC("Connection closed due to error on remote side: `${link.remoteCondition.description}`")
|
||||||
|
transport.condition = link.condition
|
||||||
|
transport.close_tail()
|
||||||
|
transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override fun onLinkFinal(event: Event) {
|
override fun onLinkFinal(event: Event) {
|
||||||
val link = event.link
|
val link = event.link
|
||||||
if (link is Sender) {
|
if (link is Sender) {
|
||||||
|
@ -362,6 +362,31 @@ class ProtonWrapperTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `Message sent from AMQP to non-existent Artemis inbox is rejected and client disconnects`() {
|
||||||
|
val (server, artemisClient) = createArtemisServerAndClient()
|
||||||
|
val amqpClient = createClient()
|
||||||
|
var connected = false
|
||||||
|
amqpClient.onConnection.subscribe { change ->
|
||||||
|
connected = change.connected
|
||||||
|
}
|
||||||
|
val clientConnected = amqpClient.onConnection.toFuture()
|
||||||
|
amqpClient.start()
|
||||||
|
assertEquals(true, clientConnected.get().connected)
|
||||||
|
assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal))
|
||||||
|
val sendAddress = P2P_PREFIX + "Test"
|
||||||
|
val testData = "Test".toByteArray()
|
||||||
|
val testProperty = mutableMapOf<String, Any?>()
|
||||||
|
testProperty["TestProp"] = "1"
|
||||||
|
val message = amqpClient.createMessage(testData, sendAddress, CHARLIE_NAME.toString(), testProperty)
|
||||||
|
amqpClient.write(message)
|
||||||
|
assertEquals(MessageStatus.Rejected, message.onComplete.get())
|
||||||
|
assertEquals(false, connected)
|
||||||
|
amqpClient.stop()
|
||||||
|
artemisClient.stop()
|
||||||
|
server.stop()
|
||||||
|
}
|
||||||
|
|
||||||
private fun createArtemisServerAndClient(maxMessageSize: Int = MAX_MESSAGE_SIZE): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
private fun createArtemisServerAndClient(maxMessageSize: Int = MAX_MESSAGE_SIZE): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
||||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||||
doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory
|
doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory
|
||||||
|
Loading…
x
Reference in New Issue
Block a user