Merging forward updates from release/os/4.10 to release/os/4.11 - 2024-01-30

This commit is contained in:
r3-build 2024-01-30 11:51:24 +00:00
commit b6c2d17dae

View File

@ -201,9 +201,13 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
logInfoWithMDC("Stopping Artemis because stopping AMQP bridge") logInfoWithMDC("Stopping Artemis because stopping AMQP bridge")
closeConsumer() closeConsumer()
consumer = null consumer = null
val closingSession = session
eventLoop.execute { eventLoop.execute {
artemis(ArtemisState.STOPPING) { artemis(ArtemisState.STOPPING) {
stopSession() stopSession(session)
if(session != closingSession) {
stopSession(closingSession)
}
session = null session = null
ArtemisState.STOPPED ArtemisState.STOPPED
} }
@ -271,19 +275,28 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
logInfoWithMDC("Stopping Artemis because AMQP bridge disconnected") logInfoWithMDC("Stopping Artemis because AMQP bridge disconnected")
closeConsumer() closeConsumer()
consumer = null consumer = null
val closingSession = session
eventLoop.execute { eventLoop.execute {
artemis(ArtemisState.STOPPING) { synchronized(artemis!!) {
stopSession() if (session == closingSession) {
session = null artemis(ArtemisState.STOPPING) {
when (precedingState) { stopSession(session)
ArtemisState.AMQP_STOPPED -> session = null
ArtemisState.STOPPED_AMQP_START_SCHEDULED(scheduledArtemis(artemisHeartbeatPlusBackoff, when (precedingState) {
TimeUnit.MILLISECONDS, ArtemisState.AMQP_STARTING) { startOutbound() }) ArtemisState.AMQP_STOPPED ->
ArtemisState.AMQP_RESTARTED -> { ArtemisState.STOPPED_AMQP_START_SCHEDULED(scheduledArtemis(artemisHeartbeatPlusBackoff,
artemis(ArtemisState.AMQP_STARTING) { startOutbound() } TimeUnit.MILLISECONDS, ArtemisState.AMQP_STARTING) { startOutbound() })
ArtemisState.AMQP_STARTING
ArtemisState.AMQP_RESTARTED -> {
artemis(ArtemisState.AMQP_STARTING) { startOutbound() }
ArtemisState.AMQP_STARTING
}
else -> ArtemisState.STOPPED
}
} }
else -> ArtemisState.STOPPED } else {
stopSession(closingSession)
} }
} }
} }
@ -339,10 +352,10 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
} }
} }
private fun stopSession(): Boolean { private fun stopSession(localSession: ClientSession?): Boolean {
var stopped = false var stopped = false
try { try {
session?.apply { localSession?.apply {
if (!isClosed) { if (!isClosed) {
stop() stop()
} }
@ -356,7 +369,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
} }
private fun restartSession(): Boolean { private fun restartSession(): Boolean {
if (!stopSession()) { if (!stopSession(session)) {
// Session timed out stopping. The request/responses can be out of sequence on the session now, so abandon it. // Session timed out stopping. The request/responses can be out of sequence on the session now, so abandon it.
session = null session = null
// The consumer is also dead now too as attached to the dead session. // The consumer is also dead now too as attached to the dead session.