CORDA-3880: Streamline re-connection logic in RPCClientProxyHandler (#6426)

* CORDA-3880: Streamline re-connection logic in RPCClientProxyHandler

* CORDA-3880: Address PR review comments from @dimosr

* CORDA-3880: Explicitly mention contract around `maxReconnectAttempts`

* CORDA-3880: Handle `maxReconnectAttempts = 0` and do not re-connect
Viktor Kolomeyko 2020-07-03 09:29:43 +01:00 committed by GitHub
parent 6bc2c79e23
commit a2058490ed
2 changed files with 39 additions and 28 deletions

@@ -158,7 +158,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
         open val connectionRetryIntervalMultiplier: Double = 1.5,
         /**
-         * Maximum reconnect attempts on failover or disconnection. The default is -1 which means unlimited.
+         * Maximum reconnect attempts on failover or disconnection.
+         * Any negative value would mean that there will be an infinite number of reconnect attempts.
          */
         open val maxReconnectAttempts: Int = unlimitedReconnectAttempts,
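
Taken together with the commit messages above, the contract is: any negative value keeps retrying forever, while `maxReconnectAttempts = 0` gives up immediately rather than re-connecting. Below is a minimal sketch of how a caller might opt into a bounded number of attempts; the endpoint, the credentials and the reliance on defaults for every other constructor parameter are illustrative assumptions, not part of this change.

    import net.corda.client.rpc.CordaRPCClient
    import net.corda.client.rpc.CordaRPCClientConfiguration
    import net.corda.core.utilities.NetworkHostAndPort

    fun main() {
        // Hypothetical node endpoint and RPC credentials.
        val nodeAddress = NetworkHostAndPort("localhost", 10006)

        // Bound client-side failover to 3 attempts per configured transport.
        // A negative value (the default) retries forever; 0 never re-connects.
        val config = CordaRPCClientConfiguration(maxReconnectAttempts = 3)

        CordaRPCClient(nodeAddress, config).start("rpcUser", "rpcPassword").use { connection ->
            println(connection.proxy.currentNodeTime())
        }
    }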

@@ -76,10 +76,10 @@ import kotlin.reflect.jvm.javaMethod
  * forwarded to the [UnicastSubject]. Note that the observations themselves may contain further [Observable]s, which are
  * handled in the same way.
  *
- * To do the above we take advantage of Kryo's datastructure traversal. When the client is deserialising a message from
- * the server that may contain Observables it is supplied with an [ObservableContext] that exposes the map used to demux
- * the observations. When an [Observable] is encountered during traversal a new [UnicastSubject] is added to the map and
- * we carry on. Each observation later contains the corresponding Observable ID, and we just forward that to the
+ * To do the above we take advantage of serialisation data structure traversal. When the client is deserialising a message from
+ * the server that may contain [Observable]s, it is supplied with an [ObservableContext] that exposes the map used to demux
+ * the observations. When a new [Observable] is encountered during traversal a new [UnicastSubject] is added to the map and
+ * we carry on. Each observation later contains the corresponding [Observable] ID, and we just forward that to the
  * associated [UnicastSubject].
  *
  * The client may signal that it no longer consumes a particular [Observable]. This may be done explicitly by
@@ -88,12 +88,12 @@ import kotlin.reflect.jvm.javaMethod
  * The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor].
  *
  * The client will attempt to failover in case the server become unreachable. Depending on the [ServerLocator] instance
- * passed in the constructor, failover is either handle at Artemis level or client level. If only one transport
+ * passed in the constructor, failover is either handled at Artemis level or client level. If only one transport
 * was used to create the [ServerLocator], failover is handled by Artemis (retrying based on [CordaRPCClientConfiguration].
 * If a list of transport configurations was used, failover is handled locally. Artemis is able to do it, however the
 * brokers on server side need to be configured in HA mode and the [ServerLocator] needs to be created with HA as well.
  */
-class RPCClientProxyHandler(
+internal class RPCClientProxyHandler(
         private val rpcConfiguration: CordaRPCClientConfiguration,
         private val rpcUsername: String,
         private val rpcPassword: String,
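
The demultiplexing described in the comment block above amounts to a map from Observable ID to a client-side subject that replays observations to the subscriber. The sketch below illustrates only that idea; it uses RxJava 1.x `PublishSubject` and a hypothetical `ObservationDemux` class instead of Corda's actual `ObservableContext`/`UnicastSubject` wiring.

    import java.util.concurrent.ConcurrentHashMap
    import rx.Observable
    import rx.subjects.PublishSubject

    // Sketch: demultiplex server-side observations onto per-ID client subjects.
    class ObservationDemux {
        private val subjects = ConcurrentHashMap<Long, PublishSubject<Any>>()

        // Called when deserialisation encounters a new server-side Observable ID.
        fun register(id: Long): Observable<Any> =
                subjects.computeIfAbsent(id) { PublishSubject.create() }

        // Called for every observation received from the server.
        fun forward(id: Long, observation: Any) {
            val subject = subjects[id]
            if (subject == null) {
                // Unknown ID: the observation may have raced with observable shutdown.
                println("Dropping observation for unknown Observable $id")
            } else {
                subject.onNext(observation)
            }
        }

        // Called when the client unsubscribes or the connection fails.
        fun complete(id: Long) {
            subjects.remove(id)?.onCompleted()
        }
    }

On connection failure the real handler walks this map and fails every entry, which is what the hunks further down do with a shared `ConnectionFailureException`.
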
@@ -247,7 +247,7 @@ class RPCClientProxyHandler(
         try {
             sessionFactory = serverLocator.createSessionFactory()
         } catch (e: ActiveMQNotConnectedException) {
-            throw (RPCException("Cannot connect to server(s). Tried with all available servers.", e))
+            throw RPCException("Cannot connect to server(s). Tried with all available servers.", e)
         }
         // Depending on how the client is constructed, connection failure is treated differently
         if (serverLocator.staticTransportConfigurations.size == 1) {
@@ -380,9 +380,11 @@
             is RPCApi.ServerToClient.Observation -> {
                 val observable: UnicastSubject<Notification<*>>? = observableContext.observableMap.getIfPresent(serverToClient.id)
                 if (observable == null) {
-                    log.debug("Observation ${serverToClient.content} arrived to unknown Observable with ID ${serverToClient.id}. " +
-                            "This may be due to an observation arriving before the server was " +
-                            "notified of observable shutdown")
+                    log.debug {
+                        "Observation ${serverToClient.content} arrived to unknown Observable with ID ${serverToClient.id}. " +
+                                "This may be due to an observation arriving before the server was " +
+                                "notified of observable shutdown"
+                    }
                 } else {
                     // We schedule the onNext() on an executor sticky-pooled based on the Observable ID.
                     observationExecutorPool.run(serverToClient.id) { executor ->
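
The behavioural change in this hunk is swapping the eager `log.debug(...)` call for the lambda-accepting `debug { ... }` form, so the fairly large interpolated message is only built when debug logging is enabled. Corda ships such an extension in its utilities; the general pattern, sketched here against SLF4J under a hypothetical `lazyDebug` name, is simply:

    import org.slf4j.Logger

    // Evaluate the message lambda only when DEBUG is enabled, avoiding
    // string interpolation and concatenation on the hot path.
    inline fun Logger.lazyDebug(message: () -> String) {
        if (isDebugEnabled) debug(message())
    }

With that in scope, `log.lazyDebug { "Trying to connect using ${transport.params}" }` costs no more than an `isDebugEnabled` check when debug output is off.
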
@@ -461,7 +463,7 @@
             }
         }
         observableContext.observableMap.invalidateAll()
-        rpcReplyMap.forEach { _, replyFuture ->
+        rpcReplyMap.forEach { (_, replyFuture) ->
             replyFuture.setException(ConnectionFailureException())
         }
@@ -528,23 +530,26 @@
     }
 
     private fun attemptReconnect() {
-        var reconnectAttempts = rpcConfiguration.maxReconnectAttempts.times(serverLocator.staticTransportConfigurations.size)
+        // This can be a negative number as `rpcConfiguration.maxReconnectAttempts = -1` means infinite number of re-connects
+        val maxReconnectCount = rpcConfiguration.maxReconnectAttempts.times(serverLocator.staticTransportConfigurations.size)
+        log.debug { "maxReconnectCount = $maxReconnectCount" }
+        var reconnectAttempt = 1
         var retryInterval = rpcConfiguration.connectionRetryInterval
         val maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval
 
-        var transportIterator = serverLocator.staticTransportConfigurations.iterator()
-        while (transportIterator.hasNext() && reconnectAttempts != 0) {
-            val transport = transportIterator.next()
-            if (!transportIterator.hasNext())
-                transportIterator = serverLocator.staticTransportConfigurations.iterator()
+        fun shouldRetry(reconnectAttempt: Int) =
+                if (maxReconnectCount < 0) true else reconnectAttempt <= maxReconnectCount
+
+        while (shouldRetry(reconnectAttempt)) {
+            val transport = serverLocator.staticTransportConfigurations.let { it[(reconnectAttempt - 1) % it.size] }
 
-            log.debug("Trying to connect using ${transport.params}")
+            log.debug { "Trying to connect using ${transport.params}" }
             try {
                 if (!serverLocator.isClosed) {
                     sessionFactory = serverLocator.createSessionFactory(transport)
                 } else {
                     log.warn("Stopping reconnect attempts.")
-                    log.debug("Server locator is closed or garbage collected. Proxy may have been closed during reconnect.")
+                    log.debug { "Server locator is closed or garbage collected. Proxy may have been closed during reconnect." }
                     break
                 }
             } catch (e: ActiveMQException) {
@@ -552,12 +557,12 @@
                     Thread.sleep(retryInterval.toMillis())
                 } catch (e: InterruptedException) {}
                 // Could not connect, try with next server transport.
-                reconnectAttempts--
+                reconnectAttempt++
                 retryInterval = minOf(maxRetryInterval, retryInterval.times(rpcConfiguration.connectionRetryIntervalMultiplier.toLong()))
                 continue
             }
 
-            log.debug("Connected successfully after $reconnectAttempts attempts using ${transport.params}.")
+            log.debug { "Connected successfully after $reconnectAttempt attempts using ${transport.params}." }
             log.info("RPC server available.")
             sessionFactory!!.addFailoverListener(this::haFailoverHandler)
             initSessions()
@@ -566,8 +571,12 @@
             break
         }
-        if (reconnectAttempts == 0 || sessionFactory == null)
-            log.error("Could not reconnect to the RPC server.")
+        val maxReconnectReached = !shouldRetry(reconnectAttempt)
+        if (maxReconnectReached || sessionFactory == null) {
+            val errMessage = "Could not reconnect to the RPC server after trying $reconnectAttempt times." +
+                    if (sessionFactory != null) "" else " It was never possible to to establish connection with any of the endpoints."
+            log.error(errMessage)
+        }
     }
 
     private fun initSessions() {
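
The rewritten `attemptReconnect` replaces the iterator-and-countdown bookkeeping with a single attempt counter: `shouldRetry` treats a negative cap as unlimited, the transport is picked round-robin via `(reconnectAttempt - 1) % size`, and the sleep interval is scaled by the retry multiplier up to `connectionMaxRetryInterval`. The standalone sketch below reproduces only that accounting; the function name, the string-typed transports and the interval arithmetic are assumptions for illustration, not Corda code.

    import java.time.Duration

    // Sketch of the retry accounting: a negative cap means unlimited attempts,
    // transports are chosen round-robin, and the wait grows up to a maximum.
    fun reconnectSchedule(
            transports: List<String>,
            maxReconnectAttempts: Int,
            retryInterval: Duration = Duration.ofSeconds(5),
            maxRetryInterval: Duration = Duration.ofMinutes(3),
            multiplier: Double = 1.5
    ): List<Pair<String, Duration>> {
        // Mirrors maxReconnectCount = maxReconnectAttempts * transports.size.
        val maxReconnectCount = maxReconnectAttempts * transports.size
        fun shouldRetry(attempt: Int) = if (maxReconnectCount < 0) true else attempt <= maxReconnectCount

        val schedule = mutableListOf<Pair<String, Duration>>()
        var attempt = 1
        var interval = retryInterval
        while (shouldRetry(attempt) && schedule.size < 10) { // size guard only so the sketch terminates when unlimited
            val transport = transports[(attempt - 1) % transports.size]
            schedule += transport to interval
            attempt++
            interval = minOf(maxRetryInterval, Duration.ofMillis((interval.toMillis() * multiplier).toLong()))
        }
        return schedule
    }

    fun main() {
        // maxReconnectAttempts = 2 against two endpoints => at most 4 attempts, alternating transports.
        reconnectSchedule(listOf("nodeA:10006", "nodeB:10006"), maxReconnectAttempts = 2)
                .forEachIndexed { i, (transport, wait) -> println("attempt ${i + 1}: $transport, then wait $wait") }
    }

With `maxReconnectAttempts = 0` the schedule is empty, matching the "do not re-connect" case called out in the commit message; with a negative value `shouldRetry` never turns false and only the size guard stops the sketch.
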
@@ -620,10 +629,11 @@
         sendingEnabled.set(false)
         log.warn("Terminating observables.")
         val m = observableContext.observableMap.asMap()
+        val connectionFailureException = ConnectionFailureException()
         m.keys.forEach { k ->
             observationExecutorPool.run(k) {
                 try {
-                    m[k]?.onError(ConnectionFailureException())
+                    m[k]?.onError(connectionFailureException)
                 } catch (e: Exception) {
                     log.error("Unexpected exception when RPC connection failure handling", e)
                 }
@@ -631,8 +641,8 @@
             }
         }
         observableContext.observableMap.invalidateAll()
-        rpcReplyMap.forEach { _, replyFuture ->
-            replyFuture.setException(ConnectionFailureException())
+        rpcReplyMap.forEach { (_, replyFuture) ->
+            replyFuture.setException(connectionFailureException)
         }
 
         rpcReplyMap.clear()
@@ -666,5 +676,5 @@
     }
 }
 
-private typealias RpcReplyMap = ConcurrentHashMap<Trace.InvocationId, SettableFuture<Any?>>
+private typealias RpcReplyMap = ConcurrentHashMap<InvocationId, SettableFuture<Any?>>