diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index 0e88c263e9..5d542e1964 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -229,12 +229,17 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, } override fun onTransportClosed(event: Event) { - val transport = event.transport - logDebugWithMDC { "Transport Closed ${transport.prettyPrint}" } - if (transport == this.transport) { + doTransportClose(event.transport) { "Transport Closed ${transport.prettyPrint}" } + } + + private fun doTransportClose(transport: Transport?, msg: () -> String) { + if (transport != null && transport == this.transport && transport.context != null) { + logDebugWithMDC(msg) transport.unbind() transport.free() transport.context = null + } else { + logDebugWithMDC { "Nothing to do in case of: ${msg()}" } } } @@ -264,6 +269,9 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, val channel = connection?.context as? Channel channel?.writeAndFlush(transport) } + } else { + logDebugWithMDC { "Transport is already closed: ${transport.prettyPrint}" } + doTransportClose(transport) { "Freeing-up resources associated with transport" } } } @@ -314,13 +322,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, // If TRANSPORT_CLOSED event was already processed, the 'transport' in all subsequent events is set to null. // There is, however, a chance of missing TRANSPORT_CLOSED event, e.g. when disconnect occurs before opening remote session. // In such cases we must explicitly cleanup the 'transport' in order to guarantee the delivery of CONNECTION_FINAL event. - val transport = event.transport - if (transport == this.transport) { - logDebugWithMDC { "Missed TRANSPORT_CLOSED: force cleanup ${transport.prettyPrint}" } - transport.unbind() - transport.free() - transport.context = null - } + doTransportClose(event.transport) { "Missed TRANSPORT_CLOSED in onSessionFinal: force cleanup ${transport.prettyPrint}" } } } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index 874d1feca4..f33d9a8d1c 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -386,13 +386,16 @@ class ProtonWrapperTests { fun `Message sent from AMQP to non-existent Artemis inbox is rejected and client disconnects`() { val (server, artemisClient) = createArtemisServerAndClient() val amqpClient = createClient() - var connected = false + // AmqpClient is set to auto-reconnect, there might be multiple connect/disconnect rounds + val connectedStack = mutableListOf() amqpClient.onConnection.subscribe { change -> - connected = change.connected + connectedStack.add(change.connected) } val clientConnected = amqpClient.onConnection.toFuture() amqpClient.start() assertEquals(true, clientConnected.get().connected) + assertEquals(1, connectedStack.size) + assertTrue(connectedStack.contains(true)) assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal)) val sendAddress = P2P_PREFIX + "Test" val testData = "Test".toByteArray() @@ -401,7 +404,7 @@ class ProtonWrapperTests { val message = amqpClient.createMessage(testData, sendAddress, CHARLIE_NAME.toString(), testProperty) amqpClient.write(message) assertEquals(MessageStatus.Rejected, message.onComplete.get()) - assertEquals(false, connected) + assertTrue(connectedStack.contains(false)) amqpClient.stop() artemisClient.stop() server.stop()