diff --git a/.ci/api-current.txt b/.ci/api-current.txt index fd07a7ea41..56123e9a97 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -9391,7 +9391,8 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang. public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean) public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean) public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean) - public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean, int, kotlin.jvm.internal.DefaultConstructorMarker) + public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean, boolean) + public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean, boolean, int, kotlin.jvm.internal.DefaultConstructorMarker) @NotNull public final java.time.Duration component1() @NotNull @@ -9419,7 +9420,7 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang. @NotNull public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration) @NotNull - public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean) + public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean, boolean) public boolean equals(Object) public int getCacheConcurrencyLevel() @NotNull @@ -9427,6 +9428,7 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang. @NotNull public java.time.Duration getConnectionRetryInterval() public double getConnectionRetryIntervalMultiplier() + public boolean getCopyBaggageToTags() @NotNull public java.time.Duration getDeduplicationCacheExpiry() public int getMaxFileSize() diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 10f9585c03..12cc2c7d1a 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -179,7 +179,9 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( open val simpleLogTelemetryEnabled: Boolean = false, - open val spanStartEndEventsEnabled: Boolean = true + open val spanStartEndEventsEnabled: Boolean = false, + + open val copyBaggageToTags: Boolean = false ) { companion object { @@ -226,7 +228,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( deduplicationCacheExpiry, openTelemetryEnabled, simpleLogTelemetryEnabled, - spanStartEndEventsEnabled + spanStartEndEventsEnabled, + copyBaggageToTags ) } @@ -246,7 +249,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( deduplicationCacheExpiry: Duration = this.deduplicationCacheExpiry, openTelemetryEnabled: Boolean = this.openTelemetryEnabled, simpleLogTelemetryEnabled: Boolean = this.simpleLogTelemetryEnabled, - spanStartEndEventsEnabled: Boolean = this.spanStartEndEventsEnabled + spanStartEndEventsEnabled: Boolean = this.spanStartEndEventsEnabled, + copyBaggageToTags: Boolean = this.copyBaggageToTags ): CordaRPCClientConfiguration { return CordaRPCClientConfiguration( connectionMaxRetryInterval, @@ -262,7 +266,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( deduplicationCacheExpiry, openTelemetryEnabled, simpleLogTelemetryEnabled, - spanStartEndEventsEnabled + spanStartEndEventsEnabled, + copyBaggageToTags ) } @@ -286,6 +291,7 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( if (openTelemetryEnabled != other.openTelemetryEnabled) return false if (simpleLogTelemetryEnabled != other.simpleLogTelemetryEnabled) return false if (spanStartEndEventsEnabled != other.spanStartEndEventsEnabled) return false + if (copyBaggageToTags != other.copyBaggageToTags) return false return true } @@ -306,6 +312,7 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( result = 31 * result + openTelemetryEnabled.hashCode() result = 31 * result + simpleLogTelemetryEnabled.hashCode() result = 31 * result + spanStartEndEventsEnabled.hashCode() + result = 31 * result + copyBaggageToTags.hashCode() return result } @@ -321,7 +328,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( "deduplicationCacheExpiry=$deduplicationCacheExpiry, " + "openTelemetryEnabled=$openTelemetryEnabled, " + "simpleLogTelemetryEnabled=$simpleLogTelemetryEnabled, " + - "spanStartEndEventsEnabled=$spanStartEndEventsEnabled)" + "spanStartEndEventsEnabled=$spanStartEndEventsEnabled, " + + "copyBaggageToTags=$copyBaggageToTags )" } // Left in for backwards compatibility with version 3.1 diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index e529676301..fe34aa057c 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -102,7 +102,7 @@ class RPCClient( val targetString = "${transport.params[TransportConstants.HOST_PROP_NAME]}:${transport.params[TransportConstants.PORT_PROP_NAME]}" val rpcClientTelemetry = RPCClientTelemetry("rpcClient-$targetString", rpcConfiguration.openTelemetryEnabled, - rpcConfiguration.simpleLogTelemetryEnabled, rpcConfiguration.spanStartEndEventsEnabled) + rpcConfiguration.simpleLogTelemetryEnabled, rpcConfiguration.spanStartEndEventsEnabled, rpcConfiguration.copyBaggageToTags) val sessionId = Trace.SessionId.newInstance() val distributionMux = DistributionMux(listeners, username) val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator, diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientTelemetry.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientTelemetry.kt index 8073a88a2f..c2c9457919 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientTelemetry.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientTelemetry.kt @@ -5,7 +5,9 @@ import net.corda.core.internal.telemetry.SimpleLogTelemetryComponent import net.corda.core.internal.telemetry.TelemetryServiceImpl import net.corda.core.utilities.contextLogger -class RPCClientTelemetry(val serviceName: String, val openTelemetryEnabled: Boolean, val simpleLogTelemetryEnabled: Boolean, val spanStartEndEventsEnabled: Boolean) { +class RPCClientTelemetry(val serviceName: String, val openTelemetryEnabled: Boolean, + val simpleLogTelemetryEnabled: Boolean, val spanStartEndEventsEnabled: Boolean, + val copyBaggageToTags: Boolean) { companion object { private val log = contextLogger() @@ -16,7 +18,7 @@ class RPCClientTelemetry(val serviceName: String, val openTelemetryEnabled: Bool init { if (openTelemetryEnabled) { try { - val openTelemetryComponent = OpenTelemetryComponent(serviceName, spanStartEndEventsEnabled) + val openTelemetryComponent = OpenTelemetryComponent(serviceName, spanStartEndEventsEnabled, copyBaggageToTags) if (openTelemetryComponent.isEnabled()) { telemetryService.addTelemetryComponent(openTelemetryComponent) log.debug("OpenTelemetry enabled") diff --git a/constants.properties b/constants.properties index d716006f66..9a01adc17b 100644 --- a/constants.properties +++ b/constants.properties @@ -13,8 +13,8 @@ java8MinUpdateVersion=171 # net.corda.core.internal.CordaUtilsKt.PLATFORM_VERSION as well. # # ***************************************************************# platformVersion=12 -openTelemetryVersion=1.17.0 -openTelemetrySemConvVersion=1.17.0-alpha +openTelemetryVersion=1.20.1 +openTelemetrySemConvVersion=1.20.1-alpha guavaVersion=28.0-jre # Quasar version to use with Java 8: quasarVersion=0.7.15_r3 diff --git a/core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryComponent.kt b/core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryComponent.kt index f3f92583ee..33446f2901 100644 --- a/core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryComponent.kt +++ b/core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryComponent.kt @@ -7,10 +7,7 @@ import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.common.AttributesBuilder import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.SpanContext import io.opentelemetry.api.trace.StatusCode -import io.opentelemetry.api.trace.TraceFlags -import io.opentelemetry.api.trace.TraceState import io.opentelemetry.api.trace.Tracer import io.opentelemetry.context.Context import io.opentelemetry.context.Scope @@ -21,26 +18,19 @@ import org.slf4j.LoggerFactory import java.util.* import java.util.concurrent.ConcurrentHashMap import net.corda.opentelemetrydriver.OpenTelemetryDriver +import io.opentelemetry.context.propagation.TextMapGetter +import java.util.concurrent.ConcurrentLinkedDeque -const val TRACEIDHEX_LENGTH = 32 -const val SPANIDHEX_LENGTH = 16 @CordaSerializable -data class SerializableSpanContext(val traceIdHex : String, val spanIdHex : String, val traceFlagsHex : String, val traceStateMap : Map) { +data class SpanEventContexts(val child: Context, val parent: Context) +@CordaSerializable +data class ContextCarrier(val context: MutableMap) +@CordaSerializable +data class OpenTelemetryContext(val context: ContextCarrier, val spanEventChildContext: ContextCarrier, val spanEventParentContext: ContextCarrier, val baggage: Map): TelemetryDataItem - constructor(spanContext: SpanContext) : this(spanContext.traceId, spanContext.spanId, spanContext.traceFlags.asHex(), spanContext.traceState.asMap().toMap()) - - constructor() : this("0".repeat(TRACEIDHEX_LENGTH), "0".repeat(SPANIDHEX_LENGTH), TraceFlags.getDefault().asHex(), TraceState.getDefault().asMap()) - - fun createSpanContext() = SpanContext.create(traceIdHex, spanIdHex, TraceFlags.fromHex(traceFlagsHex, 0), createTraceState()) - - fun createRemoteSpanContext() = SpanContext.createFromRemoteParent(traceIdHex, spanIdHex, TraceFlags.fromHex(traceFlagsHex, 0), createTraceState()) - - private fun createTraceState() = traceStateMap.toList().fold(TraceState.builder()) { builder, pair -> builder.put(pair.first, pair.second)}.build() -} - -data class OpenTelemetryContext(val spanContext: SerializableSpanContext, val startedSpanContext: SerializableSpanContext, val baggage: Map): TelemetryDataItem - -data class SpanInfo(val name: String, val span: Span, val spanScope: Scope, val spanEventContext: SerializableSpanContext? = null, val parentSpanEventContext: SerializableSpanContext? = null) +data class SpanInfo(val name: String, val span: Span, val spanScope: Scope, + val spanEventContext: SpanEventContexts? = null, + val spanEventContextQueue: ConcurrentLinkedDeque? = null) class TracerSetup(serviceName: String) { private var openTelemetryDriver: Any? = null @@ -63,7 +53,7 @@ class TracerSetup(serviceName: String) { } @Suppress("TooManyFunctions") -class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnabled: Boolean) : TelemetryComponent { +class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnabled: Boolean, val copyBaggageToTags: Boolean) : TelemetryComponent { val tracerSetup = TracerSetup(serviceName) val tracer: Tracer = tracerSetup.getTracer() @@ -98,29 +88,70 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab tracerSetup.shutdown() } + private fun extractContext(carrier: ContextCarrier): Context { + val getter = object : TextMapGetter { + override fun get(carrier: ContextCarrier?, key: String): String? { + return if (carrier?.context?.containsKey(key) == true) { + val value = carrier.context[key] + value + } else null + } + override fun keys(carrier: ContextCarrier?): MutableIterable { + return carrier?.context?.keys ?: mutableListOf() + } + } + return carrier.let { + tracerSetup.openTelemetry.propagators.textMapPropagator.extract(Context.current(), it, getter) + } + } + @Suppress("LongParameterList") private fun startSpanForFlow(name: String, attributes: Map, telemetryId: UUID, flowLogic: FlowLogic<*>?, telemetryDataItem: TelemetryDataItem?) { - val baggageAttributes = (telemetryDataItem as? OpenTelemetryContext)?.baggage?.let { + val openTelemetryContext = telemetryDataItem as? OpenTelemetryContext + val extractedContext = openTelemetryContext?.let { extractContext(it.context) } + val spanEventContexts = openTelemetryContext?.let { SpanEventContexts(extractContext(it.spanEventChildContext), extractContext(it.spanEventParentContext)) } + val baggageAttributes = openTelemetryContext?.baggage?.let { val baggageBuilder = it.toList().fold(Baggage.current().toBuilder()) {builder, attribute -> builder.put(attribute.first, attribute.second)} baggages[telemetryId] = baggageBuilder.build().makeCurrent() it } ?: emptyMap() - // Also add any baggage to the span - val attributesMap = (attributes+baggageAttributes).toList() + val allAttributes = if (copyBaggageToTags) { + attributes + baggageAttributes + } + else { + attributes + } + + val attributesMap = allAttributes.toList() .fold(Attributes.builder()) { builder, attribute -> builder.put(attribute.first, attribute.second) }.also { populateWithFlowAttributes(it, flowLogic) }.build() - if (telemetryDataItem != null) { - startSpanForFlowWithRemoteParent(name, attributesMap, telemetryId, telemetryDataItem) + if (extractedContext != null && spanEventContexts != null) { + startSpanForFlowWithRemoteParent(name, attributesMap, telemetryId, extractedContext, spanEventContexts) } else { startSpanForFlowWithNoParent(name, attributesMap, telemetryId) } } + private fun startSpanForFlowWithRemoteParent(name: String, attributesMap: Attributes, telemetryId: UUID, parentContext: Context, spanEventContexts: SpanEventContexts) { + val span = tracer.spanBuilder(name).setParent(parentContext).setAllAttributes(attributesMap).startSpan() + val spanScope = span.makeCurrent() + val contextAndQueue = startEndEventForFlowWithRemoteParent(name, attributesMap, spanEventContexts) + spans[telemetryId] = SpanInfo(name, span, spanScope, contextAndQueue?.first, contextAndQueue?.second) + } + + private fun startEndEventForFlowWithRemoteParent(name: String, attributesMap: Attributes, spanEventContexts: SpanEventContexts): Pair>? { + if (spanStartEndEventsEnabled) { + val contexts = createSpanToCaptureStartedSpanEventWithRemoteParent(name, spanEventContexts, attributesMap) + return Pair( contexts, ConcurrentLinkedDeque().also { it.add(contexts) }) + } + return null + } + private fun startSpanForFlowWithNoParent(name: String, attributesMap: Attributes, telemetryId: UUID) { val rootSpan = tracer.spanBuilder(name).setAllAttributes(attributesMap).setAllAttributes(Attributes.of(AttributeKey.stringKey("root.flow"), "true")).startSpan() val rootSpanScope = rootSpan.makeCurrent() @@ -129,50 +160,31 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab val span = tracer.spanBuilder("Child Spans").setParent(Context.current().with(rootSpan)).startSpan() val spanScope = span.makeCurrent() rootSpans[telemetryId] = SpanInfo(name, rootSpan, rootSpanScope) - spans[telemetryId] = SpanInfo(name, span, spanScope, startedSpanContexts.first, startedSpanContexts.second) + val spanEventContextStack = ConcurrentLinkedDeque().also { it.add(startedSpanContexts) } + spans[telemetryId] = SpanInfo(name, span, spanScope, startedSpanContexts, spanEventContextStack) } else { spans[telemetryId] = SpanInfo(name, rootSpan, rootSpanScope) } } - private fun startSpanForFlowWithRemoteParent(name: String, attributesMap: Attributes, telemetryId: UUID, telemetryDataItem: TelemetryDataItem) { - val parentContext = (telemetryDataItem as OpenTelemetryContext).spanContext - val spanContext = parentContext.createRemoteSpanContext() - val parentSpan = Span.wrap(spanContext) - val span = tracer.spanBuilder(name).setParent(Context.current().with(parentSpan)).setAllAttributes(attributesMap).startSpan() - val spanScope = span.makeCurrent() - if (spanStartEndEventsEnabled) { - val contexts = createSpanToCaptureStartedSpanEventWithRemoteParent(name, telemetryDataItem, attributesMap) - spans[telemetryId] = SpanInfo(name, span, spanScope, contexts.first, contexts.second) - } - else { - spans[telemetryId] = SpanInfo(name, span, spanScope) - } - } - - private fun createSpanToCaptureStartedSpanEvent(name: String, rootSpan: Span, attributesMap: Attributes): Pair { + private fun createSpanToCaptureStartedSpanEvent(name: String, rootSpan: Span, attributesMap: Attributes): SpanEventContexts { val startedSpan = tracer.spanBuilder("Started Events").setAllAttributes(attributesMap).setAllAttributes(Attributes.of(AttributeKey.stringKey("root.startend.events"), "true")).setParent(Context.current().with(rootSpan)).startSpan() - val serializableSpanContext = SerializableSpanContext(startedSpan.spanContext) + val parentContext = Context.current().with(startedSpan) startedSpan.end() - val startedSpanContext = serializableSpanContext.createSpanContext() - val startedSpanFromContext = Span.wrap(startedSpanContext) val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap) - .setParent(Context.current().with(startedSpanFromContext)).startSpan() - val childSerializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext) + .setParent(parentContext).startSpan() + val childContext = Context.current().with(startedSpanChild) startedSpanChild.end() - return Pair(childSerializableSpanContext, serializableSpanContext) + return SpanEventContexts(childContext, parentContext) } - private fun createSpanToCaptureStartedSpanEventWithRemoteParent(name: String, telemetryDataItem: OpenTelemetryContext, attributesMap: Attributes ): Pair { - val startedSpanParentContext = telemetryDataItem.startedSpanContext - val startedSpanContext = startedSpanParentContext.createRemoteSpanContext() - val startedSpanFromContext = Span.wrap(startedSpanContext) + private fun createSpanToCaptureStartedSpanEventWithRemoteParent(name: String, spanEventContexts: SpanEventContexts, attributesMap: Attributes ): SpanEventContexts { val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap) - .setParent(Context.current().with(startedSpanFromContext)).startSpan() - val serializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext) + .setParent(spanEventContexts.child).startSpan() + val grandChildContext = Context.current().with(startedSpanChild) startedSpanChild.end() - return Pair(serializableSpanContext, startedSpanParentContext) + return SpanEventContexts(grandChildContext, spanEventContexts.child) } private fun endSpanForFlow(telemetryId: UUID){ @@ -194,27 +206,29 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab } private fun createSpanToCaptureEndSpanEvent(spanInfo: SpanInfo?) { - spanInfo?.parentSpanEventContext?.let { - val startedSpanContext = it.createSpanContext() - val startedSpanFromContext = Span.wrap(startedSpanContext) - val startedSpanChild = tracer.spanBuilder("${spanInfo.name}-end").setParent(Context.current().with(startedSpanFromContext)).startSpan() + spanInfo?.spanEventContext?.parent?.let { + val startedSpanChild = tracer.spanBuilder("${spanInfo.name}-end").setParent(it).startSpan() startedSpanChild.end() } + val spanEventContextStack = spanInfo?.spanEventContextQueue + val filteredSpanEventContexts = spanEventContextStack?.filter { it == spanInfo.spanEventContext } + filteredSpanEventContexts?.forEach { spanEventContextStack.remove(it) } } private fun startSpan(name: String, attributes: Map, telemetryId: UUID, flowLogic: FlowLogic<*>?) { val currentBaggage = Baggage.current() val baggageAttributes = mutableMapOf() - currentBaggage.forEach { t, u -> baggageAttributes[t] = u.value } - + if (copyBaggageToTags) { + currentBaggage.forEach { t, u -> baggageAttributes[t] = u.value } + } val parentSpan = Span.current() val attributesMap = (attributes+baggageAttributes).toList().fold(Attributes.builder()) { builder, attribute -> builder.put(attribute.first, attribute.second) }.also { populateWithFlowAttributes(it, flowLogic) }.build() val span = tracer.spanBuilder(name).setAllAttributes(attributesMap).startSpan() val spanScope = span.makeCurrent() - val startedEventContexts = createStartedEventSpan(name, attributesMap, parentSpan) - spans[telemetryId] = SpanInfo(name, span, spanScope, startedEventContexts.first, startedEventContexts.second) + val spanEventContexts = createStartedEventSpan(name, attributesMap, parentSpan) + spans[telemetryId] = SpanInfo(name, span, spanScope, spanEventContexts?.peekLast(), spanEventContexts) } private fun populateWithFlowAttributes(attributesBuilder: AttributesBuilder, flowLogic: FlowLogic<*>?) { @@ -225,28 +239,36 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab } } - private fun createStartedEventSpan(name: String, attributesMap: Attributes, parentSpan: Span): Pair { - if (spanStartEndEventsEnabled) { - // Fix up null contexts - make not null + private fun createStartedEventSpan(name: String, attributesMap: Attributes, parentSpan: Span): ConcurrentLinkedDeque? { + return if (spanStartEndEventsEnabled) { val filteredSpans = spans.filter { it.value.span == parentSpan }.toList() - var serializableSpanContext = SerializableSpanContext() - var parentStartedSpanContext = SerializableSpanContext() - if (filteredSpans.isNotEmpty()) { - parentStartedSpanContext = filteredSpans[0].second.spanEventContext ?: SerializableSpanContext() - val startedSpanContext = parentStartedSpanContext.createSpanContext() - val startedSpanFromContext = Span.wrap(startedSpanContext) - val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap) - .setParent(Context.current().with(startedSpanFromContext)).startSpan() - serializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext) - startedSpanChild.end() - } - return Pair(serializableSpanContext, parentStartedSpanContext) + val (startEventParentContext, spanEventContextQueue) = getStartEventParentContext(filteredSpans, parentSpan) + val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap) + .setParent(startEventParentContext).startSpan() + val childContext = Context.current().with(startedSpanChild) + startedSpanChild.end() + spanEventContextQueue?.offer(SpanEventContexts(childContext, startEventParentContext)) + spanEventContextQueue } else { - return Pair(null, null) + null } } + private fun getStartEventParentContext(filteredSpans: List>, parentSpan: Span): Pair?> { + return if (filteredSpans.isNotEmpty()) { + Pair(filteredSpans[0].second.spanEventContext?.child ?: Context.current(), filteredSpans[0].second.spanEventContextQueue) + } + else { + // Copes with case where user has created their own span. So we just use the most + // recent span we know about on the stack. + val altFilteredSpans = spans.filter { it.value.span.spanContext.traceId == parentSpan.spanContext.traceId }.toList() + val spanEventContexts = altFilteredSpans[0].second.spanEventContextQueue + Pair(spanEventContexts?.peekLast()?.child ?: Context.current(), spanEventContexts) + } + + } + private fun endSpan(telemetryId: UUID){ val spanInfo = spans[telemetryId] createSpanToCaptureEndSpanEvent(spanInfo) @@ -256,11 +278,45 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab } override fun getCurrentTelemetryData(): TelemetryDataItem { + val currentSpan = Span.current() - val spanContext = SerializableSpanContext(currentSpan.spanContext) + val currentContextCarrier = inject(tracerSetup, Context.current()) val filteredSpans = spans.filter { it.value.span == currentSpan }.toList() - val startedSpanContext = filteredSpans.getOrNull(0)?.second?.spanEventContext ?: SerializableSpanContext() - return OpenTelemetryContext(spanContext, startedSpanContext, Baggage.current().asMap().mapValues { it.value.value }) + + val childContext = if (filteredSpans.isNotEmpty()) { + filteredSpans.getOrNull(0)?.second?.spanEventContext?.child + } + else { + val altFilteredSpans = spans.filter { it.value.span.spanContext.traceId == currentSpan.spanContext.traceId }.toList() + if (altFilteredSpans.isNotEmpty()) { + altFilteredSpans[0].second.spanEventContextQueue?.peekLast()?.child + } + else { + null + } + } + val childContextCarrier = inject(tracerSetup, childContext) + + val parentContext = if (filteredSpans.isNotEmpty()) { + filteredSpans.getOrNull(0)?.second?.spanEventContext?.parent + } + else { + val altFilteredSpans = spans.filter { it.value.span.spanContext.traceId == currentSpan.spanContext.traceId }.toList() + if (altFilteredSpans.isNotEmpty()) { + altFilteredSpans[0].second.spanEventContextQueue?.peekLast()?.parent + } + else { + null + } + } + val parentContextCarrier = inject(tracerSetup, parentContext) + return OpenTelemetryContext(currentContextCarrier, childContextCarrier, parentContextCarrier, Baggage.current().asMap().mapValues { it.value.value }) + } + + private fun inject(tracerSetup: TracerSetup, context: Context?) : ContextCarrier { + val contextCarrier = ContextCarrier(mutableMapOf()) + context?.let { tracerSetup.openTelemetry.propagators.textMapPropagator.inject(it, contextCarrier) { carrier, key, value -> carrier?.context?.put(key, value) }} + return contextCarrier } override fun getCurrentTelemetryId(): UUID { @@ -273,7 +329,7 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab } override fun setCurrentTelemetryId(id: UUID) { - val spanInfo = spans.get(id) + val spanInfo = spans[id] spanInfo?.let { it.spanScope.close() // close the old scope val childSpanScope = it.span.makeCurrent() @@ -295,7 +351,7 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab } override fun getTelemetryHandles(): List { - return tracerSetup.openTelemetry.let { listOf(it) } + return listOf(tracerSetup.openTelemetry) } private fun setStatus(telemetryId: UUID, telemetryStatusCode: TelemetryStatusCode, message: String) { diff --git a/core/src/main/kotlin/net/corda/core/internal/telemetry/SimpleLogTelemetryComponent.kt b/core/src/main/kotlin/net/corda/core/internal/telemetry/SimpleLogTelemetryComponent.kt index f7dcc119a9..1430dd5caa 100644 --- a/core/src/main/kotlin/net/corda/core/internal/telemetry/SimpleLogTelemetryComponent.kt +++ b/core/src/main/kotlin/net/corda/core/internal/telemetry/SimpleLogTelemetryComponent.kt @@ -2,7 +2,6 @@ package net.corda.core.internal.telemetry import net.corda.core.flows.FlowLogic import net.corda.core.serialization.CordaSerializable -import net.corda.core.utilities.debug import org.slf4j.Logger import org.slf4j.LoggerFactory import org.slf4j.MDC @@ -57,13 +56,13 @@ class SimpleLogTelemetryComponent : TelemetryComponent { logContexts[traceId] = SimpleLogContext(traceId, baggageAttributes) clientId?.let { MDC.put(CLIENT_ID, it) } MDC.put(TRACE_ID, traceId.toString()) - log.debug {"startSpanForFlow: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: ${attributes+baggageAttributes}"} + log.info("startSpanForFlow: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: ${attributes+baggageAttributes}") } // Check when you start a top level flow the startSpanForFlow appears just once, and so the endSpanForFlow also appears just once // So its valid to do the MDC clear here. For remotes nodes as well private fun endSpanForFlow(telemetryId: UUID) { - log.debug {"endSpanForFlow: traceId: ${traces.get()}"} + log.info("endSpanForFlow: traceId: ${traces.get()}") logContexts.remove(telemetryId) MDC.clear() } @@ -73,12 +72,12 @@ class SimpleLogTelemetryComponent : TelemetryComponent { val flowId = flowLogic?.runId val clientId = flowLogic?.stateMachine?.clientId val traceId = traces.get() - log.debug {"startSpan: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: $attributes"} + log.info("startSpan: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: $attributes") } @Suppress("UNUSED_PARAMETER") private fun endSpan(telemetryId: UUID) { - log.debug {"endSpan: traceId: ${traces.get()}"} + log.info("endSpan: traceId: ${traces.get()}") } override fun getCurrentTelemetryData(): SimpleLogContext { @@ -119,7 +118,7 @@ class SimpleLogTelemetryComponent : TelemetryComponent { private fun setStatus(telemetryId: UUID, telemetryStatusCode: TelemetryStatusCode, message: String) { when(telemetryStatusCode) { TelemetryStatusCode.ERROR -> log.error("setStatus: traceId: ${traces.get()}, statusCode: ${telemetryStatusCode}, message: message") - TelemetryStatusCode.OK, TelemetryStatusCode.UNSET -> log.debug {"setStatus: traceId: ${traces.get()}, statusCode: ${telemetryStatusCode}, message: message" } + TelemetryStatusCode.OK, TelemetryStatusCode.UNSET -> log.info("setStatus: traceId: ${traces.get()}, statusCode: ${telemetryStatusCode}, message: message") } } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 192a82ecad..57c6c7906f 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -256,7 +256,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo).closeOnStop(false) val telemetryService: TelemetryServiceImpl = TelemetryServiceImpl().also { - val openTelemetryComponent = OpenTelemetryComponent(configuration.myLegalName.toString(), configuration.telemetry.spanStartEndEventsEnabled) + val openTelemetryComponent = OpenTelemetryComponent(configuration.myLegalName.toString(), configuration.telemetry.spanStartEndEventsEnabled, configuration.telemetry.copyBaggageToTags) if (configuration.telemetry.openTelemetryEnabled && openTelemetryComponent.isEnabled()) { it.addTelemetryComponent(openTelemetryComponent) } diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 412a0a8833..3b80d91e33 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -224,7 +224,8 @@ data class FlowTimeoutConfiguration( data class TelemetryConfiguration( val openTelemetryEnabled: Boolean, val simpleLogTelemetryEnabled: Boolean, - val spanStartEndEventsEnabled: Boolean + val spanStartEndEventsEnabled: Boolean, + val copyBaggageToTags: Boolean ) internal typealias Valid = Validated diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt index d3f9f3b9d7..facf8ad3f6 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt @@ -134,7 +134,7 @@ data class NodeConfigurationImpl( fun database(devMode: Boolean) = DatabaseConfig( exportHibernateJMXStatistics = devMode ) - val telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true) + val telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = false, copyBaggageToTags = false) } companion object { diff --git a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt index f6f906c9be..c28b1b25c9 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt @@ -229,10 +229,11 @@ internal object TelemetryConfigurationSpec : Configuration.Specification { val config = configuration.withOptions(options) - return valid(TelemetryConfiguration(config[openTelemetryEnabled], config[simpleLogTelemetryEnabled], config[spanStartEndEventsEnabled])) + return valid(TelemetryConfiguration(config[openTelemetryEnabled], config[simpleLogTelemetryEnabled], config[spanStartEndEventsEnabled], config[copyBaggageToTags])) } } diff --git a/node/src/main/resources/corda-reference.conf b/node/src/main/resources/corda-reference.conf index 50f729a313..dc141e5ca1 100644 --- a/node/src/main/resources/corda-reference.conf +++ b/node/src/main/resources/corda-reference.conf @@ -28,5 +28,6 @@ verifierType = InMemory telemetry { openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, - spanStartEndEventsEnabled = true + spanStartEndEventsEnabled = false, + copyBaggageToTags = false } diff --git a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt index 427dbf5147..50fe6525c4 100644 --- a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt +++ b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt @@ -196,7 +196,7 @@ class NodeTest { rpcUsers = emptyList(), verifierType = VerifierType.InMemory, flowTimeout = FlowTimeoutConfiguration(timeout = Duration.ZERO, backoffBase = 1.0, maxRestartCount = 1), - telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true), + telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = false, copyBaggageToTags = false), rpcSettings = NodeRpcSettings(address = fakeAddress, adminAddress = null, ssl = null), messagingServerAddress = null, notary = null, diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index d0314d5782..62134b7703 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -397,7 +397,7 @@ class NodeConfigurationImplTest { p2pAddress = NetworkHostAndPort("localhost", 0), messagingServerAddress = null, flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0), - telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true), + telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = false, copyBaggageToTags = false), notary = null, devMode = true, noLocalShell = false, diff --git a/opentelemetry/build.gradle b/opentelemetry/build.gradle index 57399a3c5f..779d7839d0 100644 --- a/opentelemetry/build.gradle +++ b/opentelemetry/build.gradle @@ -1,12 +1,10 @@ import static org.gradle.api.JavaVersion.VERSION_1_8 plugins { - // Apply the org.jetbrains.kotlin.jvm Plugin to add support for Kotlin. - id 'org.jetbrains.kotlin.jvm' // version '1.6.21' - // Apply the java-library plugin for API and implementation separation. + id 'org.jetbrains.kotlin.jvm' id 'java-library' - //id 'com.github.johnrengelman.shadow' // version '7.1.2' id 'net.corda.plugins.publish-utils' + id 'com.jfrog.artifactory' } description 'OpenTelemetry SDK Bundle' diff --git a/opentelemetry/opentelemetry-driver/build.gradle b/opentelemetry/opentelemetry-driver/build.gradle index 57802afed8..1b7e768696 100644 --- a/opentelemetry/opentelemetry-driver/build.gradle +++ b/opentelemetry/opentelemetry-driver/build.gradle @@ -1,12 +1,11 @@ import static org.gradle.api.JavaVersion.VERSION_1_8 plugins { - // Apply the org.jetbrains.kotlin.jvm Plugin to add support for Kotlin. - id 'org.jetbrains.kotlin.jvm' // version '1.6.21' - // Apply the java-library plugin for API and implementation separation. + id 'org.jetbrains.kotlin.jvm' id 'java-library' - id 'com.github.johnrengelman.shadow' // version '7.1.2' + id 'com.github.johnrengelman.shadow' id 'net.corda.plugins.publish-utils' + id 'com.jfrog.artifactory' } description 'OpenTelemetry Driver' @@ -37,5 +36,4 @@ jar { publish { disableDefaultJar = true name 'corda-opentelemetry-driver' -} - +} \ No newline at end of file diff --git a/opentelemetry/src/main/kotlin/net/corda/opentelemetrydriver/OpenTelemetryDriver.kt b/opentelemetry/src/main/kotlin/net/corda/opentelemetrydriver/OpenTelemetryDriver.kt index d1c568ce03..7866299291 100644 --- a/opentelemetry/src/main/kotlin/net/corda/opentelemetrydriver/OpenTelemetryDriver.kt +++ b/opentelemetry/src/main/kotlin/net/corda/opentelemetrydriver/OpenTelemetryDriver.kt @@ -1,9 +1,11 @@ package net.corda.opentelemetrydriver import io.opentelemetry.api.OpenTelemetry +import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator import io.opentelemetry.context.propagation.ContextPropagators +import io.opentelemetry.context.propagation.TextMapPropagator import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter import io.opentelemetry.sdk.OpenTelemetrySdk @@ -32,7 +34,8 @@ class OpenTelemetryDriver(serviceName: String) { val openTelemetry: OpenTelemetry = OpenTelemetrySdk.builder() .setTracerProvider(sdkTracerProvider) .setMeterProvider(sdkMeterProvider) - .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), + W3CBaggagePropagator.getInstance()))) .build() fun shutdown() { diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index c5a12bd83a..5c430d575e 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -492,7 +492,7 @@ open class InternalMockNetwork(cordappPackages: List = emptyList(), doReturn(emptyList()).whenever(it).extraNetworkMapKeys doReturn(listOf(baseDirectory / "cordapps")).whenever(it).cordappDirectories doReturn(emptyList()).whenever(it).quasarExcludePackages - doReturn(TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true)).whenever(it).telemetry + doReturn(TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = false, copyBaggageToTags = false)).whenever(it).telemetry parameters.configOverrides(it) }