From e9d2963912cc586541a584fb9fb98b03d457507e Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Fri, 2 Feb 2018 19:26:15 +0000 Subject: [PATCH] Measure Action execution times, utilised flow threads --- .../MultiThreadedStateMachineManager.kt | 5 +++- .../interceptors/MetricInterceptor.kt | 24 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index 7cc94b361e..9669251b89 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -52,6 +52,7 @@ import java.security.SecureRandom import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService +import java.util.concurrent.ThreadPoolExecutor import javax.annotation.concurrent.ThreadSafe import kotlin.collections.ArrayList import kotlin.streams.toList @@ -132,7 +133,8 @@ class MultiThreadedStateMachineManager( this.actionExecutor = makeActionExecutor(checkpointSerializationContext) fiberDeserializationChecker?.start(checkpointSerializationContext) val fibers = restoreFlowsFromCheckpoints() - metrics.register("Flows.InFlight", Gauge { concurrentBox.content.flows.size }) + metrics.register("Flows.ActiveThreads", Gauge { (executor as? ThreadPoolExecutor)?.activeCount }) + metrics.register("Flows.InFlight", Gauge { concurrentBox.content.flows.size }) Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable -> (fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable) } @@ -625,6 +627,7 @@ class MultiThreadedStateMachineManager( interceptors.add { HospitalisingInterceptor(PropagatingFlowHospital, it) } if (serviceHub.configuration.devMode) { interceptors.add { DumpHistoryOnErrorInterceptor(it) } + interceptors.add { MetricInterceptor(metrics, it) } } if (serviceHub.configuration.shouldCheckCheckpoints()) { interceptors.add { FiberDeserializationCheckingInterceptor(fiberDeserializationChecker!!, it) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt new file mode 100644 index 0000000000..f5e298fed8 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt @@ -0,0 +1,24 @@ +package net.corda.node.services.statemachine.interceptors + +import co.paralleluniverse.fibers.Suspendable +import com.codahale.metrics.MetricRegistry +import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.transitions.FlowContinuation +import net.corda.node.services.statemachine.transitions.TransitionResult + +class MetricInterceptor(val metrics: MetricRegistry, val delegate: TransitionExecutor): TransitionExecutor { + @Suspendable + override fun executeTransition(fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor): Pair { + val metricActionInterceptor = MetricActionInterceptor(metrics, actionExecutor) + return delegate.executeTransition(fiber, previousState, event, transition, metricActionInterceptor) + } +} + +class MetricActionInterceptor(val metrics: MetricRegistry, val delegate: ActionExecutor): ActionExecutor { + @Suspendable + override fun executeAction(fiber: FlowFiber, action: Action) { + val context = metrics.timer("Flows.Actions.${action.javaClass.simpleName}").time() + delegate.executeAction(fiber, action) + context.stop() + } +} \ No newline at end of file