From 7a133f687cb112514f511deeeb85d525e9385967 Mon Sep 17 00:00:00 2001 From: Adel El-Beik <48713346+adelel1@users.noreply.github.com> Date: Fri, 28 Oct 2022 14:41:39 +0100 Subject: [PATCH] ENT-6893: First cut of telemetry integration. (#7247) First cut of telemetry integration. Open telemetry can be enabled in two ways, first is via an opentelemetry java agent specified on the command line. With this way you get the advantage of spans created from other libraries, like hibernate. The java agent does byte code rewriting to insert spans. The second way is with the open telemetry driver (that links with the opentelemetry sdk). This is a fat jar provided with this project and needs to go into the node drivers directory. --- .ci/api-current.txt | 45 ++- build.gradle | 4 +- .../net/corda/client/rpc/CordaRPCClient.kt | 64 +++- .../net/corda/client/rpc/RPCConnection.kt | 6 + .../corda/client/rpc/internal/RPCClient.kt | 10 +- .../rpc/internal/RPCClientProxyHandler.kt | 12 +- .../client/rpc/internal/RPCClientTelemetry.kt | 42 +++ .../rpc/internal/ReconnectingCordaRPCOps.kt | 4 + constants.properties | 2 + core-deterministic/build.gradle | 3 + core/build.gradle | 6 +- .../corda/core/context/InvocationContext.kt | 47 ++- .../kotlin/net/corda/core/cordapp/Cordapp.kt | 2 + .../net/corda/core/flows/FinalityFlow.kt | 27 +- .../kotlin/net/corda/core/flows/FlowLogic.kt | 17 +- .../corda/core/internal/FlowStateMachine.kt | 3 +- .../core/internal/cordapp/CordappImpl.kt | 3 + .../core/internal/notary/NotaryServiceFlow.kt | 18 +- .../telemetry/OpenTelemetryComponent.kt | 309 +++++++++++++++++ .../internal/telemetry/OpenTelemetryHandle.kt | 3 + .../telemetry/SimpleLogTelemetryComponent.kt | 126 +++++++ .../telemetry/TelemetryServiceImpl.kt | 235 +++++++++++++ .../kotlin/net/corda/core/node/ServiceHub.kt | 13 + .../core/node/services/TelemetryService.kt | 9 + .../main/kotlin/net/corda/nodeapi/RPCApi.kt | 36 +- node/build.gradle | 4 +- node/capsule/build.gradle | 2 +- .../net/corda/node/internal/AbstractNode.kt | 80 ++++- .../cordapp/JarScanningCordappLoader.kt | 15 + .../node/internal/cordapp/VirtualCordapps.kt | 4 + .../node/services/config/NodeConfiguration.kt | 7 + .../services/config/NodeConfigurationImpl.kt | 2 + .../config/schema/v1/ConfigSections.kt | 12 + .../schema/v1/V1NodeConfigurationSpec.kt | 2 + .../keys/BasicHSMKeyManagementService.kt | 18 +- .../net/corda/node/services/rpc/RPCServer.kt | 4 +- .../corda/node/services/statemachine/Event.kt | 3 +- .../node/services/statemachine/FlowCreator.kt | 4 +- .../services/statemachine/FlowSessionImpl.kt | 43 ++- .../statemachine/FlowStateMachineImpl.kt | 39 ++- .../services/statemachine/SessionMessage.kt | 6 +- .../SingleThreadedStateMachineManager.kt | 2 +- .../transitions/StartedFlowTransition.kt | 29 +- .../transitions/TopLevelTransition.kt | 2 +- node/src/main/resources/corda-reference.conf | 5 + .../node/internal/NodeH2SecurityTests.kt | 3 + .../net/corda/node/internal/NodeTest.kt | 1 + .../node/internal/telemetry/TelemetryTests.kt | 328 ++++++++++++++++++ .../config/NodeConfigurationImplTest.kt | 1 + .../statemachine/FlowFrameworkTests.kt | 6 +- opentelemetry-driver/build.gradle | 48 +++ .../OpenTelemetryDriver.kt | 49 +++ settings.gradle | 1 + .../net/corda/testing/node/MockServices.kt | 12 +- .../node/internal/InternalMockNetwork.kt | 5 +- 55 files changed, 1675 insertions(+), 108 deletions(-) create mode 100644 client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientTelemetry.kt create mode 100644 core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryComponent.kt create mode 100644 core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryHandle.kt create mode 100644 core/src/main/kotlin/net/corda/core/internal/telemetry/SimpleLogTelemetryComponent.kt create mode 100644 core/src/main/kotlin/net/corda/core/internal/telemetry/TelemetryServiceImpl.kt create mode 100644 core/src/main/kotlin/net/corda/core/node/services/TelemetryService.kt create mode 100644 node/src/test/kotlin/net/corda/node/internal/telemetry/TelemetryTests.kt create mode 100644 opentelemetry-driver/build.gradle create mode 100644 opentelemetry-driver/src/main/kotlin/net/corda/opentelemetrydriver/OpenTelemetryDriver.kt diff --git a/.ci/api-current.txt b/.ci/api-current.txt index f189b97fea..fafb54a2c0 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -157,6 +157,8 @@ public final class net.corda.core.context.InvocationContext extends java.lang.Ob public (net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, int, kotlin.jvm.internal.DefaultConstructorMarker) public (net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String) public (net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String, int, kotlin.jvm.internal.DefaultConstructorMarker) + public (net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String, net.corda.core.internal.telemetry.SerializedTelemetry) + public (net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String, net.corda.core.internal.telemetry.SerializedTelemetry, int, kotlin.jvm.internal.DefaultConstructorMarker) @NotNull public final net.corda.core.context.InvocationOrigin component1() @NotNull @@ -171,10 +173,14 @@ public final class net.corda.core.context.InvocationContext extends java.lang.Ob public final java.util.List component6() @Nullable public final String component7() + @Nullable + public final net.corda.core.internal.telemetry.SerializedTelemetry component8() @NotNull public final net.corda.core.context.InvocationContext copy(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor) @NotNull public final net.corda.core.context.InvocationContext copy(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String) + @NotNull + public final net.corda.core.context.InvocationContext copy(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String, net.corda.core.internal.telemetry.SerializedTelemetry) public boolean equals(Object) @Nullable public final net.corda.core.context.Actor getActor() @@ -188,6 +194,8 @@ public final class net.corda.core.context.InvocationContext extends java.lang.Ob public final net.corda.core.context.Actor getImpersonatedActor() @NotNull public final net.corda.core.context.InvocationOrigin getOrigin() + @Nullable + public final net.corda.core.internal.telemetry.SerializedTelemetry getSerializedTelemetry() @NotNull public final net.corda.core.context.Trace getTrace() public int hashCode() @@ -206,6 +214,8 @@ public final class net.corda.core.context.InvocationContext extends java.lang.Ob @NotNull public static final net.corda.core.context.InvocationContext newInstance(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String) @NotNull + public static final net.corda.core.context.InvocationContext newInstance(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String, net.corda.core.internal.telemetry.SerializedTelemetry) + @NotNull public static final net.corda.core.context.InvocationContext peer(net.corda.core.identity.CordaX500Name, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor) @NotNull public final java.security.Principal principal() @@ -220,6 +230,8 @@ public final class net.corda.core.context.InvocationContext extends java.lang.Ob @NotNull public static final net.corda.core.context.InvocationContext rpc(net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List) @NotNull + public static final net.corda.core.context.InvocationContext rpc(net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, net.corda.core.internal.telemetry.SerializedTelemetry) + @NotNull public static final net.corda.core.context.InvocationContext scheduled(net.corda.core.contracts.ScheduledStateRef, net.corda.core.context.Trace, net.corda.core.context.Trace) @NotNull public static final net.corda.core.context.InvocationContext service(String, net.corda.core.identity.CordaX500Name, net.corda.core.context.Trace, net.corda.core.context.Trace) @@ -246,6 +258,8 @@ public static final class net.corda.core.context.InvocationContext$Companion ext @NotNull public final net.corda.core.context.InvocationContext newInstance(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String) @NotNull + public final net.corda.core.context.InvocationContext newInstance(net.corda.core.context.InvocationOrigin, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, String, net.corda.core.internal.telemetry.SerializedTelemetry) + @NotNull public final net.corda.core.context.InvocationContext peer(net.corda.core.identity.CordaX500Name, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor) @NotNull public final net.corda.core.context.InvocationContext rpc(net.corda.core.context.Actor) @@ -258,6 +272,8 @@ public static final class net.corda.core.context.InvocationContext$Companion ext @NotNull public final net.corda.core.context.InvocationContext rpc(net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List) @NotNull + public final net.corda.core.context.InvocationContext rpc(net.corda.core.context.Actor, net.corda.core.context.Trace, net.corda.core.context.Trace, net.corda.core.context.Actor, java.util.List, net.corda.core.internal.telemetry.SerializedTelemetry) + @NotNull public final net.corda.core.context.InvocationContext scheduled(net.corda.core.contracts.ScheduledStateRef, net.corda.core.context.Trace, net.corda.core.context.Trace) @NotNull public final net.corda.core.context.InvocationContext service(String, net.corda.core.identity.CordaX500Name, net.corda.core.context.Trace, net.corda.core.context.Trace) @@ -1379,6 +1395,8 @@ public interface net.corda.core.cordapp.Cordapp @NotNull public abstract java.util.List> getServices() public abstract int getTargetPlatformVersion() + @NotNull + public abstract java.util.List> getTelemetryComponents() ## @DoNotImplement public static interface net.corda.core.cordapp.Cordapp$Info @@ -4052,6 +4070,8 @@ public interface net.corda.core.node.ServiceHub extends net.corda.core.node.Serv @NotNull public abstract T cordaService(Class) @NotNull + public abstract T cordaTelemetryComponent(Class) + @NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction) @NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey) @@ -4074,6 +4094,8 @@ public interface net.corda.core.node.ServiceHub extends net.corda.core.node.Serv @NotNull public abstract net.corda.core.node.services.NetworkMapCache getNetworkMapCache() @NotNull + public abstract net.corda.core.node.services.TelemetryService getTelemetryService() + @NotNull public abstract net.corda.core.node.services.TransactionVerifierService getTransactionVerifierService() @NotNull public abstract net.corda.core.node.services.TransactionStorage getValidatedTransactions() @@ -4373,6 +4395,11 @@ public final class net.corda.core.node.services.StatesNotAvailableException exte @NotNull public String toString() ## +@DoNotImplement +public interface net.corda.core.node.services.TelemetryService + @Nullable + public abstract net.corda.core.internal.telemetry.OpenTelemetryHandle getOpenTelemetry() +## public final class net.corda.core.node.services.TimeWindowChecker extends java.lang.Object public () public (java.time.Clock) @@ -9052,6 +9079,8 @@ public class net.corda.testing.node.MockServices extends java.lang.Object implem @NotNull public T cordaService(Class) @NotNull + public T cordaTelemetryComponent(Class) + @NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction) @NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey) @@ -9088,6 +9117,8 @@ public class net.corda.testing.node.MockServices extends java.lang.Object implem @NotNull protected final net.corda.core.node.ServicesForResolution getServicesForResolution() @NotNull + public net.corda.core.internal.telemetry.TelemetryServiceImpl getTelemetryService() + @NotNull public net.corda.core.node.services.TransactionVerifierService getTransactionVerifierService() @NotNull public net.corda.core.node.services.TransactionStorage getValidatedTransactions() @@ -9357,7 +9388,10 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang. public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int) public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int) public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration) - public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, int, kotlin.jvm.internal.DefaultConstructorMarker) + public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean) + public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean) + public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean) + public (java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean, int, kotlin.jvm.internal.DefaultConstructorMarker) @NotNull public final java.time.Duration component1() @NotNull @@ -9384,6 +9418,8 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang. public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int) @NotNull public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration) + @NotNull + public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration, int, boolean, java.time.Duration, int, int, java.time.Duration, double, int, int, java.time.Duration, boolean, boolean, boolean) public boolean equals(Object) public int getCacheConcurrencyLevel() @NotNull @@ -9397,8 +9433,11 @@ public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang. public int getMaxReconnectAttempts() public int getMinimumServerProtocolVersion() public int getObservationExecutorPoolSize() + public boolean getOpenTelemetryEnabled() @NotNull public java.time.Duration getReapInterval() + public boolean getSimpleLogTelemetryEnabled() + public boolean getSpanStartEndEventsEnabled() public boolean getTrackRpcCallSites() public int hashCode() @NotNull @@ -9416,6 +9455,8 @@ public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Obj public (net.corda.client.rpc.RPCConnection, java.util.concurrent.ExecutorService, net.corda.client.rpc.internal.ReconnectingCordaRPCOps, kotlin.jvm.internal.DefaultConstructorMarker) public void close() public void forceClose() + @Nullable + public net.corda.core.internal.telemetry.OpenTelemetryHandle getOpenTelemetry() @NotNull public net.corda.core.messaging.CordaRPCOps getProxy() public int getServerProtocolVersion() @@ -9450,6 +9491,8 @@ public final class net.corda.client.rpc.PermissionException extends net.corda.co public interface net.corda.client.rpc.RPCConnection extends java.io.Closeable public abstract void close() public abstract void forceClose() + @Nullable + public abstract net.corda.core.internal.telemetry.OpenTelemetryHandle getOpenTelemetry() @NotNull public abstract I getProxy() public abstract int getServerProtocolVersion() diff --git a/build.gradle b/build.gradle index 1ad956bd92..12e27dbc7f 100644 --- a/build.gradle +++ b/build.gradle @@ -57,7 +57,8 @@ buildscript { ] ext.capsule_version = constants.getProperty("capsuleVersion") - + ext.open_telemetry_version = constants.getProperty("openTelemetryVersion") + ext.open_telemetry_sem_conv_version = constants.getProperty("openTelemetrySemConvVersion") ext.asm_version = constants.getProperty("asmVersion") ext.artemis_version = constants.getProperty("artemisVersion") ext.jackson_version = constants.getProperty("jacksonVersion") @@ -643,6 +644,7 @@ bintrayConfig { gpgSign = true gpgPassphrase = System.getenv('CORDA_BINTRAY_GPG_PASSPHRASE') publications = [ + 'corda-opentelemetry-driver', 'corda-jfx', 'corda-mock', 'corda-rpc', diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index d90befe6ae..7613ef9d8f 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -11,6 +11,7 @@ import net.corda.core.context.Trace import net.corda.core.identity.CordaX500Name import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.telemetry.OpenTelemetryHandle import net.corda.core.messaging.ClientRpcSslOptions import net.corda.core.messaging.CordaRPCOps import net.corda.core.serialization.SerializationCustomSerializer @@ -77,6 +78,10 @@ class CordaRPCConnection private constructor( override fun forceClose() = doCloseLogic { actualConnection.forceClose() } + override fun getOpenTelemetry(): OpenTelemetryHandle? { + return actualConnection.getOpenTelemetry() + } + private inline fun doCloseLogic(close: () -> Unit) { try { close.invoke() @@ -169,8 +174,13 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( /** * The cache expiry of a deduplication watermark per client. Default is 1 day. */ - open val deduplicationCacheExpiry: Duration = 1.days + open val deduplicationCacheExpiry: Duration = 1.days, + open val openTelemetryEnabled: Boolean = true, + + open val simpleLogTelemetryEnabled: Boolean = false, + + open val spanStartEndEventsEnabled: Boolean = true ) { companion object { @@ -214,7 +224,46 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( connectionRetryIntervalMultiplier, maxReconnectAttempts, maxFileSize, - deduplicationCacheExpiry + deduplicationCacheExpiry, + openTelemetryEnabled, + simpleLogTelemetryEnabled, + spanStartEndEventsEnabled + ) + } + + @Suppress("LongParameterList") + fun copy( + connectionMaxRetryInterval: Duration = this.connectionMaxRetryInterval, + minimumServerProtocolVersion: Int = this.minimumServerProtocolVersion, + trackRpcCallSites: Boolean = this.trackRpcCallSites, + reapInterval: Duration = this.reapInterval, + observationExecutorPoolSize: Int = this.observationExecutorPoolSize, + @Suppress("DEPRECATION") + cacheConcurrencyLevel: Int = this.cacheConcurrencyLevel, + connectionRetryInterval: Duration = this.connectionRetryInterval, + connectionRetryIntervalMultiplier: Double = this.connectionRetryIntervalMultiplier, + maxReconnectAttempts: Int = this.maxReconnectAttempts, + maxFileSize: Int = this.maxFileSize, + deduplicationCacheExpiry: Duration = this.deduplicationCacheExpiry, + openTelemetryEnabled: Boolean = this.openTelemetryEnabled, + simpleLogTelemetryEnabled: Boolean = this.simpleLogTelemetryEnabled, + spanStartEndEventsEnabled: Boolean = this.spanStartEndEventsEnabled + ): CordaRPCClientConfiguration { + return CordaRPCClientConfiguration( + connectionMaxRetryInterval, + minimumServerProtocolVersion, + trackRpcCallSites, + reapInterval, + observationExecutorPoolSize, + cacheConcurrencyLevel, + connectionRetryInterval, + connectionRetryIntervalMultiplier, + maxReconnectAttempts, + maxFileSize, + deduplicationCacheExpiry, + openTelemetryEnabled, + simpleLogTelemetryEnabled, + spanStartEndEventsEnabled ) } @@ -235,6 +284,9 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( if (maxReconnectAttempts != other.maxReconnectAttempts) return false if (maxFileSize != other.maxFileSize) return false if (deduplicationCacheExpiry != other.deduplicationCacheExpiry) return false + if (openTelemetryEnabled != other.openTelemetryEnabled) return false + if (simpleLogTelemetryEnabled != other.simpleLogTelemetryEnabled) return false + if (spanStartEndEventsEnabled != other.spanStartEndEventsEnabled) return false return true } @@ -252,6 +304,9 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( result = 31 * result + maxReconnectAttempts result = 31 * result + maxFileSize result = 31 * result + deduplicationCacheExpiry.hashCode() + result = 31 * result + openTelemetryEnabled.hashCode() + result = 31 * result + simpleLogTelemetryEnabled.hashCode() + result = 31 * result + spanStartEndEventsEnabled.hashCode() return result } @@ -264,7 +319,10 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( "cacheConcurrencyLevel=$cacheConcurrencyLevel, connectionRetryInterval=$connectionRetryInterval, " + "connectionRetryIntervalMultiplier=$connectionRetryIntervalMultiplier, " + "maxReconnectAttempts=$maxReconnectAttempts, maxFileSize=$maxFileSize, " + - "deduplicationCacheExpiry=$deduplicationCacheExpiry)" + "deduplicationCacheExpiry=$deduplicationCacheExpiry, " + + "openTelemetryEnabled=$openTelemetryEnabled, " + + "simpleLogTelemetryEnabled=$simpleLogTelemetryEnabled, " + + "spanStartEndEventsEnabled=$spanStartEndEventsEnabled)" } // Left in for backwards compatibility with version 3.1 diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/RPCConnection.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/RPCConnection.kt index 211e6a32f5..336d66e435 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/RPCConnection.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/RPCConnection.kt @@ -1,6 +1,7 @@ package net.corda.client.rpc import net.corda.core.DoNotImplement +import net.corda.core.internal.telemetry.OpenTelemetryHandle import net.corda.core.messaging.RPCOps import java.io.Closeable @@ -21,6 +22,11 @@ interface RPCConnection : Closeable { /** The RPC protocol version reported by the server. */ val serverProtocolVersion: Int + /** + * Returns the configured openTelemetry global. Returns null if opentelemetry has not been configured. + */ + fun getOpenTelemetry(): OpenTelemetryHandle? + /** * Closes this client gracefully by sending a notification to the server, so it can immediately clean up resources. * If the server is not available this method may block for a short period until it's clear the server is not diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index 8458b4fb3f..d677cde23e 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -1,6 +1,7 @@ package net.corda.client.rpc.internal import net.corda.client.rpc.CordaRPCClientConfiguration +import net.corda.core.internal.telemetry.OpenTelemetryHandle import net.corda.client.rpc.RPCConnection import net.corda.client.rpc.UnrecoverableRPCException import net.corda.client.rpc.ext.RPCConnectionListener @@ -98,10 +99,13 @@ class RPCClient( // Without this any type of "send" time failures will not be delivered back to the client isBlockOnNonDurableSend = true } + val rpcClientTelemetry = RPCClientTelemetry("rpcClient-${targetLegalIdentity.toString()}", rpcConfiguration.openTelemetryEnabled, + rpcConfiguration.simpleLogTelemetryEnabled, rpcConfiguration.spanStartEndEventsEnabled) val sessionId = Trace.SessionId.newInstance() val distributionMux = DistributionMux(listeners, username) val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator, - rpcOpsClass, serializationContext, sessionId, externalTrace, impersonatedActor, targetLegalIdentity, distributionMux) + rpcOpsClass, serializationContext, sessionId, externalTrace, impersonatedActor, targetLegalIdentity, distributionMux, + rpcClientTelemetry) try { proxyHandler.start() val ops: I = uncheckedCast(Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler)) @@ -118,6 +122,10 @@ class RPCClient( override val proxy = ops override val serverProtocolVersion = serverProtocolVersion + override fun getOpenTelemetry(): OpenTelemetryHandle? { + return rpcClientTelemetry.getOpenTelemetryHandle() + } + private fun close(notify: Boolean) { if (notify) { proxyHandler.notifyServerAndClose() diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index c13266264b..a79cf6bfbe 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -20,6 +20,7 @@ import net.corda.core.internal.LazyStickyPool import net.corda.core.internal.LifeCycle import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.ThreadBox +import net.corda.core.internal.telemetry.TelemetryStatusCode import net.corda.core.internal.times import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.RPCOps @@ -110,6 +111,7 @@ internal class RPCClientProxyHandler( private val impersonatedActor: Actor?, private val targetLegalIdentity: CordaX500Name?, private val notificationDistributionMux: DistributionMux, + private val rpcClientTelemetry: RPCClientTelemetry, private val cacheFactory: NamedCacheFactory = ClientCacheFactory() ) : InvocationHandler { @@ -330,6 +332,8 @@ internal class RPCClientProxyHandler( val replyId = InvocationId.newInstance() val methodFqn = produceMethodFullyQualifiedName(method) callSiteMap?.set(replyId, CallSite(methodFqn)) + + val telemetryId = rpcClientTelemetry.telemetryService.startSpanForFlow("client-$methodFqn", emptyMap()) try { val serialisedArguments = (arguments?.toList() ?: emptyList()).serialize(context = serializationContextWithObservableContext) val request = RPCApi.ClientToServer.RpcRequest( @@ -339,7 +343,8 @@ internal class RPCClientProxyHandler( replyId, sessionId, externalTrace, - impersonatedActor + impersonatedActor, + rpcClientTelemetry.telemetryService.getCurrentTelemetryData() ) val replyFuture = SettableFuture.create() require(rpcReplyMap.put(replyId, replyFuture) == null) { @@ -353,13 +358,18 @@ internal class RPCClientProxyHandler( sendMessage(request) return replyFuture.getOrThrow() } catch (e: RuntimeException) { + rpcClientTelemetry.telemetryService.recordException(telemetryId, e) + rpcClientTelemetry.telemetryService.setStatus(telemetryId, TelemetryStatusCode.ERROR, e.message ?: "RuntimeException occurred") // Already an unchecked exception, so just rethrow it throw e } catch (e: Exception) { + rpcClientTelemetry.telemetryService.recordException(telemetryId, e) + rpcClientTelemetry.telemetryService.setStatus(telemetryId, TelemetryStatusCode.ERROR, e.message ?: "Exception occurred") // This must be a checked exception, so wrap it throw RPCException(e.message ?: "", e) } finally { callSiteMap?.remove(replyId) + rpcClientTelemetry.telemetryService.endSpanForFlow(telemetryId) } } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientTelemetry.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientTelemetry.kt new file mode 100644 index 0000000000..b0cbebea21 --- /dev/null +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientTelemetry.kt @@ -0,0 +1,42 @@ +package net.corda.client.rpc.internal + +import net.corda.core.internal.telemetry.OpenTelemetryHandle +import net.corda.core.internal.telemetry.OpenTelemetryComponent +import net.corda.core.internal.telemetry.SimpleLogTelemetryComponent +import net.corda.core.internal.telemetry.TelemetryServiceImpl +import net.corda.core.utilities.contextLogger + +class RPCClientTelemetry(val serviceName: String, val openTelemetryEnabled: Boolean, val simpleLogTelemetryEnabled: Boolean, val spanStartEndEventsEnabled: Boolean) { + + companion object { + private val log = contextLogger() + } + + val telemetryService = TelemetryServiceImpl() + + init { + if (openTelemetryEnabled) { + try { + val openTelemetryComponent = OpenTelemetryComponent(serviceName, spanStartEndEventsEnabled) + if (openTelemetryComponent.isEnabled()) { + telemetryService.addTelemetryComponent(openTelemetryComponent) + log.debug("OpenTelemetry enabled") + } + } + catch (ex: NoClassDefFoundError) { + // Do nothing api or sdk not available on classpath + log.debug("OpenTelemetry not enabled, api or sdk not found on classpath") + } + } + if (simpleLogTelemetryEnabled) { + val simpleLogTelemetryComponent = SimpleLogTelemetryComponent() + telemetryService.addTelemetryComponent(simpleLogTelemetryComponent) + log.debug("SimpleLogTelemetry enabled") + } + } + + fun getOpenTelemetryHandle(): OpenTelemetryHandle? { + // Will return a handle clients can use to interact with opentelemetry + return null + } +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index 0bbf8acf0f..bc45db07d2 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -6,6 +6,7 @@ import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.CordaRPCConnection import net.corda.client.rpc.GracefulReconnect import net.corda.client.rpc.MaxRpcRetryException +import net.corda.core.internal.telemetry.OpenTelemetryHandle import net.corda.client.rpc.PermissionException import net.corda.client.rpc.RPCConnection import net.corda.client.rpc.RPCException @@ -297,6 +298,9 @@ class ReconnectingCordaRPCOps private constructor( currentRPCConnection?.forceClose() } } + override fun getOpenTelemetry(): OpenTelemetryHandle? { + return currentRPCConnection?.getOpenTelemetry() + } fun isClosed(): Boolean = currentState == CLOSED } private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler { diff --git a/constants.properties b/constants.properties index eee0ada384..63f4c9383b 100644 --- a/constants.properties +++ b/constants.properties @@ -13,6 +13,8 @@ java8MinUpdateVersion=171 # net.corda.core.internal.CordaUtilsKt.PLATFORM_VERSION as well. # # ***************************************************************# platformVersion=12 +openTelemetryVersion=1.17.0 +openTelemetrySemConvVersion=1.17.0-alpha guavaVersion=28.0-jre # Quasar version to use with Java 8: quasarVersion=0.7.15_r3 diff --git a/core-deterministic/build.gradle b/core-deterministic/build.gradle index 73ae2b879a..dbf572f06d 100644 --- a/core-deterministic/build.gradle +++ b/core-deterministic/build.gradle @@ -44,6 +44,9 @@ dependencies { api "com.google.code.findbugs:jsr305:$jsr305_version" api "org.slf4j:slf4j-api:$slf4j_version" + compileOnly "io.opentelemetry:opentelemetry-api:${open_telemetry_version}" + compileOnly project(':opentelemetry-driver') + // These dependencies will become "runtime" scoped in our published POM. // See publish.dependenciesFrom.defaultScope. deterministicLibraries "org.bouncycastle:bcprov-jdk15on:$bouncycastle_version" diff --git a/core/build.gradle b/core/build.gradle index 5e72674928..3939983353 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -27,7 +27,8 @@ configurations { dependencies { obfuscatorImplementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" - + compileOnly "io.opentelemetry:opentelemetry-api:${open_telemetry_version}" + compileOnly project(':opentelemetry-driver') testImplementation sourceSets.obfuscator.output testImplementation "org.junit.jupiter:junit-jupiter-api:${junit_jupiter_version}" testImplementation "junit:junit:$junit_version" @@ -161,7 +162,8 @@ quasar { "org.w3c.**", "org.xml**", "org.yaml**", - "rx**") + "rx**", + "io.opentelemetry.**") } artifacts { diff --git a/core/src/main/kotlin/net/corda/core/context/InvocationContext.kt b/core/src/main/kotlin/net/corda/core/context/InvocationContext.kt index 06e9210801..dbcfd0c192 100644 --- a/core/src/main/kotlin/net/corda/core/context/InvocationContext.kt +++ b/core/src/main/kotlin/net/corda/core/context/InvocationContext.kt @@ -4,6 +4,7 @@ import net.corda.core.DeleteForDJVM import net.corda.core.KeepForDJVM import net.corda.core.contracts.ScheduledStateRef import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.telemetry.SerializedTelemetry import net.corda.core.serialization.CordaSerializable import java.security.Principal @@ -25,7 +26,8 @@ data class InvocationContext( val externalTrace: Trace? = null, val impersonatedActor: Actor? = null, val arguments: List? = emptyList(), // 'arguments' is nullable so that a - >= 4.6 version - RPC client can be backwards compatible against - < 4.6 version - nodes - val clientId: String? = null + val clientId: String? = null, + val serializedTelemetry: SerializedTelemetry? = null ) { constructor( @@ -36,6 +38,16 @@ data class InvocationContext( impersonatedActor: Actor? = null ) : this(origin, trace, actor, externalTrace, impersonatedActor, emptyList()) + constructor( + origin: InvocationOrigin, + trace: Trace, + actor: Actor?, + externalTrace: Trace? = null, + impersonatedActor: Actor? = null, + arguments: List? = emptyList(), + clientId: String? = null + ) : this(origin, trace, actor, externalTrace, impersonatedActor, arguments, clientId, serializedTelemetry = null) + companion object { /** * Creates an [InvocationContext] with a [Trace] that defaults to a [java.util.UUID] as value and [java.time.Instant.now] timestamp. @@ -51,8 +63,9 @@ data class InvocationContext( externalTrace: Trace? = null, impersonatedActor: Actor? = null, arguments: List = emptyList(), - clientId: String? = null - ) = InvocationContext(origin, trace, actor, externalTrace, impersonatedActor, arguments, clientId) + clientId: String? = null, + serializedTelemetry: SerializedTelemetry? = null + ) = InvocationContext(origin, trace, actor, externalTrace, impersonatedActor, arguments, clientId, serializedTelemetry) /** * Creates an [InvocationContext] with [InvocationOrigin.RPC] origin. @@ -60,13 +73,15 @@ data class InvocationContext( @DeleteForDJVM @JvmStatic @JvmOverloads + @Suppress("LongParameterList") fun rpc( actor: Actor, trace: Trace = Trace.newInstance(), externalTrace: Trace? = null, impersonatedActor: Actor? = null, - arguments: List = emptyList() - ): InvocationContext = newInstance(InvocationOrigin.RPC(actor), trace, actor, externalTrace, impersonatedActor, arguments) + arguments: List = emptyList(), + serializedTelemetry: SerializedTelemetry? = null + ): InvocationContext = newInstance(InvocationOrigin.RPC(actor), trace, actor, externalTrace, impersonatedActor, arguments, serializedTelemetry = serializedTelemetry) /** * Creates an [InvocationContext] with [InvocationOrigin.Peer] origin. @@ -119,6 +134,28 @@ data class InvocationContext( clientId = clientId ) } + + @Suppress("LongParameterList") + fun copy( + origin: InvocationOrigin = this.origin, + trace: Trace = this.trace, + actor: Actor? = this.actor, + externalTrace: Trace? = this.externalTrace, + impersonatedActor: Actor? = this.impersonatedActor, + arguments: List? = this.arguments, + clientId: String? = this.clientId + ): InvocationContext { + return copy( + origin = origin, + trace = trace, + actor = actor, + externalTrace = externalTrace, + impersonatedActor = impersonatedActor, + arguments = arguments, + clientId = clientId, + serializedTelemetry = serializedTelemetry + ) + } } /** diff --git a/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt b/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt index 1dd153e0ae..f10a17ef86 100644 --- a/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt +++ b/core/src/main/kotlin/net/corda/core/cordapp/Cordapp.kt @@ -6,6 +6,7 @@ import net.corda.core.cordapp.Cordapp.Info.* import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.internal.cordapp.CordappImpl.Companion.UNKNOWN_VALUE +import net.corda.core.internal.telemetry.TelemetryComponent import net.corda.core.schemas.MappedSchema import net.corda.core.serialization.CheckpointCustomSerializer import net.corda.core.serialization.SerializationCustomSerializer @@ -49,6 +50,7 @@ interface Cordapp { val serviceFlows: List>> val schedulableFlows: List>> val services: List> + val telemetryComponents: List> val serializationWhitelists: List val serializationCustomSerializers: List> val checkpointCustomSerializers: List> diff --git a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt index 810b143dac..7d5a1505c1 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt @@ -7,6 +7,7 @@ import net.corda.core.crypto.isFulfilledBy import net.corda.core.identity.Party import net.corda.core.identity.groupAbstractPartyByWellKnownParty import net.corda.core.internal.pushToLoggingContext +import net.corda.core.internal.telemetry.telemetryServiceInternal import net.corda.core.internal.warnOnce import net.corda.core.node.StatesToRecord import net.corda.core.node.StatesToRecord.ONLY_RELEVANT @@ -221,18 +222,22 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, @Suspendable private fun notariseAndRecord(): SignedTransaction { - val notarised = if (needsNotarySignature(transaction)) { - progressTracker.currentStep = NOTARISING - val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true)) - transaction + notarySignatures - } else { - logger.info("No need to notarise this transaction.") - transaction + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord", flowLogic = this) { + val notarised = if (needsNotarySignature(transaction)) { + progressTracker.currentStep = NOTARISING + val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true)) + transaction + notarySignatures + } else { + logger.info("No need to notarise this transaction.") + transaction + } + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord:recordTransactions", flowLogic = this) { + logger.info("Recording transaction locally.") + serviceHub.recordTransactions(statesToRecord, listOf(notarised)) + logger.info("Recorded transaction locally successfully.") + } + return notarised } - logger.info("Recording transaction locally.") - serviceHub.recordTransactions(statesToRecord, listOf(notarised)) - logger.info("Recorded transaction locally successfully.") - return notarised } private fun needsNotarySignature(stx: SignedTransaction): Boolean { diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 1f3ffed189..8c207766fc 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -16,6 +16,7 @@ import net.corda.core.internal.ServiceHubCoreInternal import net.corda.core.internal.WaitForStateConsumption import net.corda.core.internal.abbreviate import net.corda.core.internal.checkPayloadIs +import net.corda.core.internal.telemetry.telemetryServiceInternal import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.DataFeed import net.corda.core.node.NodeInfo @@ -157,7 +158,7 @@ abstract class FlowLogic { fun initiateFlow(destination: Destination): FlowSession { require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" } return stateMachine.initiateFlow(destination, serviceHub.identityService.wellKnownPartyFromAnonymous(destination as AbstractParty) - ?: throw IllegalArgumentException("Could not resolve destination: $destination")) + ?: throw IllegalArgumentException("Could not resolve destination: $destination"), serviceHub.telemetryServiceInternal.getCurrentTelemetryData()) } /** @@ -165,7 +166,7 @@ abstract class FlowLogic { * that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive. */ @Suspendable - fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party, party) + fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party, party, serviceHub.telemetryServiceInternal.getCurrentTelemetryData()) /** * Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that @@ -285,11 +286,13 @@ abstract class FlowLogic { @Suspendable internal fun FlowSession.sendAndReceiveWithRetry(receiveType: Class, payload: Any): UntrustworthyData { - val request = FlowIORequest.SendAndReceive( - sessionToMessage = stateMachine.serialize(mapOf(this to payload)), - shouldRetrySend = true - ) - return stateMachine.suspend(request, maySkipCheckpoint = false)[this]!!.checkPayloadIs(receiveType) + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#sendAndReceiveWithRetry", mapOf("destination" to destination.toString())) { + val request = FlowIORequest.SendAndReceive( + sessionToMessage = stateMachine.serialize(mapOf(this to payload)), + shouldRetrySend = true + ) + return stateMachine.suspend(request, maySkipCheckpoint = false)[this]!!.checkPayloadIs(receiveType) + } } @Suspendable diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt index 42db120f36..bc5f0b0157 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt @@ -8,6 +8,7 @@ import net.corda.core.context.InvocationContext import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.node.ServiceHub +import net.corda.core.internal.telemetry.SerializedTelemetry import net.corda.core.serialization.SerializedBytes import org.slf4j.Logger @@ -30,7 +31,7 @@ interface FlowStateMachine : FlowStateMachineHandle { fun serialize(payloads: Map): Map> @Suspendable - fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession + fun initiateFlow(destination: Destination, wellKnownParty: Party, serializedTelemetry: SerializedTelemetry?): FlowSession fun checkFlowPermission(permissionName: String, extraAuditData: Map) diff --git a/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt b/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt index 1c5d69e511..327723cb32 100644 --- a/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt +++ b/core/src/main/kotlin/net/corda/core/internal/cordapp/CordappImpl.kt @@ -8,6 +8,7 @@ import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.notary.NotaryService import net.corda.core.internal.toPath +import net.corda.core.internal.telemetry.TelemetryComponent import net.corda.core.schemas.MappedSchema import net.corda.core.serialization.CheckpointCustomSerializer import net.corda.core.serialization.SerializationCustomSerializer @@ -24,6 +25,7 @@ data class CordappImpl( override val serviceFlows: List>>, override val schedulableFlows: List>>, override val services: List>, + override val telemetryComponents: List>, override val serializationWhitelists: List, override val serializationCustomSerializers: List>, override val checkpointCustomSerializers: List>, @@ -79,6 +81,7 @@ data class CordappImpl( serviceFlows = emptyList(), schedulableFlows = emptyList(), services = emptyList(), + telemetryComponents = emptyList(), serializationWhitelists = emptyList(), serializationCustomSerializers = emptyList(), checkpointCustomSerializers = emptyList(), diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt index 3aa7d9cfe7..097b616fa2 100644 --- a/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/notary/NotaryServiceFlow.kt @@ -10,6 +10,7 @@ import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.internal.PlatformVersionSwitches import net.corda.core.internal.checkParameterHash +import net.corda.core.internal.telemetry.telemetryServiceInternal import net.corda.core.utilities.seconds import net.corda.core.utilities.unwrap import java.lang.IllegalStateException @@ -74,14 +75,15 @@ abstract class NotaryServiceFlow( sleep(Duration.ZERO) } } - - service.commitInputStates( - tx.inputs, - tx.id, - otherSideSession.counterparty, - requestPayload.requestSignature, - tx.timeWindow, - tx.references) + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#call:commitInputStates", flowLogic = this) { + service.commitInputStates( + tx.inputs, + tx.id, + otherSideSession.counterparty, + requestPayload.requestSignature, + tx.timeWindow, + tx.references) + } } catch (e: NotaryInternalException) { logError(e.error) // Any exception that's not a NotaryInternalException is assumed to be an unexpected internal error diff --git a/core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryComponent.kt b/core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryComponent.kt new file mode 100644 index 0000000000..fb31a2d899 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryComponent.kt @@ -0,0 +1,309 @@ +package net.corda.core.internal.telemetry + +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.OpenTelemetry +import io.opentelemetry.api.baggage.Baggage +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.common.AttributesBuilder +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.SpanContext +import io.opentelemetry.api.trace.StatusCode +import io.opentelemetry.api.trace.TraceFlags +import io.opentelemetry.api.trace.TraceState +import io.opentelemetry.api.trace.Tracer +import io.opentelemetry.context.Context +import io.opentelemetry.context.Scope +import net.corda.core.flows.FlowLogic +import net.corda.core.serialization.CordaSerializable +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import net.corda.opentelemetrydriver.OpenTelemetryDriver + +const val TRACEIDHEX_LENGTH = 32 +const val SPANIDHEX_LENGTH = 16 +@CordaSerializable +data class SerializableSpanContext(val traceIdHex : String, val spanIdHex : String, val traceFlagsHex : String, val traceStateMap : Map) { + + constructor(spanContext: SpanContext) : this(spanContext.traceId, spanContext.spanId, spanContext.traceFlags.asHex(), spanContext.traceState.asMap().toMap()) + + constructor() : this("0".repeat(TRACEIDHEX_LENGTH), "0".repeat(SPANIDHEX_LENGTH), TraceFlags.getDefault().asHex(), TraceState.getDefault().asMap()) + + fun createSpanContext() = SpanContext.create(traceIdHex, spanIdHex, TraceFlags.fromHex(traceFlagsHex, 0), createTraceState()) + + fun createRemoteSpanContext() = SpanContext.createFromRemoteParent(traceIdHex, spanIdHex, TraceFlags.fromHex(traceFlagsHex, 0), createTraceState()) + + private fun createTraceState() = traceStateMap.toList().fold(TraceState.builder()) { builder, pair -> builder.put(pair.first, pair.second)}.build() +} + +data class OpenTelemetryContext(val spanContext: SerializableSpanContext, val startedSpanContext: SerializableSpanContext, val baggage: Map): TelemetryDataItem + +data class SpanInfo(val name: String, val span: Span, val spanScope: Scope, val spanEventContext: SerializableSpanContext? = null, val parentSpanEventContext: SerializableSpanContext? = null) + +class TracerSetup { + var openTelemetry: OpenTelemetry? = null + fun getTracer(serviceName: String): Tracer { + return try { + with(OpenTelemetryDriver.getOpenTelemetry(serviceName)) { + openTelemetry = this + tracerProvider.get(OpenTelemetryComponent::class.java.name) + } + } + catch (ex: NoClassDefFoundError) { + GlobalOpenTelemetry.getTracerProvider().get(OpenTelemetryComponent::class.java.name) + } + } +} + +@Suppress("TooManyFunctions") +class OpenTelemetryComponent(val serviceName: String, val spanStartEndEventsEnabled: Boolean) : TelemetryComponent { + val tracerSetup = TracerSetup() + val tracer: Tracer = tracerSetup.getTracer(serviceName) + + companion object { + private val log: Logger = LoggerFactory.getLogger(OpenTelemetryComponent::class.java) + const val OPENTELEMETRY_COMPONENT_NAME = "OpenTelemetry" + } + + val rootSpans = ConcurrentHashMap() + val spans = ConcurrentHashMap() + val baggages = ConcurrentHashMap() + + override fun isEnabled(): Boolean { + if (tracerSetup.openTelemetry != null) { + // The SDK is on the classpath. + return true + } + // Now see if the open telemetry java agent is available + val tracer = GlobalOpenTelemetry.getTracerProvider().get(OpenTelemetryComponent::class.java.name) + return tracer.javaClass.name != "io.opentelemetry.api.trace.DefaultTracer" + } + + override fun name(): String = OPENTELEMETRY_COMPONENT_NAME + override fun onTelemetryEvent(event: TelemetryEvent) { + when (event) { + is StartSpanForFlowEvent -> startSpanForFlow(event.name, event.attributes, event.telemetryId, event.flowLogic, event.telemetryDataItem) + is EndSpanForFlowEvent -> endSpanForFlow(event.telemetryId) + is StartSpanEvent -> startSpan(event.name, event.attributes, event.telemetryId, event.flowLogic) + is EndSpanEvent -> endSpan(event.telemetryId) + is SetStatusEvent -> setStatus(event.telemetryId, event.telemetryStatusCode, event.message) + is RecordExceptionEvent -> recordException(event.telemetryId, event.throwable) + } + } + + @Suppress("LongParameterList") + private fun startSpanForFlow(name: String, attributes: Map, telemetryId: UUID, flowLogic: FlowLogic<*>?, + telemetryDataItem: TelemetryDataItem?) { + + val baggageAttributes = (telemetryDataItem as? OpenTelemetryContext)?.baggage?.let { + val baggageBuilder = it.toList().fold(Baggage.current().toBuilder()) {builder, attribute -> builder.put(attribute.first, attribute.second)} + baggages[telemetryId] = baggageBuilder.build().makeCurrent() + it + } ?: emptyMap() + + // Also add any baggage to the span + val attributesMap = (attributes+baggageAttributes).toList() + .fold(Attributes.builder()) { builder, attribute -> builder.put(attribute.first, attribute.second) }.also { + populateWithFlowAttributes(it, flowLogic) + }.build() + if (telemetryDataItem != null) { + startSpanForFlowWithRemoteParent(name, attributesMap, telemetryId, telemetryDataItem) + } + else { + startSpanForFlowWithNoParent(name, attributesMap, telemetryId) + } + } + + private fun startSpanForFlowWithNoParent(name: String, attributesMap: Attributes, telemetryId: UUID) { + val rootSpan = tracer.spanBuilder(name).setAllAttributes(attributesMap).setAllAttributes(Attributes.of(AttributeKey.stringKey("root.flow"), "true")).startSpan() + val rootSpanScope = rootSpan.makeCurrent() + if (spanStartEndEventsEnabled) { + val startedSpanContexts = createSpanToCaptureStartedSpanEvent(name, rootSpan, attributesMap) + val span = tracer.spanBuilder("Child Spans").setParent(Context.current().with(rootSpan)).startSpan() + val spanScope = span.makeCurrent() + rootSpans[telemetryId] = SpanInfo(name, rootSpan, rootSpanScope) + spans[telemetryId] = SpanInfo(name, span, spanScope, startedSpanContexts.first, startedSpanContexts.second) + } + else { + spans[telemetryId] = SpanInfo(name, rootSpan, rootSpanScope) + } + } + + private fun startSpanForFlowWithRemoteParent(name: String, attributesMap: Attributes, telemetryId: UUID, telemetryDataItem: TelemetryDataItem) { + val parentContext = (telemetryDataItem as OpenTelemetryContext).spanContext + val spanContext = parentContext.createRemoteSpanContext() + val parentSpan = Span.wrap(spanContext) + val span = tracer.spanBuilder(name).setParent(Context.current().with(parentSpan)).setAllAttributes(attributesMap).startSpan() + val spanScope = span.makeCurrent() + if (spanStartEndEventsEnabled) { + val contexts = createSpanToCaptureStartedSpanEventWithRemoteParent(name, telemetryDataItem, attributesMap) + spans[telemetryId] = SpanInfo(name, span, spanScope, contexts.first, contexts.second) + } + else { + spans[telemetryId] = SpanInfo(name, span, spanScope) + } + } + + private fun createSpanToCaptureStartedSpanEvent(name: String, rootSpan: Span, attributesMap: Attributes): Pair { + val startedSpan = tracer.spanBuilder("Started Events").setAllAttributes(attributesMap).setAllAttributes(Attributes.of(AttributeKey.stringKey("root.startend.events"), "true")).setParent(Context.current().with(rootSpan)).startSpan() + val serializableSpanContext = SerializableSpanContext(startedSpan.spanContext) + startedSpan.end() + val startedSpanContext = serializableSpanContext.createSpanContext() + val startedSpanFromContext = Span.wrap(startedSpanContext) + val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap) + .setParent(Context.current().with(startedSpanFromContext)).startSpan() + val childSerializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext) + startedSpanChild.end() + return Pair(childSerializableSpanContext, serializableSpanContext) + } + + private fun createSpanToCaptureStartedSpanEventWithRemoteParent(name: String, telemetryDataItem: OpenTelemetryContext, attributesMap: Attributes ): Pair { + val startedSpanParentContext = telemetryDataItem.startedSpanContext + val startedSpanContext = startedSpanParentContext.createRemoteSpanContext() + val startedSpanFromContext = Span.wrap(startedSpanContext) + val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap) + .setParent(Context.current().with(startedSpanFromContext)).startSpan() + val serializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext) + startedSpanChild.end() + return Pair(serializableSpanContext, startedSpanParentContext) + } + + private fun endSpanForFlow(telemetryId: UUID){ + val spanInfo = spans[telemetryId] + val rootSpanInfo = rootSpans[telemetryId] + if (spanStartEndEventsEnabled) { + createSpanToCaptureEndSpanEvent(spanInfo) + } + spanInfo?.spanScope?.close() + spanInfo?.span?.end() + rootSpanInfo?.spanScope?.close() + rootSpanInfo?.span?.end() + spans.remove(telemetryId) + rootSpans.remove(telemetryId) + + val baggageScope = baggages[telemetryId] + baggageScope?.close() + baggages.remove(telemetryId) + } + + private fun createSpanToCaptureEndSpanEvent(spanInfo: SpanInfo?) { + spanInfo?.parentSpanEventContext?.let { + val startedSpanContext = it.createSpanContext() + val startedSpanFromContext = Span.wrap(startedSpanContext) + val startedSpanChild = tracer.spanBuilder("${spanInfo.name}-end").setParent(Context.current().with(startedSpanFromContext)).startSpan() + startedSpanChild.end() + } + } + + private fun startSpan(name: String, attributes: Map, telemetryId: UUID, flowLogic: FlowLogic<*>?) { + val currentBaggage = Baggage.current() + val baggageAttributes = mutableMapOf() + currentBaggage.forEach { t, u -> baggageAttributes[t] = u.value } + + val parentSpan = Span.current() + val attributesMap = (attributes+baggageAttributes).toList().fold(Attributes.builder()) { builder, attribute -> builder.put(attribute.first, attribute.second) }.also { + populateWithFlowAttributes(it, flowLogic) + }.build() + val span = tracer.spanBuilder(name).setAllAttributes(attributesMap).startSpan() + val spanScope = span.makeCurrent() + val startedEventContexts = createStartedEventSpan(name, attributesMap, parentSpan) + spans[telemetryId] = SpanInfo(name, span, spanScope, startedEventContexts.first, startedEventContexts.second) + } + + private fun populateWithFlowAttributes(attributesBuilder: AttributesBuilder, flowLogic: FlowLogic<*>?) { + flowLogic?.let { + attributesBuilder.put("flow.id", flowLogic.runId.uuid.toString()) + attributesBuilder.put("creation.time", flowLogic.stateMachine.creationTime) + attributesBuilder.put("class.name", flowLogic.javaClass.name) + } + } + + private fun createStartedEventSpan(name: String, attributesMap: Attributes, parentSpan: Span): Pair { + if (spanStartEndEventsEnabled) { + // Fix up null contexts - make not null + val filteredSpans = spans.filter { it.value.span == parentSpan }.toList() + var serializableSpanContext = SerializableSpanContext() + var parentStartedSpanContext = SerializableSpanContext() + if (filteredSpans.isNotEmpty()) { + parentStartedSpanContext = filteredSpans[0].second.spanEventContext ?: SerializableSpanContext() + val startedSpanContext = parentStartedSpanContext.createSpanContext() + val startedSpanFromContext = Span.wrap(startedSpanContext) + val startedSpanChild = tracer.spanBuilder("${name}-start").setAllAttributes(attributesMap) + .setParent(Context.current().with(startedSpanFromContext)).startSpan() + serializableSpanContext = SerializableSpanContext(startedSpanChild.spanContext) + startedSpanChild.end() + } + return Pair(serializableSpanContext, parentStartedSpanContext) + } + else { + return Pair(null, null) + } + } + + private fun endSpan(telemetryId: UUID){ + val spanInfo = spans[telemetryId] + createSpanToCaptureEndSpanEvent(spanInfo) + spanInfo?.spanScope?.close() + spanInfo?.span?.end() + spans.remove(telemetryId) + } + + override fun getCurrentTelemetryData(): TelemetryDataItem { + val currentSpan = Span.current() + val spanContext = SerializableSpanContext(currentSpan.spanContext) + val filteredSpans = spans.filter { it.value.span == currentSpan }.toList() + val startedSpanContext = filteredSpans.getOrNull(0)?.second?.spanEventContext ?: SerializableSpanContext() + return OpenTelemetryContext(spanContext, startedSpanContext, Baggage.current().asMap().mapValues { it.value.value }) + } + + override fun getCurrentTelemetryId(): UUID { + val currentSpan = Span.current() + val filteredSpans = spans.filter { it.value.span == currentSpan }.toList() + if (filteredSpans.isEmpty()) { + return UUID(0, 0) + } + return filteredSpans[0].first // return UUID associated with current span + } + + override fun setCurrentTelemetryId(id: UUID) { + val spanInfo = spans.get(id) + spanInfo?.let { + it.spanScope.close() // close the old scope + val childSpanScope = it.span.makeCurrent() + val newSpanInfo = spanInfo.copy(spanScope = childSpanScope) + spans[id] = newSpanInfo + } + } + + override fun getCurrentSpanId(): String { + return Span.current().spanContext.spanId + } + + override fun getCurrentTraceId(): String { + return Span.current().spanContext.traceId + } + + override fun getCurrentBaggage(): Map { + return Baggage.current().asMap().mapValues { it.value.value } + } + + private fun setStatus(telemetryId: UUID, telemetryStatusCode: TelemetryStatusCode, message: String) { + val spanInfo = spans[telemetryId] + spanInfo?.span?.setStatus(toOpenTelemetryStatus(telemetryStatusCode), message) + } + + private fun toOpenTelemetryStatus(telemetryStatusCode: TelemetryStatusCode): StatusCode { + return when(telemetryStatusCode) { + TelemetryStatusCode.ERROR -> StatusCode.ERROR + TelemetryStatusCode.OK -> StatusCode.OK + TelemetryStatusCode.UNSET -> StatusCode.UNSET + } + } + + private fun recordException(telemetryId: UUID, throwable: Throwable) { + val spanInfo = spans[telemetryId] + spanInfo?.span?.recordException(throwable) + } +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryHandle.kt b/core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryHandle.kt new file mode 100644 index 0000000000..2f55ad0b36 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/telemetry/OpenTelemetryHandle.kt @@ -0,0 +1,3 @@ +package net.corda.core.internal.telemetry + +interface OpenTelemetryHandle \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/internal/telemetry/SimpleLogTelemetryComponent.kt b/core/src/main/kotlin/net/corda/core/internal/telemetry/SimpleLogTelemetryComponent.kt new file mode 100644 index 0000000000..f7f2114eb0 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/telemetry/SimpleLogTelemetryComponent.kt @@ -0,0 +1,126 @@ +package net.corda.core.internal.telemetry + +import net.corda.core.flows.FlowLogic +import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.debug +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.slf4j.MDC +import java.lang.IllegalStateException +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +@CordaSerializable +data class SimpleLogContext(val traceId: UUID, val baggage: Map): TelemetryDataItem + +const val CLIENT_ID = "client.id" +const val TRACE_ID = "trace.id" + +// Simple telemetry class that creates a single UUID and uses this for the trace id. When the flow starts we use the trace is passed in. After this +// though we must use the trace id propagated to us (if remote), or the trace id associated with thread local. +@Suppress("TooManyFunctions") +class SimpleLogTelemetryComponent : TelemetryComponent { + companion object { + private val log: Logger = LoggerFactory.getLogger(SimpleLogTelemetryComponent::class.java) + } + + private val traces: InheritableThreadLocal = InheritableThreadLocal() + private val logContexts = ConcurrentHashMap() + + override fun isEnabled(): Boolean { + return true + } + + override fun name(): String = "SimpleLogTelemetry" + + override fun onTelemetryEvent(event: TelemetryEvent) { + when (event) { + is StartSpanForFlowEvent -> startSpanForFlow(event.name, event.attributes, event.telemetryId, event.flowLogic, event.telemetryDataItem) + is EndSpanForFlowEvent -> endSpanForFlow(event.telemetryId) + is StartSpanEvent -> startSpan(event.name, event.attributes, event.telemetryId, event.flowLogic) + is EndSpanEvent -> endSpan(event.telemetryId) + is SetStatusEvent -> setStatus(event.telemetryId, event.telemetryStatusCode, event.message) + is RecordExceptionEvent -> recordException(event.telemetryId, event.throwable) + } + } + + @Suppress("LongParameterList") + private fun startSpanForFlow(name: String, attributes: Map, telemetryId: UUID, flowLogic: FlowLogic<*>?, telemetryDataItem: TelemetryDataItem?) { + val simpleLogTelemetryDataItem = telemetryDataItem?.let {(telemetryDataItem as? SimpleLogContext) ?: + throw IllegalStateException("Type of telemetryDataItem no a SimpleLogContext, actual class is ${telemetryDataItem::class.java.name}")} + val traceId = simpleLogTelemetryDataItem?.traceId ?: telemetryId + val flowId = flowLogic?.runId + val clientId = simpleLogTelemetryDataItem?.baggage?.get(CLIENT_ID) ?: flowLogic?.stateMachine?.clientId + traces.set(traceId) + val baggageAttributes = simpleLogTelemetryDataItem?.baggage ?: emptyMap() + + logContexts[traceId] = SimpleLogContext(traceId, baggageAttributes) + clientId?.let { MDC.put(CLIENT_ID, it) } + MDC.put(TRACE_ID, traceId.toString()) + log.debug {"startSpanForFlow: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: ${attributes+baggageAttributes}"} + } + + // Check when you start a top level flow the startSpanForFlow appears just once, and so the endSpanForFlow also appears just once + // So its valid to do the MDC clear here. For remotes nodes as well + private fun endSpanForFlow(telemetryId: UUID) { + log.debug {"endSpanForFlow: traceId: ${traces.get()}"} + logContexts.remove(telemetryId) + MDC.clear() + } + + @Suppress("UNUSED_PARAMETER") + private fun startSpan(name: String, attributes: Map, telemetryId: UUID, flowLogic: FlowLogic<*>?) { + val flowId = flowLogic?.runId + val clientId = flowLogic?.stateMachine?.clientId + val traceId = traces.get() + log.debug {"startSpan: name: $name, traceId: $traceId, flowId: $flowId, clientId: $clientId, attributes: $attributes"} + } + + @Suppress("UNUSED_PARAMETER") + private fun endSpan(telemetryId: UUID) { + log.debug {"endSpan: traceId: ${traces.get()}"} + } + + override fun getCurrentTelemetryData(): SimpleLogContext { + traces.get()?.let { + logContexts[it]?.let { simpleLogContext -> + return simpleLogContext + } + } + return SimpleLogContext(UUID(0, 0), emptyMap()) + } + + override fun getCurrentTelemetryId(): UUID { + return traces.get() ?: UUID(0,0) + } + + override fun setCurrentTelemetryId(id: UUID) { + traces.set(id) + } + + override fun getCurrentSpanId(): String { + return traces.get()?.toString() ?: "" + } + + override fun getCurrentTraceId(): String { + return traces.get()?.toString() ?: "" + } + + override fun getCurrentBaggage(): Map { + val uuid = traces.get() + return logContexts[uuid]?.baggage ?: emptyMap() + } + + @Suppress("UNUSED_PARAMETER") + private fun setStatus(telemetryId: UUID, telemetryStatusCode: TelemetryStatusCode, message: String) { + when(telemetryStatusCode) { + TelemetryStatusCode.ERROR -> log.error("setStatus: traceId: ${traces.get()}, statusCode: ${telemetryStatusCode}, message: message") + TelemetryStatusCode.OK, TelemetryStatusCode.UNSET -> log.debug {"setStatus: traceId: ${traces.get()}, statusCode: ${telemetryStatusCode}, message: message" } + } + } + + @Suppress("UNUSED_PARAMETER") + private fun recordException(telemetryId: UUID, throwable: Throwable) { + log.error("recordException: traceId: ${traces.get()}, throwable: ${throwable}}") + } +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/internal/telemetry/TelemetryServiceImpl.kt b/core/src/main/kotlin/net/corda/core/internal/telemetry/TelemetryServiceImpl.kt new file mode 100644 index 0000000000..889b60b5b0 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/telemetry/TelemetryServiceImpl.kt @@ -0,0 +1,235 @@ +package net.corda.core.internal.telemetry + +import net.corda.core.CordaInternal +import net.corda.core.flows.FlowLogic +import net.corda.core.node.ServiceHub +import net.corda.core.node.services.TelemetryService +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SerializationFactory +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.serialize +import net.corda.core.utilities.OpaqueBytes +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.util.* + +@CordaSerializable +interface TelemetryDataItem + +@CordaSerializable +data class SerializedTelemetry(val serializedTelemetryData: Map) + +enum class TelemetryStatusCode { + /** The default status. */ + UNSET, + + /** + * The operation has been validated by an Application developers or Operator to have completed + * successfully. + */ + OK, + + /** The operation contains an error. */ + ERROR +} + +@CordaSerializable +data class TelemetryId(private val telemetryService: TelemetryServiceImpl) { + val id: UUID = UUID.randomUUID() + + fun setStatus(telemetryStatusCode: TelemetryStatusCode, message: String) { + telemetryService.setStatus(this, telemetryStatusCode, message) + } + + fun recordException(throwable: Throwable) { + telemetryService.recordException( this, throwable) + } + + fun close() { + telemetryService.endSpan(this) + } +} + +@CordaSerializable +data class ComponentTelemetryIds(val componentTelemetryIds: Map) + + + +interface TelemetryEvent + +class StartSpanForFlowEvent(val name: String, + val attributes: Map, + val telemetryId: UUID, val flowLogic: FlowLogic<*>?, + val telemetryDataItem: TelemetryDataItem?): TelemetryEvent +class EndSpanForFlowEvent(val telemetryId: UUID): TelemetryEvent +class StartSpanEvent(val name: String, val attributes: Map, val telemetryId: UUID, val flowLogic: FlowLogic<*>?): TelemetryEvent +class EndSpanEvent(val telemetryId: UUID): TelemetryEvent +class SetStatusEvent(val telemetryId: UUID, val telemetryStatusCode: TelemetryStatusCode, val message: String): TelemetryEvent +class RecordExceptionEvent(val telemetryId: UUID, val throwable: Throwable): TelemetryEvent + +interface TelemetryComponent { + fun name(): String + fun isEnabled(): Boolean + fun onTelemetryEvent(event: TelemetryEvent) + fun getCurrentTelemetryData(): TelemetryDataItem + fun getCurrentTelemetryId(): UUID + fun setCurrentTelemetryId(id: UUID) + fun getCurrentSpanId(): String + fun getCurrentTraceId(): String + fun getCurrentBaggage(): Map +} + +interface TelemetryComponentId { + fun name(): String +} + +@Suppress("TooManyFunctions") +class TelemetryServiceImpl : SingletonSerializeAsToken(), TelemetryService { + + companion object { + private val log: Logger = LoggerFactory.getLogger(TelemetryServiceImpl::class.java) + } + + fun getCurrentSpanId(telemetryComponentName: String): String? { + return telemetryComponents[telemetryComponentName]?.getCurrentSpanId() + } + + fun getCurrentTraceId(telemetryComponentName: String): String? { + return telemetryComponents[telemetryComponentName]?.getCurrentTraceId() + } + + fun getCurrentBaggage(telemetryComponentName: String): Map? { + return telemetryComponents[telemetryComponentName]?.getCurrentBaggage() + } + + fun setStatus(telemetryId: TelemetryId, telemetryStatusCode: TelemetryStatusCode, message: String) { + telemetryComponents.values.forEach { + it.onTelemetryEvent(SetStatusEvent(telemetryId.id, telemetryStatusCode, message)) + } + } + + fun recordException(telemetryId: TelemetryId, throwable: Throwable) { + telemetryComponents.values.forEach { + it.onTelemetryEvent(RecordExceptionEvent(telemetryId.id, throwable)) + } + } + + @CordaInternal + fun deserialize(data: OpaqueBytes): TelemetryDataItem { + return SerializationFactory.defaultFactory.deserialize(data, TelemetryDataItem::class.java, SerializationFactory.defaultFactory.defaultContext) + } + + private val telemetryComponents: MutableMap = mutableMapOf() + + @CordaInternal + fun addTelemetryComponent(telemetryComponent: TelemetryComponent) { + telemetryComponents[telemetryComponent.name()] = telemetryComponent + } + + @CordaInternal + fun startSpanForFlow(name: String, attributes: Map, flowLogic: FlowLogic<*>? = null, remoteSerializedTelemetry: SerializedTelemetry? = null): TelemetryId { + val telemetryId = TelemetryId(this) + telemetryComponents.values.forEach { + val bytes = remoteSerializedTelemetry?.serializedTelemetryData?.get(it.name()) + val telemetryDataItem = bytes?.let { deserialize(bytes) } + it.onTelemetryEvent(StartSpanForFlowEvent(name, attributes, telemetryId.id, flowLogic, telemetryDataItem)) + } + return telemetryId + } + + @CordaInternal + fun endSpanForFlow(telemetryId: TelemetryId) { + telemetryComponents.values.forEach { + it.onTelemetryEvent(EndSpanForFlowEvent(telemetryId.id)) + } + } + + + fun startSpan(name: String, attributes: Map = emptyMap(), flowLogic: FlowLogic<*>? = null): TelemetryId { + val telemetryId = TelemetryId(this) + telemetryComponents.values.forEach { + it.onTelemetryEvent(StartSpanEvent(name, attributes, telemetryId.id, flowLogic)) + } + return telemetryId + } + + fun endSpan(telemetryId: TelemetryId) { + telemetryComponents.values.forEach { + it.onTelemetryEvent(EndSpanEvent(telemetryId.id)) + } + } + + @Suppress("TooGenericExceptionCaught") + inline fun span(name: String, attributes: Map = emptyMap(), flowLogic: FlowLogic<*>? = null, block: () -> R): R { + val telemetryId = startSpan(name, attributes, flowLogic) + try { + return block() + } + catch(ex: Throwable) { + recordException(telemetryId, ex) + setStatus(telemetryId, TelemetryStatusCode.ERROR, "Exception raised: ${ex.message}") + throw ex + } + finally { + endSpan(telemetryId) + } + } + + @CordaInternal + @Suppress("LongParameterList", "TooGenericExceptionCaught") + inline fun spanForFlow(name: String, attributes: Map, flowLogic: FlowLogic<*>? = null, remoteSerializedTelemetry: SerializedTelemetry? = null, block: () -> R): R { + val telemetryId = startSpanForFlow(name, attributes, flowLogic, remoteSerializedTelemetry) + try { + return block() + } + catch(ex: Throwable) { + recordException(telemetryId, ex) + setStatus(telemetryId, TelemetryStatusCode.ERROR, "Exception raised: ${ex.message}") + throw ex + } + finally { + endSpanForFlow(telemetryId) + } + } + + @CordaInternal + fun getCurrentTelemetryData(): SerializedTelemetry? { + if (telemetryComponents.isEmpty()) { + return null + } + val serializedTelemetryData = mutableMapOf() + telemetryComponents.values.forEach { + val currentTelemetryData = it.getCurrentTelemetryData() + serializedTelemetryData[it.name()] = currentTelemetryData.serialize() + } + return SerializedTelemetry(serializedTelemetryData) + } + + @CordaInternal + fun getCurrentTelemetryIds(): ComponentTelemetryIds? { + if (telemetryComponents.isEmpty()) { + return null + } + val telemetryIds = mutableMapOf() + telemetryComponents.values.forEach { + telemetryIds[it.name()] = it.getCurrentTelemetryId() + } + return ComponentTelemetryIds(telemetryIds) + } + + @CordaInternal + fun setCurrentTelemetryId(telemetryIds: ComponentTelemetryIds) { + telemetryComponents.values.forEach { + it.setCurrentTelemetryId(telemetryIds.componentTelemetryIds[it.name()]!!) + } + } + + override fun getOpenTelemetry(): OpenTelemetryHandle? { + return telemetryComponents[OpenTelemetryComponent.OPENTELEMETRY_COMPONENT_NAME]?.let { + null // (it as? OpenTelemetryComponent)?.tracerSetup?.openTelemetry + } + } +} + +val ServiceHub.telemetryServiceInternal + get() = this.telemetryService as TelemetryServiceImpl diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index d63b63edf4..b1e464de6d 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -12,6 +12,7 @@ import net.corda.core.crypto.TransactionSignature import net.corda.core.flows.ContractUpgradeFlow import net.corda.core.node.services.* import net.corda.core.node.services.diagnostics.DiagnosticsService +import net.corda.core.internal.telemetry.TelemetryComponent import net.corda.core.serialization.SerializeAsToken import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.LedgerTransaction @@ -165,6 +166,11 @@ interface ServiceHub : ServicesForResolution { */ val diagnosticsService: DiagnosticsService + /** + * Provides operations to support telemetry and telemetry data between nodes. + */ + val telemetryService: TelemetryService + /** * INTERNAL. DO NOT USE. * @suppress @@ -187,6 +193,13 @@ interface ServiceHub : ServicesForResolution { */ fun cordaService(type: Class): T + /** + * Return the singleton instance of the given Corda telemetry component type. This is a class that implements TelemetryComponent + * and will have automatically been registered by the node. + * @throws IllegalArgumentException If the instance is not found. + */ + fun cordaTelemetryComponent(type: Class): T + /** * Stores the given [SignedTransaction]s in the local transaction storage and then sends them to the vault for * further processing if [notifyVault] is true. This is expected to be run within a database transaction. diff --git a/core/src/main/kotlin/net/corda/core/node/services/TelemetryService.kt b/core/src/main/kotlin/net/corda/core/node/services/TelemetryService.kt new file mode 100644 index 0000000000..2b0e54df34 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/node/services/TelemetryService.kt @@ -0,0 +1,9 @@ +package net.corda.core.node.services + +import net.corda.core.DoNotImplement +import net.corda.core.internal.telemetry.OpenTelemetryHandle + +@DoNotImplement +interface TelemetryService { + fun getOpenTelemetry(): OpenTelemetryHandle? +} \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt b/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt index c4af5510eb..dff288edc2 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt @@ -6,7 +6,9 @@ import net.corda.core.context.Trace import net.corda.core.context.Trace.InvocationId import net.corda.core.context.Trace.SessionId import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.telemetry.SerializedTelemetry import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.Id @@ -116,7 +118,8 @@ object RPCApi { val replyId: InvocationId, val sessionId: SessionId, val externalTrace: Trace? = null, - val impersonatedActor: Actor? = null + val impersonatedActor: Actor? = null, + val serializedTelemetry: SerializedTelemetry? = null ) : ClientToServer() { override fun writeToClientMessage(message: ClientMessage) { MessageUtil.setJMSReplyTo(message, clientAddress) @@ -130,6 +133,8 @@ object RPCApi { message.putStringProperty(METHOD_NAME_FIELD_NAME, methodName) message.bodyBuffer.writeBytes(serialisedArguments.bytes) + val telemetryBytes: SerializedBytes? = serializedTelemetry?.serialize() + telemetryBytes?.let { message.putBytesProperty(TELEMETRY_PROPERTY, it.bytes) } } } @@ -148,15 +153,20 @@ object RPCApi { fun fromClientMessage(message: ClientMessage): ClientToServer { val tag = Tag.values()[message.getIntProperty(TAG_FIELD_NAME)] return when (tag) { - RPCApi.ClientToServer.Tag.RPC_REQUEST -> RpcRequest( - clientAddress = MessageUtil.getJMSReplyTo(message), - methodName = message.getStringProperty(METHOD_NAME_FIELD_NAME), - serialisedArguments = OpaqueBytes(message.getBodyAsByteArray()), - replyId = message.replyId(), - sessionId = message.sessionId(), - externalTrace = message.externalTrace(), - impersonatedActor = message.impersonatedActor() - ) + RPCApi.ClientToServer.Tag.RPC_REQUEST -> { + val telemetryBytes = message.getBytesProperty(TELEMETRY_PROPERTY) + val serializedTelemetry: SerializedTelemetry? = telemetryBytes?.let {OpaqueBytes(it).deserialize()} + RpcRequest( + clientAddress = MessageUtil.getJMSReplyTo(message), + methodName = message.getStringProperty(METHOD_NAME_FIELD_NAME), + serialisedArguments = OpaqueBytes(message.getBodyAsByteArray()), + replyId = message.replyId(), + sessionId = message.sessionId(), + externalTrace = message.externalTrace(), + impersonatedActor = message.impersonatedActor(), + serializedTelemetry = serializedTelemetry + ) + } RPCApi.ClientToServer.Tag.OBSERVABLES_CLOSED -> { val ids = ArrayList() val buffer = message.bodyBuffer @@ -279,6 +289,7 @@ private const val DEDUPLICATION_IDENTITY_FIELD_NAME = "deduplication-identity" private const val OBSERVABLE_ID_FIELD_NAME = "observable-id" private const val OBSERVABLE_ID_TIMESTAMP_FIELD_NAME = "observable-id-timestamp" private const val METHOD_NAME_FIELD_NAME = "method-name" +private const val TELEMETRY_PROPERTY = "telemetry-data" fun ClientMessage.replyId(): InvocationId { @@ -301,6 +312,11 @@ fun ClientMessage.externalTrace(): Trace? { } } +fun ClientMessage.serializedTelemetry(): SerializedTelemetry? { + val telemetryBytes = this.getBytesProperty(TELEMETRY_PROPERTY) + return telemetryBytes?.let { OpaqueBytes(it).deserialize() } +} + fun ClientMessage.impersonatedActor(): Actor? { return getStringProperty(RPC_IMPERSONATED_ACTOR_ID)?.let { diff --git a/node/build.gradle b/node/build.gradle index 3e247a5ed5..c788acb66c 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -100,6 +100,7 @@ dependencies { compile project(':common-configuration-parsing') compile project(':common-logging') + implementation "io.opentelemetry:opentelemetry-api:${open_telemetry_version}" // Backwards compatibility goo: Apps expect confidential-identities to be loaded by default. // We could eventually gate this on a target-version check. compile project(':confidential-identities') @@ -309,7 +310,8 @@ quasar { "org.w3c.**", "org.xml**", "org.yaml**", - "rx**") + "rx**", + "io.opentelemetry.**") } jar { diff --git a/node/capsule/build.gradle b/node/capsule/build.gradle index c1c4e2f4c9..2eb546be0d 100644 --- a/node/capsule/build.gradle +++ b/node/capsule/build.gradle @@ -96,7 +96,7 @@ task buildCordaJAR(type: FatCapsule, dependsOn: [ applicationVersion = corda_release_version applicationId = "net.corda.node.Corda" // See experimental/quasar-hook/README.md for how to generate. - def quasarExcludeExpression = "x(antlr**;bftsmart**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;io.github**;io.netty**;jdk**;kotlin**;net.corda.djvm**;djvm**;net.bytebuddy**;net.i2p**;org.apache**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.objectweb**;org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**;org.jolokia**;com.lmax**;picocli**;liquibase**;com.github.benmanes**;org.json**;org.postgresql**;nonapi.io.github.classgraph**)" + def quasarExcludeExpression = "x(antlr**;bftsmart**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;io.github**;io.netty**;jdk**;kotlin**;net.corda.djvm**;djvm**;net.bytebuddy**;net.i2p**;org.apache**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.objectweb**;org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**;org.jolokia**;com.lmax**;picocli**;liquibase**;com.github.benmanes**;org.json**;org.postgresql**;nonapi.io.github.classgraph**;io.opentelemetry**)" def quasarClassLoaderExclusion = "l(net.corda.djvm.**;net.corda.core.serialization.internal.**)" javaAgents = quasar_classifier ? ["quasar-core-${quasar_version}-${quasar_classifier}.jar=${quasarExcludeExpression}${quasarClassLoaderExclusion}"] : ["quasar-core-${quasar_version}.jar=${quasarExcludeExpression}${quasarClassLoaderExclusion}"] systemProperties['visualvm.display.name'] = 'Corda' diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 53b1aa9bdc..ed3b2a8ec9 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -56,6 +56,11 @@ import net.corda.core.node.services.ContractUpgradeService import net.corda.core.node.services.CordaService import net.corda.core.node.services.IdentityService import net.corda.core.node.services.KeyManagementService +import net.corda.core.internal.telemetry.SimpleLogTelemetryComponent +import net.corda.core.internal.telemetry.TelemetryComponent +import net.corda.core.internal.telemetry.OpenTelemetryComponent +import net.corda.core.internal.telemetry.TelemetryServiceImpl +import net.corda.core.node.services.TelemetryService import net.corda.core.node.services.TransactionVerifierService import net.corda.core.node.services.diagnostics.DiagnosticsService import net.corda.core.schemas.MappedSchema @@ -250,6 +255,15 @@ abstract class AbstractNode(val configuration: NodeConfiguration, NotaryLoader(it, versionInfo) } val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo).closeOnStop(false) + val telemetryService: TelemetryServiceImpl = TelemetryServiceImpl().also { + val openTelemetryComponent = OpenTelemetryComponent(configuration.myLegalName.toString(), configuration.telemetry.spanStartEndEventsEnabled) + if (configuration.telemetry.openTelemetryEnabled && openTelemetryComponent.isEnabled()) { + it.addTelemetryComponent(openTelemetryComponent) + } + if (configuration.telemetry.simpleLogTelemetryEnabled) { + it.addTelemetryComponent(SimpleLogTelemetryComponent()) + } + }.tokenize() val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize() val identityService = PersistentIdentityService(cacheFactory).tokenize() val database: CordaPersistence = createCordaPersistence( @@ -337,6 +351,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, private val schedulerService = makeNodeSchedulerService() private val cordappServices = MutableClassToInstanceMap.create() + private val cordappTelemetryComponents = MutableClassToInstanceMap.create() private val shutdownExecutor = Executors.newSingleThreadExecutor() protected abstract val transactionVerifierWorkerCount: Int @@ -629,6 +644,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with // the identity key. But the infrastructure to make that easy isn't here yet. keyManagementService.start(keyStoreHandler.signingKeys.map { it.key to it.alias }) + installTelemetryComponents() installCordaServices() notaryService = maybeStartNotaryService(keyStoreHandler.notaryIdentity) contractUpgradeService.start() @@ -972,6 +988,62 @@ abstract class AbstractNode(val configuration: NodeConfiguration, return service } + private class TelemetryComponentInstantiationException(cause: Throwable?) : CordaException("Service Instantiation Error", cause) + + @Suppress("ThrowsCount", "ComplexMethod", "NestedBlockDepth") + private fun installTelemetryComponents() { + val loadedTelemetryComponents: List> = cordappLoader.cordapps.flatMap { it.telemetryComponents }.filterNot { + it.name == OpenTelemetryComponent::class.java.name || + it.name == SimpleLogTelemetryComponent::class.java.name } + + // This sets the Cordapp classloader on the contextClassLoader of the current thread, prior to initializing telemetry components + // Needed because of bug CORDA-2653 - some telemetry components can utilise third-party libraries that require access to + // the Thread context class loader. (Same as installCordaServices). + val oldContextClassLoader: ClassLoader? = Thread.currentThread().contextClassLoader + try { + Thread.currentThread().contextClassLoader = cordappLoader.appClassLoader + + loadedTelemetryComponents.forEach { + try { + installTelemetryComponent(it) + } catch (e: NoSuchMethodException) { + log.error("Missing no arg ctor for ${it.name}") + throw e + } catch (e: TelemetryComponentInstantiationException) { + if (e.cause != null) { + log.error("Corda telemetry component ${it.name} failed to instantiate. Reason was: ${e.cause?.rootMessage}", e.cause) + } else { + log.error("Corda telemetry component ${it.name} failed to instantiate", e) + } + throw e + } catch (e: Exception) { + log.error("Unable to install Corda telemetry component ${it.name}", e) + throw e + } + } + } finally { + Thread.currentThread().contextClassLoader = oldContextClassLoader + } + } + + private fun installTelemetryComponent(telemetryComponentClass: Class) { + val telemetryComponent = try { + val extendedTelemetryComponentConstructor = telemetryComponentClass.getDeclaredConstructor().apply { isAccessible = true } + val telemetryComponent = extendedTelemetryComponentConstructor.newInstance() + telemetryComponent + } catch (e: InvocationTargetException) { + throw TelemetryComponentInstantiationException(e.cause) + } + cordappTelemetryComponents.putInstance(telemetryComponentClass, telemetryComponent) + if (telemetryComponent.isEnabled()) { + telemetryService.addTelemetryComponent(telemetryComponent) + log.info("Installed ${telemetryComponentClass.name} Telemetry component") + } + else { + log.info("${telemetryComponentClass.name} not enabled so not installing") + } + } + private fun registerCordappFlows() { cordappLoader.cordapps.forEach { cordapp -> cordapp.initiatedFlows.groupBy { it.requireAnnotation().value.java }.forEach { initiator, responders -> @@ -1065,7 +1137,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because // the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with // the identity key. But the infrastructure to make that easy isn't here yet. - return BasicHSMKeyManagementService(cacheFactory, identityService, database, cryptoService) + return BasicHSMKeyManagementService(cacheFactory, identityService, database, cryptoService, telemetryService) } open fun stop() { @@ -1146,6 +1218,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, override val diagnosticsService: DiagnosticsService get() = this@AbstractNode.diagnosticsService override val externalOperationExecutor: ExecutorService get() = this@AbstractNode.externalOperationExecutor override val notaryService: NotaryService? get() = this@AbstractNode.notaryService + override val telemetryService: TelemetryService get() = this@AbstractNode.telemetryService private lateinit var _myInfo: NodeInfo override val myInfo: NodeInfo get() = _myInfo @@ -1167,6 +1240,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration, ?: throw IllegalArgumentException("Corda service ${type.name} does not exist") } + override fun cordaTelemetryComponent(type: Class): T { + return cordappTelemetryComponents.getInstance(type) + ?: throw IllegalArgumentException("Corda telemetry component ${type.name} does not exist") + } + override fun getFlowFactory(initiatingFlowClass: Class>): InitiatedFlowFactory<*>? { return flowManager.getFlowFactoryForInitiatingFlow(initiatingFlowClass) } diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt index ef008b252c..ce64bd28f5 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/JarScanningCordappLoader.kt @@ -17,6 +17,7 @@ import net.corda.core.internal.cordapp.get import net.corda.core.internal.notary.NotaryService import net.corda.core.internal.notary.SinglePartyNotaryService import net.corda.core.node.services.CordaService +import net.corda.core.internal.telemetry.TelemetryComponent import net.corda.core.schemas.MappedSchema import net.corda.core.serialization.CheckpointCustomSerializer import net.corda.core.serialization.SerializationCustomSerializer @@ -184,6 +185,7 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths: findServiceFlows(this), findSchedulableFlows(this), findServices(this), + findTelemetryComponents(this), findWhitelists(url), findSerializers(this), findCheckpointSerializers(this), @@ -281,6 +283,10 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths: return scanResult.getClassesWithAnnotation(SerializeAsToken::class, CordaService::class) } + private fun findTelemetryComponents(scanResult: RestrictedScanResult): List> { + return scanResult.getClassesImplementing(TelemetryComponent::class) + } + private fun findInitiatedFlows(scanResult: RestrictedScanResult): List>> { return scanResult.getClassesWithAnnotation(FlowLogic::class, InitiatedBy::class) } @@ -414,6 +420,15 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths: .map { it.kotlin.objectOrNewInstance() } } + fun getClassesImplementing(type: KClass): List> { + return scanResult + .getClassesImplementing(type.java.name) + .filter { it.name.startsWith(qualifiedNamePrefix) } + .mapNotNull { + loadClass(it.name, type) } + .filterNot { it.isAbstractClass } + } + fun getClassesWithAnnotation(type: KClass, annotation: KClass): List> { return scanResult .getClassesWithAnnotation(annotation.java.name) diff --git a/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt b/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt index 72c9b0a90a..08b7adb372 100644 --- a/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt +++ b/node/src/main/kotlin/net/corda/node/internal/cordapp/VirtualCordapps.kt @@ -30,6 +30,7 @@ internal object VirtualCordapp { serviceFlows = listOf(), schedulableFlows = listOf(), services = listOf(), + telemetryComponents = listOf(), serializationWhitelists = listOf(), serializationCustomSerializers = listOf(), checkpointCustomSerializers = listOf(), @@ -54,6 +55,7 @@ internal object VirtualCordapp { serviceFlows = listOf(), schedulableFlows = listOf(), services = listOf(), + telemetryComponents = listOf(), serializationWhitelists = listOf(), serializationCustomSerializers = listOf(), checkpointCustomSerializers = listOf(), @@ -79,6 +81,7 @@ internal object VirtualCordapp { serviceFlows = listOf(), schedulableFlows = listOf(), services = listOf(), + telemetryComponents = listOf(), serializationWhitelists = listOf(), serializationCustomSerializers = listOf(), checkpointCustomSerializers = listOf(), @@ -103,6 +106,7 @@ internal object VirtualCordapp { serviceFlows = listOf(), schedulableFlows = listOf(), services = listOf(), + telemetryComponents = listOf(), serializationWhitelists = listOf(), serializationCustomSerializers = listOf(), checkpointCustomSerializers = listOf(), diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 21ed0db28b..412a0a8833 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -43,6 +43,7 @@ interface NodeConfiguration : ConfigurationWithOptionsContainer { val certificateChainCheckPolicies: List val verifierType: VerifierType val flowTimeout: FlowTimeoutConfiguration + val telemetry: TelemetryConfiguration val notary: NotaryConfig? val additionalNodeInfoPollingFrequencyMsec: Long val p2pAddress: NetworkHostAndPort @@ -220,6 +221,12 @@ data class FlowTimeoutConfiguration( val backoffBase: Double ) +data class TelemetryConfiguration( + val openTelemetryEnabled: Boolean, + val simpleLogTelemetryEnabled: Boolean, + val spanStartEndEventsEnabled: Boolean +) + internal typealias Valid = Validated fun Config.parseAsNodeConfiguration(options: Configuration.Options = Configuration.Options(strict = true)): Valid = V1NodeConfigurationSpec.parse(this, options) diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt index c6efe102ea..d3f9f3b9d7 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt @@ -42,6 +42,7 @@ data class NodeConfigurationImpl( override val security: SecurityConfiguration? = Defaults.security, override val verifierType: VerifierType, override val flowTimeout: FlowTimeoutConfiguration, + override val telemetry: TelemetryConfiguration = Defaults.telemetry, override val p2pAddress: NetworkHostAndPort, override val additionalP2PAddresses: List = Defaults.additionalP2PAddresses, private val rpcAddress: NetworkHostAndPort? = Defaults.rpcAddress, @@ -133,6 +134,7 @@ data class NodeConfigurationImpl( fun database(devMode: Boolean) = DatabaseConfig( exportHibernateJMXStatistics = devMode ) + val telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true) } companion object { diff --git a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt index 79a9c81dea..f6f906c9be 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt @@ -30,6 +30,7 @@ import net.corda.node.services.config.NotaryConfig import net.corda.node.services.config.PasswordEncryption import net.corda.node.services.config.SecurityConfiguration import net.corda.node.services.config.SecurityConfiguration.AuthService.Companion.defaultAuthServiceId +import net.corda.node.services.config.TelemetryConfiguration import net.corda.node.services.config.Valid import net.corda.node.services.config.schema.parsers.attempt import net.corda.node.services.config.schema.parsers.badValue @@ -224,6 +225,17 @@ internal object FlowTimeoutConfigurationSpec : Configuration.Specification("TelemetryConfiguration") { + private val openTelemetryEnabled by boolean() + private val simpleLogTelemetryEnabled by boolean() + private val spanStartEndEventsEnabled by boolean() + + override fun parseValid(configuration: Config, options: Configuration.Options): Valid { + val config = configuration.withOptions(options) + return valid(TelemetryConfiguration(config[openTelemetryEnabled], config[simpleLogTelemetryEnabled], config[spanStartEndEventsEnabled])) + } +} + internal object NotaryConfigSpec : Configuration.Specification("NotaryConfig") { private val validating by boolean() private val serviceLegalName by string().mapValid(::toCordaX500Name).optional() diff --git a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt index 3808597620..023a1e66df 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt @@ -25,6 +25,7 @@ internal object V1NodeConfigurationSpec : Configuration.Specification { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt index 10659b948e..272ea50abf 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt @@ -4,6 +4,7 @@ import net.corda.core.flows.Destination import net.corda.core.flows.FlowLogic import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.telemetry.SerializedTelemetry import net.corda.core.serialization.SerializedBytes import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.ProgressTracker @@ -72,7 +73,7 @@ sealed class Event { * Initiate a flow. This causes a new session object to be created and returned to the flow. Note that no actual * communication takes place at this time, only on the first send/receive operation on the session. */ - data class InitiateFlow(val destination: Destination, val wellKnownParty: Party) : Event() + data class InitiateFlow(val destination: Destination, val wellKnownParty: Party, val serializedTelemetry: SerializedTelemetry?) : Event() /** * Signal the entering into a subflow. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt index ad57528218..504faa9995 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt @@ -142,7 +142,7 @@ class FlowCreator( senderUUID: String?): Flow { // Before we construct the state machine state by freezing the FlowLogic we need to make sure that lazy properties // have access to the fiber (and thereby the service hub) - val flowStateMachineImpl = FlowStateMachineImpl(flowId, flowLogic, scheduler) + val flowStateMachineImpl = FlowStateMachineImpl(flowId, flowLogic, scheduler, serializedTelemetry = (flowStart as? FlowStart.Initiated)?.initiatingMessage?.serializedTelemetry) val resultFuture = openFuture() flowStateMachineImpl.transientValues = createTransientValues(flowId, resultFuture) flowLogic.stateMachine = flowStateMachineImpl @@ -180,7 +180,7 @@ class FlowCreator( return when(flowState) { is FlowState.Unstarted -> { val logic = deserializeFlowState(flowState.frozenFlowLogic) - FlowStateMachineImpl(runId, logic, scheduler) + FlowStateMachineImpl(runId, logic, scheduler, serializedTelemetry = null) } is FlowState.Started -> deserializeFlowState(flowState.frozenFiber) // Places calling this function is rely on it to return null if the flow cannot be created from the checkpoint. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt index 7d02a23c99..9e590547eb 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt @@ -9,6 +9,8 @@ import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.checkPayloadIs +import net.corda.core.internal.telemetry.SerializedTelemetry +import net.corda.core.internal.telemetry.telemetryServiceInternal import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.serialize @@ -18,7 +20,8 @@ import net.corda.core.utilities.UntrustworthyData class FlowSessionImpl( override val destination: Destination, private val wellKnownParty: Party, - val sourceSessionId: SessionId + val sourceSessionId: SessionId, + val serializedTelemetry: SerializedTelemetry? ) : FlowSession() { override val counterparty: Party get() = wellKnownParty @@ -30,6 +33,7 @@ class FlowSessionImpl( override fun hashCode(): Int = sourceSessionId.hashCode() private val flowStateMachine: FlowStateMachine<*> get() = Fiber.currentFiber() as FlowStateMachine<*> + private val telemetryMap = mapOf("destination" to destination.toString()) @Suspendable override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo { @@ -46,15 +50,16 @@ class FlowSessionImpl( payload: Any, maySkipCheckpoint: Boolean ): UntrustworthyData { - enforceNotPrimitive(receiveType) - val request = FlowIORequest.SendAndReceive( - sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)), - shouldRetrySend = false - ) - val responseValues: Map> = flowStateMachine.suspend(request, maySkipCheckpoint) - val responseForCurrentSession = responseValues.getValue(this) - - return responseForCurrentSession.checkPayloadIs(receiveType) + flowStateMachine.serviceHub.telemetryServiceInternal.span("${this::class.java.name}#sendAndReceive", telemetryMap, flowLogic = flowStateMachine.logic) { + enforceNotPrimitive(receiveType) + val request = FlowIORequest.SendAndReceive( + sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)), + shouldRetrySend = false + ) + val responseValues: Map> = flowStateMachine.suspend(request, maySkipCheckpoint) + val responseForCurrentSession = responseValues.getValue(this) + return responseForCurrentSession.checkPayloadIs(receiveType) + } } @Suspendable @@ -62,9 +67,11 @@ class FlowSessionImpl( @Suspendable override fun receive(receiveType: Class, maySkipCheckpoint: Boolean): UntrustworthyData { - enforceNotPrimitive(receiveType) - val request = FlowIORequest.Receive(NonEmptySet.of(this)) - return flowStateMachine.suspend(request, maySkipCheckpoint).getValue(this).checkPayloadIs(receiveType) + flowStateMachine.serviceHub.telemetryServiceInternal.span("${this::class.java.name}#receive", telemetryMap, flowStateMachine.logic ) { + enforceNotPrimitive(receiveType) + val request = FlowIORequest.Receive(NonEmptySet.of(this)) + return flowStateMachine.suspend(request, maySkipCheckpoint).getValue(this).checkPayloadIs(receiveType) + } } @Suspendable @@ -72,10 +79,12 @@ class FlowSessionImpl( @Suspendable override fun send(payload: Any, maySkipCheckpoint: Boolean) { - val request = FlowIORequest.Send( - sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)) - ) - return flowStateMachine.suspend(request, maySkipCheckpoint) + flowStateMachine.serviceHub.telemetryServiceInternal.span("${this::class.java.name}#send", telemetryMap, flowStateMachine.logic) { + val request = FlowIORequest.Send( + sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)) + ) + return flowStateMachine.suspend(request, maySkipCheckpoint) + } } @Suspendable diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index db697148cd..551023cd81 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -36,6 +36,9 @@ import net.corda.core.internal.isRegularFile import net.corda.core.internal.location import net.corda.core.internal.toPath import net.corda.core.internal.uncheckedCast +import net.corda.core.internal.telemetry.ComponentTelemetryIds +import net.corda.core.internal.telemetry.SerializedTelemetry +import net.corda.core.internal.telemetry.telemetryServiceInternal import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.internal.CheckpointSerializationContext @@ -71,7 +74,8 @@ class TransientReference(@Transient val value: A) class FlowStateMachineImpl(override val id: StateMachineRunId, override val logic: FlowLogic, scheduler: FiberScheduler, - override val creationTime: Long = System.currentTimeMillis() + override val creationTime: Long = System.currentTimeMillis(), + val serializedTelemetry: SerializedTelemetry? = null ) : Fiber(id.toString(), scheduler), FlowStateMachine, FlowFiber { companion object { /** @@ -346,8 +350,14 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, // Needed because in previous versions of the finance app we used Thread.contextClassLoader to resolve services defined in cordapps. Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader - val result = logic.call() - suspend(FlowIORequest.WaitForSessionConfirmations(), maySkipCheckpoint = true) + // context.serializedTelemetry is from an rpc client, serializedTelemetry is from a peer, otherwise nothing + val serializedTelemetrySrc = context.serializedTelemetry ?: serializedTelemetry + val result = serviceHub.telemetryServiceInternal.spanForFlow(logic.javaClass.name, emptyMap(), logic, serializedTelemetrySrc) { + val ret = logic.call() + // Note suspend stores the telemetry ids back in the components from checkpoint, so must be done, before we end the span + suspend(FlowIORequest.WaitForSessionConfirmations(), maySkipCheckpoint = true) + ret + } Try.Success(result) } catch (t: Throwable) { if(t.isUnrecoverable()) { @@ -417,8 +427,11 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, isDbTransactionOpenOnExit = true ) return try { - subFlow.call() - } finally { + serviceHub.telemetryServiceInternal.span(subFlow.javaClass.name, emptyMap(), subFlow) { + subFlow.call() + } + } + finally { processEventImmediately( Event.LeaveSubFlow, isDbTransactionOpenOnEntry = true, @@ -457,10 +470,10 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } @Suspendable - override fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession { + override fun initiateFlow(destination: Destination, wellKnownParty: Party, serializedTelemetry: SerializedTelemetry?): FlowSession { require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" } val resume = processEventImmediately( - Event.InitiateFlow(destination, wellKnownParty), + Event.InitiateFlow(destination, wellKnownParty, serializedTelemetry), isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnExit = true ) as FlowContinuation.Resume @@ -527,6 +540,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, override fun suspend(ioRequest: FlowIORequest, maySkipCheckpoint: Boolean): R { val serializationContext = TransientReference(transientValues.checkpointSerializationContext) val transaction = extractThreadLocalTransaction() + val telemetryIds = retrieveTelemetryIds() parkAndSerialize { _, _ -> setLoggingContext() logger.trace { "Suspended on $ioRequest" } @@ -563,6 +577,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } } + storeTelemetryIds(telemetryIds) transientState.reloadCheckpointAfterSuspendCount?.let { count -> if (count < transientState.checkpoint.checkpointState.numberOfSuspends) { onReloadFlowFromCheckpoint?.invoke(id) @@ -580,6 +595,16 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, )) } + private fun retrieveTelemetryIds(): ComponentTelemetryIds? { + return serviceHub.telemetryServiceInternal.getCurrentTelemetryIds() + } + + private fun storeTelemetryIds(telemetryIds: ComponentTelemetryIds?) { + telemetryIds?.let { + serviceHub.telemetryServiceInternal.setCurrentTelemetryId(it) + } + } + private fun containsIdempotentFlows(): Boolean { val subFlowStack = snapshot().checkpoint.checkpointState.subFlowStack return subFlowStack.any { IdempotentFlow::class.java.isAssignableFrom(it.flowClass) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt index 97fd21a3c0..74158f80fa 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt @@ -2,6 +2,7 @@ package net.corda.node.services.statemachine import net.corda.core.flows.FlowException import net.corda.core.flows.FlowInfo +import net.corda.core.internal.telemetry.SerializedTelemetry import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SerializedBytes import java.security.SecureRandom @@ -38,6 +39,7 @@ data class SessionId(val toLong: Long) { * @param flowVersion the version of the initiating flow. * @param appName the name of the cordapp defining the initiating flow, or "corda" if it's a core flow. * @param firstPayload the optional first payload. + * @param serializedTelemetry the telemetry data */ data class InitialSessionMessage( val initiatorSessionId: SessionId, @@ -45,7 +47,8 @@ data class InitialSessionMessage( val initiatorFlowClassName: String, val flowVersion: Int, val appName: String, - val firstPayload: SerializedBytes? + val firstPayload: SerializedBytes?, + val serializedTelemetry: SerializedTelemetry? ) : SessionMessage() { override fun toString() = "InitialSessionMessage(" + "initiatorSessionId=$initiatorSessionId, " + @@ -53,6 +56,7 @@ data class InitialSessionMessage( "initiatorFlowClassName=$initiatorFlowClassName, " + "appName=$appName, " + "firstPayload=${firstPayload?.javaClass}" + + "telemetryContext=$serializedTelemetry" + ")" } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 6a86ec5dbd..7d22beb30b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -863,7 +863,7 @@ internal class SingleThreadedStateMachineManager( try { val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage) val initiatedSessionId = SessionId.createRandom(secureRandom) - val senderSession = FlowSessionImpl(sender, sender, initiatedSessionId) + val senderSession = FlowSessionImpl(sender, sender, initiatedSessionId, sessionMessage.serializedTelemetry) val flowLogic = initiatedFlowFactory.createFlow(senderSession) val initiatedFlowInfo = when (initiatedFlowFactory) { is InitiatedFlowFactory.Core -> FlowInfo(serviceHub.myInfo.platformVersion, "corda") diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 79bad802fe..b0c2020802 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -7,6 +7,7 @@ import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.identity.Party import net.corda.core.internal.DeclaredField import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.telemetry.SerializedTelemetry import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.contextLogger import net.corda.core.utilities.toNonEmptySet @@ -74,7 +75,7 @@ class StartedFlowTransition( return builder { // Initialise uninitialised sessions in order to receive the associated FlowInfo. Some or all sessions may // not be initialised yet. - sendInitialSessionMessagesIfNeeded(sessionIdToSession.keys) + sendInitialSessionMessagesIfNeeded(sessionIdToSession) val flowInfoMap = getFlowInfoFromSessions(sessionIdToSession) if (flowInfoMap == null) { FlowContinuation.ProcessEvents @@ -140,7 +141,7 @@ class StartedFlowTransition( sessionIdToSession[sessionId] = session } return builder { - sendToSessionsTransition(sessionIdToMessage) + sendToSessionsTransition(sessionIdToMessage, sessionIdToSession) if (isErrored()) { FlowContinuation.ProcessEvents } else { @@ -202,7 +203,7 @@ class StartedFlowTransition( sessionIdToSession[(session as FlowSessionImpl).sourceSessionId] = session } // send initialises to uninitialised sessions - sendInitialSessionMessagesIfNeeded(sessionIdToSession.keys) + sendInitialSessionMessagesIfNeeded(sessionIdToSession) try { val receivedMap = receiveFromSessionsTransition(sessionIdToSession) if (receivedMap == null) { @@ -272,11 +273,11 @@ class StartedFlowTransition( } } - private fun TransitionBuilder.sendInitialSessionMessagesIfNeeded(sourceSessions: Set) { + private fun TransitionBuilder.sendInitialSessionMessagesIfNeeded(sessionIdToSession: Map) { val checkpoint = startingState.checkpoint val newSessions = LinkedHashMap(checkpoint.checkpointState.sessions) var index = 0 - for (sourceSessionId in sourceSessions) { + for (sourceSessionId in sessionIdToSession.keys) { val sessionState = checkpoint.checkpointState.sessions[sourceSessionId] if (sessionState == null) { return freshErrorTransition(CannotFindSessionException(sourceSessionId)) @@ -284,7 +285,8 @@ class StartedFlowTransition( if (sessionState !is SessionState.Uninitiated) { continue } - val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, sessionState.additionalEntropy, null) + val telemetryData = sessionIdToSession[sourceSessionId]?.serializedTelemetry + val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, sessionState.additionalEntropy, null, telemetryData) val newSessionState = SessionState.Initiating( bufferedMessages = arrayListOf(), rejectionError = null, @@ -302,7 +304,8 @@ class StartedFlowTransition( val sessionIdToMessage = flowIORequest.sessionToMessage.mapKeys { sessionToSessionId(it.key) } - sendToSessionsTransition(sessionIdToMessage) + val sessionIdToSession = flowIORequest.sessionToMessage.map { sessionToSessionId(it.key) to it.key as FlowSessionImpl}.toMap() + sendToSessionsTransition(sessionIdToMessage, sessionIdToSession) if (isErrored()) { FlowContinuation.ProcessEvents } else { @@ -311,7 +314,8 @@ class StartedFlowTransition( } } - private fun TransitionBuilder.sendToSessionsTransition(sourceSessionIdToMessage: Map>) { + private fun TransitionBuilder.sendToSessionsTransition(sourceSessionIdToMessage: Map>, sourceSessionIdToSession: Map) { val checkpoint = startingState.checkpoint val newSessions = LinkedHashMap(checkpoint.checkpointState.sessions) var index = 0 @@ -323,7 +327,8 @@ class StartedFlowTransition( val sendInitialActions = messagesByType[SessionState.Uninitiated::class]?.map { (sourceSessionId, sessionState, message) -> val uninitiatedSessionState = sessionState as SessionState.Uninitiated val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, sessionState) - val initialMessage = createInitialSessionMessage(uninitiatedSessionState.initiatingSubFlow, sourceSessionId, uninitiatedSessionState.additionalEntropy, message) + val serializedTelemetry: SerializedTelemetry? = sourceSessionIdToSession[sourceSessionId]?.serializedTelemetry + val initialMessage = createInitialSessionMessage(uninitiatedSessionState.initiatingSubFlow, sourceSessionId, uninitiatedSessionState.additionalEntropy, message, serializedTelemetry) newSessions[sourceSessionId] = SessionState.Initiating( bufferedMessages = arrayListOf(), rejectionError = null, @@ -495,7 +500,8 @@ class StartedFlowTransition( initiatingSubFlow: SubFlow.Initiating, sourceSessionId: SessionId, additionalEntropy: Long, - payload: SerializedBytes? + payload: SerializedBytes?, + serializedTelemetry: SerializedTelemetry? ): InitialSessionMessage { return InitialSessionMessage( initiatorSessionId = sourceSessionId, @@ -504,7 +510,8 @@ class StartedFlowTransition( initiatorFlowClassName = initiatingSubFlow.classToInitiateWith.name, flowVersion = initiatingSubFlow.flowInfo.flowVersion, appName = initiatingSubFlow.flowInfo.appName, - firstPayload = payload + firstPayload = payload, + serializedTelemetry = serializedTelemetry ) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 9eba35f0db..19de1970b7 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -305,7 +305,7 @@ class TopLevelTransition( return@builder FlowContinuation.ProcessEvents } val sourceSessionId = SessionId.createRandom(context.secureRandom) - val sessionImpl = FlowSessionImpl(event.destination, event.wellKnownParty, sourceSessionId) + val sessionImpl = FlowSessionImpl(event.destination, event.wellKnownParty, sourceSessionId, event.serializedTelemetry) val newSessions = checkpoint.checkpointState.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong())) currentState = currentState.copy(checkpoint = checkpoint.setSessions(newSessions)) actions.add(Action.AddSessionBinding(context.id, sourceSessionId)) diff --git a/node/src/main/resources/corda-reference.conf b/node/src/main/resources/corda-reference.conf index 9dc12306e6..50f729a313 100644 --- a/node/src/main/resources/corda-reference.conf +++ b/node/src/main/resources/corda-reference.conf @@ -25,3 +25,8 @@ rpcSettings = { trustStorePassword = "trustpass" useTestClock = false verifierType = InMemory +telemetry { + openTelemetryEnabled = true, + simpleLogTelemetryEnabled = false, + spanStartEndEventsEnabled = true +} diff --git a/node/src/test/kotlin/net/corda/node/internal/NodeH2SecurityTests.kt b/node/src/test/kotlin/net/corda/node/internal/NodeH2SecurityTests.kt index 75926df0cb..6bd7e357e9 100644 --- a/node/src/test/kotlin/net/corda/node/internal/NodeH2SecurityTests.kt +++ b/node/src/test/kotlin/net/corda/node/internal/NodeH2SecurityTests.kt @@ -4,6 +4,7 @@ import com.nhaarman.mockito_kotlin.atLeast import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.verify import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.identity.CordaX500Name import net.corda.core.serialization.SerializeAsToken import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.VersionInfo @@ -134,6 +135,8 @@ class NodeH2SecurityTests { whenever(config.dataSourceProperties).thenReturn(hikaryProperties) whenever(config.baseDirectory).thenReturn(mock()) whenever(config.effectiveH2Settings).thenAnswer { NodeH2Settings(address) } + whenever(config.telemetry).thenReturn(mock()) + whenever(config.myLegalName).thenReturn(CordaX500Name(null, "client-${address.toString()}", "Corda", "London", null, "GB")) } private inner class MockNode: Node(config, VersionInfo.UNKNOWN, false) { diff --git a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt index da4a5c423c..427dbf5147 100644 --- a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt +++ b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt @@ -196,6 +196,7 @@ class NodeTest { rpcUsers = emptyList(), verifierType = VerifierType.InMemory, flowTimeout = FlowTimeoutConfiguration(timeout = Duration.ZERO, backoffBase = 1.0, maxRestartCount = 1), + telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true), rpcSettings = NodeRpcSettings(address = fakeAddress, adminAddress = null, ssl = null), messagingServerAddress = null, notary = null, diff --git a/node/src/test/kotlin/net/corda/node/internal/telemetry/TelemetryTests.kt b/node/src/test/kotlin/net/corda/node/internal/telemetry/TelemetryTests.kt new file mode 100644 index 0000000000..862b7d7543 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/telemetry/TelemetryTests.kt @@ -0,0 +1,328 @@ +package net.corda.node.internal.telemetry + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.context.InvocationContext +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.internal.concurrent.transpose +import net.corda.core.messaging.startFlow +import net.corda.core.internal.telemetry.EndSpanEvent +import net.corda.core.internal.telemetry.EndSpanForFlowEvent +import net.corda.core.internal.telemetry.RecordExceptionEvent +import net.corda.core.internal.telemetry.SetStatusEvent +import net.corda.core.internal.telemetry.StartSpanEvent +import net.corda.core.internal.telemetry.StartSpanForFlowEvent +import net.corda.core.internal.telemetry.TelemetryComponent +import net.corda.core.internal.telemetry.TelemetryDataItem +import net.corda.core.internal.telemetry.TelemetryEvent +import net.corda.core.internal.telemetry.TelemetryStatusCode +import net.corda.core.internal.telemetry.telemetryServiceInternal +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.ProgressTracker +import net.corda.core.utilities.getOrThrow +import net.corda.finance.DOLLARS +import net.corda.finance.flows.CashIssueAndPaymentFlow +import net.corda.finance.flows.CashPaymentReceiverFlow +import net.corda.node.services.Permissions +import net.corda.node.services.config.NodeConfiguration +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.InProcess +import net.corda.testing.driver.driver +import net.corda.testing.node.User +import net.corda.testing.node.internal.FINANCE_CORDAPPS +import net.corda.testing.node.internal.enclosedCordapp +import org.junit.Test +import java.time.Duration +import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals +import kotlin.test.assertNull +import kotlin.test.assertTrue + +class TelemetryTests { + + private companion object { + val cordapps = listOf(enclosedCordapp()) + } + + @Suspendable + data class TestTelemetryItem(val name: String, val randomUUID: UUID): TelemetryDataItem + + + @Test(timeout = 300_000) + fun `test passing a block with suspend to span func`() { + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + val alice = startNode().getOrThrow() + val handle = alice.rpc.startFlow(TelemetryTests::FlowWithSpanCallAndSleep) + handle.returnValue.getOrThrow() + // assertion for test is in Component function setCurrentTelemetryId below. + } + } + + @Test(timeout = 300_000) + fun `run flow with a suspend then check thread locals for fibre are the same`() { + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + val alice = startNode().getOrThrow() + val handle = alice.rpc.startFlow(TelemetryTests::FlowWithSleep) + handle.returnValue.getOrThrow() + // assertion for test is in Component function setCurrentTelemetryId below. + } + } + + @Test(timeout = 300_000) + fun `can find 2 distinct telemetry components on node`() { + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + val alice = startNode().getOrThrow() + val telemetryComponentAlice: TestCordaTelemetryComponent = (alice as InProcess).services.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java) + val telemetryComponent2Alice: TestCordaTelemetryComponent2 = alice.services.cordaTelemetryComponent(TestCordaTelemetryComponent2::class.java) + assertEquals(TestCordaTelemetryComponent::class.java, telemetryComponentAlice::class.java) + assertEquals(TestCordaTelemetryComponent2::class.java, telemetryComponent2Alice::class.java) + } + } + + @Test(timeout = 300_000) + fun `telemetryId is restored after flow is reloaded from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() { + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) { + val alice = startNode( + providedName = ALICE_NAME, + customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)).getOrThrow() + val telemetryComponentAlice: TestCordaTelemetryComponent = (alice as InProcess).services.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java) + val telemetryComponent2Alice: TestCordaTelemetryComponent2 = alice.services.cordaTelemetryComponent(TestCordaTelemetryComponent2::class.java) + telemetryComponentAlice.restoredFromCheckpoint = true + telemetryComponent2Alice.restoredFromCheckpoint = true + val handle = alice.rpc.startFlow(TelemetryTests::FlowWithSleep) + handle.returnValue.getOrThrow() + // assertion for test is in Component function setCurrentTelemetryId below. + } + } + + @Test(timeout=300_000) + fun `run flow and check telemetry components invoked`() { + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS + cordapps, startNodesInProcess = true)) { + val (nodeA, nodeB) = listOf(startNode(rpcUsers = listOf(user)), + startNode()) + .transpose() + .getOrThrow() + val amount = 1.DOLLARS + val ref = OpaqueBytes.of(0) + val recipient = nodeB.nodeInfo.legalIdentities[0] + nodeA.rpc.startFlow(::CashIssueAndPaymentFlow, amount, ref, recipient, false, defaultNotaryIdentity).returnValue.getOrThrow() + val flowName = CashIssueAndPaymentFlow::class.java.name + val receiverFlowName = CashPaymentReceiverFlow::class.java.name + + checkTelemetryComponentCounts(flowName, (nodeA as InProcess).services.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java)) + checkTelemetryComponentCounts(flowName, nodeA.services.cordaTelemetryComponent(TestCordaTelemetryComponent2::class.java)) + + val nodeBTelemetryComponent = (nodeB as InProcess).services.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java) + val nodeBTelemetryComponent2 = nodeB.services.cordaTelemetryComponent(TestCordaTelemetryComponent2::class.java) + + assertTrue(nodeBTelemetryComponent.endSpanForFlowLatch.await(1, TimeUnit.MINUTES), "Timed out waiting for endSpanForFlow operation on node B") + assertTrue(nodeBTelemetryComponent2.endSpanForFlowLatch.await(1, TimeUnit.MINUTES), "Timed out waiting for endSpanForFlow operation on node B") + + checkTelemetryComponentCounts(receiverFlowName, nodeB.services.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java)) + checkTelemetryComponentCounts(receiverFlowName, nodeB.services.cordaTelemetryComponent(TestCordaTelemetryComponent2::class.java)) + } + } + + private fun checkTelemetryComponentCounts(flowName: String, telemetryComponent: TelemetryComponentCounts) { + assertEquals(1, telemetryComponent.startSpanForFlowEvent) + assertEquals(1, telemetryComponent.endSpanForFlowEvent) + assertTrue(telemetryComponent.startSpanEvent > 0) + assertEquals(telemetryComponent.startSpanEvent, telemetryComponent.endSpanEvent) + assertEquals(flowName, telemetryComponent.startSpanForFlowName) + } + + @Test(timeout=300_000) + fun `telemetry data is sent from nodeA to nodeB as part of a flow`() { + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS + cordapps, startNodesInProcess = true)) { + val (nodeA, nodeB) = listOf(startNode(rpcUsers = listOf(user)), + startNode()) + .transpose() + .getOrThrow() + val amount = 1.DOLLARS + val ref = OpaqueBytes.of(0) + val recipient = nodeB.nodeInfo.legalIdentities[0] + nodeA.rpc.startFlow(::CashIssueAndPaymentFlow, amount, ref, recipient, false, defaultNotaryIdentity).returnValue.getOrThrow() + + val telemetryComponentNodeA = (nodeA as InProcess).services.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java) + val telemetryComponentNodeB = (nodeB as InProcess).services.cordaTelemetryComponent(TestCordaTelemetryComponent::class.java) + + assertNull(telemetryComponentNodeA.retrievedTelemetryDataItem) + assertEquals(telemetryComponentNodeA.dummyTelemetryItem, telemetryComponentNodeB.retrievedTelemetryDataItem) + } + } + + class TestCordaTelemetryComponent: TelemetryComponentCounts() { + override fun name(): String { + return "TestCordaTelemetryComponent" + } + } + + class TestCordaTelemetryComponent2: TelemetryComponentCounts() { + override fun name(): String { + return "TestCordaTelemetryComponent2" + } + } + + abstract class TelemetryComponentCounts: TelemetryComponent { + var restoredFromCheckpoint = false + val endSpanForFlowLatch = CountDownLatch(1) + var retrievedTelemetryDataItem: TelemetryDataItem? = null + var startSpanForFlowEvent = 0 + var endSpanForFlowEvent = 0 + var startSpanEvent = 0 + var endSpanEvent = 0 + var setStatusEvent = 0 + var recordExceptionEvent = 0 + var startSpanForFlowName: String? = null + var startSpanName: String? = null + var spanTelemetryIds = ArrayDeque() + val currentUUID = ThreadLocal() + var previousTelemetryId: UUID? = null + val dummyTelemetryItem = TestTelemetryItem("this is a dummy string", UUID.randomUUID()) + + override fun getCurrentSpanId(): String { + TODO("Not yet implemented") + } + + override fun getCurrentTraceId(): String { + TODO("Not yet implemented") + } + + override fun getCurrentBaggage(): Map { + TODO("Not yet implemented") + } + + override fun isEnabled(): Boolean { + return true + } + + override fun setCurrentTelemetryId(id: UUID) { + if (!restoredFromCheckpoint) { + // If we have not been restored from a checkpoint then the threadlocal should be the same + // as it was, so check this. + // If we have been restored from a checkpoint then the threadlocal will be null. So skip this step if we have. + assertEquals(currentUUID.get(), id) + } + // Here threadlocal will be null as we have been check pointed. + // Check the uuid passed to us is the same as the one we returned earlier. + assertEquals(previousTelemetryId, id) + currentUUID.set(id) + } + + override fun getCurrentTelemetryId(): UUID { + val uuid = currentUUID.get() ?: UUID(0, 0) + // Store the uuid we return so can check for same value after the checkpoint + previousTelemetryId = uuid + return uuid + } + + override fun onTelemetryEvent(event: TelemetryEvent) { + when (event) { + is StartSpanForFlowEvent -> { + startSpanForFlowEvent++ + startSpanForFlow(event.name, event.attributes, event.telemetryId, event.flowLogic, event.telemetryDataItem) + } + is EndSpanForFlowEvent -> { + endSpanForFlowEvent++ + endSpanForFlow(event.telemetryId) + endSpanForFlowLatch.countDown() + } + is StartSpanEvent -> { + startSpanEvent++ + startSpan(event.name, event.attributes, event.telemetryId, event.flowLogic) + } + is EndSpanEvent -> { + endSpanEvent++ + endSpan(event.telemetryId) + } + is SetStatusEvent -> { + setStatusEvent++ + setStatus(event.telemetryId, event.telemetryStatusCode, event.message) + } + is RecordExceptionEvent -> { + recordExceptionEvent++ + recordException(event.telemetryId, event.throwable) + } + } + } + + @Suppress("UNUSED_PARAMETER") + fun startSpanForFlow(name: String, attributes: Map, telemetryId: UUID, flowLogic: FlowLogic<*>?, telemetryDataItem: TelemetryDataItem?) { + retrievedTelemetryDataItem = telemetryDataItem + startSpanForFlowName = name + spanTelemetryIds.push(telemetryId) + currentUUID.set(telemetryId) + } + + fun endSpanForFlow(telemetryId: UUID) { + assertEquals(spanTelemetryIds.pop(), telemetryId) + val newUUID = spanTelemetryIds.peek() ?: UUID(0, 0) + currentUUID.set(newUUID) + } + + @Suppress("UNUSED_PARAMETER") + fun startSpan(name: String, attributes: Map, telemetryId: UUID, flowLogic: FlowLogic<*>?) { + startSpanName = name + spanTelemetryIds.push(telemetryId) + currentUUID.set(telemetryId) + } + + fun endSpan(telemetryId: UUID) { + assertEquals(spanTelemetryIds.pop(), telemetryId) + val newUUID = spanTelemetryIds.peek() ?: UUID(0, 0) + currentUUID.set(newUUID) + } + + override fun getCurrentTelemetryData(): TelemetryDataItem { + return dummyTelemetryItem + } + + @Suppress("UNUSED_PARAMETER") + fun setStatus(telemetryId: UUID, statusCode: TelemetryStatusCode, message: String) { + } + + @Suppress("UNUSED_PARAMETER") + fun recordException(telemetryId: UUID, throwable: Throwable) { + } + } + + + @StartableByRPC + class FlowWithSleep : FlowLogic() { + companion object { + object TESTSTEP : ProgressTracker.Step("Custom progress step") + } + override val progressTracker: ProgressTracker = ProgressTracker(TESTSTEP) + + @Suspendable + override fun call(): InvocationContext { + // Do a sleep which invokes a suspend + sleep(Duration.ofSeconds(1)) + progressTracker.currentStep = TESTSTEP + return stateMachine.context + } + } + + @StartableByRPC + class FlowWithSpanCallAndSleep : FlowLogic() { + companion object { + object TESTSTEP : ProgressTracker.Step("Custom progress step") + } + override val progressTracker: ProgressTracker = ProgressTracker(TESTSTEP) + + @Suspendable + override fun call(): InvocationContext { + return serviceHub.telemetryServiceInternal.span(this::class.java.name, emptyMap(), this) { + // Do a sleep which invokes a suspend + sleep(Duration.ofSeconds(1)) + progressTracker.currentStep = TESTSTEP + stateMachine.context + } + } + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index 273e898063..d0314d5782 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -397,6 +397,7 @@ class NodeConfigurationImplTest { p2pAddress = NetworkHostAndPort("localhost", 0), messagingServerAddress = null, flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0), + telemetry = TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true), notary = null, devMode = true, noLocalShell = false, diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 8aabbdbe82..def6530e5e 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -571,7 +571,7 @@ class FlowFrameworkTests { @Test(timeout=300_000) fun `session init with unknown class is sent to the flow hospital, from where we then drop it`() { - aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob) + aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null, null), bob) mockNet.runNetwork() assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot @@ -587,7 +587,7 @@ class FlowFrameworkTests { @Test(timeout=300_000) fun `non-flow class in session init`() { - aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, String::class.java.name, 1, "", null), bob) + aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, String::class.java.name, 1, "", null, null), bob) mockNet.runNetwork() assertThat(receivedSessionMessages).hasSize(2) // Only the session-init and session-reject are expected val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage @@ -1103,7 +1103,7 @@ internal data class SessionTransfer(val from: Int, val message: SessionMessage, } internal fun sessionInit(clientFlowClass: KClass>, flowVersion: Int = 1, payload: Any? = null): InitialSessionMessage { - return InitialSessionMessage(SessionId(0), 0, clientFlowClass.java.name, flowVersion, "", payload?.serialize()) + return InitialSessionMessage(SessionId(0), 0, clientFlowClass.java.name, flowVersion, "", payload?.serialize(), serializedTelemetry = null) } internal fun sessionData(payload: Any) = ExistingSessionMessage(SessionId(0), DataSessionMessage(payload.serialize())) diff --git a/opentelemetry-driver/build.gradle b/opentelemetry-driver/build.gradle new file mode 100644 index 0000000000..32b848f7ee --- /dev/null +++ b/opentelemetry-driver/build.gradle @@ -0,0 +1,48 @@ +import static org.gradle.api.JavaVersion.VERSION_1_8 + +plugins { + // Apply the org.jetbrains.kotlin.jvm Plugin to add support for Kotlin. + id 'org.jetbrains.kotlin.jvm' // version '1.6.21' + // Apply the java-library plugin for API and implementation separation. + id 'java-library' + id 'com.github.johnrengelman.shadow' // version '7.1.2' + id 'net.corda.plugins.publish-utils' +} + +description 'OpenTelemetry Driver' + +// This driver is required by core, so must always be 1.8. See core build.gradle. +targetCompatibility = VERSION_1_8 + +dependencies { + // Use the Kotlin JDK 8 standard library. + implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8:4.9.1' + implementation "io.opentelemetry:opentelemetry-api:$open_telemetry_version" + + implementation platform("io.opentelemetry:opentelemetry-bom:$open_telemetry_version") + implementation "io.opentelemetry:opentelemetry-sdk" + implementation 'io.opentelemetry:opentelemetry-exporter-otlp' + implementation "io.opentelemetry:opentelemetry-semconv:$open_telemetry_sem_conv_version" +} + +shadowJar { + archiveClassifier = jdkClassifier + relocate 'kotlin', 'privatekotlin' + exclude "**/Log4j2Plugins.dat" + zip64 true +} + +artifacts { + archives shadowJar + publish shadowJar +} + +jar { + enabled = false +} + +publish { + disableDefaultJar = true + name 'corda-opentelemetry-driver' +} + diff --git a/opentelemetry-driver/src/main/kotlin/net/corda/opentelemetrydriver/OpenTelemetryDriver.kt b/opentelemetry-driver/src/main/kotlin/net/corda/opentelemetrydriver/OpenTelemetryDriver.kt new file mode 100644 index 0000000000..7c52764a17 --- /dev/null +++ b/opentelemetry-driver/src/main/kotlin/net/corda/opentelemetrydriver/OpenTelemetryDriver.kt @@ -0,0 +1,49 @@ +package net.corda.opentelemetrydriver + +import io.opentelemetry.api.OpenTelemetry +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator +import io.opentelemetry.context.propagation.ContextPropagators +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter +import io.opentelemetry.sdk.OpenTelemetrySdk +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.sdk.trace.SdkTracerProvider +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes + +object OpenTelemetryDriver { + private fun buildAndGetOpenTelemetry(serviceName: String): OpenTelemetry { + val resource: Resource = Resource.getDefault() + .merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, serviceName))) + + val sdkTracerProvider: SdkTracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(BatchSpanProcessor.builder(OtlpGrpcSpanExporter.builder().build()).build()) + .setResource(resource) + .build() + + val sdkMeterProvider: SdkMeterProvider = SdkMeterProvider.builder() + .registerMetricReader(PeriodicMetricReader.builder(OtlpGrpcMetricExporter.builder().build()).build()) + .setResource(resource) + .build() + + return OpenTelemetrySdk.builder() + .setTracerProvider(sdkTracerProvider) + .setMeterProvider(sdkMeterProvider) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .buildAndRegisterGlobal() + } + + @Volatile + private var OPENTELEMETRY_INSTANCE: OpenTelemetry? = null + + fun getOpenTelemetry(serviceName: String): OpenTelemetry { + return OPENTELEMETRY_INSTANCE ?: synchronized(this) { + OPENTELEMETRY_INSTANCE ?: buildAndGetOpenTelemetry(serviceName).also { + OPENTELEMETRY_INSTANCE = it + } + } + } +} diff --git a/settings.gradle b/settings.gradle index 18cc562f45..e89f1ac33b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -28,6 +28,7 @@ pluginManagement { // The project is named 'corda-project' and not 'corda' because if this is named the same as the // output JAR from the capsule then the buildCordaJAR task goes into an infinite loop. rootProject.name = 'corda-project' +include 'opentelemetry-driver' include 'confidential-identities' include 'finance:contracts' include 'finance:workflows' diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index efd5813736..05c898a0ba 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -20,6 +20,8 @@ import net.corda.core.messaging.StateMachineTransactionMapping import net.corda.core.node.* import net.corda.core.node.services.* import net.corda.core.node.services.diagnostics.DiagnosticsService +import net.corda.core.internal.telemetry.TelemetryComponent +import net.corda.core.internal.telemetry.TelemetryServiceImpl import net.corda.core.node.services.vault.CordaTransactionSupport import net.corda.core.serialization.SerializeAsToken import net.corda.core.transactions.SignedTransaction @@ -213,7 +215,7 @@ open class MockServices private constructor( TestingNamedCacheFactory(), identityService, persistence, - MockCryptoService(aliasKeyMap) + MockCryptoService(aliasKeyMap), TelemetryServiceImpl() ) persistence.transaction { keyManagementService.start(aliasedMoreKeys + aliasedIdentityKey) } @@ -438,6 +440,7 @@ open class MockServices private constructor( override val vaultService: VaultService get() = throw UnsupportedOperationException() override val contractUpgradeService: ContractUpgradeService get() = throw UnsupportedOperationException() override val networkMapCache: NetworkMapCache get() = throw UnsupportedOperationException() + override val telemetryService: TelemetryServiceImpl get() = throw java.lang.UnsupportedOperationException() override val clock: TestClock get() = TestClock(Clock.systemUTC()) override val myInfo: NodeInfo get() { @@ -467,12 +470,19 @@ open class MockServices private constructor( /** A map of available [CordaService] implementations */ internal val cordappServices: MutableClassToInstanceMap = MutableClassToInstanceMap.create() + internal val cordappTelemetryComponents: MutableClassToInstanceMap = MutableClassToInstanceMap.create() + override fun cordaService(type: Class): T { require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" } return cordappServices.getInstance(type) ?: throw IllegalArgumentException("Corda service ${type.name} does not exist") } + override fun cordaTelemetryComponent(type: Class): T { + return cordappTelemetryComponents.getInstance(type) + ?: throw IllegalArgumentException("Corda telemetry component ${type.name} does not exist") + } + override fun jdbcSession(): Connection = throw UnsupportedOperationException() override fun withEntityManager(block: EntityManager.() -> T): T { diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index f21b57c68f..c5a12bd83a 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -27,6 +27,7 @@ import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.NetworkParameters import net.corda.core.node.NodeInfo import net.corda.core.node.NotaryInfo +import net.corda.core.internal.telemetry.TelemetryServiceImpl import net.corda.core.serialization.SerializationWhitelist import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger @@ -46,6 +47,7 @@ import net.corda.node.services.config.FlowTimeoutConfiguration import net.corda.node.services.config.NetworkParameterAcceptanceSettings import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NotaryConfig +import net.corda.node.services.config.TelemetryConfiguration import net.corda.node.services.config.VerifierType import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.BasicHSMKeyManagementService @@ -409,7 +411,7 @@ open class InternalMockNetwork(cordappPackages: List = emptyList(), } override fun makeKeyManagementService(identityService: PersistentIdentityService): KeyManagementServiceInternal { - return BasicHSMKeyManagementService(cacheFactory, identityService, database, cryptoService) + return BasicHSMKeyManagementService(cacheFactory, identityService, database, cryptoService, TelemetryServiceImpl()) } override fun startShell() { @@ -490,6 +492,7 @@ open class InternalMockNetwork(cordappPackages: List = emptyList(), doReturn(emptyList()).whenever(it).extraNetworkMapKeys doReturn(listOf(baseDirectory / "cordapps")).whenever(it).cordappDirectories doReturn(emptyList()).whenever(it).quasarExcludePackages + doReturn(TelemetryConfiguration(openTelemetryEnabled = true, simpleLogTelemetryEnabled = false, spanStartEndEventsEnabled = true)).whenever(it).telemetry parameters.configOverrides(it) }