ENT-5089: Forcibly free associated with transport resources if it is already closed (#6130)

This commit is contained in:
Viktor Kolomeyko 2020-04-03 17:30:29 +01:00 committed by GitHub
parent 04963e7f67
commit 1d7c13276c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 13 deletions

View File

@ -229,12 +229,17 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
} }
override fun onTransportClosed(event: Event) { override fun onTransportClosed(event: Event) {
val transport = event.transport doTransportClose(event.transport) { "Transport Closed ${transport.prettyPrint}" }
logDebugWithMDC { "Transport Closed ${transport.prettyPrint}" } }
if (transport == this.transport) {
private fun doTransportClose(transport: Transport?, msg: () -> String) {
if (transport != null && transport == this.transport && transport.context != null) {
logDebugWithMDC(msg)
transport.unbind() transport.unbind()
transport.free() transport.free()
transport.context = null transport.context = null
} else {
logDebugWithMDC { "Nothing to do in case of: ${msg()}" }
} }
} }
@ -264,6 +269,9 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
val channel = connection?.context as? Channel val channel = connection?.context as? Channel
channel?.writeAndFlush(transport) channel?.writeAndFlush(transport)
} }
} else {
logDebugWithMDC { "Transport is already closed: ${transport.prettyPrint}" }
doTransportClose(transport) { "Freeing-up resources associated with transport" }
} }
} }
@ -314,13 +322,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
// If TRANSPORT_CLOSED event was already processed, the 'transport' in all subsequent events is set to null. // If TRANSPORT_CLOSED event was already processed, the 'transport' in all subsequent events is set to null.
// There is, however, a chance of missing TRANSPORT_CLOSED event, e.g. when disconnect occurs before opening remote session. // There is, however, a chance of missing TRANSPORT_CLOSED event, e.g. when disconnect occurs before opening remote session.
// In such cases we must explicitly cleanup the 'transport' in order to guarantee the delivery of CONNECTION_FINAL event. // In such cases we must explicitly cleanup the 'transport' in order to guarantee the delivery of CONNECTION_FINAL event.
val transport = event.transport doTransportClose(event.transport) { "Missed TRANSPORT_CLOSED in onSessionFinal: force cleanup ${transport.prettyPrint}" }
if (transport == this.transport) {
logDebugWithMDC { "Missed TRANSPORT_CLOSED: force cleanup ${transport.prettyPrint}" }
transport.unbind()
transport.free()
transport.context = null
}
} }
} }

View File

@ -386,13 +386,16 @@ class ProtonWrapperTests {
fun `Message sent from AMQP to non-existent Artemis inbox is rejected and client disconnects`() { fun `Message sent from AMQP to non-existent Artemis inbox is rejected and client disconnects`() {
val (server, artemisClient) = createArtemisServerAndClient() val (server, artemisClient) = createArtemisServerAndClient()
val amqpClient = createClient() val amqpClient = createClient()
var connected = false // AmqpClient is set to auto-reconnect, there might be multiple connect/disconnect rounds
val connectedStack = mutableListOf<Boolean>()
amqpClient.onConnection.subscribe { change -> amqpClient.onConnection.subscribe { change ->
connected = change.connected connectedStack.add(change.connected)
} }
val clientConnected = amqpClient.onConnection.toFuture() val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start() amqpClient.start()
assertEquals(true, clientConnected.get().connected) assertEquals(true, clientConnected.get().connected)
assertEquals(1, connectedStack.size)
assertTrue(connectedStack.contains(true))
assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal)) assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal))
val sendAddress = P2P_PREFIX + "Test" val sendAddress = P2P_PREFIX + "Test"
val testData = "Test".toByteArray() val testData = "Test".toByteArray()
@ -401,7 +404,7 @@ class ProtonWrapperTests {
val message = amqpClient.createMessage(testData, sendAddress, CHARLIE_NAME.toString(), testProperty) val message = amqpClient.createMessage(testData, sendAddress, CHARLIE_NAME.toString(), testProperty)
amqpClient.write(message) amqpClient.write(message)
assertEquals(MessageStatus.Rejected, message.onComplete.get()) assertEquals(MessageStatus.Rejected, message.onComplete.get())
assertEquals(false, connected) assertTrue(connectedStack.contains(false))
amqpClient.stop() amqpClient.stop()
artemisClient.stop() artemisClient.stop()
server.stop() server.stop()