mirror of
https://github.com/corda/corda.git
synced 2025-01-19 03:06:36 +00:00
Merged in jmx-mbeans (pull request #32)
Export various ledger stats via JMX
This commit is contained in:
commit
77fd6071a1
@ -11,6 +11,7 @@ package core.messaging
|
||||
import co.paralleluniverse.fibers.Fiber
|
||||
import co.paralleluniverse.fibers.FiberExecutorScheduler
|
||||
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.google.common.base.Throwables
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
@ -65,6 +66,13 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
// property.
|
||||
private val _stateMachines = Collections.synchronizedList(ArrayList<ProtocolLogic<*>>())
|
||||
|
||||
// Monitoring support.
|
||||
private val metrics = serviceHub.monitoringService.metrics
|
||||
init { metrics.register("Protocols.InFlight", Gauge<kotlin.Int> { _stateMachines.size }) }
|
||||
private val checkpointingMeter = metrics.meter("Protocols.Checkpointing Rate")
|
||||
private val totalStartedProtocols = metrics.counter("Protocols.Started")
|
||||
private val totalFinishedProtocols = metrics.counter("Protocols.Finished")
|
||||
|
||||
// This is a workaround for something Gradle does to us during unit tests. It replaces stderr with its own
|
||||
// class that inserts itself into a ThreadLocal. That then gets caught in fiber serialisation, which we don't
|
||||
// want because it can't get recreated properly. It turns out there's no good workaround for this! All the obvious
|
||||
@ -163,6 +171,8 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
iterateStateMachine(fiber, serviceHub.networkService, logger, null, null) {
|
||||
it.start()
|
||||
}
|
||||
_stateMachines.add(logic)
|
||||
totalStartedProtocols.inc()
|
||||
return fiber.resultFuture
|
||||
}
|
||||
|
||||
@ -173,6 +183,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
checkpointsMap.remove(prevCheckpointKey)
|
||||
val key = SecureHash.sha256(new)
|
||||
checkpointsMap[key] = new
|
||||
checkpointingMeter.mark()
|
||||
return key
|
||||
}
|
||||
|
||||
@ -212,6 +223,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
||||
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
|
||||
_stateMachines.remove(psm.logic)
|
||||
checkpointsMap.remove(prevCheckpointKey)
|
||||
totalFinishedProtocols.inc()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package core.node
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import contracts.*
|
||||
import core.*
|
||||
import core.crypto.SecureHash
|
||||
@ -60,6 +61,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
override val walletService: WalletService get() = wallet
|
||||
override val keyManagementService: KeyManagementService get() = keyManagement
|
||||
override val identityService: IdentityService get() = identity
|
||||
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
|
||||
}
|
||||
|
||||
val legallyIdentifableAddress: LegallyIdentifiableNode get() = LegallyIdentifiableNode(net.myAddress, storage.myLegalIdentity)
|
||||
@ -209,6 +211,6 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
Files.createDirectory(attachmentsDir)
|
||||
} catch (e: FileAlreadyExistsException) {
|
||||
}
|
||||
return NodeAttachmentService(attachmentsDir)
|
||||
return NodeAttachmentService(attachmentsDir, services.monitoringService.metrics)
|
||||
}
|
||||
}
|
@ -9,6 +9,7 @@
|
||||
package core.node
|
||||
|
||||
import api.Config
|
||||
import com.codahale.metrics.JmxReporter
|
||||
import com.google.common.net.HostAndPort
|
||||
import core.messaging.LegallyIdentifiableNode
|
||||
import core.messaging.MessagingService
|
||||
@ -29,6 +30,7 @@ import java.nio.channels.FileLock
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
import javax.management.ObjectName
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
class ConfigurationException(message: String) : Exception(message)
|
||||
@ -112,6 +114,21 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration
|
||||
webServer = initWebServer()
|
||||
// Start up the MQ service.
|
||||
(net as ArtemisMessagingService).start()
|
||||
// Begin exporting our own metrics via JMX.
|
||||
JmxReporter.
|
||||
forRegistry(services.monitoringService.metrics).
|
||||
inDomain("com.r3cev.corda").
|
||||
createsObjectNamesWith { type, domain, name ->
|
||||
// Make the JMX hierarchy a bit better organised.
|
||||
val category = name.substringBefore('.')
|
||||
val subName = name.substringAfter('.', "")
|
||||
if (subName == "")
|
||||
ObjectName("$domain:name=$category")
|
||||
else
|
||||
ObjectName("$domain:type=$category,name=$subName")
|
||||
}.
|
||||
build().
|
||||
start()
|
||||
return this
|
||||
}
|
||||
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
package core.node.services
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import com.google.common.hash.Hashing
|
||||
import com.google.common.hash.HashingInputStream
|
||||
@ -31,12 +32,21 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* Stores attachments in the specified local directory, which must exist. Doesn't allow new attachments to be uploaded.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeAttachmentService(val storePath: Path) : AttachmentStorage, AcceptsFileUpload {
|
||||
class NodeAttachmentService(val storePath: Path, val metrics: MetricRegistry) : AttachmentStorage, AcceptsFileUpload {
|
||||
private val log = loggerFor<NodeAttachmentService>()
|
||||
|
||||
@VisibleForTesting
|
||||
var checkAttachmentsOnLoad = true
|
||||
|
||||
private val attachmentCount = metrics.counter("Attachments")
|
||||
|
||||
init {
|
||||
attachmentCount.inc(countAttachments())
|
||||
}
|
||||
|
||||
// Just count all non-directories in the attachment store, and assume the admin hasn't dumped any junk there.
|
||||
private fun countAttachments() = Files.list(storePath).filter { Files.isRegularFile(it) }.count()
|
||||
|
||||
/**
|
||||
* If true, newly inserted attachments will be unzipped to a subdirectory of the [storePath]. This is intended for
|
||||
* human browsing convenience: the attachment itself will still be the file (that is, edits to the extracted directory
|
||||
@ -77,22 +87,25 @@ class NodeAttachmentService(val storePath: Path) : AttachmentStorage, AcceptsFil
|
||||
}
|
||||
}
|
||||
|
||||
override fun openAttachment(id: SecureHash): Attachment? {
|
||||
val path = storePath.resolve(id.toString())
|
||||
if (!Files.exists(path)) return null
|
||||
return object : Attachment {
|
||||
// Deliberately not an inner class to avoid holding a reference to the attachments service.
|
||||
private class AttachmentImpl(override val id: SecureHash,
|
||||
private val path: Path,
|
||||
private val checkOnLoad: Boolean) : Attachment {
|
||||
override fun open(): InputStream {
|
||||
var stream = Files.newInputStream(path)
|
||||
// This is just an optional safety check. If it slows things down too much it can be disabled.
|
||||
if (id is SecureHash.SHA256 && checkAttachmentsOnLoad)
|
||||
if (id is SecureHash.SHA256 && checkOnLoad)
|
||||
stream = HashCheckingStream(id, path, stream)
|
||||
log.debug("Opening attachment $id")
|
||||
return stream
|
||||
}
|
||||
override val id: SecureHash = id
|
||||
override fun equals(other: Any?) = other is Attachment && other.id == id
|
||||
override fun hashCode(): Int = id.hashCode()
|
||||
}
|
||||
|
||||
override fun openAttachment(id: SecureHash): Attachment? {
|
||||
val path = storePath.resolve(id.toString())
|
||||
if (!Files.exists(path)) return null
|
||||
return AttachmentImpl(id, path, checkAttachmentsOnLoad)
|
||||
}
|
||||
|
||||
override fun importAttachment(jar: InputStream): SecureHash {
|
||||
@ -106,10 +119,12 @@ class NodeAttachmentService(val storePath: Path) : AttachmentStorage, AcceptsFil
|
||||
try {
|
||||
// Move into place atomically or fail if that isn't possible. We don't want a half moved attachment to
|
||||
// be exposed to parallel threads. This gives us thread safety.
|
||||
if (!Files.exists(finalPath))
|
||||
if (!Files.exists(finalPath)) {
|
||||
log.info("Stored new attachment $id")
|
||||
else
|
||||
attachmentCount.inc()
|
||||
} else {
|
||||
log.info("Replacing attachment $id - only bother doing this if you're trying to repair file corruption")
|
||||
}
|
||||
Files.move(tmp, finalPath, StandardCopyOption.ATOMIC_MOVE)
|
||||
} finally {
|
||||
Files.deleteIfExists(tmp)
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
package core.node.services
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
import contracts.Cash
|
||||
import core.*
|
||||
import core.utilities.loggerFor
|
||||
@ -39,16 +40,7 @@ class NodeWalletService(private val services: ServiceHub) : WalletService {
|
||||
* Returns a snapshot of how much cash we have in each currency, ignoring details like issuer. Note: currencies for
|
||||
* which we have no cash evaluate to null, not 0.
|
||||
*/
|
||||
override val cashBalances: Map<Currency, Amount>
|
||||
get() = mutex.locked { wallet }.let { wallet ->
|
||||
wallet.states.
|
||||
// Select the states we own which are cash, ignore the rest, take the amounts.
|
||||
mapNotNull { (it.state as? Cash.State)?.amount }.
|
||||
// Turn into a Map<Currency, List<Amount>> like { GBP -> (£100, £500, etc), USD -> ($2000, $50) }
|
||||
groupBy { it.currency }.
|
||||
// Collapse to Map<Currency, Amount> by summing all the amounts of the same currency together.
|
||||
mapValues { it.value.sumOrThrow() }
|
||||
}
|
||||
override val cashBalances: Map<Currency, Amount> get() = mutex.locked { wallet }.cashBalances
|
||||
|
||||
override fun notifyAll(txns: Iterable<WireTransaction>): Wallet {
|
||||
val ourKeys = services.keyManagementService.keys.keys
|
||||
@ -71,6 +63,7 @@ class NodeWalletService(private val services: ServiceHub) : WalletService {
|
||||
// time, until we get to the result (this is perhaps a bit inefficient, but it's functional and easily
|
||||
// unit tested).
|
||||
wallet = txns.fold(currentWallet) { current, tx -> current.update(tx, ourKeys) }
|
||||
exportCashBalancesViaMetrics(wallet)
|
||||
return wallet
|
||||
}
|
||||
}
|
||||
@ -100,6 +93,29 @@ class NodeWalletService(private val services: ServiceHub) : WalletService {
|
||||
return Wallet(newStates)
|
||||
}
|
||||
|
||||
private class BalanceMetric : Gauge<Long> {
|
||||
@Volatile var pennies = 0L
|
||||
override fun getValue(): Long? = pennies
|
||||
}
|
||||
|
||||
private val balanceMetrics = HashMap<Currency, BalanceMetric>()
|
||||
|
||||
private fun exportCashBalancesViaMetrics(wallet: Wallet) {
|
||||
// This is just for demo purposes. We probably shouldn't expose balances via JMX in a real node as that might
|
||||
// be commercially sensitive info that the sysadmins aren't even meant to know.
|
||||
//
|
||||
// Note: exported as pennies.
|
||||
val m = services.monitoringService.metrics
|
||||
for (balance in wallet.cashBalances) {
|
||||
val metric = balanceMetrics.getOrPut(balance.key) {
|
||||
val newMetric = BalanceMetric()
|
||||
m.register("WalletBalances.${balance.key}Pennies", newMetric)
|
||||
newMetric
|
||||
}
|
||||
metric.pennies = balance.value.pennies
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a random set of between (by default) 3 and 10 cash states that add up to the given amount and adds them
|
||||
* to the wallet.
|
||||
|
@ -8,6 +8,8 @@
|
||||
|
||||
package core.node.services
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import contracts.Cash
|
||||
import core.*
|
||||
import core.crypto.SecureHash
|
||||
import core.messaging.MessagingService
|
||||
@ -32,6 +34,18 @@ import java.util.*
|
||||
data class Wallet(val states: List<StateAndRef<OwnableState>>) {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
inline fun <reified T : OwnableState> statesOfType() = states.filter { it.state is T } as List<StateAndRef<T>>
|
||||
|
||||
/**
|
||||
* Returns a map of how much cash we have in each currency, ignoring details like issuer. Note: currencies for
|
||||
* which we have no cash evaluate to null (not present in map), not 0.
|
||||
*/
|
||||
val cashBalances: Map<Currency, Amount> get() = states.
|
||||
// Select the states we own which are cash, ignore the rest, take the amounts.
|
||||
mapNotNull { (it.state as? Cash.State)?.amount }.
|
||||
// Turn into a Map<Currency, List<Amount>> like { GBP -> (£100, £500, etc), USD -> ($2000, $50) }
|
||||
groupBy { it.currency }.
|
||||
// Collapse to Map<Currency, Amount> by summing all the amounts of the same currency together.
|
||||
mapValues { it.value.sumOrThrow() }
|
||||
}
|
||||
|
||||
/**
|
||||
@ -143,6 +157,12 @@ interface AttachmentStorage {
|
||||
fun importAttachment(jar: InputStream): SecureHash
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides access to various metrics and ways to notify monitoring services of things, for sysadmin purposes.
|
||||
* This is not an interface because it is too lightweight to bother mocking out.
|
||||
*/
|
||||
class MonitoringService(val metrics: MetricRegistry)
|
||||
|
||||
/**
|
||||
* A service hub simply vends references to the other services a node has. Some of those services may be missing or
|
||||
* mocked out. This class is useful to pass to chunks of pluggable code that might have need of many different kinds of
|
||||
@ -155,6 +175,7 @@ interface ServiceHub {
|
||||
val storageService: StorageService
|
||||
val networkService: MessagingService
|
||||
val networkMapService: NetworkMapService
|
||||
val monitoringService: MonitoringService
|
||||
|
||||
/**
|
||||
* Given a [LedgerTransaction], looks up all its dependencies in the local database, uses the identity service to map
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
package core
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import core.crypto.*
|
||||
import core.messaging.MessagingService
|
||||
import core.messaging.MockNetworkMapService
|
||||
@ -169,6 +170,8 @@ class MockServices(
|
||||
override val storageService: StorageService
|
||||
get() = storage ?: throw UnsupportedOperationException()
|
||||
|
||||
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
|
||||
|
||||
init {
|
||||
if (net != null && storage != null) {
|
||||
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
package core.node
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.google.common.jimfs.Configuration
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import core.crypto.SecureHash
|
||||
@ -40,7 +41,7 @@ class NodeAttachmentStorageTest {
|
||||
val testJar = makeTestJar()
|
||||
val expectedHash = SecureHash.sha256(Files.readAllBytes(testJar))
|
||||
|
||||
val storage = NodeAttachmentService(fs.getPath("/"))
|
||||
val storage = NodeAttachmentService(fs.getPath("/"), MetricRegistry())
|
||||
val id = testJar.use { storage.importAttachment(it) }
|
||||
assertEquals(expectedHash, id)
|
||||
|
||||
@ -57,7 +58,7 @@ class NodeAttachmentStorageTest {
|
||||
@Test
|
||||
fun `duplicates not allowed`() {
|
||||
val testJar = makeTestJar()
|
||||
val storage = NodeAttachmentService(fs.getPath("/"))
|
||||
val storage = NodeAttachmentService(fs.getPath("/"), MetricRegistry())
|
||||
testJar.use { storage.importAttachment(it) }
|
||||
assertFailsWith<java.nio.file.FileAlreadyExistsException> {
|
||||
testJar.use { storage.importAttachment(it) }
|
||||
@ -67,7 +68,7 @@ class NodeAttachmentStorageTest {
|
||||
@Test
|
||||
fun `corrupt entry throws exception`() {
|
||||
val testJar = makeTestJar()
|
||||
val storage = NodeAttachmentService(fs.getPath("/"))
|
||||
val storage = NodeAttachmentService(fs.getPath("/"), MetricRegistry())
|
||||
val id = testJar.use { storage.importAttachment(it) }
|
||||
|
||||
// Corrupt the file in the store.
|
||||
|
Loading…
Reference in New Issue
Block a user