mirror of
synced 2025-03-28 22:59:09 +00:00
ENT-1412 flow state machine metrics (#390)
* re-add metrics for flows started, flows finished and checkpoint rate Add metric for checkpoint volume rate. * Put error and success counters in FlowStateMachine * Optional graphite metrics writer * Improved checkpoint volume metrics. * Code review: Clean up/layout. * Code review: spelling
This commit is contained in:
@ -190,4 +190,13 @@ path to the node's base directory.
.. _config_amqp_bridge:
:useAMQPBridges: Optionally can be set to ``false`` to use Artemis CORE Bridges for peer-to-peer communications.
Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes.
Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes.
:graphiteOptions: Optionally export metrics to a graphite server. When specified, the node will push out all JMX
metrics to the specified Graphite server at regular intervals.
:server: Server name or ip address of the graphite instance.
:port: Port the graphite instance is listening at.
:prefix: Optional prefix string to identify metrics from this node, will default to a string made up
from Organisation Name and ip address.
:sampleIntervallSeconds: optional wait time between pushing metrics. This will default to 60 seconds.
@ -100,6 +100,7 @@ dependencies {
// Coda Hale's Metrics: for monitoring of key statistics
compile "io.dropwizard.metrics:metrics-core:3.1.2"
compile group: 'io.dropwizard.metrics', name: 'metrics-graphite', version: '3.1.2'
// JimFS: in memory java.nio filesystem. Used for test and simulation utilities.
compile "com.google.jimfs:jimfs:1.1"
@ -1,20 +1,37 @@
package net.corda.node.internal
import com.codahale.metrics.MetricFilter
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.graphite.GraphiteReporter
import com.codahale.metrics.graphite.PickledGraphite
import com.jcraft.jsch.JSch
import com.jcraft.jsch.JSchException
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.Emoji
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.utilities.loggerFor
import net.corda.node.VersionInfo
import net.corda.node.services.config.GraphiteOptions
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.RelayConfiguration
import org.fusesource.jansi.Ansi
import org.fusesource.jansi.AnsiConsole
import java.io.IOException
import java.net.InetAddress
import java.util.concurrent.TimeUnit
class EnterpriseNode(configuration: NodeConfiguration,
versionInfo: VersionInfo) : Node(configuration, versionInfo) {
companion object {
private val logger by lazy { loggerFor<EnterpriseNode>() }
private fun defaultGraphitePrefix(legalName: CordaX500Name): String {
return legalName.organisation + "_" + InetAddress.getLocalHost().hostAddress.trim().replace(".", "_")
private fun getGraphitePrefix(configuration: NodeConfiguration): String {
return configuration.graphiteOptions!!.prefix ?: defaultGraphitePrefix(configuration.myLegalName)
class Startup(args: Array<String>) : NodeStartup(args) {
@ -103,4 +120,28 @@ D""".trimStart()
logger.info("Relay setup successfully!")
private fun registerOptionalMetricsReporter(configuration: NodeConfiguration, metrics: MetricRegistry) {
if (configuration.graphiteOptions != null) {
serverThread.execute {
.build(PickledGraphite(configuration.graphiteOptions!!.server, configuration.graphiteOptions!!.port))
.start(configuration.graphiteOptions!!.sampleInvervallSeconds, TimeUnit.SECONDS)
}, { th ->
log.error("Unexpected exception", th)
override fun start(): StartedNode<Node> {
val started = super.start()
registerOptionalMetricsReporter(configuration, started.services.monitoringService.metrics)
return started
@ -45,6 +45,8 @@ interface NodeConfiguration : NodeSSLConfiguration {
val relay: RelayConfiguration?
val useAMQPBridges: Boolean get() = true
val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize
val graphiteOptions: GraphiteOptions? get() = null
companion object {
// default to at least 8MB and a bit extra for larger heap sizes
@ -59,6 +61,13 @@ interface NodeConfiguration : NodeSSLConfiguration {
data class DevModeOptions(val disableCheckpointChecker: Boolean = false)
data class GraphiteOptions(
val server: String,
val port: Int,
val prefix: String? = null, // defaults to org name and ip address when null
val sampleInvervallSeconds: Long = 60
fun NodeConfiguration.shouldCheckCheckpoints(): Boolean {
return this.devMode && this.devModeOptions?.disableCheckpointChecker != true
@ -142,7 +151,8 @@ data class NodeConfigurationImpl(
override val sshd: SSHDConfiguration? = null,
override val database: DatabaseConfig = DatabaseConfig(exportHibernateJMXStatistics = devMode),
override val useAMQPBridges: Boolean = true,
override val transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize
override val transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize,
override val graphiteOptions: GraphiteOptions? = null
) : NodeConfiguration {
override val exportJMXto: String get() = "http"
@ -2,6 +2,10 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.Gauge
import com.codahale.metrics.Histogram
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.SlidingTimeWindowReservoir
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializedBytes
@ -25,13 +29,32 @@ class ActionExecutorImpl(
private val checkpointStorage: CheckpointStorage,
private val flowMessaging: FlowMessaging,
private val stateMachineManager: StateMachineManagerInternal,
private val checkpointSerializationContext: SerializationContext
private val checkpointSerializationContext: SerializationContext,
metrics: MetricRegistry
) : ActionExecutor {
private companion object {
val log = contextLogger()
private class LatchedGauge : Gauge<Long> {
private var value: Long = 0
fun update(value: Long) {
this.value = value
override fun getValue(): Long {
val retVal = value
value = 0
return retVal
private val checkpointingMeter = metrics.meter("Flows.Checkpointing Rate")
private val checkpointSizesThisSecond = SlidingTimeWindowReservoir(1, TimeUnit.SECONDS)
private val checkpointBandwidthHist = metrics.register("Flows.CheckpointVolumeBytesPerSecondHist", Histogram(SlidingTimeWindowReservoir(1, TimeUnit.DAYS)))
private val checkpointBandwidth = metrics.register("Flows.CheckpointVolumeBytesPerSecondCurrent", LatchedGauge())
override fun executeAction(fiber: FlowFiber, action: Action) {
log.trace { "Flow ${fiber.id} executing $action" }
@ -72,6 +95,11 @@ class ActionExecutorImpl(
private fun executePersistCheckpoint(action: Action.PersistCheckpoint) {
val checkpointBytes = serializeCheckpoint(action.checkpoint)
checkpointStorage.addCheckpoint(action.id, checkpointBytes)
val checkpointVolume = checkpointSizesThisSecond.snapshot.values.sum()
@ -6,6 +6,8 @@ import co.paralleluniverse.fibers.FiberScheduler
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import co.paralleluniverse.strands.channels.Channel
import com.codahale.metrics.Counter
import com.codahale.metrics.Metric
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.flows.*
@ -41,7 +43,9 @@ class TransientReference<out A>(@Transient val value: A)
class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override val logic: FlowLogic<R>,
scheduler: FiberScheduler
scheduler: FiberScheduler,
private val totalSuccessMetric: Counter,
private val totalErrorMetric: Counter
// Store the Party rather than the full cert path with PartyAndCertificate
) : Fiber<Unit>(id.toString(), scheduler), FlowStateMachine<R>, FlowFiber {
companion object {
@ -150,9 +154,11 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val finalEvent = when (resultOrError) {
is Try.Success -> {
is Try.Failure -> {
@ -98,6 +98,12 @@ class StateMachineManagerImpl(
override val allStateMachines: List<FlowLogic<*>>
get() = mutex.locked { flows.values.map { it.fiber.logic } }
private val totalStartedFlows = metrics.counter("Flows.Started")
private val totalFinishedFlows = metrics.counter("Flows.Finished")
private val totalSuccessFlows = metrics.counter("Flows.Success")
private val totalErrorFlows = metrics.counter("Flows.Error")
* An observable that emits triples of the changing flow, the type of change, and a process-specific ID number
* which may change across restarts.
@ -194,6 +200,7 @@ class StateMachineManagerImpl(
if (flow != null) {
logger.debug("Killing flow known to physical node.")
try {
@ -243,6 +250,7 @@ class StateMachineManagerImpl(
val flow = flows.remove(flowId)
if (flow != null) {
return when (removalReason) {
is FlowRemovalReason.OrderlyFinish -> removeFlowOrderly(flow, removalReason, lastState)
@ -431,7 +439,7 @@ class StateMachineManagerImpl(
// Before we construct the state machine state by freezing the FlowLogic we need to make sure that lazy properties
// have access to the fiber (and thereby the service hub)
val flowStateMachineImpl = FlowStateMachineImpl(flowId, flowLogic, scheduler)
val flowStateMachineImpl = FlowStateMachineImpl(flowId, flowLogic, scheduler, totalSuccessFlows, totalErrorFlows)
val resultFuture = openFuture<Any?>()
flowStateMachineImpl.transientValues = TransientReference(createTransientValues(flowId, resultFuture))
flowLogic.stateMachine = flowStateMachineImpl
@ -453,6 +461,7 @@ class StateMachineManagerImpl(
mutex.locked {
startedFutures[flowId] = startedFuture
addAndStartFlow(flowId, Flow(flowStateMachineImpl, resultFuture))
return startedFuture.map { flowStateMachineImpl as FlowStateMachine<A> }
@ -514,7 +523,7 @@ class StateMachineManagerImpl(
isRemoved = false,
flowLogic = logic
val fiber = FlowStateMachineImpl(id, logic, scheduler)
val fiber = FlowStateMachineImpl(id, logic, scheduler, totalSuccessFlows, totalErrorFlows)
fiber.transientValues = TransientReference(createTransientValues(id, resultFuture))
fiber.transientState = TransientReference(state)
fiber.logic.stateMachine = fiber
@ -586,7 +595,8 @@ class StateMachineManagerImpl(
Reference in New Issue
Block a user