CORDA-2617: Add failover listeners to terminate node process (#5337)

* CORDA-2617: Add failover listeners to terminate node process

This is a backport of changes done in Corda Enterprise.

It will be triggered in case of:
a) Loss of connectivity to in-built Artemis for Bridge Control;
b) Loss of connectivity to in-built Artemis for P2P connectivity.

Note on merge to CE: Disregard these changes and take whatever CE already has.

* CORDA-2617: Update documentation on stability of Corda Node

* CORDA-2617: Documentation update after discussion with @mnesbit
This commit is contained in:
Viktor Kolomeyko 2019-08-06 11:28:16 +01:00 committed by Matthew Nesbit
parent fa75711647
commit b60ab70440
5 changed files with 60 additions and 18 deletions

View File

@ -131,3 +131,24 @@ To create nodes locally and run on a remote machine perform the following steps:
4. Run nodes on the remote machine using :ref:`runnodes command <starting-all-nodes-at-once>`. 4. Run nodes on the remote machine using :ref:`runnodes command <starting-all-nodes-at-once>`.
The above steps create a test deployment as ``deployNodes`` Gradle task would do on a local machine. The above steps create a test deployment as ``deployNodes`` Gradle task would do on a local machine.
Stability of the Corda Node
---------------------------
There are a number of critical resources necessary for Corda Node to operate to ensure transactional consistency of the ledger.
These critical resources include:
1. Connection to a database;
2. Connection to Artemis Broker for P2P communication;
3. Connection to Artemis Broker for RPC communication.
Should any of these critical resources become unavailable, the Corda Node will enter an unstable state and, as a safety precaution, it will
shut itself down, reporting the cause as an error message in the Node's log file.
.. note:: On some operating systems, when the PC goes to sleep whilst the Corda Node is running, the Artemis message broker embedded in the Node
reports a loss-of-heartbeat event, which in turn causes loss of connectivity to Artemis. In such circumstances the Corda Node will exit, reporting
a broker connectivity problem in the log.
Once the critical resources the node relies upon are available again, it is safe for the Node operator to restart the node for normal operation.

View File

@ -5,11 +5,8 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
interface ArtemisSessionProvider { interface ArtemisSessionProvider {
fun start(): ArtemisMessagingClient.Started fun start(): ArtemisMessagingClient.Started
@ -19,7 +16,8 @@ interface ArtemisSessionProvider {
class ArtemisMessagingClient(private val config: MutualSslConfiguration, class ArtemisMessagingClient(private val config: MutualSslConfiguration,
private val serverAddress: NetworkHostAndPort, private val serverAddress: NetworkHostAndPort,
private val maxMessageSize: Int) : ArtemisSessionProvider { private val maxMessageSize: Int,
private val failoverCallback: ((FailoverEventType) -> Unit)? = null) : ArtemisSessionProvider {
companion object { companion object {
private val log = loggerFor<ArtemisMessagingClient>() private val log = loggerFor<ArtemisMessagingClient>()
} }
@ -44,6 +42,10 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize)) addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize))
} }
val sessionFactory = locator.createSessionFactory() val sessionFactory = locator.createSessionFactory()
// Handle failover events if a callback method is provided
if (failoverCallback != null) sessionFactory.addFailoverListener(failoverCallback)
// Login using the node username. The broker will authenticate us as its node (as opposed to another peer) // Login using the node username. The broker will authenticate us as its node (as opposed to another peer)
// using our TLS certificate. // using our TLS certificate.
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer

View File

@ -3,9 +3,7 @@ package net.corda.nodeapi.internal.bridging
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
@ -34,11 +32,6 @@ class BridgeControlListener(val config: MutualSslConfiguration,
private var artemis: ArtemisSessionProvider? = null private var artemis: ArtemisSessionProvider? = null
private var controlConsumer: ClientConsumer? = null private var controlConsumer: ClientConsumer? = null
constructor(config: MutualSslConfiguration,
p2pAddress: NetworkHostAndPort,
maxMessageSize: Int,
crlCheckSoftFail: Boolean) : this(config, maxMessageSize, crlCheckSoftFail, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
} }

View File

@ -53,6 +53,7 @@ import net.corda.node.services.rpc.InternalRPCMessagingClient
import net.corda.node.services.rpc.RPCServerConfiguration import net.corda.node.services.rpc.RPCServerConfiguration
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.* import net.corda.node.utilities.*
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_SHELL_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_SHELL_USER
import net.corda.nodeapi.internal.ShutdownHook import net.corda.nodeapi.internal.ShutdownHook
import net.corda.nodeapi.internal.addShutdownHook import net.corda.nodeapi.internal.addShutdownHook
@ -265,11 +266,7 @@ open class Node(configuration: NodeConfiguration,
startLocalRpcBroker(securityManager) startLocalRpcBroker(securityManager)
} }
val bridgeControlListener = BridgeControlListener( val bridgeControlListener = makeBridgeControlListener(network.serverAddress, networkParameters)
configuration.p2pSslOptions,
network.serverAddress,
networkParameters.maxMessageSize,
configuration.crlCheckSoftFail)
printBasicNodeInfo("Advertised P2P messaging addresses", nodeInfo.addresses.joinToString()) printBasicNodeInfo("Advertised P2P messaging addresses", nodeInfo.addresses.joinToString())
val rpcServerConfiguration = RPCServerConfiguration.DEFAULT val rpcServerConfiguration = RPCServerConfiguration.DEFAULT
@ -307,6 +304,18 @@ open class Node(configuration: NodeConfiguration,
) )
} }
/**
 * Creates the [BridgeControlListener] for this node.
 *
 * The listener is handed a factory that builds an [ArtemisMessagingClient] against [serverAddress]
 * using the node's P2P SSL options and the maximum message size from [networkParameters]. Every
 * client produced by that factory carries a failover callback which invokes `errorAndTerminate`
 * for any failover event it receives.
 *
 * @param serverAddress address of the Artemis broker the messaging client connects to.
 * @param networkParameters source of the `maxMessageSize` limit used by both the client and the listener.
 * @return the configured [BridgeControlListener].
 */
private fun makeBridgeControlListener(serverAddress: NetworkHostAndPort, networkParameters: NetworkParameters) : BridgeControlListener {
    return BridgeControlListener(
            configuration.p2pSslOptions,
            networkParameters.maxMessageSize,
            configuration.crlCheckSoftFail
    ) {
        // Evaluated each time the listener asks for a new client, not when this method runs.
        ArtemisMessagingClient(
                configuration.p2pSslOptions,
                serverAddress,
                networkParameters.maxMessageSize,
                failoverCallback = { errorAndTerminate("ArtemisMessagingClient failed. Shutting down.", null) }
        )
    }
}
private fun startLocalRpcBroker(securityManager: RPCSecurityManager): BrokerAddresses? { private fun startLocalRpcBroker(securityManager: RPCSecurityManager): BrokerAddresses? {
return with(configuration) { return with(configuration) {
rpcOptions.address.let { rpcOptions.address.let {

View File

@ -28,6 +28,7 @@ import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ExternalEvent import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.SenderDeduplicationId import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.errorAndTerminate
import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.* import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
@ -167,7 +168,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE
isUseGlobalPools = nodeSerializationEnv != null isUseGlobalPools = nodeSerializationEnv != null
} }
val sessionFactory = locator!!.createSessionFactory() val sessionFactory = locator!!.createSessionFactory().addFailoverListener(::failoverCallback)
// Login using the node username. The broker will authenticate us as its node (as opposed to another peer) // Login using the node username. The broker will authenticate us as its node (as opposed to another peer)
// using our TLS certificate. // using our TLS certificate.
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
@ -209,6 +210,22 @@ class P2PMessagingClient(val config: NodeConfiguration,
} }
} }
/**
 * Reacts to Artemis failover notifications for the P2P session factory.
 *
 * - [FailoverEventType.FAILURE_DETECTED]: calls `errorAndTerminate` immediately.
 * - [FailoverEventType.FAILOVER_FAILED]: calls `errorAndTerminate` only while `running` is set,
 *   checked under the `state` lock.
 * - Any other event is logged as a warning and otherwise ignored.
 *
 * @param event the failover event reported by the Artemis client.
 */
private fun failoverCallback(event: FailoverEventType) {
    when (event) {
        FailoverEventType.FAILURE_DETECTED ->
            errorAndTerminate("Connection to the broker was lost. Node is shutting down.", null)
        FailoverEventType.FAILOVER_FAILED -> {
            state.locked {
                // Only terminate while the client is still marked as running.
                if (running) {
                    errorAndTerminate("Could not reconnect to the broker. Node is shutting down.", null)
                }
            }
        }
        else -> log.warn("Cannot handle event $event.")
    }
}
private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List<String>) { private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List<String>) {
val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}" val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}"
if (!session.queueQuery(SimpleString(bridgeNotifyQueue)).isExists) { if (!session.queueQuery(SimpleString(bridgeNotifyQueue)).isExists) {