Add exponential backoff to P2P messaging retry (#2975)

This commit is contained in:
Thomas Schroeter
2018-04-26 13:58:41 +01:00
committed by GitHub
parent 5dc71fc350
commit 2ff3939e2e
9 changed files with 43 additions and 12 deletions

View File

@ -118,7 +118,8 @@ class P2PMessagingTest {
}
private fun DriverDSL.startAlice(): InProcess {
return startNode(providedName = ALICE_NAME, customOverrides = mapOf("messageRedeliveryDelaySeconds" to 1))
return startNode(providedName = ALICE_NAME, customOverrides = mapOf("p2pMessagingRetry" to mapOf(
"messageRedeliveryDelay" to 1.seconds, "backoffBase" to 1.0, "maxRetryCount" to 3)))
.map { (it as InProcess) }
.getOrThrow()
}

View File

@ -35,7 +35,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val compatibilityZoneURL: URL?
val certificateChainCheckPolicies: List<CertChainPolicyConfig>
val verifierType: VerifierType
val messageRedeliveryDelaySeconds: Int
val p2pMessagingRetry: P2PMessagingRetryConfiguration
val notary: NotaryConfig?
val additionalNodeInfoPollingFrequencyMsec: Long
val p2pAddress: NetworkHostAndPort
@ -108,6 +108,18 @@ data class BFTSMaRtConfiguration(
}
}
/**
* Currently only used for notarisation requests.
*
* When the response doesn't arrive in time, the message is resent to a different notary-replica round-robin
* in case of clustered notaries.
*/
data class P2PMessagingRetryConfiguration(
val messageRedeliveryDelay: Duration,
val maxRetryCount: Int,
val backoffBase: Double
)
fun Config.parseAsNodeConfiguration(): NodeConfiguration = parseAs<NodeConfigurationImpl>()
data class NodeConfigurationImpl(
@ -123,9 +135,7 @@ data class NodeConfigurationImpl(
override val rpcUsers: List<User>,
override val security : SecurityConfiguration? = null,
override val verifierType: VerifierType,
// TODO typesafe config supports the notion of durations. Make use of that by mapping it to java.time.Duration.
// Then rename this to messageRedeliveryDelay and make it of type Duration
override val messageRedeliveryDelaySeconds: Int = 30,
override val p2pMessagingRetry: P2PMessagingRetryConfiguration,
override val p2pAddress: NetworkHostAndPort,
private val rpcAddress: NetworkHostAndPort? = null,
private val rpcSettings: NodeRpcSettings,
@ -337,4 +347,4 @@ data class SecurityConfiguration(val authService: SecurityConfiguration.AuthServ
id = AuthServiceId("NODE_CONFIG"))
}
}
}
}

View File

@ -103,7 +103,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable {
companion object {
private val log = contextLogger()
private const val messageMaxRetryCount: Int = 3
fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> {
return PersistentMap(
@ -131,6 +130,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
}
}
private val messageMaxRetryCount: Int = config.p2pMessagingRetry.maxRetryCount
private val backoffBase: Double = config.p2pMessagingRetry.backoffBase
private class InnerState {
var started = false
var running = false
@ -156,7 +158,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
data class HandlerRegistration(val topic: String, val callback: Any) : MessageHandlerRegistration
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
private val messageRedeliveryDelaySeconds = config.p2pMessagingRetry.messageRedeliveryDelay.seconds
private val state = ThreadBox(InnerState())
private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
@ -526,7 +528,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
scheduledMessageRedeliveries[retryId] = nodeExecutor.schedule({
sendWithRetry(retryCount + 1, message, target, retryId)
}, messageRedeliveryDelaySeconds, TimeUnit.SECONDS)
},messageRedeliveryDelaySeconds * Math.pow(backoffBase, retryCount.toDouble()).toLong(), TimeUnit.SECONDS)
}
override fun cancelRedelivery(retryId: Long) {

View File

@ -19,4 +19,9 @@ verifierType = InMemory
rpcSettings = {
useSsl = false
standAloneBroker = false
}
}
p2pMessagingRetry {
messageRedeliveryDelay = 30 seconds
maxRetryCount = 3
backoffBase = 2.0
}

View File

@ -2,6 +2,7 @@ package net.corda.node.services.config
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.tools.shell.SSHDConfiguration
@ -73,6 +74,7 @@ class NodeConfigurationImplTest {
verifierType = VerifierType.InMemory,
p2pAddress = NetworkHostAndPort("localhost", 0),
messagingServerAddress = null,
p2pMessagingRetry = P2PMessagingRetryConfiguration(5.seconds, 3, 1.0),
notary = null,
certificateChainCheckPolicies = emptyList(),
devMode = true,

View File

@ -4,9 +4,11 @@ import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.crypto.generateKeyPair
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.internal.configureDatabase
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.P2PMessagingRetryConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.network.PersistentNetworkMapCache
@ -69,7 +71,7 @@ class ArtemisMessagingTest {
doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase=1.0)).whenever(it).p2pMessagingRetry
}
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock())