ENT-12832: metrics from flows (#7942)

Add ability to register metrics inside flows;
This commit is contained in:
Lucas Siqueira
2025-06-18 14:53:55 +01:00
committed by GitHub
parent b88d22abc3
commit fc61d3d83e
7 changed files with 180 additions and 2 deletions

View File

@ -0,0 +1,145 @@
package net.corda.coretests.node
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.MetricRegistry
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.utilities.getOrThrow
import com.codahale.metrics.Gauge
import net.corda.core.flows.FlowExternalAsyncOperation
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
class ServiceHubMetricsTest {
private lateinit var mockNet: InternalMockNetwork
private lateinit var nodeA: TestStartedNode
interface ExternalLatch {
val latch: CountDownLatch
}
object Latch1 : ExternalLatch {
override val latch = CountDownLatch(0)
}
object Latch2 : ExternalLatch {
override val latch = CountDownLatch(1)
}
@Before
fun start() {
mockNet = InternalMockNetwork(
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP, enclosedCordapp()),
networkSendManuallyPumped = false,
threadPerNode = true)
nodeA = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
mockNet.startNodes()
}
@After
fun cleanup() {
mockNet.stopNodes()
}
@Test(timeout=300_000)
fun `Can register metrics from a flow`() {
val result = nodeA.services.startFlow(TestFlow(Latch1, "Result")).resultFuture.getOrThrow()
val metric = nodeA.internals.metricRegistry.gauges["TestFlow.TestMetric"]
assertNotNull(result)
assertNotNull(metric)
assertEquals("Result", result)
assertEquals("Result", metric.value)
}
@Test(timeout=300_000)
fun `Can checkpoint`() {
nodeA.services.startFlow(TestFlow(Latch2, "Result2"))
nodeA = mockNet.restartNode(nodeA, InternalMockNodeParameters(legalName = ALICE_NAME))
Latch2.latch.countDown()
val metric = nodeA.internals.metricRegistry.gauges["TestFlow.TestMetric"]
eventuallyAssert {
assertNotNull(metric)
assertEquals("Result2", metric.value)
}
}
class ExternalOperation(val externalLatch: ExternalLatch) : FlowExternalAsyncOperation<Unit> {
override fun execute(deduplicationId: String): CompletableFuture<Unit> {
return externalLatch.latch.asCompletableFuture()
}
}
@StartableByRPC
@InitiatingFlow
class TestFlow(private val externalLatch: ExternalLatch, private val metric : String) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
registerMetricFromFlow(metric)
ExternalOperation(externalLatch)// Wait for the latch to be released
return getMetricFromFlow()
}
private fun registerMetricFromFlow(value: String) {
serviceHub.getMetricsRegistry(MetricRegistry::class.java).register(
MetricRegistry.name("TestFlow", "TestMetric"),
Gauge { value }
)
}
private fun getMetricFromFlow():String {
return serviceHub.getMetricsRegistry(MetricRegistry::class.java).gauges["TestFlow.TestMetric"]?.value as String
}
}
}
private fun eventuallyAssert(
timeout: Duration = Duration.ofSeconds(30),
pollInterval: Duration = Duration.ofMillis(100),
assertions: () -> Unit,
) {
val deadline = Instant.now().plus(timeout)
var lastError: Throwable? = null
while (Instant.now().isBefore(deadline)) {
try {
assertions()
return // Success
} catch (e: Throwable) {
lastError = e
Thread.sleep(pollInterval.toMillis())
}
}
// If we get here, we've timed out - throw the last error
throw AssertionError("Assertions failed after ${timeout.seconds} seconds", lastError)
}
fun CountDownLatch.asCompletableFuture(): CompletableFuture<Unit> {
val future = CompletableFuture<Unit>()
Thread {
try {
this.await()
future.complete(Unit)
} catch (e: InterruptedException) {
future.completeExceptionally(e)
Thread.currentThread().interrupt()
}
}.start()
return future
}

View File

@ -520,4 +520,7 @@ interface ServiceHub : ServicesForResolution {
* See [CordappProvider.getAppContext]
*/
fun getAppContext(): CordappContext = cordappProvider.getAppContext()
/** Provides metric registration and access to the metrics registry. */
fun <T> getMetricsRegistry(type: Class<T>): T
}

View File

@ -70,6 +70,8 @@ dependencies {
testImplementation "org.junit.jupiter:junit-jupiter-api:${junit_jupiter_version}"
testImplementation "junit:junit:$junit_version"
testImplementation "io.dropwizard.metrics:metrics-jmx:$metrics_version"
testRuntimeOnly "org.junit.vintage:junit-vintage-engine:${junit_vintage_version}"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${junit_jupiter_version}"
testRuntimeOnly "org.junit.platform:junit-platform-launcher:${junit_platform_version}"

View File

@ -1,8 +1,11 @@
package net.corda.nodeapi.internal.serialization.kryo
import com.codahale.metrics.MetricRegistry
import com.esotericsoftware.kryo.KryoException
import org.junit.Ignore
import org.junit.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import java.util.LinkedList
import kotlin.test.assertEquals
@ -168,4 +171,15 @@ class KryoCheckpointTest {
}
assertEquals(testSize, result)
}
/**
* This test just ensures that the checkpoints still work in light of [LinkedListItrSerializer].
*/
@Test(timeout=300_000)
fun `MetricRegistry cannot checkpoint without error`() {
val metricRegistry = MetricRegistry()
assertThrows<KryoException>("Class com.codahale.metrics.MetricRegistry is not annotated or on the whitelist, so cannot be used in serialization") {
KryoCheckpointSerializer.deserialize(KryoCheckpointSerializer.serialize(metricRegistry, KRYO_CHECKPOINT_CONTEXT), MetricRegistry::class.java, KRYO_CHECKPOINT_CONTEXT)
}
}
}

View File

@ -1303,6 +1303,15 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
override fun onNewNetworkParameters(networkParameters: NetworkParameters) {
this.networkParameters = networkParameters
}
override fun <T> getMetricsRegistry(type: Class<T>): T {
if(type == MetricRegistry::class.java) {
@Suppress("UNCHECKED_CAST")
return this@AbstractNode.metricRegistry as T
} else {
throw IllegalArgumentException("Only ${MetricRegistry::class.java} is currently supported")
}
}
}
}

View File

@ -19,5 +19,8 @@ object AlwaysAcceptEncodingWhitelist : EncodingWhitelist {
}
object QuasarWhitelist : ClassWhitelist {
override fun hasListed(type: Class<*>): Boolean = true
private val packageBlackList = listOf(
"com.codahale.metrics"
)
override fun hasListed(type: Class<*>): Boolean = packageBlackList.none { type.packageName.startsWith(it) }
}

View File

@ -116,6 +116,7 @@ fun makeTestIdentityService(vararg identities: PartyAndCertificate): IdentitySer
* There are a variety of constructors that can be used to supply enough data to simulate a node. Each mock service hub
* must have at least an identity of its own. The other components have defaults that work in most situations.
*/
@Suppress("TooManyFunctions")
open class MockServices private constructor(
private val cordappLoader: CordappLoader,
override val validatedTransactions: TransactionStorage,
@ -557,7 +558,8 @@ open class MockServices private constructor(
/** Returns a dummy Attachment, in context of signature constrains non-downgrade rule this default to contract class version `1`. */
override fun loadContractAttachment(stateRef: StateRef) = dummyAttachment
override fun <T> getMetricsRegistry(type: Class<T>): T = throw UnsupportedOperationException()
/**
* All [ServiceHub]s must also implement [VerifyingServiceHub]. However, since [MockServices] is part of the public API, making it
* extend [VerifyingServiceHub] would leak internal APIs. Instead we have this private view class and have the `toVerifyingServiceHub`