diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 93ab5616de..2a4456ca36 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -201,9 +201,13 @@ open class AMQPBridgeManager(keyStore: CertificateStore, logInfoWithMDC("Stopping Artemis because stopping AMQP bridge") closeConsumer() consumer = null + val closingSession = session eventLoop.execute { artemis(ArtemisState.STOPPING) { - stopSession() + stopSession(session) + if(session != closingSession) { + stopSession(closingSession) + } session = null ArtemisState.STOPPED } @@ -271,19 +275,28 @@ open class AMQPBridgeManager(keyStore: CertificateStore, logInfoWithMDC("Stopping Artemis because AMQP bridge disconnected") closeConsumer() consumer = null + val closingSession = session eventLoop.execute { - artemis(ArtemisState.STOPPING) { - stopSession() - session = null - when (precedingState) { - ArtemisState.AMQP_STOPPED -> - ArtemisState.STOPPED_AMQP_START_SCHEDULED(scheduledArtemis(artemisHeartbeatPlusBackoff, - TimeUnit.MILLISECONDS, ArtemisState.AMQP_STARTING) { startOutbound() }) - ArtemisState.AMQP_RESTARTED -> { - artemis(ArtemisState.AMQP_STARTING) { startOutbound() } - ArtemisState.AMQP_STARTING + synchronized(artemis!!) { + if (session == closingSession) { + artemis(ArtemisState.STOPPING) { + stopSession(session) + session = null + when (precedingState) { + ArtemisState.AMQP_STOPPED -> + ArtemisState.STOPPED_AMQP_START_SCHEDULED(scheduledArtemis(artemisHeartbeatPlusBackoff, + TimeUnit.MILLISECONDS, ArtemisState.AMQP_STARTING) { startOutbound() }) + + ArtemisState.AMQP_RESTARTED -> { + artemis(ArtemisState.AMQP_STARTING) { startOutbound() } + ArtemisState.AMQP_STARTING + } + + else -> ArtemisState.STOPPED + } } - else -> ArtemisState.STOPPED + } else { + stopSession(closingSession) } } } @@ -339,10 +352,10 @@ open class AMQPBridgeManager(keyStore: CertificateStore, } } - private fun stopSession(): Boolean { + private fun stopSession(localSession: ClientSession?): Boolean { var stopped = false try { - session?.apply { + localSession?.apply { if (!isClosed) { stop() } @@ -356,7 +369,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore, } private fun restartSession(): Boolean { - if (!stopSession()) { + if (!stopSession(session)) { // Session timed out stopping. The request/responses can be out of sequence on the session now, so abandon it. session = null // The consumer is also dead now too as attached to the dead session.