From 50a1819e47c73af3b67b231643f1387e5cc41075 Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Wed, 3 Oct 2018 18:56:15 +0100 Subject: [PATCH] ENT-2559: Gracefully handle Artemis connectivity loss during Bridge leader election (#1436) * ENT-2559: KDocs update and change visibility identifiers. * ENT-2559: Handle upstream dependencies going up and down. * ENT-2500: Address code review comments by @mnesbit --- .../ha/ExternalMasterElectionService.kt | 122 ++++++++++++------ .../BridgeSupervisorServiceImpl.kt | 21 +-- .../nodeapi/internal/zookeeper/ZkLeader.kt | 20 +-- 3 files changed, 101 insertions(+), 62 deletions(-) diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt index c2ad674fbf..49131f6786 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt @@ -1,22 +1,31 @@ package net.corda.bridge.services.ha +import net.corda.bridge.services.api.BridgeArtemisConnectionService import net.corda.bridge.services.api.BridgeMasterService import net.corda.bridge.services.api.FirewallAuditService import net.corda.bridge.services.api.FirewallConfiguration import net.corda.bridge.services.api.ServiceStateSupport +import net.corda.bridge.services.util.ServiceStateCombiner import net.corda.bridge.services.util.ServiceStateHelper import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener import net.corda.nodeapi.internal.zookeeper.ZkClient import net.corda.nodeapi.internal.zookeeper.ZkLeader +import rx.Subscription import java.lang.management.ManagementFactory import java.net.InetAddress import java.util.concurrent.Executors import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock +import 
kotlin.concurrent.withLock -class ExternalMasterElectionService(val conf: FirewallConfiguration, - val auditService: FirewallAuditService, +/** + * Election service which uses ZooKeeper for master election purposes. + */ +class ExternalMasterElectionService(private val conf: FirewallConfiguration, + private val auditService: FirewallAuditService, + artemisService: BridgeArtemisConnectionService, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeMasterService, ServiceStateSupport by stateHelper { private var haElector: ZkLeader? = null @@ -24,9 +33,14 @@ class ExternalMasterElectionService(val conf: FirewallConfiguration, private val scheduler = Executors.newSingleThreadScheduledExecutor() private var becomeMasterFuture: ScheduledFuture<*>? = null - companion object { - val log = contextLogger() - const val DELAYED_LEADER_START = 5000L + private var statusSubscriber: Subscription? = null + private val statusFollower = ServiceStateCombiner(listOf(artemisService)) + + private val activeTransitionLock = ReentrantLock() + + private companion object { + private val log = contextLogger() + private const val DELAYED_LEADER_START = 5000L } init { @@ -50,48 +64,72 @@ class ExternalMasterElectionService(val conf: FirewallConfiguration, } override fun start() { - val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk://", "") }.joinToString(",") - val leaderPriority = conf.haConfig!!.haPriority - val hostName: String = InetAddress.getLocalHost().hostName - val info = ManagementFactory.getRuntimeMXBean() - val pid = info.name.split("@").firstOrNull() // TODO Java 9 has better support for this - val nodeId = "$hostName:$pid" - val leaderElector = ZkClient(zkConf, conf.haConfig!!.haTopic, nodeId, leaderPriority) - haElector = leaderElector - val listener = object : CordaLeaderListener { - override fun notLeader() { - auditService.statusChangeEvent("Leadership loss signalled from Zookeeper") - becomeSlave() + statusSubscriber = 
statusFollower.activeChange.subscribe({ ready -> + if (ready) { + log.info("Activating as result of upstream dependencies ready") + activate() + } else { + log.info("Deactivating due to upstream dependencies not ready") + deactivate() } - - override fun isLeader() { - log.info("Zookeeper has signalled leadership acquired. Delay master claim for a short period to allow old master to close") - becomeMasterFuture?.apply { - cancel(false) - } - becomeMasterFuture = scheduler.schedule(::becomeMaster, DELAYED_LEADER_START, TimeUnit.MILLISECONDS) - } - - } - leaderListener = listener - leaderElector.addLeadershipListener(listener) - leaderElector.start() - auditService.statusChangeEvent("Requesting leadership from Zookeeper") - leaderElector.requestLeadership() + }, { log.error("Error in state change", it) }) } override fun stop() { auditService.statusChangeEvent("Stop requested") - becomeSlave() - haElector?.apply { - if (leaderListener != null) { - removeLeadershipListener(leaderListener!!) - } - relinquishLeadership() - close() - } - haElector = null - leaderListener = null + deactivate() + statusSubscriber?.unsubscribe() + statusSubscriber = null } + /** + * Registers itself for leader election and installs [CordaLeaderListener] hook in case it becomes a leader. 
+ */ + private fun activate() { + activeTransitionLock.withLock { + val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk://", "") }.joinToString(",") + val leaderPriority = conf.haConfig!!.haPriority + val hostName: String = InetAddress.getLocalHost().hostName + val info = ManagementFactory.getRuntimeMXBean() + val pid = info.name.split("@").firstOrNull() // TODO Java 9 has better support for this + val nodeId = "$hostName:$pid" + val leaderElector = ZkClient(zkConf, conf.haConfig!!.haTopic, nodeId, leaderPriority) + haElector = leaderElector + val listener = object : CordaLeaderListener { + override fun notLeader() { + auditService.statusChangeEvent("Leadership loss signalled from Zookeeper") + becomeSlave() + } + + override fun isLeader() { + log.info("Zookeeper has signalled leadership acquired. Delay master claim for a short period to allow old master to close") + becomeMasterFuture?.apply { + cancel(false) + } + becomeMasterFuture = scheduler.schedule(::becomeMaster, DELAYED_LEADER_START, TimeUnit.MILLISECONDS) + } + } + leaderListener = listener + leaderElector.addLeadershipListener(listener) + leaderElector.start() + auditService.statusChangeEvent("Requesting leadership from Zookeeper") + leaderElector.requestLeadership() + } + } + + /** + * Becomes slave, relinquishes leadership (if leader) and withdraws from the future leader election process.
+ */ + private fun deactivate() { + activeTransitionLock.withLock { + becomeSlave() + haElector?.apply { + leaderListener?.apply { removeLeadershipListener(this) } + relinquishLeadership() + close() + } + haElector = null + leaderListener = null + } + } } \ No newline at end of file diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt index 2105e92dc5..745baf96f1 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt @@ -11,21 +11,22 @@ import net.corda.bridge.services.sender.DirectBridgeSenderService import net.corda.bridge.services.util.ServiceStateCombiner import net.corda.bridge.services.util.ServiceStateHelper import net.corda.core.utilities.contextLogger +import org.slf4j.Logger import org.slf4j.LoggerFactory import rx.Subscription -class BridgeSupervisorServiceImpl(val conf: FirewallConfiguration, +class BridgeSupervisorServiceImpl(conf: FirewallConfiguration, maxMessageSize: Int, - val auditService: FirewallAuditService, + auditService: FirewallAuditService, inProcessAMQPListenerService: BridgeAMQPListenerService?, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeSupervisorService, ServiceStateSupport by stateHelper { companion object { - val log = contextLogger() - val consoleLogger = LoggerFactory.getLogger("BasicInfo") + private val log = contextLogger() + private val consoleLogger : Logger = LoggerFactory.getLogger("BasicInfo") } private val haService: BridgeMasterService - private val artemisService: BridgeArtemisConnectionServiceImpl + private val artemisService: BridgeArtemisConnectionService private val senderService: BridgeSenderService private val receiverService: BridgeReceiverService private val filterService: 
IncomingMessageFilterService @@ -33,12 +34,12 @@ class BridgeSupervisorServiceImpl(val conf: FirewallConfiguration, private var statusSubscriber: Subscription? = null init { - if (conf.haConfig == null) { - haService = SingleInstanceMasterService(conf, auditService) - } else { - haService = ExternalMasterElectionService(conf, auditService) - } artemisService = BridgeArtemisConnectionServiceImpl(conf, maxMessageSize, auditService) + haService = if (conf.haConfig == null) { + SingleInstanceMasterService(conf, auditService) + } else { + ExternalMasterElectionService(conf, auditService, artemisService) + } senderService = DirectBridgeSenderService(conf, maxMessageSize, auditService, haService, artemisService) filterService = SimpleMessageFilterService(conf, auditService, artemisService, senderService) receiverService = if (conf.firewallMode == FirewallMode.SenderReceiver) { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkLeader.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkLeader.kt index e700358f37..63c534ea2b 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkLeader.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/zookeeper/ZkLeader.kt @@ -5,20 +5,20 @@ package net.corda.nodeapi.internal.zookeeper * Listener interface for leader election results, to avoid public reference to shadowed curator classes. */ interface CordaLeaderListener { - /** - * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true. - * - * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If - * this occurs, you can expect {@link #notLeader()} to also be called. - */ - fun notLeader() - /** * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false. * * Note that it is possible that by the time this method call happens, hasLeadership has become true. 
If * this occurs, you can expect {@link #isLeader()} to also be called. */ + fun notLeader() + + /** + * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true. + * + * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If + * this occurs, you can expect {@link #notLeader()} to also be called. + */ fun isLeader() } @@ -58,12 +58,12 @@ interface ZkLeader { fun removeLeadershipListener(listener: CordaLeaderListener) /** - * @return [true] if client is the current leader, [false] otherwise + * @return `true` if client is the current leader, `false` otherwise */ fun isLeader(): Boolean /** - * @return [true] if client is started, [false] otherwise + * @return `true` if client is started, `false` otherwise */ fun isStarted(): Boolean } \ No newline at end of file