diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index e79ea6b7f9..3152b8c6a6 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -29,6 +29,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import java.sql.SQLException import java.util.* +import java.util.concurrent.TimeUnit class FlowStateMachineImpl(override val id: StateMachineRunId, val logic: FlowLogic, @@ -83,20 +84,24 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, override fun run() { createTransaction() logger.debug { "Calling flow: $logic" } + val startTime = System.nanoTime() val result = try { logic.call() } catch (e: FlowException) { + recordDuration(startTime, success = false) // Check if the FlowException was propagated by looking at where the stack trace originates (see suspendAndExpectReceive). val propagated = e.stackTrace[0].className == javaClass.name processException(e, propagated) logger.debug(if (propagated) "Flow ended due to receiving exception" else "Flow finished with exception", e) return } catch (t: Throwable) { + recordDuration(startTime, success = false) logger.warn("Terminated by unexpected exception", t) processException(t, false) return } + recordDuration(startTime) // Only sessions which have done a single send and nothing else will block here openSessions.values .filter { it.state is FlowSessionState.Initiating } @@ -270,7 +275,6 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } @Suspendable - private fun waitForMessage(receiveRequest: ReceiveRequest): ReceivedSessionMessage { return receiveRequest.suspendAndExpectReceive().confirmReceiveType(receiveRequest) } @@ -374,4 +378,17 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, logger.error("Error during resume", t) } } + + /** + * Records the duration of this flow – from call() to completion or failure. + * Note that the duration will include the time the flow spent being parked, and not just the total + * execution time. + */ + private fun recordDuration(startTime: Long, success: Boolean = true) { + val timerName = "FlowDuration.${if (success) "Success" else "Failure"}.${logic.javaClass.name}" + val timer = serviceHub.monitoringService.metrics.timer(timerName) + // Start time gets serialized along with the fiber when it suspends + val duration = System.nanoTime() - startTime + timer.update(duration, TimeUnit.NANOSECONDS) + } }