mirror of
https://github.com/corda/corda.git
synced 2025-02-12 05:35:50 +00:00
Merge pull request #900 from corda/mnesbit-avoid-race-condition
ENT-2003: Avoid a race condition in bridge on master change
This commit is contained in:
commit
01fb1fc3a1
@ -21,6 +21,9 @@ import net.corda.nodeapi.internal.zookeeper.ZkClient
|
|||||||
import net.corda.nodeapi.internal.zookeeper.ZkLeader
|
import net.corda.nodeapi.internal.zookeeper.ZkLeader
|
||||||
import java.lang.management.ManagementFactory
|
import java.lang.management.ManagementFactory
|
||||||
import java.net.InetAddress
|
import java.net.InetAddress
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
import java.util.concurrent.ScheduledFuture
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
class ExternalMasterElectionService(val conf: BridgeConfiguration,
|
class ExternalMasterElectionService(val conf: BridgeConfiguration,
|
||||||
val auditService: BridgeAuditService,
|
val auditService: BridgeAuditService,
|
||||||
@ -28,9 +31,12 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration,
|
|||||||
|
|
||||||
private var haElector: ZkLeader? = null
|
private var haElector: ZkLeader? = null
|
||||||
private var leaderListener: CordaLeaderListener? = null
|
private var leaderListener: CordaLeaderListener? = null
|
||||||
|
private val scheduler = Executors.newSingleThreadScheduledExecutor()
|
||||||
|
private var becomeMasterFuture: ScheduledFuture<*>? = null
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
val log = contextLogger()
|
val log = contextLogger()
|
||||||
|
const val DELAYED_LEADER_START = 5000L
|
||||||
}
|
}
|
||||||
|
|
||||||
init {
|
init {
|
||||||
@ -38,6 +44,21 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration,
|
|||||||
require(conf.haConfig!!.haConnectionString.split(',').all { it.startsWith("zk://") }) { "Only Zookeeper HA mode 'zk://IPADDR:PORT supported" }
|
require(conf.haConfig!!.haConnectionString.split(',').all { it.startsWith("zk://") }) { "Only Zookeeper HA mode 'zk://IPADDR:PORT supported" }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun becomeMaster() {
|
||||||
|
auditService.statusChangeEvent("Acquired leadership. Going active")
|
||||||
|
stateHelper.active = true
|
||||||
|
becomeMasterFuture = null
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun becomeSlave() {
|
||||||
|
log.info("Cancelling leadership")
|
||||||
|
becomeMasterFuture?.apply {
|
||||||
|
cancel(false)
|
||||||
|
}
|
||||||
|
becomeMasterFuture = null
|
||||||
|
stateHelper.active = false
|
||||||
|
}
|
||||||
|
|
||||||
override fun start() {
|
override fun start() {
|
||||||
val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk://", "") }.joinToString(",")
|
val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk://", "") }.joinToString(",")
|
||||||
val leaderPriority = conf.haConfig!!.haPriority
|
val leaderPriority = conf.haConfig!!.haPriority
|
||||||
@ -49,13 +70,16 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration,
|
|||||||
haElector = leaderElector
|
haElector = leaderElector
|
||||||
val listener = object : CordaLeaderListener {
|
val listener = object : CordaLeaderListener {
|
||||||
override fun notLeader() {
|
override fun notLeader() {
|
||||||
auditService.statusChangeEvent("Loss of leadership signalled by Zookeeper")
|
auditService.statusChangeEvent("Leadership loss signalled from Zookeeper")
|
||||||
stateHelper.active = false
|
becomeSlave()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun isLeader() {
|
override fun isLeader() {
|
||||||
auditService.statusChangeEvent("Acquired leadership from Zookeeper. Going active")
|
log.info("Zookeeper has signalled leadership acquired. Delay master claim for a short period to allow old master to close")
|
||||||
stateHelper.active = true
|
becomeMasterFuture?.apply {
|
||||||
|
cancel(false)
|
||||||
|
}
|
||||||
|
becomeMasterFuture = scheduler.schedule(::becomeMaster, DELAYED_LEADER_START, TimeUnit.MILLISECONDS)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -68,7 +92,7 @@ class ExternalMasterElectionService(val conf: BridgeConfiguration,
|
|||||||
|
|
||||||
override fun stop() {
|
override fun stop() {
|
||||||
auditService.statusChangeEvent("Stop requested")
|
auditService.statusChangeEvent("Stop requested")
|
||||||
stateHelper.active = false
|
becomeSlave()
|
||||||
haElector?.apply {
|
haElector?.apply {
|
||||||
if (leaderListener != null) {
|
if (leaderListener != null) {
|
||||||
removeLeadershipListener(leaderListener!!)
|
removeLeadershipListener(leaderListener!!)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user