Merge pull request from corda/adel/ENT-8823

ENT-8823: StartEnd events now dont get mangled when parent span is a …
This commit is contained in:
Adel El-Beik 2022-12-09 10:08:40 +00:00 committed by GitHub
commit 5fa1bba66b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 191 additions and 122 deletions
.ci
client/rpc/src/main/kotlin/net/corda/client/rpc
constants.properties
core/src/main/kotlin/net/corda/core/internal/telemetry
node/src
main
test/kotlin/net/corda/node
opentelemetry
build.gradle
opentelemetry-driver
src/main/kotlin/net/corda/opentelemetrydriver
testing/node-driver/src/main/kotlin/net/corda/testing/node/internal

@ -9391,7 +9391,8 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang.
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean, boolean)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean, boolean, int, kotlin.jvm.internal.DefaultConstructorMarker)
@NotNull
public final java.time.Duration component1()
@NotNull
@ -9419,7 +9420,7 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang.
@NotNull
public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration)
@NotNull
public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean)
public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean, boolean)
public boolean equals(Object)
public int getCacheConcurrencyLevel()
@NotNull
@ -9427,6 +9428,7 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang.
@NotNull
public java.time.Duration getConnectionRetryInterval()
public double getConnectionRetryIntervalMultiplier()
public boolean getCopyBaggageToTags()
@NotNull
public java.time.Duration getDeduplicationCacheExpiry()
public int getMaxFileSize()

@ -179,7 +179,9 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
open val simpleLogTelemetryEnabled: Boolean = false,
open val spanStartEndEventsEnabled: Boolean = true
open val spanStartEndEventsEnabled: Boolean = false,
open val copyBaggageToTags: Boolean = false
) {
companion object {
@ -226,7 +228,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
deduplicationCacheExpiry,
openTelemetryEnabled,
simpleLogTelemetryEnabled,
spanStartEndEventsEnabled
spanStartEndEventsEnabled,
copyBaggageToTags
)
}
@ -246,7 +249,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
deduplicationCacheExpiry: Duration = this.deduplicationCacheExpiry,
openTelemetryEnabled: Boolean = this.openTelemetryEnabled,
simpleLogTelemetryEnabled: Boolean = this.simpleLogTelemetryEnabled,
spanStartEndEventsEnabled: Boolean = this.spanStartEndEventsEnabled
spanStartEndEventsEnabled: Boolean = this.spanStartEndEventsEnabled,
copyBaggageToTags: Boolean = this.copyBaggageToTags
): CordaRPCClientConfiguration {
return CordaRPCClientConfiguration(
connectionMaxRetryInterval,
@ -262,7 +266,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
deduplicationCacheExpiry,
openTelemetryEnabled,
simpleLogTelemetryEnabled,
spanStartEndEventsEnabled
spanStartEndEventsEnabled,
copyBaggageToTags
)
}
@ -286,6 +291,7 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
if (openTelemetryEnabled != other.openTelemetryEnabled) return false
if (simpleLogTelemetryEnabled != other.simpleLogTelemetryEnabled) return false
if (spanStartEndEventsEnabled != other.spanStartEndEventsEnabled) return false
if (copyBaggageToTags != other.copyBaggageToTags) return false
return true
}
@ -306,6 +312,7 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
result = 31 * result + openTelemetryEnabled.hashCode()
result = 31 * result + simpleLogTelemetryEnabled.hashCode()
result = 31 * result + spanStartEndEventsEnabled.hashCode()
result = 31 * result + copyBaggageToTags.hashCode()
return result
}
@ -321,7 +328,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
"deduplicationCacheExpiry=$deduplicationCacheExpiry, " +
"openTelemetryEnabled=$openTelemetryEnabled, " +
"simpleLogTelemetryEnabled=$simpleLogTelemetryEnabled, " +
"spanStartEndEventsEnabled=$spanStartEndEventsEnabled)"
"spanStartEndEventsEnabled=$spanStartEndEventsEnabled, " +
"copyBaggageToTags=$copyBaggageToTags )"
}
// Left in for backwards compatibility with version 3.1

@ -102,7 +102,7 @@ class RPCClient<I : RPCOps>(
val targetString = "${transport.params[TransportConstants.HOST_PROP_NAME]}:${transport.params[TransportConstants.PORT_PROP_NAME]}"
val rpcClientTelemetry = RPCClientTelemetry("rpcClient-$targetString", rpcConfiguration.openTelemetryEnabled,
rpcConfiguration.simpleLogTelemetryEnabled, rpcConfiguration.spanStartEndEventsEnabled)
rpcConfiguration.simpleLogTelemetryEnabled, rpcConfiguration.spanStartEndEventsEnabled, rpcConfiguration.copyBaggageToTags)
val sessionId = Trace.SessionId.newInstance()
val distributionMux = DistributionMux(listeners, username)
val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator,

@ -5,7 +5,9 @@ import net.corda.core.internal.telemetry.SimpleLogTelemetryComponent
import net.corda.core.internal.telemetry.TelemetryServiceImpl
import net.corda.core.utilities.contextLogger
class RPCClientTelemetry(val serviceName: String, val openTelemetryEnabled: Boolean, val simpleLogTelemetryEnabled: Boolean, val spanStartEndEventsEnabled: Boolean) {
class RPCClientTelemetry(val serviceName: String, val openTelemetryEnabled: Boolean,
val simpleLogTelemetryEnabled: Boolean, val spanStartEndEventsEnabled: Boolean,
val copyBaggageToTags: Boolean) {
companion object {
private val log = contextLogger()
@ -16,7 +18,7 @@ class RPCClientTelemetry(val serviceName: String, val openTelemetryEnabled: Bool
init {
if (openTelemetryEnabled) {
try {
val openTelemetryComponent = OpenTelemetryComponent(serviceName, spanStartEndEventsEnabled)
val openTelemetryComponent = OpenTelemetryComponent(serviceName, spanStartEndEventsEnabled, copyBaggageToTags)
if (openTelemetryComponent.isEnabled()) {
telemetryService.addTelemetryComponent(openTelemetryComponent)
log.debug("OpenTelemetry enabled")

@ -13,8 +13,8 @@ java8MinUpdateVersion=171
# net.corda.core.internal.CordaUtilsKt.PLATFORM_VERSION as well. #
# ***************************************************************#
platformVersion=12
openTelemetryVersion=1.17.0
openTelemetrySemConvVersion=1.17.0-alpha
openTelemetryVersion=1.20.1
openTelemetrySemConvVersion=1.20.1-alpha
guavaVersion=28.0-jre
# Quasar version to use with Java 8:
quasarVersion=0.7.15_r3

@ -7,10 +7,7 @@ import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.common.AttributesBuilder
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanContext
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.api.trace.TraceFlags
import io.opentelemetry.api.trace.TraceState
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.context.Context
import io.opentelemetry.context.Scope
@ -21,26 +18,19 @@ import org.slf4j.LoggerFactory
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import net.corda.opentelemetrydriver.OpenTelemetryDriver
import io.opentelemetry.context.propagation.TextMapGetter
import java.util.concurrent.ConcurrentLinkedDeque
const val TRACEIDHEX_LENGTH = 32
const val SPANIDHEX_LENGTH = 16
@CordaSerializable
data class SerializableSpanContext(val traceIdHex : String, val spanIdHex : String, val traceFlagsHex : String, val traceStateMap : Map<String, String>) {
data class SpanEventContexts(val child: Context, val parent: Context)
@CordaSerializable
data class ContextCarrier(val context: MutableMap<String,String>)
@CordaSerializable
data class OpenTelemetryContext(val context: ContextCarrier, val spanEventChildContext: ContextCarrier, val spanEventParentContext: ContextCarrier, val baggage: Map<String,String>): TelemetryDataItem
constructor(spanContext: SpanContext) : this(spanContext.traceId, spanContext.spanId, spanContext.traceFlags.asHex(), spanContext.traceState.asMap().toMap())
constructor() : this("0".repeat(TRACEIDHEX_LENGTH), "0".repeat(SPANIDHEX_LENGTH), TraceFlags.getDefault().asHex(), TraceState.getDefault().asMap())
fun createSpanContext() = SpanContext.create(traceIdHex, spanIdHex, TraceFlags.fromHex(traceFlagsHex, 0), createTraceState())
fun createRemoteSpanContext() = SpanContext.createFromRemoteParent(traceIdHex, spanIdHex, TraceFlags.fromHex(traceFlagsHex, 0), createTraceState())
private fun createTraceState() = traceStateMap.toList().fold(TraceState.builder()) { builder, pair -> builder.put(pair.first, pair.second)}.build()
}
data class OpenTelemetryContext(val spanContext: SerializableSpanContext, val startedSpanContext: SerializableSpanContext, val baggage: Map<String,String>): TelemetryDataItem
data class SpanInfo(val name: String, val span: Span, val spanScope: Scope, val spanEventContext: SerializableSpanContext? = null, val parentSpanEventContext: SerializableSpanContext? = null)
data class SpanInfo(val name: String, val span: Span, val spanScope: Scope,
val spanEventContext: SpanEventContexts? = null,
val spanEventContextQueue: ConcurrentLinkedDeque<SpanEventContexts>? = null)
class TracerSetup(serviceName: String) {
private var openTelemetryDriver: Any? = null
@ -63,7 +53,7 @@ class TracerSetup(serviceName: String) {
}
@Suppress("TooManyFunctions")
class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnabled: Boolean) : TelemetryComponent {
class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnabled: Boolean, val copyBaggageToTags: Boolean) : TelemetryComponent {
val tracerSetup = TracerSetup(serviceName)
val tracer: Tracer = tracerSetup.getTracer()
@ -98,29 +88,70 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
tracerSetup.shutdown()
}
private fun extractContext(carrier: ContextCarrier): Context {
val getter = object : TextMapGetter<ContextCarrier?> {
override fun get(carrier: ContextCarrier?, key: String): String? {
return if (carrier?.context?.containsKey(key) == true) {
val value = carrier.context[key]
value
} else null
}
override fun keys(carrier: ContextCarrier?): MutableIterable<String> {
return carrier?.context?.keys ?: mutableListOf()
}
}
return carrier.let {
tracerSetup.openTelemetry.propagators.textMapPropagator.extract(Context.current(), it, getter)
}
}
@Suppress("LongParameterList")
private fun startSpanForFlow(name: String, attributes: Map<String, String>, telemetryId: UUID, flowLogic: FlowLogic<*>?,
telemetryDataItem: TelemetryDataItem?) {
val baggageAttributes = (telemetryDataItem as? OpenTelemetryContext)?.baggage?.let {
val openTelemetryContext = telemetryDataItem as? OpenTelemetryContext
val extractedContext = openTelemetryContext?.let { extractContext(it.context) }
val spanEventContexts = openTelemetryContext?.let { SpanEventContexts(extractContext(it.spanEventChildContext), extractContext(it.spanEventParentContext)) }
val baggageAttributes = openTelemetryContext?.baggage?.let {
val baggageBuilder = it.toList().fold(Baggage.current().toBuilder()) {builder, attribute -> builder.put(attribute.first, attribute.second)}
baggages[telemetryId] = baggageBuilder.build().makeCurrent()
it
} ?: emptyMap()
// Also add any baggage to the span
val attributesMap = (attributes+baggageAttributes).toList()
val allAttributes = if (copyBaggageToTags) {
attributes + baggageAttributes
}
else {
attributes
}
val attributesMap = allAttributes.toList()
.fold(Attributes.builder()) { builder, attribute -> builder.put(attribute.first, attribute.second) }.also {
populateWithFlowAttributes(it, flowLogic)
}.build()
if (telemetryDataItem != null) {
startSpanForFlowWithRemoteParent(name, attributesMap, telemetryId, telemetryDataItem)
if (extractedContext != null && spanEventContexts != null) {
startSpanForFlowWithRemoteParent(name, attributesMap, telemetryId, extractedContext, spanEventContexts)
}
else {
startSpanForFlowWithNoParent(name, attributesMap, telemetryId)
}
}
private fun startSpanForFlowWithRemoteParent(name: String, attributesMap: Attributes, telemetryId: UUID, parentContext: Context, spanEventContexts: SpanEventContexts) {
val span = tracer.spanBuilder(name).setParent(parentContext).setAllAttributes(attributesMap).startSpan()
val spanScope = span.makeCurrent()
val contextAndQueue = startEndEventForFlowWithRemoteParent(name, attributesMap, spanEventContexts)
spans[telemetryId] = SpanInfo(name, span, spanScope, contextAndQueue?.first, contextAndQueue?.second)
}
private fun startEndEventForFlowWithRemoteParent(name: String, attributesMap: Attributes, spanEventContexts: SpanEventContexts): Pair<SpanEventContexts, ConcurrentLinkedDeque<SpanEventContexts>>? {
if (spanStartEndEventsEnabled) {
val contexts = createSpanToCaptureStartedSpanEventWithRemoteParent(name, spanEventContexts, attributesMap)
return Pair( contexts, ConcurrentLinkedDeque<SpanEventContexts>().also { it.add(contexts) })
}
return null
}
private fun startSpanForFlowWithNoParent(name: String, attributesMap: Attributes, telemetryId: UUID) {
val rootSpan = tracer.spanBuilder(name).setAllAttributes(attributesMap).setAllAttributes(Attributes.of(AttributeKey.stringKey("root.flow"), "true")).startSpan()
val rootSpanScope = rootSpan.makeCurrent()
@ -129,50 +160,31 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
val span = tracer.spanBuilder("Child Spans").setParent(Context.current().with(rootSpan)).startSpan()
val spanScope = span.makeCurrent()
rootSpans[telemetryId] = SpanInfo(name, rootSpan, rootSpanScope)
spans[telemetryId] = SpanInfo(name, span, spanScope, startedSpanContexts.first, startedSpanContexts.second)
val spanEventContextStack = ConcurrentLinkedDeque<SpanEventContexts>().also { it.add(startedSpanContexts) }
spans[telemetryId] = SpanInfo(name, span, spanScope, startedSpanContexts, spanEventContextStack)
}
else {
spans[telemetryId] = SpanInfo(name, rootSpan, rootSpanScope)
}
}
private fun startSpanForFlowWithRemoteParent(name: String, attributesMap: Attributes, telemetryId: UUID, telemetryDataItem: TelemetryDataItem) {
val parentContext = (telemetryDataItem as OpenTelemetryContext).spanContext
val spanContext = parentContext.createRemoteSpanContext()
val parentSpan = Span.wrap(spanContext)
val span = tracer.spanBuilder(name).setParent(Context.current().with(parentSpan)).setAllAttributes(attributesMap).startSpan()
val spanScope = span.makeCurrent()
if (spanStartEndEventsEnabled) {
val contexts = createSpanToCaptureStartedSpanEventWithRemoteParent(name, telemetryDataItem, attributesMap)
spans[telemetryId] = SpanInfo(name, span, spanScope, contexts.first, contexts.second)
}
else {
spans[telemetryId] = SpanInfo(name, span, spanScope)
}
}
private fun createSpanToCaptureStartedSpanEvent(name: String, rootSpan: Span, attributesMap: Attributes): Pair<SerializableSpanContext, SerializableSpanContext> {
private fun createSpanToCaptureStartedSpanEvent(name: String, rootSpan: Span, attributesMap: Attributes): SpanEventContexts {
val startedSpan = tracer.spanBuilder("Started Events").setAllAttributes(attributesMap).setAllAttributes(Attributes.of(AttributeKey.stringKey("root.startend.events"), "true")).setParent(Context.current().with(rootSpan)).startSpan()
val serializableSpanContext = SerializableSpanContext(startedSpan.spanContext)
val parentContext = Context.current().with(startedSpan)
startedSpan.end()
val startedSpanContext = serializableSpanContext.createSpanContext()
val startedSpanFromContext = Span.wrap(startedSpanContext)
val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap)
.setParent(Context.current().with(startedSpanFromContext)).startSpan()
val childSerializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext)
.setParent(parentContext).startSpan()
val childContext = Context.current().with(startedSpanChild)
startedSpanChild.end()
return Pair(childSerializableSpanContext, serializableSpanContext)
return SpanEventContexts(childContext, parentContext)
}
private fun createSpanToCaptureStartedSpanEventWithRemoteParent(name: String, telemetryDataItem: OpenTelemetryContext, attributesMap: Attributes ): Pair<SerializableSpanContext, SerializableSpanContext> {
val startedSpanParentContext = telemetryDataItem.startedSpanContext
val startedSpanContext = startedSpanParentContext.createRemoteSpanContext()
val startedSpanFromContext = Span.wrap(startedSpanContext)
private fun createSpanToCaptureStartedSpanEventWithRemoteParent(name: String, spanEventContexts: SpanEventContexts, attributesMap: Attributes ): SpanEventContexts {
val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap)
.setParent(Context.current().with(startedSpanFromContext)).startSpan()
val serializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext)
.setParent(spanEventContexts.child).startSpan()
val grandChildContext = Context.current().with(startedSpanChild)
startedSpanChild.end()
return Pair(serializableSpanContext, startedSpanParentContext)
return SpanEventContexts(grandChildContext, spanEventContexts.child)
}
private fun endSpanForFlow(telemetryId: UUID){
@ -194,27 +206,29 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
}
private fun createSpanToCaptureEndSpanEvent(spanInfo: SpanInfo?) {
spanInfo?.parentSpanEventContext?.let {
val startedSpanContext = it.createSpanContext()
val startedSpanFromContext = Span.wrap(startedSpanContext)
val startedSpanChild = tracer.spanBuilder("${spanInfo.name}-end").setParent(Context.current().with(startedSpanFromContext)).startSpan()
spanInfo?.spanEventContext?.parent?.let {
val startedSpanChild = tracer.spanBuilder("${spanInfo.name}-end").setParent(it).startSpan()
startedSpanChild.end()
}
val spanEventContextStack = spanInfo?.spanEventContextQueue
val filteredSpanEventContexts = spanEventContextStack?.filter { it == spanInfo.spanEventContext }
filteredSpanEventContexts?.forEach { spanEventContextStack.remove(it) }
}
private fun startSpan(name: String, attributes: Map<String, String>, telemetryId: UUID, flowLogic: FlowLogic<*>?) {
val currentBaggage = Baggage.current()
val baggageAttributes = mutableMapOf<String,String>()
currentBaggage.forEach { t, u -> baggageAttributes[t] = u.value }
if (copyBaggageToTags) {
currentBaggage.forEach { t, u -> baggageAttributes[t] = u.value }
}
val parentSpan = Span.current()
val attributesMap = (attributes+baggageAttributes).toList().fold(Attributes.builder()) { builder, attribute -> builder.put(attribute.first, attribute.second) }.also {
populateWithFlowAttributes(it, flowLogic)
}.build()
val span = tracer.spanBuilder(name).setAllAttributes(attributesMap).startSpan()
val spanScope = span.makeCurrent()
val startedEventContexts = createStartedEventSpan(name, attributesMap, parentSpan)
spans[telemetryId] = SpanInfo(name, span, spanScope, startedEventContexts.first, startedEventContexts.second)
val spanEventContexts = createStartedEventSpan(name, attributesMap, parentSpan)
spans[telemetryId] = SpanInfo(name, span, spanScope, spanEventContexts?.peekLast(), spanEventContexts)
}
private fun populateWithFlowAttributes(attributesBuilder: AttributesBuilder, flowLogic: FlowLogic<*>?) {
@ -225,28 +239,36 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
}
}
private fun createStartedEventSpan(name: String, attributesMap: Attributes, parentSpan: Span): Pair<SerializableSpanContext?, SerializableSpanContext?> {
if (spanStartEndEventsEnabled) {
// Fix up null contexts - make not null
private fun createStartedEventSpan(name: String, attributesMap: Attributes, parentSpan: Span): ConcurrentLinkedDeque<SpanEventContexts>? {
return if (spanStartEndEventsEnabled) {
val filteredSpans = spans.filter { it.value.span == parentSpan }.toList()
var serializableSpanContext = SerializableSpanContext()
var parentStartedSpanContext = SerializableSpanContext()
if (filteredSpans.isNotEmpty()) {
parentStartedSpanContext = filteredSpans[0].second.spanEventContext ?: SerializableSpanContext()
val startedSpanContext = parentStartedSpanContext.createSpanContext()
val startedSpanFromContext = Span.wrap(startedSpanContext)
val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap)
.setParent(Context.current().with(startedSpanFromContext)).startSpan()
serializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext)
startedSpanChild.end()
}
return Pair(serializableSpanContext, parentStartedSpanContext)
val (startEventParentContext, spanEventContextQueue) = getStartEventParentContext(filteredSpans, parentSpan)
val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap)
.setParent(startEventParentContext).startSpan()
val childContext = Context.current().with(startedSpanChild)
startedSpanChild.end()
spanEventContextQueue?.offer(SpanEventContexts(childContext, startEventParentContext))
spanEventContextQueue
}
else {
return Pair(null, null)
null
}
}
private fun getStartEventParentContext(filteredSpans: List<Pair<UUID, SpanInfo>>, parentSpan: Span): Pair<Context, ConcurrentLinkedDeque<SpanEventContexts>?> {
return if (filteredSpans.isNotEmpty()) {
Pair(filteredSpans[0].second.spanEventContext?.child ?: Context.current(), filteredSpans[0].second.spanEventContextQueue)
}
else {
// Copes with case where user has created their own span. So we just use the most
// recent span we know about on the stack.
val altFilteredSpans = spans.filter { it.value.span.spanContext.traceId == parentSpan.spanContext.traceId }.toList()
val spanEventContexts = altFilteredSpans[0].second.spanEventContextQueue
Pair(spanEventContexts?.peekLast()?.child ?: Context.current(), spanEventContexts)
}
}
private fun endSpan(telemetryId: UUID){
val spanInfo = spans[telemetryId]
createSpanToCaptureEndSpanEvent(spanInfo)
@ -256,11 +278,45 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
}
override fun getCurrentTelemetryData(): TelemetryDataItem {
val currentSpan = Span.current()
val spanContext = SerializableSpanContext(currentSpan.spanContext)
val currentContextCarrier = inject(tracerSetup, Context.current())
val filteredSpans = spans.filter { it.value.span == currentSpan }.toList()
val startedSpanContext = filteredSpans.getOrNull(0)?.second?.spanEventContext ?: SerializableSpanContext()
return OpenTelemetryContext(spanContext, startedSpanContext, Baggage.current().asMap().mapValues { it.value.value })
val childContext = if (filteredSpans.isNotEmpty()) {
filteredSpans.getOrNull(0)?.second?.spanEventContext?.child
}
else {
val altFilteredSpans = spans.filter { it.value.span.spanContext.traceId == currentSpan.spanContext.traceId }.toList()
if (altFilteredSpans.isNotEmpty()) {
altFilteredSpans[0].second.spanEventContextQueue?.peekLast()?.child
}
else {
null
}
}
val childContextCarrier = inject(tracerSetup, childContext)
val parentContext = if (filteredSpans.isNotEmpty()) {
filteredSpans.getOrNull(0)?.second?.spanEventContext?.parent
}
else {
val altFilteredSpans = spans.filter { it.value.span.spanContext.traceId == currentSpan.spanContext.traceId }.toList()
if (altFilteredSpans.isNotEmpty()) {
altFilteredSpans[0].second.spanEventContextQueue?.peekLast()?.parent
}
else {
null
}
}
val parentContextCarrier = inject(tracerSetup, parentContext)
return OpenTelemetryContext(currentContextCarrier, childContextCarrier, parentContextCarrier, Baggage.current().asMap().mapValues { it.value.value })
}
private fun inject(tracerSetup: TracerSetup, context: Context?) : ContextCarrier {
val contextCarrier = ContextCarrier(mutableMapOf())
context?.let { tracerSetup.openTelemetry.propagators.textMapPropagator.inject(it, contextCarrier) { carrier, key, value -> carrier?.context?.put(key, value) }}
return contextCarrier
}
override fun getCurrentTelemetryId(): UUID {
@ -273,7 +329,7 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
}
override fun setCurrentTelemetryId(id: UUID) {
val spanInfo = spans.get(id)
val spanInfo = spans[id]
spanInfo?.let {
it.spanScope.close() // close the old scope
val childSpanScope = it.span.makeCurrent()
@ -295,7 +351,7 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
}
override fun getTelemetryHandles(): List<Any> {
return tracerSetup.openTelemetry.let { listOf(it) }
return listOf(tracerSetup.openTelemetry)
}
private fun setStatus(telemetryId: UUID, telemetryStatusCode: TelemetryStatusCode, message: String) {

@ -2,7 +2,6 @@ package net.corda.core.internal.telemetry
import net.corda.core.flows.FlowLogic
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.debug
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.MDC
@ -57,13 +56,13 @@ class SimpleLogTelemetryComponent : TelemetryComponent {
logContexts[traceId] = SimpleLogContext(traceId, baggageAttributes)
clientId?.let { MDC.put(CLIENT_ID, it) }
MDC.put(TRACE_ID, traceId.toString())
log.debug {"startSpanForFlow: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: ${attributes+baggageAttributes}"}
log.info("startSpanForFlow: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: ${attributes+baggageAttributes}")
}
// Check when you start a top level flow the startSpanForFlow appears just once, and so the endSpanForFlow also appears just once
// So its valid to do the MDC clear here. For remotes nodes as well
private fun endSpanForFlow(telemetryId: UUID) {
log.debug {"endSpanForFlow: traceId: ${traces.get()}"}
log.info("endSpanForFlow: traceId: ${traces.get()}")
logContexts.remove(telemetryId)
MDC.clear()
}
@ -73,12 +72,12 @@ class SimpleLogTelemetryComponent : TelemetryComponent {
val flowId = flowLogic?.runId
val clientId = flowLogic?.stateMachine?.clientId
val traceId = traces.get()
log.debug {"startSpan: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: $attributes"}
log.info("startSpan: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: $attributes")
}
@Suppress("UNUSED_PARAMETER")
private fun endSpan(telemetryId: UUID) {
log.debug {"endSpan: traceId: ${traces.get()}"}
log.info("endSpan: traceId: ${traces.get()}")
}
override fun getCurrentTelemetryData(): SimpleLogContext {
@ -119,7 +118,7 @@ class SimpleLogTelemetryComponent : TelemetryComponent {
private fun setStatus(telemetryId: UUID, telemetryStatusCode: TelemetryStatusCode, message: String) {
when(telemetryStatusCode) {
TelemetryStatusCode.ERROR -> log.error("setStatus: traceId: ${traces.get()}, statusCode: ${telemetryStatusCode}, message: message")
TelemetryStatusCode.OK, TelemetryStatusCode.UNSET -> log.debug {"setStatus: traceId: ${traces.get()}, statusCode: ${telemetryStatusCode}, message: message" }
TelemetryStatusCode.OK, TelemetryStatusCode.UNSET -> log.info("setStatus: traceId: ${traces.get()}, statusCode: ${telemetryStatusCode}, message: message")
}
}

@ -256,7 +256,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo).closeOnStop(false)
val telemetryService: TelemetryServiceImpl = TelemetryServiceImpl().also {
val openTelemetryComponent = OpenTelemetryComponent(configuration.myLegalName.toString(), configuration.telemetry.spanStartEndEventsEnabled)
val openTelemetryComponent = OpenTelemetryComponent(configuration.myLegalName.toString(), configuration.telemetry.spanStartEndEventsEnabled, configuration.telemetry.copyBaggageToTags)
if (configuration.telemetry.openTelemetryEnabled && openTelemetryComponent.isEnabled()) {
it.addTelemetryComponent(openTelemetryComponent)
}

@ -224,7 +224,8 @@ data class FlowTimeoutConfiguration(
data class TelemetryConfiguration(
val openTelemetryEnabled: Boolean,
val simpleLogTelemetryEnabled: Boolean,
val spanStartEndEventsEnabled: Boolean
val spanStartEndEventsEnabled: Boolean,
val copyBaggageToTags: Boolean
)
internal typealias Valid<TARGET> = Validated<TARGET, Configuration.Validation.Error>

@ -134,7 +134,7 @@ data class NodeConfigurationImpl(
fun database(devMode: Boolean) = DatabaseConfig(
exportHibernateJMXStatistics = devMode
)
val telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true)
val telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = false, copyBaggageToTags = false)
}
companion object {

@ -229,10 +229,11 @@ internal object TelemetryConfigurationSpec : Configuration.Specification<Telemet
private val openTelemetryEnabled by boolean()
private val simpleLogTelemetryEnabled by boolean()
private val spanStartEndEventsEnabled by boolean()
private val copyBaggageToTags by boolean()
override fun parseValid(configuration: Config, options: Configuration.Options): Valid<TelemetryConfiguration> {
val config = configuration.withOptions(options)
return valid(TelemetryConfiguration(config[openTelemetryEnabled], config[simpleLogTelemetryEnabled], config[spanStartEndEventsEnabled]))
return valid(TelemetryConfiguration(config[openTelemetryEnabled], config[simpleLogTelemetryEnabled], config[spanStartEndEventsEnabled], config[copyBaggageToTags]))
}
}

@ -28,5 +28,6 @@ verifierType = InMemory
telemetry {
openTelemetryEnabled = true,
simpleLogTelemetryEnabled = false,
spanStartEndEventsEnabled = true
spanStartEndEventsEnabled = false,
copyBaggageToTags = false
}

@ -196,7 +196,7 @@ class NodeTest {
rpcUsers = emptyList(),
verifierType = VerifierType.InMemory,
flowTimeout = FlowTimeoutConfiguration(timeout = Duration.ZERO, backoffBase = 1.0, maxRestartCount = 1),
telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true),
telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = false, copyBaggageToTags = false),
rpcSettings = NodeRpcSettings(address = fakeAddress, adminAddress = null, ssl = null),
messagingServerAddress = null,
notary = null,

@ -397,7 +397,7 @@ class NodeConfigurationImplTest {
p2pAddress = NetworkHostAndPort("localhost", 0),
messagingServerAddress = null,
flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0),
telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true),
telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = false, copyBaggageToTags = false),
notary = null,
devMode = true,
noLocalShell = false,

@ -1,12 +1,10 @@
import static org.gradle.api.JavaVersion.VERSION_1_8
plugins {
// Apply the org.jetbrains.kotlin.jvm Plugin to add support for Kotlin.
id 'org.jetbrains.kotlin.jvm' // version '1.6.21'
// Apply the java-library plugin for API and implementation separation.
id 'org.jetbrains.kotlin.jvm'
id 'java-library'
//id 'com.github.johnrengelman.shadow' // version '7.1.2'
id 'net.corda.plugins.publish-utils'
id 'com.jfrog.artifactory'
}
description 'OpenTelemetry SDK Bundle'

@ -1,12 +1,11 @@
import static org.gradle.api.JavaVersion.VERSION_1_8
plugins {
// Apply the org.jetbrains.kotlin.jvm Plugin to add support for Kotlin.
id 'org.jetbrains.kotlin.jvm' // version '1.6.21'
// Apply the java-library plugin for API and implementation separation.
id 'org.jetbrains.kotlin.jvm'
id 'java-library'
id 'com.github.johnrengelman.shadow' // version '7.1.2'
id 'com.github.johnrengelman.shadow'
id 'net.corda.plugins.publish-utils'
id 'com.jfrog.artifactory'
}
description 'OpenTelemetry Driver'
@ -37,5 +36,4 @@ jar {
publish {
disableDefaultJar = true
name 'corda-opentelemetry-driver'
}
}

@ -1,9 +1,11 @@
package net.corda.opentelemetrydriver
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.context.propagation.TextMapPropagator
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.sdk.OpenTelemetrySdk
@ -32,7 +34,8 @@ class OpenTelemetryDriver(serviceName: String) {
val openTelemetry: OpenTelemetry = OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.setMeterProvider(sdkMeterProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
.setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(),
W3CBaggagePropagator.getInstance())))
.build()
fun shutdown() {

@ -492,7 +492,7 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
doReturn(emptyList<SecureHash>()).whenever(it).extraNetworkMapKeys
doReturn(listOf(baseDirectory / "cordapps")).whenever(it).cordappDirectories
doReturn(emptyList<String>()).whenever(it).quasarExcludePackages
doReturn(TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true)).whenever(it).telemetry
doReturn(TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = false, copyBaggageToTags = false)).whenever(it).telemetry
parameters.configOverrides(it)
}