diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index fe034e92c6..3cd29a37ef 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -190,4 +190,13 @@ path to the node's base directory. .. _config_amqp_bridge: :useAMQPBridges: Optionally can be set to ``false`` to use Artemis CORE Bridges for peer-to-peer communications. - Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes. \ No newline at end of file + Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes. + +:graphiteOptions: Optionally export metrics to a Graphite server. When specified, the node will push out all JMX + metrics to the specified Graphite server at regular intervals. + + :server: Server name or IP address of the Graphite instance. + :port: Port the Graphite instance is listening on. + :prefix: Optional prefix string to identify metrics from this node. Defaults to a string made up + of the node's organisation name and IP address. + :sampleInvervallSeconds: Optional wait time, in seconds, between pushing metrics. Defaults to 60 seconds. Note that the key is spelled ``sampleInvervallSeconds``, matching the ``GraphiteOptions`` field the node reads. \ No newline at end of file diff --git a/node/build.gradle b/node/build.gradle index 361c2b3333..fb75f5130c 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -100,6 +100,7 @@ dependencies { // Coda Hale's Metrics: for monitoring of key statistics compile "io.dropwizard.metrics:metrics-core:3.1.2" + compile group: 'io.dropwizard.metrics', name: 'metrics-graphite', version: '3.1.2' // JimFS: in memory java.nio filesystem. Used for test and simulation utilities. 
compile "com.google.jimfs:jimfs:1.1" diff --git a/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt b/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt index 652f8b7873..901391a114 100644 --- a/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt @@ -1,20 +1,37 @@ package net.corda.node.internal +import com.codahale.metrics.MetricFilter +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.graphite.GraphiteReporter +import com.codahale.metrics.graphite.PickledGraphite import com.jcraft.jsch.JSch import com.jcraft.jsch.JSchException +import net.corda.core.identity.CordaX500Name import net.corda.core.internal.Emoji +import net.corda.core.internal.concurrent.thenMatch import net.corda.core.utilities.loggerFor import net.corda.node.VersionInfo +import net.corda.node.services.config.GraphiteOptions import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.RelayConfiguration import org.fusesource.jansi.Ansi import org.fusesource.jansi.AnsiConsole import java.io.IOException +import java.net.InetAddress +import java.util.concurrent.TimeUnit class EnterpriseNode(configuration: NodeConfiguration, versionInfo: VersionInfo) : Node(configuration, versionInfo) { companion object { private val logger by lazy { loggerFor() } + + private fun defaultGraphitePrefix(legalName: CordaX500Name): String { + return legalName.organisation + "_" + InetAddress.getLocalHost().hostAddress.trim().replace(".", "_") + } + + private fun getGraphitePrefix(configuration: NodeConfiguration): String { + return configuration.graphiteOptions?.prefix ?: defaultGraphitePrefix(configuration.myLegalName) + } } class Startup(args: Array) : NodeStartup(args) { @@ -103,4 +120,33 @@ D""".trimStart() logger.info("Relay setup successfully!") } } + + /** + * Starts a [GraphiteReporter] for [metrics] after [nodeReadyFuture] completes, if Graphite options are configured. + * The options are read once up front so the deferred callbacks use a single non-null snapshot; when they are + * absent this method does nothing. A failure of [nodeReadyFuture] is logged instead of starting the reporter. + */ + private fun registerOptionalMetricsReporter(configuration: NodeConfiguration, metrics: MetricRegistry) { + 
val graphiteOptions = configuration.graphiteOptions ?: return + nodeReadyFuture.thenMatch({ + serverThread.execute { + GraphiteReporter.forRegistry(metrics) + .prefixedWith(getGraphitePrefix(configuration)) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.MINUTES) + .filter(MetricFilter.ALL) + .build(PickledGraphite(graphiteOptions.server, graphiteOptions.port)) + .start(graphiteOptions.sampleInvervallSeconds, TimeUnit.SECONDS) + } + }, { th -> + log.error("Unexpected exception", th) + }) + } + + /** Starts the node as usual, then hooks up the optional Graphite metrics reporter. */ + override fun start(): StartedNode { + val started = super.start() + registerOptionalMetricsReporter(configuration, started.services.monitoringService.metrics) + return started + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index d8ad742d74..2330f17fc9 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -45,6 +45,8 @@ interface NodeConfiguration : NodeSSLConfiguration { val relay: RelayConfiguration? val useAMQPBridges: Boolean get() = true val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize + val graphiteOptions: GraphiteOptions? get() = null + companion object { // default to at least 8MB and a bit extra for larger heap sizes @@ -59,6 +61,13 @@ interface NodeConfiguration : NodeSSLConfiguration { data class DevModeOptions(val disableCheckpointChecker: Boolean = false) +data class GraphiteOptions( + val server: String, + val port: Int, + val prefix: String? 
= null, // defaults to org name and ip address when null + val sampleInvervallSeconds: Long = 60 +) + fun NodeConfiguration.shouldCheckCheckpoints(): Boolean { return this.devMode && this.devModeOptions?.disableCheckpointChecker != true } @@ -142,7 +151,8 @@ data class NodeConfigurationImpl( override val sshd: SSHDConfiguration? = null, override val database: DatabaseConfig = DatabaseConfig(exportHibernateJMXStatistics = devMode), override val useAMQPBridges: Boolean = true, - override val transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize + override val transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize, + override val graphiteOptions: GraphiteOptions? = null ) : NodeConfiguration { override val exportJMXto: String get() = "http" diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 6efdc09f7b..4f7514da01 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -2,6 +2,10 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.Suspendable +import com.codahale.metrics.Gauge +import com.codahale.metrics.Histogram +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.SlidingTimeWindowReservoir import net.corda.core.internal.concurrent.thenMatch import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializedBytes @@ -25,13 +29,32 @@ class ActionExecutorImpl( private val checkpointStorage: CheckpointStorage, private val flowMessaging: FlowMessaging, private val stateMachineManager: StateMachineManagerInternal, - private val checkpointSerializationContext: SerializationContext + private val checkpointSerializationContext: SerializationContext, 
+ metrics: MetricRegistry ) : ActionExecutor { private companion object { val log = contextLogger() } + /** + * Gauge that hands out the most recently recorded value once, then reads 0 until [update] is called again. + * Backed by an atomic so the read-and-reset in [getValue] cannot drop an [update] made from another thread. + */ + private class LatchedGauge : Gauge<Long> { + private val latched = java.util.concurrent.atomic.AtomicLong() + + fun update(value: Long) = latched.set(value) + + /** Returns the latched value and resets it to 0 in a single atomic step. */ + override fun getValue(): Long = latched.getAndSet(0) + } + + private val checkpointingMeter = metrics.meter("Flows.Checkpointing Rate") + private val checkpointSizesThisSecond = SlidingTimeWindowReservoir(1, TimeUnit.SECONDS) + private val checkpointBandwidthHist = metrics.register("Flows.CheckpointVolumeBytesPerSecondHist", Histogram(SlidingTimeWindowReservoir(1, TimeUnit.DAYS))) + private val checkpointBandwidth = metrics.register("Flows.CheckpointVolumeBytesPerSecondCurrent", LatchedGauge()) + @Suspendable override fun executeAction(fiber: FlowFiber, action: Action) { log.trace { "Flow ${fiber.id} executing $action" } @@ -72,6 +95,11 @@ class ActionExecutorImpl( private fun executePersistCheckpoint(action: Action.PersistCheckpoint) { val checkpointBytes = serializeCheckpoint(action.checkpoint) checkpointStorage.addCheckpoint(action.id, checkpointBytes) + checkpointingMeter.mark() + checkpointSizesThisSecond.update(checkpointBytes.size.toLong()) + val checkpointVolume = checkpointSizesThisSecond.snapshot.values.sum() + checkpointBandwidthHist.update(checkpointVolume) + checkpointBandwidth.update(checkpointVolume) } @Suspendable diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index dca556ea59..76b3afe2fc 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -6,6 +6,8 @@ import co.paralleluniverse.fibers.FiberScheduler import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand import 
co.paralleluniverse.strands.channels.Channel +import com.codahale.metrics.Counter +import com.codahale.metrics.Metric import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.flows.* @@ -41,7 +43,9 @@ class TransientReference(@Transient val value: A) class FlowStateMachineImpl(override val id: StateMachineRunId, override val logic: FlowLogic, - scheduler: FiberScheduler + scheduler: FiberScheduler, + private val totalSuccessMetric: Counter, + private val totalErrorMetric: Counter // Store the Party rather than the full cert path with PartyAndCertificate ) : Fiber(id.toString(), scheduler), FlowStateMachine, FlowFiber { companion object { @@ -150,9 +154,11 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } val finalEvent = when (resultOrError) { is Try.Success -> { + totalSuccessMetric.inc() Event.FlowFinish(resultOrError.value) } is Try.Failure -> { + totalErrorMetric.inc() Event.Error(resultOrError.exception) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt index 0e3e50f2f9..e1d5d7cc92 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt @@ -98,6 +98,12 @@ class StateMachineManagerImpl( override val allStateMachines: List> get() = mutex.locked { flows.values.map { it.fiber.logic } } + + private val totalStartedFlows = metrics.counter("Flows.Started") + private val totalFinishedFlows = metrics.counter("Flows.Finished") + private val totalSuccessFlows = metrics.counter("Flows.Success") + private val totalErrorFlows = metrics.counter("Flows.Error") + /** * An observable that emits triples of the changing flow, the type of change, and a process-specific ID number * which may change across restarts. 
@@ -194,6 +200,7 @@ class StateMachineManagerImpl( if (flow != null) { logger.debug("Killing flow known to physical node.") decrementLiveFibers() + totalFinishedFlows.inc() unfinishedFibers.countDown() try { flow.fiber.interrupt() @@ -243,6 +250,7 @@ class StateMachineManagerImpl( val flow = flows.remove(flowId) if (flow != null) { decrementLiveFibers() + totalFinishedFlows.inc() unfinishedFibers.countDown() return when (removalReason) { is FlowRemovalReason.OrderlyFinish -> removeFlowOrderly(flow, removalReason, lastState) @@ -431,7 +439,7 @@ class StateMachineManagerImpl( // Before we construct the state machine state by freezing the FlowLogic we need to make sure that lazy properties // have access to the fiber (and thereby the service hub) - val flowStateMachineImpl = FlowStateMachineImpl(flowId, flowLogic, scheduler) + val flowStateMachineImpl = FlowStateMachineImpl(flowId, flowLogic, scheduler, totalSuccessFlows, totalErrorFlows) val resultFuture = openFuture() flowStateMachineImpl.transientValues = TransientReference(createTransientValues(flowId, resultFuture)) flowLogic.stateMachine = flowStateMachineImpl @@ -453,6 +461,7 @@ class StateMachineManagerImpl( mutex.locked { startedFutures[flowId] = startedFuture } + totalStartedFlows.inc() addAndStartFlow(flowId, Flow(flowStateMachineImpl, resultFuture)) return startedFuture.map { flowStateMachineImpl as FlowStateMachine } } @@ -514,7 +523,7 @@ class StateMachineManagerImpl( isRemoved = false, flowLogic = logic ) - val fiber = FlowStateMachineImpl(id, logic, scheduler) + val fiber = FlowStateMachineImpl(id, logic, scheduler, totalSuccessFlows, totalErrorFlows) fiber.transientValues = TransientReference(createTransientValues(id, resultFuture)) fiber.transientState = TransientReference(state) fiber.logic.stateMachine = fiber @@ -586,7 +595,8 @@ class StateMachineManagerImpl( checkpointStorage, flowMessaging, this, - checkpointSerializationContext + checkpointSerializationContext, + metrics ) }