From 85fe0e4cb1a9e93c9b1189cd0a3411dbeb2105ce Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Mon, 11 Jun 2018 15:50:06 +0100 Subject: [PATCH] =?UTF-8?q?CORDA-1494:=20Rename=20p2pMessagingRetry=20conf?= =?UTF-8?q?iguration=20to=20flowTimeout=20configuration=20to=20reflect=20t?= =?UTF-8?q?he=20change=20in=20how=20retries=20are=20implemented=20?= =?UTF-8?q?=E2=80=93=20it=20is=20now=20on=20flow=20rather=20than=20message?= =?UTF-8?q?=20level.=20(#3339)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/source/corda-configuration-file.rst | 12 +++++++----- .../net/corda/node/services/TimedFlowTests.kt | 6 +++--- .../corda/node/services/config/NodeConfiguration.kt | 13 ++++++------- .../SingleThreadedStateMachineManager.kt | 6 +++--- node/src/main/resources/reference.conf | 6 +++--- .../services/config/NodeConfigurationImplTest.kt | 2 +- .../node/services/messaging/ArtemisMessagingTest.kt | 4 ++-- node/src/test/resources/working-config.conf | 6 +++--- .../testing/node/internal/InternalMockNetwork.kt | 2 +- 9 files changed, 29 insertions(+), 28 deletions(-) diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index 58a68b4791..2d0e0ba42d 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -94,12 +94,14 @@ absolute path to the node's base directory. here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable, the node will try to auto-discover its public one. -:p2pMessagingRetry: Only used for notarisation requests. When the response doesn't arrive in time, the message is - resent to a different notary-replica round-robin in case of clustered notaries. +:flowTimeout: When a flow implementing the ``TimedFlow`` interface does not complete in time, it is restarted from the + initial checkpoint. Currently only used for notarisation requests: if a notary replica dies while processing a notarisation request, + the client flow eventually times out and gets restarted. On restart the request is resent to a different notary replica + in a round-robin fashion (assuming the notary is clustered). - :messageRedeliveryDelay: The initial retry delay, e.g. `30 seconds`. - :maxRetryCount: How many retries to attempt. - :backoffBase: The base of the exponential backoff, `t_{wait} = messageRedeliveryDelay * backoffBase^{retryCount}`. + :timeout: The initial flow timeout period, e.g. `30 seconds`. + :maxRestartCount: Maximum number of times the flow will restart before resulting in an error. + :backoffBase: The base of the exponential backoff, `t_{wait} = timeout * backoffBase^{retryCount}`. :rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. This is now deprecated in favour of the ``rpcSettings`` block. diff --git a/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt index 180d2024af..bc6bc655e7 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt @@ -23,7 +23,7 @@ import net.corda.core.utilities.seconds import net.corda.node.internal.StartedNode import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NotaryConfig -import net.corda.node.services.config.P2PMessagingRetryConfiguration +import net.corda.node.services.config.FlowTimeoutConfiguration import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.network.NetworkParametersCopier import net.corda.testing.common.internal.testNetworkParameters @@ -101,8 +101,8 @@ class TimedFlowTests { InternalMockNodeParameters( legalName = CordaX500Name("Alice", "AliceCorp", "GB"), configOverrides = { conf: NodeConfiguration -> - val retryConfig = P2PMessagingRetryConfiguration(1.seconds, 3, 1.0) - doReturn(retryConfig).whenever(conf).p2pMessagingRetry + val retryConfig = FlowTimeoutConfiguration(1.seconds, 3, 1.0) + doReturn(retryConfig).whenever(conf).flowTimeout } ) ) diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index b2cdafa210..ab581eac0c 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -39,7 +39,7 @@ interface NodeConfiguration : NodeSSLConfiguration { val networkServices: NetworkServicesConfig? val certificateChainCheckPolicies: List val verifierType: VerifierType - val p2pMessagingRetry: P2PMessagingRetryConfiguration + val flowTimeout: FlowTimeoutConfiguration val notary: NotaryConfig? val additionalNodeInfoPollingFrequencyMsec: Long val p2pAddress: NetworkHostAndPort @@ -139,12 +139,11 @@ data class NetworkServicesConfig( /** * Currently only used for notarisation requests. * - * When the response doesn't arrive in time, the message is resent to a different notary-replica round-robin - * in case of clustered notaries. + * Specifies the configuration for timing out and restarting a [TimedFlow]. */ -data class P2PMessagingRetryConfiguration( - val messageRedeliveryDelay: Duration, - val maxRetryCount: Int, +data class FlowTimeoutConfiguration( + val timeout: Duration, + val maxRestartCount: Int, val backoffBase: Double ) @@ -167,7 +166,7 @@ data class NodeConfigurationImpl( override val rpcUsers: List, override val security: SecurityConfiguration? = null, override val verifierType: VerifierType, - override val p2pMessagingRetry: P2PMessagingRetryConfiguration, + override val flowTimeout: FlowTimeoutConfiguration, override val p2pAddress: NetworkHostAndPort, private val rpcAddress: NetworkHostAndPort? = null, private val rpcSettings: NodeRpcSettings, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index e59efe6044..e89bd67282 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -609,10 +609,10 @@ class SingleThreadedStateMachineManager( /** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */ private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> { - return with(serviceHub.configuration.p2pMessagingRetry) { - val timeoutDelaySeconds = messageRedeliveryDelay.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() + return with(serviceHub.configuration.flowTimeout) { + val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() timeoutScheduler.schedule({ - val event = Event.Error(FlowTimeoutException(maxRetryCount)) + val event = Event.Error(FlowTimeoutException(maxRestartCount)) flow.fiber.scheduleEvent(event) }, timeoutDelaySeconds, TimeUnit.SECONDS) } diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index 5fe68279e4..f11ed7355e 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -20,8 +20,8 @@ rpcSettings = { useSsl = false standAloneBroker = false } -p2pMessagingRetry { - messageRedeliveryDelay = 30 seconds - maxRetryCount = 3 +flowTimeout { + timeout = 30 seconds + maxRestartCount = 3 backoffBase = 2.0 } diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index bf99cb3a4a..7c826f9d5b 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -229,7 +229,7 @@ class NodeConfigurationImplTest { verifierType = VerifierType.InMemory, p2pAddress = NetworkHostAndPort("localhost", 0), messagingServerAddress = null, - p2pMessagingRetry = P2PMessagingRetryConfiguration(5.seconds, 3, 1.0), + flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0), notary = null, devMode = true, noLocalShell = false, diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index bdc49dfa34..98b066e343 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -8,7 +8,7 @@ import net.corda.core.utilities.seconds import net.corda.node.internal.configureDatabase import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.config.P2PMessagingRetryConfiguration +import net.corda.node.services.config.FlowTimeoutConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.network.NetworkMapCacheImpl import net.corda.node.services.network.PersistentNetworkMapCache @@ -73,7 +73,7 @@ class ArtemisMessagingTest { doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress doReturn(null).whenever(it).jmxMonitoringHttpPort doReturn(emptyList()).whenever(it).certificateChainCheckPolicies - doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).p2pMessagingRetry + doReturn(FlowTimeoutConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).flowTimeout } LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }) diff --git a/node/src/test/resources/working-config.conf b/node/src/test/resources/working-config.conf index 45ca6ef647..419b866013 100644 --- a/node/src/test/resources/working-config.conf +++ b/node/src/test/resources/working-config.conf @@ -24,8 +24,8 @@ rpcSettings = { useSsl = false standAloneBroker = false } -p2pMessagingRetry { - messageRedeliveryDelay = 30 seconds - maxRetryCount = 3 +flowTimeout { + timeout = 30 seconds + maxRestartCount = 3 backoffBase = 2.0 } \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 0eae84a517..2db5bd39cd 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -483,7 +483,7 @@ private fun mockNodeConfiguration(): NodeConfiguration { doReturn(null).whenever(it).networkServices doReturn(VerifierType.InMemory).whenever(it).verifierType // Set to be long enough so retries don't trigger unless we override it - doReturn(P2PMessagingRetryConfiguration(1.hours, 3, backoffBase = 2.0)).whenever(it).p2pMessagingRetry + doReturn(FlowTimeoutConfiguration(1.hours, 3, backoffBase = 1.0)).whenever(it).flowTimeout doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(null).whenever(it).devModeOptions }