From f7fb54dc8a7a119fa360d32f45d62a94fea042a9 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 16 May 2018 16:00:41 +0100 Subject: [PATCH] Detect duplicate bridge operation and terminate JVM to prevent re-ordering flow messages and causing odd behaviour. --- .../bridging/BridgeControlListener.kt | 42 +++++++++++++++++-- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt index 6d49eee754..cddfcfaeae 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientSession import rx.Observable import rx.subjects.PublishSubject import java.util.* @@ -39,11 +40,13 @@ class BridgeControlListener(val config: NodeSSLConfiguration, val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable { private val bridgeId: String = UUID.randomUUID().toString() private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId" + private val bridgeNotifyQueue = "$BRIDGE_NOTIFY.$bridgeId" private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, maxMessageSize, artemisMessageClientFactory) private val validInboundQueues = mutableSetOf() private var artemis: ArtemisSessionProvider? = null private var controlConsumer: ClientConsumer? = null + private var notifyConsumer: ClientConsumer? = null constructor(config: NodeSSLConfiguration, p2pAddress: NetworkHostAndPort, @@ -71,11 +74,21 @@ class BridgeControlListener(val config: NodeSSLConfiguration, artemis.start() val artemisClient = artemis.started!! val artemisSession = artemisClient.session + registerBridgeControlListener(artemisSession) + registerBridgeDuplicateChecker(artemisSession) + val startupMessage = BridgeControl.BridgeToNodeSnapshotRequest(bridgeId).serialize(context = SerializationDefaults.P2P_CONTEXT).bytes + val bridgeRequest = artemisSession.createMessage(false) + bridgeRequest.writeBodyBufferBytes(startupMessage) + artemisClient.producer.send(BRIDGE_NOTIFY, bridgeRequest) + } + + private fun registerBridgeControlListener(artemisSession: ClientSession) { try { artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue) } catch (ex: ActiveMQQueueExistsException) { // Ignore if there is a queue still not cleaned up } + val control = artemisSession.createConsumer(bridgeControlQueue) controlConsumer = control control.setMessageHandler { msg -> @@ -85,10 +98,28 @@ class BridgeControlListener(val config: NodeSSLConfiguration, log.error("Unable to process bridge control message", ex) } } - val startupMessage = BridgeControl.BridgeToNodeSnapshotRequest(bridgeId).serialize(context = SerializationDefaults.P2P_CONTEXT).bytes - val bridgeRequest = artemisSession.createMessage(false) - bridgeRequest.writeBodyBufferBytes(startupMessage) - artemisClient.producer.send(BRIDGE_NOTIFY, bridgeRequest) + } + + private fun registerBridgeDuplicateChecker(artemisSession: ClientSession) { + try { + artemisSession.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue) + } catch (ex: ActiveMQQueueExistsException) { + // Ignore if there is a queue still not cleaned up + } + val notify = artemisSession.createConsumer(bridgeNotifyQueue) + notifyConsumer = notify + notify.setMessageHandler { msg -> + try { + val data: ByteArray = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) } + val notifyMessage = data.deserialize(context = SerializationDefaults.P2P_CONTEXT) + if (notifyMessage.bridgeIdentity != bridgeId) { + log.error("Fatal Error! Two bridges have been configured simultaneously! Check the enterpriseConfiguration.externalBridge status") + System.exit(1) + } + } catch (ex: Exception) { + log.error("Unable to process bridge notification message", ex) + } + } } fun stop() { @@ -98,8 +129,11 @@ class BridgeControlListener(val config: NodeSSLConfiguration, validInboundQueues.clear() controlConsumer?.close() controlConsumer = null + notifyConsumer?.close() + notifyConsumer = null artemis?.apply { started?.session?.deleteQueue(bridgeControlQueue) + started?.session?.deleteQueue(bridgeNotifyQueue) stop() } artemis = null