ENT-8823: StartEnd events now dont get mangled when parent span is a user span. Also other refactorings like using OT propagator now.

This commit is contained in:
Adel El-Beik 2022-12-06 19:24:19 +00:00
parent c3083af4c3
commit 50a1a4ce09
2 changed files with 133 additions and 82 deletions

View File

@ -7,10 +7,7 @@ import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.common.AttributesBuilder
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanContext
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.api.trace.TraceFlags
import io.opentelemetry.api.trace.TraceState
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.context.Context
import io.opentelemetry.context.Scope
@ -21,26 +18,19 @@ import org.slf4j.LoggerFactory
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import net.corda.opentelemetrydriver.OpenTelemetryDriver
import io.opentelemetry.context.propagation.TextMapGetter
import java.util.concurrent.ConcurrentLinkedDeque
const val TRACEIDHEX_LENGTH = 32
const val SPANIDHEX_LENGTH = 16
@CordaSerializable
data class SerializableSpanContext(val traceIdHex : String, val spanIdHex : String, val traceFlagsHex : String, val traceStateMap : Map<String, String>) {
data class SpanEventContexts(val child: Context, val parent: Context)
@CordaSerializable
data class ContextCarrier(val context: MutableMap<String,String>)
@CordaSerializable
data class OpenTelemetryContext(val context: ContextCarrier, val spanEventChildContext: ContextCarrier, val spanEventParentContext: ContextCarrier, val baggage: Map<String,String>): TelemetryDataItem
constructor(spanContext: SpanContext) : this(spanContext.traceId, spanContext.spanId, spanContext.traceFlags.asHex(), spanContext.traceState.asMap().toMap())
constructor() : this("0".repeat(TRACEIDHEX_LENGTH), "0".repeat(SPANIDHEX_LENGTH), TraceFlags.getDefault().asHex(), TraceState.getDefault().asMap())
fun createSpanContext() = SpanContext.create(traceIdHex, spanIdHex, TraceFlags.fromHex(traceFlagsHex, 0), createTraceState())
fun createRemoteSpanContext() = SpanContext.createFromRemoteParent(traceIdHex, spanIdHex, TraceFlags.fromHex(traceFlagsHex, 0), createTraceState())
private fun createTraceState() = traceStateMap.toList().fold(TraceState.builder()) { builder, pair -> builder.put(pair.first, pair.second)}.build()
}
data class OpenTelemetryContext(val spanContext: SerializableSpanContext, val startedSpanContext: SerializableSpanContext, val baggage: Map<String,String>): TelemetryDataItem
data class SpanInfo(val name: String, val span: Span, val spanScope: Scope, val spanEventContext: SerializableSpanContext? = null, val parentSpanEventContext: SerializableSpanContext? = null)
data class SpanInfo(val name: String, val span: Span, val spanScope: Scope,
val spanEventContext: SpanEventContexts? = null,
val spanEventContextQueue: ConcurrentLinkedDeque<SpanEventContexts>? = null)
class TracerSetup(serviceName: String) {
private var openTelemetryDriver: Any? = null
@ -98,11 +88,31 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
tracerSetup.shutdown()
}
private fun extractContext(carrier: ContextCarrier): Context {
val getter = object : TextMapGetter<ContextCarrier?> {
override fun get(carrier: ContextCarrier?, key: String): String? {
return if (carrier?.context?.containsKey(key) == true) {
val value = carrier.context[key]
value
} else null
}
override fun keys(carrier: ContextCarrier?): MutableIterable<String> {
return carrier?.context?.keys ?: mutableListOf()
}
}
return carrier.let {
tracerSetup.openTelemetry.propagators.textMapPropagator.extract(Context.current(), it, getter)
}
}
@Suppress("LongParameterList")
private fun startSpanForFlow(name: String, attributes: Map<String, String>, telemetryId: UUID, flowLogic: FlowLogic<*>?,
telemetryDataItem: TelemetryDataItem?) {
val baggageAttributes = (telemetryDataItem as? OpenTelemetryContext)?.baggage?.let {
val openTelemetryContext = telemetryDataItem as? OpenTelemetryContext
val extractedContext = openTelemetryContext?.let { extractContext(it.context) }
val spanEventContexts = openTelemetryContext?.let { SpanEventContexts(extractContext(it.spanEventChildContext), extractContext(it.spanEventParentContext)) }
val baggageAttributes = openTelemetryContext?.baggage?.let {
val baggageBuilder = it.toList().fold(Baggage.current().toBuilder()) {builder, attribute -> builder.put(attribute.first, attribute.second)}
baggages[telemetryId] = baggageBuilder.build().makeCurrent()
it
@ -113,14 +123,29 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
.fold(Attributes.builder()) { builder, attribute -> builder.put(attribute.first, attribute.second) }.also {
populateWithFlowAttributes(it, flowLogic)
}.build()
if (telemetryDataItem != null) {
startSpanForFlowWithRemoteParent(name, attributesMap, telemetryId, telemetryDataItem)
if (extractedContext != null && spanEventContexts != null) {
startSpanForFlowWithRemoteParent(name, attributesMap, telemetryId, extractedContext, spanEventContexts)
}
else {
startSpanForFlowWithNoParent(name, attributesMap, telemetryId)
}
}
private fun startSpanForFlowWithRemoteParent(name: String, attributesMap: Attributes, telemetryId: UUID, parentContext: Context, spanEventContexts: SpanEventContexts) {
val span = tracer.spanBuilder(name).setParent(parentContext).setAllAttributes(attributesMap).startSpan()
val spanScope = span.makeCurrent()
val contextAndQueue = startEndEventForFlowWithRemoteParent(name, attributesMap, spanEventContexts)
spans[telemetryId] = SpanInfo(name, span, spanScope, contextAndQueue?.first, contextAndQueue?.second)
}
private fun startEndEventForFlowWithRemoteParent(name: String, attributesMap: Attributes, spanEventContexts: SpanEventContexts): Pair<SpanEventContexts, ConcurrentLinkedDeque<SpanEventContexts>>? {
if (spanStartEndEventsEnabled) {
val contexts = createSpanToCaptureStartedSpanEventWithRemoteParent(name, spanEventContexts, attributesMap)
return Pair( contexts, ConcurrentLinkedDeque<SpanEventContexts>().also { it.add(contexts) })
}
return null
}
private fun startSpanForFlowWithNoParent(name: String, attributesMap: Attributes, telemetryId: UUID) {
val rootSpan = tracer.spanBuilder(name).setAllAttributes(attributesMap).setAllAttributes(Attributes.of(AttributeKey.stringKey("root.flow"), "true")).startSpan()
val rootSpanScope = rootSpan.makeCurrent()
@ -129,50 +154,31 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
val span = tracer.spanBuilder("Child Spans").setParent(Context.current().with(rootSpan)).startSpan()
val spanScope = span.makeCurrent()
rootSpans[telemetryId] = SpanInfo(name, rootSpan, rootSpanScope)
spans[telemetryId] = SpanInfo(name, span, spanScope, startedSpanContexts.first, startedSpanContexts.second)
val spanEventContextStack = ConcurrentLinkedDeque<SpanEventContexts>().also { it.add(startedSpanContexts) }
spans[telemetryId] = SpanInfo(name, span, spanScope, startedSpanContexts, spanEventContextStack)
}
else {
spans[telemetryId] = SpanInfo(name, rootSpan, rootSpanScope)
}
}
private fun startSpanForFlowWithRemoteParent(name: String, attributesMap: Attributes, telemetryId: UUID, telemetryDataItem: TelemetryDataItem) {
val parentContext = (telemetryDataItem as OpenTelemetryContext).spanContext
val spanContext = parentContext.createRemoteSpanContext()
val parentSpan = Span.wrap(spanContext)
val span = tracer.spanBuilder(name).setParent(Context.current().with(parentSpan)).setAllAttributes(attributesMap).startSpan()
val spanScope = span.makeCurrent()
if (spanStartEndEventsEnabled) {
val contexts = createSpanToCaptureStartedSpanEventWithRemoteParent(name, telemetryDataItem, attributesMap)
spans[telemetryId] = SpanInfo(name, span, spanScope, contexts.first, contexts.second)
}
else {
spans[telemetryId] = SpanInfo(name, span, spanScope)
}
}
private fun createSpanToCaptureStartedSpanEvent(name: String, rootSpan: Span, attributesMap: Attributes): Pair<SerializableSpanContext, SerializableSpanContext> {
private fun createSpanToCaptureStartedSpanEvent(name: String, rootSpan: Span, attributesMap: Attributes): SpanEventContexts {
val startedSpan = tracer.spanBuilder("Started Events").setAllAttributes(attributesMap).setAllAttributes(Attributes.of(AttributeKey.stringKey("root.startend.events"), "true")).setParent(Context.current().with(rootSpan)).startSpan()
val serializableSpanContext = SerializableSpanContext(startedSpan.spanContext)
val parentContext = Context.current().with(startedSpan)
startedSpan.end()
val startedSpanContext = serializableSpanContext.createSpanContext()
val startedSpanFromContext = Span.wrap(startedSpanContext)
val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap)
.setParent(Context.current().with(startedSpanFromContext)).startSpan()
val childSerializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext)
.setParent(parentContext).startSpan()
val childContext = Context.current().with(startedSpanChild)
startedSpanChild.end()
return Pair(childSerializableSpanContext, serializableSpanContext)
return SpanEventContexts(childContext, parentContext)
}
private fun createSpanToCaptureStartedSpanEventWithRemoteParent(name: String, telemetryDataItem: OpenTelemetryContext, attributesMap: Attributes ): Pair<SerializableSpanContext, SerializableSpanContext> {
val startedSpanParentContext = telemetryDataItem.startedSpanContext
val startedSpanContext = startedSpanParentContext.createRemoteSpanContext()
val startedSpanFromContext = Span.wrap(startedSpanContext)
private fun createSpanToCaptureStartedSpanEventWithRemoteParent(name: String, spanEventContexts: SpanEventContexts, attributesMap: Attributes ): SpanEventContexts {
val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap)
.setParent(Context.current().with(startedSpanFromContext)).startSpan()
val serializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext)
.setParent(spanEventContexts.child).startSpan()
val grandChildContext = Context.current().with(startedSpanChild)
startedSpanChild.end()
return Pair(serializableSpanContext, startedSpanParentContext)
return SpanEventContexts(grandChildContext, spanEventContexts.child)
}
private fun endSpanForFlow(telemetryId: UUID){
@ -194,27 +200,27 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
}
private fun createSpanToCaptureEndSpanEvent(spanInfo: SpanInfo?) {
spanInfo?.parentSpanEventContext?.let {
val startedSpanContext = it.createSpanContext()
val startedSpanFromContext = Span.wrap(startedSpanContext)
val startedSpanChild = tracer.spanBuilder("${spanInfo.name}-end").setParent(Context.current().with(startedSpanFromContext)).startSpan()
spanInfo?.spanEventContext?.parent?.let {
val startedSpanChild = tracer.spanBuilder("${spanInfo.name}-end").setParent(it).startSpan()
startedSpanChild.end()
}
val spanEventContextStack = spanInfo?.spanEventContextQueue
val filteredSpanEventContexts = spanEventContextStack?.filter { it == spanInfo.spanEventContext }
filteredSpanEventContexts?.forEach { spanEventContextStack.remove(it) }
}
private fun startSpan(name: String, attributes: Map<String, String>, telemetryId: UUID, flowLogic: FlowLogic<*>?) {
val currentBaggage = Baggage.current()
val baggageAttributes = mutableMapOf<String,String>()
currentBaggage.forEach { t, u -> baggageAttributes[t] = u.value }
val parentSpan = Span.current()
val attributesMap = (attributes+baggageAttributes).toList().fold(Attributes.builder()) { builder, attribute -> builder.put(attribute.first, attribute.second) }.also {
populateWithFlowAttributes(it, flowLogic)
}.build()
val span = tracer.spanBuilder(name).setAllAttributes(attributesMap).startSpan()
val spanScope = span.makeCurrent()
val startedEventContexts = createStartedEventSpan(name, attributesMap, parentSpan)
spans[telemetryId] = SpanInfo(name, span, spanScope, startedEventContexts.first, startedEventContexts.second)
val spanEventContexts = createStartedEventSpan(name, attributesMap, parentSpan)
spans[telemetryId] = SpanInfo(name, span, spanScope, spanEventContexts?.peekLast(), spanEventContexts)
}
private fun populateWithFlowAttributes(attributesBuilder: AttributesBuilder, flowLogic: FlowLogic<*>?) {
@ -225,28 +231,36 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
}
}
private fun createStartedEventSpan(name: String, attributesMap: Attributes, parentSpan: Span): Pair<SerializableSpanContext?, SerializableSpanContext?> {
if (spanStartEndEventsEnabled) {
// Fix up null contexts - make not null
private fun createStartedEventSpan(name: String, attributesMap: Attributes, parentSpan: Span): ConcurrentLinkedDeque<SpanEventContexts>? {
return if (spanStartEndEventsEnabled) {
val filteredSpans = spans.filter { it.value.span == parentSpan }.toList()
var serializableSpanContext = SerializableSpanContext()
var parentStartedSpanContext = SerializableSpanContext()
if (filteredSpans.isNotEmpty()) {
parentStartedSpanContext = filteredSpans[0].second.spanEventContext ?: SerializableSpanContext()
val startedSpanContext = parentStartedSpanContext.createSpanContext()
val startedSpanFromContext = Span.wrap(startedSpanContext)
val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap)
.setParent(Context.current().with(startedSpanFromContext)).startSpan()
serializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext)
startedSpanChild.end()
}
return Pair(serializableSpanContext, parentStartedSpanContext)
val (startEventParentContext, spanEventContextQueue) = getStartEventParentContext(filteredSpans, parentSpan)
val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap)
.setParent(startEventParentContext).startSpan()
val childContext = Context.current().with(startedSpanChild)
startedSpanChild.end()
spanEventContextQueue?.offer(SpanEventContexts(childContext, startEventParentContext))
spanEventContextQueue
}
else {
return Pair(null, null)
null
}
}
private fun getStartEventParentContext(filteredSpans: List<Pair<UUID, SpanInfo>>, parentSpan: Span): Pair<Context, ConcurrentLinkedDeque<SpanEventContexts>?> {
return if (filteredSpans.isNotEmpty()) {
Pair(filteredSpans[0].second.spanEventContext?.child ?: Context.current(), filteredSpans[0].second.spanEventContextQueue)
}
else {
// Copes with case where user has created their own span. So we just use the most
// recent span we know about on the stack.
val altFilteredSpans = spans.filter { it.value.span.spanContext.traceId == parentSpan.spanContext.traceId }.toList()
val spanEventContexts = altFilteredSpans[0].second.spanEventContextQueue
Pair(spanEventContexts?.peekLast()?.child ?: Context.current(), spanEventContexts)
}
}
private fun endSpan(telemetryId: UUID){
val spanInfo = spans[telemetryId]
createSpanToCaptureEndSpanEvent(spanInfo)
@ -256,11 +270,45 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
}
override fun getCurrentTelemetryData(): TelemetryDataItem {
val currentSpan = Span.current()
val spanContext = SerializableSpanContext(currentSpan.spanContext)
val currentContextCarrier = inject(tracerSetup, Context.current())
val filteredSpans = spans.filter { it.value.span == currentSpan }.toList()
val startedSpanContext = filteredSpans.getOrNull(0)?.second?.spanEventContext ?: SerializableSpanContext()
return OpenTelemetryContext(spanContext, startedSpanContext, Baggage.current().asMap().mapValues { it.value.value })
val childContext = if (filteredSpans.isNotEmpty()) {
filteredSpans.getOrNull(0)?.second?.spanEventContext?.child
}
else {
val altFilteredSpans = spans.filter { it.value.span.spanContext.traceId == currentSpan.spanContext.traceId }.toList()
if (altFilteredSpans.isNotEmpty()) {
altFilteredSpans[0].second.spanEventContextQueue?.peekLast()?.child
}
else {
null
}
}
val childContextCarrier = inject(tracerSetup, childContext)
val parentContext = if (filteredSpans.isNotEmpty()) {
filteredSpans.getOrNull(0)?.second?.spanEventContext?.parent
}
else {
val altFilteredSpans = spans.filter { it.value.span.spanContext.traceId == currentSpan.spanContext.traceId }.toList()
if (altFilteredSpans.isNotEmpty()) {
altFilteredSpans[0].second.spanEventContextQueue?.peekLast()?.parent
}
else {
null
}
}
val parentContextCarrier = inject(tracerSetup, parentContext)
return OpenTelemetryContext(currentContextCarrier, childContextCarrier, parentContextCarrier, Baggage.current().asMap().mapValues { it.value.value })
}
private fun inject(tracerSetup: TracerSetup, context: Context?) : ContextCarrier {
val contextCarrier = ContextCarrier(mutableMapOf())
context?.let { tracerSetup.openTelemetry.propagators.textMapPropagator.inject(it, contextCarrier) { carrier, key, value -> carrier?.context?.put(key, value) }}
return contextCarrier
}
override fun getCurrentTelemetryId(): UUID {
@ -273,7 +321,7 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
}
override fun setCurrentTelemetryId(id: UUID) {
val spanInfo = spans.get(id)
val spanInfo = spans[id]
spanInfo?.let {
it.spanScope.close() // close the old scope
val childSpanScope = it.span.makeCurrent()
@ -295,7 +343,7 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
}
override fun getTelemetryHandles(): List<Any> {
return tracerSetup.openTelemetry.let { listOf(it) }
return listOf(tracerSetup.openTelemetry)
}
private fun setStatus(telemetryId: UUID, telemetryStatusCode: TelemetryStatusCode, message: String) {

View File

@ -1,9 +1,11 @@
package net.corda.opentelemetrydriver
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.context.propagation.TextMapPropagator
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.sdk.OpenTelemetrySdk
@ -32,7 +34,8 @@ class OpenTelemetryDriver(serviceName: String) {
val openTelemetry: OpenTelemetry = OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.setMeterProvider(sdkMeterProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
.setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(),
W3CBaggagePropagator.getInstance())))
.build()
fun shutdown() {