From c0f254e3bbc240744265e84ffb86c83912e18b21 Mon Sep 17 00:00:00 2001 From: bpaunescu Date: Fri, 20 Apr 2018 10:52:00 +0100 Subject: [PATCH] ENT-1775: reworked client to handle failover in HA mode instead of relying on artemis (#759) * ENT-1775: reworked client to handle failover in HA mode instead of relying on artemis * ENT-1775: address PR comments --- .../net/corda/client/rpc/RPCStabilityTests.kt | 90 +++++++++++ .../net/corda/client/rpc/CordaRPCClient.kt | 4 +- .../corda/client/rpc/internal/RPCClient.kt | 4 +- .../rpc/internal/RPCClientProxyHandler.kt | 140 ++++++++++++++---- .../corda/testing/node/internal/RPCDriver.kt | 33 +++++ 5 files changed, 241 insertions(+), 30 deletions(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 63e1579e8c..f4fa6be9f4 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -336,6 +336,96 @@ class RPCStabilityTests { } } + interface ServerOps : RPCOps { + fun serverId(): String + } + + @Test + fun `client connects to first available server`() { + rpcDriver { + val ops = object : ServerOps { + override val protocolVersion = 0 + override fun serverId() = "server" + } + val serverFollower = shutdownManager.follower() + val serverAddress = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! + serverFollower.unfollow() + + val clientFollower = shutdownManager.follower() + val client = startRpcClient(listOf(NetworkHostAndPort("localhost", 12345), serverAddress, NetworkHostAndPort("localhost", 54321))).getOrThrow() + clientFollower.unfollow() + + assertEquals("server", client.serverId()) + + clientFollower.shutdown() // Driver would do this after the new server, causing hang. + } + } + + @Test + fun `3 server failover`() { + rpcDriver { + val ops1 = object : ServerOps { + override val protocolVersion = 0 + override fun serverId() = "server1" + } + val ops2 = object : ServerOps { + override val protocolVersion = 0 + override fun serverId() = "server2" + } + val ops3 = object : ServerOps { + override val protocolVersion = 0 + override fun serverId() = "server3" + } + val serverFollower1 = shutdownManager.follower() + val server1 = startRpcServer(ops = ops1).getOrThrow() + serverFollower1.unfollow() + + val serverFollower2 = shutdownManager.follower() + val server2 = startRpcServer(ops = ops2).getOrThrow() + serverFollower2.unfollow() + + val serverFollower3 = shutdownManager.follower() + val server3 = startRpcServer(ops = ops3).getOrThrow() + serverFollower3.unfollow() + val servers = mutableMapOf("server1" to serverFollower1, "server2" to serverFollower2, "server3" to serverFollower3) + + val clientFollower = shutdownManager.follower() + val client = startRpcClient(listOf(server1.broker.hostAndPort!!, server2.broker.hostAndPort!!, server3.broker.hostAndPort!!)).getOrThrow() + clientFollower.unfollow() + + var response = client.serverId() + assertTrue(servers.containsKey(response)) + servers[response]!!.shutdown() + servers.remove(response) + + //failover will take some time + while (true) { + try { + response = client.serverId() + break + } catch (e: RPCException) {} + } + assertTrue(servers.containsKey(response)) + servers[response]!!.shutdown() + servers.remove(response) + + while (true) { + try { + response = client.serverId() + break + } catch (e: RPCException) {} + } + assertTrue(servers.containsKey(response)) + servers[response]!!.shutdown() + servers.remove(response) + + assertTrue(servers.isEmpty()) + + clientFollower.shutdown() // Driver would do this after the new server, causing hang. + + } + } + interface TrackSubscriberOps : RPCOps { fun subscribe(): Observable } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 616f5b6b14..ce5d0c617f 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -110,7 +110,7 @@ interface CordaRPCClientConfiguration { * @param sslConfiguration An optional [SSLConfiguration] used to enable secure communication with the server. * @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode. * The client will attempt to connect to a live server by trying each address in the list. If the servers are not in - * HA mode, no failover will occur and the client will try reconnecting to the initial server it connected to. + * HA mode, the client will round-robin from the beginning of the list and try all servers. */ class CordaRPCClient private constructor( private val hostAndPort: NetworkHostAndPort, @@ -125,7 +125,7 @@ class CordaRPCClient private constructor( /** * @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode. * The client will attempt to connect to a live server by trying each address in the list. If the servers are not in - * HA mode, no failover will occur and the client will try reconnecting to the initial server it connected to. + * HA mode, the client will round-robin from the beginning of the list and try all servers. * @param configuration An optional configuration used to tweak client behaviour. */ @JvmOverloads diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index 77094eb268..ff13da4c9b 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -106,12 +106,12 @@ class RPCClient( val serverLocator = (if (haPoolTransportConfigurations.isEmpty()) { ActiveMQClient.createServerLocatorWithoutHA(transport) } else { - ActiveMQClient.createServerLocatorWithHA(*haPoolTransportConfigurations.toTypedArray()) + ActiveMQClient.createServerLocatorWithoutHA(*haPoolTransportConfigurations.toTypedArray()) }).apply { retryInterval = rpcConfiguration.connectionRetryInterval.toMillis() retryIntervalMultiplier = rpcConfiguration.connectionRetryIntervalMultiplier maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval.toMillis() - reconnectAttempts = rpcConfiguration.maxReconnectAttempts + reconnectAttempts = if (haPoolTransportConfigurations.isEmpty()) rpcConfiguration.maxReconnectAttempts else 0 minLargeMessageSize = rpcConfiguration.maxFileSize isUseGlobalPools = nodeSerializationEnv != null } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 13ed23e5d8..6f174da6f5 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -30,6 +30,7 @@ import net.corda.core.context.Trace.InvocationId import net.corda.core.internal.LazyStickyPool import net.corda.core.internal.LifeCycle import net.corda.core.internal.ThreadBox +import net.corda.core.internal.times import net.corda.core.messaging.RPCOps import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.serialize @@ -39,6 +40,7 @@ import net.corda.core.utilities.debug import net.corda.core.utilities.getOrThrow import net.corda.nodeapi.RPCApi import net.corda.nodeapi.internal.DeduplicationChecker +import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.* @@ -183,6 +185,8 @@ class RPCClientProxyHandler( private val deduplicationSequenceNumber = AtomicLong(0) private val sendingEnabled = AtomicBoolean(true) + // used to interrupt failover thread (i.e. client is closed while failing over) + private var haFailoverThread: Thread? = null /** * Start the client. This creates the per-client queue, starts the consumer session and the reaper. @@ -202,17 +206,22 @@ class RPCClientProxyHandler( rpcConfiguration.reapInterval.toMillis(), TimeUnit.MILLISECONDS ) + // Create a session factory using the first available server. If more than one transport configuration was + // used when creating the server locator, every one will be tried during failover. The locator will round-robin + // through the available transport configurations with the starting position being generated randomly. + // If there's only one available, that one will be retried continuously as configured in rpcConfiguration. + // There is no failover on first attempt, meaning that if a connection cannot be established, the serverLocator + // will try another transport if it exists or throw an exception otherwise. sessionFactory = serverLocator.createSessionFactory() - producerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) - rpcProducer = producerSession!!.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME) - consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) - consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress) - rpcConsumer = consumerSession!!.createConsumer(clientAddress) - rpcConsumer!!.setMessageHandler(this::artemisMessageHandler) - producerSession!!.addFailoverListener(this::failoverHandler) + // Depending on how the client is constructed, connection failure is treated differently + if (serverLocator.staticTransportConfigurations.size == 1) { + sessionFactory!!.addFailoverListener(this::failoverHandler) + } else { + sessionFactory!!.addFailoverListener(this::haFailoverHandler) + } + initSessions() lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET) - consumerSession!!.start() - producerSession!!.start() + startSessions() } // This is the general function that transforms a client side RPC to internal Artemis messages. @@ -351,6 +360,10 @@ class RPCClientProxyHandler( * @param notify whether to notify observables or not. */ private fun close(notify: Boolean = true) { + haFailoverThread?.apply { + interrupt() + join(1000) + } sessionFactory?.close() reaperScheduledFuture?.cancel(false) observableContext.observableMap.invalidateAll() @@ -413,26 +426,82 @@ class RPCClientProxyHandler( } } + private fun attemptReconnect() { + var reconnectAttempts = rpcConfiguration.maxReconnectAttempts * serverLocator.staticTransportConfigurations.size + var retryInterval = rpcConfiguration.connectionRetryInterval + val maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval + + var transportIterator = serverLocator.staticTransportConfigurations.iterator() + while (transportIterator.hasNext() && reconnectAttempts != 0) { + val transport = transportIterator.next() + if (!transportIterator.hasNext()) + transportIterator = serverLocator.staticTransportConfigurations.iterator() + + log.debug("Trying to connect using ${transport.params}") + try { + if (serverLocator != null && !serverLocator.isClosed) { + sessionFactory = serverLocator.createSessionFactory(transport) + } else { + log.warn("Stopping reconnect attempts.") + log.debug("Server locator is closed or garbage collected. Proxy may have been closed during reconnect.") + break + } + } catch (e: ActiveMQException) { + try { + Thread.sleep(retryInterval.toMillis()) + } catch (e: InterruptedException) {} + // could not connect, try with next server transport + reconnectAttempts-- + retryInterval = minOf(maxRetryInterval, retryInterval.times(rpcConfiguration.connectionRetryIntervalMultiplier.toLong())) + continue + } + + log.debug("Connected successfully using ${transport.params}") + log.info("RPC server available.") + sessionFactory!!.addFailoverListener(this::haFailoverHandler) + initSessions() + startSessions() + sendingEnabled.set(true) + break + } + + if (reconnectAttempts == 0 || sessionFactory == null) + log.error("Could not reconnect to the RPC server.") + } + + private fun initSessions() { + producerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) + rpcProducer = producerSession!!.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME) + consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) + consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress) + rpcConsumer = consumerSession!!.createConsumer(clientAddress) + rpcConsumer!!.setMessageHandler(this::artemisMessageHandler) + } + + private fun startSessions() { + consumerSession!!.start() + producerSession!!.start() + } + + private fun haFailoverHandler(event: FailoverEventType) { + if (event == FailoverEventType.FAILURE_DETECTED) { + log.warn("Connection failure. Attempting to reconnect using back-up addresses.") + cleanUpOnConnectionLoss() + sessionFactory?.apply { + connection.destroy() + cleanup() + close() + } + haFailoverThread = Thread.currentThread() + attemptReconnect() + } + /* Other events are not considered as reconnection is not done by Artemis */ + } + private fun failoverHandler(event: FailoverEventType) { when (event) { FailoverEventType.FAILURE_DETECTED -> { - sendingEnabled.set(false) - - log.warn("Terminating observables.") - val m = observableContext.observableMap.asMap() - m.keys.forEach { k -> - observationExecutorPool.run(k) { - m[k]?.onError(RPCException("Connection failure detected.")) - } - } - observableContext.observableMap.invalidateAll() - - rpcReplyMap.forEach { _, replyFuture -> - replyFuture.setException(RPCException("Connection failure detected.")) - } - - rpcReplyMap.clear() - callSiteMap?.clear() + cleanUpOnConnectionLoss() } FailoverEventType.FAILOVER_COMPLETED -> { @@ -445,6 +514,25 @@ class RPCClientProxyHandler( } } } + + private fun cleanUpOnConnectionLoss() { + sendingEnabled.set(false) + log.warn("Terminating observables.") + val m = observableContext.observableMap.asMap() + m.keys.forEach { k -> + observationExecutorPool.run(k) { + m[k]?.onError(RPCException("Connection failure detected.")) + } + } + observableContext.observableMap.invalidateAll() + + rpcReplyMap.forEach { _, replyFuture -> + replyFuture.setException(RPCException("Connection failure detected.")) + } + + rpcReplyMap.clear() + callSiteMap?.clear() + } } private typealias RpcObservableMap = Cache>> diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index d3bf3affe4..cccfd447b1 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -85,6 +85,13 @@ inline fun RPCDriverDSL.startRpcClient( configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default ) = startRpcClient(I::class.java, rpcAddress, username, password, configuration) +inline fun RPCDriverDSL.startRpcClient( + haAddressPool: List, + username: String = rpcTestUser.username, + password: String = rpcTestUser.password, + configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default +) = startRpcClient(I::class.java, haAddressPool, username, password, configuration) + data class RpcBrokerHandle( val hostAndPort: NetworkHostAndPort?, /** null if this is an InVM broker */ @@ -346,6 +353,32 @@ data class RPCDriverDSL( } } + /** + * Starts a Netty RPC client. + * + * @param rpcOpsClass The [Class] of the RPC interface. + * @param haAddressPool The addresses of the RPC servers(configured in HA mode) to connect to. + * @param username The username to authenticate with. + * @param password The password to authenticate with. + * @param configuration The RPC client configuration. + */ + fun startRpcClient( + rpcOpsClass: Class, + haAddressPool: List, + username: String = rpcTestUser.username, + password: String = rpcTestUser.password, + configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + ): CordaFuture { + return driverDSL.executorService.fork { + val client = RPCClient(haAddressPool, null, configuration) + val connection = client.start(rpcOpsClass, username, password, externalTrace) + driverDSL.shutdownManager.registerShutdown { + connection.close() + } + connection.proxy + } + } + /** * Starts a Netty RPC client in a new JVM process that calls random RPCs with random arguments. *