Initial stage of Bully Algorithm Leader election

Some artemis reconnect logic

Fix disconnect behaviour of BullyLeader code and improve Artemis shutdown behaviour when disconnected.

Integrate Bully Algorithm leader elector with the bridge

Fix docs

Remove pointless header change
This commit is contained in:
Matthew Nesbit 2018-11-02 18:02:49 +00:00
parent 8ecf4385dc
commit 51e9ef0cc0
8 changed files with 1054 additions and 36 deletions

View File

@ -22,6 +22,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.bridging.BridgeEntry
import net.corda.nodeapi.internal.bully.BullyLeaderClient
import net.corda.nodeapi.internal.zookeeper.ZkClient
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
@ -299,6 +300,52 @@ class BridgeIntegrationTest {
}
}
@Test
fun `Run HA using Bully algorithm`() {
val configResource = "/net/corda/bridge/habullysingleprocess/firewall.conf"
createNetworkParams(tempFolder.root.toPath())
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
assertEquals(BridgeHAConfigImpl("bully://localhost", 10), config.haConfig)
config.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val (artemisServer, artemisClient) = createArtemis()
try {
installBridgeControlResponder(artemisClient)
val bridge = FirewallInstance(config, FirewallVersionInfo(1, "1.1", "Dummy", "Test"))
val stateFollower = bridge.activeChange.toBlocking().iterator
assertEquals(false, stateFollower.next())
assertEquals(false, bridge.active)
bridge.start()
assertEquals(false, bridge.active) // Starting the bridge insufficient to go active
assertEquals(true, stateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, serverListening("localhost", 10005))
val higherPriorityClient = BullyLeaderClient(artemisClient, "/bridge/ha", "Test", 5)
higherPriorityClient.start()
higherPriorityClient.requestLeadership() // should win leadership and kick out our bridge
assertEquals(false, stateFollower.next())
assertEquals(false, bridge.active)
var socketState = true
for (i in 0 until 5) { // The event signalling bridge down is pretty immediate, but the cascade of events leading to socket close can take a while
socketState = serverListening("localhost", 10005)
if (!socketState) break
Thread.sleep(100)
}
assertEquals(false, socketState)
higherPriorityClient.relinquishLeadership() // let our bridge back as leader
higherPriorityClient.close()
assertEquals(true, stateFollower.next())
assertEquals(true, bridge.active)
assertEquals(true, serverListening("localhost", 10005))
bridge.stop() // Finally check shutdown
assertEquals(false, stateFollower.next())
assertEquals(false, bridge.active)
assertEquals(false, serverListening("localhost", 10005))
} finally {
artemisClient.stop()
artemisServer.stop()
}
}
@Test
fun `Test artemis failover logic`() {
val configResource = "/net/corda/bridge/artemisfailover/firewall.conf"

View File

@ -0,0 +1,20 @@
package net.corda.bridge.services.artemis
import net.corda.bridge.services.api.BridgeArtemisConnectionService
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisSessionProvider
class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider {
override fun start(): ArtemisMessagingClient.Started {
// We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService
return artemisConnectionService.started!!
}
override fun stop() {
// We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService
}
override val started: ArtemisMessagingClient.Started?
get() = artemisConnectionService.started
}

View File

@ -1,13 +1,11 @@
package net.corda.bridge.services.ha
import net.corda.bridge.services.api.BridgeArtemisConnectionService
import net.corda.bridge.services.api.BridgeMasterService
import net.corda.bridge.services.api.FirewallAuditService
import net.corda.bridge.services.api.FirewallConfiguration
import net.corda.bridge.services.api.ServiceStateSupport
import net.corda.bridge.services.api.*
import net.corda.bridge.services.artemis.ForwardingArtemisMessageClient
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.bully.BullyLeaderClient
import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener
import net.corda.nodeapi.internal.zookeeper.ZkClient
import net.corda.nodeapi.internal.zookeeper.ZkLeader
@ -22,10 +20,17 @@ import kotlin.concurrent.withLock
*/
class ExternalMasterElectionService(private val conf: FirewallConfiguration,
private val auditService: FirewallAuditService,
artemisService: BridgeArtemisConnectionService,
private val artemisService: BridgeArtemisConnectionService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeMasterService, ServiceStateSupport by stateHelper {
private var haElector: ZkLeader? = null
private enum class ElectorMode {
ZOOKEEPER,
BULLY_OVER_ARTEMIS
}
private val mode: ElectorMode
private var leaderListener: CordaLeaderListener? = null
private var statusSubscriber: Subscription? = null
private val statusFollower = ServiceStateCombiner(listOf(artemisService))
@ -38,7 +43,11 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration,
init {
require(conf.haConfig != null) { "Undefined HA Config" }
require(conf.haConfig!!.haConnectionString.split(',').all { it.startsWith("zk://") }) { "Only Zookeeper HA mode 'zk://IPADDR:PORT supported" }
val connectionStrings = conf.haConfig!!.haConnectionString.split(',')
val zkOnly = connectionStrings.all { it.startsWith("zk://") }
val bullyOnly = (connectionStrings.size == 1) && (connectionStrings.single().toLowerCase() == "bully://localhost")
require(zkOnly xor bullyOnly) { "Only all Zookeeper HA mode 'zk://IPADDR:PORT, or Bully Algorithm mode 'bully://localhost' supported" }
if (zkOnly) mode = ElectorMode.ZOOKEEPER else if (bullyOnly) mode = ElectorMode.BULLY_OVER_ARTEMIS else throw IllegalArgumentException("Unsupported elector URL")
}
private fun becomeMaster() {
@ -75,29 +84,37 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration,
*/
private fun activate() {
activeTransitionLock.withLock {
val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk://", "") }.joinToString(",")
val connectionStrings = conf.haConfig!!.haConnectionString.split(',')
val leaderPriority = conf.haConfig!!.haPriority
val hostName: String = InetAddress.getLocalHost().hostName
val info = ManagementFactory.getRuntimeMXBean()
val pid = info.name.split("@").firstOrNull() // TODO Java 9 has better support for this
val nodeId = "$hostName:$pid"
val leaderElector = ZkClient(zkConf, conf.haConfig!!.haTopic, nodeId, leaderPriority)
val leaderElector: ZkLeader = when (mode) {
ElectorMode.ZOOKEEPER -> {
val zkConf = connectionStrings.map { it.replace("zk://", "") }.joinToString(",")
ZkClient(zkConf, conf.haConfig!!.haTopic, nodeId, leaderPriority)
}
ElectorMode.BULLY_OVER_ARTEMIS -> {
BullyLeaderClient(ForwardingArtemisMessageClient(artemisService), conf.haConfig!!.haTopic, nodeId, leaderPriority)
}
}
haElector = leaderElector
val listener = object : CordaLeaderListener {
override fun notLeader() {
auditService.statusChangeEvent("Leadership loss signalled from Zookeeper")
auditService.statusChangeEvent("Leadership loss signalled from $mode")
becomeSlave()
}
override fun isLeader() {
log.info("Zookeeper has signalled leadership acquired.")
log.info("$mode has signalled leadership acquired.")
becomeMaster()
}
}
leaderListener = listener
leaderElector.addLeadershipListener(listener)
leaderElector.start()
auditService.statusChangeEvent("Requesting leadership from Zookeeper")
auditService.statusChangeEvent("Requesting leadership from $mode")
leaderElector.requestLeadership()
}
}

View File

@ -1,13 +1,12 @@
package net.corda.bridge.services.sender
import net.corda.bridge.services.api.*
import net.corda.bridge.services.artemis.ForwardingArtemisMessageClient
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.bridging.BridgeControlListener
import net.corda.nodeapi.internal.bridging.BridgeMetricsService
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
@ -36,21 +35,6 @@ class DirectBridgeSenderService(val conf: FirewallConfiguration,
BridgeAuditServiceAdaptor(auditService),
conf.enableAMQPPacketTrace)
private class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider {
override fun start(): ArtemisMessagingClient.Started {
// We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService
return artemisConnectionService.started!!
}
override fun stop() {
// We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService
}
override val started: ArtemisMessagingClient.Started?
get() = artemisConnectionService.started
}
private class BridgeAuditServiceAdaptor(private val auditService: FirewallAuditService) : BridgeMetricsService {
override fun bridgeCreated(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
// No corresponding method on FirewallAuditService yet

View File

@ -0,0 +1,11 @@
firewallMode = SenderReceiver
outboundConfig : {
artemisBrokerAddress = "localhost:11005"
}
inboundConfig : {
listeningAddress = "0.0.0.0:10005"
}
networkParametersPath = network-parameters
haConfig : {
haConnectionString = "bully://localhost"
}

View File

@ -46,8 +46,8 @@ your own ``firewall.conf`` file will use these defaults:
.. literalinclude:: ../../bridge/src/main/resources/firewalldefault.conf
:language: javascript
Bridge operating modes
----------------------
Firewall operating modes
------------------------
.. note:: By default, the Corda node assumes that it will carry out the peer-to-peer functions of the ``bridge`` internally!
Before running a dedicated firewall process, it is essential to turn off the dev mode component by setting the
``enterpriseConfiguration.externalBridge`` property of the ``node.conf`` file to ``true``.
@ -213,16 +213,21 @@ absolute path to the firewall's base directory.
:crlCheckSoftFail: If true (recommended setting) allows certificate checks to pass if the CRL(certificate revocation list) provider is unavailable.
:haConfig: Optionally the ``SenderReceiver`` and ``BridgeInner`` modes can be run in a hot-warm configuration, which determines the active instance using an external master election service.
Currently, only Zookeeper can be used as master elector. Eventually other electors may be supported e.g. ``etcd``. This configuration section controls these options:
Currently, the leader election process can be delegated to Zookeeper, or the firewall can use the ``Bully Algorithm`` (see <https://en.wikipedia.org/wiki/Bully_algorithm>) via Publish/Subscribe messages on the artemis broker.
For production it is recommended that a Zookeeper cluster be used as this will protect against network partitioning scenarios. However, the ``Bully Algorithm`` mode does not require any additional server processes.
Eventually other electors may be supported e.g. ``etcd``. This configuration section controls these options:
:haConnectionString: A string containing the connection details of the master electors as a comma delimited list of connection string in the format ``zk://<host>:<port>``.
In future it intended that other schemes such as ``etcd`` are supported.
:haConnectionString: A string containing the connection details of the master electors as a comma delimited list of individual connection strings.
* To use an external Zookeeper cluster each connection item should be in the format ``zk://<host>:<port>``.
* To use the ``Bully Algorithm`` running over artemis the single connection string should be set to ``bully://localhost``.
:haPriority: The implementation uses a prioritise leader election algorithm, so that a preferred master instance can be set. The highest priority is 0 and larger integers have lower priority.
At the same level of priority, it is random which instance wins the leader election. If a ``bridge`` instance dies another will have the opportunity to become master in instead.
:haTopic: Sets the zookeeper topic prefix that the nodes used in resolving the election and must be the same for all ``bridge``
instances competing for master status. This is available to allow a single zookeeper cluster to be reused with multiple
:haTopic: Sets the zookeeper/artemis topic that the nodes used in resolving the election and must be the same for all ``bridge``
instances competing for master status. This is available to allow a single zookeeper/artemis cluster to be reused with multiple
sets of ``bridges`` (e.g. in test environments).
The default value is ``bridge/ha`` and would not normally need to be changed if the cluster is not shared.

View File

@ -0,0 +1,425 @@
package net.corda.nodeapi.internal.bully
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener
import net.corda.nodeapi.internal.zookeeper.ZkLeader
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import java.time.Clock
import java.time.Instant
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread
class BullyLeaderClient(val artemis: ArtemisSessionProvider,
private val electionPath: String,
val nodeId: String,
val priority: Int) : ZkLeader {
private companion object {
const val LEADER_TIMEOUT_MSEC = 1000L
private val log = contextLogger()
}
@VisibleForTesting
var clock: Clock = Clock.systemUTC()
@CordaSerializable
private enum class MessageType {
ELECTION_REQUEST,
ELECTION_REJECT,
LEADER_ANNOUNCE,
LEADER_RETIRE
}
@CordaSerializable
private data class LeaderMessage(val msgType: MessageType,
val term: Int,
val proposedLeader: String,
val leaderPriority: Int)
private class ArtemisMessageSession private constructor(
private val leaderTopic: SimpleString,
private val session: ClientSession,
private val messageConsumer: ClientConsumer,
private val messageProducer: ClientProducer,
private val onMessage: (LeaderMessage) -> Unit) : AutoCloseable, FailoverEventListener {
companion object {
fun connectToArtemis(electionPath: String, locator: ServerLocator, factory: ClientSessionFactory, onMessage: (LeaderMessage) -> Unit): ArtemisMessageSession {
val session = factory.createSession(ArtemisMessagingComponent.NODE_P2P_USER,
ArtemisMessagingComponent.NODE_P2P_USER,
false,
true,
true,
locator.isPreAcknowledge,
ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
val producer = session.createProducer()
val leaderTopic = SimpleString("${INTERNAL_PREFIX}leader.$electionPath")
val queueName = SimpleString(UUID.randomUUID().toString())
session.createTemporaryQueue(leaderTopic, RoutingType.MULTICAST, queueName)
val consumer = session.createConsumer(queueName)
val artemisMessageSession = ArtemisMessageSession(leaderTopic, session, consumer, producer, onMessage)
session.addFailoverListener(artemisMessageSession)
consumer.setMessageHandler { msg ->
artemisMessageSession.processMessage(msg)
}
session.start()
return artemisMessageSession
}
}
private val closed: AtomicBoolean = AtomicBoolean(false)
private val _connected: AtomicBoolean = AtomicBoolean(true)
val connected: Boolean get() = _connected.get()
override fun failoverEvent(eventType: FailoverEventType) {
log.warn("Artemis Failover event $eventType")
_connected.set(eventType == FailoverEventType.FAILOVER_COMPLETED)
}
private fun processMessage(message: ClientMessage) {
try {
val data: ByteArray = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
val leaderMessage = data.deserialize<LeaderMessage>(context = SerializationDefaults.P2P_CONTEXT)
onMessage(leaderMessage)
message.acknowledge()
} catch (ex: Exception) {
log.error("Unable to process leader control message", ex)
}
}
fun sendMessage(message: LeaderMessage) {
if (closed.get() || session.isClosed || !connected) return
val messageBytes = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val artemisMessage = session.createMessage(false)
artemisMessage.writeBodyBufferBytes(messageBytes)
messageProducer.send(leaderTopic, artemisMessage)
}
override fun close() {
if (!closed.getAndSet(true)) {
thread {
// send of to another thread as this can block for ages inside Artemis
closeInternal()
}
}
}
private fun close(target: AutoCloseable) {
try {
target.close()
} catch (ignored: ActiveMQObjectClosedException) {
// swallow
}
}
private fun closeInternal() {
session.removeFailoverListener(this)
if (!messageConsumer.isClosed) {
close(messageConsumer)
}
if (!session.isClosed) {
close(session)
}
}
}
private enum class BullyState {
FOLLOWER,
INITIATE_ELECTION,
POSSIBLE_LEADER,
LEADER
}
private class LeaderState(clock: Clock) {
var scheduler: ScheduledExecutorService? = null
val isStarted: Boolean get() = (scheduler?.isShutdown?.not() ?: false)
var pollTimer: ScheduledFuture<*>? = null
var isActive: Boolean = false
var messageSession: ArtemisMessageSession? = null
var state: BullyState = BullyState.FOLLOWER
var currentTerm: Int = 0
var currentLeader: String? = null
var leaderPriority: Int? = null
var leaderUpdated: Instant = clock.instant()
}
private val state = ThreadBox(LeaderState(clock))
private val listeners = CopyOnWriteArrayList<CordaLeaderListener>()
override fun start() {
state.locked {
if (isStarted) return
val newScheduler = Executors.newSingleThreadScheduledExecutor()
scheduler = newScheduler
pollTimer = newScheduler.scheduleAtFixedRate(::timerEvent, 0L, LEADER_TIMEOUT_MSEC / 2L, TimeUnit.MILLISECONDS)
}
}
override fun close() {
if (isStarted()) {
relinquishLeadership()
}
state.locked {
messageSession?.close()
messageSession = null
pollTimer?.cancel(false)
pollTimer = null
scheduler?.shutdown()
scheduler = null
}
}
private fun timerEvent() {
var down = false
state.locked {
if (!isStarted) return
log.debug { "$nodeId Timer running. current state $state" }
val wasLeader = isLeader()
val artemis = artemis.started
if (artemis == null || artemis.session.isClosed) {
log.info("Artemis lost")
messageSession?.close()
messageSession = null
state = BullyState.FOLLOWER
currentLeader = null
leaderPriority = null
if (wasLeader) down = true
return@locked
}
if (messageSession == null) {
log.info("$nodeId Artemis connected. Creating leader session")
messageSession = ArtemisMessageSession.connectToArtemis(electionPath,
artemis.serverLocator,
artemis.sessionFactory,
::messageEvent)
return@locked
}
if (!messageSession!!.connected) {
state = BullyState.FOLLOWER
currentLeader = null
leaderPriority = null
if (wasLeader) down = true
return@locked
}
if (!isActive) {
log.debug { "$nodeId Idling as inactive" }
state = BullyState.FOLLOWER
if (wasLeader) down = true
return@locked
}
val now = clock.instant()
when (state) {
BullyState.FOLLOWER -> {
// Unknown Leader, or dead leader prepare to do election
if (currentLeader == null || leaderUpdated.plusMillis(LEADER_TIMEOUT_MSEC).isBefore(now)) {
log.info("$nodeId Leader missing start election")
currentLeader = null
leaderPriority = null
state = BullyState.INITIATE_ELECTION
} else if (compareIds(nodeId, priority, currentLeader!!, leaderPriority!!) > 0) { // we are higher priority start elections
log.info("$nodeId Leader lower priority than us start challenge")
currentLeader = null
leaderPriority = null
state = BullyState.INITIATE_ELECTION
}
}
BullyState.INITIATE_ELECTION -> {
leaderUpdated = now // move time to keep track of election announcements
state = BullyState.POSSIBLE_LEADER
val message = LeaderMessage(MessageType.ELECTION_REQUEST,
currentTerm,
nodeId,
priority)
log.debug { "$nodeId Send election message $message" }
messageSession!!.sendMessage(message)
}
BullyState.POSSIBLE_LEADER -> {
val message = LeaderMessage(MessageType.ELECTION_REQUEST,
currentTerm,
nodeId,
priority)
log.debug { "$nodeId Send election message $message" }
messageSession!!.sendMessage(message)
if (leaderUpdated.plusMillis(2 * LEADER_TIMEOUT_MSEC).isBefore(now)) { // everyone had a fair chance
log.info("$nodeId transition to leader state")
state = BullyState.LEADER
++currentTerm
}
}
BullyState.LEADER -> {
val message = LeaderMessage(MessageType.LEADER_ANNOUNCE,
currentTerm,
nodeId,
priority)
log.debug { "$nodeId Broadcast leader heartbeat $message" }
messageSession!!.sendMessage(message)
}
}
}
if (down) {
lostLeadership()
}
}
private fun compareIds(nodeId1: String, priority1: Int, nodeId2: String, priority2: Int): Int {
if (priority1 > priority2) { // smaller priority value wins
return -1
} else if (priority1 < priority2) {
return +1
}
if (nodeId1 < nodeId2) {
return -1
} else if (nodeId2 > nodeId1) {
return +1
}
return 0
}
private fun messageEvent(message: LeaderMessage) {
log.debug { "$nodeId received message $message" }
var up = false
var down = false
state.locked {
if (!isStarted) return
if (message.term < currentTerm) { // ignore old messages
log.debug { "$nodeId discard outdated message" }
return
}
val now = clock.instant()
val wasLeader = isLeader()
if (message.term > currentTerm) { // we are out of sync, reset and consider our position
log.debug { "$nodeId new term detected resetting" }
currentTerm = message.term
state = BullyState.FOLLOWER
leaderUpdated = now
currentLeader = message.proposedLeader
leaderPriority = message.leaderPriority
if (wasLeader) down = true
return@locked
}
val comparison = compareIds(nodeId, priority, message.proposedLeader, message.leaderPriority)
if (comparison < 0) { // we are lower priority so ensure we are just a follower
if (state != BullyState.FOLLOWER) {
state = BullyState.FOLLOWER
log.info("$nodeId terminate our involvement in election possible leader ${message.proposedLeader}")
}
leaderUpdated = now
currentLeader = message.proposedLeader
leaderPriority = message.leaderPriority
}
when (message.msgType) {
MessageType.ELECTION_REQUEST -> {
if (isActive && (comparison > 0)) { // we are higher priority, reject them
log.debug { "$nodeId send rebuttal to lower priority node" }
messageSession!!.sendMessage(LeaderMessage(MessageType.ELECTION_REJECT,
currentTerm,
currentLeader ?: nodeId,
leaderPriority ?: priority))
}
}
MessageType.ELECTION_REJECT -> {
// Nothing to do here, handled by priority check above
}
MessageType.LEADER_ANNOUNCE -> {
leaderUpdated = now
currentLeader = message.proposedLeader
leaderPriority = message.leaderPriority
if ((state == BullyState.LEADER) && (currentLeader != nodeId)) { // shouldn't happen, but reset now!!
log.error("$nodeId unexpected leader announcement")
state = BullyState.FOLLOWER
}
}
MessageType.LEADER_RETIRE -> { // polite leader shutdown
if (isActive) {
log.info("$nodeId retirement announced. Starting election")
state = BullyState.INITIATE_ELECTION
leaderUpdated = now
currentLeader = null
leaderPriority = null
}
}
}
val isLeader = isLeader()
if (wasLeader xor isLeader) {
up = isLeader
down = !isLeader
}
}
if (down) {
lostLeadership()
}
if (up) {
acquireLeadership()
}
}
override fun requestLeadership() {
state.locked {
require(isStarted) { "Leader elector must be started first" }
isActive = true
}
}
override fun relinquishLeadership() {
val wasLeader = state.locked {
require(isStarted) { "Leader elector must be started first" }
val oldState = isLeader()
currentLeader = null
leaderPriority = null
state = BullyState.FOLLOWER
isActive = false
oldState
}
if (wasLeader) {
lostLeadership()
state.locked {
log.debug { "$nodeId tell other nodes to start election" }
messageSession?.sendMessage(LeaderMessage(MessageType.LEADER_RETIRE,
currentTerm,
nodeId,
priority))
}
}
}
private fun acquireLeadership() {
for (listener in listeners) {
listener.isLeader()
}
}
private fun lostLeadership() {
for (listener in listeners) {
listener.notLeader()
}
}
override fun addLeadershipListener(listener: CordaLeaderListener) {
listeners += listener
}
override fun removeLeadershipListener(listener: CordaLeaderListener) {
listeners -= listener
}
override fun isLeader(): Boolean = state.locked { isStarted && isActive && (state == BullyState.LEADER) && (currentLeader == nodeId) }
override fun isStarted(): Boolean = state.locked { isStarted }
}

View File

@ -0,0 +1,509 @@
package net.corda.nodeapi.internal.bully
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.EnterpriseConfiguration
import net.corda.node.services.config.MutualExclusionConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.stubs.CertificateStoreStubs
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class BullyLeaderTest {
@Rule
@JvmField
val tempFolder = TemporaryFolder()
@Rule
@JvmField
val serializationEnvironment = SerializationEnvironmentRule(true)
private abstract class AbstractNodeConfiguration : NodeConfiguration
class StateTracker(val name: String,
val counts: AtomicInteger,
val failureRef: AtomicBoolean) : CordaLeaderListener {
private val monitor = Object()
var isLeader: Boolean = false
override fun isLeader() {
println("$name is leader")
val count = counts.getAndIncrement()
if (count > 0) {
println("More than one leader at once")
failureRef.set(true)
}
synchronized(monitor) {
isLeader = true
monitor.notifyAll()
}
}
override fun notLeader() {
println("$name lost leadership")
counts.decrementAndGet()
synchronized(monitor) {
isLeader = false
monitor.notifyAll()
}
}
fun waitUp() {
synchronized(monitor) {
while (!isLeader) {
monitor.wait()
}
}
}
fun waitDown() {
synchronized(monitor) {
while (isLeader) {
monitor.wait()
}
}
}
}
@Test
fun `Simple Leader Test`() {
val artemisServer = createArtemisServer()
artemisServer.use {
val artemisClient = createArtemisClient()
try {
val counts = AtomicInteger(0)
val errors = AtomicBoolean(false)
val tracker = StateTracker("1", counts, errors)
val leader = BullyLeaderClient(artemisClient, "test", "1", 10)
leader.addLeadershipListener(tracker)
assertFalse(leader.isStarted())
assertFalse(leader.isLeader())
leader.start()
assertTrue(leader.isStarted())
assertFalse(leader.isLeader())
leader.requestLeadership()
tracker.waitUp()
assertTrue(leader.isStarted())
assertTrue(leader.isLeader())
leader.relinquishLeadership()
tracker.waitDown()
assertTrue(leader.isStarted())
assertFalse(leader.isLeader())
leader.close()
assertFalse(leader.isStarted())
assertFalse(leader.isLeader())
assertFalse(errors.get())
} finally {
artemisClient.stop()
}
}
}
@Test
fun `Simple Leader Test Shutdown whilst leader`() {
val artemisServer = createArtemisServer()
artemisServer.use {
val artemisClient = createArtemisClient()
try {
val counts = AtomicInteger(0)
val errors = AtomicBoolean(false)
val tracker = StateTracker("1", counts, errors)
val leader = BullyLeaderClient(artemisClient, "test", "1", 10)
leader.addLeadershipListener(tracker)
assertFalse(leader.isStarted())
assertFalse(leader.isLeader())
leader.start()
assertTrue(leader.isStarted())
assertFalse(leader.isLeader())
leader.requestLeadership()
tracker.waitUp()
assertTrue(leader.isStarted())
assertTrue(leader.isLeader())
leader.close()
assertFalse(leader.isStarted())
assertFalse(leader.isLeader())
assertFalse(errors.get())
} finally {
artemisClient.stop()
}
}
}
@Test
fun `Simple Two Leader Test`() {
val artemisServer = createArtemisServer()
artemisServer.use {
val artemisClient1 = createArtemisClient()
val artemisClient2 = createArtemisClient()
try {
val counts = AtomicInteger(0)
val errors = AtomicBoolean(false)
val tracker1 = StateTracker("1", counts, errors)
val tracker2 = StateTracker("2", counts, errors)
val leader1 = BullyLeaderClient(artemisClient1, "test", "1", 10)
val leader2 = BullyLeaderClient(artemisClient2, "test", "2", 10)
leader1.addLeadershipListener(tracker1)
leader2.addLeadershipListener(tracker2)
assertFalse(leader1.isStarted())
assertFalse(leader1.isLeader())
assertFalse(leader2.isStarted())
assertFalse(leader2.isLeader())
leader1.start()
leader2.start()
assertTrue(leader1.isStarted())
assertFalse(leader1.isLeader())
assertTrue(leader2.isStarted())
assertFalse(leader2.isLeader())
leader1.requestLeadership()
tracker1.waitUp()
assertTrue(leader1.isStarted())
assertTrue(leader1.isLeader())
assertTrue(leader2.isStarted())
assertFalse(leader2.isLeader())
leader1.relinquishLeadership()
tracker1.waitDown()
assertTrue(leader1.isStarted())
assertFalse(leader1.isLeader())
assertTrue(leader2.isStarted())
assertFalse(leader2.isLeader())
leader2.requestLeadership()
tracker2.waitUp()
assertTrue(leader1.isStarted())
assertFalse(leader1.isLeader())
assertTrue(leader2.isStarted())
assertTrue(leader2.isLeader())
leader1.requestLeadership()
Thread.sleep(10000L)
assertTrue(leader1.isLeader() xor leader2.isLeader())
leader1.close()
leader2.close()
assertFalse(leader1.isStarted())
assertFalse(leader1.isLeader())
assertFalse(leader2.isStarted())
assertFalse(leader2.isLeader())
assertFalse(errors.get())
} finally {
artemisClient1.stop()
artemisClient2.stop()
}
}
}
@Test
fun `Priority Leader Test`() {
val artemisServer = createArtemisServer()
artemisServer.use {
val artemisClient1 = createArtemisClient()
val artemisClient2 = createArtemisClient()
val artemisClient3 = createArtemisClient()
try {
val counts = AtomicInteger(0)
val errors = AtomicBoolean(false)
val tracker1 = StateTracker("1", counts, errors)
val tracker2 = StateTracker("2", counts, errors)
val tracker3 = StateTracker("3", counts, errors)
val leader1 = BullyLeaderClient(artemisClient1, "test", "1", 10)
val leader2 = BullyLeaderClient(artemisClient2, "test", "2", 20)
val leader3 = BullyLeaderClient(artemisClient3, "test", "3", 30)
leader1.addLeadershipListener(tracker1)
leader2.addLeadershipListener(tracker2)
leader3.addLeadershipListener(tracker3)
leader1.start()
leader2.start()
leader3.start()
leader3.requestLeadership() // single active leader should win
tracker3.waitUp()
assertFalse(leader1.isLeader())
assertFalse(leader2.isLeader())
assertTrue(leader3.isLeader())
leader1.requestLeadership() // higher priority leader should preempt
tracker1.waitUp()
assertTrue(leader1.isLeader())
assertFalse(leader2.isLeader())
assertFalse(leader3.isLeader())
leader2.requestLeadership() // lower priority leader should do nothing to existing leader, even after delay
Thread.sleep(10000L)
assertTrue(leader1.isLeader())
assertFalse(leader2.isLeader())
assertFalse(leader3.isLeader())
leader1.relinquishLeadership() // when leader1 gives up leader2 should win as next highest active node
tracker1.waitDown()
tracker2.waitUp()
assertFalse(leader1.isLeader())
assertTrue(leader2.isLeader())
assertFalse(leader3.isLeader())
leader1.close()
leader2.close()
leader3.close()
assertFalse(errors.get())
} finally {
artemisClient1.stop()
artemisClient2.stop()
artemisClient3.stop()
}
}
}
@Test
fun `Multi Leader Tests`() {
val artemisServer = createArtemisServer()
artemisServer.use { _ ->
val leaders = (0..9).map {
val artemis = createArtemisClient()
BullyLeaderClient(artemis, "test", it.toString(), 1)
}
val leaderCount = AtomicInteger(0)
val failureRef = AtomicBoolean(false)
leaders.forEach {
val tracker = StateTracker(it.nodeId, leaderCount, failureRef)
it.addLeadershipListener(tracker)
it.start()
}
val rand = Random()
val active = mutableSetOf<Int>()
for (i in 0 until 30) {
val newLeader = rand.nextInt(leaders.size)
println("activate $newLeader")
active.add(newLeader)
leaders[newLeader].requestLeadership()
val dropLeader = rand.nextInt(leaders.size)
active.remove(dropLeader)
println("deactivate $dropLeader $active")
leaders[dropLeader].relinquishLeadership()
while (active.isNotEmpty() && leaderCount.get() == 0) {
Thread.sleep(100)
}
}
leaders.forEach {
it.artemis.stop()
it.close()
}
assertFalse(failureRef.get())
}
}
@Test
fun `Multi Leader Tests Different Priorities`() {
val artemisServer = createArtemisServer()
artemisServer.use { _ ->
val leaders = (0..9).map {
val artemis = createArtemisClient()
BullyLeaderClient(artemis, "test", it.toString(), it)
}
val leaderCount = AtomicInteger(0)
val failureRef = AtomicBoolean(false)
leaders.forEach {
val tracker = StateTracker(it.nodeId, leaderCount, failureRef)
it.addLeadershipListener(tracker)
it.start()
}
val rand = Random()
val active = mutableSetOf<Int>()
for (i in 0 until 30) {
val newLeader = rand.nextInt(leaders.size)
println("activate $newLeader")
active.add(newLeader)
leaders[newLeader].requestLeadership()
val dropLeader = rand.nextInt(leaders.size)
active.remove(dropLeader)
println("deactivate $dropLeader $active")
leaders[dropLeader].relinquishLeadership()
while (active.isNotEmpty() && leaderCount.get() == 0) {
Thread.sleep(100)
}
}
leaders.forEach {
it.artemis.stop()
it.close()
}
assertFalse(failureRef.get())
}
}
private fun artemisReconnectionLoop(artemisClient: ArtemisSessionProvider, running: CountDownLatch) {
artemisClient.start()
try {
running.await()
} finally {
artemisClient.stop()
}
}
@Test
fun `Disconnect Tests`() {
val artemisConfig = createConfig(11005, true)
val running = CountDownLatch(1)
val artemisClient1 = createArtemisClient(artemisConfig.p2pAddress.port, started = false)
val artemisRetryLoop1 = Thread({ artemisReconnectionLoop(artemisClient1, running) }, "Artemis Connector Thread").apply {
isDaemon = true
}
artemisRetryLoop1.start()
val artemisClient2 = createArtemisClient(artemisConfig.p2pAddress.port, started = false)
val artemisRetryLoop2 = Thread({ artemisReconnectionLoop(artemisClient2, running) }, "Artemis Connector Thread").apply {
isDaemon = true
}
artemisRetryLoop2.start()
try {
val leaderCount = AtomicInteger(0)
val failureRef = AtomicBoolean(false)
val leader1 = BullyLeaderClient(artemisClient1, "test", "1", 10)
val leader2 = BullyLeaderClient(artemisClient2, "test", "2", 20)
val watcher1 = StateTracker(leader1.nodeId, leaderCount, failureRef)
leader1.addLeadershipListener(watcher1)
val watcher2 = StateTracker(leader2.nodeId, leaderCount, failureRef)
leader2.addLeadershipListener(watcher2)
leader1.start()
leader1.requestLeadership()
leader2.start()
leader2.requestLeadership()
Thread.sleep(2000L)
val server = createArtemisServer(artemisConfig.p2pAddress.port, false)
watcher1.waitUp()
assertTrue(leader1.isStarted())
assertTrue(leader1.isLeader())
assertTrue(leader2.isStarted())
assertFalse(leader2.isLeader())
server.stop()
watcher1.waitDown()
assertTrue(leader1.isStarted())
assertFalse(leader1.isLeader())
assertTrue(leader2.isStarted())
assertFalse(leader2.isLeader())
assertFalse(failureRef.get())
leader1.relinquishLeadership()
val server2 = createArtemisServer(artemisConfig.p2pAddress.port, false)
watcher2.waitUp()
assertTrue(leader1.isStarted())
assertFalse(leader1.isLeader())
assertTrue(leader2.isStarted())
assertTrue(leader2.isLeader())
server2.stop()
leader1.close()
leader2.close()
} finally {
running.countDown()
artemisRetryLoop1.join()
artemisRetryLoop2.join()
}
}
@Test
fun `Clients Randomly do Things`() {
val CLIENTS_NUMBER = 10
val ACTIONS_NUMBER = 10
val artemisServer = createArtemisServer()
artemisServer.use { server ->
val countDownLatch = CountDownLatch(CLIENTS_NUMBER)
val leaderCount = AtomicInteger(0)
val failureRef = AtomicBoolean(false)
val clientList = (1..CLIENTS_NUMBER).map {
val artemis = createArtemisClient()
BullyLeaderClient(artemis, "test", it.toString(), 10)
}
clientList.forEach { client ->
thread {
client.addLeadershipListener(StateTracker(client.nodeId, leaderCount, failureRef))
client.start()
val random = Random()
for (i in 1 until ACTIONS_NUMBER) {
val action = random.nextInt(2)
when (action) {
0 -> client.requestLeadership()
1 -> client.relinquishLeadership()
else -> throw IllegalArgumentException("Invalid action choice")
}
Thread.sleep(100L * random.nextInt(30))
}
client.requestLeadership() // end as possible leader
countDownLatch.countDown()
}
}
countDownLatch.await()
//only one leader should exist
var timeout = 100
while (true) {
val leaderNumber = leaderCount.get()
assertTrue(leaderNumber <= 1)
if (leaderNumber == 1) {
break
}
--timeout
assertTrue(timeout > 0)
Thread.sleep(100L)
}
clientList.forEach { client ->
client.artemis.stop()
client.close()
}
assertFalse(failureRef.get())
}
}
private fun createConfig(port: Int, createCerts: Boolean = false): AbstractNodeConfiguration {
val baseDirectory = tempFolder.root.toPath()
val certificatesDirectory = baseDirectory / "certificates"
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val p2pSslOptions = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(DUMMY_BANK_A_NAME).whenever(it).myLegalName
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
doReturn(p2pSslOptions).whenever(it).p2pSslOptions
doReturn(NetworkHostAndPort("localhost", port)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration
}
if (createCerts) {
artemisConfig.configureWithDevSSLCertificate()
}
return artemisConfig
}
private fun createArtemisClient(port: Int = 11005, started: Boolean = true): ArtemisSessionProvider {
val artemisConfig = createConfig(port)
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions,
NetworkHostAndPort("localhost", port),
MAX_MESSAGE_SIZE,
confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize,
externalBrokerConnectionConfig = ExternalBrokerConnectionConfiguration.CONTINUOUS_RETRY)
if (started) {
artemisClient.start()
}
return artemisClient
}
private fun createArtemisServer(port: Int = 11005, createCerts: Boolean = true): ArtemisMessagingServer {
val artemisConfig = createConfig(port, createCerts)
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", port), MAX_MESSAGE_SIZE)
artemisServer.start()
return artemisServer
}
}