mirror of
https://github.com/corda/corda.git
synced 2025-05-07 02:58:36 +00:00
Add exponential backoff to bridge Artemis reconnection
This commit is contained in:
parent
ca89854493
commit
0b4e3ea5c7
@ -110,8 +110,10 @@ interface BridgeConfiguration : NodeSSLConfiguration {
|
|||||||
val haConfig: BridgeHAConfig?
|
val haConfig: BridgeHAConfig?
|
||||||
val networkParametersPath: Path
|
val networkParametersPath: Path
|
||||||
val enableAMQPPacketTrace: Boolean
|
val enableAMQPPacketTrace: Boolean
|
||||||
// Reconnect to artemis after [artemisReconnectionInterval] ms the default value is 5000 ms.
|
// Initial reconnect interval for link to artemis after [artemisReconnectionIntervalMin] ms the default value is 5000 ms.
|
||||||
val artemisReconnectionInterval: Int
|
val artemisReconnectionIntervalMin: Int
|
||||||
|
// Slowest Artemis reconnect interval after exponential backoff applied. The default value is 60000 ms.
|
||||||
|
val artemisReconnectionIntervalMax: Int
|
||||||
// The period to wait for clean shutdown of remote components
|
// The period to wait for clean shutdown of remote components
|
||||||
// e.g links to the Float Outer, or Artemis sessions, before the process continues shutting down anyway.
|
// e.g links to the Float Outer, or Artemis sessions, before the process continues shutting down anyway.
|
||||||
// Default value is 1000 ms.
|
// Default value is 1000 ms.
|
||||||
|
@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.client.FailoverEventType
|
|||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
|
import java.lang.Long.min
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.CountDownLatch
|
||||||
|
|
||||||
class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
|
class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
|
||||||
@ -121,6 +122,7 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
|
|||||||
|
|
||||||
private fun artemisReconnectionLoop() {
|
private fun artemisReconnectionLoop() {
|
||||||
var tcpIndex = 0
|
var tcpIndex = 0
|
||||||
|
var reconnectInterval = conf.artemisReconnectionIntervalMin.toLong()
|
||||||
while (state.locked { running }) {
|
while (state.locked { running }) {
|
||||||
val locator = state.locked { locator }
|
val locator = state.locked { locator }
|
||||||
if (locator == null) {
|
if (locator == null) {
|
||||||
@ -154,6 +156,7 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
|
|||||||
log.info("Session created")
|
log.info("Session created")
|
||||||
val newProducer = newSession.createProducer()
|
val newProducer = newSession.createProducer()
|
||||||
state.locked {
|
state.locked {
|
||||||
|
reconnectInterval = conf.artemisReconnectionIntervalMin.toLong()
|
||||||
started = ArtemisMessagingClient.Started(locator, newSessionFactory, newSession, newProducer)
|
started = ArtemisMessagingClient.Started(locator, newSessionFactory, newSession, newProducer)
|
||||||
}
|
}
|
||||||
stateHelper.active = true
|
stateHelper.active = true
|
||||||
@ -174,10 +177,11 @@ class BridgeArtemisConnectionServiceImpl(val conf: BridgeConfiguration,
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
// Sleep for a short while before attempting reconnect
|
// Sleep for a short while before attempting reconnect
|
||||||
Thread.sleep(conf.artemisReconnectionInterval.toLong())
|
Thread.sleep(reconnectInterval)
|
||||||
} catch (ex: InterruptedException) {
|
} catch (ex: InterruptedException) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
reconnectInterval = min(2L * reconnectInterval, conf.artemisReconnectionIntervalMax.toLong())
|
||||||
}
|
}
|
||||||
log.info("Ended Artemis Connector Thread")
|
log.info("Ended Artemis Connector Thread")
|
||||||
}
|
}
|
||||||
|
@ -69,7 +69,8 @@ data class BridgeConfigurationImpl(
|
|||||||
override val floatOuterConfig: FloatOuterConfigurationImpl?,
|
override val floatOuterConfig: FloatOuterConfigurationImpl?,
|
||||||
override val haConfig: BridgeHAConfigImpl?,
|
override val haConfig: BridgeHAConfigImpl?,
|
||||||
override val enableAMQPPacketTrace: Boolean,
|
override val enableAMQPPacketTrace: Boolean,
|
||||||
override val artemisReconnectionInterval: Int = 5000,
|
override val artemisReconnectionIntervalMin: Int = 5000,
|
||||||
|
override val artemisReconnectionIntervalMax: Int = 60000,
|
||||||
override val politeShutdownPeriod: Int = 1000,
|
override val politeShutdownPeriod: Int = 1000,
|
||||||
override val whitelistedHeaders: List<String> = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList()
|
override val whitelistedHeaders: List<String> = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList()
|
||||||
) : BridgeConfiguration {
|
) : BridgeConfiguration {
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
keyStorePassword = "cordacadevpass"
|
keyStorePassword = "cordacadevpass"
|
||||||
trustStorePassword = "trustpass"
|
trustStorePassword = "trustpass"
|
||||||
enableAMQPPacketTrace = false
|
enableAMQPPacketTrace = false
|
||||||
artemisReconnectionInterval = 5000
|
artemisReconnectionIntervalMin = 5000
|
||||||
|
artemisReconnectionIntervalMax = 60000
|
||||||
politeShutdownPeriod = 1000
|
politeShutdownPeriod = 1000
|
||||||
crlCheckSoftFail = true
|
crlCheckSoftFail = true
|
Loading…
x
Reference in New Issue
Block a user