From 3a17d4726f0804ab3c83b658a5927f2f3c46d387 Mon Sep 17 00:00:00 2001 From: bpaunescu Date: Fri, 20 Apr 2018 10:52:00 +0100 Subject: [PATCH 1/4] ENT-1775: reworked client to handle failover in HA mode instead of relying on artemis (#759) * ENT-1775: reworked client to handle failover in HA mode instead of relying on artemis * ENT-1775: address PR comments --- .../net/corda/client/rpc/RPCStabilityTests.kt | 90 +++++++++++ .../net/corda/client/rpc/CordaRPCClient.kt | 60 ++++++-- .../corda/client/rpc/internal/RPCClient.kt | 20 ++- .../rpc/internal/RPCClientProxyHandler.kt | 140 ++++++++++++++---- .../corda/testing/node/internal/RPCDriver.kt | 33 +++++ 5 files changed, 304 insertions(+), 39 deletions(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index e7452d49ab..d9602f0c43 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -326,6 +326,96 @@ class RPCStabilityTests { } } + interface ServerOps : RPCOps { + fun serverId(): String + } + + @Test + fun `client connects to first available server`() { + rpcDriver { + val ops = object : ServerOps { + override val protocolVersion = 0 + override fun serverId() = "server" + } + val serverFollower = shutdownManager.follower() + val serverAddress = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! + serverFollower.unfollow() + + val clientFollower = shutdownManager.follower() + val client = startRpcClient(listOf(NetworkHostAndPort("localhost", 12345), serverAddress, NetworkHostAndPort("localhost", 54321))).getOrThrow() + clientFollower.unfollow() + + assertEquals("server", client.serverId()) + + clientFollower.shutdown() // Driver would do this after the new server, causing hang. 
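+ // (Ports 12345 and 54321 above are deliberately unreachable; the client's round-robin
+ // connection attempt skips them and settles on the one live serverAddress.)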
+ } + } + + @Test + fun `3 server failover`() { + rpcDriver { + val ops1 = object : ServerOps { + override val protocolVersion = 0 + override fun serverId() = "server1" + } + val ops2 = object : ServerOps { + override val protocolVersion = 0 + override fun serverId() = "server2" + } + val ops3 = object : ServerOps { + override val protocolVersion = 0 + override fun serverId() = "server3" + } + val serverFollower1 = shutdownManager.follower() + val server1 = startRpcServer(ops = ops1).getOrThrow() + serverFollower1.unfollow() + + val serverFollower2 = shutdownManager.follower() + val server2 = startRpcServer(ops = ops2).getOrThrow() + serverFollower2.unfollow() + + val serverFollower3 = shutdownManager.follower() + val server3 = startRpcServer(ops = ops3).getOrThrow() + serverFollower3.unfollow() + val servers = mutableMapOf("server1" to serverFollower1, "server2" to serverFollower2, "server3" to serverFollower3) + + val clientFollower = shutdownManager.follower() + val client = startRpcClient(listOf(server1.broker.hostAndPort!!, server2.broker.hostAndPort!!, server3.broker.hostAndPort!!)).getOrThrow() + clientFollower.unfollow() + + var response = client.serverId() + assertTrue(servers.containsKey(response)) + servers[response]!!.shutdown() + servers.remove(response) + + //failover will take some time + while (true) { + try { + response = client.serverId() + break + } catch (e: RPCException) {} + } + assertTrue(servers.containsKey(response)) + servers[response]!!.shutdown() + servers.remove(response) + + while (true) { + try { + response = client.serverId() + break + } catch (e: RPCException) {} + } + assertTrue(servers.containsKey(response)) + servers[response]!!.shutdown() + servers.remove(response) + + assertTrue(servers.isEmpty()) + + clientFollower.shutdown() // Driver would do this after the new server, causing hang. + + } + } + interface TrackSubscriberOps : RPCOps { fun subscribe(): Observable } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index ed4ad14a23..a378808068 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -93,19 +93,34 @@ interface CordaRPCClientConfiguration { * [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw * [RPCException] and previously returned observables will call onError(). * + * If the client was created using a list of hosts, automatic failover will occur(the servers have to be started in HA mode) + * * @param hostAndPort The network address to connect to. * @param configuration An optional configuration used to tweak client behaviour. * @param sslConfiguration An optional [SSLConfiguration] used to enable secure communication with the server. + * @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode. + * The client will attempt to connect to a live server by trying each address in the list. If the servers are not in + * HA mode, the client will round-robin from the beginning of the list and try all servers. */ class CordaRPCClient private constructor( - hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), - sslConfiguration: SSLConfiguration? = null, - classLoader: ClassLoader? 
= null + private val hostAndPort: NetworkHostAndPort, + private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + private val sslConfiguration: SSLConfiguration? = null, + private val classLoader: ClassLoader? = null, + private val haAddressPool: List = emptyList() ) { @JvmOverloads constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(hostAndPort, configuration, null) + /** + * @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode. + * The client will attempt to connect to a live server by trying each address in the list. If the servers are not in + * HA mode, the client will round-robin from the beginning of the list and try all servers. + * @param configuration An optional configuration used to tweak client behaviour. + */ + @JvmOverloads + constructor(haAddressPool: List, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(haAddressPool.first(), configuration, null, null, haAddressPool) + companion object { internal fun createWithSsl( hostAndPort: NetworkHostAndPort, @@ -115,6 +130,14 @@ class CordaRPCClient private constructor( return CordaRPCClient(hostAndPort, configuration, sslConfiguration) } + internal fun createWithSsl( + haAddressPool: List, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + sslConfiguration: SSLConfiguration? = null + ): CordaRPCClient { + return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, null, haAddressPool) + } + internal fun createWithSslAndClassLoader( hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), @@ -123,6 +146,15 @@ class CordaRPCClient private constructor( ): CordaRPCClient { return CordaRPCClient(hostAndPort, configuration, sslConfiguration, classLoader) } + + internal fun createWithSslAndClassLoader( + haAddressPool: List, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + sslConfiguration: SSLConfiguration? = null, + classLoader: ClassLoader? = null + ): CordaRPCClient { + return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, classLoader, haAddressPool) + } } init { @@ -137,11 +169,19 @@ class CordaRPCClient private constructor( } } - private val rpcClient = RPCClient( - tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration), - configuration, - if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT - ) + private fun getRpcClient() : RPCClient { + return if (haAddressPool.isEmpty()) { + RPCClient( + tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration), + configuration, + if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT) + } else { + RPCClient(haAddressPool, + sslConfiguration, + configuration, + if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT) + } + } /** * Logs in to the target server and returns an active connection. The returned connection is a [java.io.Closeable] @@ -169,7 +209,7 @@ class CordaRPCClient private constructor( * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. 
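 *
 * A minimal connection sketch using an HA address pool (hypothetical addresses and credentials;
 * assumes the target brokers are started in HA mode):
 *
 *     val client = CordaRPCClient(listOf(NetworkHostAndPort("nodeA", 10006), NetworkHostAndPort("nodeB", 10006)))
 *     val connection = client.start("rpcUser", "rpcPass", null, null)
 *     try {
 *         connection.proxy.nodeInfo()
 *     } finally {
 *         connection.close()
 *     }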
*/ fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?): CordaRPCConnection { - return CordaRPCConnection(rpcClient.start(CordaRPCOps::class.java, username, password, externalTrace, impersonatedActor)) + return CordaRPCConnection(getRpcClient().start(CordaRPCOps::class.java, username, password, externalTrace, impersonatedActor)) } /** diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index e8f33d284f..24096c0952 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -14,6 +14,7 @@ import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.utilities.* import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport +import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransportsFromList import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.RPCApi import net.corda.nodeapi.internal.config.SSLConfiguration @@ -60,7 +61,8 @@ data class CordaRPCClientConfigurationImpl( class RPCClient( val transport: TransportConfiguration, val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, - val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT + val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT, + val haPoolTransportConfigurations: List = emptyList() ) { constructor( hostAndPort: NetworkHostAndPort, @@ -69,6 +71,14 @@ class RPCClient( serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) : this(tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfiguration), configuration, serializationContext) + constructor( + haAddressPool: List, + sslConfiguration: SSLConfiguration? 
= null, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, + serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT + ) : this(tcpTransport(ConnectionDirection.Outbound(), haAddressPool.first(), sslConfiguration), + configuration, serializationContext, tcpTransportsFromList(ConnectionDirection.Outbound(), haAddressPool, sslConfiguration)) + companion object { private val log = contextLogger() } @@ -83,11 +93,15 @@ class RPCClient( return log.logElapsedTime("Startup") { val clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.${random63BitValue()}") - val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(transport).apply { + val serverLocator = (if (haPoolTransportConfigurations.isEmpty()) { + ActiveMQClient.createServerLocatorWithoutHA(transport) + } else { + ActiveMQClient.createServerLocatorWithoutHA(*haPoolTransportConfigurations.toTypedArray()) + }).apply { retryInterval = rpcConfiguration.connectionRetryInterval.toMillis() retryIntervalMultiplier = rpcConfiguration.connectionRetryIntervalMultiplier maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval.toMillis() - reconnectAttempts = rpcConfiguration.maxReconnectAttempts + reconnectAttempts = if (haPoolTransportConfigurations.isEmpty()) rpcConfiguration.maxReconnectAttempts else 0 minLargeMessageSize = rpcConfiguration.maxFileSize isUseGlobalPools = nodeSerializationEnv != null } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index dc07b1ada2..8852a096cd 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -20,6 +20,7 @@ import net.corda.core.context.Trace.InvocationId import net.corda.core.internal.LazyStickyPool import net.corda.core.internal.LifeCycle import net.corda.core.internal.ThreadBox +import net.corda.core.internal.times import net.corda.core.messaging.RPCOps import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.serialize @@ -29,6 +30,7 @@ import net.corda.core.utilities.debug import net.corda.core.utilities.getOrThrow import net.corda.nodeapi.RPCApi import net.corda.nodeapi.internal.DeduplicationChecker +import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.* @@ -173,6 +175,8 @@ class RPCClientProxyHandler( private val deduplicationSequenceNumber = AtomicLong(0) private val sendingEnabled = AtomicBoolean(true) + // used to interrupt failover thread (i.e. client is closed while failing over) + private var haFailoverThread: Thread? = null /** * Start the client. This creates the per-client queue, starts the consumer session and the reaper. @@ -192,17 +196,22 @@ class RPCClientProxyHandler( rpcConfiguration.reapInterval.toMillis(), TimeUnit.MILLISECONDS ) + // Create a session factory using the first available server. If more than one transport configuration was + // used when creating the server locator, every one will be tried during failover. The locator will round-robin + // through the available transport configurations with the starting position being generated randomly. 
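+ // (Round-robin with a random starting point is the default Artemis load-balancing behaviour
+ // for a locator built from a static connector list; nothing custom is configured here.)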
+ // If there's only one available, that one will be retried continuously as configured in rpcConfiguration. + // There is no failover on first attempt, meaning that if a connection cannot be established, the serverLocator + // will try another transport if it exists or throw an exception otherwise. sessionFactory = serverLocator.createSessionFactory() - producerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) - rpcProducer = producerSession!!.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME) - consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) - consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress) - rpcConsumer = consumerSession!!.createConsumer(clientAddress) - rpcConsumer!!.setMessageHandler(this::artemisMessageHandler) - producerSession!!.addFailoverListener(this::failoverHandler) + // Depending on how the client is constructed, connection failure is treated differently + if (serverLocator.staticTransportConfigurations.size == 1) { + sessionFactory!!.addFailoverListener(this::failoverHandler) + } else { + sessionFactory!!.addFailoverListener(this::haFailoverHandler) + } + initSessions() lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET) - consumerSession!!.start() - producerSession!!.start() + startSessions() } // This is the general function that transforms a client side RPC to internal Artemis messages. @@ -341,6 +350,10 @@ class RPCClientProxyHandler( * @param notify whether to notify observables or not. */ private fun close(notify: Boolean = true) { + haFailoverThread?.apply { + interrupt() + join(1000) + } sessionFactory?.close() reaperScheduledFuture?.cancel(false) observableContext.observableMap.invalidateAll() @@ -403,26 +416,82 @@ class RPCClientProxyHandler( } } + private fun attemptReconnect() { + var reconnectAttempts = rpcConfiguration.maxReconnectAttempts * serverLocator.staticTransportConfigurations.size + var retryInterval = rpcConfiguration.connectionRetryInterval + val maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval + + var transportIterator = serverLocator.staticTransportConfigurations.iterator() + while (transportIterator.hasNext() && reconnectAttempts != 0) { + val transport = transportIterator.next() + if (!transportIterator.hasNext()) + transportIterator = serverLocator.staticTransportConfigurations.iterator() + + log.debug("Trying to connect using ${transport.params}") + try { + if (serverLocator != null && !serverLocator.isClosed) { + sessionFactory = serverLocator.createSessionFactory(transport) + } else { + log.warn("Stopping reconnect attempts.") + log.debug("Server locator is closed or garbage collected. 
Proxy may have been closed during reconnect.") + break + } + } catch (e: ActiveMQException) { + try { + Thread.sleep(retryInterval.toMillis()) + } catch (e: InterruptedException) {} + // could not connect, try with next server transport + reconnectAttempts-- + retryInterval = minOf(maxRetryInterval, retryInterval.times(rpcConfiguration.connectionRetryIntervalMultiplier.toLong())) + continue + } + + log.debug("Connected successfully using ${transport.params}") + log.info("RPC server available.") + sessionFactory!!.addFailoverListener(this::haFailoverHandler) + initSessions() + startSessions() + sendingEnabled.set(true) + break + } + + if (reconnectAttempts == 0 || sessionFactory == null) + log.error("Could not reconnect to the RPC server.") + } + + private fun initSessions() { + producerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) + rpcProducer = producerSession!!.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME) + consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) + consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress) + rpcConsumer = consumerSession!!.createConsumer(clientAddress) + rpcConsumer!!.setMessageHandler(this::artemisMessageHandler) + } + + private fun startSessions() { + consumerSession!!.start() + producerSession!!.start() + } + + private fun haFailoverHandler(event: FailoverEventType) { + if (event == FailoverEventType.FAILURE_DETECTED) { + log.warn("Connection failure. Attempting to reconnect using back-up addresses.") + cleanUpOnConnectionLoss() + sessionFactory?.apply { + connection.destroy() + cleanup() + close() + } + haFailoverThread = Thread.currentThread() + attemptReconnect() + } + /* Other events are not considered as reconnection is not done by Artemis */ + } + private fun failoverHandler(event: FailoverEventType) { when (event) { FailoverEventType.FAILURE_DETECTED -> { - sendingEnabled.set(false) - - log.warn("Terminating observables.") - val m = observableContext.observableMap.asMap() - m.keys.forEach { k -> - observationExecutorPool.run(k) { - m[k]?.onError(RPCException("Connection failure detected.")) - } - } - observableContext.observableMap.invalidateAll() - - rpcReplyMap.forEach { _, replyFuture -> - replyFuture.setException(RPCException("Connection failure detected.")) - } - - rpcReplyMap.clear() - callSiteMap?.clear() + cleanUpOnConnectionLoss() } FailoverEventType.FAILOVER_COMPLETED -> { @@ -435,6 +504,25 @@ class RPCClientProxyHandler( } } } + + private fun cleanUpOnConnectionLoss() { + sendingEnabled.set(false) + log.warn("Terminating observables.") + val m = observableContext.observableMap.asMap() + m.keys.forEach { k -> + observationExecutorPool.run(k) { + m[k]?.onError(RPCException("Connection failure detected.")) + } + } + observableContext.observableMap.invalidateAll() + + rpcReplyMap.forEach { _, replyFuture -> + replyFuture.setException(RPCException("Connection failure detected.")) + } + + rpcReplyMap.clear() + callSiteMap?.clear() + } } private typealias RpcObservableMap = Cache>> diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 042a38ce67..5e04f66414 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -75,6 
+75,13 @@ inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
         configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
 ) = startRpcClient(I::class.java, rpcAddress, username, password, configuration)
 
+inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
+        haAddressPool: List<NetworkHostAndPort>,
+        username: String = rpcTestUser.username,
+        password: String = rpcTestUser.password,
+        configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
+) = startRpcClient(I::class.java, haAddressPool, username, password, configuration)
+
 data class RpcBrokerHandle(
         val hostAndPort: NetworkHostAndPort?,
         /** null if this is an InVM broker */
@@ -336,6 +343,32 @@ data class RPCDriverDSL(
         }
     }
 
+    /**
+     * Starts a Netty RPC client.
+     *
+     * @param rpcOpsClass The [Class] of the RPC interface.
+     * @param haAddressPool The addresses of the RPC servers (configured in HA mode) to connect to.
+     * @param username The username to authenticate with.
+     * @param password The password to authenticate with.
+     * @param configuration The RPC client configuration.
+     */
+    fun <I : RPCOps> startRpcClient(
+            rpcOpsClass: Class<I>,
+            haAddressPool: List<NetworkHostAndPort>,
+            username: String = rpcTestUser.username,
+            password: String = rpcTestUser.password,
+            configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
+    ): CordaFuture<I> {
+        return driverDSL.executorService.fork {
+            val client = RPCClient<I>(haAddressPool, null, configuration)
+            val connection = client.start(rpcOpsClass, username, password, externalTrace)
+            driverDSL.shutdownManager.registerShutdown {
+                connection.close()
+            }
+            connection.proxy
+        }
+    }
+
     /**
      * Starts a Netty RPC client in a new JVM process that calls random RPCs with random arguments.
      *

From be083d6763da463dbc5be6b7e81b9d6df752f5e7 Mon Sep 17 00:00:00 2001
From: bpaunescu
Date: Sun, 22 Apr 2018 15:04:19 +0100
Subject: [PATCH 2/4] Added helper method for creating tcp transports from a
 list of host:port

---
 .../client/rpc/internal/RPCClientProxyHandler.kt |  2 +-
 .../net/corda/nodeapi/ArtemisTcpTransport.kt     | 14 ++++++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
index 8852a096cd..458bea35ef 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
@@ -417,7 +417,7 @@ class RPCClientProxyHandler(
     }
 
     private fun attemptReconnect() {
-        var reconnectAttempts = rpcConfiguration.maxReconnectAttempts * serverLocator.staticTransportConfigurations.size
+        var reconnectAttempts = rpcConfiguration.maxReconnectAttempts.times(serverLocator.staticTransportConfigurations.size)
         var retryInterval = rpcConfiguration.connectionRetryInterval
         val maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval
 
diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt b/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt
index a9438611a4..40333af921 100644
--- a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt
+++ b/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt
@@ -97,5 +97,19 @@ class ArtemisTcpTransport {
             }
             return TransportConfiguration(factoryName, options)
         }
+
+        /** Create a list of [TransportConfiguration] from a list of host:port pairs.
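+         * One transport is produced per address, in list order. For example (hypothetical host):
+         *
+         *     tcpTransportsFromList(ConnectionDirection.Outbound(),
+         *             listOf(NetworkHostAndPort("nodeA.example.com", 10006)), config = null)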
+         */
+        fun tcpTransportsFromList(
+                direction: ConnectionDirection,
+                hostAndPortList: List<NetworkHostAndPort>,
+                config: SSLConfiguration?,
+                enableSSL: Boolean = true): List<TransportConfiguration> {
+            val tcpTransports = ArrayList<TransportConfiguration>(hostAndPortList.size)
+            hostAndPortList.forEach {
+                tcpTransports.add(tcpTransport(direction, it, config, enableSSL))
+            }
+
+            return tcpTransports
+        }
     }
 }

From e51878417b81dd882cc846d3d8f9aa977752e51b Mon Sep 17 00:00:00 2001
From: bpaunescu
Date: Mon, 23 Apr 2018 11:20:08 +0100
Subject: [PATCH 3/4] Address PR comments

---
 .../net/corda/client/rpc/RPCStabilityTests.kt |  2 +-
 .../net/corda/client/rpc/CordaRPCClient.kt    | 20 ++-----------------
 .../rpc/internal/RPCClientProxyHandler.kt     | 14 +++++++++----
 3 files changed, 13 insertions(+), 23 deletions(-)

diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt
index d9602f0c43..ada3e1e37c 100644
--- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt
+++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt
@@ -388,7 +388,7 @@ class RPCStabilityTests {
             servers[response]!!.shutdown()
             servers.remove(response)
 
-            //failover will take some time
+            // Failover will take some time.
             while (true) {
                 try {
                     response = client.serverId()
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt
index a378808068..cf82f270c9 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt
@@ -93,7 +93,8 @@ interface CordaRPCClientConfiguration {
  * [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw
  * [RPCException] and previously returned observables will call onError().
  *
- * If the client was created using a list of hosts, automatic failover will occur(the servers have to be started in HA mode)
+ * If the client was created using a list of hosts, automatic failover will occur (the servers have to be started in
+ * HA mode).
 *
 * @param hostAndPort The network address to connect to.
 * @param configuration An optional configuration used to tweak client behaviour.
@@ -130,14 +131,6 @@ class CordaRPCClient private constructor(
             return CordaRPCClient(hostAndPort, configuration, sslConfiguration)
         }
 
-        internal fun createWithSsl(
-                haAddressPool: List<NetworkHostAndPort>,
-                configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
-                sslConfiguration: SSLConfiguration? = null
-        ): CordaRPCClient {
-            return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, null, haAddressPool)
-        }
-
         internal fun createWithSslAndClassLoader(
                 hostAndPort: NetworkHostAndPort,
                 configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
@@ -146,15 +139,6 @@ class CordaRPCClient private constructor(
         ): CordaRPCClient {
             return CordaRPCClient(hostAndPort, configuration, sslConfiguration, classLoader)
         }
-
-        internal fun createWithSslAndClassLoader(
-                haAddressPool: List<NetworkHostAndPort>,
-                configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
-                sslConfiguration: SSLConfiguration? = null,
-                classLoader: ClassLoader? = null
-        ): CordaRPCClient {
-            return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, classLoader, haAddressPool)
-        }
     }
 
     init {
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
index 458bea35ef..2ddb44cbad 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
@@ -70,6 +70,12 @@ import kotlin.reflect.jvm.javaMethod
 * unsubscribing from the [Observable], or if the [Observable] is garbage collected the client will eventually
 * automatically signal the server. This is done using a cache that holds weak references to the [UnicastSubject]s.
 * The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor].
+ *
+ * The client will attempt to fail over if the server becomes unreachable. Depending on the [ServerLocator] instance
+ * passed in the constructor, failover is handled either at the Artemis level or at the client level. If only one transport
+ * was used to create the [ServerLocator], failover is handled by Artemis (retrying based on [CordaRPCClientConfiguration]).
+ * If a list of transport configurations was used, failover is handled locally. Artemis is also able to do this, but the
+ * brokers on the server side would need to be configured in HA mode and the [ServerLocator] created with HA as well.
 */
 class RPCClientProxyHandler(
         private val rpcConfiguration: CordaRPCClientConfiguration,
@@ -175,7 +181,7 @@ class RPCClientProxyHandler(
     private val deduplicationSequenceNumber = AtomicLong(0)
     private val sendingEnabled = AtomicBoolean(true)
 
-    // used to interrupt failover thread (i.e. client is closed while failing over)
+    // Used to interrupt failover thread (i.e. client is closed while failing over).
     private var haFailoverThread: Thread? = null
 
@@ -440,13 +446,13 @@ class RPCClientProxyHandler(
                 try {
                     Thread.sleep(retryInterval.toMillis())
                 } catch (e: InterruptedException) {}
-                // could not connect, try with next server transport
+                // Could not connect, try with next server transport.
                 reconnectAttempts--
                 retryInterval = minOf(maxRetryInterval, retryInterval.times(rpcConfiguration.connectionRetryIntervalMultiplier.toLong()))
                 continue
             }
 
-            log.debug("Connected successfully using ${transport.params}")
+            log.debug("Connected successfully after $reconnectAttempts attempts using ${transport.params}.")
             log.info("RPC server available.")
             sessionFactory!!.addFailoverListener(this::haFailoverHandler)
             initSessions()
@@ -485,7 +491,7 @@ class RPCClientProxyHandler(
             haFailoverThread = Thread.currentThread()
             attemptReconnect()
         }
-        /* Other events are not considered as reconnection is not done by Artemis */
+        // Other events are not considered as reconnection is not done by Artemis.
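+        // (Of the remaining FailoverEventTypes, FAILOVER_COMPLETED and FAILOVER_FAILED are driven by
+        // Artemis-side reconnection, which is disabled here via reconnectAttempts = 0.)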
} private fun failoverHandler(event: FailoverEventType) { From b0d2a258c0fa9db726c8ecb9e85b93af019ae900 Mon Sep 17 00:00:00 2001 From: Chris Burlinchon Date: Mon, 16 Apr 2018 18:05:01 +0100 Subject: [PATCH 4/4] cherry-pick 7759fdbb71ea9b2021afd8af0ac05447c5305b3a --- .../net/corda/node/internal/AbstractNode.kt | 4 +-- .../security/RPCSecurityManagerImpl.kt | 3 +-- .../services/events/NodeSchedulerService.kt | 25 +------------------ .../services/messaging/P2PMessagingClient.kt | 5 ---- .../SingleThreadedStateMachineManager.kt | 14 ----------- .../statemachine/StateMachineManager.kt | 5 ---- ...FiberDeserializationCheckingInterceptor.kt | 1 - 7 files changed, 4 insertions(+), 53 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 2bae06e785..b20b0022f7 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -242,7 +242,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val notaryService = makeNotaryService(nodeServices, database) val smm = makeStateMachineManager(database) val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader) - val flowStarter = FlowStarterImpl(serverThread, smm, flowLogicRefFactory) + val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory) val schedulerService = NodeSchedulerService( platformClock, database, @@ -893,7 +893,7 @@ internal fun logVendorString(database: CordaPersistence, log: Logger) { } } -internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter { +internal class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter { override fun startFlow(logic: FlowLogic, context: InvocationContext, deduplicationHandler: DeduplicationHandler?): CordaFuture> { return smm.startFlow(logic, context, ourIdentity = null, deduplicationHandler = deduplicationHandler) } diff --git a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt index cf961636f9..4b9af66259 100644 --- a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt @@ -77,8 +77,7 @@ class RPCSecurityManagerImpl(config: AuthServiceConfig) : RPCSecurityManager { * Instantiate RPCSecurityManager initialised with users data from a list of [User] */ fun fromUserList(id: AuthServiceId, users: List) = - RPCSecurityManagerImpl( - AuthServiceConfig.fromUsers(users).copy(id = id)) + RPCSecurityManagerImpl(AuthServiceConfig.fromUsers(users).copy(id = id)) // Build internal Shiro securityManager instance private fun buildImpl(config: AuthServiceConfig): DefaultSecurityManager { diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index cea738df76..bfada7354d 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -178,29 +178,6 @@ class NodeSchedulerService(private val clock: CordaClock, } } - /** - * Stop scheduler service. 
- */ - fun stop() { - mutex.locked { - schedulerTimerExecutor.shutdown() - scheduledStatesQueue.clear() - scheduledStates.clear() - } - } - - /** - * Resume scheduler service after having called [stop]. - */ - fun resume() { - mutex.locked { - schedulerTimerExecutor = Executors.newSingleThreadExecutor() - scheduledStates.putAll(createMap()) - scheduledStatesQueue.addAll(scheduledStates.values) - rescheduleWakeUp() - } - } - override fun scheduleStateActivity(action: ScheduledStateRef) { log.trace { "Schedule $action" } val previousState = scheduledStates[action.ref] @@ -240,7 +217,7 @@ class NodeSchedulerService(private val clock: CordaClock, } } - private var schedulerTimerExecutor = Executors.newSingleThreadExecutor() + private val schedulerTimerExecutor = Executors.newSingleThreadExecutor() /** * This method first cancels the [java.util.concurrent.Future] for any pending action so that the * [awaitWithDeadline] used below drops through without running the action. We then create a new diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 189d5db813..88fa4d8e1f 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -5,7 +5,6 @@ import com.codahale.metrics.MetricRegistry import net.corda.core.crypto.toStringShort import net.corda.core.identity.CordaX500Name import net.corda.core.internal.ThreadBox -import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient @@ -333,8 +332,6 @@ class P2PMessagingClient(val config: NodeConfiguration, private val shutdownLatch = CountDownLatch(1) - var runningFuture = openFuture() - /** * Starts the p2p event loop: this method only returns once [stop] has been called. */ @@ -345,7 +342,6 @@ class P2PMessagingClient(val config: NodeConfiguration, check(started) { "start must be called first" } check(!running) { "run can't be called twice" } running = true - runningFuture.set(Unit) // If it's null, it means we already called stop, so return immediately. if (p2pConsumer == null) { return @@ -457,7 +453,6 @@ class P2PMessagingClient(val config: NodeConfiguration, check(started) val prevRunning = running running = false - runningFuture = openFuture() networkChangeSubscription?.unsubscribe() require(p2pConsumer != null, { "stop can't be called twice" }) require(producer != null, { "stop can't be called twice" }) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 4c579c9ba8..f7460f4bab 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -134,20 +134,6 @@ class SingleThreadedStateMachineManager( } } - override fun resume() { - fiberDeserializationChecker?.start(checkpointSerializationContext!!) 
-        val fibers = restoreFlowsFromCheckpoints()
-        Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
-            (fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
-        }
-        serviceHub.networkMapCache.nodeReady.then {
-            resumeRestoredFlows(fibers)
-        }
-        mutex.locked {
-            stopping = false
-        }
-    }
-
     override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
         return mutex.locked {
             flows.values.mapNotNull {
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt
index 7d645407a1..96edb248b0 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt
@@ -39,11 +39,6 @@ interface StateMachineManager {
      */
     fun stop(allowedUnsuspendedFiberCount: Int)
 
-    /**
-     * Resume state machine manager after having called [stop].
-     */
-    fun resume()
-
     /**
      * Starts a new flow.
      *
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt
index 2a9b96cc5b..cbde382f4d 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt
@@ -89,7 +89,6 @@ class FiberDeserializationChecker {
     fun stop(): Boolean {
         jobQueue.add(Job.Finish)
         checkerThread?.join()
-        checkerThread = null
        return foundUnrestorableFibers
    }
}
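
A condensed usage sketch of the address-pool API added by this series (host names, port, and
credentials are illustrative; assumes the target brokers are started in HA mode):

    import net.corda.client.rpc.CordaRPCClient
    import net.corda.core.utilities.NetworkHostAndPort

    fun main() {
        // The client connects to the first live address; on a later connection failure,
        // haFailoverHandler round-robins through the rest of the pool.
        val pool = listOf(
                NetworkHostAndPort("nodeA.example.com", 10006),
                NetworkHostAndPort("nodeB.example.com", 10006))
        CordaRPCClient(pool).start("rpcUser", "rpcPass").use { connection ->
            // Calls in flight during a failover window fail with RPCException and existing
            // observables receive onError(); callers are expected to retry.
            println(connection.proxy.currentNodeTime())
        }
    }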