ENT-2247: node will deal with loss of broker according to configuration (#1373)

This commit is contained in:
bpaunescu 2018-09-06 11:37:13 +01:00 committed by GitHub
parent 767580c298
commit eb7cdda8a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 293 additions and 70 deletions

View File

@ -4,6 +4,7 @@ import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.*
@ -20,7 +21,9 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
private val maxMessageSize: Int,
private val autoCommitSends: Boolean = true,
private val autoCommitAcks: Boolean = true,
private val confirmationWindowSize: Int = -1) : ArtemisSessionProvider {
private val confirmationWindowSize: Int = -1,
private val externalBrokerConnectionConfig: ExternalBrokerConnectionConfiguration? = null
) : ArtemisSessionProvider {
companion object {
private val log = loggerFor<ArtemisMessagingClient>()
}
@ -43,6 +46,14 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
minLargeMessageSize = maxMessageSize
isUseGlobalPools = nodeSerializationEnv != null
confirmationWindowSize = this@ArtemisMessagingClient.confirmationWindowSize
externalBrokerConnectionConfig?.let {
reconnectAttempts = externalBrokerConnectionConfig.reconnectAttempts
retryInterval = externalBrokerConnectionConfig.retryInterval.toMillis()
retryIntervalMultiplier = externalBrokerConnectionConfig.retryIntervalMultiplier
maxRetryInterval = externalBrokerConnectionConfig.maxRetryInterval.toMillis()
isFailoverOnInitialConnection = externalBrokerConnectionConfig.failoverOnInitialAttempt
initialConnectAttempts = externalBrokerConnectionConfig.initialConnectAttempts
}
addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize))
}
val sessionFactory = locator.createSessionFactory()

View File

@ -0,0 +1,88 @@
package net.corda.nodeapi.internal.config
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import java.time.Duration
/**
* Predefined connection configurations used by Artemis clients (currently used in the P2P messaging layer).
* The enum names represent the approximate total duration of the failover (with exponential back-off). The formula used to calculate
* this duration is as follows:
*
* totalFailoverDuration = SUM(k=0 to [reconnectAttempts]) of [retryInterval] * POW([retryIntervalMultiplier], k)
*
* Example calculation for [DEFAULT]:
*
* totalFailoverDuration = 5 + 5 * 1.5 + 5 * (1.5)^2 + 5 * (1.5)^3 + 5 * (1.5)^4 = ~66 seconds
*
* @param failoverOnInitialAttempt Determines whether failover is triggered if initial connection fails.
* @param initialConnectAttempts The number of reconnect attempts if failover is enabled for initial connection. A value
* of -1 represents infinite attempts.
* @param reconnectAttempts The number of reconnect attempts for failover after initial connection is done. A value
* of -1 represents infinite attempts.
* @param retryInterval Duration between reconnect attempts.
* @param retryIntervalMultiplier Value used in the reconnection back-off process.
* @param maxRetryInterval Determines the maximum duration between reconnection attempts. Useful when using infinite retries.
*/
enum class ExternalBrokerConnectionConfiguration(
val failoverOnInitialAttempt: Boolean,
val initialConnectAttempts: Int,
val reconnectAttempts: Int,
val retryInterval: Duration,
val retryIntervalMultiplier: Double,
val maxRetryInterval: Duration) {
DEFAULT(
failoverOnInitialAttempt = true,
initialConnectAttempts = 5,
reconnectAttempts = 5,
retryInterval = 5.seconds,
retryIntervalMultiplier = 1.5,
maxRetryInterval = 3.minutes
),
FAIL_FAST(
failoverOnInitialAttempt = false,
initialConnectAttempts = 0,
reconnectAttempts = 0,
retryInterval = 5.seconds,
retryIntervalMultiplier = 1.5,
maxRetryInterval = 3.minutes
),
CONTINUOUS_RETRY(
failoverOnInitialAttempt = true,
initialConnectAttempts = -1,
reconnectAttempts = -1,
retryInterval = 5.seconds,
retryIntervalMultiplier = 1.5,
maxRetryInterval = 5.minutes
),
FIVE_MINUTES(
failoverOnInitialAttempt = true,
initialConnectAttempts = 13,
reconnectAttempts = 13,
retryInterval = 5.seconds,
retryIntervalMultiplier = 1.2,
maxRetryInterval = 5.minutes
),
TEN_MINUTES(
failoverOnInitialAttempt = true,
initialConnectAttempts = 17,
reconnectAttempts = 17,
retryInterval = 5.seconds,
retryIntervalMultiplier = 1.2,
maxRetryInterval = 5.minutes
),
ONE_HOUR(
failoverOnInitialAttempt = true,
initialConnectAttempts = 17,
reconnectAttempts = 17,
retryInterval = 5.seconds,
retryIntervalMultiplier = 1.5,
maxRetryInterval = 10.minutes
)
}

View File

@ -0,0 +1,152 @@
package net.corda.node
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.RPCException
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.config.*
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.driver
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.stubs.CertificateStoreStubs
import net.corda.testing.node.User
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import kotlin.concurrent.thread
import kotlin.math.pow
import kotlin.test.assertEquals
import kotlin.test.fail
class ExternalBrokertests : IntegrationTest() {
@Rule
@JvmField
val tempFolder = TemporaryFolder()
private val portAllocator = PortAllocation.Incremental(10000)
private abstract class AbstractNodeConfiguration : NodeConfiguration
@Test
fun `node startup sequence waits for broker to be available using default mode`() {
val aliceUser = User("alice", "alice", permissions = setOf("ALL"))
val p2pPort = portAllocator.nextPort()
val rpcPort = portAllocator.nextPort()
val broker = createArtemis(p2pPort)
val nodeConfiguration = mapOf(
"baseDirectory" to tempFolder.root.toPath().toString() + "/",
"devMode" to false, "messagingServerExternal" to true,
"messagingServerAddress" to NetworkHostAndPort("localhost", p2pPort).toString(),
"enterpriseConfiguration" to mapOf("externalBridge" to true),
"keyStorePassword" to "cordacadevpass",
"trustStorePassword" to "trustpass",
"rpcSettings.address" to NetworkHostAndPort("localhost", rpcPort).toString())
driver(DriverParameters(startNodesInProcess = false, notarySpecs = emptyList())) {
val nodeThread = thread {
startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser), customOverrides = nodeConfiguration).getOrThrow()
}
// Connect RPC client to node (will take some time) and check exception
try {
CordaRPCClient(NetworkHostAndPort("localhost", rpcPort)).start(aliceUser.username, aliceUser.password)
} catch (e: RPCException) {
assertEquals("Cannot connect to server(s). Tried with all available servers.", e.message)
}
broker.start()
nodeThread.join()
// Try connecting to the node again (should be running) and execute and RPC
try {
CordaRPCClient(NetworkHostAndPort("localhost", rpcPort)).start(aliceUser.username, aliceUser.password).use {
try {
val nodeInfo = it.proxy.nodeInfo()
assertEquals(nodeInfo.legalIdentities.first().name, ALICE_NAME)
} catch (e: RPCException) {
fail("Calling RPC nodeInfo failed. Node is not running.")
}
it.close()
}
} catch (e: RPCException) {
fail("Could not connect RPC client to the node.")
}
}
broker.stop()
}
@Test
fun `node terminates if connection to broker has been lost and cannot be re-established`() {
val aliceUser = User("alice", "alice", permissions = setOf("ALL"))
val p2pPort = portAllocator.nextPort()
val broker = createArtemis(p2pPort)
broker.start()
val nodeConfiguration = mapOf(
"baseDirectory" to tempFolder.root.toPath().toString() + "/",
"devMode" to false, "messagingServerExternal" to true,
"messagingServerAddress" to NetworkHostAndPort("localhost", p2pPort).toString(),
"enterpriseConfiguration" to mapOf("externalBrokerConnectionConfiguration" to "FAIL_FAST"),
"keyStorePassword" to "cordacadevpass",
"trustStorePassword" to "trustpass")
driver(DriverParameters(startNodesInProcess = false, notarySpecs = emptyList())) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser), customOverrides = nodeConfiguration).getOrThrow()
// Check node is running by calling and RPC
CordaRPCClient(aliceNode.rpcAddress).start(aliceUser.username, aliceUser.password).use {
try {
val nodeInfo = it.proxy.nodeInfo()
assertEquals(nodeInfo.legalIdentities.first().name, ALICE_NAME)
} catch (e: RPCException) {
fail("Calling RPC nodeInfo failed. Node is not running.")
}
it.close()
}
broker.stop()
val defaultConfig = ExternalBrokerConnectionConfiguration.FAIL_FAST
var reconnectTimeout = 0.0
(1..defaultConfig.reconnectAttempts).forEach {
reconnectTimeout += defaultConfig.retryInterval.toMillis() * defaultConfig.retryIntervalMultiplier.pow(it - 1)
}
// Wait for the configured reconnection time to pass before attempting and RPC connection and check whether the node is stopped or running
Thread.sleep(reconnectTimeout.toLong())
try {
CordaRPCClient(aliceNode.rpcAddress).start(aliceUser.username, aliceUser.password)
} catch (e: RPCException) {
assertEquals("Cannot connect to server(s). Tried with all available servers.", e.message)
}
}
}
private fun createArtemis(p2pPort: Int): ArtemisMessagingServer {
val baseDirectory = tempFolder.root.toPath()
val certificatesDirectory = baseDirectory / "certificates"
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val p2pSslOptions = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
doReturn(p2pSslOptions).whenever(it).p2pSslOptions
doReturn(ALICE_NAME).whenever(it).myLegalName
doReturn(NetworkHostAndPort("localhost", p2pPort)).whenever(it).p2pAddress
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000), externalBridge = false)).whenever(it).enterpriseConfiguration
doReturn(null).whenever(it).jmxMonitoringHttpPort
}
artemisConfig.configureWithDevSSLCertificate()
return ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", p2pPort), MAX_MESSAGE_SIZE)
}
}

View File

@ -86,6 +86,7 @@ class ArtemisMessagingTest {
doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
doReturn(false).whenever(it).messagingServerExternal
doReturn(FlowTimeoutConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).flowTimeout
}
LogHelper.setLevel(PersistentUniquenessProvider::class)

View File

@ -52,6 +52,7 @@ import net.corda.node.services.rpc.ArtemisRpcBroker
import net.corda.node.utilities.AddressUtils
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.DemoClock
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_SHELL_USER
import net.corda.nodeapi.internal.ShutdownHook
import net.corda.nodeapi.internal.addShutdownHook
@ -233,7 +234,16 @@ open class Node(configuration: NodeConfiguration,
val externalBridge = configuration.enterpriseConfiguration.externalBridge
val bridgeControlListener = if (externalBridge == null || !externalBridge) {
BridgeControlListener(configuration.p2pSslOptions, network.serverAddress, networkParameters.maxMessageSize)
val artemisClient = {
ArtemisMessagingClient(configuration.p2pSslOptions,
network.serverAddress,
networkParameters.maxMessageSize,
true,
true,
-1,
configuration.enterpriseConfiguration.externalBrokerConnectionConfiguration)
}
BridgeControlListener(configuration.p2pSslOptions, null, networkParameters.maxMessageSize, artemisClient)
} else {
null
}

View File

@ -3,9 +3,11 @@ package net.corda.node.services.config
import java.io.File
import java.net.InetAddress
import java.nio.file.Path
import net.corda.nodeapi.internal.config.ExternalBrokerConnectionConfiguration
data class EnterpriseConfiguration(
val mutualExclusionConfiguration: MutualExclusionConfiguration,
val externalBrokerConnectionConfiguration: ExternalBrokerConnectionConfiguration = ExternalBrokerConnectionConfiguration.DEFAULT,
val useMultiThreadedSMM: Boolean = true,
val tuning: PerformanceTuning = PerformanceTuning.default,
val externalBridge: Boolean? = null,

View File

@ -11,7 +11,6 @@ import net.corda.node.VersionInfo
import net.corda.node.services.statemachine.FlowMessagingImpl
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
@ -120,12 +119,6 @@ class MessagingExecutor(
break@eventLoop
}
}
} catch (exception: ActiveMQObjectClosedException) {
log.error("Messaging client connection closed", exception)
if (job is Job.Send) {
job.sentFuture.setException(exception)
}
System.exit(1)
} catch (exception: Throwable) {
log.error("Exception while handling job $job, disregarding", exception)
if (job is Job.Send) {

View File

@ -161,8 +161,35 @@ class P2PMessagingClient(val config: NodeConfiguration,
minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE
isUseGlobalPools = nodeSerializationEnv != null
confirmationWindowSize = config.enterpriseConfiguration.tuning.p2pConfirmationWindowSize
// Configuration for dealing with external broker failover
if (config.messagingServerExternal) {
reconnectAttempts = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts
retryInterval = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryInterval.toMillis()
retryIntervalMultiplier = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryIntervalMultiplier
maxRetryInterval = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.maxRetryInterval.toMillis()
isFailoverOnInitialConnection = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.failoverOnInitialAttempt
initialConnectAttempts = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.initialConnectAttempts
}
}
val sessionFactory = locator!!.createSessionFactory()
sessionFactory.addFailoverListener { event ->
when (event) {
FailoverEventType.FAILURE_DETECTED -> {
log.warn("Connection to the broker was lost. Starting ${config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts} reconnect attempts.")
}
FailoverEventType.FAILOVER_COMPLETED -> {
log.info("Connection to broker re-established.")
}
FailoverEventType.FAILOVER_FAILED -> {
log.error("Could not reconnect to the broker after ${config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts} attempts. Node is shutting down.")
Thread.sleep(config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryInterval.toMillis())
Runtime.getRuntime().halt(1)
}
else -> {
log.warn("Cannot handle event $event.")
}
}
}
// Login using the node username. The broker will authenticate us as its node (as opposed to another peer)
// using our TLS certificate.
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer

View File

@ -1,61 +0,0 @@
package net.corda.node.services.messaging
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.Timer
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.utilities.OpaqueBytes
import net.corda.node.VersionInfo
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.testing.node.internal.InMemoryMessage
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.junit.After
import org.junit.Rule
import org.junit.Test
import org.junit.contrib.java.lang.system.ExpectedSystemExit
import kotlin.concurrent.thread
class MessagingExecutorTest {
@Rule
@JvmField
val exit: ExpectedSystemExit = ExpectedSystemExit.none()
private lateinit var messagingExecutor: MessagingExecutor
@After
fun after() {
messagingExecutor.close()
}
@Test
fun `System exit node if messaging is closed`() {
exit.expectSystemExitWithStatus(1)
val session = mock<ClientSession>()
whenever(session.createMessage(any())).thenReturn(mock())
val producer = mock<ClientProducer>()
whenever(producer.send(any(), any(), any())).thenThrow(ActiveMQObjectClosedException())
val resolver = mock<AddressToArtemisQueueResolver>()
whenever(resolver.resolveTargetToArtemisQueue(any())).thenReturn("address")
val metricRegistry = mock<MetricRegistry>()
val sendLatencyMetric = mock<Timer>()
whenever(metricRegistry.timer(any())).thenReturn(sendLatencyMetric)
whenever(sendLatencyMetric.time()).thenReturn(mock())
whenever(metricRegistry.histogram(any())).thenReturn(mock())
messagingExecutor = MessagingExecutor(session, producer, VersionInfo.UNKNOWN, resolver, metricRegistry, "ourSenderUUID", 10, "legalName")
messagingExecutor.start()
thread {
messagingExecutor.send(InMemoryMessage("topic", OpaqueBytes(ByteArray(10)), DeduplicationId("1")), mock())
}.join()
}
}