mirror of
https://github.com/corda/corda.git
synced 2025-06-01 15:10:54 +00:00
ENT-1959: add a default value for mutualExclusionConfiguration.machineName (#877)
* ENT-1959: add a default value for mutualExclusionConfiguration.machineName * ENT-1959: update docs * ENT-1959: update docs, remove machineName from default conf, add unit test
This commit is contained in:
parent
9155000407
commit
d13cca49ec
@ -301,7 +301,6 @@ Typical configuration for ``nodeserver1`` would be a ``node.conf`` files contain
|
|||||||
externalBridge = true // Ensure node doesn't run P2P AMQP bridge, instead delegate to the BridgeInner.
|
externalBridge = true // Ensure node doesn't run P2P AMQP bridge, instead delegate to the BridgeInner.
|
||||||
mutualExclusionConfiguration = { // Enable the protective heartbeat logic so that only one node instance is ever running.
|
mutualExclusionConfiguration = { // Enable the protective heartbeat logic so that only one node instance is ever running.
|
||||||
on = true
|
on = true
|
||||||
machineName = "nodeserver1"
|
|
||||||
updateInterval = 20000
|
updateInterval = 20000
|
||||||
waitInterval = 40000
|
waitInterval = 40000
|
||||||
}
|
}
|
||||||
@ -353,7 +352,6 @@ Typical configuration for ``nodeserver2`` would be a ``node.conf`` files contain
|
|||||||
externalBridge = true // Ensure node doesn't run P2P AMQP bridge, instead delegate to the BridgeInner.
|
externalBridge = true // Ensure node doesn't run P2P AMQP bridge, instead delegate to the BridgeInner.
|
||||||
mutualExclusionConfiguration = { // Enable the protective heartbeat logic so that only one node instance is ever running.
|
mutualExclusionConfiguration = { // Enable the protective heartbeat logic so that only one node instance is ever running.
|
||||||
on = true
|
on = true
|
||||||
machineName = "nodeserver2"
|
|
||||||
updateInterval = 20000
|
updateInterval = 20000
|
||||||
waitInterval = 40000
|
waitInterval = 40000
|
||||||
}
|
}
|
||||||
|
Binary file not shown.
@ -1,152 +0,0 @@
|
|||||||
High Availability
|
|
||||||
=================
|
|
||||||
|
|
||||||
This section describes how to make a Corda node highly available.
|
|
||||||
|
|
||||||
Hot Cold
|
|
||||||
~~~~~~~~
|
|
||||||
|
|
||||||
In the hot cold configuration, failover is handled manually, by promoting the cold node after the former hot node
|
|
||||||
failed or was taken offline for maintenance.
|
|
||||||
|
|
||||||
For RPC clients there is a way to recover in case of failover, see section below.
|
|
||||||
|
|
||||||
Prerequisites
|
|
||||||
-------------
|
|
||||||
|
|
||||||
* A load-balancer for P2P, RPC and web traffic
|
|
||||||
* A shared file system for the artemis and certificates directories
|
|
||||||
* A shared database, e.g. Azure SQL
|
|
||||||
|
|
||||||
The hot-cold deployment consists of two Corda nodes, a hot node that is currently handling requests and running flows
|
|
||||||
and a cold backup node that can take over, if the hot node fails or is taken offline for an upgrade. Both nodes should
|
|
||||||
be able to connect to a shared database and a replicated file-system hosting the artemis and certificates directories.
|
|
||||||
The hot-cold ensemble should be fronted by a load-balancer for P2P, web and RPC traffic. The load-balancer should do
|
|
||||||
health monitoring and route the traffic to the node that is currently active. To prevent data corruption in case of
|
|
||||||
accidental simultaneous start of both nodes, the current hot node takes a leader lease in the form of a mutual exclusion
|
|
||||||
lock implemented by a row in the shared database.
|
|
||||||
|
|
||||||
Configuration
|
|
||||||
-------------
|
|
||||||
|
|
||||||
The configuration snippet below shows the relevant settings.
|
|
||||||
|
|
||||||
.. sourcecode:: none
|
|
||||||
|
|
||||||
enterpriseConfiguration = {
|
|
||||||
mutualExclusionConfiguration = {
|
|
||||||
on = true
|
|
||||||
machineName = ${HOSTNAME}
|
|
||||||
updateInterval = 20000
|
|
||||||
waitInterval = 40000
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Fields
|
|
||||||
------
|
|
||||||
|
|
||||||
:on: Whether hot cold high availability is turned on, defaulted to ``false``.
|
|
||||||
|
|
||||||
:machineName: Unique name for node.
|
|
||||||
|
|
||||||
:updateInterval: Period(milliseconds) over which the running node updates the mutual exclusion lease.
|
|
||||||
|
|
||||||
:waitInterval: Amount of time (in milliseconds) to wait since the last mutual exclusion lease update before being able to become the master node. This has to be greater than ``updateInterval``.
|
|
||||||
|
|
||||||
RPC failover
|
|
||||||
------------
|
|
||||||
|
|
||||||
In case of hot-cold there will be a short period of time when none of the nodes is available and accepting connections.
|
|
||||||
If the RPC client has not been connected at all and makes its first RPC connection during this instability window, the connection will be rejected
|
|
||||||
as if the server address does not exist. The only choice the client has in this case is to catch the corresponding exception during ``CordaRPCClient.start()``
|
|
||||||
and keep on re-trying.
|
|
||||||
|
|
||||||
The following code snippet illustrates that.
|
|
||||||
|
|
||||||
.. sourcecode:: Kotlin
|
|
||||||
|
|
||||||
fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
|
|
||||||
|
|
||||||
val retryInterval = 5.seconds
|
|
||||||
|
|
||||||
do {
|
|
||||||
val connection = try {
|
|
||||||
logger.info("Connecting to: $nodeHostAndPort")
|
|
||||||
val client = CordaRPCClient(
|
|
||||||
nodeHostAndPort,
|
|
||||||
object : CordaRPCClientConfiguration {
|
|
||||||
override val connectionMaxRetryInterval = retryInterval
|
|
||||||
}
|
|
||||||
)
|
|
||||||
val _connection = client.start(username, password)
|
|
||||||
// Check connection is truly operational before returning it.
|
|
||||||
val nodeInfo = _connection.proxy.nodeInfo()
|
|
||||||
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
|
||||||
_connection
|
|
||||||
} catch(secEx: ActiveMQSecurityException) {
|
|
||||||
// Happens when incorrect credentials provided - no point to retry connecting.
|
|
||||||
throw secEx
|
|
||||||
}
|
|
||||||
catch(th: Throwable) {
|
|
||||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
|
||||||
logger.info("Exception upon establishing connection: " + th.message)
|
|
||||||
null
|
|
||||||
}
|
|
||||||
|
|
||||||
if(connection != null) {
|
|
||||||
logger.info("Connection successfully established with: $nodeHostAndPort")
|
|
||||||
return connection
|
|
||||||
}
|
|
||||||
// Could not connect this time round - pause before giving another try.
|
|
||||||
Thread.sleep(retryInterval.toMillis())
|
|
||||||
} while (connection == null)
|
|
||||||
|
|
||||||
throw IllegalArgumentException("Never reaches here")
|
|
||||||
}
|
|
||||||
|
|
||||||
If, however, the RPC client was connected through a load-balancer to a node and failover occurred, it will take some time for the cold instance to start up.
|
|
||||||
Acceptable behavior in this case would be for RPC client to keep re-trying to connect and once connected - back-fill any data that might have been missed since connection was down.
|
|
||||||
In a way this scenario is no different to a temporary loss of connectivity with a node even without any form of High Availability.
|
|
||||||
|
|
||||||
In order to achieve said re-try/back-fill functionality the client needs to install ``onError`` handler on the ``Observable`` returned by ``CordaRPCOps``.
|
|
||||||
Please see code below which illustrates how this can be achieved.
|
|
||||||
|
|
||||||
.. sourcecode:: Kotlin
|
|
||||||
|
|
||||||
fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
|
|
||||||
|
|
||||||
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
|
|
||||||
val proxy = connection.proxy
|
|
||||||
|
|
||||||
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
|
|
||||||
|
|
||||||
val retryableStateMachineUpdatesSubscription: AtomicReference<Subscription?> = AtomicReference(null)
|
|
||||||
val subscription: Subscription = stateMachineUpdatesRaw
|
|
||||||
.startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) })
|
|
||||||
.subscribe({ clientCode(it) /* Client code here */ }, {
|
|
||||||
// Terminate subscription such that nothing gets past this point to downstream Observables.
|
|
||||||
retryableStateMachineUpdatesSubscription.get()?.unsubscribe()
|
|
||||||
// It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
|
|
||||||
// client and a new connection, so no going back to this one. Also the server might be down, so we are
|
|
||||||
// force closing the connection to avoid propagation of notification to the server side.
|
|
||||||
connection.forceClose()
|
|
||||||
// Perform re-connect.
|
|
||||||
performRpcReconnect(nodeHostAndPort, username, password)
|
|
||||||
})
|
|
||||||
|
|
||||||
retryableStateMachineUpdatesSubscription.set(subscription)
|
|
||||||
}
|
|
||||||
|
|
||||||
In this code snippet it is possible to see that function ``performRpcReconnect`` creates RPC connection and installs error handler
|
|
||||||
upon subscription to an ``Observable``. The call to this ``onError`` handler will be made when failover happens; at that point the code
|
|
||||||
will terminate the existing subscription, close the RPC connection and recursively call ``performRpcReconnect``, which will re-subscribe
|
|
||||||
once RPC connection comes back online.
|
|
||||||
|
|
||||||
Client code is fed with instances of ``StateMachineInfo`` using call ``clientCode(it)``. Upon re-connect this code receives
|
|
||||||
all the items. Some of these items might have already been delivered to client code before failover occurred.
|
|
||||||
It is down to client code in this case to have a memory and handle those duplicate items as appropriate.
|
|
||||||
|
|
||||||
Hot Warm
|
|
||||||
~~~~~~~~
|
|
||||||
|
|
||||||
In the future we are going to support automatic failover.
|
|
@ -156,7 +156,7 @@ exists, all others will shut down shortly after starting. A standard configurati
|
|||||||
enterpriseConfiguration = {
|
enterpriseConfiguration = {
|
||||||
mutualExclusionConfiguration = {
|
mutualExclusionConfiguration = {
|
||||||
on = true
|
on = true
|
||||||
machineName = ${UNIQUE_ID}
|
machineName = ${UNIQUE_ID} // Optional
|
||||||
updateInterval = 20000
|
updateInterval = 20000
|
||||||
waitInterval = 40000
|
waitInterval = 40000
|
||||||
}
|
}
|
||||||
@ -164,7 +164,9 @@ exists, all others will shut down shortly after starting. A standard configurati
|
|||||||
|
|
||||||
:on: Whether hot cold high availability is turned on, default is ``false``.
|
:on: Whether hot cold high availability is turned on, default is ``false``.
|
||||||
|
|
||||||
:machineName: Unique name for node. Used when checking which node is active. Example: *corda-ha-vm1.example.com*
|
:machineName: Unique name for node. It is combined with the node's base directory to create an identifier which is
|
||||||
|
used in the mutual exclusion process (signalling which Corda instance is active and using the database). Default value is the
|
||||||
|
machine's host name.
|
||||||
|
|
||||||
:updateInterval: Period(milliseconds) over which the running node updates the mutual exclusion lease.
|
:updateInterval: Period(milliseconds) over which the running node updates the mutual exclusion lease.
|
||||||
|
|
||||||
@ -206,7 +208,6 @@ file that can be used for either node:
|
|||||||
enterpriseConfiguration = {
|
enterpriseConfiguration = {
|
||||||
mutualExclusionConfiguration = {
|
mutualExclusionConfiguration = {
|
||||||
on = true
|
on = true
|
||||||
machineName = "${NODE_MACHINE_ID}"
|
|
||||||
updateInterval = 20000
|
updateInterval = 20000
|
||||||
waitInterval = 40000
|
waitInterval = 40000
|
||||||
}
|
}
|
||||||
@ -218,5 +219,4 @@ network.
|
|||||||
Each machine's own address is used for the RPC connection as the node's internal messaging client needs it to
|
Each machine's own address is used for the RPC connection as the node's internal messaging client needs it to
|
||||||
connect to the broker.
|
connect to the broker.
|
||||||
|
|
||||||
The ``machineName`` value should be different for each node as it is used to ensure that only one of them can be active at any time.
|
|
||||||
|
|
||||||
|
Binary file not shown.
Before Width: | Height: | Size: 112 KiB After Width: | Height: | Size: 278 KiB |
Binary file not shown.
Before Width: | Height: | Size: 118 KiB After Width: | Height: | Size: 289 KiB |
@ -339,7 +339,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
|||||||
networkParameters)
|
networkParameters)
|
||||||
val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration
|
val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration
|
||||||
if (mutualExclusionConfiguration.on) {
|
if (mutualExclusionConfiguration.on) {
|
||||||
RunOnceService(database, mutualExclusionConfiguration.machineName,
|
// Ensure uniqueness in case nodes are hosted on the same machine.
|
||||||
|
val extendedMachineName = "${configuration.baseDirectory}/${mutualExclusionConfiguration.machineName}"
|
||||||
|
RunOnceService(database, extendedMachineName,
|
||||||
ManagementFactory.getRuntimeMXBean().name.split("@")[0],
|
ManagementFactory.getRuntimeMXBean().name.split("@")[0],
|
||||||
mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start()
|
mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start()
|
||||||
}
|
}
|
||||||
|
@ -10,8 +10,8 @@
|
|||||||
|
|
||||||
package net.corda.node.services.config
|
package net.corda.node.services.config
|
||||||
|
|
||||||
import net.corda.node.services.statemachine.transitions.SessionDeliverPersistenceStrategy
|
|
||||||
import net.corda.node.services.statemachine.transitions.StateMachineConfiguration
|
import net.corda.node.services.statemachine.transitions.StateMachineConfiguration
|
||||||
|
import java.net.InetAddress
|
||||||
|
|
||||||
data class EnterpriseConfiguration(
|
data class EnterpriseConfiguration(
|
||||||
val mutualExclusionConfiguration: MutualExclusionConfiguration,
|
val mutualExclusionConfiguration: MutualExclusionConfiguration,
|
||||||
@ -19,7 +19,15 @@ data class EnterpriseConfiguration(
|
|||||||
val tuning: PerformanceTuning = PerformanceTuning.default,
|
val tuning: PerformanceTuning = PerformanceTuning.default,
|
||||||
val externalBridge: Boolean? = null)
|
val externalBridge: Boolean? = null)
|
||||||
|
|
||||||
data class MutualExclusionConfiguration(val on: Boolean = false, val machineName: String, val updateInterval: Long, val waitInterval: Long)
|
data class MutualExclusionConfiguration(val on: Boolean = false,
|
||||||
|
val machineName: String = defaultMachineName,
|
||||||
|
val updateInterval: Long,
|
||||||
|
val waitInterval: Long
|
||||||
|
) {
|
||||||
|
companion object {
|
||||||
|
private val defaultMachineName = InetAddress.getLocalHost().hostName
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param flowThreadPoolSize Determines the size of the thread pool used by the flow framework to run flows.
|
* @param flowThreadPoolSize Determines the size of the thread pool used by the flow framework to run flows.
|
||||||
|
@ -28,7 +28,6 @@ verifierType = InMemory
|
|||||||
enterpriseConfiguration = {
|
enterpriseConfiguration = {
|
||||||
mutualExclusionConfiguration = {
|
mutualExclusionConfiguration = {
|
||||||
on = false
|
on = false
|
||||||
machineName = ""
|
|
||||||
updateInterval = 20000
|
updateInterval = 20000
|
||||||
waitInterval = 40000
|
waitInterval = 40000
|
||||||
}
|
}
|
||||||
|
@ -13,26 +13,23 @@ package net.corda.node.services.config
|
|||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import com.zaxxer.hikari.HikariConfig
|
import com.zaxxer.hikari.HikariConfig
|
||||||
import net.corda.core.internal.div
|
|
||||||
import net.corda.core.internal.toPath
|
import net.corda.core.internal.toPath
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence.DataSourceConfigTag
|
import net.corda.nodeapi.internal.persistence.CordaPersistence.DataSourceConfigTag
|
||||||
import net.corda.core.utilities.seconds
|
import net.corda.core.utilities.seconds
|
||||||
import net.corda.nodeapi.BrokerRpcSslOptions
|
import net.corda.nodeapi.internal.config.UnknownConfigKeysPolicy
|
||||||
import net.corda.testing.core.ALICE_NAME
|
import net.corda.testing.core.ALICE_NAME
|
||||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||||
import net.corda.tools.shell.SSHDConfiguration
|
import net.corda.tools.shell.SSHDConfiguration
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
|
import java.net.InetAddress
|
||||||
import java.net.URL
|
import java.net.URL
|
||||||
import java.net.URI
|
import java.net.URI
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.*
|
||||||
import kotlin.test.assertFalse
|
|
||||||
import kotlin.test.assertNull
|
|
||||||
import kotlin.test.assertTrue
|
|
||||||
|
|
||||||
class NodeConfigurationImplTest {
|
class NodeConfigurationImplTest {
|
||||||
@Test
|
@Test
|
||||||
@ -174,6 +171,12 @@ class NodeConfigurationImplTest {
|
|||||||
assertThat(errors).hasOnlyOneElementSatisfying { error -> error.contains("compatibilityZoneURL") && error.contains("devMode") }
|
assertThat(errors).hasOnlyOneElementSatisfying { error -> error.contains("compatibilityZoneURL") && error.contains("devMode") }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `mutual exclusion machineName set to default if not explicitly set`() {
|
||||||
|
val config = getConfig("test-config-mutualExclusion-noMachineName.conf").parseAsNodeConfiguration(UnknownConfigKeysPolicy.IGNORE::handle)
|
||||||
|
assertEquals(InetAddress.getLocalHost().hostName, config.enterpriseConfiguration.mutualExclusionConfiguration.machineName)
|
||||||
|
}
|
||||||
|
|
||||||
private fun configDebugOptions(devMode: Boolean, devModeOptions: DevModeOptions?): NodeConfiguration {
|
private fun configDebugOptions(devMode: Boolean, devModeOptions: DevModeOptions?): NodeConfiguration {
|
||||||
return testConfiguration.copy(devMode = devMode, devModeOptions = devModeOptions)
|
return testConfiguration.copy(devMode = devMode, devModeOptions = devModeOptions)
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,26 @@
|
|||||||
|
p2pAddress : "localhost:10002"
|
||||||
|
rpcSettings {
|
||||||
|
address : "localhost:10003"
|
||||||
|
adminAddress : "localhost:1777"
|
||||||
|
}
|
||||||
|
h2port : 11000
|
||||||
|
myLegalName : "O=Corda HA, L=London, C=GB"
|
||||||
|
keyStorePassword : "cordacadevpass"
|
||||||
|
trustStorePassword : "trustpass"
|
||||||
|
devMode : true
|
||||||
|
rpcUsers=[
|
||||||
|
{
|
||||||
|
user=corda
|
||||||
|
password=corda_is_awesome
|
||||||
|
permissions=[
|
||||||
|
ALL
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
enterpriseConfiguration = {
|
||||||
|
mutualExclusionConfiguration = {
|
||||||
|
on = true
|
||||||
|
updateInterval = 20000
|
||||||
|
waitInterval = 40000
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user