Apply notary back pressure code to MultiThreadedStateMachineManager to fix compilation

This commit is contained in:
rick.parker
2018-11-26 08:48:00 +00:00
parent b6736cbaf9
commit aee0ebdc7b

View File

@ -572,22 +572,54 @@ class MultiThreadedStateMachineManager(
if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true) if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true)
scheduledTimeout.retryCount scheduledTimeout.retryCount
} else 0 } else 0
val scheduledFuture = scheduleTimeoutException(flow, retryCount) val scheduledFuture = scheduleTimeoutException(flow, calculateDefaultTimeoutSeconds(retryCount))
timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount + 1) timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount + 1)
} else { } else {
logger.warn("Unable to schedule timeout for flow $flowId flow not found.") logger.warn("Unable to schedule timeout for flow $flowId flow not found.")
} }
} }
private fun resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) {
if (timeoutSeconds < serviceHub.configuration.flowTimeout.timeout.seconds) {
logger.debug { "Ignoring request to set time-out on timed flow $flowId to $timeoutSeconds seconds which is shorter than default of ${serviceHub.configuration.flowTimeout.timeout.seconds} seconds." }
return
}
logger.debug { "Processing request to set time-out on timed flow $flowId to $timeoutSeconds seconds." }
concurrentBox.concurrent {
resetCustomTimeout(flowId, timeoutSeconds)
}
}
private fun InnerState.resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) {
val flow = flows[flowId]
if (flow != null) {
val scheduledTimeout = timedFlows[flowId]
val retryCount = if (scheduledTimeout != null) {
val timeoutFuture = scheduledTimeout.scheduledFuture
if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true)
scheduledTimeout.retryCount
} else 0
val scheduledFuture = scheduleTimeoutException(flow, timeoutSeconds)
timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount)
} else {
logger.warn("Unable to schedule timeout for flow $flowId flow not found.")
}
}
/** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */ /** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */
private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> { private fun scheduleTimeoutException(flow: Flow, delay: Long): ScheduledFuture<*> {
return with(serviceHub.configuration.flowTimeout) { return with(serviceHub.configuration.flowTimeout) {
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
val jitteredDelaySeconds = maxOf(1L ,timeoutDelaySeconds/2 + (Random().nextDouble() * timeoutDelaySeconds/2).toLong())
timeoutScheduler.schedule({ timeoutScheduler.schedule({
val event = Event.Error(FlowTimeoutException(maxRestartCount)) val event = Event.Error(FlowTimeoutException(maxRestartCount))
flow.fiber.scheduleEvent(event) flow.fiber.scheduleEvent(event)
}, jitteredDelaySeconds, TimeUnit.SECONDS) }, delay, TimeUnit.SECONDS)
}
}
private fun calculateDefaultTimeoutSeconds(retryCount: Int): Long {
return with(serviceHub.configuration.flowTimeout) {
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
maxOf(1L, ((1.0 + Math.random()) * timeoutDelaySeconds / 2).toLong())
} }
} }
@ -635,7 +667,8 @@ class MultiThreadedStateMachineManager(
stateMachine = StateMachine(id, secureRandom), stateMachine = StateMachine(id, secureRandom),
serviceHub = serviceHub, serviceHub = serviceHub,
checkpointSerializationContext = checkpointSerializationContext!!, checkpointSerializationContext = checkpointSerializationContext!!,
unfinishedFibers = unfinishedFibers unfinishedFibers = unfinishedFibers,
waitTimeUpdateHook = { flowId, timeout -> resetCustomTimeout(flowId, timeout) }
) )
} }