ENT-6893: Added interface for clients to grab opentelemetry handle. (#7264)

* ENT-6893: Added interface for clients to grab opentelemetry handle.

* ENT-6893: Make detekt happy.

* ENT-6893: Fix warnings.

* ENT-6893: Make detekt happy.

* ENT-6893: Now shutdown opentelemetry when node stops or client is closed.

* ENT-6893: OpenTelemetryDriver is now not a singleton.
This commit is contained in:
Adel El-Beik 2022-11-24 13:34:08 +00:00 committed by GitHub
parent 4d14d718d5
commit e46b7bdd5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 139 additions and 78 deletions

View File

@ -4398,7 +4398,7 @@ public final class net.corda.core.node.services.StatesNotAvailableException exte
@DoNotImplement
public interface net.corda.core.node.services.TelemetryService
@Nullable
public abstract net.corda.core.internal.telemetry.OpenTelemetryHandle getOpenTelemetry()
public abstract T getTelemetryHandle(Class<T>)
##
public final class net.corda.core.node.services.TimeWindowChecker extends java.lang.Object
public <init>()
@ -9455,11 +9455,11 @@ public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Obj
public <init>(net.corda.client.rpc.RPCConnection, java.util.concurrent.ExecutorService, net.corda.client.rpc.internal.ReconnectingCordaRPCOps, kotlin.jvm.internal.DefaultConstructorMarker)
public void close()
public void forceClose()
@Nullable
public net.corda.core.internal.telemetry.OpenTelemetryHandle getOpenTelemetry()
@NotNull
public net.corda.core.messaging.CordaRPCOps getProxy()
public int getServerProtocolVersion()
@Nullable
public T getTelemetryHandle(Class<T>)
public void notifyServerAndClose()
public static final net.corda.client.rpc.CordaRPCConnection$Companion Companion
##
@ -9491,11 +9491,11 @@ public final class net.corda.client.rpc.PermissionException extends net.corda.co
public interface net.corda.client.rpc.RPCConnection extends java.io.Closeable
public abstract void close()
public abstract void forceClose()
@Nullable
public abstract net.corda.core.internal.telemetry.OpenTelemetryHandle getOpenTelemetry()
@NotNull
public abstract I getProxy()
public abstract int getServerProtocolVersion()
@Nullable
public abstract T getTelemetryHandle(Class<T>)
public abstract void notifyServerAndClose()
##
public class net.corda.client.rpc.RPCException extends net.corda.core.CordaRuntimeException

View File

@ -644,6 +644,7 @@ bintrayConfig {
gpgSign = true
gpgPassphrase = System.getenv('CORDA_BINTRAY_GPG_PASSPHRASE')
publications = [
'corda-opentelemetry',
'corda-opentelemetry-driver',
'corda-jfx',
'corda-mock',

View File

@ -11,7 +11,6 @@ import net.corda.core.context.Trace
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.telemetry.OpenTelemetryHandle
import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.serialization.SerializationCustomSerializer
@ -78,8 +77,8 @@ class CordaRPCConnection private constructor(
override fun forceClose() = doCloseLogic { actualConnection.forceClose() }
override fun getOpenTelemetry(): OpenTelemetryHandle? {
return actualConnection.getOpenTelemetry()
override fun <T> getTelemetryHandle(telemetryClass: Class<T>): T? {
return actualConnection.getTelemetryHandle(telemetryClass)
}
private inline fun doCloseLogic(close: () -> Unit) {

View File

@ -1,7 +1,6 @@
package net.corda.client.rpc
import net.corda.core.DoNotImplement
import net.corda.core.internal.telemetry.OpenTelemetryHandle
import net.corda.core.messaging.RPCOps
import java.io.Closeable
@ -25,7 +24,8 @@ interface RPCConnection<out I : RPCOps> : Closeable {
/**
* Returns a telemetry handle of the requested class (for example the configured OpenTelemetry instance). Returns null if no such handle is available.
*/
fun getOpenTelemetry(): OpenTelemetryHandle?
fun <T> getTelemetryHandle(telemetryClass: Class<T>): T?
/**
* Closes this client gracefully by sending a notification to the server, so it can immediately clean up resources.

View File

@ -1,7 +1,6 @@
package net.corda.client.rpc.internal
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.internal.telemetry.OpenTelemetryHandle
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.UnrecoverableRPCException
import net.corda.client.rpc.ext.RPCConnectionListener
@ -24,6 +23,7 @@ import net.corda.nodeapi.internal.RoundRobinConnectionPolicy
import net.corda.nodeapi.internal.config.SslConfiguration
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import java.lang.reflect.Proxy
import java.util.concurrent.CopyOnWriteArraySet
@ -99,7 +99,9 @@ class RPCClient<I : RPCOps>(
// Without this any type of "send" time failures will not be delivered back to the client
isBlockOnNonDurableSend = true
}
val rpcClientTelemetry = RPCClientTelemetry("rpcClient-${targetLegalIdentity.toString()}", rpcConfiguration.openTelemetryEnabled,
val targetString = "${transport.params[TransportConstants.HOST_PROP_NAME]}:${transport.params[TransportConstants.PORT_PROP_NAME]}"
val rpcClientTelemetry = RPCClientTelemetry("rpcClient-$targetString", rpcConfiguration.openTelemetryEnabled,
rpcConfiguration.simpleLogTelemetryEnabled, rpcConfiguration.spanStartEndEventsEnabled)
val sessionId = Trace.SessionId.newInstance()
val distributionMux = DistributionMux(listeners, username)
@ -122,8 +124,8 @@ class RPCClient<I : RPCOps>(
override val proxy = ops
override val serverProtocolVersion = serverProtocolVersion
override fun getOpenTelemetry(): OpenTelemetryHandle? {
return rpcClientTelemetry.getOpenTelemetryHandle()
override fun <T> getTelemetryHandle(telemetryClass: Class<T>): T? {
return rpcClientTelemetry.getTelemetryHandle(telemetryClass)
}
private fun close(notify: Boolean) {
@ -133,6 +135,7 @@ class RPCClient<I : RPCOps>(
proxyHandler.forceClose()
}
serverLocator.close()
rpcClientTelemetry.telemetryService.shutdownTelemetry()
}
override fun notifyServerAndClose() {

View File

@ -1,6 +1,5 @@
package net.corda.client.rpc.internal
import net.corda.core.internal.telemetry.OpenTelemetryHandle
import net.corda.core.internal.telemetry.OpenTelemetryComponent
import net.corda.core.internal.telemetry.SimpleLogTelemetryComponent
import net.corda.core.internal.telemetry.TelemetryServiceImpl
@ -35,8 +34,7 @@ class RPCClientTelemetry(val serviceName: String, val openTelemetryEnabled: Bool
}
}
fun getOpenTelemetryHandle(): OpenTelemetryHandle? {
// Will return a handle clients can use to interact with opentelemetry
return null
fun <T> getTelemetryHandle(telemetryClass: Class<T>): T? {
return telemetryService.getTelemetryHandle(telemetryClass)
}
}

View File

@ -6,7 +6,6 @@ import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.MaxRpcRetryException
import net.corda.core.internal.telemetry.OpenTelemetryHandle
import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.RPCException
@ -298,8 +297,8 @@ class ReconnectingCordaRPCOps private constructor(
currentRPCConnection?.forceClose()
}
}
override fun getOpenTelemetry(): OpenTelemetryHandle? {
return currentRPCConnection?.getOpenTelemetry()
override fun <T> getTelemetryHandle(telemetryClass: Class<T>): T? {
return currentRPCConnection?.getTelemetryHandle(telemetryClass)
}
fun isClosed(): Boolean = currentState == CLOSED
}

View File

@ -45,7 +45,7 @@ dependencies {
api "org.slf4j:slf4j-api:$slf4j_version"
compileOnly "io.opentelemetry:opentelemetry-api:${open_telemetry_version}"
compileOnly project(':opentelemetry-driver')
compileOnly project(':opentelemetry')
// These dependencies will become "runtime" scoped in our published POM.
// See publish.dependenciesFrom.defaultScope.

View File

@ -28,7 +28,7 @@ dependencies {
obfuscatorImplementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compileOnly "io.opentelemetry:opentelemetry-api:${open_telemetry_version}"
compileOnly project(':opentelemetry-driver')
compileOnly project(':opentelemetry')
testImplementation sourceSets.obfuscator.output
testImplementation "org.junit.jupiter:junit-jupiter-api:${junit_jupiter_version}"
testImplementation "junit:junit:$junit_version"

View File

@ -42,25 +42,30 @@ data class OpenTelemetryContext(val spanContext: SerializableSpanContext, val st
data class SpanInfo(val name: String, val span: Span, val spanScope: Scope, val spanEventContext: SerializableSpanContext? = null, val parentSpanEventContext: SerializableSpanContext? = null)
class TracerSetup {
var openTelemetry: OpenTelemetry? = null
fun getTracer(serviceName: String): Tracer {
return try {
with(OpenTelemetryDriver.getOpenTelemetry(serviceName)) {
openTelemetry = this
tracerProvider.get(OpenTelemetryComponent::class.java.name)
}
class TracerSetup(serviceName: String) {
private var openTelemetryDriver: Any? = null
val openTelemetry: OpenTelemetry by lazy {
try {
openTelemetryDriver = OpenTelemetryDriver(serviceName)
(openTelemetryDriver as OpenTelemetryDriver).openTelemetry
}
catch (ex: NoClassDefFoundError) {
GlobalOpenTelemetry.getTracerProvider().get(OpenTelemetryComponent::class.java.name)
GlobalOpenTelemetry.get()
}
}
fun getTracer(): Tracer {
return openTelemetry.tracerProvider.get(OpenTelemetryComponent::class.java.name)
}
fun shutdown() {
(openTelemetryDriver as? OpenTelemetryDriver)?.shutdown()
}
}
@Suppress("TooManyFunctions")
class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnabled: Boolean) : TelemetryComponent {
val tracerSetup = TracerSetup()
val tracer: Tracer = tracerSetup.getTracer(serviceName)
val tracerSetup = TracerSetup(serviceName)
val tracer: Tracer = tracerSetup.getTracer()
companion object {
private val log: Logger = LoggerFactory.getLogger(OpenTelemetryComponent::class.java)
@ -72,13 +77,8 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
val baggages = ConcurrentHashMap<UUID, Scope>()
override fun isEnabled(): Boolean {
if (tracerSetup.openTelemetry != null) {
// The SDK is on the classpath.
return true
}
// Now see if the open telemetry java agent is available
val tracer = GlobalOpenTelemetry.getTracerProvider().get(OpenTelemetryComponent::class.java.name)
return tracer.javaClass.name != "io.opentelemetry.api.trace.DefaultTracer"
// DefaultTracer is the NoOp tracer in the OT API
return tracerSetup.getTracer().javaClass.name != "io.opentelemetry.api.trace.DefaultTracer"
}
override fun name(): String = OPENTELEMETRY_COMPONENT_NAME
@ -90,9 +90,14 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
is EndSpanEvent -> endSpan(event.telemetryId)
is SetStatusEvent -> setStatus(event.telemetryId, event.telemetryStatusCode, event.message)
is RecordExceptionEvent -> recordException(event.telemetryId, event.throwable)
is ShutdownTelemetryEvent -> shutdownTelemetry()
}
}
/** Reacts to a [ShutdownTelemetryEvent] by delegating to the tracer setup's shutdown. */
private fun shutdownTelemetry() = tracerSetup.shutdown()
@Suppress("LongParameterList")
private fun startSpanForFlow(name: String, attributes: Map<String, String>, telemetryId: UUID, flowLogic: FlowLogic<*>?,
telemetryDataItem: TelemetryDataItem?) {
@ -289,6 +294,10 @@ class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnab
return Baggage.current().asMap().mapValues { it.value.value }
}
/** Exposes the OpenTelemetry instance held by [tracerSetup] as this component's only telemetry handle. */
override fun getTelemetryHandles(): List<Any> = listOf(tracerSetup.openTelemetry)
private fun setStatus(telemetryId: UUID, telemetryStatusCode: TelemetryStatusCode, message: String) {
val spanInfo = spans[telemetryId]
spanInfo?.span?.setStatus(toOpenTelemetryStatus(telemetryStatusCode), message)

View File

@ -1,3 +0,0 @@
package net.corda.core.internal.telemetry
interface OpenTelemetryHandle

View File

@ -111,6 +111,10 @@ class SimpleLogTelemetryComponent : TelemetryComponent {
return logContexts[uuid]?.baggage ?: emptyMap()
}
/** This component exposes no telemetry handles, so the returned list is always empty. */
override fun getTelemetryHandles(): List<Any> = emptyList()
@Suppress("UNUSED_PARAMETER")
private fun setStatus(telemetryId: UUID, telemetryStatusCode: TelemetryStatusCode, message: String) {
when(telemetryStatusCode) {

View File

@ -2,6 +2,7 @@ package net.corda.core.internal.telemetry
import net.corda.core.CordaInternal
import net.corda.core.flows.FlowLogic
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.TelemetryService
import net.corda.core.serialization.CordaSerializable
@ -66,6 +67,8 @@ class StartSpanEvent(val name: String, val attributes: Map<String, String>, val
class EndSpanEvent(val telemetryId: UUID): TelemetryEvent
class SetStatusEvent(val telemetryId: UUID, val telemetryStatusCode: TelemetryStatusCode, val message: String): TelemetryEvent
class RecordExceptionEvent(val telemetryId: UUID, val throwable: Throwable): TelemetryEvent
class InitialiseTelemetryEvent: TelemetryEvent
class ShutdownTelemetryEvent: TelemetryEvent
interface TelemetryComponent {
fun name(): String
@ -77,6 +80,7 @@ interface TelemetryComponent {
fun getCurrentSpanId(): String
fun getCurrentTraceId(): String
fun getCurrentBaggage(): Map<String, String>
fun getTelemetryHandles(): List<Any>
}
interface TelemetryComponentId {
@ -121,6 +125,21 @@ class TelemetryServiceImpl : SingletonSerializeAsToken(), TelemetryService {
private val telemetryComponents: MutableMap<String, TelemetryComponent> = mutableMapOf()
/**
 * Sends a fresh [InitialiseTelemetryEvent] to every registered telemetry component.
 */
@CordaInternal
fun initialiseTelemetry() {
    for (component in telemetryComponents.values) {
        component.onTelemetryEvent(InitialiseTelemetryEvent())
    }
}
/**
 * Sends a fresh [ShutdownTelemetryEvent] to every registered telemetry component and then
 * removes all components from this service.
 */
@CordaInternal
fun shutdownTelemetry() {
    for (component in telemetryComponents.values) {
        component.onTelemetryEvent(ShutdownTelemetryEvent())
    }
    // Components are dropped only after each one has received the shutdown event.
    telemetryComponents.clear()
}
@CordaInternal
fun addTelemetryComponent(telemetryComponent: TelemetryComponent) {
telemetryComponents[telemetryComponent.name()] = telemetryComponent
@ -223,11 +242,18 @@ class TelemetryServiceImpl : SingletonSerializeAsToken(), TelemetryService {
it.setCurrentTelemetryId(telemetryIds.componentTelemetryIds[it.name()]!!)
}
}
/** Collects the handles exposed by every registered telemetry component into a single list. */
private fun getTelemetryHandles(): List<Any> =
        telemetryComponents.values.flatMap { component -> component.getTelemetryHandles() }
override fun getOpenTelemetry(): OpenTelemetryHandle? {
return telemetryComponents[OpenTelemetryComponent.OPENTELEMETRY_COMPONENT_NAME]?.let {
null // (it as? OpenTelemetryComponent)?.tracerSetup?.openTelemetry
/**
 * Looks up a telemetry handle of the requested type.
 *
 * @param telemetryClass the class of the handle wanted by the caller.
 * @return the first handle exposed by the registered telemetry components that is an
 * instance of [telemetryClass], or null when no component exposes one.
 */
override fun <T> getTelemetryHandle(telemetryClass: Class<T>): T? {
    // Class.cast is a checked cast, so no unchecked-cast suppression is needed here.
    return getTelemetryHandles()
            .firstOrNull { telemetryClass.isInstance(it) }
            ?.let { telemetryClass.cast(it) }
}
}

View File

@ -1,9 +1,9 @@
package net.corda.core.node.services
import net.corda.core.DoNotImplement
import net.corda.core.internal.telemetry.OpenTelemetryHandle
@DoNotImplement
interface TelemetryService {
fun getOpenTelemetry(): OpenTelemetryHandle?
fun <T> getTelemetryHandle(telemetryClass: Class<T>): T?
}

View File

@ -263,6 +263,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
if (configuration.telemetry.simpleLogTelemetryEnabled) {
it.addTelemetryComponent(SimpleLogTelemetryComponent())
}
runOnStop += { it.shutdownTelemetry() }
}.tokenize()
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize()
val identityService = PersistentIdentityService(cacheFactory).tokenize()

View File

@ -289,6 +289,10 @@ class TelemetryTests {
@Suppress("UNUSED_PARAMETER")
fun recordException(telemetryId: UUID, throwable: Throwable) {
}
/** The test component has no telemetry handles to offer. */
override fun getTelemetryHandles(): List<Any> = emptyList()
}

View File

@ -0,0 +1,31 @@
import static org.gradle.api.JavaVersion.VERSION_1_8
plugins {
// Apply the org.jetbrains.kotlin.jvm Plugin to add support for Kotlin.
id 'org.jetbrains.kotlin.jvm' // version '1.6.21'
// Apply the java-library plugin for API and implementation separation.
id 'java-library'
//id 'com.github.johnrengelman.shadow' // version '7.1.2'
id 'net.corda.plugins.publish-utils'
}
description 'OpenTelemetry SDK Bundle'
// This module is required by core, so must always be 1.8. See core build.gradle.
targetCompatibility = VERSION_1_8
dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
implementation platform("io.opentelemetry:opentelemetry-bom:$open_telemetry_version")
implementation "io.opentelemetry:opentelemetry-sdk"
implementation "io.opentelemetry:opentelemetry-exporter-otlp"
implementation "io.opentelemetry:opentelemetry-semconv:$open_telemetry_sem_conv_version"
implementation ("com.squareup.okhttp3:okhttp:$okhttp_version") {
force = true
}
}
publish {
name 'corda-opentelemetry'
}

View File

@ -15,19 +15,12 @@ description 'OpenTelemetry Driver'
targetCompatibility = VERSION_1_8
dependencies {
// Use the Kotlin JDK 8 standard library.
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8:4.9.1'
implementation "io.opentelemetry:opentelemetry-api:$open_telemetry_version"
implementation platform("io.opentelemetry:opentelemetry-bom:$open_telemetry_version")
implementation "io.opentelemetry:opentelemetry-sdk"
implementation 'io.opentelemetry:opentelemetry-exporter-otlp'
implementation "io.opentelemetry:opentelemetry-semconv:$open_telemetry_sem_conv_version"
implementation project(":opentelemetry")
}
shadowJar {
archiveClassifier = jdkClassifier
relocate 'kotlin', 'privatekotlin'
archiveClassifier = null
classifier = null
exclude "**/Log4j2Plugins.dat"
zip64 true
}

View File

@ -14,36 +14,31 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
object OpenTelemetryDriver {
private fun buildAndGetOpenTelemetry(serviceName: String): OpenTelemetry {
val resource: Resource = Resource.getDefault()
class OpenTelemetryDriver(serviceName: String) {
private val resource: Resource = Resource.getDefault()
.merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, serviceName)))
val sdkTracerProvider: SdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(BatchSpanProcessor.builder(OtlpGrpcSpanExporter.builder().build()).build())
private val sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(BatchSpanProcessor.builder(OtlpGrpcSpanExporter.builder().build()).build())
.setResource(resource)
.build()
val sdkMeterProvider: SdkMeterProvider = SdkMeterProvider.builder()
private val sdkMeterProvider = SdkMeterProvider.builder()
.registerMetricReader(PeriodicMetricReader.builder(OtlpGrpcMetricExporter.builder().build()).build())
.setResource(resource)
.build()
return OpenTelemetrySdk.builder()
val openTelemetry: OpenTelemetry = OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.setMeterProvider(sdkMeterProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
.buildAndRegisterGlobal()
}
.build()
@Volatile
private var OPENTELEMETRY_INSTANCE: OpenTelemetry? = null
fun getOpenTelemetry(serviceName: String): OpenTelemetry {
return OPENTELEMETRY_INSTANCE ?: synchronized(this) {
OPENTELEMETRY_INSTANCE ?: buildAndGetOpenTelemetry(serviceName).also {
OPENTELEMETRY_INSTANCE = it
}
}
/**
 * Flushes and then shuts down the tracer and meter providers owned by this driver.
 * NOTE(review): the values returned by forceFlush()/shutdown() are discarded here — confirm
 * whether callers need to wait for those operations to complete.
 */
fun shutdown() {
sdkTracerProvider?.forceFlush()
sdkMeterProvider?.forceFlush()
sdkTracerProvider?.shutdown()
sdkMeterProvider?.shutdown()
}
}

View File

@ -28,7 +28,8 @@ pluginManagement {
// The project is named 'corda-project' and not 'corda' because if this is named the same as the
// output JAR from the capsule then the buildCordaJAR task goes into an infinite loop.
rootProject.name = 'corda-project'
include 'opentelemetry-driver'
include 'opentelemetry'
include 'opentelemetry:opentelemetry-driver'
include 'confidential-identities'
include 'finance:contracts'
include 'finance:workflows'