mirror of
https://github.com/corda/corda.git
synced 2024-12-24 15:16:45 +00:00
Merge pull request #7667 from corda/merge-release/os/4.9-release/os/4.10-2024-01-30-95
ENT-11387: Merging forward updates from release/os/4.9 to release/os/4.10 - 2024-01-30
This commit is contained in:
commit
4faa0caeb3
@ -201,9 +201,13 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
logInfoWithMDC("Stopping Artemis because stopping AMQP bridge")
|
||||
closeConsumer()
|
||||
consumer = null
|
||||
val closingSession = session
|
||||
eventLoop.execute {
|
||||
artemis(ArtemisState.STOPPING) {
|
||||
stopSession()
|
||||
stopSession(session)
|
||||
if(session != closingSession) {
|
||||
stopSession(closingSession)
|
||||
}
|
||||
session = null
|
||||
ArtemisState.STOPPED
|
||||
}
|
||||
@ -271,19 +275,28 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
logInfoWithMDC("Stopping Artemis because AMQP bridge disconnected")
|
||||
closeConsumer()
|
||||
consumer = null
|
||||
val closingSession = session
|
||||
eventLoop.execute {
|
||||
artemis(ArtemisState.STOPPING) {
|
||||
stopSession()
|
||||
session = null
|
||||
when (precedingState) {
|
||||
ArtemisState.AMQP_STOPPED ->
|
||||
ArtemisState.STOPPED_AMQP_START_SCHEDULED(scheduledArtemis(artemisHeartbeatPlusBackoff,
|
||||
TimeUnit.MILLISECONDS, ArtemisState.AMQP_STARTING) { startOutbound() })
|
||||
ArtemisState.AMQP_RESTARTED -> {
|
||||
artemis(ArtemisState.AMQP_STARTING) { startOutbound() }
|
||||
ArtemisState.AMQP_STARTING
|
||||
synchronized(artemis!!) {
|
||||
if (session == closingSession) {
|
||||
artemis(ArtemisState.STOPPING) {
|
||||
stopSession(session)
|
||||
session = null
|
||||
when (precedingState) {
|
||||
ArtemisState.AMQP_STOPPED ->
|
||||
ArtemisState.STOPPED_AMQP_START_SCHEDULED(scheduledArtemis(artemisHeartbeatPlusBackoff,
|
||||
TimeUnit.MILLISECONDS, ArtemisState.AMQP_STARTING) { startOutbound() })
|
||||
|
||||
ArtemisState.AMQP_RESTARTED -> {
|
||||
artemis(ArtemisState.AMQP_STARTING) { startOutbound() }
|
||||
ArtemisState.AMQP_STARTING
|
||||
}
|
||||
|
||||
else -> ArtemisState.STOPPED
|
||||
}
|
||||
}
|
||||
else -> ArtemisState.STOPPED
|
||||
} else {
|
||||
stopSession(closingSession)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -339,10 +352,10 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
}
|
||||
}
|
||||
|
||||
private fun stopSession(): Boolean {
|
||||
private fun stopSession(localSession: ClientSession?): Boolean {
|
||||
var stopped = false
|
||||
try {
|
||||
session?.apply {
|
||||
localSession?.apply {
|
||||
if (!isClosed) {
|
||||
stop()
|
||||
}
|
||||
@ -356,7 +369,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
}
|
||||
|
||||
private fun restartSession(): Boolean {
|
||||
if (!stopSession()) {
|
||||
if (!stopSession(session)) {
|
||||
// Session timed out stopping. The request/responses can be out of sequence on the session now, so abandon it.
|
||||
session = null
|
||||
// The consumer is also dead now too as attached to the dead session.
|
||||
|
Loading…
Reference in New Issue
Block a user