From fc61d3d83ecabd82e35de62f6b559ff0ac0aa446 Mon Sep 17 00:00:00 2001 From: Lucas Siqueira Date: Wed, 18 Jun 2025 14:53:55 +0100 Subject: [PATCH] ENT-12832: metrics from flows (#7942) Add ability to register metrics inside flows; --- .../coretests/node/ServiceHubMetricsTest.kt | 145 ++++++++++++++++++ .../kotlin/net/corda/core/node/ServiceHub.kt | 3 + node-api/build.gradle | 2 + .../serialization/kryo/KryoCheckpointTest.kt | 14 ++ .../net/corda/node/internal/AbstractNode.kt | 9 ++ .../serialization/internal/SharedContexts.kt | 5 +- .../net/corda/testing/node/MockServices.kt | 4 +- 7 files changed, 180 insertions(+), 2 deletions(-) create mode 100644 core-tests/src/test/kotlin/net/corda/coretests/node/ServiceHubMetricsTest.kt diff --git a/core-tests/src/test/kotlin/net/corda/coretests/node/ServiceHubMetricsTest.kt b/core-tests/src/test/kotlin/net/corda/coretests/node/ServiceHubMetricsTest.kt new file mode 100644 index 0000000000..006c5c0a36 --- /dev/null +++ b/core-tests/src/test/kotlin/net/corda/coretests/node/ServiceHubMetricsTest.kt @@ -0,0 +1,145 @@ +package net.corda.coretests.node + +import co.paralleluniverse.fibers.Suspendable +import com.codahale.metrics.MetricRegistry +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.utilities.getOrThrow +import com.codahale.metrics.Gauge +import net.corda.core.flows.FlowExternalAsyncOperation +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNodeParameters +import net.corda.testing.node.internal.TestStartedNode +import net.corda.testing.node.internal.enclosedCordapp +import net.corda.testing.node.internal.startFlow +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.time.Duration +import java.time.Instant +import 
java.util.concurrent.CompletableFuture +import java.util.concurrent.CountDownLatch +import kotlin.test.assertEquals +import kotlin.test.assertNotNull + +class ServiceHubMetricsTest { + private lateinit var mockNet: InternalMockNetwork + private lateinit var nodeA: TestStartedNode + + interface ExternalLatch { + val latch: CountDownLatch + } + + object Latch1 : ExternalLatch { + override val latch = CountDownLatch(0) + } + object Latch2 : ExternalLatch { + override val latch = CountDownLatch(1) + } + @Before + fun start() { + mockNet = InternalMockNetwork( + cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP, enclosedCordapp()), + networkSendManuallyPumped = false, + threadPerNode = true) + + nodeA = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) + + mockNet.startNodes() + } + + @After + fun cleanup() { + mockNet.stopNodes() + } + + @Test(timeout=300_000) + fun `Can register metrics from a flow`() { + val result = nodeA.services.startFlow(TestFlow(Latch1, "Result")).resultFuture.getOrThrow() + val metric = nodeA.internals.metricRegistry.gauges["TestFlow.TestMetric"] + + assertNotNull(result) + assertNotNull(metric) + assertEquals("Result", result) + assertEquals("Result", metric.value) + } + + @Test(timeout=300_000) + fun `Can checkpoint`() { + nodeA.services.startFlow(TestFlow(Latch2, "Result2")) + nodeA = mockNet.restartNode(nodeA, InternalMockNodeParameters(legalName = ALICE_NAME)) + Latch2.latch.countDown() + + eventuallyAssert { + val metric = nodeA.internals.metricRegistry.gauges["TestFlow.TestMetric"] // re-read on every poll so a gauge registered late is still observed + assertNotNull(metric) + assertEquals("Result2", metric.value) + } + } + + class ExternalOperation(val externalLatch: ExternalLatch) : FlowExternalAsyncOperation<Unit> { + override fun execute(deduplicationId: String): CompletableFuture<Unit> { + return externalLatch.latch.asCompletableFuture() + } + } + + @StartableByRPC + @InitiatingFlow + class TestFlow(private val externalLatch: ExternalLatch, private val metric : String) : FlowLogic<String>() { + 
@Suspendable + override fun call(): String { + registerMetricFromFlow(metric) + ExternalOperation(externalLatch) // NOTE(review): constructed but never awaited, so the flow does not actually wait on the latch - confirm intent + return getMetricFromFlow() + } + + private fun registerMetricFromFlow(value: String) { + serviceHub.getMetricsRegistry(MetricRegistry::class.java).register( + MetricRegistry.name("TestFlow", "TestMetric"), + Gauge { value } + ) + } + + private fun getMetricFromFlow():String { + return serviceHub.getMetricsRegistry(MetricRegistry::class.java).gauges["TestFlow.TestMetric"]?.value as String + } + } +} + +private fun eventuallyAssert( + timeout: Duration = Duration.ofSeconds(30), + pollInterval: Duration = Duration.ofMillis(100), + assertions: () -> Unit, +) { + val deadline = Instant.now().plus(timeout) + var lastError: Throwable? = null + + while (Instant.now().isBefore(deadline)) { + try { + assertions() + return // Success + } catch (e: Throwable) { + lastError = e + Thread.sleep(pollInterval.toMillis()) + } + } + + // If we get here, we've timed out - fail with an AssertionError that carries the last error as its cause + throw AssertionError("Assertions failed after ${timeout.seconds} seconds", lastError) +} +fun CountDownLatch.asCompletableFuture(): CompletableFuture<Unit> { + val future = CompletableFuture<Unit>() + Thread { + try { + this.await() + future.complete(Unit) + } catch (e: InterruptedException) { + future.completeExceptionally(e) + Thread.currentThread().interrupt() + } + }.start() + return future +} diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index 2bf4bcdfcd..f1a1c26950 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -520,4 +520,7 @@ interface ServiceHub : ServicesForResolution { * See [CordappProvider.getAppContext] */ fun getAppContext(): CordappContext = cordappProvider.getAppContext() + + /** Provides metric registration and access to the metrics registry. 
*/ + fun <T> getMetricsRegistry(type: Class<T>): T } diff --git a/node-api/build.gradle b/node-api/build.gradle index 773cfae1b9..20ebdd2cc8 100644 --- a/node-api/build.gradle +++ b/node-api/build.gradle @@ -70,6 +70,8 @@ dependencies { testImplementation "org.junit.jupiter:junit-jupiter-api:${junit_jupiter_version}" testImplementation "junit:junit:$junit_version" + testImplementation "io.dropwizard.metrics:metrics-jmx:$metrics_version" + testRuntimeOnly "org.junit.vintage:junit-vintage-engine:${junit_vintage_version}" testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${junit_jupiter_version}" testRuntimeOnly "org.junit.platform:junit-platform-launcher:${junit_platform_version}" diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt index e0906294b4..fe6f609e00 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt @@ -1,8 +1,11 @@ package net.corda.nodeapi.internal.serialization.kryo +import com.codahale.metrics.MetricRegistry +import com.esotericsoftware.kryo.KryoException import org.junit.Ignore import org.junit.Test import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows import java.util.LinkedList import kotlin.test.assertEquals @@ -168,4 +171,15 @@ class KryoCheckpointTest { } assertEquals(testSize, result) } + + /** + * This test ensures that a [MetricRegistry] cannot be checkpointed: serializing it must fail with an error rather than succeed.
+ */ + @Test(timeout=300_000) + fun `MetricRegistry cannot checkpoint without error`() { + val metricRegistry = MetricRegistry() + assertThrows<KryoException>("Class com.codahale.metrics.MetricRegistry is not annotated or on the whitelist, so cannot be used in serialization") { + KryoCheckpointSerializer.deserialize(KryoCheckpointSerializer.serialize(metricRegistry, KRYO_CHECKPOINT_CONTEXT), MetricRegistry::class.java, KRYO_CHECKPOINT_CONTEXT) + } + } } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 65a587e407..cefb2f9b06 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -1303,6 +1303,15 @@ abstract class AbstractNode(val configuration: NodeConfiguration, override fun onNewNetworkParameters(networkParameters: NetworkParameters) { this.networkParameters = networkParameters } + + override fun <T> getMetricsRegistry(type: Class<T>): T { + if(type == MetricRegistry::class.java) { + @Suppress("UNCHECKED_CAST") + return this@AbstractNode.metricRegistry as T + } else { + throw IllegalArgumentException("Only ${MetricRegistry::class.java} is currently supported") + } + } } } diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/SharedContexts.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/SharedContexts.kt index 557e10850e..df7f2f4b42 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/SharedContexts.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/SharedContexts.kt @@ -19,5 +19,8 @@ object AlwaysAcceptEncodingWhitelist : EncodingWhitelist { } object QuasarWhitelist : ClassWhitelist { - override fun hasListed(type: Class<*>): Boolean = true + private val packageBlackList = listOf( + "com.codahale.metrics" + ) + override fun hasListed(type: Class<*>): Boolean = packageBlackList.none { type.packageName.startsWith(it) } } 
diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index 4967282c78..4e581d2546 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -116,6 +116,7 @@ fun makeTestIdentityService(vararg identities: PartyAndCertificate): IdentitySer * There are a variety of constructors that can be used to supply enough data to simulate a node. Each mock service hub * must have at least an identity of its own. The other components have defaults that work in most situations. */ +@Suppress("TooManyFunctions") open class MockServices private constructor( private val cordappLoader: CordappLoader, override val validatedTransactions: TransactionStorage, @@ -557,7 +558,8 @@ open class MockServices private constructor( /** Returns a dummy Attachment, in context of signature constrains non-downgrade rule this default to contract class version `1`. */ override fun loadContractAttachment(stateRef: StateRef) = dummyAttachment - + override fun <T> getMetricsRegistry(type: Class<T>): T = throw UnsupportedOperationException("MockServices does not provide a metrics registry") + /** * All [ServiceHub]s must also implement [VerifyingServiceHub]. However, since [MockServices] is part of the public API, making it * extend [VerifyingServiceHub] would leak internal APIs. Instead we have this private view class and have the `toVerifyingServiceHub`