diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt index bb043cae0f..adbf62ddaf 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt @@ -21,6 +21,9 @@ import net.corda.nodeapi.internal.zookeeper.ZkClient import net.corda.nodeapi.internal.zookeeper.ZkLeader import java.lang.management.ManagementFactory import java.net.InetAddress +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit class ExternalMasterElectionService(val conf: BridgeConfiguration, val auditService: BridgeAuditService, @@ -28,9 +31,12 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration, private var haElector: ZkLeader? = null private var leaderListener: CordaLeaderListener? = null + private val scheduler = Executors.newSingleThreadScheduledExecutor() + private var becomeMasterFuture: ScheduledFuture<*>? = null companion object { val log = contextLogger() + const val DELAYED_LEADER_START = 5000L } init { @@ -38,6 +44,21 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration, require(conf.haConfig!!.haConnectionString.split(',').all { it.startsWith("zk://") }) { "Only Zookeeper HA mode 'zk://IPADDR:PORT supported" } } + private fun becomeMaster() { + auditService.statusChangeEvent("Acquired leadership. Going active") + stateHelper.active = true + becomeMasterFuture = null + } + + private fun becomeSlave() { + log.info("Cancelling leadership") + becomeMasterFuture?.apply { + cancel(false) + } + becomeMasterFuture = null + stateHelper.active = false + } + override fun start() { val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk://", "") }.joinToString(",") val leaderPriority = conf.haConfig!!.haPriority @@ -49,13 +70,16 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration, haElector = leaderElector val listener = object : CordaLeaderListener { override fun notLeader() { - auditService.statusChangeEvent("Loss of leadership signalled by Zookeeper") - stateHelper.active = false + auditService.statusChangeEvent("Leadership loss signalled from Zookeeper") + becomeSlave() } override fun isLeader() { - auditService.statusChangeEvent("Acquired leadership from Zookeeper. Going active") - stateHelper.active = true + log.info("Zookeeper has signalled leadership acquired. Delay master claim for a short period to allow old master to close") + becomeMasterFuture?.apply { + cancel(false) + } + becomeMasterFuture = scheduler.schedule(::becomeMaster, DELAYED_LEADER_START, TimeUnit.MILLISECONDS) } } @@ -68,7 +92,7 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration, override fun stop() { auditService.statusChangeEvent("Stop requested") - stateHelper.active = false + becomeSlave() haElector?.apply { if (leaderListener != null) { removeLeadershipListener(leaderListener!!)