Detect duplicate bridge operation and terminate JVM to prevent re-ordering flow messages and causing odd behaviour.

This commit is contained in:
Matthew Nesbit 2018-05-16 16:00:41 +01:00
parent b07d5b3f26
commit f7fb54dc8a

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession
import rx.Observable
import rx.subjects.PublishSubject
import java.util.*
@ -39,11 +40,13 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString()
private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
private val bridgeNotifyQueue = "$BRIDGE_NOTIFY.$bridgeId"
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, maxMessageSize,
artemisMessageClientFactory)
private val validInboundQueues = mutableSetOf<String>()
private var artemis: ArtemisSessionProvider? = null
private var controlConsumer: ClientConsumer? = null
private var notifyConsumer: ClientConsumer? = null
constructor(config: NodeSSLConfiguration,
p2pAddress: NetworkHostAndPort,
@ -71,11 +74,21 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
artemis.start()
val artemisClient = artemis.started!!
val artemisSession = artemisClient.session
registerBridgeControlListener(artemisSession)
registerBridgeDuplicateChecker(artemisSession)
val startupMessage = BridgeControl.BridgeToNodeSnapshotRequest(bridgeId).serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val bridgeRequest = artemisSession.createMessage(false)
bridgeRequest.writeBodyBufferBytes(startupMessage)
artemisClient.producer.send(BRIDGE_NOTIFY, bridgeRequest)
}
private fun registerBridgeControlListener(artemisSession: ClientSession) {
try {
artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue)
} catch (ex: ActiveMQQueueExistsException) {
// Ignore if there is a queue still not cleaned up
}
val control = artemisSession.createConsumer(bridgeControlQueue)
controlConsumer = control
control.setMessageHandler { msg ->
@ -85,10 +98,28 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
log.error("Unable to process bridge control message", ex)
}
}
val startupMessage = BridgeControl.BridgeToNodeSnapshotRequest(bridgeId).serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val bridgeRequest = artemisSession.createMessage(false)
bridgeRequest.writeBodyBufferBytes(startupMessage)
artemisClient.producer.send(BRIDGE_NOTIFY, bridgeRequest)
}
private fun registerBridgeDuplicateChecker(artemisSession: ClientSession) {
try {
artemisSession.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue)
} catch (ex: ActiveMQQueueExistsException) {
// Ignore if there is a queue still not cleaned up
}
val notify = artemisSession.createConsumer(bridgeNotifyQueue)
notifyConsumer = notify
notify.setMessageHandler { msg ->
try {
val data: ByteArray = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
val notifyMessage = data.deserialize<BridgeControl.BridgeToNodeSnapshotRequest>(context = SerializationDefaults.P2P_CONTEXT)
if (notifyMessage.bridgeIdentity != bridgeId) {
log.error("Fatal Error! Two bridges have been configured simultaneously! Check the enterpriseConfiguration.externalBridge status")
System.exit(1)
}
} catch (ex: Exception) {
log.error("Unable to process bridge notification message", ex)
}
}
}
fun stop() {
@ -98,8 +129,11 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
validInboundQueues.clear()
controlConsumer?.close()
controlConsumer = null
notifyConsumer?.close()
notifyConsumer = null
artemis?.apply {
started?.session?.deleteQueue(bridgeControlQueue)
started?.session?.deleteQueue(bridgeNotifyQueue)
stop()
}
artemis = null