ENT-6893: First cut of telemetry integration. (#7247)

First cut of telemetry integration.

OpenTelemetry can be enabled in two ways. The first is via an OpenTelemetry Java agent specified on the command line. This approach has the advantage that you also get spans created by other libraries, such as Hibernate, because the Java agent rewrites bytecode to insert spans.
The second way is with the OpenTelemetry driver (which links with the OpenTelemetry SDK). This is a fat JAR provided with this project, and it needs to go into the node's drivers directory.
This commit is contained in:
Adel El-Beik 2022-10-28 14:41:39 +01:00 committed by GitHub
parent bdcd25477d
commit 7a133f687c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 1675 additions and 108 deletions

View File

@ -157,6 +157,8 @@ public final class net.corda.core.context.InvocationContext extends java.lang.Ob
public <init>(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>, String)
public <init>(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>, String, net.corda.core.internal.telemetry.SerializedTelemetry)
public <init>(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String, net.corda.core.internal.telemetry.SerializedTelemetry, int, kotlin.jvm.internal.DefaultConstructorMarker)
@NotNull
public final net.corda.core.context.InvocationOrigin component1()
@NotNull
@ -171,10 +173,14 @@ public final class net.corda.core.context.InvocationContext extends java.lang.Ob
public final java.util.List<Object> component6()
@Nullable
public final String component7()
@Nullable
public final net.corda.core.internal.telemetry.SerializedTelemetry component8()
@NotNull
public final net.corda.core.context.InvocationContext copy(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor)
@NotNull
public final net.corda.core.context.InvocationContext copy(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>, String)
@NotNull
public final net.corda.core.context.InvocationContext copy(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>, String, net.corda.core.internal.telemetry.SerializedTelemetry)
public boolean equals(Object)
@Nullable
public final net.corda.core.context.Actor getActor()
@ -188,6 +194,8 @@ public final class net.corda.core.context.InvocationContext extends java.lang.Ob
public final net.corda.core.context.Actor getImpersonatedActor()
@NotNull
public final net.corda.core.context.InvocationOrigin getOrigin()
@Nullable
public final net.corda.core.internal.telemetry.SerializedTelemetry getSerializedTelemetry()
@NotNull
public final net.corda.core.context.Trace getTrace()
public int hashCode()
@ -206,6 +214,8 @@ public final class net.corda.core.context.InvocationContext extends java.lang.Ob
@NotNull
public static final net.corda.core.context.InvocationContext newInstance(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>, String)
@NotNull
public static final net.corda.core.context.InvocationContext newInstance(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>, String, net.corda.core.internal.telemetry.SerializedTelemetry)
@NotNull
public static final net.corda.core.context.InvocationContext peer(net.corda.core.identity.CordaX500Name, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor)
@NotNull
public final java.security.Principal principal()
@ -220,6 +230,8 @@ public final class net.corda.core.context.InvocationContext extends java.lang.Ob
@NotNull
public static final net.corda.core.context.InvocationContext rpc(net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>)
@NotNull
public static final net.corda.core.context.InvocationContext rpc(net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>, net.corda.core.internal.telemetry.SerializedTelemetry)
@NotNull
public static final net.corda.core.context.InvocationContext scheduled(net.corda.core.contracts.ScheduledStateRef, net.corda.core.context.Trace, net.corda.core.context.Trace)
@NotNull
public static final net.corda.core.context.InvocationContext service(String, net.corda.core.identity.CordaX500Name, net.corda.core.context.Trace, net.corda.core.context.Trace)
@ -246,6 +258,8 @@ public static final class net.corda.core.context.InvocationContext$Companion ext
@NotNull
public final net.corda.core.context.InvocationContext newInstance(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>, String)
@NotNull
public final net.corda.core.context.InvocationContext newInstance(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>, String, net.corda.core.internal.telemetry.SerializedTelemetry)
@NotNull
public final net.corda.core.context.InvocationContext peer(net.corda.core.identity.CordaX500Name, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor)
@NotNull
public final net.corda.core.context.InvocationContext rpc(net.corda.core.context.Actor)
@ -258,6 +272,8 @@ public static final class net.corda.core.context.InvocationContext$Companion ext
@NotNull
public final net.corda.core.context.InvocationContext rpc(net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>)
@NotNull
public final net.corda.core.context.InvocationContext rpc(net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List<?>, net.corda.core.internal.telemetry.SerializedTelemetry)
@NotNull
public final net.corda.core.context.InvocationContext scheduled(net.corda.core.contracts.ScheduledStateRef, net.corda.core.context.Trace, net.corda.core.context.Trace)
@NotNull
public final net.corda.core.context.InvocationContext service(String, net.corda.core.identity.CordaX500Name, net.corda.core.context.Trace, net.corda.core.context.Trace)
@ -1379,6 +1395,8 @@ public interface net.corda.core.cordapp.Cordapp
@NotNull
public abstract java.util.List<Class<? extends net.corda.core.serialization.SerializeAsToken>> getServices()
public abstract int getTargetPlatformVersion()
@NotNull
public abstract java.util.List<Class<? extends net.corda.core.internal.telemetry.TelemetryComponent>> getTelemetryComponents()
##
@DoNotImplement
public static interface net.corda.core.cordapp.Cordapp$Info
@ -4052,6 +4070,8 @@ public interface net.corda.core.node.ServiceHub extends net.corda.core.node.Serv
@NotNull
public abstract T cordaService(Class<T>)
@NotNull
public abstract T cordaTelemetryComponent(Class<T>)
@NotNull
public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction)
@NotNull
public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey)
@ -4074,6 +4094,8 @@ public interface net.corda.core.node.ServiceHub extends net.corda.core.node.Serv
@NotNull
public abstract net.corda.core.node.services.NetworkMapCache getNetworkMapCache()
@NotNull
public abstract net.corda.core.node.services.TelemetryService getTelemetryService()
@NotNull
public abstract net.corda.core.node.services.TransactionVerifierService getTransactionVerifierService()
@NotNull
public abstract net.corda.core.node.services.TransactionStorage getValidatedTransactions()
@ -4373,6 +4395,11 @@ public final class net.corda.core.node.services.StatesNotAvailableException exte
@NotNull
public String toString()
##
@DoNotImplement
public interface net.corda.core.node.services.TelemetryService
@Nullable
public abstract net.corda.core.internal.telemetry.OpenTelemetryHandle getOpenTelemetry()
##
public final class net.corda.core.node.services.TimeWindowChecker extends java.lang.Object
public <init>()
public <init>(java.time.Clock)
@ -9052,6 +9079,8 @@ public class net.corda.testing.node.MockServices extends java.lang.Object implem
@NotNull
public T cordaService(Class<T>)
@NotNull
public T cordaTelemetryComponent(Class<T>)
@NotNull
public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction)
@NotNull
public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey)
@ -9088,6 +9117,8 @@ public class net.corda.testing.node.MockServices extends java.lang.Object implem
@NotNull
protected final net.corda.core.node.ServicesForResolution getServicesForResolution()
@NotNull
public net.corda.core.internal.telemetry.TelemetryServiceImpl getTelemetryService()
@NotNull
public net.corda.core.node.services.TransactionVerifierService getTransactionVerifierService()
@NotNull
public net.corda.core.node.services.TransactionStorage getValidatedTransactions()
@ -9357,7 +9388,10 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang.
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean)
public <init>(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean, int, kotlin.jvm.internal.DefaultConstructorMarker)
@NotNull
public final java.time.Duration component1()
@NotNull
@ -9384,6 +9418,8 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang.
public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int)
@NotNull
public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration)
@NotNull
public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean)
public boolean equals(Object)
public int getCacheConcurrencyLevel()
@NotNull
@ -9397,8 +9433,11 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang.
public int getMaxReconnectAttempts()
public int getMinimumServerProtocolVersion()
public int getObservationExecutorPoolSize()
public boolean getOpenTelemetryEnabled()
@NotNull
public java.time.Duration getReapInterval()
public boolean getSimpleLogTelemetryEnabled()
public boolean getSpanStartEndEventsEnabled()
public boolean getTrackRpcCallSites()
public int hashCode()
@NotNull
@ -9416,6 +9455,8 @@ public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Obj
public <init>(net.corda.client.rpc.RPCConnection, java.util.concurrent.ExecutorService, net.corda.client.rpc.internal.ReconnectingCordaRPCOps, kotlin.jvm.internal.DefaultConstructorMarker)
public void close()
public void forceClose()
@Nullable
public net.corda.core.internal.telemetry.OpenTelemetryHandle getOpenTelemetry()
@NotNull
public net.corda.core.messaging.CordaRPCOps getProxy()
public int getServerProtocolVersion()
@ -9450,6 +9491,8 @@ public final class net.corda.client.rpc.PermissionException extends net.corda.co
public interface net.corda.client.rpc.RPCConnection extends java.io.Closeable
public abstract void close()
public abstract void forceClose()
@Nullable
public abstract net.corda.core.internal.telemetry.OpenTelemetryHandle getOpenTelemetry()
@NotNull
public abstract I getProxy()
public abstract int getServerProtocolVersion()

View File

@ -57,7 +57,8 @@ buildscript {
]
ext.capsule_version = constants.getProperty("capsuleVersion")
ext.open_telemetry_version = constants.getProperty("openTelemetryVersion")
ext.open_telemetry_sem_conv_version = constants.getProperty("openTelemetrySemConvVersion")
ext.asm_version = constants.getProperty("asmVersion")
ext.artemis_version = constants.getProperty("artemisVersion")
ext.jackson_version = constants.getProperty("jacksonVersion")
@ -643,6 +644,7 @@ bintrayConfig {
gpgSign = true
gpgPassphrase = System.getenv('CORDA_BINTRAY_GPG_PASSPHRASE')
publications = [
'corda-opentelemetry-driver',
'corda-jfx',
'corda-mock',
'corda-rpc',

View File

@ -11,6 +11,7 @@ import net.corda.core.context.Trace
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.telemetry.OpenTelemetryHandle
import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.serialization.SerializationCustomSerializer
@ -77,6 +78,10 @@ class CordaRPCConnection private constructor(
override fun forceClose() = doCloseLogic { actualConnection.forceClose() }
/** Forwards to the wrapped [actualConnection] and returns whatever handle (possibly null) it reports. */
override fun getOpenTelemetry(): OpenTelemetryHandle? = actualConnection.getOpenTelemetry()
private inline fun doCloseLogic(close: () -> Unit) {
try {
close.invoke()
@ -169,8 +174,13 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
/**
* The cache expiry of a deduplication watermark per client. Default is 1 day.
*/
open val deduplicationCacheExpiry: Duration = 1.days
open val deduplicationCacheExpiry: Duration = 1.days,
open val openTelemetryEnabled: Boolean = true,
open val simpleLogTelemetryEnabled: Boolean = false,
open val spanStartEndEventsEnabled: Boolean = true
) {
companion object {
@ -214,7 +224,46 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
connectionRetryIntervalMultiplier,
maxReconnectAttempts,
maxFileSize,
deduplicationCacheExpiry
deduplicationCacheExpiry,
openTelemetryEnabled,
simpleLogTelemetryEnabled,
spanStartEndEventsEnabled
)
}
/**
 * Builds a new [CordaRPCClientConfiguration] from the supplied settings.
 *
 * Every parameter defaults to the value currently held by this instance, so a caller only needs to name
 * the settings that should differ. The result is always constructed as a plain [CordaRPCClientConfiguration].
 */
@Suppress("LongParameterList")
fun copy(
        connectionMaxRetryInterval: Duration = this.connectionMaxRetryInterval,
        minimumServerProtocolVersion: Int = this.minimumServerProtocolVersion,
        trackRpcCallSites: Boolean = this.trackRpcCallSites,
        reapInterval: Duration = this.reapInterval,
        observationExecutorPoolSize: Int = this.observationExecutorPoolSize,
        @Suppress("DEPRECATION")
        cacheConcurrencyLevel: Int = this.cacheConcurrencyLevel,
        connectionRetryInterval: Duration = this.connectionRetryInterval,
        connectionRetryIntervalMultiplier: Double = this.connectionRetryIntervalMultiplier,
        maxReconnectAttempts: Int = this.maxReconnectAttempts,
        maxFileSize: Int = this.maxFileSize,
        deduplicationCacheExpiry: Duration = this.deduplicationCacheExpiry,
        openTelemetryEnabled: Boolean = this.openTelemetryEnabled,
        simpleLogTelemetryEnabled: Boolean = this.simpleLogTelemetryEnabled,
        spanStartEndEventsEnabled: Boolean = this.spanStartEndEventsEnabled
): CordaRPCClientConfiguration = CordaRPCClientConfiguration(
        // Arguments are forwarded positionally, in the same order as the parameters above.
        connectionMaxRetryInterval,
        minimumServerProtocolVersion,
        trackRpcCallSites,
        reapInterval,
        observationExecutorPoolSize,
        cacheConcurrencyLevel,
        connectionRetryInterval,
        connectionRetryIntervalMultiplier,
        maxReconnectAttempts,
        maxFileSize,
        deduplicationCacheExpiry,
        openTelemetryEnabled,
        simpleLogTelemetryEnabled,
        spanStartEndEventsEnabled
)
@ -235,6 +284,9 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
if (maxReconnectAttempts != other.maxReconnectAttempts) return false
if (maxFileSize != other.maxFileSize) return false
if (deduplicationCacheExpiry != other.deduplicationCacheExpiry) return false
if (openTelemetryEnabled != other.openTelemetryEnabled) return false
if (simpleLogTelemetryEnabled != other.simpleLogTelemetryEnabled) return false
if (spanStartEndEventsEnabled != other.spanStartEndEventsEnabled) return false
return true
}
@ -252,6 +304,9 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
result = 31 * result + maxReconnectAttempts
result = 31 * result + maxFileSize
result = 31 * result + deduplicationCacheExpiry.hashCode()
result = 31 * result + openTelemetryEnabled.hashCode()
result = 31 * result + simpleLogTelemetryEnabled.hashCode()
result = 31 * result + spanStartEndEventsEnabled.hashCode()
return result
}
@ -264,7 +319,10 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
"cacheConcurrencyLevel=$cacheConcurrencyLevel, connectionRetryInterval=$connectionRetryInterval, " +
"connectionRetryIntervalMultiplier=$connectionRetryIntervalMultiplier, " +
"maxReconnectAttempts=$maxReconnectAttempts, maxFileSize=$maxFileSize, " +
"deduplicationCacheExpiry=$deduplicationCacheExpiry)"
"deduplicationCacheExpiry=$deduplicationCacheExpiry, " +
"openTelemetryEnabled=$openTelemetryEnabled, " +
"simpleLogTelemetryEnabled=$simpleLogTelemetryEnabled, " +
"spanStartEndEventsEnabled=$spanStartEndEventsEnabled)"
}
// Left in for backwards compatibility with version 3.1

View File

@ -1,6 +1,7 @@
package net.corda.client.rpc
import net.corda.core.DoNotImplement
import net.corda.core.internal.telemetry.OpenTelemetryHandle
import net.corda.core.messaging.RPCOps
import java.io.Closeable
@ -21,6 +22,11 @@ interface RPCConnection<out I : RPCOps> : Closeable {
/** The RPC protocol version reported by the server. */
val serverProtocolVersion: Int
/**
* Returns a handle to the configured global OpenTelemetry instance, or null if OpenTelemetry has not been configured.
*/
fun getOpenTelemetry(): OpenTelemetryHandle?
/**
* Closes this client gracefully by sending a notification to the server, so it can immediately clean up resources.
* If the server is not available this method may block for a short period until it's clear the server is not

View File

@ -1,6 +1,7 @@
package net.corda.client.rpc.internal
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.internal.telemetry.OpenTelemetryHandle
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.UnrecoverableRPCException
import net.corda.client.rpc.ext.RPCConnectionListener
@ -98,10 +99,13 @@ class RPCClient<I : RPCOps>(
// Without this any type of "send" time failures will not be delivered back to the client
isBlockOnNonDurableSend = true
}
val rpcClientTelemetry = RPCClientTelemetry("rpcClient-${targetLegalIdentity.toString()}", rpcConfiguration.openTelemetryEnabled,
rpcConfiguration.simpleLogTelemetryEnabled, rpcConfiguration.spanStartEndEventsEnabled)
val sessionId = Trace.SessionId.newInstance()
val distributionMux = DistributionMux(listeners, username)
val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator,
rpcOpsClass, serializationContext, sessionId, externalTrace, impersonatedActor, targetLegalIdentity, distributionMux)
rpcOpsClass, serializationContext, sessionId, externalTrace, impersonatedActor, targetLegalIdentity, distributionMux,
rpcClientTelemetry)
try {
proxyHandler.start()
val ops: I = uncheckedCast(Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler))
@ -118,6 +122,10 @@ class RPCClient<I : RPCOps>(
override val proxy = ops
override val serverProtocolVersion = serverProtocolVersion
/** Returns the handle reported by this client's [rpcClientTelemetry]; may be null. */
override fun getOpenTelemetry(): OpenTelemetryHandle? = rpcClientTelemetry.getOpenTelemetryHandle()
private fun close(notify: Boolean) {
if (notify) {
proxyHandler.notifyServerAndClose()

View File

@ -20,6 +20,7 @@ import net.corda.core.internal.LazyStickyPool
import net.corda.core.internal.LifeCycle
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.telemetry.TelemetryStatusCode
import net.corda.core.internal.times
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps
@ -110,6 +111,7 @@ internal class RPCClientProxyHandler(
private val impersonatedActor: Actor?,
private val targetLegalIdentity: CordaX500Name?,
private val notificationDistributionMux: DistributionMux<out RPCOps>,
private val rpcClientTelemetry: RPCClientTelemetry,
private val cacheFactory: NamedCacheFactory = ClientCacheFactory()
) : InvocationHandler {
@ -330,6 +332,8 @@ internal class RPCClientProxyHandler(
val replyId = InvocationId.newInstance()
val methodFqn = produceMethodFullyQualifiedName(method)
callSiteMap?.set(replyId, CallSite(methodFqn))
val telemetryId = rpcClientTelemetry.telemetryService.startSpanForFlow("client-$methodFqn", emptyMap())
try {
val serialisedArguments = (arguments?.toList() ?: emptyList()).serialize(context = serializationContextWithObservableContext)
val request = RPCApi.ClientToServer.RpcRequest(
@ -339,7 +343,8 @@ internal class RPCClientProxyHandler(
replyId,
sessionId,
externalTrace,
impersonatedActor
impersonatedActor,
rpcClientTelemetry.telemetryService.getCurrentTelemetryData()
)
val replyFuture = SettableFuture.create<Any>()
require(rpcReplyMap.put(replyId, replyFuture) == null) {
@ -353,13 +358,18 @@ internal class RPCClientProxyHandler(
sendMessage(request)
return replyFuture.getOrThrow()
} catch (e: RuntimeException) {
rpcClientTelemetry.telemetryService.recordException(telemetryId, e)
rpcClientTelemetry.telemetryService.setStatus(telemetryId, TelemetryStatusCode.ERROR, e.message ?: "RuntimeException occurred")
// Already an unchecked exception, so just rethrow it
throw e
} catch (e: Exception) {
rpcClientTelemetry.telemetryService.recordException(telemetryId, e)
rpcClientTelemetry.telemetryService.setStatus(telemetryId, TelemetryStatusCode.ERROR, e.message ?: "Exception occurred")
// This must be a checked exception, so wrap it
throw RPCException(e.message ?: "", e)
} finally {
callSiteMap?.remove(replyId)
rpcClientTelemetry.telemetryService.endSpanForFlow(telemetryId)
}
}

View File

@ -0,0 +1,42 @@
package net.corda.client.rpc.internal
import net.corda.core.internal.telemetry.OpenTelemetryHandle
import net.corda.core.internal.telemetry.OpenTelemetryComponent
import net.corda.core.internal.telemetry.SimpleLogTelemetryComponent
import net.corda.core.internal.telemetry.TelemetryServiceImpl
import net.corda.core.utilities.contextLogger
/**
 * Owns the [TelemetryServiceImpl] used by an RPC client and registers telemetry components on it at
 * construction time, according to the supplied flags.
 *
 * @property serviceName name passed to the [OpenTelemetryComponent] when it is created.
 * @property openTelemetryEnabled when true, an [OpenTelemetryComponent] is created and, if it reports itself
 * enabled, registered with [telemetryService].
 * @property simpleLogTelemetryEnabled when true, a [SimpleLogTelemetryComponent] is registered with [telemetryService].
 * @property spanStartEndEventsEnabled passed through to the [OpenTelemetryComponent] constructor.
 */
class RPCClientTelemetry(
        val serviceName: String,
        val openTelemetryEnabled: Boolean,
        val simpleLogTelemetryEnabled: Boolean,
        val spanStartEndEventsEnabled: Boolean
) {
    val telemetryService = TelemetryServiceImpl()

    init {
        // OpenTelemetry is considered first, then the simple log component, matching registration order.
        if (openTelemetryEnabled) {
            registerOpenTelemetryComponent()
        }
        if (simpleLogTelemetryEnabled) {
            registerSimpleLogTelemetryComponent()
        }
    }

    /** Creates the OpenTelemetry component and registers it only if it reports itself as enabled. */
    private fun registerOpenTelemetryComponent() {
        try {
            val component = OpenTelemetryComponent(serviceName, spanStartEndEventsEnabled)
            if (component.isEnabled()) {
                telemetryService.addTelemetryComponent(component)
                log.debug("OpenTelemetry enabled")
            }
        } catch (ex: NoClassDefFoundError) {
            // The OpenTelemetry api or sdk is not on the classpath: carry on without it.
            log.debug("OpenTelemetry not enabled, api or sdk not found on classpath")
        }
    }

    /** Registers a fresh [SimpleLogTelemetryComponent] with the telemetry service. */
    private fun registerSimpleLogTelemetryComponent() {
        telemetryService.addTelemetryComponent(SimpleLogTelemetryComponent())
        log.debug("SimpleLogTelemetry enabled")
    }

    /**
     * Placeholder: currently always returns null. It is intended to return a handle that clients can use
     * to interact with OpenTelemetry.
     */
    fun getOpenTelemetryHandle(): OpenTelemetryHandle? = null

    companion object {
        private val log = contextLogger()
    }
}

View File

@ -6,6 +6,7 @@ import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.MaxRpcRetryException
import net.corda.core.internal.telemetry.OpenTelemetryHandle
import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.RPCException
@ -297,6 +298,9 @@ class ReconnectingCordaRPCOps private constructor(
currentRPCConnection?.forceClose()
}
}
/** Returns the current connection's OpenTelemetry handle, or null when there is no current connection. */
override fun getOpenTelemetry(): OpenTelemetryHandle? = currentRPCConnection?.getOpenTelemetry()
fun isClosed(): Boolean = currentState == CLOSED
}
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {

View File

@ -13,6 +13,8 @@ java8MinUpdateVersion=171
# net.corda.core.internal.CordaUtilsKt.PLATFORM_VERSION as well. #
# ***************************************************************#
platformVersion=12
openTelemetryVersion=1.17.0
openTelemetrySemConvVersion=1.17.0-alpha
guavaVersion=28.0-jre
# Quasar version to use with Java 8:
quasarVersion=0.7.15_r3

View File

@ -44,6 +44,9 @@ dependencies {
api "com.google.code.findbugs:jsr305:$jsr305_version"
api "org.slf4j:slf4j-api:$slf4j_version"
compileOnly "io.opentelemetry:opentelemetry-api:${open_telemetry_version}"
compileOnly project(':opentelemetry-driver')
// These dependencies will become "runtime" scoped in our published POM.
// See publish.dependenciesFrom.defaultScope.
deterministicLibraries "org.bouncycastle:bcprov-jdk15on:$bouncycastle_version"

View File

@ -27,7 +27,8 @@ configurations {
dependencies {
obfuscatorImplementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compileOnly "io.opentelemetry:opentelemetry-api:${open_telemetry_version}"
compileOnly project(':opentelemetry-driver')
testImplementation sourceSets.obfuscator.output
testImplementation "org.junit.jupiter:junit-jupiter-api:${junit_jupiter_version}"
testImplementation "junit:junit:$junit_version"
@ -161,7 +162,8 @@ quasar {
"org.w3c.**",
"org.xml**",
"org.yaml**",
"rx**")
"rx**",
"io.opentelemetry.**")
}
artifacts {

View File

@ -4,6 +4,7 @@ import net.corda.core.DeleteForDJVM
import net.corda.core.KeepForDJVM
import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.telemetry.SerializedTelemetry
import net.corda.core.serialization.CordaSerializable
import java.security.Principal
@ -25,7 +26,8 @@ data class InvocationContext(
val externalTrace: Trace? = null,
val impersonatedActor: Actor? = null,
val arguments: List<Any?>? = emptyList(), // 'arguments' is nullable so that a - >= 4.6 version - RPC client can be backwards compatible against - < 4.6 version - nodes
val clientId: String? = null
val clientId: String? = null,
val serializedTelemetry: SerializedTelemetry? = null
) {
constructor(
@ -36,6 +38,16 @@ data class InvocationContext(
impersonatedActor: Actor? = null
) : this(origin, trace, actor, externalTrace, impersonatedActor, emptyList())
/**
 * Seven-parameter constructor without telemetry data; delegates to the primary constructor with
 * [serializedTelemetry] set to null.
 *
 * NOTE(review): presumably retained so callers of the pre-telemetry signature keep working - confirm.
 */
constructor(
        origin: InvocationOrigin,
        trace: Trace,
        actor: Actor?,
        externalTrace: Trace? = null,
        impersonatedActor: Actor? = null,
        arguments: List<Any?>? = emptyList(),
        clientId: String? = null
) : this(
        origin = origin,
        trace = trace,
        actor = actor,
        externalTrace = externalTrace,
        impersonatedActor = impersonatedActor,
        arguments = arguments,
        clientId = clientId,
        serializedTelemetry = null
)
companion object {
/**
* Creates an [InvocationContext] with a [Trace] that defaults to a [java.util.UUID] as value and [java.time.Instant.now] timestamp.
@ -51,8 +63,9 @@ data class InvocationContext(
externalTrace: Trace? = null,
impersonatedActor: Actor? = null,
arguments: List<Any?> = emptyList(),
clientId: String? = null
) = InvocationContext(origin, trace, actor, externalTrace, impersonatedActor, arguments, clientId)
clientId: String? = null,
serializedTelemetry: SerializedTelemetry? = null
) = InvocationContext(origin, trace, actor, externalTrace, impersonatedActor, arguments, clientId, serializedTelemetry)
/**
* Creates an [InvocationContext] with [InvocationOrigin.RPC] origin.
@ -60,13 +73,15 @@ data class InvocationContext(
@DeleteForDJVM
@JvmStatic
@JvmOverloads
@Suppress("LongParameterList")
fun rpc(
actor: Actor,
trace: Trace = Trace.newInstance(),
externalTrace: Trace? = null,
impersonatedActor: Actor? = null,
arguments: List<Any?> = emptyList()
): InvocationContext = newInstance(InvocationOrigin.RPC(actor), trace, actor, externalTrace, impersonatedActor, arguments)
arguments: List<Any?> = emptyList(),
serializedTelemetry: SerializedTelemetry? = null
): InvocationContext = newInstance(InvocationOrigin.RPC(actor), trace, actor, externalTrace, impersonatedActor, arguments, serializedTelemetry = serializedTelemetry)
/**
* Creates an [InvocationContext] with [InvocationOrigin.Peer] origin.
@ -119,6 +134,28 @@ data class InvocationContext(
clientId = clientId
)
}
/**
 * Copy overload that takes every field except [serializedTelemetry].
 *
 * Each parameter defaults to this instance's current value; the telemetry data of this instance is
 * carried over unchanged into the result.
 */
@Suppress("LongParameterList")
fun copy(
        origin: InvocationOrigin = this.origin,
        trace: Trace = this.trace,
        actor: Actor? = this.actor,
        externalTrace: Trace? = this.externalTrace,
        impersonatedActor: Actor? = this.impersonatedActor,
        arguments: List<Any?>? = this.arguments,
        clientId: String? = this.clientId
): InvocationContext = copy(
        origin = origin,
        trace = trace,
        actor = actor,
        externalTrace = externalTrace,
        impersonatedActor = impersonatedActor,
        arguments = arguments,
        clientId = clientId,
        // Naming serializedTelemetry selects the full eight-parameter copy rather than recursing into this overload.
        serializedTelemetry = this.serializedTelemetry
)
}
/**

View File

@ -6,6 +6,7 @@ import net.corda.core.cordapp.Cordapp.Info.*
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.internal.cordapp.CordappImpl.Companion.UNKNOWN_VALUE
import net.corda.core.internal.telemetry.TelemetryComponent
import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.CheckpointCustomSerializer
import net.corda.core.serialization.SerializationCustomSerializer
@ -49,6 +50,7 @@ interface Cordapp {
val serviceFlows: List<Class<out FlowLogic<*>>>
val schedulableFlows: List<Class<out FlowLogic<*>>>
val services: List<Class<out SerializeAsToken>>
val telemetryComponents: List<Class<out TelemetryComponent>>
val serializationWhitelists: List<SerializationWhitelist>
val serializationCustomSerializers: List<SerializationCustomSerializer<*, *>>
val checkpointCustomSerializers: List<CheckpointCustomSerializer<*, *>>

View File

@ -7,6 +7,7 @@ import net.corda.core.crypto.isFulfilledBy
import net.corda.core.identity.Party
import net.corda.core.identity.groupAbstractPartyByWellKnownParty
import net.corda.core.internal.pushToLoggingContext
import net.corda.core.internal.telemetry.telemetryServiceInternal
import net.corda.core.internal.warnOnce
import net.corda.core.node.StatesToRecord
import net.corda.core.node.StatesToRecord.ONLY_RELEVANT
@ -221,18 +222,22 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
@Suspendable
private fun notariseAndRecord(): SignedTransaction {
val notarised = if (needsNotarySignature(transaction)) {
progressTracker.currentStep = NOTARISING
val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true))
transaction + notarySignatures
} else {
logger.info("No need to notarise this transaction.")
transaction
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord", flowLogic = this) {
val notarised = if (needsNotarySignature(transaction)) {
progressTracker.currentStep = NOTARISING
val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true))
transaction + notarySignatures
} else {
logger.info("No need to notarise this transaction.")
transaction
}
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord:recordTransactions", flowLogic = this) {
logger.info("Recording transaction locally.")
serviceHub.recordTransactions(statesToRecord, listOf(notarised))
logger.info("Recorded transaction locally successfully.")
}
return notarised
}
logger.info("Recording transaction locally.")
serviceHub.recordTransactions(statesToRecord, listOf(notarised))
logger.info("Recorded transaction locally successfully.")
return notarised
}
private fun needsNotarySignature(stx: SignedTransaction): Boolean {

View File

@ -16,6 +16,7 @@ import net.corda.core.internal.ServiceHubCoreInternal
import net.corda.core.internal.WaitForStateConsumption
import net.corda.core.internal.abbreviate
import net.corda.core.internal.checkPayloadIs
import net.corda.core.internal.telemetry.telemetryServiceInternal
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo
@ -157,7 +158,7 @@ abstract class FlowLogic<out T> {
fun initiateFlow(destination: Destination): FlowSession {
require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" }
return stateMachine.initiateFlow(destination, serviceHub.identityService.wellKnownPartyFromAnonymous(destination as AbstractParty)
?: throw IllegalArgumentException("Could not resolve destination: $destination"))
?: throw IllegalArgumentException("Could not resolve destination: $destination"), serviceHub.telemetryServiceInternal.getCurrentTelemetryData())
}
/**
@ -165,7 +166,7 @@ abstract class FlowLogic<out T> {
* that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
*/
@Suspendable
fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party, party)
fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party, party, serviceHub.telemetryServiceInternal.getCurrentTelemetryData())
/**
* Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that
@ -285,11 +286,13 @@ abstract class FlowLogic<out T> {
@Suspendable
internal fun <R : Any> FlowSession.sendAndReceiveWithRetry(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
val request = FlowIORequest.SendAndReceive(
sessionToMessage = stateMachine.serialize(mapOf(this to payload)),
shouldRetrySend = true
)
return stateMachine.suspend(request, maySkipCheckpoint = false)[this]!!.checkPayloadIs(receiveType)
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#sendAndReceiveWithRetry", mapOf("destination" to destination.toString())) {
val request = FlowIORequest.SendAndReceive(
sessionToMessage = stateMachine.serialize(mapOf(this to payload)),
shouldRetrySend = true
)
return stateMachine.suspend(request, maySkipCheckpoint = false)[this]!!.checkPayloadIs(receiveType)
}
}
@Suspendable

View File

@ -8,6 +8,7 @@ import net.corda.core.context.InvocationContext
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.internal.telemetry.SerializedTelemetry
import net.corda.core.serialization.SerializedBytes
import org.slf4j.Logger
@ -30,7 +31,7 @@ interface FlowStateMachine<FLOWRETURN> : FlowStateMachineHandle<FLOWRETURN> {
fun serialize(payloads: Map<FlowSession, Any>): Map<FlowSession, SerializedBytes<Any>>
@Suspendable
fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession
fun initiateFlow(destination: Destination, wellKnownParty: Party, serializedTelemetry: SerializedTelemetry?): FlowSession
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>)

View File

@ -8,6 +8,7 @@ import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.notary.NotaryService
import net.corda.core.internal.toPath
import net.corda.core.internal.telemetry.TelemetryComponent
import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.CheckpointCustomSerializer
import net.corda.core.serialization.SerializationCustomSerializer
@ -24,6 +25,7 @@ data class CordappImpl(
override val serviceFlows: List<Class<out FlowLogic<*>>>,
override val schedulableFlows: List<Class<out FlowLogic<*>>>,
override val services: List<Class<out SerializeAsToken>>,
override val telemetryComponents: List<Class<out TelemetryComponent>>,
override val serializationWhitelists: List<SerializationWhitelist>,
override val serializationCustomSerializers: List<SerializationCustomSerializer<*, *>>,
override val checkpointCustomSerializers: List<CheckpointCustomSerializer<*, *>>,
@ -79,6 +81,7 @@ data class CordappImpl(
serviceFlows = emptyList(),
schedulableFlows = emptyList(),
services = emptyList(),
telemetryComponents = emptyList(),
serializationWhitelists = emptyList(),
serializationCustomSerializers = emptyList(),
checkpointCustomSerializers = emptyList(),

View File

@ -10,6 +10,7 @@ import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.PlatformVersionSwitches
import net.corda.core.internal.checkParameterHash
import net.corda.core.internal.telemetry.telemetryServiceInternal
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import java.lang.IllegalStateException
@ -74,14 +75,15 @@ abstract class NotaryServiceFlow(
sleep(Duration.ZERO)
}
}
service.commitInputStates(
tx.inputs,
tx.id,
otherSideSession.counterparty,
requestPayload.requestSignature,
tx.timeWindow,
tx.references)
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#call:commitInputStates", flowLogic = this) {
service.commitInputStates(
tx.inputs,
tx.id,
otherSideSession.counterparty,
requestPayload.requestSignature,
tx.timeWindow,
tx.references)
}
} catch (e: NotaryInternalException) {
logError(e.error)
// Any exception that's not a NotaryInternalException is assumed to be an unexpected internal error

View File

@ -0,0 +1,309 @@
package net.corda.core.internal.telemetry
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.baggage.Baggage
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.common.AttributesBuilder
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanContext
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.api.trace.TraceFlags
import io.opentelemetry.api.trace.TraceState
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.context.Context
import io.opentelemetry.context.Scope
import net.corda.core.flows.FlowLogic
import net.corda.core.serialization.CordaSerializable
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import net.corda.opentelemetrydriver.OpenTelemetryDriver
const val TRACEIDHEX_LENGTH = 32
const val SPANIDHEX_LENGTH = 16
/**
 * Serializable snapshot of an OpenTelemetry [SpanContext]: trace id, span id and trace flags as hex strings,
 * plus the trace state entries as a plain map.
 */
@CordaSerializable
data class SerializableSpanContext(val traceIdHex : String, val spanIdHex : String, val traceFlagsHex : String, val traceStateMap : Map<String, String>) {
    /** Captures the ids, flags and trace state of [spanContext]. */
    constructor(spanContext: SpanContext) : this(
            spanContext.traceId,
            spanContext.spanId,
            spanContext.traceFlags.asHex(),
            spanContext.traceState.asMap().toMap()
    )

    /** Placeholder context: all-zero trace and span ids with the default trace flags and trace state. */
    constructor() : this(
            "0".repeat(TRACEIDHEX_LENGTH),
            "0".repeat(SPANIDHEX_LENGTH),
            TraceFlags.getDefault().asHex(),
            TraceState.getDefault().asMap()
    )

    /** Rebuilds a [SpanContext] via [SpanContext.create]. */
    fun createSpanContext() = SpanContext.create(traceIdHex, spanIdHex, TraceFlags.fromHex(traceFlagsHex, 0), createTraceState())

    /** Rebuilds a [SpanContext] via [SpanContext.createFromRemoteParent]. */
    fun createRemoteSpanContext() = SpanContext.createFromRemoteParent(traceIdHex, spanIdHex, TraceFlags.fromHex(traceFlagsHex, 0), createTraceState())

    // Replays every entry of traceStateMap into a fresh TraceState builder.
    private fun createTraceState() = TraceState.builder().also { stateBuilder ->
        traceStateMap.forEach { (key, value) -> stateBuilder.put(key, value) }
    }.build()
}
data class OpenTelemetryContext(val spanContext: SerializableSpanContext, val startedSpanContext: SerializableSpanContext, val baggage: Map<String,String>): TelemetryDataItem
data class SpanInfo(val name: String, val span: Span, val spanScope: Scope, val spanEventContext: SerializableSpanContext? = null, val parentSpanEventContext: SerializableSpanContext? = null)
/**
 * Obtains the [Tracer] used by [OpenTelemetryComponent], preferring the instance supplied by
 * [OpenTelemetryDriver] and falling back to [GlobalOpenTelemetry] when the driver class cannot be loaded.
 */
class TracerSetup {
    /** The instance returned by [OpenTelemetryDriver]; stays `null` if the driver lookup never succeeded. */
    var openTelemetry: OpenTelemetry? = null

    /**
     * Returns a tracer named after [OpenTelemetryComponent].
     *
     * @param serviceName passed to [OpenTelemetryDriver.getOpenTelemetry].
     */
    fun getTracer(serviceName: String): Tracer {
        val instrumentationName = OpenTelemetryComponent::class.java.name
        return try {
            val driverTelemetry = OpenTelemetryDriver.getOpenTelemetry(serviceName)
            openTelemetry = driverTelemetry
            driverTelemetry.tracerProvider.get(instrumentationName)
        } catch (ex: NoClassDefFoundError) {
            // Driver classes are not loadable: use whatever tracer provider is registered globally.
            GlobalOpenTelemetry.getTracerProvider().get(instrumentationName)
        }
    }
}
@Suppress("TooManyFunctions")
class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnabled: Boolean) : TelemetryComponent {
val tracerSetup = TracerSetup()
val tracer: Tracer = tracerSetup.getTracer(serviceName)
companion object {
private val log: Logger = LoggerFactory.getLogger(OpenTelemetryComponent::class.java)
const val OPENTELEMETRY_COMPONENT_NAME = "OpenTelemetry"
}
val rootSpans = ConcurrentHashMap<UUID, SpanInfo>()
val spans = ConcurrentHashMap<UUID, SpanInfo>()
val baggages = ConcurrentHashMap<UUID, Scope>()
/**
 * Reports whether a usable tracer is available.
 *
 * True when [TracerSetup.getTracer] stored an OpenTelemetry instance (the SDK is on the classpath), or
 * otherwise when the globally registered tracer is not the `DefaultTracer` class, which is how the
 * OpenTelemetry java agent is detected.
 */
override fun isEnabled(): Boolean {
    val sdkAvailable = tracerSetup.openTelemetry != null
    return sdkAvailable ||
            GlobalOpenTelemetry.getTracerProvider().get(OpenTelemetryComponent::class.java.name)
                    .javaClass.name != "io.opentelemetry.api.trace.DefaultTracer"
}
override fun name(): String = OPENTELEMETRY_COMPONENT_NAME
override fun onTelemetryEvent(event: TelemetryEvent) {
when (event) {
is StartSpanForFlowEvent -> startSpanForFlow(event.name, event.attributes, event.telemetryId, event.flowLogic, event.telemetryDataItem)
is EndSpanForFlowEvent -> endSpanForFlow(event.telemetryId)
is StartSpanEvent -> startSpan(event.name, event.attributes, event.telemetryId, event.flowLogic)
is EndSpanEvent -> endSpan(event.telemetryId)
is SetStatusEvent -> setStatus(event.telemetryId, event.telemetryStatusCode, event.message)
is RecordExceptionEvent -> recordException(event.telemetryId, event.throwable)
}
}
/**
 * Starts the span for a flow and stores it under [telemetryId].
 *
 * When [telemetryDataItem] is an [OpenTelemetryContext], its baggage is merged into the current baggage and
 * made current (the resulting scope is kept in [baggages]), the baggage entries are also added to the span
 * attributes, and the span is parented on the remote context. Otherwise a span with no remote parent is started.
 *
 * A non-null [telemetryDataItem] of any other type is ignored with a warning rather than being cast
 * unconditionally: the item is deserialized from data supplied by another party, so a mismatching type
 * must not abort the flow with a [ClassCastException].
 *
 * @param name span name.
 * @param attributes attributes to attach to the span.
 * @param telemetryId key under which the span (and baggage scope) is tracked.
 * @param flowLogic flow whose id, creation time and class name are added as attributes, if present.
 * @param telemetryDataItem telemetry data propagated from the initiating side, if any.
 */
@Suppress("LongParameterList")
private fun startSpanForFlow(name: String, attributes: Map<String, String>, telemetryId: UUID, flowLogic: FlowLogic<*>?,
                             telemetryDataItem: TelemetryDataItem?) {
    val remoteContext = telemetryDataItem as? OpenTelemetryContext
    if (telemetryDataItem != null && remoteContext == null) {
        log.warn("Ignoring telemetry data of unexpected type ${telemetryDataItem::class.java.name}, " +
                "starting span $name without a remote parent")
    }
    val baggageAttributes = remoteContext?.baggage ?: emptyMap()
    if (remoteContext != null) {
        val baggageBuilder = baggageAttributes.toList()
                .fold(Baggage.current().toBuilder()) { builder, attribute -> builder.put(attribute.first, attribute.second) }
        // The scope must be closed again when the flow span ends, so remember it.
        baggages[telemetryId] = baggageBuilder.build().makeCurrent()
    }
    // Also add any baggage to the span
    val attributesMap = (attributes + baggageAttributes).toList()
            .fold(Attributes.builder()) { builder, attribute -> builder.put(attribute.first, attribute.second) }
            .also { populateWithFlowAttributes(it, flowLogic) }
            .build()
    if (remoteContext != null) {
        startSpanForFlowWithRemoteParent(name, attributesMap, telemetryId, remoteContext)
    } else {
        startSpanForFlowWithNoParent(name, attributesMap, telemetryId)
    }
}
/**
 * Starts a flow span with no explicit parent set, tagged with the attribute `root.flow=true`, and makes it current.
 *
 * With start/end events enabled, the root span is stored in [rootSpans] and an extra "Child Spans" span
 * (parented on the root and also made current) is what gets stored in [spans], together with the
 * contexts returned by [createSpanToCaptureStartedSpanEvent]. With events disabled, the root span itself
 * is stored in [spans].
 */
private fun startSpanForFlowWithNoParent(name: String, attributesMap: Attributes, telemetryId: UUID) {
val rootSpan = tracer.spanBuilder(name).setAllAttributes(attributesMap).setAllAttributes(Attributes.of(AttributeKey.stringKey("root.flow"), "true")).startSpan()
val rootSpanScope = rootSpan.makeCurrent()
if (spanStartEndEventsEnabled) {
// The start-event spans are created before the "Child Spans" span becomes current.
val startedSpanContexts = createSpanToCaptureStartedSpanEvent(name, rootSpan, attributesMap)
val span = tracer.spanBuilder("Child Spans").setParent(Context.current().with(rootSpan)).startSpan()
val spanScope = span.makeCurrent()
// Both scopes are retained so that endSpanForFlow can close them.
rootSpans[telemetryId] = SpanInfo(name, rootSpan, rootSpanScope)
spans[telemetryId] = SpanInfo(name, span, spanScope, startedSpanContexts.first, startedSpanContexts.second)
}
else {
spans[telemetryId] = SpanInfo(name, rootSpan, rootSpanScope)
}
}
/**
 * Starts a flow span whose parent is the remote span context carried in [telemetryDataItem], makes it
 * current and stores it in [spans] under [telemetryId].
 *
 * [telemetryDataItem] is cast to [OpenTelemetryContext] unconditionally, so any other type fails with a
 * ClassCastException. With start/end events enabled, the contexts returned by
 * [createSpanToCaptureStartedSpanEventWithRemoteParent] are stored alongside the span.
 */
private fun startSpanForFlowWithRemoteParent(name: String, attributesMap: Attributes, telemetryId: UUID, telemetryDataItem: TelemetryDataItem) {
val parentContext = (telemetryDataItem as OpenTelemetryContext).spanContext
val spanContext = parentContext.createRemoteSpanContext()
// Wrap the remote context in a Span so it can be placed into a Context as the parent.
val parentSpan = Span.wrap(spanContext)
val span = tracer.spanBuilder(name).setParent(Context.current().with(parentSpan)).setAllAttributes(attributesMap).startSpan()
val spanScope = span.makeCurrent()
if (spanStartEndEventsEnabled) {
val contexts = createSpanToCaptureStartedSpanEventWithRemoteParent(name, telemetryDataItem, attributesMap)
spans[telemetryId] = SpanInfo(name, span, spanScope, contexts.first, contexts.second)
}
else {
spans[telemetryId] = SpanInfo(name, span, spanScope)
}
}
/**
 * Creates the pair of already-ended spans that record the start of a root flow.
 *
 * A "Started Events" span (tagged `root.startend.events=true`, parented on [rootSpan]) is started and ended
 * immediately; a "<name>-start" span is then created as its child and also ended immediately.
 *
 * @return the context of the "<name>-start" span first, the context of the "Started Events" span second.
 */
private fun createSpanToCaptureStartedSpanEvent(name: String, rootSpan: Span, attributesMap: Attributes): Pair<SerializableSpanContext, SerializableSpanContext> {
val startedSpan = tracer.spanBuilder("Started Events").setAllAttributes(attributesMap).setAllAttributes(Attributes.of(AttributeKey.stringKey("root.startend.events"), "true")).setParent(Context.current().with(rootSpan)).startSpan()
val serializableSpanContext = SerializableSpanContext(startedSpan.spanContext)
startedSpan.end()
// The ended span is re-created from its serializable context and wrapped so it can act as a parent.
val startedSpanContext = serializableSpanContext.createSpanContext()
val startedSpanFromContext = Span.wrap(startedSpanContext)
val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap)
.setParent(Context.current().with(startedSpanFromContext)).startSpan()
val childSerializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext)
startedSpanChild.end()
return Pair(childSerializableSpanContext, serializableSpanContext)
}
/**
 * Records the start of a remotely initiated flow as a "<name>-start" span, parented on the started-span
 * context carried in [telemetryDataItem]; the span is ended immediately.
 *
 * @return the context of the new "<name>-start" span first, the received started-span context second.
 */
private fun createSpanToCaptureStartedSpanEventWithRemoteParent(name: String, telemetryDataItem: OpenTelemetryContext, attributesMap: Attributes ): Pair<SerializableSpanContext, SerializableSpanContext> {
val startedSpanParentContext = telemetryDataItem.startedSpanContext
val startedSpanContext = startedSpanParentContext.createRemoteSpanContext()
// Wrap the remote context in a Span so it can be used as the parent below.
val startedSpanFromContext = Span.wrap(startedSpanContext)
val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap)
.setParent(Context.current().with(startedSpanFromContext)).startSpan()
val serializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext)
startedSpanChild.end()
return Pair(serializableSpanContext, startedSpanParentContext)
}
/**
 * Ends everything that was opened for the flow identified by [telemetryId]: the flow span, the root span
 * (if one was recorded) and the baggage scope, in that order, then forgets them.
 * An "-end" event span is emitted first when start/end events are enabled.
 */
private fun endSpanForFlow(telemetryId: UUID) {
    val flowSpanInfo = spans[telemetryId]
    val flowRootInfo = rootSpans[telemetryId]
    if (spanStartEndEventsEnabled) {
        createSpanToCaptureEndSpanEvent(flowSpanInfo)
    }
    // Innermost first: the flow span was made current after the root span.
    for (info in listOfNotNull(flowSpanInfo, flowRootInfo)) {
        info.spanScope.close()
        info.span.end()
    }
    spans.remove(telemetryId)
    rootSpans.remove(telemetryId)
    baggages[telemetryId]?.close()
    baggages.remove(telemetryId)
}
/**
 * Emits a "<name>-end" span, ended immediately, as a child of the span described by
 * [SpanInfo.parentSpanEventContext]. Does nothing when [spanInfo] or that context is `null`.
 */
private fun createSpanToCaptureEndSpanEvent(spanInfo: SpanInfo?) {
    if (spanInfo == null) return
    val parentEventContext = spanInfo.parentSpanEventContext ?: return
    val parentEventSpan = Span.wrap(parentEventContext.createSpanContext())
    val endEventSpan = tracer.spanBuilder("${spanInfo.name}-end")
            .setParent(Context.current().with(parentEventSpan))
            .startSpan()
    endEventSpan.end()
}
/**
 * Starts a span named [name], makes it current and records it in [spans] under [telemetryId].
 *
 * The span attributes are [attributes] plus the current baggage entries plus the flow attributes of
 * [flowLogic]. The span that was current before this one started is handed to [createStartedEventSpan]
 * to produce the start-event contexts stored with the span.
 */
private fun startSpan(name: String, attributes: Map<String, String>, telemetryId: UUID, flowLogic: FlowLogic<*>?) {
    val baggageAttributes = mutableMapOf<String, String>()
    Baggage.current().forEach { key, entry -> baggageAttributes[key] = entry.value }
    // Must be captured before the new span is made current.
    val previousSpan = Span.current()

    val attributesBuilder = Attributes.builder()
    (attributes + baggageAttributes).forEach { (key, value) -> attributesBuilder.put(key, value) }
    populateWithFlowAttributes(attributesBuilder, flowLogic)
    val spanAttributes = attributesBuilder.build()

    val newSpan = tracer.spanBuilder(name).setAllAttributes(spanAttributes).startSpan()
    val newScope = newSpan.makeCurrent()
    val (eventContext, parentEventContext) = createStartedEventSpan(name, spanAttributes, previousSpan)
    spans[telemetryId] = SpanInfo(name, newSpan, newScope, eventContext, parentEventContext)
}
private fun populateWithFlowAttributes(attributesBuilder: AttributesBuilder, flowLogic: FlowLogic<*>?) {
flowLogic?.let {
attributesBuilder.put("flow.id", flowLogic.runId.uuid.toString())
attributesBuilder.put("creation.time", flowLogic.stateMachine.creationTime)
attributesBuilder.put("class.name", flowLogic.javaClass.name)
}
}
/**
 * Emits the "<name>-start" event span for a span whose parent is [parentSpan].
 *
 * @return `(null, null)` when start/end events are disabled. Otherwise the context of the new
 * "<name>-start" span first and the start-event context of the parent second. If [parentSpan] is not one
 * of the tracked [spans], no event span is created and both contexts are the all-zero placeholder.
 */
private fun createStartedEventSpan(name: String, attributesMap: Attributes, parentSpan: Span): Pair<SerializableSpanContext?, SerializableSpanContext?> {
    if (!spanStartEndEventsEnabled) {
        return Pair(null, null)
    }
    // Fix up null contexts - make not null
    val parentInfo = spans.values.firstOrNull { it.span == parentSpan }
            ?: return Pair(SerializableSpanContext(), SerializableSpanContext())
    val parentStartedContext = parentInfo.spanEventContext ?: SerializableSpanContext()
    val parentStartedSpan = Span.wrap(parentStartedContext.createSpanContext())
    val startEventSpan = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap)
            .setParent(Context.current().with(parentStartedSpan)).startSpan()
    val startEventContext = SerializableSpanContext(startEventSpan.spanContext)
    startEventSpan.end()
    return Pair(startEventContext, parentStartedContext)
}
/**
 * Ends the span tracked under [telemetryId]: emits its "-end" event span (if it has a parent event
 * context), closes its scope, ends it and removes it from [spans].
 */
private fun endSpan(telemetryId: UUID) {
    val finished = spans[telemetryId]
    createSpanToCaptureEndSpanEvent(finished)
    if (finished != null) {
        finished.spanScope.close()
        finished.span.end()
    }
    spans.remove(telemetryId)
}
/**
 * Describes the current telemetry state as an [OpenTelemetryContext]: the context of the current span,
 * the start-event context recorded for that span (the all-zero placeholder if the span is not tracked or
 * has none) and the current baggage.
 */
override fun getCurrentTelemetryData(): TelemetryDataItem {
    val activeSpan = Span.current()
    val activeContext = SerializableSpanContext(activeSpan.spanContext)
    val startedEventContext = spans.values.firstOrNull { it.span == activeSpan }?.spanEventContext
            ?: SerializableSpanContext()
    val baggage = Baggage.current().asMap().mapValues { it.value.value }
    return OpenTelemetryContext(activeContext, startedEventContext, baggage)
}
/**
 * Returns the telemetry id under which the current span is tracked in [spans], or `UUID(0, 0)` when the
 * current span is not tracked.
 */
override fun getCurrentTelemetryId(): UUID {
    val activeSpan = Span.current()
    val tracked = spans.entries.firstOrNull { it.value.span == activeSpan }
    return tracked?.key ?: UUID(0, 0)
}
/**
 * Makes the span tracked under [id] the current span again: its previous scope is closed, the span is
 * made current, and the stored [SpanInfo] is replaced by a copy holding the new scope.
 * Does nothing when no span is tracked under [id].
 */
override fun setCurrentTelemetryId(id: UUID) {
    val tracked = spans[id] ?: return
    tracked.spanScope.close() // close the old scope
    val reopenedScope = tracked.span.makeCurrent()
    spans[id] = tracked.copy(spanScope = reopenedScope)
}
override fun getCurrentSpanId(): String {
return Span.current().spanContext.spanId
}
override fun getCurrentTraceId(): String {
return Span.current().spanContext.traceId
}
override fun getCurrentBaggage(): Map<String, String> {
return Baggage.current().asMap().mapValues { it.value.value }
}
private fun setStatus(telemetryId: UUID, telemetryStatusCode: TelemetryStatusCode, message: String) {
val spanInfo = spans[telemetryId]
spanInfo?.span?.setStatus(toOpenTelemetryStatus(telemetryStatusCode), message)
}
private fun toOpenTelemetryStatus(telemetryStatusCode: TelemetryStatusCode): StatusCode {
return when(telemetryStatusCode) {
TelemetryStatusCode.ERROR -> StatusCode.ERROR
TelemetryStatusCode.OK -> StatusCode.OK
TelemetryStatusCode.UNSET -> StatusCode.UNSET
}
}
private fun recordException(telemetryId: UUID, throwable: Throwable) {
val spanInfo = spans[telemetryId]
spanInfo?.span?.recordException(throwable)
}
}

View File

@ -0,0 +1,3 @@
package net.corda.core.internal.telemetry
interface OpenTelemetryHandle

View File

@ -0,0 +1,126 @@
package net.corda.core.internal.telemetry
import net.corda.core.flows.FlowLogic
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.debug
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import java.lang.IllegalStateException
import java.util.*
import java.util.concurrent.ConcurrentHashMap
@CordaSerializable
data class SimpleLogContext(val traceId: UUID, val baggage: Map<String, String>): TelemetryDataItem
const val CLIENT_ID = "client.id"
const val TRACE_ID = "trace.id"
// Simple telemetry class that creates a single UUID and uses it as the trace id. When the flow starts we use the trace id that is passed in.
// After that we must use the trace id propagated to us (if remote), or the trace id associated with the thread local.
/**
 * [TelemetryComponent] that reports telemetry events through the logger instead of a tracing backend.
 *
 * A single UUID serves as both trace id and span id. The trace id of the running flow is kept in an
 * [InheritableThreadLocal]; the baggage belonging to each trace id is kept in [logContexts]. The trace id
 * and client id are also published to the logging [MDC] while a flow span is open.
 */
@Suppress("TooManyFunctions")
class SimpleLogTelemetryComponent : TelemetryComponent {
    companion object {
        private val log: Logger = LoggerFactory.getLogger(SimpleLogTelemetryComponent::class.java)
    }

    private val traces: InheritableThreadLocal<UUID> = InheritableThreadLocal()
    private val logContexts = ConcurrentHashMap<UUID, SimpleLogContext>()

    // Telemetry id of every open flow span -> trace id its log context is stored under. The two differ
    // whenever the trace id was propagated to us, so the context cannot be found by telemetry id alone.
    private val flowTraceIds = ConcurrentHashMap<UUID, UUID>()

    // Guards the combined updates of flowTraceIds and logContexts when flow spans start and end.
    private val contextLock = Any()

    override fun isEnabled(): Boolean {
        return true
    }

    override fun name(): String = "SimpleLogTelemetry"

    /** Dispatches [event] to the matching handler; event types not listed here are ignored. */
    override fun onTelemetryEvent(event: TelemetryEvent) {
        when (event) {
            is StartSpanForFlowEvent -> startSpanForFlow(event.name, event.attributes, event.telemetryId, event.flowLogic, event.telemetryDataItem)
            is EndSpanForFlowEvent -> endSpanForFlow(event.telemetryId)
            is StartSpanEvent -> startSpan(event.name, event.attributes, event.telemetryId, event.flowLogic)
            is EndSpanEvent -> endSpan(event.telemetryId)
            is SetStatusEvent -> setStatus(event.telemetryId, event.telemetryStatusCode, event.message)
            is RecordExceptionEvent -> recordException(event.telemetryId, event.throwable)
        }
    }

    /**
     * Opens the logging context of a flow.
     *
     * The trace id is taken from [telemetryDataItem] when one was supplied, otherwise [telemetryId] is used.
     * The trace id becomes the thread's current trace, its baggage is stored in [logContexts], and the trace
     * id and client id (if known) are put into the [MDC].
     *
     * @throws IllegalStateException if [telemetryDataItem] is non-null but not a [SimpleLogContext].
     */
    @Suppress("LongParameterList")
    private fun startSpanForFlow(name: String, attributes: Map<String, String>, telemetryId: UUID, flowLogic: FlowLogic<*>?, telemetryDataItem: TelemetryDataItem?) {
        val simpleLogTelemetryDataItem = telemetryDataItem?.let {
            (it as? SimpleLogContext)
                    ?: throw IllegalStateException("Type of telemetryDataItem not a SimpleLogContext, actual class is ${it::class.java.name}")
        }
        val traceId = simpleLogTelemetryDataItem?.traceId ?: telemetryId
        val flowId = flowLogic?.runId
        val clientId = simpleLogTelemetryDataItem?.baggage?.get(CLIENT_ID) ?: flowLogic?.stateMachine?.clientId
        traces.set(traceId)
        val baggageAttributes = simpleLogTelemetryDataItem?.baggage ?: emptyMap()
        synchronized(contextLock) {
            flowTraceIds[telemetryId] = traceId
            logContexts[traceId] = SimpleLogContext(traceId, baggageAttributes)
        }
        clientId?.let { MDC.put(CLIENT_ID, it) }
        MDC.put(TRACE_ID, traceId.toString())
        log.debug {"startSpanForFlow: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: ${attributes+baggageAttributes}"}
    }

    // When a top level flow is started startSpanForFlow appears just once, and so endSpanForFlow also appears just once,
    // so it is valid to clear the MDC here. This holds for remote nodes as well.
    /**
     * Closes the logging context of the flow span identified by [telemetryId].
     *
     * The stored log context is looked up through the trace id recorded when the span was started (falling
     * back to [telemetryId] itself) and is removed once no other open flow span still uses that trace id.
     */
    private fun endSpanForFlow(telemetryId: UUID) {
        log.debug {"endSpanForFlow: traceId: ${traces.get()}"}
        synchronized(contextLock) {
            val traceId = flowTraceIds.remove(telemetryId) ?: telemetryId
            if (!flowTraceIds.containsValue(traceId)) {
                logContexts.remove(traceId)
            }
        }
        MDC.clear()
    }

    @Suppress("UNUSED_PARAMETER")
    private fun startSpan(name: String, attributes: Map<String, String>, telemetryId: UUID, flowLogic: FlowLogic<*>?) {
        val flowId = flowLogic?.runId
        val clientId = flowLogic?.stateMachine?.clientId
        val traceId = traces.get()
        log.debug {"startSpan: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: $attributes"}
    }

    @Suppress("UNUSED_PARAMETER")
    private fun endSpan(telemetryId: UUID) {
        log.debug {"endSpan: traceId: ${traces.get()}"}
    }

    /** Returns the stored context of the thread's current trace, or an all-zero, empty context if there is none. */
    override fun getCurrentTelemetryData(): SimpleLogContext {
        traces.get()?.let {
            logContexts[it]?.let { simpleLogContext ->
                return simpleLogContext
            }
        }
        return SimpleLogContext(UUID(0, 0), emptyMap())
    }

    override fun getCurrentTelemetryId(): UUID {
        return traces.get() ?: UUID(0,0)
    }

    override fun setCurrentTelemetryId(id: UUID) {
        traces.set(id)
    }

    override fun getCurrentSpanId(): String {
        return traces.get()?.toString() ?: ""
    }

    override fun getCurrentTraceId(): String {
        return traces.get()?.toString() ?: ""
    }

    /** Returns the baggage stored for the thread's current trace, or an empty map when no trace is set or stored. */
    override fun getCurrentBaggage(): Map<String, String> {
        // ConcurrentHashMap rejects null keys, so only look up when this thread has a trace id.
        return traces.get()?.let { logContexts[it]?.baggage } ?: emptyMap()
    }

    @Suppress("UNUSED_PARAMETER")
    private fun setStatus(telemetryId: UUID, telemetryStatusCode: TelemetryStatusCode, message: String) {
        when(telemetryStatusCode) {
            TelemetryStatusCode.ERROR -> log.error("setStatus: traceId: ${traces.get()}, statusCode: ${telemetryStatusCode}, message: $message")
            TelemetryStatusCode.OK, TelemetryStatusCode.UNSET -> log.debug {"setStatus: traceId: ${traces.get()}, statusCode: ${telemetryStatusCode}, message: $message" }
        }
    }

    @Suppress("UNUSED_PARAMETER")
    private fun recordException(telemetryId: UUID, throwable: Throwable) {
        log.error("recordException: traceId: ${traces.get()}, throwable: ${throwable}")
    }
}

View File

@ -0,0 +1,235 @@
package net.corda.core.internal.telemetry
import net.corda.core.CordaInternal
import net.corda.core.flows.FlowLogic
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.TelemetryService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationFactory
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.OpaqueBytes
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.*
@CordaSerializable
interface TelemetryDataItem
@CordaSerializable
data class SerializedTelemetry(val serializedTelemetryData: Map<String, OpaqueBytes>)
enum class TelemetryStatusCode {
/** The default status. */
UNSET,
/**
* The operation has been validated by an Application developers or Operator to have completed
* successfully.
*/
OK,
/** The operation contains an error. */
ERROR
}
/**
 * Handle for a span started through [TelemetryServiceImpl].
 *
 * [id] is a primary-constructor property so that the generated `equals`, `hashCode`, `toString` and `copy`
 * take it into account; declared in the class body it would be ignored and every handle created by the
 * same service would compare equal. It defaults to a fresh random UUID, and `@JvmOverloads` keeps the
 * single-argument constructor available.
 *
 * @property id identifier passed to the telemetry components in every event concerning this span.
 */
@CordaSerializable
data class TelemetryId @JvmOverloads constructor(
        private val telemetryService: TelemetryServiceImpl,
        val id: UUID = UUID.randomUUID()
) {
    /** Sets the status of this span on every telemetry component. */
    fun setStatus(telemetryStatusCode: TelemetryStatusCode, message: String) {
        telemetryService.setStatus(this, telemetryStatusCode, message)
    }

    /** Records [throwable] against this span on every telemetry component. */
    fun recordException(throwable: Throwable) {
        telemetryService.recordException( this, throwable)
    }

    /** Ends this span via [TelemetryServiceImpl.endSpan]. */
    fun close() {
        telemetryService.endSpan(this)
    }
}
@CordaSerializable
data class ComponentTelemetryIds(val componentTelemetryIds: Map<String, UUID>)
interface TelemetryEvent
class StartSpanForFlowEvent(val name: String,
val attributes: Map<String, String>,
val telemetryId: UUID, val flowLogic: FlowLogic<*>?,
val telemetryDataItem: TelemetryDataItem?): TelemetryEvent
class EndSpanForFlowEvent(val telemetryId: UUID): TelemetryEvent
class StartSpanEvent(val name: String, val attributes: Map<String, String>, val telemetryId: UUID, val flowLogic: FlowLogic<*>?): TelemetryEvent
class EndSpanEvent(val telemetryId: UUID): TelemetryEvent
class SetStatusEvent(val telemetryId: UUID, val telemetryStatusCode: TelemetryStatusCode, val message: String): TelemetryEvent
class RecordExceptionEvent(val telemetryId: UUID, val throwable: Throwable): TelemetryEvent
/**
 * A pluggable telemetry backend. [TelemetryServiceImpl] keeps the registered components in a map keyed by
 * [name] and forwards every telemetry event to each of them.
 */
interface TelemetryComponent {
/** Key under which the component is registered and under which its serialized telemetry data and telemetry ids are stored. */
fun name(): String
// NOTE(review): the caller of isEnabled is not visible here - presumably consulted before the component is registered; confirm.
fun isEnabled(): Boolean
/** Receives each [TelemetryEvent] dispatched by the telemetry service (span start/end, status, exception). */
fun onTelemetryEvent(event: TelemetryEvent)
/** The component's current telemetry state; the telemetry service serializes it so it can be passed on. */
fun getCurrentTelemetryData(): TelemetryDataItem
/** The telemetry id the component currently regards as active. */
fun getCurrentTelemetryId(): UUID
/** Makes [id] the component's active telemetry id. */
fun setCurrentTelemetryId(id: UUID)
fun getCurrentSpanId(): String
fun getCurrentTraceId(): String
fun getCurrentBaggage(): Map<String, String>
}
interface TelemetryComponentId {
fun name(): String
}
@Suppress("TooManyFunctions")
class TelemetryServiceImpl : SingletonSerializeAsToken(), TelemetryService {
companion object {
private val log: Logger = LoggerFactory.getLogger(TelemetryServiceImpl::class.java)
}
fun getCurrentSpanId(telemetryComponentName: String): String? {
return telemetryComponents[telemetryComponentName]?.getCurrentSpanId()
}
fun getCurrentTraceId(telemetryComponentName: String): String? {
return telemetryComponents[telemetryComponentName]?.getCurrentTraceId()
}
fun getCurrentBaggage(telemetryComponentName: String): Map<String, String>? {
return telemetryComponents[telemetryComponentName]?.getCurrentBaggage()
}
fun setStatus(telemetryId: TelemetryId, telemetryStatusCode: TelemetryStatusCode, message: String) {
telemetryComponents.values.forEach {
it.onTelemetryEvent(SetStatusEvent(telemetryId.id, telemetryStatusCode, message))
}
}
fun recordException(telemetryId: TelemetryId, throwable: Throwable) {
telemetryComponents.values.forEach {
it.onTelemetryEvent(RecordExceptionEvent(telemetryId.id, throwable))
}
}
@CordaInternal
fun deserialize(data: OpaqueBytes): TelemetryDataItem {
return SerializationFactory.defaultFactory.deserialize(data, TelemetryDataItem::class.java, SerializationFactory.defaultFactory.defaultContext)
}
private val telemetryComponents: MutableMap<String, TelemetryComponent> = mutableMapOf()
@CordaInternal
fun addTelemetryComponent(telemetryComponent: TelemetryComponent) {
telemetryComponents[telemetryComponent.name()] = telemetryComponent
}
/**
 * Starts a flow-level span on every registered telemetry component.
 *
 * For each component, the entry stored under the component's name in [remoteSerializedTelemetry] (if any)
 * is deserialized and handed over with the [StartSpanForFlowEvent].
 *
 * @return the new [TelemetryId] identifying the span on all components.
 */
@CordaInternal
fun startSpanForFlow(name: String, attributes: Map<String, String>, flowLogic: FlowLogic<*>? = null, remoteSerializedTelemetry: SerializedTelemetry? = null): TelemetryId {
    val telemetryId = TelemetryId(this)
    for (component in telemetryComponents.values) {
        val remoteData = remoteSerializedTelemetry?.serializedTelemetryData?.get(component.name())
                ?.let { bytes -> deserialize(bytes) }
        component.onTelemetryEvent(StartSpanForFlowEvent(name, attributes, telemetryId.id, flowLogic, remoteData))
    }
    return telemetryId
}
@CordaInternal
fun endSpanForFlow(telemetryId: TelemetryId) {
telemetryComponents.values.forEach {
it.onTelemetryEvent(EndSpanForFlowEvent(telemetryId.id))
}
}
fun startSpan(name: String, attributes: Map<String, String> = emptyMap(), flowLogic: FlowLogic<*>? = null): TelemetryId {
val telemetryId = TelemetryId(this)
telemetryComponents.values.forEach {
it.onTelemetryEvent(StartSpanEvent(name, attributes, telemetryId.id, flowLogic))
}
return telemetryId
}
fun endSpan(telemetryId: TelemetryId) {
telemetryComponents.values.forEach {
it.onTelemetryEvent(EndSpanEvent(telemetryId.id))
}
}
@Suppress("TooGenericExceptionCaught")
inline fun <R> span(name: String, attributes: Map<String, String> = emptyMap(), flowLogic: FlowLogic<*>? = null, block: () -> R): R {
val telemetryId = startSpan(name, attributes, flowLogic)
try {
return block()
}
catch(ex: Throwable) {
recordException(telemetryId, ex)
setStatus(telemetryId, TelemetryStatusCode.ERROR, "Exception raised: ${ex.message}")
throw ex
}
finally {
endSpan(telemetryId)
}
}
@CordaInternal
@Suppress("LongParameterList", "TooGenericExceptionCaught")
inline fun <R> spanForFlow(name: String, attributes: Map<String, String>, flowLogic: FlowLogic<*>? = null, remoteSerializedTelemetry: SerializedTelemetry? = null, block: () -> R): R {
val telemetryId = startSpanForFlow(name, attributes, flowLogic, remoteSerializedTelemetry)
try {
return block()
}
catch(ex: Throwable) {
recordException(telemetryId, ex)
setStatus(telemetryId, TelemetryStatusCode.ERROR, "Exception raised: ${ex.message}")
throw ex
}
finally {
endSpanForFlow(telemetryId)
}
}
/**
 * Collects the current telemetry data of every registered component in serialized form.
 *
 * @return a [SerializedTelemetry] mapping each component's name to its serialized current telemetry data,
 * or `null` when no telemetry components are registered.
 */
@CordaInternal
fun getCurrentTelemetryData(): SerializedTelemetry? {
    if (telemetryComponents.isEmpty()) return null
    val dataByComponentName: Map<String, OpaqueBytes> = telemetryComponents.values.associate { component ->
        val currentData = component.getCurrentTelemetryData()
        component.name() to currentData.serialize()
    }
    return SerializedTelemetry(dataByComponentName)
}
/**
 * Collects the current telemetry id of every registered component.
 *
 * @return a [ComponentTelemetryIds] mapping each component's name to its current telemetry id,
 * or `null` when no telemetry components are registered.
 */
@CordaInternal
fun getCurrentTelemetryIds(): ComponentTelemetryIds? {
    if (telemetryComponents.isEmpty()) return null
    val idsByComponentName: Map<String, UUID> = telemetryComponents.values.associate { component ->
        component.name() to component.getCurrentTelemetryId()
    }
    return ComponentTelemetryIds(idsByComponentName)
}
/**
 * Pushes previously captured telemetry ids back into the registered components.
 *
 * Each component receives the id stored under its own name in [telemetryIds]. A component with no entry
 * in [telemetryIds] is left untouched rather than failing: the set of registered components is not
 * guaranteed to match the set that was registered when the ids were captured.
 *
 * @param telemetryIds ids keyed by component name, as produced by [getCurrentTelemetryIds].
 */
@CordaInternal
fun setCurrentTelemetryId(telemetryIds: ComponentTelemetryIds) {
    for (component in telemetryComponents.values) {
        // A missing entry previously triggered a NullPointerException via `!!`; skip such components instead.
        telemetryIds.componentTelemetryIds[component.name()]?.let { savedId ->
            component.setCurrentTelemetryId(savedId)
        }
    }
}
/**
 * Looks up the component registered under [OpenTelemetryComponent.OPENTELEMETRY_COMPONENT_NAME].
 *
 * NOTE(review): as written this always returns `null`. When no such component is registered the safe call
 * short-circuits to `null`, and when one is registered the lambda itself evaluates to `null` because the
 * expression that would expose the underlying OpenTelemetry instance is commented out.
 * TODO confirm whether this stub is intentional for the first cut.
 */
override fun getOpenTelemetry(): OpenTelemetryHandle? {
return telemetryComponents[OpenTelemetryComponent.OPENTELEMETRY_COMPONENT_NAME]?.let {
null // (it as? OpenTelemetryComponent)?.tracerSetup?.openTelemetry
}
}
}
/**
 * Gives internal callers access to the concrete [TelemetryServiceImpl] behind [ServiceHub.telemetryService].
 *
 * The cast is unchecked: a [ClassCastException] is thrown if the hub's telemetry service is any other type.
 */
val ServiceHub.telemetryServiceInternal: TelemetryServiceImpl
    get() = telemetryService as TelemetryServiceImpl

View File

@ -12,6 +12,7 @@ import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.ContractUpgradeFlow
import net.corda.core.node.services.*
import net.corda.core.node.services.diagnostics.DiagnosticsService
import net.corda.core.internal.telemetry.TelemetryComponent
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.transactions.LedgerTransaction
@ -165,6 +166,11 @@ interface ServiceHub : ServicesForResolution {
*/
val diagnosticsService: DiagnosticsService
/**
* Provides operations to support telemetry and the propagation of telemetry data between nodes.
*/
val telemetryService: TelemetryService
/**
* INTERNAL. DO NOT USE.
* @suppress
@ -187,6 +193,13 @@ interface ServiceHub : ServicesForResolution {
*/
fun <T : SerializeAsToken> cordaService(type: Class<T>): T
/**
* Return the singleton instance of the given Corda telemetry component type. This is a class that implements TelemetryComponent
* and will have automatically been registered by the node.
* @throws IllegalArgumentException If the instance is not found.
*/
fun <T : TelemetryComponent> cordaTelemetryComponent(type: Class<T>): T
/**
* Stores the given [SignedTransaction]s in the local transaction storage and then sends them to the vault for
* further processing if [notifyVault] is true. This is expected to be run within a database transaction.

View File

@ -0,0 +1,9 @@
package net.corda.core.node.services
import net.corda.core.DoNotImplement
import net.corda.core.internal.telemetry.OpenTelemetryHandle
/**
 * Telemetry operations exposed through the service hub's `telemetryService` property.
 *
 * Marked [DoNotImplement]: this interface is not meant to be implemented outside the platform.
 */
@DoNotImplement
interface TelemetryService {
/**
 * Returns a handle to the node's OpenTelemetry instance.
 *
 * @return the [OpenTelemetryHandle], or `null` when no handle is available.
 */
fun getOpenTelemetry(): OpenTelemetryHandle?
}

View File

@ -6,7 +6,9 @@ import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId
import net.corda.core.context.Trace.SessionId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.telemetry.SerializedTelemetry
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.Id
@ -116,7 +118,8 @@ object RPCApi {
val replyId: InvocationId,
val sessionId: SessionId,
val externalTrace: Trace? = null,
val impersonatedActor: Actor? = null
val impersonatedActor: Actor? = null,
val serializedTelemetry: SerializedTelemetry? = null
) : ClientToServer() {
override fun writeToClientMessage(message: ClientMessage) {
MessageUtil.setJMSReplyTo(message, clientAddress)
@ -130,6 +133,8 @@ object RPCApi {
message.putStringProperty(METHOD_NAME_FIELD_NAME, methodName)
message.bodyBuffer.writeBytes(serialisedArguments.bytes)
val telemetryBytes: SerializedBytes<SerializedTelemetry>? = serializedTelemetry?.serialize()
telemetryBytes?.let { message.putBytesProperty(TELEMETRY_PROPERTY, it.bytes) }
}
}
@ -148,15 +153,20 @@ object RPCApi {
fun fromClientMessage(message: ClientMessage): ClientToServer {
val tag = Tag.values()[message.getIntProperty(TAG_FIELD_NAME)]
return when (tag) {
RPCApi.ClientToServer.Tag.RPC_REQUEST -> RpcRequest(
clientAddress = MessageUtil.getJMSReplyTo(message),
methodName = message.getStringProperty(METHOD_NAME_FIELD_NAME),
serialisedArguments = OpaqueBytes(message.getBodyAsByteArray()),
replyId = message.replyId(),
sessionId = message.sessionId(),
externalTrace = message.externalTrace(),
impersonatedActor = message.impersonatedActor()
)
RPCApi.ClientToServer.Tag.RPC_REQUEST -> {
val telemetryBytes = message.getBytesProperty(TELEMETRY_PROPERTY)
val serializedTelemetry: SerializedTelemetry? = telemetryBytes?.let {OpaqueBytes(it).deserialize()}
RpcRequest(
clientAddress = MessageUtil.getJMSReplyTo(message),
methodName = message.getStringProperty(METHOD_NAME_FIELD_NAME),
serialisedArguments = OpaqueBytes(message.getBodyAsByteArray()),
replyId = message.replyId(),
sessionId = message.sessionId(),
externalTrace = message.externalTrace(),
impersonatedActor = message.impersonatedActor(),
serializedTelemetry = serializedTelemetry
)
}
RPCApi.ClientToServer.Tag.OBSERVABLES_CLOSED -> {
val ids = ArrayList<InvocationId>()
val buffer = message.bodyBuffer
@ -279,6 +289,7 @@ private const val DEDUPLICATION_IDENTITY_FIELD_NAME = "deduplication-identity"
private const val OBSERVABLE_ID_FIELD_NAME = "observable-id"
private const val OBSERVABLE_ID_TIMESTAMP_FIELD_NAME = "observable-id-timestamp"
private const val METHOD_NAME_FIELD_NAME = "method-name"
private const val TELEMETRY_PROPERTY = "telemetry-data"
fun ClientMessage.replyId(): InvocationId {
@ -301,6 +312,11 @@ fun ClientMessage.externalTrace(): Trace? {
}
}
/**
 * Reads the telemetry payload carried in this message's [TELEMETRY_PROPERTY] bytes property.
 *
 * @return the deserialized [SerializedTelemetry], or `null` when the property is absent.
 */
fun ClientMessage.serializedTelemetry(): SerializedTelemetry? =
    getBytesProperty(TELEMETRY_PROPERTY)?.let { rawBytes -> OpaqueBytes(rawBytes).deserialize() }
fun ClientMessage.impersonatedActor(): Actor? {
return getStringProperty(RPC_IMPERSONATED_ACTOR_ID)?.let {

View File

@ -100,6 +100,7 @@ dependencies {
compile project(':common-configuration-parsing')
compile project(':common-logging')
implementation "io.opentelemetry:opentelemetry-api:${open_telemetry_version}"
// Backwards compatibility goo: Apps expect confidential-identities to be loaded by default.
// We could eventually gate this on a target-version check.
compile project(':confidential-identities')
@ -309,7 +310,8 @@ quasar {
"org.w3c.**",
"org.xml**",
"org.yaml**",
"rx**")
"rx**",
"io.opentelemetry.**")
}
jar {

View File

@ -96,7 +96,7 @@ task buildCordaJAR(type: FatCapsule, dependsOn: [
applicationVersion = corda_release_version
applicationId = "net.corda.node.Corda"
// See experimental/quasar-hook/README.md for how to generate.
def quasarExcludeExpression = "x(antlr**;bftsmart**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;io.github**;io.netty**;jdk**;kotlin**;net.corda.djvm**;djvm**;net.bytebuddy**;net.i2p**;org.apache**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.objectweb**;org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**;org.jolokia**;com.lmax**;picocli**;liquibase**;com.github.benmanes**;org.json**;org.postgresql**;nonapi.io.github.classgraph**)"
def quasarExcludeExpression = "x(antlr**;bftsmart**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;io.github**;io.netty**;jdk**;kotlin**;net.corda.djvm**;djvm**;net.bytebuddy**;net.i2p**;org.apache**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.objectweb**;org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**;org.jolokia**;com.lmax**;picocli**;liquibase**;com.github.benmanes**;org.json**;org.postgresql**;nonapi.io.github.classgraph**;io.opentelemetry**)"
def quasarClassLoaderExclusion = "l(net.corda.djvm.**;net.corda.core.serialization.internal.**)"
javaAgents = quasar_classifier ? ["quasar-core-${quasar_version}-${quasar_classifier}.jar=${quasarExcludeExpression}${quasarClassLoaderExclusion}"] : ["quasar-core-${quasar_version}.jar=${quasarExcludeExpression}${quasarClassLoaderExclusion}"]
systemProperties['visualvm.display.name'] = 'Corda'

View File

@ -56,6 +56,11 @@ import net.corda.core.node.services.ContractUpgradeService
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.internal.telemetry.SimpleLogTelemetryComponent
import net.corda.core.internal.telemetry.TelemetryComponent
import net.corda.core.internal.telemetry.OpenTelemetryComponent
import net.corda.core.internal.telemetry.TelemetryServiceImpl
import net.corda.core.node.services.TelemetryService
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.node.services.diagnostics.DiagnosticsService
import net.corda.core.schemas.MappedSchema
@ -250,6 +255,15 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
NotaryLoader(it, versionInfo)
}
val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo).closeOnStop(false)
// Telemetry service for this node. The built-in components are registered here according to the
// `telemetry` section of the node configuration:
//  - the OpenTelemetry component is added only when enabled in configuration AND the component itself
//    reports that it is enabled;
//  - the simple log component is added whenever it is enabled in configuration.
val telemetryService: TelemetryServiceImpl = TelemetryServiceImpl().also { service ->
    val legalName = configuration.myLegalName.toString()
    val telemetryConfig = configuration.telemetry
    val openTelemetryComponent = OpenTelemetryComponent(legalName, telemetryConfig.spanStartEndEventsEnabled)
    if (telemetryConfig.openTelemetryEnabled && openTelemetryComponent.isEnabled()) {
        service.addTelemetryComponent(openTelemetryComponent)
    }
    if (telemetryConfig.simpleLogTelemetryEnabled) {
        service.addTelemetryComponent(SimpleLogTelemetryComponent())
    }
}.tokenize()
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize()
val identityService = PersistentIdentityService(cacheFactory).tokenize()
val database: CordaPersistence = createCordaPersistence(
@ -337,6 +351,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
private val schedulerService = makeNodeSchedulerService()
private val cordappServices = MutableClassToInstanceMap.create<SerializeAsToken>()
private val cordappTelemetryComponents = MutableClassToInstanceMap.create<TelemetryComponent>()
private val shutdownExecutor = Executors.newSingleThreadExecutor()
protected abstract val transactionVerifierWorkerCount: Int
@ -629,6 +644,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
// the identity key. But the infrastructure to make that easy isn't here yet.
keyManagementService.start(keyStoreHandler.signingKeys.map { it.key to it.alias })
installTelemetryComponents()
installCordaServices()
notaryService = maybeStartNotaryService(keyStoreHandler.notaryIdentity)
contractUpgradeService.start()
@ -972,6 +988,62 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
return service
}
// Raised when a telemetry component's no-arg constructor throws; `cause` is the exception thrown by that
// constructor. The message names telemetry components explicitly so that a failure here is not mistaken
// for a Corda service instantiation failure.
private class TelemetryComponentInstantiationException(cause: Throwable?) : CordaException("Telemetry Component Instantiation Error", cause)
/**
 * Instantiates and registers every telemetry component class discovered in the loaded CorDapps.
 *
 * [OpenTelemetryComponent] and [SimpleLogTelemetryComponent] are skipped here by class name. Any failure
 * to install a component is logged and then rethrown.
 */
@Suppress("ThrowsCount", "ComplexMethod", "NestedBlockDepth")
private fun installTelemetryComponents() {
    val builtInComponentNames = setOf(
            OpenTelemetryComponent::class.java.name,
            SimpleLogTelemetryComponent::class.java.name
    )
    val loadedTelemetryComponents: List<Class<out TelemetryComponent>> = cordappLoader.cordapps
            .flatMap { cordapp -> cordapp.telemetryComponents }
            .filterNot { componentClass -> componentClass.name in builtInComponentNames }

    // This sets the Cordapp classloader on the contextClassLoader of the current thread, prior to initializing telemetry components
    // Needed because of bug CORDA-2653 - some telemetry components can utilise third-party libraries that require access to
    // the Thread context class loader. (Same as installCordaServices).
    val currentThread = Thread.currentThread()
    val oldContextClassLoader: ClassLoader? = currentThread.contextClassLoader
    try {
        currentThread.contextClassLoader = cordappLoader.appClassLoader
        for (componentClass in loadedTelemetryComponents) {
            try {
                installTelemetryComponent(componentClass)
            } catch (e: NoSuchMethodException) {
                log.error("Missing no arg ctor for ${componentClass.name}")
                throw e
            } catch (e: TelemetryComponentInstantiationException) {
                val cause = e.cause
                if (cause == null) {
                    log.error("Corda telemetry component ${componentClass.name} failed to instantiate", e)
                } else {
                    log.error("Corda telemetry component ${componentClass.name} failed to instantiate. Reason was: ${cause.rootMessage}", cause)
                }
                throw e
            } catch (e: Exception) {
                log.error("Unable to install Corda telemetry component ${componentClass.name}", e)
                throw e
            }
        }
    } finally {
        // Always restore the caller's context class loader, whether or not installation succeeded.
        currentThread.contextClassLoader = oldContextClassLoader
    }
}
/**
 * Creates an instance of [telemetryComponentClass] through its no-arg constructor and records it.
 *
 * The instance is always stored in [cordappTelemetryComponents]; it is added to [telemetryService] only
 * when the component reports itself as enabled.
 *
 * @throws TelemetryComponentInstantiationException if the constructor itself throws; the thrown exception
 * is carried as the cause.
 */
private fun <T : TelemetryComponent> installTelemetryComponent(telemetryComponentClass: Class<T>) {
    val instance: T = try {
        // The constructor may be non-public, so it is made accessible before being invoked.
        telemetryComponentClass.getDeclaredConstructor().also { it.isAccessible = true }.newInstance()
    } catch (e: InvocationTargetException) {
        throw TelemetryComponentInstantiationException(e.cause)
    }
    cordappTelemetryComponents.putInstance(telemetryComponentClass, instance)
    if (!instance.isEnabled()) {
        log.info("${telemetryComponentClass.name} not enabled so not installing")
        return
    }
    telemetryService.addTelemetryComponent(instance)
    log.info("Installed ${telemetryComponentClass.name} Telemetry component")
}
private fun registerCordappFlows() {
cordappLoader.cordapps.forEach { cordapp ->
cordapp.initiatedFlows.groupBy { it.requireAnnotation<InitiatedBy>().value.java }.forEach { initiator, responders ->
@ -1065,7 +1137,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
// the identity key. But the infrastructure to make that easy isn't here yet.
return BasicHSMKeyManagementService(cacheFactory, identityService, database, cryptoService)
return BasicHSMKeyManagementService(cacheFactory, identityService, database, cryptoService, telemetryService)
}
open fun stop() {
@ -1146,6 +1218,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
override val diagnosticsService: DiagnosticsService get() = this@AbstractNode.diagnosticsService
override val externalOperationExecutor: ExecutorService get() = this@AbstractNode.externalOperationExecutor
override val notaryService: NotaryService? get() = this@AbstractNode.notaryService
override val telemetryService: TelemetryService get() = this@AbstractNode.telemetryService
private lateinit var _myInfo: NodeInfo
override val myInfo: NodeInfo get() = _myInfo
@ -1167,6 +1240,11 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
?: throw IllegalArgumentException("Corda service ${type.name} does not exist")
}
/**
 * Returns the telemetry component instance recorded for [type].
 *
 * @throws IllegalArgumentException if no instance is recorded for [type].
 */
override fun <T : TelemetryComponent> cordaTelemetryComponent(type: Class<T>): T {
    return requireNotNull(cordappTelemetryComponents.getInstance(type)) {
        "Corda telemetry component ${type.name} does not exist"
    }
}
override fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>? {
return flowManager.getFlowFactoryForInitiatingFlow(initiatingFlowClass)
}

View File

@ -17,6 +17,7 @@ import net.corda.core.internal.cordapp.get
import net.corda.core.internal.notary.NotaryService
import net.corda.core.internal.notary.SinglePartyNotaryService
import net.corda.core.node.services.CordaService
import net.corda.core.internal.telemetry.TelemetryComponent
import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.CheckpointCustomSerializer
import net.corda.core.serialization.SerializationCustomSerializer
@ -184,6 +185,7 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths:
findServiceFlows(this),
findSchedulableFlows(this),
findServices(this),
findTelemetryComponents(this),
findWhitelists(url),
findSerializers(this),
findCheckpointSerializers(this),
@ -281,6 +283,10 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths:
return scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class)
}
/** Finds the classes in [scanResult] that implement [TelemetryComponent]. */
private fun findTelemetryComponents(scanResult: RestrictedScanResult): List<Class<out TelemetryComponent>> =
    scanResult.getClassesImplementing(TelemetryComponent::class)
private fun findInitiatedFlows(scanResult: RestrictedScanResult): List<Class<out FlowLogic<*>>> {
return scanResult.getClassesWithAnnotation(FlowLogic::class, InitiatedBy::class)
}
@ -414,6 +420,15 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths:
.map { it.kotlin.objectOrNewInstance() }
}
/**
 * Returns the loadable, non-abstract classes that implement [type].
 *
 * Candidates are restricted to names starting with [qualifiedNamePrefix]; any candidate for which
 * `loadClass` yields `null` is dropped.
 */
fun <T : Any> getClassesImplementing(type: KClass<T>): List<Class<out T>> {
    val implementors = scanResult.getClassesImplementing(type.java.name)
    return implementors
            .filter { classInfo -> classInfo.name.startsWith(qualifiedNamePrefix) }
            .mapNotNull { classInfo -> loadClass(classInfo.name, type) }
            .filterNot { loadedClass -> loadedClass.isAbstractClass }
}
fun <T : Any> getClassesWithAnnotation(type: KClass<T>, annotation: KClass<out Annotation>): List<Class<out T>> {
return scanResult
.getClassesWithAnnotation(annotation.java.name)

View File

@ -30,6 +30,7 @@ internal object VirtualCordapp {
serviceFlows = listOf(),
schedulableFlows = listOf(),
services = listOf(),
telemetryComponents = listOf(),
serializationWhitelists = listOf(),
serializationCustomSerializers = listOf(),
checkpointCustomSerializers = listOf(),
@ -54,6 +55,7 @@ internal object VirtualCordapp {
serviceFlows = listOf(),
schedulableFlows = listOf(),
services = listOf(),
telemetryComponents = listOf(),
serializationWhitelists = listOf(),
serializationCustomSerializers = listOf(),
checkpointCustomSerializers = listOf(),
@ -79,6 +81,7 @@ internal object VirtualCordapp {
serviceFlows = listOf(),
schedulableFlows = listOf(),
services = listOf(),
telemetryComponents = listOf(),
serializationWhitelists = listOf(),
serializationCustomSerializers = listOf(),
checkpointCustomSerializers = listOf(),
@ -103,6 +106,7 @@ internal object VirtualCordapp {
serviceFlows = listOf(),
schedulableFlows = listOf(),
services = listOf(),
telemetryComponents = listOf(),
serializationWhitelists = listOf(),
serializationCustomSerializers = listOf(),
checkpointCustomSerializers = listOf(),

View File

@ -43,6 +43,7 @@ interface NodeConfiguration : ConfigurationWithOptionsContainer {
val certificateChainCheckPolicies: List<CertChainPolicyConfig>
val verifierType: VerifierType
val flowTimeout: FlowTimeoutConfiguration
val telemetry: TelemetryConfiguration
val notary: NotaryConfig?
val additionalNodeInfoPollingFrequencyMsec: Long
val p2pAddress: NetworkHostAndPort
@ -220,6 +221,12 @@ data class FlowTimeoutConfiguration(
val backoffBase: Double
)
/**
 * Telemetry settings, exposed on the node configuration as `telemetry`.
 *
 * @property openTelemetryEnabled whether the OpenTelemetry component may be registered with the telemetry service.
 * @property simpleLogTelemetryEnabled whether the simple log telemetry component is registered with the telemetry service.
 * @property spanStartEndEventsEnabled passed to the OpenTelemetry component on construction; presumably controls
 * whether span start/end events are emitted — TODO confirm against the component.
 */
data class TelemetryConfiguration(
val openTelemetryEnabled: Boolean,
val simpleLogTelemetryEnabled: Boolean,
val spanStartEndEventsEnabled: Boolean
)
internal typealias Valid<TARGET> = Validated<TARGET, Configuration.Validation.Error>
fun Config.parseAsNodeConfiguration(options: Configuration.Options = Configuration.Options(strict = true)): Valid<NodeConfiguration> = V1NodeConfigurationSpec.parse(this, options)

View File

@ -42,6 +42,7 @@ data class NodeConfigurationImpl(
override val security: SecurityConfiguration? = Defaults.security,
override val verifierType: VerifierType,
override val flowTimeout: FlowTimeoutConfiguration,
override val telemetry: TelemetryConfiguration = Defaults.telemetry,
override val p2pAddress: NetworkHostAndPort,
override val additionalP2PAddresses: List<NetworkHostAndPort> = Defaults.additionalP2PAddresses,
private val rpcAddress: NetworkHostAndPort? = Defaults.rpcAddress,
@ -133,6 +134,7 @@ data class NodeConfigurationImpl(
fun database(devMode: Boolean) = DatabaseConfig(
exportHibernateJMXStatistics = devMode
)
val telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true)
}
companion object {

View File

@ -30,6 +30,7 @@ import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.config.PasswordEncryption
import net.corda.node.services.config.SecurityConfiguration
import net.corda.node.services.config.SecurityConfiguration.AuthService.Companion.defaultAuthServiceId
import net.corda.node.services.config.TelemetryConfiguration
import net.corda.node.services.config.Valid
import net.corda.node.services.config.schema.parsers.attempt
import net.corda.node.services.config.schema.parsers.badValue
@ -224,6 +225,17 @@ internal object FlowTimeoutConfigurationSpec : Configuration.Specification<FlowT
}
}
/**
 * Configuration specification for [TelemetryConfiguration]: three boolean properties, each declared
 * without a default value.
 */
internal object TelemetryConfigurationSpec : Configuration.Specification<TelemetryConfiguration>("TelemetryConfiguration") {
    private val openTelemetryEnabled by boolean()
    private val simpleLogTelemetryEnabled by boolean()
    private val spanStartEndEventsEnabled by boolean()

    /** Builds a [TelemetryConfiguration] from the three boolean properties read out of [configuration]. */
    override fun parseValid(configuration: Config, options: Configuration.Options): Valid<TelemetryConfiguration> {
        val config = configuration.withOptions(options)
        val telemetryConfiguration = TelemetryConfiguration(
                openTelemetryEnabled = config[openTelemetryEnabled],
                simpleLogTelemetryEnabled = config[simpleLogTelemetryEnabled],
                spanStartEndEventsEnabled = config[spanStartEndEventsEnabled]
        )
        return valid(telemetryConfiguration)
    }
}
internal object NotaryConfigSpec : Configuration.Specification<NotaryConfig>("NotaryConfig") {
private val validating by boolean()
private val serviceLegalName by string().mapValid(::toCordaX500Name).optional()

View File

@ -25,6 +25,7 @@ internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfig
private val certificateChainCheckPolicies by nested(CertChainPolicyConfigSpec).list().optional().withDefaultValue(Defaults.certificateChainCheckPolicies)
private val verifierType by enum(VerifierType::class)
private val flowTimeout by nested(FlowTimeoutConfigurationSpec)
private val telemetry by nested(TelemetryConfigurationSpec).optional().withDefaultValue(Defaults.telemetry)
private val notary by nested(NotaryConfigSpec).optional()
private val additionalNodeInfoPollingFrequencyMsec by long().optional().withDefaultValue(Defaults.additionalNodeInfoPollingFrequencyMsec)
private val p2pAddress by string().mapValid(::toNetworkHostAndPort)
@ -95,6 +96,7 @@ internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfig
rpcUsers = config[rpcUsers],
verifierType = config[verifierType],
flowTimeout = config[flowTimeout],
telemetry = config[telemetry],
rpcSettings = config[rpcSettings],
messagingServerAddress = config[messagingServerAddress],
notary = config[notary],

View File

@ -2,6 +2,7 @@ package net.corda.node.services.keys
import net.corda.core.crypto.*
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.telemetry.TelemetryServiceImpl
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
@ -31,7 +32,8 @@ class BasicHSMKeyManagementService(
cacheFactory: NamedCacheFactory,
override val identityService: PersistentIdentityService,
private val database: CordaPersistence,
private val cryptoService: SignOnlyCryptoService
private val cryptoService: SignOnlyCryptoService,
val telemetryService: TelemetryServiceImpl
) : SingletonSerializeAsToken(), KeyManagementServiceInternal {
@Entity
@ -134,12 +136,14 @@ class BasicHSMKeyManagementService(
}
override fun sign(bytes: ByteArray, publicKey: PublicKey): DigitalSignature.WithKey {
val signingPublicKey = getSigningPublicKey(publicKey)
return if (signingPublicKey in originalKeysMap) {
DigitalSignature.WithKey(signingPublicKey, cryptoService.sign(originalKeysMap[signingPublicKey]!!, bytes))
} else {
val keyPair = getSigningKeyPair(signingPublicKey)
keyPair.sign(bytes)
telemetryService.span("${this::class.java.name}#sign") {
val signingPublicKey = getSigningPublicKey(publicKey)
return if (signingPublicKey in originalKeysMap) {
DigitalSignature.WithKey(signingPublicKey, cryptoService.sign(originalKeysMap[signingPublicKey]!!, bytes))
} else {
val keyPair = getSigningKeyPair(signingPublicKey)
keyPair.sign(bytes)
}
}
}

View File

@ -40,6 +40,7 @@ import net.corda.nodeapi.internal.persistence.contextDatabaseOrNull
import net.corda.nodeapi.internal.rpc.ObservableContextInterface
import net.corda.nodeapi.internal.rpc.ObservableSubscription
import net.corda.nodeapi.internal.serialization.amqp.RpcServerObservableSerializer
import net.corda.nodeapi.serializedTelemetry
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
@ -529,7 +530,8 @@ class RPCServer(
val externalTrace = externalTrace()
val rpcActor = actorFrom(this)
val impersonatedActor = impersonatedActor()
return RpcAuthContext(InvocationContext.rpc(rpcActor.first, trace, externalTrace, impersonatedActor, arguments), rpcActor.second)
val serializedTelemetry = serializedTelemetry()
return RpcAuthContext(InvocationContext.rpc(rpcActor.first, trace, externalTrace, impersonatedActor, arguments, serializedTelemetry), rpcActor.second)
}
private fun actorFrom(message: ClientMessage): Pair<Actor, AuthorizingSubject> {

View File

@ -4,6 +4,7 @@ import net.corda.core.flows.Destination
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.telemetry.SerializedTelemetry
import net.corda.core.serialization.SerializedBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
@ -72,7 +73,7 @@ sealed class Event {
* Initiate a flow. This causes a new session object to be created and returned to the flow. Note that no actual
* communication takes place at this time, only on the first send/receive operation on the session.
*/
data class InitiateFlow(val destination: Destination, val wellKnownParty: Party) : Event()
data class InitiateFlow(val destination: Destination, val wellKnownParty: Party, val serializedTelemetry: SerializedTelemetry?) : Event()
/**
* Signal the entering into a subflow.

View File

@ -142,7 +142,7 @@ class FlowCreator(
senderUUID: String?): Flow<A> {
// Before we construct the state machine state by freezing the FlowLogic we need to make sure that lazy properties
// have access to the fiber (and thereby the service hub)
val flowStateMachineImpl = FlowStateMachineImpl(flowId, flowLogic, scheduler)
val flowStateMachineImpl = FlowStateMachineImpl(flowId, flowLogic, scheduler, serializedTelemetry = (flowStart as? FlowStart.Initiated)?.initiatingMessage?.serializedTelemetry)
val resultFuture = openFuture<Any?>()
flowStateMachineImpl.transientValues = createTransientValues(flowId, resultFuture)
flowLogic.stateMachine = flowStateMachineImpl
@ -180,7 +180,7 @@ class FlowCreator(
return when(flowState) {
is FlowState.Unstarted -> {
val logic = deserializeFlowState(flowState.frozenFlowLogic)
FlowStateMachineImpl(runId, logic, scheduler)
FlowStateMachineImpl(runId, logic, scheduler, serializedTelemetry = null)
}
is FlowState.Started -> deserializeFlowState(flowState.frozenFiber)
// Places calling this function is rely on it to return null if the flow cannot be created from the checkpoint.

View File

@ -9,6 +9,8 @@ import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.checkPayloadIs
import net.corda.core.internal.telemetry.SerializedTelemetry
import net.corda.core.internal.telemetry.telemetryServiceInternal
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize
@ -18,7 +20,8 @@ import net.corda.core.utilities.UntrustworthyData
class FlowSessionImpl(
override val destination: Destination,
private val wellKnownParty: Party,
val sourceSessionId: SessionId
val sourceSessionId: SessionId,
val serializedTelemetry: SerializedTelemetry?
) : FlowSession() {
override val counterparty: Party get() = wellKnownParty
@ -30,6 +33,7 @@ class FlowSessionImpl(
override fun hashCode(): Int = sourceSessionId.hashCode()
private val flowStateMachine: FlowStateMachine<*> get() = Fiber.currentFiber() as FlowStateMachine<*>
private val telemetryMap = mapOf("destination" to destination.toString())
@Suspendable
override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {
@ -46,15 +50,16 @@ class FlowSessionImpl(
payload: Any,
maySkipCheckpoint: Boolean
): UntrustworthyData<R> {
enforceNotPrimitive(receiveType)
val request = FlowIORequest.SendAndReceive(
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)),
shouldRetrySend = false
)
val responseValues: Map<FlowSession, SerializedBytes<Any>> = flowStateMachine.suspend(request, maySkipCheckpoint)
val responseForCurrentSession = responseValues.getValue(this)
return responseForCurrentSession.checkPayloadIs(receiveType)
flowStateMachine.serviceHub.telemetryServiceInternal.span("${this::class.java.name}#sendAndReceive", telemetryMap, flowLogic = flowStateMachine.logic) {
enforceNotPrimitive(receiveType)
val request = FlowIORequest.SendAndReceive(
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)),
shouldRetrySend = false
)
val responseValues: Map<FlowSession, SerializedBytes<Any>> = flowStateMachine.suspend(request, maySkipCheckpoint)
val responseForCurrentSession = responseValues.getValue(this)
return responseForCurrentSession.checkPayloadIs(receiveType)
}
}
@Suspendable
@ -62,9 +67,11 @@ class FlowSessionImpl(
@Suspendable
override fun <R : Any> receive(receiveType: Class<R>, maySkipCheckpoint: Boolean): UntrustworthyData<R> {
enforceNotPrimitive(receiveType)
val request = FlowIORequest.Receive(NonEmptySet.of(this))
return flowStateMachine.suspend(request, maySkipCheckpoint).getValue(this).checkPayloadIs(receiveType)
flowStateMachine.serviceHub.telemetryServiceInternal.span("${this::class.java.name}#receive", telemetryMap, flowStateMachine.logic ) {
enforceNotPrimitive(receiveType)
val request = FlowIORequest.Receive(NonEmptySet.of(this))
return flowStateMachine.suspend(request, maySkipCheckpoint).getValue(this).checkPayloadIs(receiveType)
}
}
@Suspendable
@ -72,10 +79,12 @@ class FlowSessionImpl(
@Suspendable
override fun send(payload: Any, maySkipCheckpoint: Boolean) {
val request = FlowIORequest.Send(
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT))
)
return flowStateMachine.suspend(request, maySkipCheckpoint)
flowStateMachine.serviceHub.telemetryServiceInternal.span("${this::class.java.name}#send", telemetryMap, flowStateMachine.logic) {
val request = FlowIORequest.Send(
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT))
)
return flowStateMachine.suspend(request, maySkipCheckpoint)
}
}
@Suspendable

View File

@ -36,6 +36,9 @@ import net.corda.core.internal.isRegularFile
import net.corda.core.internal.location
import net.corda.core.internal.toPath
import net.corda.core.internal.uncheckedCast
import net.corda.core.internal.telemetry.ComponentTelemetryIds
import net.corda.core.internal.telemetry.SerializedTelemetry
import net.corda.core.internal.telemetry.telemetryServiceInternal
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext
@ -71,7 +74,8 @@ class TransientReference<out A>(@Transient val value: A)
class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override val logic: FlowLogic<R>,
scheduler: FiberScheduler,
override val creationTime: Long = System.currentTimeMillis()
override val creationTime: Long = System.currentTimeMillis(),
val serializedTelemetry: SerializedTelemetry? = null
) : Fiber<Unit>(id.toString(), scheduler), FlowStateMachine<R>, FlowFiber {
companion object {
/**
@ -346,8 +350,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
// Needed because in previous versions of the finance app we used Thread.contextClassLoader to resolve services defined in cordapps.
Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader
val result = logic.call()
suspend(FlowIORequest.WaitForSessionConfirmations(), maySkipCheckpoint = true)
// context.serializedTelemetry is from an rpc client, serializedTelemetry is from a peer, otherwise nothing
val serializedTelemetrySrc = context.serializedTelemetry ?: serializedTelemetry
val result = serviceHub.telemetryServiceInternal.spanForFlow(logic.javaClass.name, emptyMap(), logic, serializedTelemetrySrc) {
val ret = logic.call()
// Note: suspend stores the telemetry ids back in the components after the checkpoint, so it must be called before we end the span.
suspend(FlowIORequest.WaitForSessionConfirmations(), maySkipCheckpoint = true)
ret
}
Try.Success(result)
} catch (t: Throwable) {
if(t.isUnrecoverable()) {
@ -417,8 +427,11 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
isDbTransactionOpenOnExit = true
)
return try {
subFlow.call()
} finally {
serviceHub.telemetryServiceInternal.span(subFlow.javaClass.name, emptyMap(), subFlow) {
subFlow.call()
}
}
finally {
processEventImmediately(
Event.LeaveSubFlow,
isDbTransactionOpenOnEntry = true,
@ -457,10 +470,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
override fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession {
override fun initiateFlow(destination: Destination, wellKnownParty: Party, serializedTelemetry: SerializedTelemetry?): FlowSession {
require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" }
val resume = processEventImmediately(
Event.InitiateFlow(destination, wellKnownParty),
Event.InitiateFlow(destination, wellKnownParty, serializedTelemetry),
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = true
) as FlowContinuation.Resume
@ -527,6 +540,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override fun <R : Any> suspend(ioRequest: FlowIORequest<R>, maySkipCheckpoint: Boolean): R {
val serializationContext = TransientReference(transientValues.checkpointSerializationContext)
val transaction = extractThreadLocalTransaction()
val telemetryIds = retrieveTelemetryIds()
parkAndSerialize { _, _ ->
setLoggingContext()
logger.trace { "Suspended on $ioRequest" }
@ -563,6 +577,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
}
storeTelemetryIds(telemetryIds)
transientState.reloadCheckpointAfterSuspendCount?.let { count ->
if (count < transientState.checkpoint.checkpointState.numberOfSuspends) {
onReloadFlowFromCheckpoint?.invoke(id)
@ -580,6 +595,16 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
))
}
private fun retrieveTelemetryIds(): ComponentTelemetryIds? {
return serviceHub.telemetryServiceInternal.getCurrentTelemetryIds()
}
private fun storeTelemetryIds(telemetryIds: ComponentTelemetryIds?) {
telemetryIds?.let {
serviceHub.telemetryServiceInternal.setCurrentTelemetryId(it)
}
}
private fun containsIdempotentFlows(): Boolean {
val subFlowStack = snapshot().checkpoint.checkpointState.subFlowStack
return subFlowStack.any { IdempotentFlow::class.java.isAssignableFrom(it.flowClass) }

View File

@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowInfo
import net.corda.core.internal.telemetry.SerializedTelemetry
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializedBytes
import java.security.SecureRandom
@ -38,6 +39,7 @@ data class SessionId(val toLong: Long) {
* @param flowVersion the version of the initiating flow.
* @param appName the name of the cordapp defining the initiating flow, or "corda" if it's a core flow.
* @param firstPayload the optional first payload.
* @param serializedTelemetry the telemetry data
*/
data class InitialSessionMessage(
val initiatorSessionId: SessionId,
@ -45,7 +47,8 @@ data class InitialSessionMessage(
val initiatorFlowClassName: String,
val flowVersion: Int,
val appName: String,
val firstPayload: SerializedBytes<Any>?
val firstPayload: SerializedBytes<Any>?,
val serializedTelemetry: SerializedTelemetry?
) : SessionMessage() {
// Human-readable rendering of the message. Only the class of firstPayload is printed, not its contents.
// Every field is followed by ", " except the last, so adjacent fields do not run together.
override fun toString() = "InitialSessionMessage(" +
"initiatorSessionId=$initiatorSessionId, " +
@ -53,6 +56,7 @@ data class InitialSessionMessage(
"initiatorFlowClassName=$initiatorFlowClassName, " +
"appName=$appName, " +
"firstPayload=${firstPayload?.javaClass}, " +
"telemetryContext=$serializedTelemetry" +
")"
}

View File

@ -863,7 +863,7 @@ internal class SingleThreadedStateMachineManager(
try {
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
val initiatedSessionId = SessionId.createRandom(secureRandom)
val senderSession = FlowSessionImpl(sender, sender, initiatedSessionId)
val senderSession = FlowSessionImpl(sender, sender, initiatedSessionId, sessionMessage.serializedTelemetry)
val flowLogic = initiatedFlowFactory.createFlow(senderSession)
val initiatedFlowInfo = when (initiatedFlowFactory) {
is InitiatedFlowFactory.Core -> FlowInfo(serviceHub.myInfo.platformVersion, "corda")

View File

@ -7,6 +7,7 @@ import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.telemetry.SerializedTelemetry
import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.toNonEmptySet
@ -74,7 +75,7 @@ class StartedFlowTransition(
return builder {
// Initialise uninitialised sessions in order to receive the associated FlowInfo. Some or all sessions may
// not be initialised yet.
sendInitialSessionMessagesIfNeeded(sessionIdToSession.keys)
sendInitialSessionMessagesIfNeeded(sessionIdToSession)
val flowInfoMap = getFlowInfoFromSessions(sessionIdToSession)
if (flowInfoMap == null) {
FlowContinuation.ProcessEvents
@ -140,7 +141,7 @@ class StartedFlowTransition(
sessionIdToSession[sessionId] = session
}
return builder {
sendToSessionsTransition(sessionIdToMessage)
sendToSessionsTransition(sessionIdToMessage, sessionIdToSession)
if (isErrored()) {
FlowContinuation.ProcessEvents
} else {
@ -202,7 +203,7 @@ class StartedFlowTransition(
sessionIdToSession[(session as FlowSessionImpl).sourceSessionId] = session
}
// send initialises to uninitialised sessions
sendInitialSessionMessagesIfNeeded(sessionIdToSession.keys)
sendInitialSessionMessagesIfNeeded(sessionIdToSession)
try {
val receivedMap = receiveFromSessionsTransition(sessionIdToSession)
if (receivedMap == null) {
@ -272,11 +273,11 @@ class StartedFlowTransition(
}
}
private fun TransitionBuilder.sendInitialSessionMessagesIfNeeded(sourceSessions: Set<SessionId>) {
private fun TransitionBuilder.sendInitialSessionMessagesIfNeeded(sessionIdToSession: Map<SessionId, FlowSessionImpl>) {
val checkpoint = startingState.checkpoint
val newSessions = LinkedHashMap<SessionId, SessionState>(checkpoint.checkpointState.sessions)
var index = 0
for (sourceSessionId in sourceSessions) {
for (sourceSessionId in sessionIdToSession.keys) {
val sessionState = checkpoint.checkpointState.sessions[sourceSessionId]
if (sessionState == null) {
return freshErrorTransition(CannotFindSessionException(sourceSessionId))
@ -284,7 +285,8 @@ class StartedFlowTransition(
if (sessionState !is SessionState.Uninitiated) {
continue
}
val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, sessionState.additionalEntropy, null)
val telemetryData = sessionIdToSession[sourceSessionId]?.serializedTelemetry
val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, sessionState.additionalEntropy, null, telemetryData)
val newSessionState = SessionState.Initiating(
bufferedMessages = arrayListOf(),
rejectionError = null,
@ -302,7 +304,8 @@ class StartedFlowTransition(
val sessionIdToMessage = flowIORequest.sessionToMessage.mapKeys {
sessionToSessionId(it.key)
}
sendToSessionsTransition(sessionIdToMessage)
val sessionIdToSession = flowIORequest.sessionToMessage.map { sessionToSessionId(it.key) to it.key as FlowSessionImpl}.toMap()
sendToSessionsTransition(sessionIdToMessage, sessionIdToSession)
if (isErrored()) {
FlowContinuation.ProcessEvents
} else {
@ -311,7 +314,8 @@ class StartedFlowTransition(
}
}
private fun TransitionBuilder.sendToSessionsTransition(sourceSessionIdToMessage: Map<SessionId, SerializedBytes<Any>>) {
private fun TransitionBuilder.sendToSessionsTransition(sourceSessionIdToMessage: Map<SessionId, SerializedBytes<Any>>, sourceSessionIdToSession: Map<SessionId, FlowSessionImpl
>) {
val checkpoint = startingState.checkpoint
val newSessions = LinkedHashMap(checkpoint.checkpointState.sessions)
var index = 0
@ -323,7 +327,8 @@ class StartedFlowTransition(
val sendInitialActions = messagesByType[SessionState.Uninitiated::class]?.map { (sourceSessionId, sessionState, message) ->
val uninitiatedSessionState = sessionState as SessionState.Uninitiated
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, sessionState)
val initialMessage = createInitialSessionMessage(uninitiatedSessionState.initiatingSubFlow, sourceSessionId, uninitiatedSessionState.additionalEntropy, message)
val serializedTelemetry: SerializedTelemetry? = sourceSessionIdToSession[sourceSessionId]?.serializedTelemetry
val initialMessage = createInitialSessionMessage(uninitiatedSessionState.initiatingSubFlow, sourceSessionId, uninitiatedSessionState.additionalEntropy, message, serializedTelemetry)
newSessions[sourceSessionId] = SessionState.Initiating(
bufferedMessages = arrayListOf(),
rejectionError = null,
@ -495,7 +500,8 @@ class StartedFlowTransition(
initiatingSubFlow: SubFlow.Initiating,
sourceSessionId: SessionId,
additionalEntropy: Long,
payload: SerializedBytes<Any>?
payload: SerializedBytes<Any>?,
serializedTelemetry: SerializedTelemetry?
): InitialSessionMessage {
return InitialSessionMessage(
initiatorSessionId = sourceSessionId,
@ -504,7 +510,8 @@ class StartedFlowTransition(
initiatorFlowClassName = initiatingSubFlow.classToInitiateWith.name,
flowVersion = initiatingSubFlow.flowInfo.flowVersion,
appName = initiatingSubFlow.flowInfo.appName,
firstPayload = payload
firstPayload = payload,
serializedTelemetry = serializedTelemetry
)
}

View File

@ -305,7 +305,7 @@ class TopLevelTransition(
return@builder FlowContinuation.ProcessEvents
}
val sourceSessionId = SessionId.createRandom(context.secureRandom)
val sessionImpl = FlowSessionImpl(event.destination, event.wellKnownParty, sourceSessionId)
val sessionImpl = FlowSessionImpl(event.destination, event.wellKnownParty, sourceSessionId, event.serializedTelemetry)
val newSessions = checkpoint.checkpointState.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
currentState = currentState.copy(checkpoint = checkpoint.setSessions(newSessions))
actions.add(Action.AddSessionBinding(context.id, sourceSessionId))

View File

@ -25,3 +25,8 @@ rpcSettings = {
trustStorePassword = "trustpass"
useTestClock = false
verifierType = InMemory
telemetry {
openTelemetryEnabled = true,
simpleLogTelemetryEnabled = false,
spanStartEndEventsEnabled = true
}

View File

@ -4,6 +4,7 @@ import com.nhaarman.mockito_kotlin.atLeast
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.identity.CordaX500Name
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.VersionInfo
@ -134,6 +135,8 @@ class NodeH2SecurityTests {
whenever(config.dataSourceProperties).thenReturn(hikaryProperties)
whenever(config.baseDirectory).thenReturn(mock())
whenever(config.effectiveH2Settings).thenAnswer { NodeH2Settings(address) }
whenever(config.telemetry).thenReturn(mock())
whenever(config.myLegalName).thenReturn(CordaX500Name(null, "client-${address.toString()}", "Corda", "London", null, "GB"))
}
private inner class MockNode: Node(config, VersionInfo.UNKNOWN, false) {

View File

@ -196,6 +196,7 @@ class NodeTest {
rpcUsers = emptyList(),
verifierType = VerifierType.InMemory,
flowTimeout = FlowTimeoutConfiguration(timeout = Duration.ZERO, backoffBase = 1.0, maxRestartCount = 1),
telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true),
rpcSettings = NodeRpcSettings(address = fakeAddress, adminAddress = null, ssl = null),
messagingServerAddress = null,
notary = null,

View File

@ -0,0 +1,328 @@
package net.corda.node.internal.telemetry
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow
import net.corda.core.internal.telemetry.EndSpanEvent
import net.corda.core.internal.telemetry.EndSpanForFlowEvent
import net.corda.core.internal.telemetry.RecordExceptionEvent
import net.corda.core.internal.telemetry.SetStatusEvent
import net.corda.core.internal.telemetry.StartSpanEvent
import net.corda.core.internal.telemetry.StartSpanForFlowEvent
import net.corda.core.internal.telemetry.TelemetryComponent
import net.corda.core.internal.telemetry.TelemetryDataItem
import net.corda.core.internal.telemetry.TelemetryEvent
import net.corda.core.internal.telemetry.TelemetryStatusCode
import net.corda.core.internal.telemetry.telemetryServiceInternal
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.finance.flows.CashPaymentReceiverFlow
import net.corda.node.services.Permissions
import net.corda.node.services.config.NodeConfiguration
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.InProcess
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.enclosedCordapp
import org.junit.Test
import java.time.Duration
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue
class TelemetryTests {
// Cordapp list handed to every driver in this test class via cordappsForAllNodes.
private companion object {
val cordapps = listOf(enclosedCordapp())
}
// Telemetry payload used by these tests: a name plus a UUID, exposed as a TelemetryDataItem.
// NOTE(review): @Suspendable on a data class looks unusual - confirm it is intended.
@Suspendable
data class TestTelemetryItem(val name: String, val randomUUID: UUID): TelemetryDataItem
/**
 * Starts [FlowWithSpanCallAndSleep] on a single in-process node and waits for it to finish.
 * The actual checks run inside the test telemetry component's setCurrentTelemetryId.
 */
@Test(timeout = 300_000)
fun `test passing a block with suspend to span func`() {
    val parameters = DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)
    driver(parameters) {
        val aliceNode = startNode().getOrThrow()
        val flowHandle = aliceNode.rpc.startFlow(TelemetryTests::FlowWithSpanCallAndSleep)
        // Completing without an exception is the pass condition; assertions live in setCurrentTelemetryId.
        flowHandle.returnValue.getOrThrow()
    }
}
/**
 * Starts [FlowWithSleep] on a single in-process node and waits for it to finish.
 * The actual checks run inside the test telemetry component's setCurrentTelemetryId.
 */
@Test(timeout = 300_000)
fun `run flow with a suspend then check thread locals for fibre are the same`() {
    val parameters = DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)
    driver(parameters) {
        val aliceNode = startNode().getOrThrow()
        val flowHandle = aliceNode.rpc.startFlow(TelemetryTests::FlowWithSleep)
        // Completing without an exception is the pass condition; assertions live in setCurrentTelemetryId.
        flowHandle.returnValue.getOrThrow()
    }
}
/**
 * Looks up both test telemetry components on an in-process node and checks that each lookup
 * returns an instance of exactly the requested class.
 */
@Test(timeout = 300_000)
fun `can find 2 distinct telemetry components on node`() {
    val parameters = DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)
    driver(parameters) {
        val aliceServices = (startNode().getOrThrow() as InProcess).services
        val firstComponent: TestCordaTelemetryComponent = aliceServices.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java)
        val secondComponent: TestCordaTelemetryComponent2 = aliceServices.cordaTelemetryComponent(TestCordaTelemetryComponent2::class.java)
        assertEquals(TestCordaTelemetryComponent::class.java, firstComponent::class.java)
        assertEquals(TestCordaTelemetryComponent2::class.java, secondComponent::class.java)
    }
}
/**
 * Runs [FlowWithSleep] on a node configured with reloadCheckpointAfterSuspend = true.
 * Both test components are flagged as restoredFromCheckpoint first, so setCurrentTelemetryId
 * skips its thread-local comparison and only checks the id against the one returned earlier.
 */
@Test(timeout = 300_000)
fun `telemetryId is restored after flow is reloaded from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() {
    val parameters = DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)
    driver(parameters) {
        val overrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
        val alice = startNode(providedName = ALICE_NAME, customOverrides = overrides).getOrThrow()
        val aliceServices = (alice as InProcess).services
        // Resolve both components before flagging either of them.
        val components = listOf<TelemetryComponentCounts>(
                aliceServices.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java),
                aliceServices.cordaTelemetryComponent(TestCordaTelemetryComponent2::class.java)
        )
        components.forEach { it.restoredFromCheckpoint = true }
        // Completing without an exception is the pass condition; assertions live in setCurrentTelemetryId.
        alice.rpc.startFlow(TelemetryTests::FlowWithSleep).returnValue.getOrThrow()
    }
}
/**
 * Runs CashIssueAndPaymentFlow from node A to node B, then verifies the event counters of both
 * test telemetry components on each node. Node B is only checked once its components have
 * seen the end-of-flow span event.
 */
@Test(timeout=300_000)
fun `run flow and check telemetry components invoked`() {
    val rpcUser = User("mark", "dadada", setOf(Permissions.all()))
    driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS + cordapps, startNodesInProcess = true)) {
        val nodeFutures = listOf(startNode(rpcUsers = listOf(rpcUser)), startNode())
        val (nodeA, nodeB) = nodeFutures.transpose().getOrThrow()

        val issuedAmount = 1.DOLLARS
        val issuerRef = OpaqueBytes.of(0)
        val payee = nodeB.nodeInfo.legalIdentities[0]
        val flowHandle = nodeA.rpc.startFlow(::CashIssueAndPaymentFlow, issuedAmount, issuerRef, payee, false, defaultNotaryIdentity)
        flowHandle.returnValue.getOrThrow()

        val initiatorFlowName = CashIssueAndPaymentFlow::class.java.name
        val responderFlowName = CashPaymentReceiverFlow::class.java.name

        val servicesA = (nodeA as InProcess).services
        checkTelemetryComponentCounts(initiatorFlowName, servicesA.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java))
        checkTelemetryComponentCounts(initiatorFlowName, servicesA.cordaTelemetryComponent(TestCordaTelemetryComponent2::class.java))

        val servicesB = (nodeB as InProcess).services
        val firstComponentB = servicesB.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java)
        val secondComponentB = servicesB.cordaTelemetryComponent(TestCordaTelemetryComponent2::class.java)
        // Wait for node B's components to report the end of the flow span before inspecting counters.
        assertTrue(firstComponentB.endSpanForFlowLatch.await(1, TimeUnit.MINUTES), "Timed out waiting for endSpanForFlow operation on node B")
        assertTrue(secondComponentB.endSpanForFlowLatch.await(1, TimeUnit.MINUTES), "Timed out waiting for endSpanForFlow operation on node B")
        checkTelemetryComponentCounts(responderFlowName, servicesB.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java))
        checkTelemetryComponentCounts(responderFlowName, servicesB.cordaTelemetryComponent(TestCordaTelemetryComponent2::class.java))
    }
}
/**
 * Asserts that [telemetryComponent] saw exactly one flow span start and end, at least one
 * ordinary span, as many span ends as span starts, and that the flow span was named [flowName].
 */
private fun checkTelemetryComponentCounts(flowName: String, telemetryComponent: TelemetryComponentCounts) {
    with(telemetryComponent) {
        assertEquals(1, startSpanForFlowEvent)
        assertEquals(1, endSpanForFlowEvent)
        assertTrue(startSpanEvent > 0)
        assertEquals(startSpanEvent, endSpanEvent)
        assertEquals(flowName, startSpanForFlowName)
    }
}
/**
 * Runs CashIssueAndPaymentFlow from node A to node B and checks that node B's test component
 * received node A's dummy telemetry item, while node A's own component received none.
 */
@Test(timeout=300_000)
fun `telemetry data is sent from nodeA to nodeB as part of a flow`() {
    val rpcUser = User("mark", "dadada", setOf(Permissions.all()))
    driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS + cordapps, startNodesInProcess = true)) {
        val nodeFutures = listOf(startNode(rpcUsers = listOf(rpcUser)), startNode())
        val (nodeA, nodeB) = nodeFutures.transpose().getOrThrow()

        val issuedAmount = 1.DOLLARS
        val issuerRef = OpaqueBytes.of(0)
        val payee = nodeB.nodeInfo.legalIdentities[0]
        val flowHandle = nodeA.rpc.startFlow(::CashIssueAndPaymentFlow, issuedAmount, issuerRef, payee, false, defaultNotaryIdentity)
        flowHandle.returnValue.getOrThrow()

        val componentA = (nodeA as InProcess).services.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java)
        val componentB = (nodeB as InProcess).services.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java)
        // The initiating side starts its flow span without incoming telemetry data.
        assertNull(componentA.retrievedTelemetryDataItem)
        assertEquals(componentA.dummyTelemetryItem, componentB.retrievedTelemetryDataItem)
    }
}
/** First test telemetry component; all behaviour is inherited from [TelemetryComponentCounts]. */
class TestCordaTelemetryComponent : TelemetryComponentCounts() {
    override fun name(): String = "TestCordaTelemetryComponent"
}
/** Second test telemetry component; all behaviour is inherited from [TelemetryComponentCounts]. */
class TestCordaTelemetryComponent2 : TelemetryComponentCounts() {
    override fun name(): String = "TestCordaTelemetryComponent2"
}
/**
 * Shared base for the test telemetry components. It counts every kind of [TelemetryEvent] it
 * receives, keeps a stack of the ids of started spans, and tracks the current telemetry id in a
 * thread local so that the tests can assert on what happened during a flow.
 */
abstract class TelemetryComponentCounts: TelemetryComponent {
// When true, setCurrentTelemetryId skips the comparison against the thread local.
var restoredFromCheckpoint = false
// Counted down once an EndSpanForFlowEvent has been handled.
val endSpanForFlowLatch = CountDownLatch(1)
// Telemetry data item passed in with the most recent StartSpanForFlowEvent (may be null).
var retrievedTelemetryDataItem: TelemetryDataItem? = null
// Per-event-type counters, incremented in onTelemetryEvent.
var startSpanForFlowEvent = 0
var endSpanForFlowEvent = 0
var startSpanEvent = 0
var endSpanEvent = 0
var setStatusEvent = 0
var recordExceptionEvent = 0
// Names of the most recently started flow span and ordinary span.
var startSpanForFlowName: String? = null
var startSpanName: String? = null
// Stack of telemetry ids: pushed when a span starts, popped (and checked) when it ends.
var spanTelemetryIds = ArrayDeque<UUID>()
// Telemetry id considered current on the calling thread.
val currentUUID = ThreadLocal<UUID>()
// Last id handed out by getCurrentTelemetryId, compared against the id later passed to setCurrentTelemetryId.
var previousTelemetryId: UUID? = null
// Item returned from getCurrentTelemetryData.
val dummyTelemetryItem = TestTelemetryItem("this is a dummy string", UUID.randomUUID())
// Not implemented in this test component; TODO() throws NotImplementedError if called.
override fun getCurrentSpanId(): String {
TODO("Not yet implemented")
}
// Not implemented in this test component; TODO() throws NotImplementedError if called.
override fun getCurrentTraceId(): String {
TODO("Not yet implemented")
}
// Not implemented in this test component; TODO() throws NotImplementedError if called.
override fun getCurrentBaggage(): Map<String, String> {
TODO("Not yet implemented")
}
// This component always reports itself as enabled.
override fun isEnabled(): Boolean {
return true
}
// Asserts that the id handed back matches what this component reported earlier, then makes it
// the current id for the calling thread.
override fun setCurrentTelemetryId(id: UUID) {
if (!restoredFromCheckpoint) {
// If we have not been restored from a checkpoint then the threadlocal should be the same
// as it was, so check this.
// If we have been restored from a checkpoint then the threadlocal will be null. So skip this step if we have.
assertEquals(currentUUID.get(), id)
}
// Here threadlocal will be null as we have been check pointed.
// Check the uuid passed to us is the same as the one we returned earlier.
assertEquals(previousTelemetryId, id)
currentUUID.set(id)
}
// Returns the calling thread's current id, or UUID(0, 0) when none is set, and remembers it
// in previousTelemetryId.
override fun getCurrentTelemetryId(): UUID {
val uuid = currentUUID.get() ?: UUID(0, 0)
// Store the uuid we return so can check for same value after the checkpoint
previousTelemetryId = uuid
return uuid
}
// Bumps the counter for the event's type and forwards it to the matching handler below.
override fun onTelemetryEvent(event: TelemetryEvent) {
when (event) {
is StartSpanForFlowEvent -> {
startSpanForFlowEvent++
startSpanForFlow(event.name, event.attributes, event.telemetryId, event.flowLogic, event.telemetryDataItem)
}
is EndSpanForFlowEvent -> {
endSpanForFlowEvent++
endSpanForFlow(event.telemetryId)
// Release any test waiting for the flow span to end.
endSpanForFlowLatch.countDown()
}
is StartSpanEvent -> {
startSpanEvent++
startSpan(event.name, event.attributes, event.telemetryId, event.flowLogic)
}
is EndSpanEvent -> {
endSpanEvent++
endSpan(event.telemetryId)
}
is SetStatusEvent -> {
setStatusEvent++
setStatus(event.telemetryId, event.telemetryStatusCode, event.message)
}
is RecordExceptionEvent -> {
recordExceptionEvent++
recordException(event.telemetryId, event.throwable)
}
}
}
// Records the incoming telemetry data item and flow span name, pushes the id and makes it current.
@Suppress("UNUSED_PARAMETER")
fun startSpanForFlow(name: String, attributes: Map<String, String>, telemetryId: UUID, flowLogic: FlowLogic<*>?, telemetryDataItem: TelemetryDataItem?) {
retrievedTelemetryDataItem = telemetryDataItem
startSpanForFlowName = name
spanTelemetryIds.push(telemetryId)
currentUUID.set(telemetryId)
}
// Asserts the ended id is the one on top of the stack, then makes the next id on the stack
// current (UUID(0, 0) when the stack is empty).
fun endSpanForFlow(telemetryId: UUID) {
assertEquals(spanTelemetryIds.pop(), telemetryId)
val newUUID = spanTelemetryIds.peek() ?: UUID(0, 0)
currentUUID.set(newUUID)
}
// Records the span name, pushes the id and makes it current.
@Suppress("UNUSED_PARAMETER")
fun startSpan(name: String, attributes: Map<String, String>, telemetryId: UUID, flowLogic: FlowLogic<*>?) {
startSpanName = name
spanTelemetryIds.push(telemetryId)
currentUUID.set(telemetryId)
}
// Same bookkeeping as endSpanForFlow, for ordinary spans.
fun endSpan(telemetryId: UUID) {
assertEquals(spanTelemetryIds.pop(), telemetryId)
val newUUID = spanTelemetryIds.peek() ?: UUID(0, 0)
currentUUID.set(newUUID)
}
// Always returns the fixed dummy item created with this component.
override fun getCurrentTelemetryData(): TelemetryDataItem {
return dummyTelemetryItem
}
// Intentionally empty: only the counter in onTelemetryEvent matters.
@Suppress("UNUSED_PARAMETER")
fun setStatus(telemetryId: UUID, statusCode: TelemetryStatusCode, message: String) {
}
// Intentionally empty: only the counter in onTelemetryEvent matters.
@Suppress("UNUSED_PARAMETER")
fun recordException(telemetryId: UUID, throwable: Throwable) {
}
}
// RPC-startable test flow that sleeps for one second, sets its single progress step and
// returns the state machine's invocation context.
@StartableByRPC
class FlowWithSleep : FlowLogic<InvocationContext>() {
companion object {
object TESTSTEP : ProgressTracker.Step("Custom progress step")
}
override val progressTracker: ProgressTracker = ProgressTracker(TESTSTEP)
@Suspendable
override fun call(): InvocationContext {
// Do a sleep which invokes a suspend
sleep(Duration.ofSeconds(1))
progressTracker.currentStep = TESTSTEP
return stateMachine.context
}
}
// RPC-startable test flow that does the same work as FlowWithSleep, but inside a telemetry
// span named after this class, so the suspend happens within the block passed to span().
@StartableByRPC
class FlowWithSpanCallAndSleep : FlowLogic<InvocationContext>() {
companion object {
object TESTSTEP : ProgressTracker.Step("Custom progress step")
}
override val progressTracker: ProgressTracker = ProgressTracker(TESTSTEP)
@Suspendable
override fun call(): InvocationContext {
// The value of the block's last expression is what this call returns.
return serviceHub.telemetryServiceInternal.span(this::class.java.name, emptyMap(), this) {
// Do a sleep which invokes a suspend
sleep(Duration.ofSeconds(1))
progressTracker.currentStep = TESTSTEP
stateMachine.context
}
}
}
}

View File

@ -397,6 +397,7 @@ class NodeConfigurationImplTest {
p2pAddress = NetworkHostAndPort("localhost", 0),
messagingServerAddress = null,
flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0),
telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true),
notary = null,
devMode = true,
noLocalShell = false,

View File

@ -571,7 +571,7 @@ class FlowFrameworkTests {
@Test(timeout=300_000)
fun `session init with unknown class is sent to the flow hospital, from where we then drop it`() {
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob)
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null, null), bob)
mockNet.runNetwork()
assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital
val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot
@ -587,7 +587,7 @@ class FlowFrameworkTests {
@Test(timeout=300_000)
fun `non-flow class in session init`() {
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, String::class.java.name, 1, "", null), bob)
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, String::class.java.name, 1, "", null, null), bob)
mockNet.runNetwork()
assertThat(receivedSessionMessages).hasSize(2) // Only the session-init and session-reject are expected
val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage
@ -1103,7 +1103,7 @@ internal data class SessionTransfer(val from: Int, val message: SessionMessage,
}
internal fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, flowVersion: Int = 1, payload: Any? = null): InitialSessionMessage {
return InitialSessionMessage(SessionId(0), 0, clientFlowClass.java.name, flowVersion, "", payload?.serialize())
return InitialSessionMessage(SessionId(0), 0, clientFlowClass.java.name, flowVersion, "", payload?.serialize(), serializedTelemetry = null)
}
internal fun sessionData(payload: Any) = ExistingSessionMessage(SessionId(0), DataSessionMessage(payload.serialize()))

View File

@ -0,0 +1,48 @@
import static org.gradle.api.JavaVersion.VERSION_1_8

// Build script for the OpenTelemetry driver: a shaded (fat) jar that bundles the OpenTelemetry SDK.
plugins {
    // Apply the org.jetbrains.kotlin.jvm Plugin to add support for Kotlin.
    id 'org.jetbrains.kotlin.jvm' // version '1.6.21'
    // Apply the java-library plugin for API and implementation separation.
    id 'java-library'
    id 'com.github.johnrengelman.shadow' // version '7.1.2'
    id 'net.corda.plugins.publish-utils'
}

description 'OpenTelemetry Driver'

// This driver is required by core, so must always be 1.8. See core build.gradle.
targetCompatibility = VERSION_1_8

dependencies {
    // Use the Kotlin JDK 8 standard library.
    // No explicit version: the applied Kotlin Gradle plugin supplies its own version for
    // org.jetbrains.kotlin dependencies. The previous hard-coded '4.9.1' is not a Kotlin release.
    implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
    implementation "io.opentelemetry:opentelemetry-api:$open_telemetry_version"
    // The BOM pins the versions of the unversioned OpenTelemetry artifacts below.
    implementation platform("io.opentelemetry:opentelemetry-bom:$open_telemetry_version")
    implementation "io.opentelemetry:opentelemetry-sdk"
    implementation 'io.opentelemetry:opentelemetry-exporter-otlp'
    implementation "io.opentelemetry:opentelemetry-semconv:$open_telemetry_sem_conv_version"
}

shadowJar {
    archiveClassifier = jdkClassifier
    // Relocate the bundled Kotlin classes into a private package inside the shaded jar.
    relocate 'kotlin', 'privatekotlin'
    exclude "**/Log4j2Plugins.dat"
    zip64 true
}

artifacts {
    archives shadowJar
    publish shadowJar
}

// Only the shaded jar is produced; the plain jar is disabled.
jar {
    enabled = false
}

publish {
    disableDefaultJar = true
    name 'corda-opentelemetry-driver'
}

View File

@ -0,0 +1,49 @@
package net.corda.opentelemetrydriver
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
/**
 * Lazily creates and caches a single [OpenTelemetry] instance for the process.
 *
 * The instance is built on the first call to [getOpenTelemetry]; the service name passed on
 * that first call is the one attached to the SDK resource, and later calls return the cached
 * instance regardless of the name they pass.
 */
object OpenTelemetryDriver {
    @Volatile
    private var OPENTELEMETRY_INSTANCE: OpenTelemetry? = null

    /**
     * Returns the cached instance, building it first if none exists yet. The field is read once
     * without the lock and again while holding it, so the SDK is only built once.
     */
    fun getOpenTelemetry(serviceName: String): OpenTelemetry {
        val cached = OPENTELEMETRY_INSTANCE
        if (cached != null) {
            return cached
        }
        return synchronized(this) {
            OPENTELEMETRY_INSTANCE ?: run {
                val created = buildAndGetOpenTelemetry(serviceName)
                OPENTELEMETRY_INSTANCE = created
                created
            }
        }
    }

    /**
     * Builds an SDK with OTLP gRPC span and metric exporters (each created with its builder's
     * defaults), W3C trace-context propagation, and registers it as the global instance.
     */
    private fun buildAndGetOpenTelemetry(serviceName: String): OpenTelemetry {
        // Default resource attributes plus the service name supplied by the caller.
        val serviceAttributes = Attributes.of(ResourceAttributes.SERVICE_NAME, serviceName)
        val resource: Resource = Resource.getDefault().merge(Resource.create(serviceAttributes))

        val spanProcessor = BatchSpanProcessor.builder(OtlpGrpcSpanExporter.builder().build()).build()
        val tracerProvider: SdkTracerProvider = SdkTracerProvider.builder()
                .addSpanProcessor(spanProcessor)
                .setResource(resource)
                .build()

        val metricReader = PeriodicMetricReader.builder(OtlpGrpcMetricExporter.builder().build()).build()
        val meterProvider: SdkMeterProvider = SdkMeterProvider.builder()
                .registerMetricReader(metricReader)
                .setResource(resource)
                .build()

        val propagators = ContextPropagators.create(W3CTraceContextPropagator.getInstance())
        return OpenTelemetrySdk.builder()
                .setTracerProvider(tracerProvider)
                .setMeterProvider(meterProvider)
                .setPropagators(propagators)
                .buildAndRegisterGlobal()
    }
}

View File

@ -28,6 +28,7 @@ pluginManagement {
// The project is named 'corda-project' and not 'corda' because if this is named the same as the
// output JAR from the capsule then the buildCordaJAR task goes into an infinite loop.
rootProject.name = 'corda-project'
include 'opentelemetry-driver'
include 'confidential-identities'
include 'finance:contracts'
include 'finance:workflows'

View File

@ -20,6 +20,8 @@ import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.node.services.diagnostics.DiagnosticsService
import net.corda.core.internal.telemetry.TelemetryComponent
import net.corda.core.internal.telemetry.TelemetryServiceImpl
import net.corda.core.node.services.vault.CordaTransactionSupport
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.transactions.SignedTransaction
@ -213,7 +215,7 @@ open class MockServices private constructor(
TestingNamedCacheFactory(),
identityService,
persistence,
MockCryptoService(aliasKeyMap)
MockCryptoService(aliasKeyMap), TelemetryServiceImpl()
)
persistence.transaction { keyManagementService.start(aliasedMoreKeys + aliasedIdentityKey) }
@ -438,6 +440,7 @@ open class MockServices private constructor(
override val vaultService: VaultService get() = throw UnsupportedOperationException()
override val contractUpgradeService: ContractUpgradeService get() = throw UnsupportedOperationException()
override val networkMapCache: NetworkMapCache get() = throw UnsupportedOperationException()
/** Not supported here: reading this property always throws [UnsupportedOperationException]. */
override val telemetryService: TelemetryServiceImpl get() = throw UnsupportedOperationException()
override val clock: TestClock get() = TestClock(Clock.systemUTC())
override val myInfo: NodeInfo
get() {
@ -467,12 +470,19 @@ open class MockServices private constructor(
/** Registry of [CordaService] implementations, keyed by their class; read by [cordaService]. */
internal val cordappServices: MutableClassToInstanceMap<SerializeAsToken> = MutableClassToInstanceMap.create()

/** Registry of [TelemetryComponent] implementations, keyed by their class; read by [cordaTelemetryComponent]. */
internal val cordappTelemetryComponents: MutableClassToInstanceMap<TelemetryComponent> = MutableClassToInstanceMap.create()
/**
 * Looks up the registered instance of the given [CordaService] class.
 *
 * @param type the service class; it must carry the [CordaService] annotation.
 * @return the instance stored in [cordappServices] for [type].
 * @throws IllegalArgumentException if [type] is not annotated with [CordaService], or no instance is registered.
 */
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
    val annotated = type.isAnnotationPresent(CordaService::class.java)
    require(annotated) { "${type.name} is not a Corda service" }
    val registered = cordappServices.getInstance(type)
    return requireNotNull(registered) { "Corda service ${type.name} does not exist" }
}
/**
 * Looks up the registered [TelemetryComponent] of the given class.
 *
 * @param type the telemetry component class to look up.
 * @return the instance stored in [cordappTelemetryComponents] for [type].
 * @throws IllegalArgumentException if no instance is registered for [type].
 */
override fun <T : TelemetryComponent> cordaTelemetryComponent(type: Class<T>): T =
    requireNotNull(cordappTelemetryComponents.getInstance(type)) {
        "Corda telemetry component ${type.name} does not exist"
    }
override fun jdbcSession(): Connection = throw UnsupportedOperationException()
override fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T {

View File

@ -27,6 +27,7 @@ import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.NotaryInfo
import net.corda.core.internal.telemetry.TelemetryServiceImpl
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
@ -46,6 +47,7 @@ import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.node.services.config.NetworkParameterAcceptanceSettings
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.config.TelemetryConfiguration
import net.corda.node.services.config.VerifierType
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.BasicHSMKeyManagementService
@ -409,7 +411,7 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
}
/**
 * Creates the key management service for this node.
 *
 * A fresh [TelemetryServiceImpl] is passed as the last constructor argument. The earlier four-argument
 * `return` that preceded this call made it unreachable and has been dropped.
 *
 * @param identityService identity service handed to the key management service.
 * @return a [BasicHSMKeyManagementService] built from this node's cache factory, database and crypto service.
 */
override fun makeKeyManagementService(identityService: PersistentIdentityService): KeyManagementServiceInternal {
    return BasicHSMKeyManagementService(cacheFactory, identityService, database, cryptoService, TelemetryServiceImpl())
}
override fun startShell() {
@ -490,6 +492,7 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
doReturn(emptyList<SecureHash>()).whenever(it).extraNetworkMapKeys
doReturn(listOf(baseDirectory / "cordapps")).whenever(it).cordappDirectories
doReturn(emptyList<String>()).whenever(it).quasarExcludePackages
doReturn(TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true)).whenever(it).telemetry
parameters.configOverrides(it)
}