Merge pull request #1554 from corda/mnesbit-bully-leader

ENT-2705: A Bully Algorithm Leader Elector for the Bridge
This commit is contained in:
Matthew Nesbit 2018-11-12 09:41:50 +00:00 committed by GitHub
commit d54e63e356
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1054 additions and 36 deletions

View File

@ -22,6 +22,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.bridging.BridgeEntry
import net.corda.nodeapi.internal.bully.BullyLeaderClient
import net.corda.nodeapi.internal.zookeeper.ZkClient
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
@ -299,6 +300,52 @@ class BridgeIntegrationTest {
}
}
@Test
fun `Run HA using Bully algorithm`() {
val configResource = "/net/corda/bridge/habullysingleprocess/firewall.conf"
createNetworkParams(tempFolder.root.toPath())
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
assertEquals(BridgeHAConfigImpl("bully://localhost", 10), config.haConfig)
config.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val (artemisServer, artemisClient) = createArtemis()
try {
installBridgeControlResponder(artemisClient)
val bridge = FirewallInstance(config, FirewallVersionInfo(1, "1.1", "Dummy", "Test"))
val stateFollower = bridge.activeChange.toBlocking().iterator
assertEquals(false, stateFollower.next())
assertEquals(false, bridge.active)
bridge.start()
assertEquals(false, bridge.active) // Starting the bridge insufficient to go active
assertEquals(true, stateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, serverListening("localhost", 10005))
val higherPriorityClient = BullyLeaderClient(artemisClient, "/bridge/ha", "Test", 5)
higherPriorityClient.start()
higherPriorityClient.requestLeadership() // should win leadership and kick out our bridge
assertEquals(false, stateFollower.next())
assertEquals(false, bridge.active)
var socketState = true
for (i in 0 until 5) { // The event signalling bridge down is pretty immediate, but the cascade of events leading to socket close can take a while
socketState = serverListening("localhost", 10005)
if (!socketState) break
Thread.sleep(100)
}
assertEquals(false, socketState)
higherPriorityClient.relinquishLeadership() // let our bridge back as leader
higherPriorityClient.close()
assertEquals(true, stateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, serverListening("localhost", 10005))
bridge.stop() // Finally check shutdown
assertEquals(false, stateFollower.next())
assertEquals(false, bridge.active)
assertEquals(false, serverListening("localhost", 10005))
} finally {
artemisClient.stop()
artemisServer.stop()
}
}
@Test
fun `Test artemis failover logic`() {
val configResource = "/net/corda/bridge/artemisfailover/firewall.conf"

View File

@ -0,0 +1,20 @@
package net.corda.bridge.services.artemis
import net.corda.bridge.services.api.BridgeArtemisConnectionService
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisSessionProvider
class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider {
override fun start(): ArtemisMessagingClient.Started {
// We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService
return artemisConnectionService.started!!
}
override fun stop() {
// We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService
}
override val started: ArtemisMessagingClient.Started?
get() = artemisConnectionService.started
}

View File

@ -1,13 +1,11 @@
package net.corda.bridge.services.ha
import net.corda.bridge.services.api.BridgeArtemisConnectionService
import net.corda.bridge.services.api.BridgeMasterService
import net.corda.bridge.services.api.FirewallAuditService
import net.corda.bridge.services.api.FirewallConfiguration
import net.corda.bridge.services.api.ServiceStateSupport
import net.corda.bridge.services.api.*
import net.corda.bridge.services.artemis.ForwardingArtemisMessageClient
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.bully.BullyLeaderClient
import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener
import net.corda.nodeapi.internal.zookeeper.ZkClient
import net.corda.nodeapi.internal.zookeeper.ZkLeader
@ -22,10 +20,17 @@ import kotlin.concurrent.withLock
*/
class ExternalMasterElectionService(private val conf: FirewallConfiguration,
private val auditService: FirewallAuditService,
artemisService: BridgeArtemisConnectionService,
private val artemisService: BridgeArtemisConnectionService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeMasterService, ServiceStateSupport by stateHelper {
private var haElector: ZkLeader? = null
private enum class ElectorMode {
ZOOKEEPER,
BULLY_OVER_ARTEMIS
}
private val mode: ElectorMode
private var leaderListener: CordaLeaderListener? = null
private var statusSubscriber: Subscription? = null
private val statusFollower = ServiceStateCombiner(listOf(artemisService))
@ -38,7 +43,11 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration,
init {
require(conf.haConfig != null) { "Undefined HA Config" }
require(conf.haConfig!!.haConnectionString.split(',').all { it.startsWith("zk://") }) { "Only Zookeeper HA mode 'zk://IPADDR:PORT supported" }
val connectionStrings = conf.haConfig!!.haConnectionString.split(',')
val zkOnly = connectionStrings.all { it.startsWith("zk://") }
val bullyOnly = (connectionStrings.size == 1) && (connectionStrings.single().toLowerCase() == "bully://localhost")
require(zkOnly xor bullyOnly) { "Only all Zookeeper HA mode 'zk://IPADDR:PORT, or Bully Algorithm mode 'bully://localhost' supported" }
if (zkOnly) mode = ElectorMode.ZOOKEEPER else if (bullyOnly) mode = ElectorMode.BULLY_OVER_ARTEMIS else throw IllegalArgumentException("Unsupported elector URL")
}
private fun becomeMaster() {
@ -75,29 +84,37 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration,
*/
private fun activate() {
activeTransitionLock.withLock {
val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk://", "") }.joinToString(",")
val connectionStrings = conf.haConfig!!.haConnectionString.split(',')
val leaderPriority = conf.haConfig!!.haPriority
val hostName: String = InetAddress.getLocalHost().hostName
val info = ManagementFactory.getRuntimeMXBean()
val pid = info.name.split("@").firstOrNull() // TODO Java 9 has better support for this
val nodeId = "$hostName:$pid"
val leaderElector = ZkClient(zkConf, conf.haConfig!!.haTopic, nodeId, leaderPriority)
val leaderElector: ZkLeader = when (mode) {
ElectorMode.ZOOKEEPER -> {
val zkConf = connectionStrings.map { it.replace("zk://", "") }.joinToString(",")
ZkClient(zkConf, conf.haConfig!!.haTopic, nodeId, leaderPriority)
}
ElectorMode.BULLY_OVER_ARTEMIS -> {
BullyLeaderClient(ForwardingArtemisMessageClient(artemisService), conf.haConfig!!.haTopic, nodeId, leaderPriority)
}
}
haElector = leaderElector
val listener = object : CordaLeaderListener {
override fun notLeader() {
auditService.statusChangeEvent("Leadership loss signalled from Zookeeper")
auditService.statusChangeEvent("Leadership loss signalled from $mode")
becomeSlave()
}
override fun isLeader() {
log.info("Zookeeper has signalled leadership acquired.")
log.info("$mode has signalled leadership acquired.")
becomeMaster()
}
}
leaderListener = listener
leaderElector.addLeadershipListener(listener)
leaderElector.start()
auditService.statusChangeEvent("Requesting leadership from Zookeeper")
auditService.statusChangeEvent("Requesting leadership from $mode")
leaderElector.requestLeadership()
}
}

View File

@ -1,13 +1,12 @@
package net.corda.bridge.services.sender
import net.corda.bridge.services.api.*
import net.corda.bridge.services.artemis.ForwardingArtemisMessageClient
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.bridging.BridgeControlListener
import net.corda.nodeapi.internal.bridging.BridgeMetricsService
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
@ -36,21 +35,6 @@ class DirectBridgeSenderService(val conf: FirewallConfiguration,
BridgeAuditServiceAdaptor(auditService),
conf.enableAMQPPacketTrace)
private class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider {
override fun start(): ArtemisMessagingClient.Started {
// We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService
return artemisConnectionService.started!!
}
override fun stop() {
// We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService
}
override val started: ArtemisMessagingClient.Started?
get() = artemisConnectionService.started
}
private class BridgeAuditServiceAdaptor(private val auditService: FirewallAuditService) : BridgeMetricsService {
override fun bridgeCreated(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
// No corresponding method on FirewallAuditService yet

View File

@ -0,0 +1,11 @@
firewallMode = SenderReceiver
outboundConfig : {
artemisBrokerAddress = "localhost:11005"
}
inboundConfig : {
listeningAddress = "0.0.0.0:10005"
}
networkParametersPath = network-parameters
haConfig : {
haConnectionString = "bully://localhost"
}

View File

@ -46,8 +46,8 @@ your own ``firewall.conf`` file will use these defaults:
.. literalinclude:: ../../bridge/src/main/resources/firewalldefault.conf
:language: javascript
Bridge operating modes
----------------------
Firewall operating modes
------------------------
.. note:: By default, the Corda node assumes that it will carry out the peer-to-peer functions of the ``bridge`` internally!
Before running a dedicated firewall process, it is essential to turn off the dev mode component by setting the
``enterpriseConfiguration.externalBridge`` property of the ``node.conf`` file to ``true``.
@ -213,16 +213,21 @@ absolute path to the firewall's base directory.
:crlCheckSoftFail: If true (recommended setting) allows certificate checks to pass if the CRL(certificate revocation list) provider is unavailable.
:haConfig: Optionally the ``SenderReceiver`` and ``BridgeInner`` modes can be run in a hot-warm configuration, which determines the active instance using an external master election service.
Currently, only Zookeeper can be used as master elector. Eventually other electors may be supported e.g. ``etcd``. This configuration section controls these options:
Currently, the leader election process can be delegated to Zookeeper, or the firewall can use the ``Bully Algorithm`` (see <https://en.wikipedia.org/wiki/Bully_algorithm>) via Publish/Subscribe messages on the artemis broker.
For production it is recommended that a Zookeeper cluster be used as this will protect against network partitioning scenarios. However, the ``Bully Algorithm`` mode does not require any additional server processes.
Eventually other electors may be supported e.g. ``etcd``. This configuration section controls these options:
:haConnectionString: A string containing the connection details of the master electors as a comma delimited list of connection string in the format ``zk://<host>:<port>``.
In future it intended that other schemes such as ``etcd`` are supported.
:haConnectionString: A string containing the connection details of the master electors as a comma delimited list of individual connection strings.
* To use an external Zookeeper cluster each connection item should be in the format ``zk://<host>:<port>``.
* To use the ``Bully Algorithm`` running over artemis the single connection string should be set to ``bully://localhost``.
:haPriority: The implementation uses a prioritise leader election algorithm, so that a preferred master instance can be set. The highest priority is 0 and larger integers have lower priority.
At the same level of priority, it is random which instance wins the leader election. If a ``bridge`` instance dies another will have the opportunity to become master in instead.
:haTopic: Sets the zookeeper topic prefix that the nodes used in resolving the election and must be the same for all ``bridge``
instances competing for master status. This is available to allow a single zookeeper cluster to be reused with multiple
:haTopic: Sets the zookeeper/artemis topic that the nodes used in resolving the election and must be the same for all ``bridge``
instances competing for master status. This is available to allow a single zookeeper/artemis cluster to be reused with multiple
sets of ``bridges`` (e.g. in test environments).
The default value is ``bridge/ha`` and would not normally need to be changed if the cluster is not shared.

View File

@ -0,0 +1,425 @@
package net.corda.nodeapi.internal.bully
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener
import net.corda.nodeapi.internal.zookeeper.ZkLeader
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import java.time.Clock
import java.time.Instant
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread
class BullyLeaderClient(val artemis: ArtemisSessionProvider,
private val electionPath: String,
val nodeId: String,
val priority: Int) : ZkLeader {
private companion object {
const val LEADER_TIMEOUT_MSEC = 1000L
private val log = contextLogger()
}
@VisibleForTesting
var clock: Clock = Clock.systemUTC()
@CordaSerializable
private enum class MessageType {
ELECTION_REQUEST,
ELECTION_REJECT,
LEADER_ANNOUNCE,
LEADER_RETIRE
}
@CordaSerializable
private data class LeaderMessage(val msgType: MessageType,
val term: Int,
val proposedLeader: String,
val leaderPriority: Int)
private class ArtemisMessageSession private constructor(
private val leaderTopic: SimpleString,
private val session: ClientSession,
private val messageConsumer: ClientConsumer,
private val messageProducer: ClientProducer,
private val onMessage: (LeaderMessage) -> Unit) : AutoCloseable, FailoverEventListener {
companion object {
fun connectToArtemis(electionPath: String, locator: ServerLocator, factory: ClientSessionFactory, onMessage: (LeaderMessage) -> Unit): ArtemisMessageSession {
val session = factory.createSession(ArtemisMessagingComponent.NODE_P2P_USER,
ArtemisMessagingComponent.NODE_P2P_USER,
false,
true,
true,
locator.isPreAcknowledge,
ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
val producer = session.createProducer()
val leaderTopic = SimpleString("${INTERNAL_PREFIX}leader.$electionPath")
val queueName = SimpleString(UUID.randomUUID().toString())
session.createTemporaryQueue(leaderTopic, RoutingType.MULTICAST, queueName)
val consumer = session.createConsumer(queueName)
val artemisMessageSession = ArtemisMessageSession(leaderTopic, session, consumer, producer, onMessage)
session.addFailoverListener(artemisMessageSession)
consumer.setMessageHandler { msg ->
artemisMessageSession.processMessage(msg)
}
session.start()
return artemisMessageSession
}
}
private val closed: AtomicBoolean = AtomicBoolean(false)
private val _connected: AtomicBoolean = AtomicBoolean(true)
val connected: Boolean get() = _connected.get()
override fun failoverEvent(eventType: FailoverEventType) {
log.warn("Artemis Failover event $eventType")
_connected.set(eventType == FailoverEventType.FAILOVER_COMPLETED)
}
private fun processMessage(message: ClientMessage) {
try {
val data: ByteArray = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
val leaderMessage = data.deserialize<LeaderMessage>(context = SerializationDefaults.P2P_CONTEXT)
onMessage(leaderMessage)
message.acknowledge()
} catch (ex: Exception) {
log.error("Unable to process leader control message", ex)
}
}
fun sendMessage(message: LeaderMessage) {
if (closed.get() || session.isClosed || !connected) return
val messageBytes = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val artemisMessage = session.createMessage(false)
artemisMessage.writeBodyBufferBytes(messageBytes)
messageProducer.send(leaderTopic, artemisMessage)
}
override fun close() {
if (!closed.getAndSet(true)) {
thread {
// send of to another thread as this can block for ages inside Artemis
closeInternal()
}
}
}
private fun close(target: AutoCloseable) {
try {
target.close()
} catch (ignored: ActiveMQObjectClosedException) {
// swallow
}
}
private fun closeInternal() {
session.removeFailoverListener(this)
if (!messageConsumer.isClosed) {
close(messageConsumer)
}
if (!session.isClosed) {
close(session)
}
}
}
private enum class BullyState {
FOLLOWER,
INITIATE_ELECTION,
POSSIBLE_LEADER,
LEADER
}
private class LeaderState(clock: Clock) {
var scheduler: ScheduledExecutorService? = null
val isStarted: Boolean get() = (scheduler?.isShutdown?.not() ?: false)
var pollTimer: ScheduledFuture<*>? = null
var isActive: Boolean = false
var messageSession: ArtemisMessageSession? = null
var state: BullyState = BullyState.FOLLOWER
var currentTerm: Int = 0
var currentLeader: String? = null
var leaderPriority: Int? = null
var leaderUpdated: Instant = clock.instant()
}
private val state = ThreadBox(LeaderState(clock))
private val listeners = CopyOnWriteArrayList<CordaLeaderListener>()
override fun start() {
state.locked {
if (isStarted) return
val newScheduler = Executors.newSingleThreadScheduledExecutor()
scheduler = newScheduler
pollTimer = newScheduler.scheduleAtFixedRate(::timerEvent, 0L, LEADER_TIMEOUT_MSEC / 2L, TimeUnit.MILLISECONDS)
}
}
override fun close() {
if (isStarted()) {
relinquishLeadership()
}
state.locked {
messageSession?.close()
messageSession = null
pollTimer?.cancel(false)
pollTimer = null
scheduler?.shutdown()
scheduler = null
}
}
private fun timerEvent() {
var down = false
state.locked {
if (!isStarted) return
log.debug { "$nodeId Timer running. current state $state" }
val wasLeader = isLeader()
val artemis = artemis.started
if (artemis == null || artemis.session.isClosed) {
log.info("Artemis lost")
messageSession?.close()
messageSession = null
state = BullyState.FOLLOWER
currentLeader = null
leaderPriority = null
if (wasLeader) down = true
return@locked
}
if (messageSession == null) {
log.info("$nodeId Artemis connected. Creating leader session")
messageSession = ArtemisMessageSession.connectToArtemis(electionPath,
artemis.serverLocator,
artemis.sessionFactory,
::messageEvent)
return@locked
}
if (!messageSession!!.connected) {
state = BullyState.FOLLOWER
currentLeader = null
leaderPriority = null
if (wasLeader) down = true
return@locked
}
if (!isActive) {
log.debug { "$nodeId Idling as inactive" }
state = BullyState.FOLLOWER
if (wasLeader) down = true
return@locked
}
val now = clock.instant()
when (state) {
BullyState.FOLLOWER -> {
// Unknown Leader, or dead leader prepare to do election
if (currentLeader == null || leaderUpdated.plusMillis(LEADER_TIMEOUT_MSEC).isBefore(now)) {
log.info("$nodeId Leader missing start election")
currentLeader = null
leaderPriority = null
state = BullyState.INITIATE_ELECTION
} else if (compareIds(nodeId, priority, currentLeader!!, leaderPriority!!) > 0) { // we are higher priority start elections
log.info("$nodeId Leader lower priority than us start challenge")
currentLeader = null
leaderPriority = null
state = BullyState.INITIATE_ELECTION
}
}
BullyState.INITIATE_ELECTION -> {
leaderUpdated = now // move time to keep track of election announcements
state = BullyState.POSSIBLE_LEADER
val message = LeaderMessage(MessageType.ELECTION_REQUEST,
currentTerm,
nodeId,
priority)
log.debug { "$nodeId Send election message $message" }
messageSession!!.sendMessage(message)
}
BullyState.POSSIBLE_LEADER -> {
val message = LeaderMessage(MessageType.ELECTION_REQUEST,
currentTerm,
nodeId,
priority)
log.debug { "$nodeId Send election message $message" }
messageSession!!.sendMessage(message)
if (leaderUpdated.plusMillis(2 * LEADER_TIMEOUT_MSEC).isBefore(now)) { // everyone had a fair chance
log.info("$nodeId transition to leader state")
state = BullyState.LEADER
++currentTerm
}
}
BullyState.LEADER -> {
val message = LeaderMessage(MessageType.LEADER_ANNOUNCE,
currentTerm,
nodeId,
priority)
log.debug { "$nodeId Broadcast leader heartbeat $message" }
messageSession!!.sendMessage(message)
}
}
}
if (down) {
lostLeadership()
}
}
private fun compareIds(nodeId1: String, priority1: Int, nodeId2: String, priority2: Int): Int {
if (priority1 > priority2) { // smaller priority value wins
return -1
} else if (priority1 < priority2) {
return +1
}
if (nodeId1 < nodeId2) {
return -1
} else if (nodeId2 > nodeId1) {
return +1
}
return 0
}
private fun messageEvent(message: LeaderMessage) {
log.debug { "$nodeId received message $message" }
var up = false
var down = false
state.locked {
if (!isStarted) return
if (message.term < currentTerm) { // ignore old messages
log.debug { "$nodeId discard outdated message" }
return
}
val now = clock.instant()
val wasLeader = isLeader()
if (message.term > currentTerm) { // we are out of sync, reset and consider our position
log.debug { "$nodeId new term detected resetting" }
currentTerm = message.term
state = BullyState.FOLLOWER
leaderUpdated = now
currentLeader = message.proposedLeader
leaderPriority = message.leaderPriority
if (wasLeader) down = true
return@locked
}
val comparison = compareIds(nodeId, priority, message.proposedLeader, message.leaderPriority)
if (comparison < 0) { // we are lower priority so ensure we are just a follower
if (state != BullyState.FOLLOWER) {
state = BullyState.FOLLOWER
log.info("$nodeId terminate our involvement in election possible leader ${message.proposedLeader}")
}
leaderUpdated = now
currentLeader = message.proposedLeader
leaderPriority = message.leaderPriority
}
when (message.msgType) {
MessageType.ELECTION_REQUEST -> {
if (isActive && (comparison > 0)) { // we are higher priority, reject them
log.debug { "$nodeId send rebuttal to lower priority node" }
messageSession!!.sendMessage(LeaderMessage(MessageType.ELECTION_REJECT,
currentTerm,
currentLeader ?: nodeId,
leaderPriority ?: priority))
}
}
MessageType.ELECTION_REJECT -> {
// Nothing to do here, handled by priority check above
}
MessageType.LEADER_ANNOUNCE -> {
leaderUpdated = now
currentLeader = message.proposedLeader
leaderPriority = message.leaderPriority
if ((state == BullyState.LEADER) && (currentLeader != nodeId)) { // shouldn't happen, but reset now!!
log.error("$nodeId unexpected leader announcement")
state = BullyState.FOLLOWER
}
}
MessageType.LEADER_RETIRE -> { // polite leader shutdown
if (isActive) {
log.info("$nodeId retirement announced. Starting election")
state = BullyState.INITIATE_ELECTION
leaderUpdated = now
currentLeader = null
leaderPriority = null
}
}
}
val isLeader = isLeader()
if (wasLeader xor isLeader) {
up = isLeader
down = !isLeader
}
}
if (down) {
lostLeadership()
}
if (up) {
acquireLeadership()
}
}
override fun requestLeadership() {
state.locked {
require(isStarted) { "Leader elector must be started first" }
isActive = true
}
}
override fun relinquishLeadership() {
val wasLeader = state.locked {
require(isStarted) { "Leader elector must be started first" }
val oldState = isLeader()
currentLeader = null
leaderPriority = null
state = BullyState.FOLLOWER
isActive = false
oldState
}
if (wasLeader) {
lostLeadership()
state.locked {
log.debug { "$nodeId tell other nodes to start election" }
messageSession?.sendMessage(LeaderMessage(MessageType.LEADER_RETIRE,
currentTerm,
nodeId,
priority))
}
}
}
private fun acquireLeadership() {
for (listener in listeners) {
listener.isLeader()
}
}
private fun lostLeadership() {
for (listener in listeners) {
listener.notLeader()
}
}
override fun addLeadershipListener(listener: CordaLeaderListener) {
listeners += listener
}
override fun removeLeadershipListener(listener: CordaLeaderListener) {
listeners -= listener
}
override fun isLeader(): Boolean = state.locked { isStarted && isActive && (state == BullyState.LEADER) && (currentLeader == nodeId) }
override fun isStarted(): Boolean = state.locked { isStarted }
}

View File

@ -0,0 +1,509 @@
package net.corda.nodeapi.internal.bully
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.EnterpriseConfiguration
import net.corda.node.services.config.MutualExclusionConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.stubs.CertificateStoreStubs
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class BullyLeaderTest {
@Rule
@JvmField
val tempFolder = TemporaryFolder()
@Rule
@JvmField
val serializationEnvironment = SerializationEnvironmentRule(true)
private abstract class AbstractNodeConfiguration : NodeConfiguration
class StateTracker(val name: String,
val counts: AtomicInteger,
val failureRef: AtomicBoolean) : CordaLeaderListener {
private val monitor = Object()
var isLeader: Boolean = false
override fun isLeader() {
println("$name is leader")
val count = counts.getAndIncrement()
if (count > 0) {
println("More than one leader at once")
failureRef.set(true)
}
synchronized(monitor) {
isLeader = true
monitor.notifyAll()
}
}
override fun notLeader() {
println("$name lost leadership")
counts.decrementAndGet()
synchronized(monitor) {
isLeader = false
monitor.notifyAll()
}
}
fun waitUp() {
synchronized(monitor) {
while (!isLeader) {
monitor.wait()
}
}
}
fun waitDown() {
synchronized(monitor) {
while (isLeader) {
monitor.wait()
}
}
}
}
@Test
fun `Simple Leader Test`() {
val artemisServer = createArtemisServer()
artemisServer.use {
val artemisClient = createArtemisClient()
try {
val counts = AtomicInteger(0)
val errors = AtomicBoolean(false)
val tracker = StateTracker("1", counts, errors)
val leader = BullyLeaderClient(artemisClient, "test", "1", 10)
leader.addLeadershipListener(tracker)
assertFalse(leader.isStarted())
assertFalse(leader.isLeader())
leader.start()
assertTrue(leader.isStarted())
assertFalse(leader.isLeader())
leader.requestLeadership()
tracker.waitUp()
assertTrue(leader.isStarted())
assertTrue(leader.isLeader())
leader.relinquishLeadership()
tracker.waitDown()
assertTrue(leader.isStarted())
assertFalse(leader.isLeader())
leader.close()
assertFalse(leader.isStarted())
assertFalse(leader.isLeader())
assertFalse(errors.get())
} finally {
artemisClient.stop()
}
}
}
@Test
fun `Simple Leader Test Shutdown whilst leader`() {
val artemisServer = createArtemisServer()
artemisServer.use {
val artemisClient = createArtemisClient()
try {
val counts = AtomicInteger(0)
val errors = AtomicBoolean(false)
val tracker = StateTracker("1", counts, errors)
val leader = BullyLeaderClient(artemisClient, "test", "1", 10)
leader.addLeadershipListener(tracker)
assertFalse(leader.isStarted())
assertFalse(leader.isLeader())
leader.start()
assertTrue(leader.isStarted())
assertFalse(leader.isLeader())
leader.requestLeadership()
tracker.waitUp()
assertTrue(leader.isStarted())
assertTrue(leader.isLeader())
leader.close()
assertFalse(leader.isStarted())
assertFalse(leader.isLeader())
assertFalse(errors.get())
} finally {
artemisClient.stop()
}
}
}
@Test
fun `Simple Two Leader Test`() {
val artemisServer = createArtemisServer()
artemisServer.use {
val artemisClient1 = createArtemisClient()
val artemisClient2 = createArtemisClient()
try {
val counts = AtomicInteger(0)
val errors = AtomicBoolean(false)
val tracker1 = StateTracker("1", counts, errors)
val tracker2 = StateTracker("2", counts, errors)
val leader1 = BullyLeaderClient(artemisClient1, "test", "1", 10)
val leader2 = BullyLeaderClient(artemisClient2, "test", "2", 10)
leader1.addLeadershipListener(tracker1)
leader2.addLeadershipListener(tracker2)
assertFalse(leader1.isStarted())
assertFalse(leader1.isLeader())
assertFalse(leader2.isStarted())
assertFalse(leader2.isLeader())
leader1.start()
leader2.start()
assertTrue(leader1.isStarted())
assertFalse(leader1.isLeader())
assertTrue(leader2.isStarted())
assertFalse(leader2.isLeader())
leader1.requestLeadership()
tracker1.waitUp()
assertTrue(leader1.isStarted())
assertTrue(leader1.isLeader())
assertTrue(leader2.isStarted())
assertFalse(leader2.isLeader())
leader1.relinquishLeadership()
tracker1.waitDown()
assertTrue(leader1.isStarted())
assertFalse(leader1.isLeader())
assertTrue(leader2.isStarted())
assertFalse(leader2.isLeader())
leader2.requestLeadership()
tracker2.waitUp()
assertTrue(leader1.isStarted())
assertFalse(leader1.isLeader())
assertTrue(leader2.isStarted())
assertTrue(leader2.isLeader())
leader1.requestLeadership()
Thread.sleep(10000L)
assertTrue(leader1.isLeader() xor leader2.isLeader())
leader1.close()
leader2.close()
assertFalse(leader1.isStarted())
assertFalse(leader1.isLeader())
assertFalse(leader2.isStarted())
assertFalse(leader2.isLeader())
assertFalse(errors.get())
} finally {
artemisClient1.stop()
artemisClient2.stop()
}
}
}
@Test
fun `Priority Leader Test`() {
val artemisServer = createArtemisServer()
artemisServer.use {
val artemisClient1 = createArtemisClient()
val artemisClient2 = createArtemisClient()
val artemisClient3 = createArtemisClient()
try {
val counts = AtomicInteger(0)
val errors = AtomicBoolean(false)
val tracker1 = StateTracker("1", counts, errors)
val tracker2 = StateTracker("2", counts, errors)
val tracker3 = StateTracker("3", counts, errors)
val leader1 = BullyLeaderClient(artemisClient1, "test", "1", 10)
val leader2 = BullyLeaderClient(artemisClient2, "test", "2", 20)
val leader3 = BullyLeaderClient(artemisClient3, "test", "3", 30)
leader1.addLeadershipListener(tracker1)
leader2.addLeadershipListener(tracker2)
leader3.addLeadershipListener(tracker3)
leader1.start()
leader2.start()
leader3.start()
leader3.requestLeadership() // single active leader should win
tracker3.waitUp()
assertFalse(leader1.isLeader())
assertFalse(leader2.isLeader())
assertTrue(leader3.isLeader())
leader1.requestLeadership() // higher priority leader should preempt
tracker1.waitUp()
assertTrue(leader1.isLeader())
assertFalse(leader2.isLeader())
assertFalse(leader3.isLeader())
leader2.requestLeadership() // lower priority leader should do nothing to existing leader, even after delay
Thread.sleep(10000L)
assertTrue(leader1.isLeader())
assertFalse(leader2.isLeader())
assertFalse(leader3.isLeader())
leader1.relinquishLeadership() // when leader1 gives up leader2 should win as next highest active node
tracker1.waitDown()
tracker2.waitUp()
assertFalse(leader1.isLeader())
assertTrue(leader2.isLeader())
assertFalse(leader3.isLeader())
leader1.close()
leader2.close()
leader3.close()
assertFalse(errors.get())
} finally {
artemisClient1.stop()
artemisClient2.stop()
artemisClient3.stop()
}
}
}
@Test
fun `Multi Leader Tests`() {
val artemisServer = createArtemisServer()
artemisServer.use { _ ->
val leaders = (0..9).map {
val artemis = createArtemisClient()
BullyLeaderClient(artemis, "test", it.toString(), 1)
}
val leaderCount = AtomicInteger(0)
val failureRef = AtomicBoolean(false)
leaders.forEach {
val tracker = StateTracker(it.nodeId, leaderCount, failureRef)
it.addLeadershipListener(tracker)
it.start()
}
val rand = Random()
val active = mutableSetOf<Int>()
for (i in 0 until 30) {
val newLeader = rand.nextInt(leaders.size)
println("activate $newLeader")
active.add(newLeader)
leaders[newLeader].requestLeadership()
val dropLeader = rand.nextInt(leaders.size)
active.remove(dropLeader)
println("deactivate $dropLeader $active")
leaders[dropLeader].relinquishLeadership()
while (active.isNotEmpty() && leaderCount.get() == 0) {
Thread.sleep(100)
}
}
leaders.forEach {
it.artemis.stop()
it.close()
}
assertFalse(failureRef.get())
}
}
@Test
fun `Multi Leader Tests Different Priorities`() {
val artemisServer = createArtemisServer()
artemisServer.use { _ ->
val leaders = (0..9).map {
val artemis = createArtemisClient()
BullyLeaderClient(artemis, "test", it.toString(), it)
}
val leaderCount = AtomicInteger(0)
val failureRef = AtomicBoolean(false)
leaders.forEach {
val tracker = StateTracker(it.nodeId, leaderCount, failureRef)
it.addLeadershipListener(tracker)
it.start()
}
val rand = Random()
val active = mutableSetOf<Int>()
for (i in 0 until 30) {
val newLeader = rand.nextInt(leaders.size)
println("activate $newLeader")
active.add(newLeader)
leaders[newLeader].requestLeadership()
val dropLeader = rand.nextInt(leaders.size)
active.remove(dropLeader)
println("deactivate $dropLeader $active")
leaders[dropLeader].relinquishLeadership()
while (active.isNotEmpty() && leaderCount.get() == 0) {
Thread.sleep(100)
}
}
leaders.forEach {
it.artemis.stop()
it.close()
}
assertFalse(failureRef.get())
}
}
private fun artemisReconnectionLoop(artemisClient: ArtemisSessionProvider, running: CountDownLatch) {
artemisClient.start()
try {
running.await()
} finally {
artemisClient.stop()
}
}
@Test
fun `Disconnect Tests`() {
val artemisConfig = createConfig(11005, true)
val running = CountDownLatch(1)
val artemisClient1 = createArtemisClient(artemisConfig.p2pAddress.port, started = false)
val artemisRetryLoop1 = Thread({ artemisReconnectionLoop(artemisClient1, running) }, "Artemis Connector Thread").apply {
isDaemon = true
}
artemisRetryLoop1.start()
val artemisClient2 = createArtemisClient(artemisConfig.p2pAddress.port, started = false)
val artemisRetryLoop2 = Thread({ artemisReconnectionLoop(artemisClient2, running) }, "Artemis Connector Thread").apply {
isDaemon = true
}
artemisRetryLoop2.start()
try {
val leaderCount = AtomicInteger(0)
val failureRef = AtomicBoolean(false)
val leader1 = BullyLeaderClient(artemisClient1, "test", "1", 10)
val leader2 = BullyLeaderClient(artemisClient2, "test", "2", 20)
val watcher1 = StateTracker(leader1.nodeId, leaderCount, failureRef)
leader1.addLeadershipListener(watcher1)
val watcher2 = StateTracker(leader2.nodeId, leaderCount, failureRef)
leader2.addLeadershipListener(watcher2)
leader1.start()
leader1.requestLeadership()
leader2.start()
leader2.requestLeadership()
Thread.sleep(2000L)
val server = createArtemisServer(artemisConfig.p2pAddress.port, false)
watcher1.waitUp()
assertTrue(leader1.isStarted())
assertTrue(leader1.isLeader())
assertTrue(leader2.isStarted())
assertFalse(leader2.isLeader())
server.stop()
watcher1.waitDown()
assertTrue(leader1.isStarted())
assertFalse(leader1.isLeader())
assertTrue(leader2.isStarted())
assertFalse(leader2.isLeader())
assertFalse(failureRef.get())
leader1.relinquishLeadership()
val server2 = createArtemisServer(artemisConfig.p2pAddress.port, false)
watcher2.waitUp()
assertTrue(leader1.isStarted())
assertFalse(leader1.isLeader())
assertTrue(leader2.isStarted())
assertTrue(leader2.isLeader())
server2.stop()
leader1.close()
leader2.close()
} finally {
running.countDown()
artemisRetryLoop1.join()
artemisRetryLoop2.join()
}
}
@Test
fun `Clients Randomly do Things`() {
val CLIENTS_NUMBER = 10
val ACTIONS_NUMBER = 10
val artemisServer = createArtemisServer()
artemisServer.use { server ->
val countDownLatch = CountDownLatch(CLIENTS_NUMBER)
val leaderCount = AtomicInteger(0)
val failureRef = AtomicBoolean(false)
val clientList = (1..CLIENTS_NUMBER).map {
val artemis = createArtemisClient()
BullyLeaderClient(artemis, "test", it.toString(), 10)
}
clientList.forEach { client ->
thread {
client.addLeadershipListener(StateTracker(client.nodeId, leaderCount, failureRef))
client.start()
val random = Random()
for (i in 1 until ACTIONS_NUMBER) {
val action = random.nextInt(2)
when (action) {
0 -> client.requestLeadership()
1 -> client.relinquishLeadership()
else -> throw IllegalArgumentException("Invalid action choice")
}
Thread.sleep(100L * random.nextInt(30))
}
client.requestLeadership() // end as possible leader
countDownLatch.countDown()
}
}
countDownLatch.await()
//only one leader should exist
var timeout = 100
while (true) {
val leaderNumber = leaderCount.get()
assertTrue(leaderNumber <= 1)
if (leaderNumber == 1) {
break
}
--timeout
assertTrue(timeout > 0)
Thread.sleep(100L)
}
clientList.forEach { client ->
client.artemis.stop()
client.close()
}
assertFalse(failureRef.get())
}
}
private fun createConfig(port: Int, createCerts: Boolean = false): AbstractNodeConfiguration {
val baseDirectory = tempFolder.root.toPath()
val certificatesDirectory = baseDirectory / "certificates"
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val p2pSslOptions = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(DUMMY_BANK_A_NAME).whenever(it).myLegalName
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
doReturn(p2pSslOptions).whenever(it).p2pSslOptions
doReturn(NetworkHostAndPort("localhost", port)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
}
if (createCerts) {
artemisConfig.configureWithDevSSLCertificate()
}
return artemisConfig
}
private fun createArtemisClient(port: Int = 11005, started: Boolean = true): ArtemisSessionProvider {
val artemisConfig = createConfig(port)
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions,
NetworkHostAndPort("localhost", port),
MAX_MESSAGE_SIZE,
confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize,
externalBrokerConnectionConfig = ExternalBrokerConnectionConfiguration.CONTINUOUS_RETRY)
if (started) {
artemisClient.start()
}
return artemisClient
}
private fun createArtemisServer(port: Int = 11005, createCerts: Boolean = true): ArtemisMessagingServer {
val artemisConfig = createConfig(port, createCerts)
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", port), MAX_MESSAGE_SIZE)
artemisServer.start()
return artemisServer
}
}