mirror of
https://github.com/corda/corda.git
synced 2025-04-06 19:07:08 +00:00
ENT-11275: Added span to multi send and receive calls. (#7845)
* ENT-12275: Added spans around multi send & receive.
This commit is contained in:
parent
d1cfcfe74f
commit
1022d1ec97
@ -313,12 +313,14 @@ abstract class FlowLogic<out T> {
|
||||
@Suspendable
|
||||
@JvmOverloads
|
||||
open fun receiveAllMap(sessions: Map<FlowSession, Class<out Any>>, maySkipCheckpoint: Boolean = false): Map<FlowSession, UntrustworthyData<Any>> {
|
||||
enforceNoPrimitiveInReceive(sessions.values)
|
||||
val replies = stateMachine.suspend(
|
||||
ioRequest = FlowIORequest.Receive(sessions.keys.toNonEmptySet()),
|
||||
maySkipCheckpoint = maySkipCheckpoint
|
||||
)
|
||||
return replies.mapValues { (session, payload) -> payload.checkPayloadIs(sessions[session]!!) }
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#receiveAllMap") {
|
||||
enforceNoPrimitiveInReceive(sessions.values)
|
||||
val replies = stateMachine.suspend(
|
||||
ioRequest = FlowIORequest.Receive(sessions.keys.toNonEmptySet()),
|
||||
maySkipCheckpoint = maySkipCheckpoint
|
||||
)
|
||||
return replies.mapValues { (session, payload) -> payload.checkPayloadIs(sessions[session]!!) }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -335,9 +337,11 @@ abstract class FlowLogic<out T> {
|
||||
@Suspendable
|
||||
@JvmOverloads
|
||||
open fun <R : Any> receiveAll(receiveType: Class<R>, sessions: List<FlowSession>, maySkipCheckpoint: Boolean = false): List<UntrustworthyData<R>> {
|
||||
enforceNoPrimitiveInReceive(listOf(receiveType))
|
||||
enforceNoDuplicates(sessions)
|
||||
return castMapValuesToKnownType(receiveAllMap(associateSessionsToReceiveType(receiveType, sessions)))
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#receiveAll") {
|
||||
enforceNoPrimitiveInReceive(listOf(receiveType))
|
||||
enforceNoDuplicates(sessions)
|
||||
return castMapValuesToKnownType(receiveAllMap(associateSessionsToReceiveType(receiveType, sessions)))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -354,8 +358,10 @@ abstract class FlowLogic<out T> {
|
||||
@Suspendable
|
||||
@JvmOverloads
|
||||
fun sendAll(payload: Any, sessions: Set<FlowSession>, maySkipCheckpoint: Boolean = false) {
|
||||
val sessionToPayload = sessions.map { it to payload }.toMap()
|
||||
return sendAllMap(sessionToPayload, maySkipCheckpoint)
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#sendAll") {
|
||||
val sessionToPayload = sessions.map { it to payload }.toMap()
|
||||
return sendAllMap(sessionToPayload, maySkipCheckpoint)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -371,10 +377,12 @@ abstract class FlowLogic<out T> {
|
||||
@Suspendable
|
||||
@JvmOverloads
|
||||
fun sendAllMap(payloadsPerSession: Map<FlowSession, Any>, maySkipCheckpoint: Boolean = false) {
|
||||
val request = FlowIORequest.Send(
|
||||
sessionToMessage = stateMachine.serialize(payloadsPerSession)
|
||||
)
|
||||
stateMachine.suspend(request, maySkipCheckpoint)
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#sendAllMap") {
|
||||
val request = FlowIORequest.Send(
|
||||
sessionToMessage = stateMachine.serialize(payloadsPerSession)
|
||||
)
|
||||
stateMachine.suspend(request, maySkipCheckpoint)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.core.internal.telemetry
|
||||
|
||||
import net.corda.core.CordaInternal
|
||||
import net.corda.core.DeleteForDJVM
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.node.ServiceHub
|
||||
@ -34,6 +35,7 @@ enum class TelemetryStatusCode {
|
||||
ERROR
|
||||
}
|
||||
|
||||
@DeleteForDJVM
|
||||
@CordaSerializable
|
||||
data class TelemetryId(private val telemetryService: TelemetryServiceImpl) {
|
||||
val id: UUID = UUID.randomUUID()
|
||||
@ -57,12 +59,14 @@ data class ComponentTelemetryIds(val componentTelemetryIds: Map<String, UUID>)
|
||||
|
||||
|
||||
interface TelemetryEvent
|
||||
|
||||
@DeleteForDJVM
|
||||
class StartSpanForFlowEvent(val name: String,
|
||||
val attributes: Map<String, String>,
|
||||
val telemetryId: UUID, val flowLogic: FlowLogic<*>?,
|
||||
val telemetryDataItem: TelemetryDataItem?): TelemetryEvent
|
||||
@DeleteForDJVM
|
||||
class EndSpanForFlowEvent(val telemetryId: UUID): TelemetryEvent
|
||||
@DeleteForDJVM
|
||||
class StartSpanEvent(val name: String, val attributes: Map<String, String>, val telemetryId: UUID, val flowLogic: FlowLogic<*>?): TelemetryEvent
|
||||
class EndSpanEvent(val telemetryId: UUID): TelemetryEvent
|
||||
class SetStatusEvent(val telemetryId: UUID, val telemetryStatusCode: TelemetryStatusCode, val message: String): TelemetryEvent
|
||||
@ -88,6 +92,7 @@ interface TelemetryComponentId {
|
||||
}
|
||||
|
||||
@Suppress("TooManyFunctions")
|
||||
@DeleteForDJVM
|
||||
class TelemetryServiceImpl : SingletonSerializeAsToken(), TelemetryService {
|
||||
|
||||
companion object {
|
||||
|
Loading…
x
Reference in New Issue
Block a user