diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt index 57a9f1d5b7..177babfc68 100644 --- a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt @@ -22,6 +22,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry +import net.corda.nodeapi.internal.bully.BullyLeaderClient import net.corda.nodeapi.internal.zookeeper.ZkClient import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.DUMMY_BANK_A_NAME @@ -299,6 +300,52 @@ class BridgeIntegrationTest { } } + @Test + fun `Run HA using Bully algorithm`() { + val configResource = "/net/corda/bridge/habullysingleprocess/firewall.conf" + createNetworkParams(tempFolder.root.toPath()) + val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource) + assertEquals(BridgeHAConfigImpl("bully://localhost", 10), config.haConfig) + config.createBridgeKeyStores(DUMMY_BANK_A_NAME) + val (artemisServer, artemisClient) = createArtemis() + try { + installBridgeControlResponder(artemisClient) + val bridge = FirewallInstance(config, FirewallVersionInfo(1, "1.1", "Dummy", "Test")) + val stateFollower = bridge.activeChange.toBlocking().iterator + assertEquals(false, stateFollower.next()) + assertEquals(false, bridge.active) + bridge.start() + assertEquals(false, bridge.active) // Starting the bridge insufficient to go active + assertEquals(true, stateFollower.next()) + assertEquals(true, bridge.active) + assertEquals(true, serverListening("localhost", 10005)) + val higherPriorityClient = BullyLeaderClient(artemisClient, "/bridge/ha", "Test", 5) + higherPriorityClient.start() + higherPriorityClient.requestLeadership() // should win leadership and kick out our bridge + assertEquals(false, stateFollower.next()) + assertEquals(false, bridge.active) + var socketState = true + for (i in 0 until 5) { // The event signalling bridge down is pretty immediate, but the cascade of events leading to socket close can take a while + socketState = serverListening("localhost", 10005) + if (!socketState) break + Thread.sleep(100) + } + assertEquals(false, socketState) + higherPriorityClient.relinquishLeadership() // let our bridge back as leader + higherPriorityClient.close() + assertEquals(true, stateFollower.next()) + assertEquals(true, bridge.active) + assertEquals(true, serverListening("localhost", 10005)) + bridge.stop() // Finally check shutdown + assertEquals(false, stateFollower.next()) + assertEquals(false, bridge.active) + assertEquals(false, serverListening("localhost", 10005)) + } finally { + artemisClient.stop() + artemisServer.stop() + } + } + @Test fun `Test artemis failover logic`() { val configResource = "/net/corda/bridge/artemisfailover/firewall.conf" diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/artemis/ForwardingArtemisMessageClient.kt b/bridge/src/main/kotlin/net/corda/bridge/services/artemis/ForwardingArtemisMessageClient.kt new file mode 100644 index 0000000000..7f7906f544 --- /dev/null +++ b/bridge/src/main/kotlin/net/corda/bridge/services/artemis/ForwardingArtemisMessageClient.kt @@ -0,0 +1,20 @@ +package net.corda.bridge.services.artemis + +import net.corda.bridge.services.api.BridgeArtemisConnectionService +import net.corda.nodeapi.internal.ArtemisMessagingClient +import net.corda.nodeapi.internal.ArtemisSessionProvider + +class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider { + override fun start(): ArtemisMessagingClient.Started { + // We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService + return artemisConnectionService.started!! + } + + override fun stop() { + // We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService + } + + override val started: ArtemisMessagingClient.Started? + get() = artemisConnectionService.started + +} \ No newline at end of file diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt index f1babc3dc5..f55214bd3a 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/ha/ExternalMasterElectionService.kt @@ -1,13 +1,11 @@ package net.corda.bridge.services.ha -import net.corda.bridge.services.api.BridgeArtemisConnectionService -import net.corda.bridge.services.api.BridgeMasterService -import net.corda.bridge.services.api.FirewallAuditService -import net.corda.bridge.services.api.FirewallConfiguration -import net.corda.bridge.services.api.ServiceStateSupport +import net.corda.bridge.services.api.* +import net.corda.bridge.services.artemis.ForwardingArtemisMessageClient import net.corda.bridge.services.util.ServiceStateCombiner import net.corda.bridge.services.util.ServiceStateHelper import net.corda.core.utilities.contextLogger +import net.corda.nodeapi.internal.bully.BullyLeaderClient import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener import net.corda.nodeapi.internal.zookeeper.ZkClient import net.corda.nodeapi.internal.zookeeper.ZkLeader @@ -22,10 +20,17 @@ import kotlin.concurrent.withLock */ class ExternalMasterElectionService(private val conf: FirewallConfiguration, private val auditService: FirewallAuditService, - artemisService: BridgeArtemisConnectionService, + private val artemisService: BridgeArtemisConnectionService, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeMasterService, ServiceStateSupport by stateHelper { private var haElector: ZkLeader? = null + + private enum class ElectorMode { + ZOOKEEPER, + BULLY_OVER_ARTEMIS + } + + private val mode: ElectorMode private var leaderListener: CordaLeaderListener? = null private var statusSubscriber: Subscription? = null private val statusFollower = ServiceStateCombiner(listOf(artemisService)) @@ -38,7 +43,11 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration, init { require(conf.haConfig != null) { "Undefined HA Config" } - require(conf.haConfig!!.haConnectionString.split(',').all { it.startsWith("zk://") }) { "Only Zookeeper HA mode 'zk://IPADDR:PORT supported" } + val connectionStrings = conf.haConfig!!.haConnectionString.split(',') + val zkOnly = connectionStrings.all { it.startsWith("zk://") } + val bullyOnly = (connectionStrings.size == 1) && (connectionStrings.single().toLowerCase() == "bully://localhost") + require(zkOnly xor bullyOnly) { "Only all Zookeeper HA mode 'zk://IPADDR:PORT, or Bully Algorithm mode 'bully://localhost' supported" } + if (zkOnly) mode = ElectorMode.ZOOKEEPER else if (bullyOnly) mode = ElectorMode.BULLY_OVER_ARTEMIS else throw IllegalArgumentException("Unsupported elector URL") } private fun becomeMaster() { @@ -75,29 +84,37 @@ class ExternalMasterElectionService(private val conf: FirewallConfiguration, */ private fun activate() { activeTransitionLock.withLock { - val zkConf = conf.haConfig!!.haConnectionString.split(',').map { it.replace("zk://", "") }.joinToString(",") + val connectionStrings = conf.haConfig!!.haConnectionString.split(',') val leaderPriority = conf.haConfig!!.haPriority val hostName: String = InetAddress.getLocalHost().hostName val info = ManagementFactory.getRuntimeMXBean() val pid = info.name.split("@").firstOrNull() // TODO Java 9 has better support for this val nodeId = "$hostName:$pid" - val leaderElector = ZkClient(zkConf, conf.haConfig!!.haTopic, nodeId, leaderPriority) + val leaderElector: ZkLeader = when (mode) { + ElectorMode.ZOOKEEPER -> { + val zkConf = connectionStrings.map { it.replace("zk://", "") }.joinToString(",") + ZkClient(zkConf, conf.haConfig!!.haTopic, nodeId, leaderPriority) + } + ElectorMode.BULLY_OVER_ARTEMIS -> { + BullyLeaderClient(ForwardingArtemisMessageClient(artemisService), conf.haConfig!!.haTopic, nodeId, leaderPriority) + } + } haElector = leaderElector val listener = object : CordaLeaderListener { override fun notLeader() { - auditService.statusChangeEvent("Leadership loss signalled from Zookeeper") + auditService.statusChangeEvent("Leadership loss signalled from $mode") becomeSlave() } override fun isLeader() { - log.info("Zookeeper has signalled leadership acquired.") + log.info("$mode has signalled leadership acquired.") becomeMaster() } } leaderListener = listener leaderElector.addLeadershipListener(listener) leaderElector.start() - auditService.statusChangeEvent("Requesting leadership from Zookeeper") + auditService.statusChangeEvent("Requesting leadership from $mode") leaderElector.requestLeadership() } } diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt index ea5441ef24..d524fac469 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt @@ -1,13 +1,12 @@ package net.corda.bridge.services.sender import net.corda.bridge.services.api.* +import net.corda.bridge.services.artemis.ForwardingArtemisMessageClient import net.corda.bridge.services.util.ServiceStateCombiner import net.corda.bridge.services.util.ServiceStateHelper import net.corda.core.identity.CordaX500Name import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger -import net.corda.nodeapi.internal.ArtemisMessagingClient -import net.corda.nodeapi.internal.ArtemisSessionProvider import net.corda.nodeapi.internal.bridging.BridgeControlListener import net.corda.nodeapi.internal.bridging.BridgeMetricsService import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage @@ -36,21 +35,6 @@ class DirectBridgeSenderService(val conf: FirewallConfiguration, BridgeAuditServiceAdaptor(auditService), conf.enableAMQPPacketTrace) - private class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider { - override fun start(): ArtemisMessagingClient.Started { - // We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService - return artemisConnectionService.started!! - } - - override fun stop() { - // We don't want to start and stop artemis from clients as the lifecycle management is provided by the BridgeArtemisConnectionService - } - - override val started: ArtemisMessagingClient.Started? - get() = artemisConnectionService.started - - } - private class BridgeAuditServiceAdaptor(private val auditService: FirewallAuditService) : BridgeMetricsService { override fun bridgeCreated(targets: List, legalNames: Set) { // No corresponding method on FirewallAuditService yet diff --git a/bridge/src/test/resources/net/corda/bridge/habullysingleprocess/firewall.conf b/bridge/src/test/resources/net/corda/bridge/habullysingleprocess/firewall.conf new file mode 100644 index 0000000000..148eb8690f --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/habullysingleprocess/firewall.conf @@ -0,0 +1,11 @@ +firewallMode = SenderReceiver +outboundConfig : { + artemisBrokerAddress = "localhost:11005" +} +inboundConfig : { + listeningAddress = "0.0.0.0:10005" +} +networkParametersPath = network-parameters +haConfig : { + haConnectionString = "bully://localhost" +} \ No newline at end of file diff --git a/docs/source/firewall-configuration-file.rst b/docs/source/firewall-configuration-file.rst index decffda399..955442ac84 100644 --- a/docs/source/firewall-configuration-file.rst +++ b/docs/source/firewall-configuration-file.rst @@ -46,8 +46,8 @@ your own ``firewall.conf`` file will use these defaults: .. literalinclude:: ../../bridge/src/main/resources/firewalldefault.conf :language: javascript -Bridge operating modes ----------------------- +Firewall operating modes +------------------------ .. note:: By default, the Corda node assumes that it will carry out the peer-to-peer functions of the ``bridge`` internally! Before running a dedicated firewall process, it is essential to turn off the dev mode component by setting the ``enterpriseConfiguration.externalBridge`` property of the ``node.conf`` file to ``true``. @@ -213,16 +213,21 @@ absolute path to the firewall's base directory. :crlCheckSoftFail: If true (recommended setting) allows certificate checks to pass if the CRL(certificate revocation list) provider is unavailable. :haConfig: Optionally the ``SenderReceiver`` and ``BridgeInner`` modes can be run in a hot-warm configuration, which determines the active instance using an external master election service. - Currently, only Zookeeper can be used as master elector. Eventually other electors may be supported e.g. ``etcd``. This configuration section controls these options: + Currently, the leader election process can be delegated to Zookeeper, or the firewall can use the ``Bully Algorithm`` (see ) via Publish/Subscribe messages on the artemis broker. + For production it is recommended that a Zookeeper cluster be used as this will protect against network partitioning scenarios. However, the ``Bully Algorithm`` mode does not require any additional server processes. + Eventually other electors may be supported e.g. ``etcd``. This configuration section controls these options: - :haConnectionString: A string containing the connection details of the master electors as a comma delimited list of connection string in the format ``zk://:``. - In future it intended that other schemes such as ``etcd`` are supported. + :haConnectionString: A string containing the connection details of the master electors as a comma delimited list of individual connection strings. + + * To use an external Zookeeper cluster each connection item should be in the format ``zk://:``. + + * To use the ``Bully Algorithm`` running over artemis the single connection string should be set to ``bully://localhost``. :haPriority: The implementation uses a prioritise leader election algorithm, so that a preferred master instance can be set. The highest priority is 0 and larger integers have lower priority. At the same level of priority, it is random which instance wins the leader election. If a ``bridge`` instance dies another will have the opportunity to become master in instead. - :haTopic: Sets the zookeeper topic prefix that the nodes used in resolving the election and must be the same for all ``bridge`` - instances competing for master status. This is available to allow a single zookeeper cluster to be reused with multiple + :haTopic: Sets the zookeeper/artemis topic that the nodes used in resolving the election and must be the same for all ``bridge`` + instances competing for master status. This is available to allow a single zookeeper/artemis cluster to be reused with multiple sets of ``bridges`` (e.g. in test environments). The default value is ``bridge/ha`` and would not normally need to be changed if the cluster is not shared. diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bully/BullyLeader.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bully/BullyLeader.kt new file mode 100644 index 0000000000..7190164f05 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bully/BullyLeader.kt @@ -0,0 +1,425 @@ +package net.corda.nodeapi.internal.bully + +import net.corda.core.internal.ThreadBox +import net.corda.core.internal.VisibleForTesting +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug +import net.corda.nodeapi.internal.ArtemisMessagingComponent +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX +import net.corda.nodeapi.internal.ArtemisSessionProvider +import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener +import net.corda.nodeapi.internal.zookeeper.ZkLeader +import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.* +import java.time.Clock +import java.time.Instant +import java.util.* +import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.concurrent.thread + +class BullyLeaderClient(val artemis: ArtemisSessionProvider, + private val electionPath: String, + val nodeId: String, + val priority: Int) : ZkLeader { + + private companion object { + const val LEADER_TIMEOUT_MSEC = 1000L + private val log = contextLogger() + } + + @VisibleForTesting + var clock: Clock = Clock.systemUTC() + + + @CordaSerializable + private enum class MessageType { + ELECTION_REQUEST, + ELECTION_REJECT, + LEADER_ANNOUNCE, + LEADER_RETIRE + } + + @CordaSerializable + private data class LeaderMessage(val msgType: MessageType, + val term: Int, + val proposedLeader: String, + val leaderPriority: Int) + + private class ArtemisMessageSession private constructor( + private val leaderTopic: SimpleString, + private val session: ClientSession, + private val messageConsumer: ClientConsumer, + private val messageProducer: ClientProducer, + private val onMessage: (LeaderMessage) -> Unit) : AutoCloseable, FailoverEventListener { + companion object { + fun connectToArtemis(electionPath: String, locator: ServerLocator, factory: ClientSessionFactory, onMessage: (LeaderMessage) -> Unit): ArtemisMessageSession { + val session = factory.createSession(ArtemisMessagingComponent.NODE_P2P_USER, + ArtemisMessagingComponent.NODE_P2P_USER, + false, + true, + true, + locator.isPreAcknowledge, + ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) + val producer = session.createProducer() + val leaderTopic = SimpleString("${INTERNAL_PREFIX}leader.$electionPath") + val queueName = SimpleString(UUID.randomUUID().toString()) + session.createTemporaryQueue(leaderTopic, RoutingType.MULTICAST, queueName) + val consumer = session.createConsumer(queueName) + val artemisMessageSession = ArtemisMessageSession(leaderTopic, session, consumer, producer, onMessage) + session.addFailoverListener(artemisMessageSession) + consumer.setMessageHandler { msg -> + artemisMessageSession.processMessage(msg) + } + session.start() + return artemisMessageSession + } + } + + private val closed: AtomicBoolean = AtomicBoolean(false) + private val _connected: AtomicBoolean = AtomicBoolean(true) + val connected: Boolean get() = _connected.get() + + override fun failoverEvent(eventType: FailoverEventType) { + log.warn("Artemis Failover event $eventType") + _connected.set(eventType == FailoverEventType.FAILOVER_COMPLETED) + } + + private fun processMessage(message: ClientMessage) { + try { + val data: ByteArray = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } + val leaderMessage = data.deserialize(context = SerializationDefaults.P2P_CONTEXT) + onMessage(leaderMessage) + message.acknowledge() + } catch (ex: Exception) { + log.error("Unable to process leader control message", ex) + } + } + + fun sendMessage(message: LeaderMessage) { + if (closed.get() || session.isClosed || !connected) return + val messageBytes = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes + val artemisMessage = session.createMessage(false) + artemisMessage.writeBodyBufferBytes(messageBytes) + messageProducer.send(leaderTopic, artemisMessage) + } + + override fun close() { + if (!closed.getAndSet(true)) { + thread { + // send of to another thread as this can block for ages inside Artemis + closeInternal() + } + } + } + + private fun close(target: AutoCloseable) { + try { + target.close() + } catch (ignored: ActiveMQObjectClosedException) { + // swallow + } + } + + private fun closeInternal() { + session.removeFailoverListener(this) + if (!messageConsumer.isClosed) { + close(messageConsumer) + } + if (!session.isClosed) { + close(session) + } + } + } + + private enum class BullyState { + FOLLOWER, + INITIATE_ELECTION, + POSSIBLE_LEADER, + LEADER + } + + private class LeaderState(clock: Clock) { + var scheduler: ScheduledExecutorService? = null + val isStarted: Boolean get() = (scheduler?.isShutdown?.not() ?: false) + var pollTimer: ScheduledFuture<*>? = null + var isActive: Boolean = false + var messageSession: ArtemisMessageSession? = null + var state: BullyState = BullyState.FOLLOWER + var currentTerm: Int = 0 + var currentLeader: String? = null + var leaderPriority: Int? = null + var leaderUpdated: Instant = clock.instant() + } + + private val state = ThreadBox(LeaderState(clock)) + private val listeners = CopyOnWriteArrayList() + + override fun start() { + state.locked { + if (isStarted) return + val newScheduler = Executors.newSingleThreadScheduledExecutor() + scheduler = newScheduler + pollTimer = newScheduler.scheduleAtFixedRate(::timerEvent, 0L, LEADER_TIMEOUT_MSEC / 2L, TimeUnit.MILLISECONDS) + } + } + + override fun close() { + if (isStarted()) { + relinquishLeadership() + } + state.locked { + messageSession?.close() + messageSession = null + pollTimer?.cancel(false) + pollTimer = null + scheduler?.shutdown() + scheduler = null + } + } + + private fun timerEvent() { + var down = false + state.locked { + if (!isStarted) return + log.debug { "$nodeId Timer running. current state $state" } + val wasLeader = isLeader() + val artemis = artemis.started + if (artemis == null || artemis.session.isClosed) { + log.info("Artemis lost") + messageSession?.close() + messageSession = null + state = BullyState.FOLLOWER + currentLeader = null + leaderPriority = null + if (wasLeader) down = true + return@locked + } + if (messageSession == null) { + log.info("$nodeId Artemis connected. Creating leader session") + messageSession = ArtemisMessageSession.connectToArtemis(electionPath, + artemis.serverLocator, + artemis.sessionFactory, + ::messageEvent) + return@locked + } + if (!messageSession!!.connected) { + state = BullyState.FOLLOWER + currentLeader = null + leaderPriority = null + if (wasLeader) down = true + return@locked + } + if (!isActive) { + log.debug { "$nodeId Idling as inactive" } + state = BullyState.FOLLOWER + if (wasLeader) down = true + return@locked + } + val now = clock.instant() + when (state) { + BullyState.FOLLOWER -> { + // Unknown Leader, or dead leader prepare to do election + if (currentLeader == null || leaderUpdated.plusMillis(LEADER_TIMEOUT_MSEC).isBefore(now)) { + log.info("$nodeId Leader missing start election") + currentLeader = null + leaderPriority = null + state = BullyState.INITIATE_ELECTION + } else if (compareIds(nodeId, priority, currentLeader!!, leaderPriority!!) > 0) { // we are higher priority start elections + log.info("$nodeId Leader lower priority than us start challenge") + currentLeader = null + leaderPriority = null + state = BullyState.INITIATE_ELECTION + } + } + BullyState.INITIATE_ELECTION -> { + leaderUpdated = now // move time to keep track of election announcements + state = BullyState.POSSIBLE_LEADER + val message = LeaderMessage(MessageType.ELECTION_REQUEST, + currentTerm, + nodeId, + priority) + log.debug { "$nodeId Send election message $message" } + messageSession!!.sendMessage(message) + } + BullyState.POSSIBLE_LEADER -> { + val message = LeaderMessage(MessageType.ELECTION_REQUEST, + currentTerm, + nodeId, + priority) + log.debug { "$nodeId Send election message $message" } + messageSession!!.sendMessage(message) + if (leaderUpdated.plusMillis(2 * LEADER_TIMEOUT_MSEC).isBefore(now)) { // everyone had a fair chance + log.info("$nodeId transition to leader state") + state = BullyState.LEADER + ++currentTerm + } + } + BullyState.LEADER -> { + val message = LeaderMessage(MessageType.LEADER_ANNOUNCE, + currentTerm, + nodeId, + priority) + log.debug { "$nodeId Broadcast leader heartbeat $message" } + messageSession!!.sendMessage(message) + } + } + } + if (down) { + lostLeadership() + } + } + + private fun compareIds(nodeId1: String, priority1: Int, nodeId2: String, priority2: Int): Int { + if (priority1 > priority2) { // smaller priority value wins + return -1 + } else if (priority1 < priority2) { + return +1 + } + if (nodeId1 < nodeId2) { + return -1 + } else if (nodeId2 > nodeId1) { + return +1 + } + return 0 + } + + private fun messageEvent(message: LeaderMessage) { + log.debug { "$nodeId received message $message" } + var up = false + var down = false + state.locked { + if (!isStarted) return + if (message.term < currentTerm) { // ignore old messages + log.debug { "$nodeId discard outdated message" } + return + } + val now = clock.instant() + val wasLeader = isLeader() + if (message.term > currentTerm) { // we are out of sync, reset and consider our position + log.debug { "$nodeId new term detected resetting" } + currentTerm = message.term + state = BullyState.FOLLOWER + leaderUpdated = now + currentLeader = message.proposedLeader + leaderPriority = message.leaderPriority + if (wasLeader) down = true + return@locked + } + val comparison = compareIds(nodeId, priority, message.proposedLeader, message.leaderPriority) + if (comparison < 0) { // we are lower priority so ensure we are just a follower + if (state != BullyState.FOLLOWER) { + state = BullyState.FOLLOWER + log.info("$nodeId terminate our involvement in election possible leader ${message.proposedLeader}") + } + leaderUpdated = now + currentLeader = message.proposedLeader + leaderPriority = message.leaderPriority + } + when (message.msgType) { + MessageType.ELECTION_REQUEST -> { + if (isActive && (comparison > 0)) { // we are higher priority, reject them + log.debug { "$nodeId send rebuttal to lower priority node" } + messageSession!!.sendMessage(LeaderMessage(MessageType.ELECTION_REJECT, + currentTerm, + currentLeader ?: nodeId, + leaderPriority ?: priority)) + } + } + MessageType.ELECTION_REJECT -> { + // Nothing to do here, handled by priority check above + } + MessageType.LEADER_ANNOUNCE -> { + leaderUpdated = now + currentLeader = message.proposedLeader + leaderPriority = message.leaderPriority + if ((state == BullyState.LEADER) && (currentLeader != nodeId)) { // shouldn't happen, but reset now!! + log.error("$nodeId unexpected leader announcement") + state = BullyState.FOLLOWER + } + } + MessageType.LEADER_RETIRE -> { // polite leader shutdown + if (isActive) { + log.info("$nodeId retirement announced. Starting election") + state = BullyState.INITIATE_ELECTION + leaderUpdated = now + currentLeader = null + leaderPriority = null + } + } + } + val isLeader = isLeader() + if (wasLeader xor isLeader) { + up = isLeader + down = !isLeader + } + } + if (down) { + lostLeadership() + } + if (up) { + acquireLeadership() + } + } + + override fun requestLeadership() { + state.locked { + require(isStarted) { "Leader elector must be started first" } + isActive = true + } + } + + override fun relinquishLeadership() { + val wasLeader = state.locked { + require(isStarted) { "Leader elector must be started first" } + val oldState = isLeader() + currentLeader = null + leaderPriority = null + state = BullyState.FOLLOWER + isActive = false + oldState + } + if (wasLeader) { + lostLeadership() + state.locked { + log.debug { "$nodeId tell other nodes to start election" } + messageSession?.sendMessage(LeaderMessage(MessageType.LEADER_RETIRE, + currentTerm, + nodeId, + priority)) + } + } + } + + private fun acquireLeadership() { + for (listener in listeners) { + listener.isLeader() + } + } + + private fun lostLeadership() { + for (listener in listeners) { + listener.notLeader() + } + } + + override fun addLeadershipListener(listener: CordaLeaderListener) { + listeners += listener + } + + + override fun removeLeadershipListener(listener: CordaLeaderListener) { + listeners -= listener + } + + override fun isLeader(): Boolean = state.locked { isStarted && isActive && (state == BullyState.LEADER) && (currentLeader == nodeId) } + + override fun isStarted(): Boolean = state.locked { isStarted } + +} \ No newline at end of file diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/bully/BullyLeaderTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/bully/BullyLeaderTest.kt new file mode 100644 index 0000000000..3a09fd1d73 --- /dev/null +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/bully/BullyLeaderTest.kt @@ -0,0 +1,509 @@ +package net.corda.nodeapi.internal.bully + +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.internal.div +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.services.config.EnterpriseConfiguration +import net.corda.node.services.config.MutualExclusionConfiguration +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.nodeapi.internal.ArtemisMessagingClient +import net.corda.nodeapi.internal.ArtemisSessionProvider +import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration +import net.corda.nodeapi.internal.zookeeper.CordaLeaderListener +import net.corda.testing.core.DUMMY_BANK_A_NAME +import net.corda.testing.core.MAX_MESSAGE_SIZE +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.internal.rigorousMock +import net.corda.testing.internal.stubs.CertificateStoreStubs +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import kotlin.concurrent.thread +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class BullyLeaderTest { + @Rule + @JvmField + val tempFolder = TemporaryFolder() + + @Rule + @JvmField + val serializationEnvironment = SerializationEnvironmentRule(true) + + private abstract class AbstractNodeConfiguration : NodeConfiguration + + class StateTracker(val name: String, + val counts: AtomicInteger, + val failureRef: AtomicBoolean) : CordaLeaderListener { + private val monitor = Object() + var isLeader: Boolean = false + + override fun isLeader() { + println("$name is leader") + val count = counts.getAndIncrement() + if (count > 0) { + println("More than one leader at once") + failureRef.set(true) + } + synchronized(monitor) { + isLeader = true + monitor.notifyAll() + } + } + + override fun notLeader() { + println("$name lost leadership") + counts.decrementAndGet() + synchronized(monitor) { + isLeader = false + monitor.notifyAll() + } + } + + fun waitUp() { + synchronized(monitor) { + while (!isLeader) { + monitor.wait() + } + } + } + + fun waitDown() { + synchronized(monitor) { + while (isLeader) { + monitor.wait() + } + } + } + } + + @Test + fun `Simple Leader Test`() { + val artemisServer = createArtemisServer() + artemisServer.use { + val artemisClient = createArtemisClient() + try { + val counts = AtomicInteger(0) + val errors = AtomicBoolean(false) + val tracker = StateTracker("1", counts, errors) + val leader = BullyLeaderClient(artemisClient, "test", "1", 10) + leader.addLeadershipListener(tracker) + assertFalse(leader.isStarted()) + assertFalse(leader.isLeader()) + leader.start() + assertTrue(leader.isStarted()) + assertFalse(leader.isLeader()) + leader.requestLeadership() + tracker.waitUp() + assertTrue(leader.isStarted()) + assertTrue(leader.isLeader()) + leader.relinquishLeadership() + tracker.waitDown() + assertTrue(leader.isStarted()) + assertFalse(leader.isLeader()) + leader.close() + assertFalse(leader.isStarted()) + assertFalse(leader.isLeader()) + assertFalse(errors.get()) + } finally { + artemisClient.stop() + } + } + } + + @Test + fun `Simple Leader Test Shutdown whilst leader`() { + val artemisServer = createArtemisServer() + artemisServer.use { + val artemisClient = createArtemisClient() + try { + val counts = AtomicInteger(0) + val errors = AtomicBoolean(false) + val tracker = StateTracker("1", counts, errors) + val leader = BullyLeaderClient(artemisClient, "test", "1", 10) + leader.addLeadershipListener(tracker) + assertFalse(leader.isStarted()) + assertFalse(leader.isLeader()) + leader.start() + assertTrue(leader.isStarted()) + assertFalse(leader.isLeader()) + leader.requestLeadership() + tracker.waitUp() + assertTrue(leader.isStarted()) + assertTrue(leader.isLeader()) + leader.close() + assertFalse(leader.isStarted()) + assertFalse(leader.isLeader()) + assertFalse(errors.get()) + } finally { + artemisClient.stop() + } + } + } + + @Test + fun `Simple Two Leader Test`() { + val artemisServer = createArtemisServer() + artemisServer.use { + val artemisClient1 = createArtemisClient() + val artemisClient2 = createArtemisClient() + try { + val counts = AtomicInteger(0) + val errors = AtomicBoolean(false) + val tracker1 = StateTracker("1", counts, errors) + val tracker2 = StateTracker("2", counts, errors) + val leader1 = BullyLeaderClient(artemisClient1, "test", "1", 10) + val leader2 = BullyLeaderClient(artemisClient2, "test", "2", 10) + leader1.addLeadershipListener(tracker1) + leader2.addLeadershipListener(tracker2) + assertFalse(leader1.isStarted()) + assertFalse(leader1.isLeader()) + assertFalse(leader2.isStarted()) + assertFalse(leader2.isLeader()) + leader1.start() + leader2.start() + assertTrue(leader1.isStarted()) + assertFalse(leader1.isLeader()) + assertTrue(leader2.isStarted()) + assertFalse(leader2.isLeader()) + leader1.requestLeadership() + tracker1.waitUp() + assertTrue(leader1.isStarted()) + assertTrue(leader1.isLeader()) + assertTrue(leader2.isStarted()) + assertFalse(leader2.isLeader()) + leader1.relinquishLeadership() + tracker1.waitDown() + assertTrue(leader1.isStarted()) + assertFalse(leader1.isLeader()) + assertTrue(leader2.isStarted()) + assertFalse(leader2.isLeader()) + leader2.requestLeadership() + tracker2.waitUp() + assertTrue(leader1.isStarted()) + assertFalse(leader1.isLeader()) + assertTrue(leader2.isStarted()) + assertTrue(leader2.isLeader()) + leader1.requestLeadership() + Thread.sleep(10000L) + assertTrue(leader1.isLeader() xor leader2.isLeader()) + leader1.close() + leader2.close() + assertFalse(leader1.isStarted()) + assertFalse(leader1.isLeader()) + assertFalse(leader2.isStarted()) + assertFalse(leader2.isLeader()) + assertFalse(errors.get()) + } finally { + artemisClient1.stop() + artemisClient2.stop() + } + } + } + + @Test + fun `Priority Leader Test`() { + val artemisServer = createArtemisServer() + artemisServer.use { + val artemisClient1 = createArtemisClient() + val artemisClient2 = createArtemisClient() + val artemisClient3 = createArtemisClient() + try { + val counts = AtomicInteger(0) + val errors = AtomicBoolean(false) + val tracker1 = StateTracker("1", counts, errors) + val tracker2 = StateTracker("2", counts, errors) + val tracker3 = StateTracker("3", counts, errors) + val leader1 = BullyLeaderClient(artemisClient1, "test", "1", 10) + val leader2 = BullyLeaderClient(artemisClient2, "test", "2", 20) + val leader3 = BullyLeaderClient(artemisClient3, "test", "3", 30) + leader1.addLeadershipListener(tracker1) + leader2.addLeadershipListener(tracker2) + leader3.addLeadershipListener(tracker3) + leader1.start() + leader2.start() + leader3.start() + leader3.requestLeadership() // single active leader should win + tracker3.waitUp() + assertFalse(leader1.isLeader()) + assertFalse(leader2.isLeader()) + assertTrue(leader3.isLeader()) + leader1.requestLeadership() // higher priority leader should preempt + tracker1.waitUp() + assertTrue(leader1.isLeader()) + assertFalse(leader2.isLeader()) + assertFalse(leader3.isLeader()) + leader2.requestLeadership() // lower priority leader should do nothing to existing leader, even after delay + Thread.sleep(10000L) + assertTrue(leader1.isLeader()) + assertFalse(leader2.isLeader()) + assertFalse(leader3.isLeader()) + leader1.relinquishLeadership() // when leader1 gives up leader2 should win as next highest active node + tracker1.waitDown() + tracker2.waitUp() + assertFalse(leader1.isLeader()) + assertTrue(leader2.isLeader()) + assertFalse(leader3.isLeader()) + leader1.close() + leader2.close() + leader3.close() + assertFalse(errors.get()) + } finally { + artemisClient1.stop() + artemisClient2.stop() + artemisClient3.stop() + } + } + } + + @Test + fun `Multi Leader Tests`() { + val artemisServer = createArtemisServer() + artemisServer.use { _ -> + val leaders = (0..9).map { + val artemis = createArtemisClient() + BullyLeaderClient(artemis, "test", it.toString(), 1) + } + val leaderCount = AtomicInteger(0) + val failureRef = AtomicBoolean(false) + leaders.forEach { + val tracker = StateTracker(it.nodeId, leaderCount, failureRef) + it.addLeadershipListener(tracker) + it.start() + } + val rand = Random() + val active = mutableSetOf() + for (i in 0 until 30) { + val newLeader = rand.nextInt(leaders.size) + println("activate $newLeader") + active.add(newLeader) + leaders[newLeader].requestLeadership() + val dropLeader = rand.nextInt(leaders.size) + active.remove(dropLeader) + println("deactivate $dropLeader $active") + leaders[dropLeader].relinquishLeadership() + while (active.isNotEmpty() && leaderCount.get() == 0) { + Thread.sleep(100) + } + } + leaders.forEach { + it.artemis.stop() + it.close() + } + assertFalse(failureRef.get()) + } + } + + @Test + fun `Multi Leader Tests Different Priorities`() { + val artemisServer = createArtemisServer() + artemisServer.use { _ -> + val leaders = (0..9).map { + val artemis = createArtemisClient() + BullyLeaderClient(artemis, "test", it.toString(), it) + } + val leaderCount = AtomicInteger(0) + val failureRef = AtomicBoolean(false) + leaders.forEach { + val tracker = StateTracker(it.nodeId, leaderCount, failureRef) + it.addLeadershipListener(tracker) + it.start() + } + val rand = Random() + val active = mutableSetOf() + for (i in 0 until 30) { + val newLeader = rand.nextInt(leaders.size) + println("activate $newLeader") + active.add(newLeader) + leaders[newLeader].requestLeadership() + val dropLeader = rand.nextInt(leaders.size) + active.remove(dropLeader) + println("deactivate $dropLeader $active") + leaders[dropLeader].relinquishLeadership() + while (active.isNotEmpty() && leaderCount.get() == 0) { + Thread.sleep(100) + } + } + leaders.forEach { + it.artemis.stop() + it.close() + } + assertFalse(failureRef.get()) + } + } + + private fun artemisReconnectionLoop(artemisClient: ArtemisSessionProvider, running: CountDownLatch) { + artemisClient.start() + try { + running.await() + } finally { + artemisClient.stop() + } + } + + @Test + fun `Disconnect Tests`() { + val artemisConfig = createConfig(11005, true) + val running = CountDownLatch(1) + val artemisClient1 = createArtemisClient(artemisConfig.p2pAddress.port, started = false) + val artemisRetryLoop1 = Thread({ artemisReconnectionLoop(artemisClient1, running) }, "Artemis Connector Thread").apply { + isDaemon = true + } + artemisRetryLoop1.start() + + val artemisClient2 = createArtemisClient(artemisConfig.p2pAddress.port, started = false) + val artemisRetryLoop2 = Thread({ artemisReconnectionLoop(artemisClient2, running) }, "Artemis Connector Thread").apply { + isDaemon = true + } + artemisRetryLoop2.start() + try { + val leaderCount = AtomicInteger(0) + val failureRef = AtomicBoolean(false) + val leader1 = BullyLeaderClient(artemisClient1, "test", "1", 10) + val leader2 = BullyLeaderClient(artemisClient2, "test", "2", 20) + val watcher1 = StateTracker(leader1.nodeId, leaderCount, failureRef) + leader1.addLeadershipListener(watcher1) + val watcher2 = StateTracker(leader2.nodeId, leaderCount, failureRef) + leader2.addLeadershipListener(watcher2) + leader1.start() + leader1.requestLeadership() + leader2.start() + leader2.requestLeadership() + Thread.sleep(2000L) + val server = createArtemisServer(artemisConfig.p2pAddress.port, false) + watcher1.waitUp() + assertTrue(leader1.isStarted()) + assertTrue(leader1.isLeader()) + assertTrue(leader2.isStarted()) + assertFalse(leader2.isLeader()) + server.stop() + watcher1.waitDown() + assertTrue(leader1.isStarted()) + assertFalse(leader1.isLeader()) + assertTrue(leader2.isStarted()) + assertFalse(leader2.isLeader()) + assertFalse(failureRef.get()) + leader1.relinquishLeadership() + val server2 = createArtemisServer(artemisConfig.p2pAddress.port, false) + watcher2.waitUp() + assertTrue(leader1.isStarted()) + assertFalse(leader1.isLeader()) + assertTrue(leader2.isStarted()) + assertTrue(leader2.isLeader()) + server2.stop() + leader1.close() + leader2.close() + } finally { + running.countDown() + artemisRetryLoop1.join() + artemisRetryLoop2.join() + } + } + + @Test + fun `Clients Randomly do Things`() { + val CLIENTS_NUMBER = 10 + val ACTIONS_NUMBER = 10 + val artemisServer = createArtemisServer() + artemisServer.use { server -> + val countDownLatch = CountDownLatch(CLIENTS_NUMBER) + val leaderCount = AtomicInteger(0) + val failureRef = AtomicBoolean(false) + + val clientList = (1..CLIENTS_NUMBER).map { + val artemis = createArtemisClient() + BullyLeaderClient(artemis, "test", it.toString(), 10) + } + + clientList.forEach { client -> + thread { + client.addLeadershipListener(StateTracker(client.nodeId, leaderCount, failureRef)) + client.start() + val random = Random() + for (i in 1 until ACTIONS_NUMBER) { + val action = random.nextInt(2) + when (action) { + 0 -> client.requestLeadership() + 1 -> client.relinquishLeadership() + else -> throw IllegalArgumentException("Invalid action choice") + } + Thread.sleep(100L * random.nextInt(30)) + } + client.requestLeadership() // end as possible leader + countDownLatch.countDown() + } + } + + countDownLatch.await() + //only one leader should exist + var timeout = 100 + while (true) { + val leaderNumber = leaderCount.get() + assertTrue(leaderNumber <= 1) + if (leaderNumber == 1) { + break + } + --timeout + assertTrue(timeout > 0) + Thread.sleep(100L) + } + + clientList.forEach { client -> + client.artemis.stop() + client.close() + } + + assertFalse(failureRef.get()) + } + } + + private fun createConfig(port: Int, createCerts: Boolean = false): AbstractNodeConfiguration { + val baseDirectory = tempFolder.root.toPath() + val certificatesDirectory = baseDirectory / "certificates" + val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) + val p2pSslOptions = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory) + val artemisConfig = rigorousMock().also { + doReturn(baseDirectory).whenever(it).baseDirectory + doReturn(certificatesDirectory).whenever(it).certificatesDirectory + doReturn(DUMMY_BANK_A_NAME).whenever(it).myLegalName + doReturn(signingCertificateStore).whenever(it).signingCertificateStore + doReturn(p2pSslOptions).whenever(it).p2pSslOptions + doReturn(NetworkHostAndPort("localhost", port)).whenever(it).p2pAddress + doReturn(null).whenever(it).jmxMonitoringHttpPort + doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = true)).whenever(it).enterpriseConfiguration + } + if (createCerts) { + artemisConfig.configureWithDevSSLCertificate() + } + return artemisConfig + } + + private fun createArtemisClient(port: Int = 11005, started: Boolean = true): ArtemisSessionProvider { + val artemisConfig = createConfig(port) + val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, + NetworkHostAndPort("localhost", port), + MAX_MESSAGE_SIZE, + confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize, + externalBrokerConnectionConfig = ExternalBrokerConnectionConfiguration.CONTINUOUS_RETRY) + if (started) { + artemisClient.start() + } + return artemisClient + } + + private fun createArtemisServer(port: Int = 11005, createCerts: Boolean = true): ArtemisMessagingServer { + val artemisConfig = createConfig(port, createCerts) + val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", port), MAX_MESSAGE_SIZE) + artemisServer.start() + return artemisServer + } + +} \ No newline at end of file