mirror of
https://github.com/corda/corda.git
synced 2025-01-19 03:06:36 +00:00
Export stats to JMX from the state machine manager.
This commit is contained in:
parent
139bf1e450
commit
730b7949ea
@ -11,6 +11,7 @@ package core.messaging
|
||||
import co.paralleluniverse.fibers.Fiber
|
||||
import co.paralleluniverse.fibers.FiberExecutorScheduler
|
||||
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.google.common.base.Throwables
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
@ -65,6 +66,13 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
// property.
|
||||
private val _stateMachines = Collections.synchronizedList(ArrayList<ProtocolLogic<*>>())
|
||||
|
||||
// Monitoring support.
|
||||
private val metrics = serviceHub.monitoringService.metrics
|
||||
init { metrics.register("Protocols.InFlight", Gauge<kotlin.Int> { _stateMachines.size }) }
|
||||
private val checkpointingMeter = metrics.meter("Protocols.Checkpointing Rate")
|
||||
private val totalStartedProtocols = metrics.counter("Protocols.Started")
|
||||
private val totalFinishedProtocols = metrics.counter("Protocols.Finished")
|
||||
|
||||
// This is a workaround for something Gradle does to us during unit tests. It replaces stderr with its own
|
||||
// class that inserts itself into a ThreadLocal. That then gets caught in fiber serialisation, which we don't
|
||||
// want because it can't get recreated properly. It turns out there's no good workaround for this! All the obvious
|
||||
@ -163,6 +171,8 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
iterateStateMachine(fiber, serviceHub.networkService, logger, null, null) {
|
||||
it.start()
|
||||
}
|
||||
_stateMachines.add(logic)
|
||||
totalStartedProtocols.inc()
|
||||
return fiber.resultFuture
|
||||
}
|
||||
|
||||
@ -173,6 +183,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
checkpointsMap.remove(prevCheckpointKey)
|
||||
val key = SecureHash.sha256(new)
|
||||
checkpointsMap[key] = new
|
||||
checkpointingMeter.mark()
|
||||
return key
|
||||
}
|
||||
|
||||
@ -212,6 +223,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
|
||||
_stateMachines.remove(psm.logic)
|
||||
checkpointsMap.remove(prevCheckpointKey)
|
||||
totalFinishedProtocols.inc()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package core.node
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import contracts.*
|
||||
import core.*
|
||||
import core.crypto.SecureHash
|
||||
@ -60,6 +61,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
override val walletService: WalletService get() = wallet
|
||||
override val keyManagementService: KeyManagementService get() = keyManagement
|
||||
override val identityService: IdentityService get() = identity
|
||||
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
|
||||
}
|
||||
|
||||
val legallyIdentifableAddress: LegallyIdentifiableNode get() = LegallyIdentifiableNode(net.myAddress, storage.myLegalIdentity)
|
||||
|
@ -9,6 +9,7 @@
|
||||
package core.node
|
||||
|
||||
import api.Config
|
||||
import com.codahale.metrics.JmxReporter
|
||||
import com.google.common.net.HostAndPort
|
||||
import core.messaging.LegallyIdentifiableNode
|
||||
import core.messaging.MessagingService
|
||||
@ -29,6 +30,7 @@ import java.nio.channels.FileLock
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
import javax.management.ObjectName
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
class ConfigurationException(message: String) : Exception(message)
|
||||
@ -112,6 +114,21 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration
|
||||
webServer = initWebServer()
|
||||
// Start up the MQ service.
|
||||
(net as ArtemisMessagingService).start()
|
||||
// Begin exporting our own metrics via JMX.
|
||||
JmxReporter.
|
||||
forRegistry(services.monitoringService.metrics).
|
||||
inDomain("com.r3cev.corda").
|
||||
createsObjectNamesWith { type, domain, name ->
|
||||
// Make the JMX hierarchy a bit better organised.
|
||||
val category = name.substringBefore('.')
|
||||
val subName = name.substringAfter('.', "")
|
||||
if (subName == "")
|
||||
ObjectName("$domain:name=$category")
|
||||
else
|
||||
ObjectName("$domain:type=$category,name=$subName")
|
||||
}.
|
||||
build().
|
||||
start()
|
||||
return this
|
||||
}
|
||||
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
package core.node.services
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import core.*
|
||||
import core.crypto.SecureHash
|
||||
import core.messaging.MessagingService
|
||||
@ -143,6 +144,12 @@ interface AttachmentStorage {
|
||||
fun importAttachment(jar: InputStream): SecureHash
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides access to various metrics and ways to notify monitoring services of things, for sysadmin purposes.
|
||||
* This is not an interface because it is too lightweight to bother mocking out.
|
||||
*/
|
||||
class MonitoringService(val metrics: MetricRegistry)
|
||||
|
||||
/**
|
||||
* A service hub simply vends references to the other services a node has. Some of those services may be missing or
|
||||
* mocked out. This class is useful to pass to chunks of pluggable code that might have need of many different kinds of
|
||||
@ -155,6 +162,7 @@ interface ServiceHub {
|
||||
val storageService: StorageService
|
||||
val networkService: MessagingService
|
||||
val networkMapService: NetworkMapService
|
||||
val monitoringService: MonitoringService
|
||||
|
||||
/**
|
||||
* Given a [LedgerTransaction], looks up all its dependencies in the local database, uses the identity service to map
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
package core
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import core.crypto.*
|
||||
import core.messaging.MessagingService
|
||||
import core.messaging.MockNetworkMapService
|
||||
@ -169,6 +170,8 @@ class MockServices(
|
||||
override val storageService: StorageService
|
||||
get() = storage ?: throw UnsupportedOperationException()
|
||||
|
||||
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
|
||||
|
||||
init {
|
||||
if (net != null && storage != null) {
|
||||
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener
|
||||
|
Loading…
Reference in New Issue
Block a user