From 0b4e3ea5c73d12d2cf6bc33c9b4fd7e51da62ef9 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 16 May 2018 14:08:12 +0100 Subject: [PATCH] Add exponential backoff to bridge Artemis reconnection --- .../net/corda/bridge/services/api/BridgeConfiguration.kt | 6 ++++-- .../services/artemis/BridgeArtemisConnectionServiceImpl.kt | 6 +++++- .../corda/bridge/services/config/BridgeConfigurationImpl.kt | 3 ++- bridge/src/main/resources/bridgedefault.conf | 3 ++- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt b/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt index a7e37f524f..e1902d0244 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/api/BridgeConfiguration.kt @@ -110,8 +110,10 @@ interface BridgeConfiguration : NodeSSLConfiguration { val haConfig: BridgeHAConfig? val networkParametersPath: Path val enableAMQPPacketTrace: Boolean - // Reconnect to artemis after [artemisReconnectionInterval] ms the default value is 5000 ms. - val artemisReconnectionInterval: Int + // Initial reconnect interval for link to artemis after [artemisReconnectionIntervalMin] ms the default value is 5000 ms. + val artemisReconnectionIntervalMin: Int + // Slowest Artemis reconnect interval after exponential backoff applied. The default value is 60000 ms. + val artemisReconnectionIntervalMax: Int // The period to wait for clean shutdown of remote components // e.g links to the Float Outer, or Artemis sessions, before the process continues shutting down anyway. // Default value is 1000 ms. diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt index 7a29192f7f..2a215d699a 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/artemis/BridgeArtemisConnectionServiceImpl.kt @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.client.FailoverEventType import org.apache.activemq.artemis.api.core.client.ServerLocator import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants import rx.Subscription +import java.lang.Long.min import java.util.concurrent.CountDownLatch class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration, @@ -121,6 +122,7 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration, private fun artemisReconnectionLoop() { var tcpIndex = 0 + var reconnectInterval = conf.artemisReconnectionIntervalMin.toLong() while (state.locked { running }) { val locator = state.locked { locator } if (locator == null) { @@ -154,6 +156,7 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration, log.info("Session created") val newProducer = newSession.createProducer() state.locked { + reconnectInterval = conf.artemisReconnectionIntervalMin.toLong() started = ArtemisMessagingClient.Started(locator, newSessionFactory, newSession, newProducer) } stateHelper.active = true @@ -174,10 +177,11 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration, try { // Sleep for a short while before attempting reconnect - Thread.sleep(conf.artemisReconnectionInterval.toLong()) + Thread.sleep(reconnectInterval) } catch (ex: InterruptedException) { // ignore } + reconnectInterval = min(2L * reconnectInterval, conf.artemisReconnectionIntervalMax.toLong()) } log.info("Ended Artemis Connector Thread") } diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt index ec3ab87b18..b563d85af3 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/config/BridgeConfigurationImpl.kt @@ -69,7 +69,8 @@ data class BridgeConfigurationImpl( override val floatOuterConfig: FloatOuterConfigurationImpl?, override val haConfig: BridgeHAConfigImpl?, override val enableAMQPPacketTrace: Boolean, - override val artemisReconnectionInterval: Int = 5000, + override val artemisReconnectionIntervalMin: Int = 5000, + override val artemisReconnectionIntervalMax: Int = 60000, override val politeShutdownPeriod: Int = 1000, override val whitelistedHeaders: List = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList() ) : BridgeConfiguration { diff --git a/bridge/src/main/resources/bridgedefault.conf b/bridge/src/main/resources/bridgedefault.conf index 3e36072f2f..307f59aec0 100644 --- a/bridge/src/main/resources/bridgedefault.conf +++ b/bridge/src/main/resources/bridgedefault.conf @@ -10,6 +10,7 @@ keyStorePassword = "cordacadevpass" trustStorePassword = "trustpass" enableAMQPPacketTrace = false -artemisReconnectionInterval = 5000 +artemisReconnectionIntervalMin = 5000 +artemisReconnectionIntervalMax = 60000 politeShutdownPeriod = 1000 crlCheckSoftFail = true \ No newline at end of file