From 1022d1ec97ccf1e326655e9082f6152f83354036 Mon Sep 17 00:00:00 2001 From: Adel El-Beik <48713346+adelel1@users.noreply.github.com> Date: Wed, 23 Oct 2024 08:58:40 +0100 Subject: [PATCH] ENT-11275: Added span to multi send and receive calls. (#7845) * ENT-12275: Added spans around multi send & receive. --- .../kotlin/net/corda/core/flows/FlowLogic.kt | 38 +++++++++++-------- .../telemetry/TelemetryServiceImpl.kt | 7 +++- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 8c207766fc..1a5363156d 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -313,12 +313,14 @@ abstract class FlowLogic { @Suspendable @JvmOverloads open fun receiveAllMap(sessions: Map>, maySkipCheckpoint: Boolean = false): Map> { - enforceNoPrimitiveInReceive(sessions.values) - val replies = stateMachine.suspend( - ioRequest = FlowIORequest.Receive(sessions.keys.toNonEmptySet()), - maySkipCheckpoint = maySkipCheckpoint - ) - return replies.mapValues { (session, payload) -> payload.checkPayloadIs(sessions[session]!!) } + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#receiveAllMap") { + enforceNoPrimitiveInReceive(sessions.values) + val replies = stateMachine.suspend( + ioRequest = FlowIORequest.Receive(sessions.keys.toNonEmptySet()), + maySkipCheckpoint = maySkipCheckpoint + ) + return replies.mapValues { (session, payload) -> payload.checkPayloadIs(sessions[session]!!) } + } } /** @@ -335,9 +337,11 @@ abstract class FlowLogic { @Suspendable @JvmOverloads open fun receiveAll(receiveType: Class, sessions: List, maySkipCheckpoint: Boolean = false): List> { - enforceNoPrimitiveInReceive(listOf(receiveType)) - enforceNoDuplicates(sessions) - return castMapValuesToKnownType(receiveAllMap(associateSessionsToReceiveType(receiveType, sessions))) + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#receiveAll") { + enforceNoPrimitiveInReceive(listOf(receiveType)) + enforceNoDuplicates(sessions) + return castMapValuesToKnownType(receiveAllMap(associateSessionsToReceiveType(receiveType, sessions))) + } } /** @@ -354,8 +358,10 @@ abstract class FlowLogic { @Suspendable @JvmOverloads fun sendAll(payload: Any, sessions: Set, maySkipCheckpoint: Boolean = false) { - val sessionToPayload = sessions.map { it to payload }.toMap() - return sendAllMap(sessionToPayload, maySkipCheckpoint) + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#sendAll") { + val sessionToPayload = sessions.map { it to payload }.toMap() + return sendAllMap(sessionToPayload, maySkipCheckpoint) + } } /** @@ -371,10 +377,12 @@ abstract class FlowLogic { @Suspendable @JvmOverloads fun sendAllMap(payloadsPerSession: Map, maySkipCheckpoint: Boolean = false) { - val request = FlowIORequest.Send( - sessionToMessage = stateMachine.serialize(payloadsPerSession) - ) - stateMachine.suspend(request, maySkipCheckpoint) + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#sendAllMap") { + val request = FlowIORequest.Send( + sessionToMessage = stateMachine.serialize(payloadsPerSession) + ) + stateMachine.suspend(request, maySkipCheckpoint) + } } /** diff --git a/core/src/main/kotlin/net/corda/core/internal/telemetry/TelemetryServiceImpl.kt b/core/src/main/kotlin/net/corda/core/internal/telemetry/TelemetryServiceImpl.kt index 54bc1249f7..95b4f22595 100644 --- a/core/src/main/kotlin/net/corda/core/internal/telemetry/TelemetryServiceImpl.kt +++ b/core/src/main/kotlin/net/corda/core/internal/telemetry/TelemetryServiceImpl.kt @@ -1,6 +1,7 @@ package net.corda.core.internal.telemetry import net.corda.core.CordaInternal +import net.corda.core.DeleteForDJVM import net.corda.core.flows.FlowLogic import net.corda.core.internal.uncheckedCast import net.corda.core.node.ServiceHub @@ -34,6 +35,7 @@ enum class TelemetryStatusCode { ERROR } +@DeleteForDJVM @CordaSerializable data class TelemetryId(private val telemetryService: TelemetryServiceImpl) { val id: UUID = UUID.randomUUID() @@ -57,12 +59,14 @@ data class ComponentTelemetryIds(val componentTelemetryIds: Map) interface TelemetryEvent - +@DeleteForDJVM class StartSpanForFlowEvent(val name: String, val attributes: Map, val telemetryId: UUID, val flowLogic: FlowLogic<*>?, val telemetryDataItem: TelemetryDataItem?): TelemetryEvent +@DeleteForDJVM class EndSpanForFlowEvent(val telemetryId: UUID): TelemetryEvent +@DeleteForDJVM class StartSpanEvent(val name: String, val attributes: Map, val telemetryId: UUID, val flowLogic: FlowLogic<*>?): TelemetryEvent class EndSpanEvent(val telemetryId: UUID): TelemetryEvent class SetStatusEvent(val telemetryId: UUID, val telemetryStatusCode: TelemetryStatusCode, val message: String): TelemetryEvent @@ -88,6 +92,7 @@ interface TelemetryComponentId { } @Suppress("TooManyFunctions") +@DeleteForDJVM class TelemetryServiceImpl : SingletonSerializeAsToken(), TelemetryService { companion object {