From 2ff3939e2e2aeaaf86b5e1d733aa9e8b1818d23b Mon Sep 17 00:00:00 2001 From: Thomas Schroeter Date: Thu, 26 Apr 2018 13:58:41 +0100 Subject: [PATCH] Add exponential backoff to P2P messaging retry (#2975) --- docs/source/corda-configuration-file.rst | 7 +++++++ .../internal/config/ConfigUtilities.kt | 2 ++ .../services/messaging/P2PMessagingTest.kt | 3 ++- .../node/services/config/NodeConfiguration.kt | 20 ++++++++++++++----- .../services/messaging/P2PMessagingClient.kt | 8 +++++--- node/src/main/resources/reference.conf | 7 ++++++- .../config/NodeConfigurationImplTest.kt | 2 ++ .../messaging/ArtemisMessagingTest.kt | 4 +++- .../node/internal/InternalMockNetwork.kt | 2 +- 9 files changed, 43 insertions(+), 12 deletions(-) diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index d8c10ef1c0..d57dfbaae8 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -86,6 +86,13 @@ absolute path to the node's base directory. here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable, the node will try to auto-discover its public one. +:p2pMessagingRetry: Only used for notarisation requests. When the response doesn't arrive in time, the message is + resent; in the case of clustered notaries it is sent to a different notary replica, chosen round-robin. + + :messageRedeliveryDelay: The initial retry delay, e.g. `30 seconds`. + :maxRetryCount: How many retries to attempt. + :backoffBase: The base of the exponential backoff, :math:`t_{wait} = messageRedeliveryDelay * backoffBase^{retryCount}`. + :rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. This is now deprecated in favour of the ``rpcSettings`` block. :rpcSettings: Options for the RPC server. 
diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/ConfigUtilities.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/ConfigUtilities.kt index d51d25fabe..5b6120678d 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/ConfigUtilities.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/ConfigUtilities.kt @@ -16,6 +16,7 @@ import java.net.Proxy import java.net.URL import java.nio.file.Path import java.nio.file.Paths +import java.time.Duration import java.time.Instant import java.time.LocalDate import java.time.temporal.Temporal @@ -104,6 +105,7 @@ private fun Config.getSingleValue(path: String, type: KType): Any? { Double::class -> getDouble(path) Boolean::class -> getBoolean(path) LocalDate::class -> LocalDate.parse(getString(path)) + Duration::class -> getDuration(path) Instant::class -> Instant.parse(getString(path)) NetworkHostAndPort::class -> NetworkHostAndPort.parse(getString(path)) Path::class -> Paths.get(getString(path)) diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index 35dffc045c..c160a4a680 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -118,7 +118,8 @@ class P2PMessagingTest { } private fun DriverDSL.startAlice(): InProcess { - return startNode(providedName = ALICE_NAME, customOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)) + return startNode(providedName = ALICE_NAME, customOverrides = mapOf("p2pMessagingRetry" to mapOf( + "messageRedeliveryDelay" to 1.seconds, "backoffBase" to 1.0, "maxRetryCount" to 3))) .map { (it as InProcess) } .getOrThrow() } diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt 
index 4d4b4a95f2..6e808ff5b0 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -35,7 +35,7 @@ interface NodeConfiguration : NodeSSLConfiguration { val compatibilityZoneURL: URL? val certificateChainCheckPolicies: List val verifierType: VerifierType - val messageRedeliveryDelaySeconds: Int + val p2pMessagingRetry: P2PMessagingRetryConfiguration val notary: NotaryConfig? val additionalNodeInfoPollingFrequencyMsec: Long val p2pAddress: NetworkHostAndPort @@ -108,6 +108,18 @@ data class BFTSMaRtConfiguration( } } +/** + * Currently only used for notarisation requests. + * + * When the response doesn't arrive in time, the message is resent to a different notary-replica round-robin + * in case of clustered notaries. + */ +data class P2PMessagingRetryConfiguration( + val messageRedeliveryDelay: Duration, + val maxRetryCount: Int, + val backoffBase: Double +) + fun Config.parseAsNodeConfiguration(): NodeConfiguration = parseAs() data class NodeConfigurationImpl( @@ -123,9 +135,7 @@ data class NodeConfigurationImpl( override val rpcUsers: List, override val security : SecurityConfiguration? = null, override val verifierType: VerifierType, - // TODO typesafe config supports the notion of durations. Make use of that by mapping it to java.time.Duration. - // Then rename this to messageRedeliveryDelay and make it of type Duration - override val messageRedeliveryDelaySeconds: Int = 30, + override val p2pMessagingRetry: P2PMessagingRetryConfiguration, override val p2pAddress: NetworkHostAndPort, private val rpcAddress: NetworkHostAndPort? 
= null, private val rpcSettings: NodeRpcSettings, @@ -337,4 +347,4 @@ data class SecurityConfiguration(val authService: SecurityConfiguration.AuthServ id = AuthServiceId("NODE_CONFIG")) } } -} \ No newline at end of file +} diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 88fa4d8e1f..a020c10055 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -103,7 +103,6 @@ class P2PMessagingClient(val config: NodeConfiguration, ) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable { companion object { private val log = contextLogger() - private const val messageMaxRetryCount: Int = 3 fun createMessageToRedeliver(): PersistentMap, RetryMessage, Long> { return PersistentMap( @@ -131,6 +130,9 @@ class P2PMessagingClient(val config: NodeConfiguration, } } + private val messageMaxRetryCount: Int = config.p2pMessagingRetry.maxRetryCount + private val backoffBase: Double = config.p2pMessagingRetry.backoffBase + private class InnerState { var started = false var running = false @@ -156,7 +158,7 @@ class P2PMessagingClient(val config: NodeConfiguration, data class HandlerRegistration(val topic: String, val callback: Any) : MessageHandlerRegistration override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress) - private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong() + private val messageRedeliveryDelaySeconds = config.p2pMessagingRetry.messageRedeliveryDelay.seconds private val state = ThreadBox(InnerState()) private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap()) @@ -526,7 +528,7 @@ class P2PMessagingClient(val config: NodeConfiguration, scheduledMessageRedeliveries[retryId] = nodeExecutor.schedule({ 
sendWithRetry(retryCount + 1, message, target, retryId) - }, messageRedeliveryDelaySeconds, TimeUnit.SECONDS) + }, (messageRedeliveryDelaySeconds * Math.pow(backoffBase, retryCount.toDouble())).toLong(), TimeUnit.SECONDS) } override fun cancelRedelivery(retryId: Long) { diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index a949a45a39..108e0fdc9c 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -19,4 +19,9 @@ verifierType = InMemory rpcSettings = { useSsl = false standAloneBroker = false -} \ No newline at end of file +} +p2pMessagingRetry { + messageRedeliveryDelay = 30 seconds + maxRetryCount = 3 + backoffBase = 2.0 +} diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index c0c3b16bf1..d50b95c923 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -2,6 +2,7 @@ package net.corda.node.services.config import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.seconds import net.corda.testing.core.ALICE_NAME import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.tools.shell.SSHDConfiguration @@ -73,6 +74,7 @@ class NodeConfigurationImplTest { verifierType = VerifierType.InMemory, p2pAddress = NetworkHostAndPort("localhost", 0), messagingServerAddress = null, + p2pMessagingRetry = P2PMessagingRetryConfiguration(5.seconds, 3, 1.0), notary = null, certificateChainCheckPolicies = emptyList(), devMode = true, diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index bc255c91aa..9afd9c4c38 100644 --- 
a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -4,9 +4,11 @@ import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever import net.corda.core.crypto.generateKeyPair import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.seconds import net.corda.node.internal.configureDatabase import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.P2PMessagingRetryConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.network.NetworkMapCacheImpl import net.corda.node.services.network.PersistentNetworkMapCache @@ -69,7 +71,7 @@ class ArtemisMessagingTest { doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress doReturn(null).whenever(it).jmxMonitoringHttpPort doReturn(emptyList()).whenever(it).certificateChainCheckPolicies - doReturn(5).whenever(it).messageRedeliveryDelaySeconds + doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase=1.0)).whenever(it).p2pMessagingRetry } LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock()) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 3bb7b419d5..971da142a7 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -463,7 +463,7 @@ private fun mockNodeConfiguration(): NodeConfiguration { doReturn(null).whenever(it).compatibilityZoneURL 
doReturn(emptyList()).whenever(it).certificateChainCheckPolicies doReturn(VerifierType.InMemory).whenever(it).verifierType - doReturn(5).whenever(it).messageRedeliveryDelaySeconds + doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).p2pMessagingRetry doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(null).whenever(it).devModeOptions }