diff --git a/docs/source/running-a-node.rst b/docs/source/running-a-node.rst index 13cc0cd38f..fe231e9b40 100644 --- a/docs/source/running-a-node.rst +++ b/docs/source/running-a-node.rst @@ -131,3 +131,24 @@ To create nodes locally and run on a remote machine perform the following steps: 4. Run nodes on the remote machine using :ref:`runnodes command `. The above steps create a test deployment as ``deployNodes`` Gradle task would do on a local machine. + +Stability of the Corda Node +--------------------------- + +There are a number of critical resources that the Corda Node requires in order to operate and to ensure transactional consistency of the ledger. +These critical resources include: + +1. Connection to a database; + +2. Connection to Artemis Broker for P2P communication; + +3. Connection to Artemis Broker for RPC communication. + +Should any of these critical resources become unavailable, the Corda Node will enter an unstable state and, as a safety precaution, it will +shut itself down, reporting the cause as an error message in the Node's log file. + +.. note:: On some operating systems, when the PC goes to sleep whilst the Corda Node is running, the Artemis message broker embedded in the Node reports + a loss-of-heartbeat event, which in turn causes loss of connectivity to Artemis. In such circumstances the Corda Node will exit, reporting a broker + connectivity problem in the log. + +Once the critical resources the node relies upon are available again, it is safe for the Node operator to restart the node for normal operation. 
\ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt index e7b57d10f1..b0957af4e4 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt @@ -5,11 +5,8 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER import net.corda.nodeapi.internal.config.MutualSslConfiguration -import org.apache.activemq.artemis.api.core.client.ActiveMQClient +import org.apache.activemq.artemis.api.core.client.* import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE -import org.apache.activemq.artemis.api.core.client.ClientProducer -import org.apache.activemq.artemis.api.core.client.ClientSession -import org.apache.activemq.artemis.api.core.client.ClientSessionFactory interface ArtemisSessionProvider { fun start(): ArtemisMessagingClient.Started @@ -19,7 +16,8 @@ interface ArtemisSessionProvider { class ArtemisMessagingClient(private val config: MutualSslConfiguration, private val serverAddress: NetworkHostAndPort, - private val maxMessageSize: Int) : ArtemisSessionProvider { + private val maxMessageSize: Int, + private val failoverCallback: ((FailoverEventType) -> Unit)? = null) : ArtemisSessionProvider { companion object { private val log = loggerFor() } @@ -44,6 +42,10 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize)) } val sessionFactory = locator.createSessionFactory() + + // Handle failover events if a callback method is provided + if (failoverCallback != null) sessionFactory.addFailoverListener(failoverCallback) + // Login using the node username. 
The broker will authenticate us as its node (as opposed to another peer) // using our TLS certificate. // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt index 46e5062c66..c3c829b94a 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt @@ -3,9 +3,7 @@ package net.corda.nodeapi.internal.bridging import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize -import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger -import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX @@ -34,11 +32,6 @@ class BridgeControlListener(val config: MutualSslConfiguration, private var artemis: ArtemisSessionProvider? = null private var controlConsumer: ClientConsumer? 
= null - constructor(config: MutualSslConfiguration, - p2pAddress: NetworkHostAndPort, - maxMessageSize: Int, - crlCheckSoftFail: Boolean) : this(config, maxMessageSize, crlCheckSoftFail, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) }) - companion object { private val log = contextLogger() } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 675a6f4a01..427f965bc6 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -53,6 +53,7 @@ import net.corda.node.services.rpc.InternalRPCMessagingClient import net.corda.node.services.rpc.RPCServerConfiguration import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.utilities.* +import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_SHELL_USER import net.corda.nodeapi.internal.ShutdownHook import net.corda.nodeapi.internal.addShutdownHook @@ -265,11 +266,7 @@ open class Node(configuration: NodeConfiguration, startLocalRpcBroker(securityManager) } - val bridgeControlListener = BridgeControlListener( - configuration.p2pSslOptions, - network.serverAddress, - networkParameters.maxMessageSize, - configuration.crlCheckSoftFail) + val bridgeControlListener = makeBridgeControlListener(network.serverAddress, networkParameters) printBasicNodeInfo("Advertised P2P messaging addresses", nodeInfo.addresses.joinToString()) val rpcServerConfiguration = RPCServerConfiguration.DEFAULT @@ -307,6 +304,18 @@ open class Node(configuration: NodeConfiguration, ) } + private fun makeBridgeControlListener(serverAddress: NetworkHostAndPort, networkParameters: NetworkParameters) : BridgeControlListener { + val artemisMessagingClientFactory = { + ArtemisMessagingClient( + configuration.p2pSslOptions, + serverAddress, + networkParameters.maxMessageSize, + failoverCallback = { 
errorAndTerminate("ArtemisMessagingClient failed. Shutting down.", null) } + ) + } + return BridgeControlListener(configuration.p2pSslOptions, networkParameters.maxMessageSize, configuration.crlCheckSoftFail, artemisMessagingClientFactory) + } + private fun startLocalRpcBroker(securityManager: RPCSecurityManager): BrokerAddresses? { return with(configuration) { rpcOptions.address.let { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 7065728ed0..c89589324c 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -28,6 +28,7 @@ import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.services.statemachine.ExternalEvent import net.corda.node.services.statemachine.SenderDeduplicationId import net.corda.node.utilities.AffinityExecutor +import net.corda.node.utilities.errorAndTerminate import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.* import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL @@ -167,7 +168,7 @@ class P2PMessagingClient(val config: NodeConfiguration, minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE isUseGlobalPools = nodeSerializationEnv != null } - val sessionFactory = locator!!.createSessionFactory() + val sessionFactory = locator!!.createSessionFactory().addFailoverListener(::failoverCallback) // Login using the node username. The broker will authenticate us as its node (as opposed to another peer) // using our TLS certificate. 
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer @@ -209,6 +210,22 @@ class P2PMessagingClient(val config: NodeConfiguration, } } + private fun failoverCallback(event: FailoverEventType) { + when (event) { + FailoverEventType.FAILURE_DETECTED -> { + errorAndTerminate("Connection to the broker was lost. Node is shutting down.", null) + } + FailoverEventType.FAILOVER_FAILED -> state.locked { + if (running) { + errorAndTerminate("Could not reconnect to the broker. Node is shutting down.", null) + } + } + else -> { + log.warn("Cannot handle event $event.") + } + } + } + private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List) { val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}" if (!session.queueQuery(SimpleString(bridgeNotifyQueue)).isExists) {