mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
AG-341 Notary backpressure mechanism improvements (#6762)
Notary backpressure mechanism improvements
This commit is contained in:
parent
f73bb5ae11
commit
e2efbaea35
@ -9,4 +9,4 @@ package net.corda.common.logging
|
||||
* (originally added to source control for ease of use)
|
||||
*/
|
||||
|
||||
internal const val CURRENT_MAJOR_RELEASE = "4.6-SNAPSHOT"
|
||||
internal const val CURRENT_MAJOR_RELEASE = "4.7-SNAPSHOT"
|
@ -820,6 +820,7 @@
|
||||
<ID>MagicNumber:FlowStackSnapshot.kt$14</ID>
|
||||
<ID>MagicNumber:FlowStackSnapshot.kt$16</ID>
|
||||
<ID>MagicNumber:FlowStackSnapshot.kt$64</ID>
|
||||
<ID>MagicNumber:FlowTimeoutScheduler.kt$FlowTimeoutScheduler$0.5</ID>
|
||||
<ID>MagicNumber:Generator.kt$Generator.Companion$16</ID>
|
||||
<ID>MagicNumber:Generator.kt$Generator.Companion$17</ID>
|
||||
<ID>MagicNumber:GuiUtilities.kt$1000</ID>
|
||||
@ -2063,9 +2064,6 @@
|
||||
<ID>WildcardImport:InterestRatesSwapDemoAPI.kt$import org.springframework.web.bind.annotation.*</ID>
|
||||
<ID>WildcardImport:InterestSwapRestAPI.kt$import org.springframework.web.bind.annotation.*</ID>
|
||||
<ID>WildcardImport:InternalAccessTestHelpers.kt$import net.corda.serialization.internal.amqp.*</ID>
|
||||
<ID>WildcardImport:InternalMockNetwork.kt$import net.corda.core.internal.*</ID>
|
||||
<ID>WildcardImport:InternalMockNetwork.kt$import net.corda.node.services.config.*</ID>
|
||||
<ID>WildcardImport:InternalMockNetwork.kt$import net.corda.testing.node.*</ID>
|
||||
<ID>WildcardImport:IssuerModel.kt$import tornadofx.*</ID>
|
||||
<ID>WildcardImport:JVMConfig.kt$import tornadofx.*</ID>
|
||||
<ID>WildcardImport:JacksonSupport.kt$import com.fasterxml.jackson.core.*</ID>
|
||||
@ -2243,7 +2241,6 @@
|
||||
<ID>WildcardImport:PathUtils.kt$import java.io.*</ID>
|
||||
<ID>WildcardImport:PathUtils.kt$import java.nio.file.*</ID>
|
||||
<ID>WildcardImport:PersistentIdentityMigrationNewTableTest.kt$import net.corda.testing.core.*</ID>
|
||||
<ID>WildcardImport:PersistentNetworkMapCacheTest.kt$import net.corda.testing.core.*</ID>
|
||||
<ID>WildcardImport:PersistentStateServiceTests.kt$import net.corda.core.contracts.*</ID>
|
||||
<ID>WildcardImport:Portfolio.kt$import net.corda.core.contracts.*</ID>
|
||||
<ID>WildcardImport:PortfolioApi.kt$import javax.ws.rs.*</ID>
|
||||
@ -2333,9 +2330,6 @@
|
||||
<ID>WildcardImport:SignedTransaction.kt$import net.corda.core.crypto.*</ID>
|
||||
<ID>WildcardImport:SimpleMQClient.kt$import org.apache.activemq.artemis.api.core.client.*</ID>
|
||||
<ID>WildcardImport:SpringDriver.kt$import net.corda.testing.node.internal.*</ID>
|
||||
<ID>WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.messaging.*</ID>
|
||||
<ID>WildcardImport:StandaloneCordaRPClientTest.kt$import net.corda.core.node.services.vault.*</ID>
|
||||
<ID>WildcardImport:StandaloneCordaRPClientTest.kt$import org.junit.*</ID>
|
||||
<ID>WildcardImport:StartedFlowTransition.kt$import net.corda.node.services.statemachine.*</ID>
|
||||
<ID>WildcardImport:StatePointerSearchTests.kt$import net.corda.core.contracts.*</ID>
|
||||
<ID>WildcardImport:StubOutForDJVM.kt$import kotlin.annotation.AnnotationTarget.*</ID>
|
||||
|
@ -27,7 +27,9 @@ internal class FlowTimeoutScheduler(
|
||||
*/
|
||||
fun timeout(flowId: StateMachineRunId) {
|
||||
timeout(flowId) { flow, retryCount ->
|
||||
val scheduledFuture = scheduleTimeoutException(flow, calculateDefaultTimeoutSeconds(retryCount))
|
||||
val defaultTimeout = calculateDefaultTimeoutSeconds(retryCount)
|
||||
val scheduledFuture = scheduleTimeoutException(flow, defaultTimeout)
|
||||
log.debug("Setting default time-out on timed flow $flowId to $defaultTimeout seconds (retry #$retryCount).")
|
||||
ScheduledTimeout(scheduledFuture, retryCount + 1)
|
||||
}
|
||||
}
|
||||
@ -89,9 +91,13 @@ internal class FlowTimeoutScheduler(
|
||||
|
||||
private fun calculateDefaultTimeoutSeconds(retryCount: Int): Long {
|
||||
return serviceHub.configuration.flowTimeout.run {
|
||||
val timeoutDelaySeconds =
|
||||
timeout.seconds * Math.pow(backoffBase, Integer.min(retryCount, maxRestartCount).toDouble()).toLong()
|
||||
maxOf(1L, ((1.0 + Math.random()) * timeoutDelaySeconds / 2).toLong())
|
||||
val timeoutDelaySeconds = (timeout.seconds *
|
||||
Math.pow(backoffBase, Integer.min(retryCount, maxRestartCount).toDouble())).toLong()
|
||||
|
||||
// Introduce a variable delay to ensure that if a large spike of transactions are
|
||||
// received, we do not trigger retries of them all at the same time. This results
|
||||
// in an effective timeout between 100-150% of the calculated timeout.
|
||||
maxOf(1L, (timeoutDelaySeconds * (1 + (Math.random() * 0.5))).toLong())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -238,7 +238,7 @@ class TimedFlowTests {
|
||||
exceptionThrown = true
|
||||
}
|
||||
assertTrue(exceptionThrown)
|
||||
val notarySignatures = resultFuture.get(10, TimeUnit.SECONDS)
|
||||
val notarySignatures = resultFuture.get(15, TimeUnit.SECONDS)
|
||||
(issueTx + notarySignatures).verifyRequiredSignatures()
|
||||
progressTrackerDone.get()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user