ENT-1775: reworked client to handle failover in HA mode instead of relying on artemis (#759)

* ENT-1775: reworked client to handle failover in HA mode instead of relying on artemis

* ENT-1775: address PR comments
This commit is contained in:
bpaunescu 2018-04-20 10:52:00 +01:00 committed by GitHub
parent 3931f13852
commit c0f254e3bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 241 additions and 30 deletions

View File

@ -336,6 +336,96 @@ class RPCStabilityTests {
}
}
interface ServerOps : RPCOps {
fun serverId(): String
}
@Test
fun `client connects to first available server`() {
rpcDriver {
val ops = object : ServerOps {
override val protocolVersion = 0
override fun serverId() = "server"
}
val serverFollower = shutdownManager.follower()
val serverAddress = startRpcServer<RPCOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
val clientFollower = shutdownManager.follower()
val client = startRpcClient<ServerOps>(listOf(NetworkHostAndPort("localhost", 12345), serverAddress, NetworkHostAndPort("localhost", 54321))).getOrThrow()
clientFollower.unfollow()
assertEquals("server", client.serverId())
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
}
@Test
fun `3 server failover`() {
rpcDriver {
val ops1 = object : ServerOps {
override val protocolVersion = 0
override fun serverId() = "server1"
}
val ops2 = object : ServerOps {
override val protocolVersion = 0
override fun serverId() = "server2"
}
val ops3 = object : ServerOps {
override val protocolVersion = 0
override fun serverId() = "server3"
}
val serverFollower1 = shutdownManager.follower()
val server1 = startRpcServer<RPCOps>(ops = ops1).getOrThrow()
serverFollower1.unfollow()
val serverFollower2 = shutdownManager.follower()
val server2 = startRpcServer<RPCOps>(ops = ops2).getOrThrow()
serverFollower2.unfollow()
val serverFollower3 = shutdownManager.follower()
val server3 = startRpcServer<RPCOps>(ops = ops3).getOrThrow()
serverFollower3.unfollow()
val servers = mutableMapOf("server1" to serverFollower1, "server2" to serverFollower2, "server3" to serverFollower3)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<ServerOps>(listOf(server1.broker.hostAndPort!!, server2.broker.hostAndPort!!, server3.broker.hostAndPort!!)).getOrThrow()
clientFollower.unfollow()
var response = client.serverId()
assertTrue(servers.containsKey(response))
servers[response]!!.shutdown()
servers.remove(response)
//failover will take some time
while (true) {
try {
response = client.serverId()
break
} catch (e: RPCException) {}
}
assertTrue(servers.containsKey(response))
servers[response]!!.shutdown()
servers.remove(response)
while (true) {
try {
response = client.serverId()
break
} catch (e: RPCException) {}
}
assertTrue(servers.containsKey(response))
servers[response]!!.shutdown()
servers.remove(response)
assertTrue(servers.isEmpty())
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
}
interface TrackSubscriberOps : RPCOps {
fun subscribe(): Observable<Unit>
}

View File

@ -110,7 +110,7 @@ interface CordaRPCClientConfiguration {
* @param sslConfiguration An optional [SSLConfiguration] used to enable secure communication with the server.
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
* The client will attempt to connect to a live server by trying each address in the list. If the servers are not in
* HA mode, no failover will occur and the client will try reconnecting to the initial server it connected to.
* HA mode, the client will round-robin from the beginning of the list and try all servers.
*/
class CordaRPCClient private constructor(
private val hostAndPort: NetworkHostAndPort,
@ -125,7 +125,7 @@ class CordaRPCClient private constructor(
/**
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
* The client will attempt to connect to a live server by trying each address in the list. If the servers are not in
* HA mode, no failover will occur and the client will try reconnecting to the initial server it connected to.
* HA mode, the client will round-robin from the beginning of the list and try all servers.
* @param configuration An optional configuration used to tweak client behaviour.
*/
@JvmOverloads

View File

@ -106,12 +106,12 @@ class RPCClient<I : RPCOps>(
val serverLocator = (if (haPoolTransportConfigurations.isEmpty()) {
ActiveMQClient.createServerLocatorWithoutHA(transport)
} else {
ActiveMQClient.createServerLocatorWithHA(*haPoolTransportConfigurations.toTypedArray())
ActiveMQClient.createServerLocatorWithoutHA(*haPoolTransportConfigurations.toTypedArray())
}).apply {
retryInterval = rpcConfiguration.connectionRetryInterval.toMillis()
retryIntervalMultiplier = rpcConfiguration.connectionRetryIntervalMultiplier
maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval.toMillis()
reconnectAttempts = rpcConfiguration.maxReconnectAttempts
reconnectAttempts = if (haPoolTransportConfigurations.isEmpty()) rpcConfiguration.maxReconnectAttempts else 0
minLargeMessageSize = rpcConfiguration.maxFileSize
isUseGlobalPools = nodeSerializationEnv != null
}

View File

@ -30,6 +30,7 @@ import net.corda.core.context.Trace.InvocationId
import net.corda.core.internal.LazyStickyPool
import net.corda.core.internal.LifeCycle
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.times
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.serialize
@ -39,6 +40,7 @@ import net.corda.core.utilities.debug
import net.corda.core.utilities.getOrThrow
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.internal.DeduplicationChecker
import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
@ -183,6 +185,8 @@ class RPCClientProxyHandler(
private val deduplicationSequenceNumber = AtomicLong(0)
private val sendingEnabled = AtomicBoolean(true)
// used to interrupt failover thread (i.e. client is closed while failing over)
private var haFailoverThread: Thread? = null
/**
* Start the client. This creates the per-client queue, starts the consumer session and the reaper.
@ -202,17 +206,22 @@ class RPCClientProxyHandler(
rpcConfiguration.reapInterval.toMillis(),
TimeUnit.MILLISECONDS
)
// Create a session factory using the first available server. If more than one transport configuration was
// used when creating the server locator, every one will be tried during failover. The locator will round-robin
// through the available transport configurations with the starting position being generated randomly.
// If there's only one available, that one will be retried continuously as configured in rpcConfiguration.
// There is no failover on first attempt, meaning that if a connection cannot be established, the serverLocator
// will try another transport if it exists or throw an exception otherwise.
sessionFactory = serverLocator.createSessionFactory()
producerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
rpcProducer = producerSession!!.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME)
consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress)
rpcConsumer = consumerSession!!.createConsumer(clientAddress)
rpcConsumer!!.setMessageHandler(this::artemisMessageHandler)
producerSession!!.addFailoverListener(this::failoverHandler)
// Depending on how the client is constructed, connection failure is treated differently
if (serverLocator.staticTransportConfigurations.size == 1) {
sessionFactory!!.addFailoverListener(this::failoverHandler)
} else {
sessionFactory!!.addFailoverListener(this::haFailoverHandler)
}
initSessions()
lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET)
consumerSession!!.start()
producerSession!!.start()
startSessions()
}
// This is the general function that transforms a client side RPC to internal Artemis messages.
@ -351,6 +360,10 @@ class RPCClientProxyHandler(
* @param notify whether to notify observables or not.
*/
private fun close(notify: Boolean = true) {
haFailoverThread?.apply {
interrupt()
join(1000)
}
sessionFactory?.close()
reaperScheduledFuture?.cancel(false)
observableContext.observableMap.invalidateAll()
@ -413,26 +426,82 @@ class RPCClientProxyHandler(
}
}
private fun attemptReconnect() {
var reconnectAttempts = rpcConfiguration.maxReconnectAttempts * serverLocator.staticTransportConfigurations.size
var retryInterval = rpcConfiguration.connectionRetryInterval
val maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval
var transportIterator = serverLocator.staticTransportConfigurations.iterator()
while (transportIterator.hasNext() && reconnectAttempts != 0) {
val transport = transportIterator.next()
if (!transportIterator.hasNext())
transportIterator = serverLocator.staticTransportConfigurations.iterator()
log.debug("Trying to connect using ${transport.params}")
try {
if (serverLocator != null && !serverLocator.isClosed) {
sessionFactory = serverLocator.createSessionFactory(transport)
} else {
log.warn("Stopping reconnect attempts.")
log.debug("Server locator is closed or garbage collected. Proxy may have been closed during reconnect.")
break
}
} catch (e: ActiveMQException) {
try {
Thread.sleep(retryInterval.toMillis())
} catch (e: InterruptedException) {}
// could not connect, try with next server transport
reconnectAttempts--
retryInterval = minOf(maxRetryInterval, retryInterval.times(rpcConfiguration.connectionRetryIntervalMultiplier.toLong()))
continue
}
log.debug("Connected successfully using ${transport.params}")
log.info("RPC server available.")
sessionFactory!!.addFailoverListener(this::haFailoverHandler)
initSessions()
startSessions()
sendingEnabled.set(true)
break
}
if (reconnectAttempts == 0 || sessionFactory == null)
log.error("Could not reconnect to the RPC server.")
}
private fun initSessions() {
producerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
rpcProducer = producerSession!!.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME)
consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress)
rpcConsumer = consumerSession!!.createConsumer(clientAddress)
rpcConsumer!!.setMessageHandler(this::artemisMessageHandler)
}
private fun startSessions() {
consumerSession!!.start()
producerSession!!.start()
}
private fun haFailoverHandler(event: FailoverEventType) {
if (event == FailoverEventType.FAILURE_DETECTED) {
log.warn("Connection failure. Attempting to reconnect using back-up addresses.")
cleanUpOnConnectionLoss()
sessionFactory?.apply {
connection.destroy()
cleanup()
close()
}
haFailoverThread = Thread.currentThread()
attemptReconnect()
}
/* Other events are not considered as reconnection is not done by Artemis */
}
private fun failoverHandler(event: FailoverEventType) {
when (event) {
FailoverEventType.FAILURE_DETECTED -> {
sendingEnabled.set(false)
log.warn("Terminating observables.")
val m = observableContext.observableMap.asMap()
m.keys.forEach { k ->
observationExecutorPool.run(k) {
m[k]?.onError(RPCException("Connection failure detected."))
}
}
observableContext.observableMap.invalidateAll()
rpcReplyMap.forEach { _, replyFuture ->
replyFuture.setException(RPCException("Connection failure detected."))
}
rpcReplyMap.clear()
callSiteMap?.clear()
cleanUpOnConnectionLoss()
}
FailoverEventType.FAILOVER_COMPLETED -> {
@ -445,6 +514,25 @@ class RPCClientProxyHandler(
}
}
}
private fun cleanUpOnConnectionLoss() {
sendingEnabled.set(false)
log.warn("Terminating observables.")
val m = observableContext.observableMap.asMap()
m.keys.forEach { k ->
observationExecutorPool.run(k) {
m[k]?.onError(RPCException("Connection failure detected."))
}
}
observableContext.observableMap.invalidateAll()
rpcReplyMap.forEach { _, replyFuture ->
replyFuture.setException(RPCException("Connection failure detected."))
}
rpcReplyMap.clear()
callSiteMap?.clear()
}
}
private typealias RpcObservableMap = Cache<InvocationId, UnicastSubject<Notification<*>>>

View File

@ -85,6 +85,13 @@ inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
) = startRpcClient(I::class.java, rpcAddress, username, password, configuration)
inline fun<reified I : RPCOps> RPCDriverDSL.startRpcClient(
haAddressPool: List<NetworkHostAndPort>,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
) = startRpcClient(I::class.java, haAddressPool, username, password, configuration)
data class RpcBrokerHandle(
val hostAndPort: NetworkHostAndPort?,
/** null if this is an InVM broker */
@ -346,6 +353,32 @@ data class RPCDriverDSL(
}
}
/**
* Starts a Netty RPC client.
*
* @param rpcOpsClass The [Class] of the RPC interface.
* @param haAddressPool The addresses of the RPC servers(configured in HA mode) to connect to.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param configuration The RPC client configuration.
*/
fun <I : RPCOps> startRpcClient(
rpcOpsClass: Class<I>,
haAddressPool: List<NetworkHostAndPort>,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
): CordaFuture<I> {
return driverDSL.executorService.fork {
val client = RPCClient<I>(haAddressPool, null, configuration)
val connection = client.start(rpcOpsClass, username, password, externalTrace)
driverDSL.shutdownManager.registerShutdown {
connection.close()
}
connection.proxy
}
}
/**
* Starts a Netty RPC client in a new JVM process that calls random RPCs with random arguments.
*