ENT-2559: Gracefully handle Artemis connectivity loss during Bridge leader election (#1436)

* ENT-2559: KDocs update and change visibility identifiers.

* ENT-2559: Handle upstream dependencies going up and down.

* ENT-2500: Address code review comments by @mnesbit
This commit is contained in:
Viktor Kolomeyko 2018-10-03 18:56:15 +01:00 committed by GitHub
parent a037c59e3a
commit 50a1819e47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 62 deletions

View File

@ -1,22 +1,31 @@
package net.corda.bridge.services.ha
import net.corda.bridge.services.api.BridgeArtemisConnectionService
import net.corda.bridge.services.api.BridgeMasterService
import net.corda.bridge.services.api.FirewallAuditService
import net.corda.bridge.services.api.FirewallConfiguration
import net.corda.bridge.services.api.ServiceStateSupport
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener
import net.corda.nodeapi.internal.zookeeper.ZkClient
import net.corda.nodeapi.internal.zookeeper.ZkLeader
import rx.Subscription
import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
class ExternalMasterElectionService(val conf: FirewallConfiguration,
val auditService: FirewallAuditService,
/**
* Election service which uses ZooKeeper for master election purposes.
*/
class ExternalMasterElectionService(private val conf: FirewallConfiguration,
private val auditService: FirewallAuditService,
artemisService: BridgeArtemisConnectionService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeMasterService, ServiceStateSupport by stateHelper {
private var haElector: ZkLeader? = null
@ -24,9 +33,14 @@ class ExternalMasterElectionService(val conf: FirewallConfiguration,
private val scheduler = Executors.newSingleThreadScheduledExecutor()
private var becomeMasterFuture: ScheduledFuture<*>? = null
companion object {
val log = contextLogger()
const val DELAYED_LEADER_START = 5000L
private var statusSubscriber: Subscription? = null
private val statusFollower = ServiceStateCombiner(listOf(artemisService))
private val activeTransitionLock = ReentrantLock()
private companion object {
private val log = contextLogger()
private const val DELAYED_LEADER_START = 5000L
}
init {
@ -50,48 +64,72 @@ class ExternalMasterElectionService(val conf: FirewallConfiguration,
}
override fun start() {
val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk://", "") }.joinToString(",")
val leaderPriority = conf.haConfig!!.haPriority
val hostName: String = InetAddress.getLocalHost().hostName
val info = ManagementFactory.getRuntimeMXBean()
val pid = info.name.split("@").firstOrNull() // TODO Java 9 has better support for this
val nodeId = "$hostName:$pid"
val leaderElector = ZkClient(zkConf, conf.haConfig!!.haTopic, nodeId, leaderPriority)
haElector = leaderElector
val listener = object : CordaLeaderListener {
override fun notLeader() {
auditService.statusChangeEvent("Leadership loss signalled from Zookeeper")
becomeSlave()
statusSubscriber = statusFollower.activeChange.subscribe({ ready ->
if (ready) {
log.info("Activating as result of upstream dependencies ready")
activate()
} else {
log.info("Deactivating due to upstream dependencies not ready")
deactivate()
}
override fun isLeader() {
log.info("Zookeeper has signalled leadership acquired. Delay master claim for a short period to allow old master to close")
becomeMasterFuture?.apply {
cancel(false)
}
becomeMasterFuture = scheduler.schedule(::becomeMaster, DELAYED_LEADER_START, TimeUnit.MILLISECONDS)
}
}
leaderListener = listener
leaderElector.addLeadershipListener(listener)
leaderElector.start()
auditService.statusChangeEvent("Requesting leadership from Zookeeper")
leaderElector.requestLeadership()
}, { log.error("Error in state change", it) })
}
override fun stop() {
auditService.statusChangeEvent("Stop requested")
becomeSlave()
haElector?.apply {
if (leaderListener != null) {
removeLeadershipListener(leaderListener!!)
}
relinquishLeadership()
close()
}
haElector = null
leaderListener = null
deactivate()
statusSubscriber?.unsubscribe()
statusSubscriber = null
}
/**
* Registers itself for leader election and installs [CordaLeaderListener] hook in case it becomes a leader.
*/
private fun activate() {
activeTransitionLock.withLock {
val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk://", "") }.joinToString(",")
val leaderPriority = conf.haConfig!!.haPriority
val hostName: String = InetAddress.getLocalHost().hostName
val info = ManagementFactory.getRuntimeMXBean()
val pid = info.name.split("@").firstOrNull() // TODO Java 9 has better support for this
val nodeId = "$hostName:$pid"
val leaderElector = ZkClient(zkConf, conf.haConfig!!.haTopic, nodeId, leaderPriority)
haElector = leaderElector
val listener = object : CordaLeaderListener {
override fun notLeader() {
auditService.statusChangeEvent("Leadership loss signalled from Zookeeper")
becomeSlave()
}
override fun isLeader() {
log.info("Zookeeper has signalled leadership acquired. Delay master claim for a short period to allow old master to close")
becomeMasterFuture?.apply {
cancel(false)
}
becomeMasterFuture = scheduler.schedule(::becomeMaster, DELAYED_LEADER_START, TimeUnit.MILLISECONDS)
}
}
leaderListener = listener
leaderElector.addLeadershipListener(listener)
leaderElector.start()
auditService.statusChangeEvent("Requesting leadership from Zookeeper")
leaderElector.requestLeadership()
}
}
/**
* Becoming slave, relinquishing leadership (if leader) and withdraws from future leader election process.
*/
private fun deactivate() {
activeTransitionLock.withLock {
becomeSlave()
haElector?.apply {
leaderListener?.apply { removeLeadershipListener(this) }
relinquishLeadership()
close()
}
haElector = null
leaderListener = null
}
}
}

View File

@ -11,21 +11,22 @@ import net.corda.bridge.services.sender.DirectBridgeSenderService
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Subscription
class BridgeSupervisorServiceImpl(val conf: FirewallConfiguration,
class BridgeSupervisorServiceImpl(conf: FirewallConfiguration,
maxMessageSize: Int,
val auditService: FirewallAuditService,
auditService: FirewallAuditService,
inProcessAMQPListenerService: BridgeAMQPListenerService?,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeSupervisorService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
val consoleLogger = LoggerFactory.getLogger("BasicInfo")
private val log = contextLogger()
private val consoleLogger : Logger = LoggerFactory.getLogger("BasicInfo")
}
private val haService: BridgeMasterService
private val artemisService: BridgeArtemisConnectionServiceImpl
private val artemisService: BridgeArtemisConnectionService
private val senderService: BridgeSenderService
private val receiverService: BridgeReceiverService
private val filterService: IncomingMessageFilterService
@ -33,12 +34,12 @@ class BridgeSupervisorServiceImpl(val conf: FirewallConfiguration,
private var statusSubscriber: Subscription? = null
init {
if (conf.haConfig == null) {
haService = SingleInstanceMasterService(conf, auditService)
} else {
haService = ExternalMasterElectionService(conf, auditService)
}
artemisService = BridgeArtemisConnectionServiceImpl(conf, maxMessageSize, auditService)
haService = if (conf.haConfig == null) {
SingleInstanceMasterService(conf, auditService)
} else {
ExternalMasterElectionService(conf, auditService, artemisService)
}
senderService = DirectBridgeSenderService(conf, maxMessageSize, auditService, haService, artemisService)
filterService = SimpleMessageFilterService(conf, auditService, artemisService, senderService)
receiverService = if (conf.firewallMode == FirewallMode.SenderReceiver) {

View File

@ -5,20 +5,20 @@ package net.corda.nodeapi.internal.zookeeper
* Listener interface for leader election results, to avoid public reference to shadowed curator classes.
*/
interface CordaLeaderListener {
/**
* This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true.
*
* Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If
* this occurs, you can expect {@link #notLeader()} to also be called.
*/
fun notLeader()
/**
* This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false.
*
* Note that it is possible that by the time this method call happens, hasLeadership has become true. If
* this occurs, you can expect {@link #isLeader()} to also be called.
*/
fun notLeader()
/**
* This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true.
*
* Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If
* this occurs, you can expect {@link #notLeader()} to also be called.
*/
fun isLeader()
}
@ -58,12 +58,12 @@ interface ZkLeader {
fun removeLeadershipListener(listener: CordaLeaderListener)
/**
* @return [true] if client is the current leader, [false] otherwise
* @return `true` if client is the current leader, `false` otherwise
*/
fun isLeader(): Boolean
/**
* @return [true] if client is started, [false] otherwise
* @return `true` if client is started, `false` otherwise
*/
fun isStarted(): Boolean
}