Record a metric for duration and rate of execution for every flow (#372)

* Record a metric for the duration and rate of execution for every flow. This is useful in task-intensive scenarios for detecting performance degradation.

* Measure both successful and failed flow durations
This commit is contained in:
Andrius Dagys 2017-03-22 16:52:41 +00:00 committed by GitHub
parent 98266da41c
commit 71fe3e3d3b

View File

@ -29,6 +29,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.sql.SQLException import java.sql.SQLException
import java.util.* import java.util.*
import java.util.concurrent.TimeUnit
class FlowStateMachineImpl<R>(override val id: StateMachineRunId, class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val logic: FlowLogic<R>, val logic: FlowLogic<R>,
@ -83,20 +84,24 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override fun run() { override fun run() {
createTransaction() createTransaction()
logger.debug { "Calling flow: $logic" } logger.debug { "Calling flow: $logic" }
val startTime = System.nanoTime()
val result = try { val result = try {
logic.call() logic.call()
} catch (e: FlowException) { } catch (e: FlowException) {
recordDuration(startTime, success = false)
// Check if the FlowException was propagated by looking at where the stack trace originates (see suspendAndExpectReceive). // Check if the FlowException was propagated by looking at where the stack trace originates (see suspendAndExpectReceive).
val propagated = e.stackTrace[0].className == javaClass.name val propagated = e.stackTrace[0].className == javaClass.name
processException(e, propagated) processException(e, propagated)
logger.debug(if (propagated) "Flow ended due to receiving exception" else "Flow finished with exception", e) logger.debug(if (propagated) "Flow ended due to receiving exception" else "Flow finished with exception", e)
return return
} catch (t: Throwable) { } catch (t: Throwable) {
recordDuration(startTime, success = false)
logger.warn("Terminated by unexpected exception", t) logger.warn("Terminated by unexpected exception", t)
processException(t, false) processException(t, false)
return return
} }
recordDuration(startTime)
// Only sessions which have done a single send and nothing else will block here // Only sessions which have done a single send and nothing else will block here
openSessions.values openSessions.values
.filter { it.state is FlowSessionState.Initiating } .filter { it.state is FlowSessionState.Initiating }
@ -270,7 +275,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
} }
@Suspendable @Suspendable
private fun <M : ExistingSessionMessage> waitForMessage(receiveRequest: ReceiveRequest<M>): ReceivedSessionMessage<M> { private fun <M : ExistingSessionMessage> waitForMessage(receiveRequest: ReceiveRequest<M>): ReceivedSessionMessage<M> {
return receiveRequest.suspendAndExpectReceive().confirmReceiveType(receiveRequest) return receiveRequest.suspendAndExpectReceive().confirmReceiveType(receiveRequest)
} }
@ -374,4 +378,17 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
logger.error("Error during resume", t) logger.error("Error during resume", t)
} }
} }
/**
 * Updates the metrics timer for this flow with the time elapsed since [startTime].
 *
 * The timer is named `FlowDuration.<Success|Failure>.<flow class name>`, so successful and failed
 * runs of each flow class are tracked separately. The elapsed time is wall-clock based: it covers
 * any period the flow spent parked, not only the time it was actually executing.
 *
 * NOTE(review): [System.nanoTime] values are only comparable within a single JVM instance. If a
 * suspended flow can be restored and resumed in a different JVM process, the elapsed value computed
 * here would not be meaningful - confirm against the checkpoint/restore behaviour.
 *
 * @param startTime a [System.nanoTime] reading taken when the flow started.
 * @param success whether the flow completed normally (`true`, the default) or ended with an exception.
 */
private fun recordDuration(startTime: Long, success: Boolean = true) {
    val outcome = if (success) "Success" else "Failure"
    val flowClassName = logic.javaClass.name
    val flowTimer = serviceHub.monitoringService.metrics.timer("FlowDuration.$outcome.$flowClassName")
    // The start time is a local of the flow's fiber, so it is serialized with the fiber on suspension.
    val elapsedNanos = System.nanoTime() - startTime
    flowTimer.update(elapsedNanos, TimeUnit.NANOSECONDS)
}
} }