mirror of
https://github.com/corda/corda.git
synced 2025-02-07 11:30:22 +00:00
During testing I observed a race condition where the bridge losing master status was terminated by the dual bridge detection logic. This code defers teh m,aster claim a few seconds to prevent thsi race condition.
This commit is contained in:
parent
741a5741bd
commit
8af17d4c3e
@ -21,6 +21,9 @@ import net.corda.nodeapi.internal.zookeeper.ZkClient
|
||||
import net.corda.nodeapi.internal.zookeeper.ZkLeader
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.net.InetAddress
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.ScheduledFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class ExternalMasterElectionService(val conf: BridgeConfiguration,
|
||||
val auditService: BridgeAuditService,
|
||||
@ -28,9 +31,12 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration,
|
||||
|
||||
private var haElector: ZkLeader? = null
|
||||
private var leaderListener: CordaLeaderListener? = null
|
||||
private val scheduler = Executors.newSingleThreadScheduledExecutor()
|
||||
private var becomeMasterFuture: ScheduledFuture<*>? = null
|
||||
|
||||
companion object {
|
||||
val log = contextLogger()
|
||||
const val DELAYED_LEADER_START = 5000L
|
||||
}
|
||||
|
||||
init {
|
||||
@ -38,6 +44,21 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration,
|
||||
require(conf.haConfig!!.haConnectionString.split(',').all { it.startsWith("zk://") }) { "Only Zookeeper HA mode 'zk://IPADDR:PORT supported" }
|
||||
}
|
||||
|
||||
private fun becomeMaster() {
|
||||
auditService.statusChangeEvent("Acquired leadership. Going active")
|
||||
stateHelper.active = true
|
||||
becomeMasterFuture = null
|
||||
}
|
||||
|
||||
private fun becomeSlave() {
|
||||
log.info("Cancelling leadership")
|
||||
becomeMasterFuture?.apply {
|
||||
cancel(false)
|
||||
}
|
||||
becomeMasterFuture = null
|
||||
stateHelper.active = false
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk://", "") }.joinToString(",")
|
||||
val leaderPriority = conf.haConfig!!.haPriority
|
||||
@ -49,13 +70,16 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration,
|
||||
haElector = leaderElector
|
||||
val listener = object : CordaLeaderListener {
|
||||
override fun notLeader() {
|
||||
auditService.statusChangeEvent("Loss of leadership signalled by Zookeeper")
|
||||
stateHelper.active = false
|
||||
auditService.statusChangeEvent("Leadership loss signalled from Zookeeper")
|
||||
becomeSlave()
|
||||
}
|
||||
|
||||
override fun isLeader() {
|
||||
auditService.statusChangeEvent("Acquired leadership from Zookeeper. Going active")
|
||||
stateHelper.active = true
|
||||
log.info("Zookeeper has signalled leadership acquired. Delay master claim for a short period to allow old master to close")
|
||||
becomeMasterFuture?.apply {
|
||||
cancel(false)
|
||||
}
|
||||
becomeMasterFuture = scheduler.schedule(::becomeMaster, DELAYED_LEADER_START, TimeUnit.MILLISECONDS)
|
||||
}
|
||||
|
||||
}
|
||||
@ -68,7 +92,7 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration,
|
||||
|
||||
override fun stop() {
|
||||
auditService.statusChangeEvent("Stop requested")
|
||||
stateHelper.active = false
|
||||
becomeSlave()
|
||||
haElector?.apply {
|
||||
if (leaderListener != null) {
|
||||
removeLeadershipListener(leaderListener!!)
|
||||
|
Loading…
x
Reference in New Issue
Block a user