Merging forward updates from release/os/4.11 to release/os/4.12 - 2024-01-30

This commit is contained in:
r3-build 2024-01-30 15:44:32 +00:00
commit 5bfc39c364

View File

@ -201,9 +201,13 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
logInfoWithMDC("Stopping Artemis because stopping AMQP bridge")
closeConsumer()
consumer = null
val closingSession = session
eventLoop.execute {
artemis(ArtemisState.STOPPING) {
stopSession()
stopSession(session)
if(session != closingSession) {
stopSession(closingSession)
}
session = null
ArtemisState.STOPPED
}
@ -271,19 +275,28 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
logInfoWithMDC("Stopping Artemis because AMQP bridge disconnected")
closeConsumer()
consumer = null
val closingSession = session
eventLoop.execute {
artemis(ArtemisState.STOPPING) {
stopSession()
session = null
when (precedingState) {
ArtemisState.AMQP_STOPPED ->
ArtemisState.STOPPED_AMQP_START_SCHEDULED(scheduledArtemis(artemisHeartbeatPlusBackoff,
TimeUnit.MILLISECONDS, ArtemisState.AMQP_STARTING) { startOutbound() })
ArtemisState.AMQP_RESTARTED -> {
artemis(ArtemisState.AMQP_STARTING) { startOutbound() }
ArtemisState.AMQP_STARTING
synchronized(artemis!!) {
if (session == closingSession) {
artemis(ArtemisState.STOPPING) {
stopSession(session)
session = null
when (precedingState) {
ArtemisState.AMQP_STOPPED ->
ArtemisState.STOPPED_AMQP_START_SCHEDULED(scheduledArtemis(artemisHeartbeatPlusBackoff,
TimeUnit.MILLISECONDS, ArtemisState.AMQP_STARTING) { startOutbound() })
ArtemisState.AMQP_RESTARTED -> {
artemis(ArtemisState.AMQP_STARTING) { startOutbound() }
ArtemisState.AMQP_STARTING
}
else -> ArtemisState.STOPPED
}
}
else -> ArtemisState.STOPPED
} else {
stopSession(closingSession)
}
}
}
@ -339,10 +352,10 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
}
}
private fun stopSession(): Boolean {
private fun stopSession(localSession: ClientSession?): Boolean {
var stopped = false
try {
session?.apply {
localSession?.apply {
if (!isClosed) {
stop()
}
@ -356,7 +369,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
}
private fun restartSession(): Boolean {
if (!stopSession()) {
if (!stopSession(session)) {
// Session timed out stopping. The request/responses can be out of sequence on the session now, so abandon it.
session = null
// The consumer is also dead now too as attached to the dead session.