CORDA-1494: Rename p2pMessagingRetry configuration to flowTimeout configuration to reflect the change in how retries are implemented – it is now on flow rather than message level. (#3339)

This commit is contained in:
Andrius Dagys
2018-06-11 15:50:06 +01:00
committed by GitHub
parent c66228adf4
commit 85fe0e4cb1
9 changed files with 29 additions and 28 deletions

View File

@ -94,12 +94,14 @@ absolute path to the node's base directory.
here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable, here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable,
the node will try to auto-discover its public one. the node will try to auto-discover its public one.
:p2pMessagingRetry: Only used for notarisation requests. When the response doesn't arrive in time, the message is :flowTimeout: When a flow implementing the ``TimedFlow`` interface does not complete in time, it is restarted from the
resent to a different notary-replica round-robin in case of clustered notaries. initial checkpoint. Currently only used for notarisation requests: if a notary replica dies while processing a notarisation request,
the client flow eventually times out and gets restarted. On restart the request is resent to a different notary replica
in a round-robin fashion (assuming the notary is clustered).
:messageRedeliveryDelay: The initial retry delay, e.g. `30 seconds`. :timeout: The initial flow timeout period, e.g. `30 seconds`.
:maxRetryCount: How many retries to attempt. :maxRestartCount: Maximum number of times the flow will restart before resulting in an error.
:backoffBase: The base of the exponential backoff, `t_{wait} = messageRedeliveryDelay * backoffBase^{retryCount}`. :backoffBase: The base of the exponential backoff, `t_{wait} = timeout * backoffBase^{retryCount}`.
:rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. This is now deprecated in favour of the ``rpcSettings`` block. :rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. This is now deprecated in favour of the ``rpcSettings`` block.

View File

@ -23,7 +23,7 @@ import net.corda.core.utilities.seconds
import net.corda.node.internal.StartedNode import net.corda.node.internal.StartedNode
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NotaryConfig import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.config.P2PMessagingRetryConfiguration import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.network.NetworkParametersCopier import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.common.internal.testNetworkParameters
@ -101,8 +101,8 @@ class TimedFlowTests {
InternalMockNodeParameters( InternalMockNodeParameters(
legalName = CordaX500Name("Alice", "AliceCorp", "GB"), legalName = CordaX500Name("Alice", "AliceCorp", "GB"),
configOverrides = { conf: NodeConfiguration -> configOverrides = { conf: NodeConfiguration ->
val retryConfig = P2PMessagingRetryConfiguration(1.seconds, 3, 1.0) val retryConfig = FlowTimeoutConfiguration(1.seconds, 3, 1.0)
doReturn(retryConfig).whenever(conf).p2pMessagingRetry doReturn(retryConfig).whenever(conf).flowTimeout
} }
) )
) )

View File

@ -39,7 +39,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val networkServices: NetworkServicesConfig? val networkServices: NetworkServicesConfig?
val certificateChainCheckPolicies: List<CertChainPolicyConfig> val certificateChainCheckPolicies: List<CertChainPolicyConfig>
val verifierType: VerifierType val verifierType: VerifierType
val p2pMessagingRetry: P2PMessagingRetryConfiguration val flowTimeout: FlowTimeoutConfiguration
val notary: NotaryConfig? val notary: NotaryConfig?
val additionalNodeInfoPollingFrequencyMsec: Long val additionalNodeInfoPollingFrequencyMsec: Long
val p2pAddress: NetworkHostAndPort val p2pAddress: NetworkHostAndPort
@ -139,12 +139,11 @@ data class NetworkServicesConfig(
/** /**
* Currently only used for notarisation requests. * Currently only used for notarisation requests.
* *
* When the response doesn't arrive in time, the message is resent to a different notary-replica round-robin * Specifies the configuration for timing out and restarting a [TimedFlow].
* in case of clustered notaries.
*/ */
data class P2PMessagingRetryConfiguration( data class FlowTimeoutConfiguration(
val messageRedeliveryDelay: Duration, val timeout: Duration,
val maxRetryCount: Int, val maxRestartCount: Int,
val backoffBase: Double val backoffBase: Double
) )
@ -167,7 +166,7 @@ data class NodeConfigurationImpl(
override val rpcUsers: List<User>, override val rpcUsers: List<User>,
override val security: SecurityConfiguration? = null, override val security: SecurityConfiguration? = null,
override val verifierType: VerifierType, override val verifierType: VerifierType,
override val p2pMessagingRetry: P2PMessagingRetryConfiguration, override val flowTimeout: FlowTimeoutConfiguration,
override val p2pAddress: NetworkHostAndPort, override val p2pAddress: NetworkHostAndPort,
private val rpcAddress: NetworkHostAndPort? = null, private val rpcAddress: NetworkHostAndPort? = null,
private val rpcSettings: NodeRpcSettings, private val rpcSettings: NodeRpcSettings,

View File

@ -609,10 +609,10 @@ class SingleThreadedStateMachineManager(
/** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */ /** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */
private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> { private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> {
return with(serviceHub.configuration.p2pMessagingRetry) { return with(serviceHub.configuration.flowTimeout) {
val timeoutDelaySeconds = messageRedeliveryDelay.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
timeoutScheduler.schedule({ timeoutScheduler.schedule({
val event = Event.Error(FlowTimeoutException(maxRetryCount)) val event = Event.Error(FlowTimeoutException(maxRestartCount))
flow.fiber.scheduleEvent(event) flow.fiber.scheduleEvent(event)
}, timeoutDelaySeconds, TimeUnit.SECONDS) }, timeoutDelaySeconds, TimeUnit.SECONDS)
} }

View File

@ -20,8 +20,8 @@ rpcSettings = {
useSsl = false useSsl = false
standAloneBroker = false standAloneBroker = false
} }
p2pMessagingRetry { flowTimeout {
messageRedeliveryDelay = 30 seconds timeout = 30 seconds
maxRetryCount = 3 maxRestartCount = 3
backoffBase = 2.0 backoffBase = 2.0
} }

View File

@ -229,7 +229,7 @@ class NodeConfigurationImplTest {
verifierType = VerifierType.InMemory, verifierType = VerifierType.InMemory,
p2pAddress = NetworkHostAndPort("localhost", 0), p2pAddress = NetworkHostAndPort("localhost", 0),
messagingServerAddress = null, messagingServerAddress = null,
p2pMessagingRetry = P2PMessagingRetryConfiguration(5.seconds, 3, 1.0), flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0),
notary = null, notary = null,
devMode = true, devMode = true,
noLocalShell = false, noLocalShell = false,

View File

@ -8,7 +8,7 @@ import net.corda.core.utilities.seconds
import net.corda.node.internal.configureDatabase import net.corda.node.internal.configureDatabase
import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.P2PMessagingRetryConfiguration import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.network.NetworkMapCacheImpl import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.network.PersistentNetworkMapCache
@ -73,7 +73,7 @@ class ArtemisMessagingTest {
doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress doReturn(NetworkHostAndPort("0.0.0.0", serverPort)).whenever(it).p2pAddress
doReturn(null).whenever(it).jmxMonitoringHttpPort doReturn(null).whenever(it).jmxMonitoringHttpPort
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).p2pMessagingRetry doReturn(FlowTimeoutConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).flowTimeout
} }
LogHelper.setLevel(PersistentUniquenessProvider::class) LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }) database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })

View File

@ -24,8 +24,8 @@ rpcSettings = {
useSsl = false useSsl = false
standAloneBroker = false standAloneBroker = false
} }
p2pMessagingRetry { flowTimeout {
messageRedeliveryDelay = 30 seconds timeout = 30 seconds
maxRetryCount = 3 maxRestartCount = 3
backoffBase = 2.0 backoffBase = 2.0
} }

View File

@ -483,7 +483,7 @@ private fun mockNodeConfiguration(): NodeConfiguration {
doReturn(null).whenever(it).networkServices doReturn(null).whenever(it).networkServices
doReturn(VerifierType.InMemory).whenever(it).verifierType doReturn(VerifierType.InMemory).whenever(it).verifierType
// Set to be long enough so retries don't trigger unless we override it // Set to be long enough so retries don't trigger unless we override it
doReturn(P2PMessagingRetryConfiguration(1.hours, 3, backoffBase = 2.0)).whenever(it).p2pMessagingRetry doReturn(FlowTimeoutConfiguration(1.hours, 3, backoffBase = 1.0)).whenever(it).flowTimeout
doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec
doReturn(null).whenever(it).devModeOptions doReturn(null).whenever(it).devModeOptions
} }