Merge pull request #447 from corda/aslemmer-more-flow-metrics

Measure Action execution times, utilised flow threads
This commit is contained in:
Andras Slemmer 2018-04-04 09:47:53 +01:00 committed by GitHub
commit ff3d497d15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 1 deletions

View File

@ -52,6 +52,7 @@ import java.security.SecureRandom
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadPoolExecutor
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
import kotlin.streams.toList
@ -132,7 +133,8 @@ class MultiThreadedStateMachineManager(
this.actionExecutor = makeActionExecutor(checkpointSerializationContext)
fiberDeserializationChecker?.start(checkpointSerializationContext)
val fibers = restoreFlowsFromCheckpoints()
metrics.register("Flows.InFlight", Gauge<Int> { concurrentBox.content.flows.size })
metrics.register("Flows.ActiveThreads", Gauge { (executor as? ThreadPoolExecutor)?.activeCount })
metrics.register("Flows.InFlight", Gauge { concurrentBox.content.flows.size })
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
}
@ -625,6 +627,7 @@ class MultiThreadedStateMachineManager(
interceptors.add { HospitalisingInterceptor(PropagatingFlowHospital, it) }
if (serviceHub.configuration.devMode) {
interceptors.add { DumpHistoryOnErrorInterceptor(it) }
interceptors.add { MetricInterceptor(metrics, it) }
}
if (serviceHub.configuration.shouldCheckCheckpoints()) {
interceptors.add { FiberDeserializationCheckingInterceptor(fiberDeserializationChecker!!, it) }

View File

@ -0,0 +1,24 @@
package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.MetricRegistry
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
class MetricInterceptor(val metrics: MetricRegistry, val delegate: TransitionExecutor): TransitionExecutor {
@Suspendable
override fun executeTransition(fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor): Pair<FlowContinuation, StateMachineState> {
val metricActionInterceptor = MetricActionInterceptor(metrics, actionExecutor)
return delegate.executeTransition(fiber, previousState, event, transition, metricActionInterceptor)
}
}
class MetricActionInterceptor(val metrics: MetricRegistry, val delegate: ActionExecutor): ActionExecutor {
@Suspendable
override fun executeAction(fiber: FlowFiber, action: Action) {
val context = metrics.timer("Flows.Actions.${action.javaClass.simpleName}").time()
delegate.executeAction(fiber, action)
context.stop()
}
}