Measure Action execution times, utilised flow threads

This commit is contained in:
Andras Slemmer 2018-02-02 19:26:15 +00:00
parent 6593560655
commit e9d2963912
2 changed files with 28 additions and 1 deletions

View File

@ -52,6 +52,7 @@ import java.security.SecureRandom
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadPoolExecutor
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList import kotlin.collections.ArrayList
import kotlin.streams.toList import kotlin.streams.toList
@ -132,7 +133,8 @@ class MultiThreadedStateMachineManager(
this.actionExecutor = makeActionExecutor(checkpointSerializationContext) this.actionExecutor = makeActionExecutor(checkpointSerializationContext)
fiberDeserializationChecker?.start(checkpointSerializationContext) fiberDeserializationChecker?.start(checkpointSerializationContext)
val fibers = restoreFlowsFromCheckpoints() val fibers = restoreFlowsFromCheckpoints()
metrics.register("Flows.InFlight", Gauge<Int> { concurrentBox.content.flows.size }) metrics.register("Flows.ActiveThreads", Gauge { (executor as? ThreadPoolExecutor)?.activeCount })
metrics.register("Flows.InFlight", Gauge { concurrentBox.content.flows.size })
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable -> Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable) (fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
} }
@ -625,6 +627,7 @@ class MultiThreadedStateMachineManager(
interceptors.add { HospitalisingInterceptor(PropagatingFlowHospital, it) } interceptors.add { HospitalisingInterceptor(PropagatingFlowHospital, it) }
if (serviceHub.configuration.devMode) { if (serviceHub.configuration.devMode) {
interceptors.add { DumpHistoryOnErrorInterceptor(it) } interceptors.add { DumpHistoryOnErrorInterceptor(it) }
interceptors.add { MetricInterceptor(metrics, it) }
} }
if (serviceHub.configuration.shouldCheckCheckpoints()) { if (serviceHub.configuration.shouldCheckCheckpoints()) {
interceptors.add { FiberDeserializationCheckingInterceptor(fiberDeserializationChecker!!, it) } interceptors.add { FiberDeserializationCheckingInterceptor(fiberDeserializationChecker!!, it) }

View File

@ -0,0 +1,24 @@
package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.MetricRegistry
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
class MetricInterceptor(val metrics: MetricRegistry, val delegate: TransitionExecutor): TransitionExecutor {
@Suspendable
override fun executeTransition(fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor): Pair<FlowContinuation, StateMachineState> {
val metricActionInterceptor = MetricActionInterceptor(metrics, actionExecutor)
return delegate.executeTransition(fiber, previousState, event, transition, metricActionInterceptor)
}
}
class MetricActionInterceptor(val metrics: MetricRegistry, val delegate: ActionExecutor): ActionExecutor {
@Suspendable
override fun executeAction(fiber: FlowFiber, action: Action) {
val context = metrics.timer("Flows.Actions.${action.javaClass.simpleName}").time()
delegate.executeAction(fiber, action)
context.stop()
}
}