From aee0ebdc7b200fac36f7ead8d7276aea6afde57e Mon Sep 17 00:00:00 2001 From: "rick.parker" Date: Mon, 26 Nov 2018 08:48:00 +0000 Subject: [PATCH] Apply notary back pressure code to MultiThreadedStateMachineManager to fix compilation --- .../MultiThreadedStateMachineManager.kt | 45 ++++++++++++++++--- 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index d941b83c23..5cd89d0a35 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -572,22 +572,54 @@ class MultiThreadedStateMachineManager( if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true) scheduledTimeout.retryCount } else 0 - val scheduledFuture = scheduleTimeoutException(flow, retryCount) + val scheduledFuture = scheduleTimeoutException(flow, calculateDefaultTimeoutSeconds(retryCount)) timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount + 1) } else { logger.warn("Unable to schedule timeout for flow $flowId – flow not found.") } } + private fun resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) { + if (timeoutSeconds < serviceHub.configuration.flowTimeout.timeout.seconds) { + logger.debug { "Ignoring request to set time-out on timed flow $flowId to $timeoutSeconds seconds which is shorter than default of ${serviceHub.configuration.flowTimeout.timeout.seconds} seconds." } + return + } + logger.debug { "Processing request to set time-out on timed flow $flowId to $timeoutSeconds seconds." } + concurrentBox.concurrent { + resetCustomTimeout(flowId, timeoutSeconds) + } + } + + private fun InnerState.resetCustomTimeout(flowId: StateMachineRunId, timeoutSeconds: Long) { + val flow = flows[flowId] + if (flow != null) { + val scheduledTimeout = timedFlows[flowId] + val retryCount = if (scheduledTimeout != null) { + val timeoutFuture = scheduledTimeout.scheduledFuture + if (!timeoutFuture.isDone) scheduledTimeout.scheduledFuture.cancel(true) + scheduledTimeout.retryCount + } else 0 + val scheduledFuture = scheduleTimeoutException(flow, timeoutSeconds) + timedFlows[flowId] = ScheduledTimeout(scheduledFuture, retryCount) + } else { + logger.warn("Unable to schedule timeout for flow $flowId – flow not found.") + } + } + /** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */ - private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> { + private fun scheduleTimeoutException(flow: Flow, delay: Long): ScheduledFuture<*> { return with(serviceHub.configuration.flowTimeout) { - val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() - val jitteredDelaySeconds = maxOf(1L ,timeoutDelaySeconds/2 + (Random().nextDouble() * timeoutDelaySeconds/2).toLong()) timeoutScheduler.schedule({ val event = Event.Error(FlowTimeoutException(maxRestartCount)) flow.fiber.scheduleEvent(event) - }, jitteredDelaySeconds, TimeUnit.SECONDS) + }, delay, TimeUnit.SECONDS) + } + } + + private fun calculateDefaultTimeoutSeconds(retryCount: Int): Long { + return with(serviceHub.configuration.flowTimeout) { + val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() + maxOf(1L, ((1.0 + Math.random()) * timeoutDelaySeconds / 2).toLong()) } } @@ -635,7 +667,8 @@ class MultiThreadedStateMachineManager( stateMachine = StateMachine(id, secureRandom), serviceHub = serviceHub, checkpointSerializationContext = checkpointSerializationContext!!, - unfinishedFibers = unfinishedFibers + unfinishedFibers = unfinishedFibers, + waitTimeUpdateHook = { flowId, timeout -> resetCustomTimeout(flowId, timeout) } ) }