mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
ENT-11387: Fix to prevent interleaved stop/start causing bridge to be started with null session. (#7665)
* ENT-11387: Fix to prevent interleaved stop/start causing bridge to be started with null session. * ENT-11387: Fixed bug in assigning null to session.
This commit is contained in:
parent
4793cd5f31
commit
fa173baaee
@ -201,9 +201,13 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
logInfoWithMDC("Stopping Artemis because stopping AMQP bridge")
|
||||
closeConsumer()
|
||||
consumer = null
|
||||
val closingSession = session
|
||||
eventLoop.execute {
|
||||
artemis(ArtemisState.STOPPING) {
|
||||
stopSession()
|
||||
stopSession(session)
|
||||
if(session != closingSession) {
|
||||
stopSession(closingSession)
|
||||
}
|
||||
session = null
|
||||
ArtemisState.STOPPED
|
||||
}
|
||||
@ -271,19 +275,28 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
logInfoWithMDC("Stopping Artemis because AMQP bridge disconnected")
|
||||
closeConsumer()
|
||||
consumer = null
|
||||
val closingSession = session
|
||||
eventLoop.execute {
|
||||
artemis(ArtemisState.STOPPING) {
|
||||
stopSession()
|
||||
session = null
|
||||
when (precedingState) {
|
||||
ArtemisState.AMQP_STOPPED ->
|
||||
ArtemisState.STOPPED_AMQP_START_SCHEDULED(scheduledArtemis(artemisHeartbeatPlusBackoff,
|
||||
TimeUnit.MILLISECONDS, ArtemisState.AMQP_STARTING) { startOutbound() })
|
||||
ArtemisState.AMQP_RESTARTED -> {
|
||||
artemis(ArtemisState.AMQP_STARTING) { startOutbound() }
|
||||
ArtemisState.AMQP_STARTING
|
||||
synchronized(artemis!!) {
|
||||
if (session == closingSession) {
|
||||
artemis(ArtemisState.STOPPING) {
|
||||
stopSession(session)
|
||||
session = null
|
||||
when (precedingState) {
|
||||
ArtemisState.AMQP_STOPPED ->
|
||||
ArtemisState.STOPPED_AMQP_START_SCHEDULED(scheduledArtemis(artemisHeartbeatPlusBackoff,
|
||||
TimeUnit.MILLISECONDS, ArtemisState.AMQP_STARTING) { startOutbound() })
|
||||
|
||||
ArtemisState.AMQP_RESTARTED -> {
|
||||
artemis(ArtemisState.AMQP_STARTING) { startOutbound() }
|
||||
ArtemisState.AMQP_STARTING
|
||||
}
|
||||
|
||||
else -> ArtemisState.STOPPED
|
||||
}
|
||||
}
|
||||
else -> ArtemisState.STOPPED
|
||||
} else {
|
||||
stopSession(closingSession)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -339,10 +352,10 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
}
|
||||
}
|
||||
|
||||
private fun stopSession(): Boolean {
|
||||
private fun stopSession(localSession: ClientSession?): Boolean {
|
||||
var stopped = false
|
||||
try {
|
||||
session?.apply {
|
||||
localSession?.apply {
|
||||
if (!isClosed) {
|
||||
stop()
|
||||
}
|
||||
@ -356,7 +369,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
|
||||
}
|
||||
|
||||
private fun restartSession(): Boolean {
|
||||
if (!stopSession()) {
|
||||
if (!stopSession(session)) {
|
||||
// Session timed out stopping. The request/responses can be out of sequence on the session now, so abandon it.
|
||||
session = null
|
||||
// The consumer is also dead now too as attached to the dead session.
|
||||
|
Loading…
Reference in New Issue
Block a user