diff --git a/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FiberMonitor.kt b/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FiberMonitor.kt index 46889f41fa..e0f1aedaca 100644 --- a/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FiberMonitor.kt +++ b/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FiberMonitor.kt @@ -2,19 +2,14 @@ package net.corda.flowhook import co.paralleluniverse.fibers.Fiber import net.corda.core.internal.uncheckedCast -import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.loggerFor import net.corda.nodeapi.internal.persistence.DatabaseTransaction import java.sql.Connection import java.time.Instant -import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean -import kotlin.concurrent.thread - -/** - * This is a debugging helper class that dumps the map of Fiber->DB connection, or more precisely, the - * Fiber->(DB tx -> DB connection) map, as there may be multiple transactions per fiber. - */ data class MonitorEvent(val type: MonitorEventType, val keys: List, val extra: Any? = null) @@ -34,47 +29,62 @@ enum class MonitorEventType { FiberResumed, FiberEnded, - ExecuteTransition + SmExecuteTransition, + SmScheduleEvent, + + NettyThreadLocalMapCreated, + + SetThreadLocals, + SetInheritableThreadLocals, + GetThreadLocals, + GetInheritableThreadLocals } +/** + * This is a monitor processing events coming from [FlowHookContainer]. It just appends them to a log that allows + * analysis of the events. + * + * Suggested way of debugging using this class and IntelliJ: + * 1. Hook the function calls you're interested in using [FlowHookContainer]. + * 2. Add an associated event type in [MonitorEventType]. + * 3. Call [newEvent] in the hook. Provide some keys to allow analysis. Example keys are the current fiber ID or a + * specific DB transaction. You can also provide additional info about the event using [MonitorEvent.extra]. + * 4. Run your test and break on [newEvent] or [inspect]. + * 5. Inspect the [correlator] in the debugger. E.g. you can add a watch for [MonitorEventCorrelator.getByType]. + * You can search for specific objects by using filter expressions in the debugger. + */ object FiberMonitor { - private val log = contextLogger() - private val jobQueue = LinkedBlockingQueue() + private val log = loggerFor() private val started = AtomicBoolean(false) - private var trackerThread: Thread? = null + private var executor: ScheduledExecutorService? = null val correlator = MonitorEventCorrelator() - sealed class Job { - data class NewEvent(val event: FullMonitorEvent) : Job() - object Finish : Job() - } - fun newEvent(event: MonitorEvent) { - if (trackerThread != null) { - jobQueue.add(Job.NewEvent(FullMonitorEvent(Instant.now(), Exception().stackTrace.toList(), event))) + if (executor != null) { + val fullEvent = FullMonitorEvent(Instant.now(), Exception().stackTrace.toList(), event) + executor!!.execute { + processEvent(fullEvent) + } } } fun start() { if (started.compareAndSet(false, true)) { - require(trackerThread == null) - trackerThread = thread(name = "Fiber monitor", isDaemon = true) { - while (true) { - val job = jobQueue.poll(1, TimeUnit.SECONDS) - when (job) { - is Job.NewEvent -> processEvent(job) - Job.Finish -> return@thread - } - } - } + require(executor == null) + executor = Executors.newSingleThreadScheduledExecutor() + executor!!.scheduleAtFixedRate(this::inspect, 100, 100, TimeUnit.MILLISECONDS) } } - private fun processEvent(job: Job.NewEvent) { - correlator.addEvent(job.event) - checkLeakedTransactions(job.event.event) - checkLeakedConnections(job.event.event) + // Break on this function or [newEvent]. + private fun inspect() { + } + + private fun processEvent(event: FullMonitorEvent) { + correlator.addEvent(event) + checkLeakedTransactions(event.event) + checkLeakedConnections(event.event) } inline fun R.getField(name: String): A { @@ -124,7 +134,7 @@ object FiberMonitor { private fun checkLeakedConnections(event: MonitorEvent) { if (event.type == MonitorEventType.FiberParking) { - val events = correlator.events[event.keys[0]]!! + val events = correlator.merged()[event.keys[0]]!! val acquiredConnections = events.mapNotNullTo(HashSet()) { if (it.event.type == MonitorEventType.ConnectionAcquired) { it.event.keys.mapNotNull { it as? Connection }.first() @@ -147,35 +157,47 @@ object FiberMonitor { } } +/** + * This class holds the event log. + * + * Each event has a list of key associated with it. "Relatedness" is then the transitive closure of two events sharing a key. + * + * [merged] returns a map from key to related events. Note that an eventlist may be associated by several keys. + * [getUnique] makes these lists unique by keying on the set of keys associated with the events. + * [getByType] simply groups by the type of the keys. This is probably the most useful "top-level" breakdown of events. + */ class MonitorEventCorrelator { - private val _events = HashMap>() - val events: Map> get() = _events + private val events = ArrayList() - fun getUnique() = events.values.toSet().associateBy { it.flatMap { it.event.keys }.toSet() } + fun getUnique() = merged().values.toSet().associateBy { it.flatMap { it.event.keys }.toSet() } - fun getByType() = events.entries.groupBy { it.key.javaClass } + fun getByType() = merged().entries.groupBy { it.key.javaClass } fun addEvent(fullMonitorEvent: FullMonitorEvent) { - val list = link(fullMonitorEvent.event.keys) - list.add(fullMonitorEvent) - for (key in fullMonitorEvent.event.keys) { - _events[key] = list - } + events.add(fullMonitorEvent) } - fun link(keys: List): ArrayList { - val eventLists = HashSet>() - for (key in keys) { - val list = _events[key] - if (list != null) { - eventLists.add(list) + fun merged(): Map> { + val merged = HashMap>() + for (event in events) { + val eventLists = HashSet>() + for (key in event.event.keys) { + val list = merged[key] + if (list != null) { + eventLists.add(list) + } + } + val newList = when (eventLists.size) { + 0 -> ArrayList() + 1 -> eventLists.first() + else -> mergeAll(eventLists) + } + newList.add(event) + for (key in event.event.keys) { + merged[key] = newList } } - return when { - eventLists.isEmpty() -> ArrayList() - eventLists.size == 1 -> eventLists.first() - else -> mergeAll(eventLists) - } + return merged } fun mergeAll(lists: Collection>): ArrayList { diff --git a/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FlowHookContainer.kt b/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FlowHookContainer.kt index d61750b00c..686e3d46a4 100644 --- a/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FlowHookContainer.kt +++ b/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FlowHookContainer.kt @@ -1,14 +1,8 @@ package net.corda.flowhook import co.paralleluniverse.fibers.Fiber -import net.corda.node.services.statemachine.ActionExecutor import net.corda.node.services.statemachine.Event -import net.corda.node.services.statemachine.FlowFiber -import net.corda.node.services.statemachine.StateMachineState -import net.corda.node.services.statemachine.transitions.TransitionResult -import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager -import rx.subjects.Subject import java.sql.Connection @Suppress("UNUSED") @@ -26,6 +20,12 @@ object FlowHookContainer { FiberMonitor.newEvent(MonitorEvent(MonitorEventType.FiberStarted, keys = listOf(Fiber.currentFiber()))) } + @JvmStatic + @Hook("net.corda.node.services.statemachine.FlowStateMachineImpl", passThis = true) + fun scheduleEvent(fiber: Any, event: Event) { + FiberMonitor.newEvent(MonitorEvent(MonitorEventType.SmScheduleEvent, keys = listOf(fiber), extra = listOf(event, currentFiberOrThread()))) + } + @JvmStatic @Hook("co.paralleluniverse.fibers.Fiber") fun onCompleted() { @@ -45,13 +45,13 @@ object FlowHookContainer { } @JvmStatic - @Hook("net.corda.node.utilities.DatabaseTransaction", passThis = true, position = HookPosition.After) + @Hook("net.corda.nodeapi.internal.persistence.DatabaseTransaction", passThis = true, position = HookPosition.After) fun DatabaseTransaction( transaction: Any, isolation: Int, - threadLocal: ThreadLocal<*>, - transactionBoundaries: Subject<*, *>, - cordaPersistence: CordaPersistence + threadLocal: Any, + transactionBoundaries: Any, + cordaPersistence: Any ) { val keys = ArrayList().apply { add(transaction) @@ -60,9 +60,65 @@ object FlowHookContainer { FiberMonitor.newEvent(MonitorEvent(MonitorEventType.TransactionCreated, keys = keys)) } + @JvmStatic + @Hook("io.netty.util.internal.InternalThreadLocalMap", passThis = true, position = HookPosition.After) + fun InternalThreadLocalMap( + internalThreadLocalMap: Any + ) { + val keys = listOf( + internalThreadLocalMap, + currentFiberOrThread() + ) + FiberMonitor.newEvent(MonitorEvent(MonitorEventType.NettyThreadLocalMapCreated, keys = keys)) + } + + @JvmStatic + @Hook("co.paralleluniverse.concurrent.util.ThreadAccess") + fun setThreadLocals(thread: Thread, threadLocals: Any?) { + FiberMonitor.newEvent(MonitorEvent( + MonitorEventType.SetThreadLocals, + keys = listOf(currentFiberOrThread()), + extra = threadLocals?.let { FiberMonitor.getThreadLocalMapEntryValues(it) } + )) + } + + @JvmStatic + @Hook("co.paralleluniverse.concurrent.util.ThreadAccess") + fun setInheritableThreadLocals(thread: Thread, threadLocals: Any?) { + FiberMonitor.newEvent(MonitorEvent( + MonitorEventType.SetInheritableThreadLocals, + keys = listOf(currentFiberOrThread()), + extra = threadLocals?.let { FiberMonitor.getThreadLocalMapEntryValues(it) } + )) + } + + @JvmStatic + @Hook("co.paralleluniverse.concurrent.util.ThreadAccess") + fun getThreadLocals(thread: Thread): (threadLocals: Any?) -> Unit { + return { threadLocals -> + FiberMonitor.newEvent(MonitorEvent( + MonitorEventType.GetThreadLocals, + keys = listOf(currentFiberOrThread()), + extra = threadLocals?.let { FiberMonitor.getThreadLocalMapEntryValues(it) } + )) + } + } + + @JvmStatic + @Hook("co.paralleluniverse.concurrent.util.ThreadAccess") + fun getInheritableThreadLocals(thread: Thread): (threadLocals: Any?) -> Unit { + return { threadLocals -> + FiberMonitor.newEvent(MonitorEvent( + MonitorEventType.GetInheritableThreadLocals, + keys = listOf(currentFiberOrThread()), + extra = threadLocals?.let { FiberMonitor.getThreadLocalMapEntryValues(it) } + )) + } + } + @JvmStatic @Hook("com.zaxxer.hikari.HikariDataSource") - fun getConnection(): (Connection) -> Unit { + fun getConnection(): (Any) -> Unit { val transactionOrThread = currentTransactionOrThread() FiberMonitor.newEvent(MonitorEvent(MonitorEventType.ConnectionRequested, keys = listOf(transactionOrThread))) return { connection -> @@ -81,19 +137,23 @@ object FlowHookContainer { @JvmStatic @Hook("net.corda.node.services.statemachine.TransitionExecutorImpl") fun executeTransition( - fiber: FlowFiber, - previousState: StateMachineState, - event: Event, - transition: TransitionResult, - actionExecutor: ActionExecutor + fiber: Any, + previousState: Any, + event: Any, + transition: Any, + actionExecutor: Any ) { - FiberMonitor.newEvent(MonitorEvent(MonitorEventType.ExecuteTransition, keys = listOf(fiber), extra = object { + FiberMonitor.newEvent(MonitorEvent(MonitorEventType.SmExecuteTransition, keys = listOf(fiber), extra = object { val previousState = previousState val event = event val transition = transition })) } + private fun currentFiberOrThread(): Any { + return Fiber.currentFiber() ?: Thread.currentThread() + } + private fun currentTransactionOrThread(): Any { return try { DatabaseTransactionManager.currentOrNull() diff --git a/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/Hooker.kt b/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/Hooker.kt index 9a9999f260..44e26fc593 100644 --- a/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/Hooker.kt +++ b/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/Hooker.kt @@ -56,62 +56,83 @@ class Hooker(hookContainer: Any) : ClassFileTransformer { private fun instrumentClass(clazz: CtClass): CtClass? { val hookMethods = hooks[clazz.name] ?: return null val usedHookMethods = HashSet() - var isAnyInstrumented = false for (method in clazz.declaredBehaviors) { - val hookMethod = instrumentBehaviour(method, hookMethods) - if (hookMethod != null) { - isAnyInstrumented = true - usedHookMethods.add(hookMethod) - } + usedHookMethods.addAll(instrumentBehaviour(method, hookMethods)) } val unusedHookMethods = hookMethods.values.mapTo(HashSet()) { it.first } - usedHookMethods + if (usedHookMethods.isNotEmpty()) { + println("Hooked methods $usedHookMethods") + } if (unusedHookMethods.isNotEmpty()) { println("Unused hook methods $unusedHookMethods") } - return if (isAnyInstrumented) { + return if (usedHookMethods.isNotEmpty()) { clazz } else { null } } - private fun instrumentBehaviour(method: CtBehavior, methodHooks: MethodHooks): Method? { - val signature = Signature(method.name, method.parameterTypes.map { it.name }) - val (hookMethod, annotation) = methodHooks[signature] ?: return null - val invocationString = if (annotation.passThis) { - "${hookMethod.declaringClass.canonicalName}.${hookMethod.name}(this, \$\$)" - } else { - "${hookMethod.declaringClass.canonicalName}.${hookMethod.name}(\$\$)" + private val objectName = Any::class.java.name + private fun instrumentBehaviour(method: CtBehavior, methodHooks: MethodHooks): List { + val pairs = methodHooks.mapNotNull { (signature, pair) -> + if (signature.functionName != method.name) return@mapNotNull null + if (signature.parameterTypes.size != method.parameterTypes.size) return@mapNotNull null + for (i in 0 until signature.parameterTypes.size) { + if (signature.parameterTypes[i] != objectName && signature.parameterTypes[i] != method.parameterTypes[i].name) { + return@mapNotNull null + } + } + pair + } + for ((hookMethod, annotation) in pairs) { + val invocationString = if (annotation.passThis) { + "${hookMethod.declaringClass.canonicalName}.${hookMethod.name}(this, \$\$)" + } else { + "${hookMethod.declaringClass.canonicalName}.${hookMethod.name}(\$\$)" + } + + val overriddenPosition = if (method.methodInfo.isConstructor && annotation.passThis && annotation.position == HookPosition.Before) { + println("passThis=true and position=${HookPosition.Before} for a constructor. " + + "You can only inspect 'this' at the end of the constructor! Hooking *after*.. $method") + HookPosition.After + } else { + annotation.position + } + + val insertHook: (CtBehavior.(code: String) -> Unit) = when (overriddenPosition) { + HookPosition.Before -> CtBehavior::insertBefore + HookPosition.After -> CtBehavior::insertAfter + } + when { + Function0::class.java.isAssignableFrom(hookMethod.returnType) -> { + method.addLocalVariable("after", classPool.get("kotlin.jvm.functions.Function0")) + method.insertHook("after = null; ${wrapTryCatch("after = $invocationString;")}") + method.insertAfter("if (after != null) ${wrapTryCatch("after.invoke();")}") + } + Function1::class.java.isAssignableFrom(hookMethod.returnType) -> { + method.addLocalVariable("after", classPool.get("kotlin.jvm.functions.Function1")) + method.insertHook("after = null; ${wrapTryCatch("after = $invocationString;")}") + method.insertAfter("if (after != null) ${wrapTryCatch("after.invoke((\$w)\$_);")}") + } + else -> { + method.insertHook(wrapTryCatch("$invocationString;")) + } + } + } + return pairs.map { it.first } + } + + companion object { + + fun wrapTryCatch(statement: String): String { + return "try { $statement } catch (Throwable throwable) { ${Hooker::class.java.canonicalName}.${Hooker.Companion::exceptionInHook.name}(throwable); }" } - val overriddenPosition = if (method.methodInfo.isConstructor && annotation.passThis && annotation.position == HookPosition.Before) { - println("passThis=true and position=${HookPosition.Before} for a constructor. " + - "You can only inspect 'this' at the end of the constructor! Hooking *after*.. $method") - HookPosition.After - } else { - annotation.position + @JvmStatic + fun exceptionInHook(throwable: Throwable) { + throwable.printStackTrace() } - - val insertHook: (CtBehavior.(code: String) -> Unit) = when (overriddenPosition) { - HookPosition.Before -> CtBehavior::insertBefore - HookPosition.After -> CtBehavior::insertAfter - } - when { - Function0::class.java.isAssignableFrom(hookMethod.returnType) -> { - method.addLocalVariable("after", classPool.get("kotlin.jvm.functions.Function0")) - method.insertHook("after = $invocationString;") - method.insertAfter("after.invoke();") - } - Function1::class.java.isAssignableFrom(hookMethod.returnType) -> { - method.addLocalVariable("after", classPool.get("kotlin.jvm.functions.Function1")) - method.insertHook("after = $invocationString;") - method.insertAfter("after.invoke((\$w)\$_);") - } - else -> { - method.insertHook("$invocationString;") - } - } - return hookMethod } } @@ -122,7 +143,11 @@ enum class HookPosition { } @Target(AnnotationTarget.FUNCTION) -annotation class Hook(val clazz: String, val position: HookPosition = HookPosition.Before, val passThis: Boolean = false) +annotation class Hook( + val clazz: String, + val position: HookPosition = HookPosition.Before, + val passThis: Boolean = false +) private data class Signature(val functionName: String, val parameterTypes: List) diff --git a/node/src/integration-test/kotlin/net/corda/node/NodePerformanceTests.kt b/node/src/integration-test/kotlin/net/corda/node/NodePerformanceTests.kt index 9f212fe55d..0e507160cd 100644 --- a/node/src/integration-test/kotlin/net/corda/node/NodePerformanceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/NodePerformanceTests.kt @@ -17,6 +17,7 @@ import net.corda.nodeapi.internal.config.User import net.corda.testing.DUMMY_NOTARY import net.corda.testing.* import net.corda.testing.driver.NodeHandle +import net.corda.testing.driver.PortAllocation import net.corda.testing.driver.driver import net.corda.testing.internal.performance.div import net.corda.testing.internal.performance.startPublishingFixedRateInjector @@ -128,21 +129,40 @@ class NodePerformanceTests : IntegrationTest() { driver( notarySpecs = listOf(NotarySpec(DUMMY_NOTARY.name, rpcUsers = listOf(user))), startNodesInProcess = true, - extraCordappPackagesToScan = listOf("net.corda.finance") + extraCordappPackagesToScan = listOf("net.corda.finance"), + portAllocation = PortAllocation.Incremental(20000) ) { val notary = defaultNotaryNode.getOrThrow() as NodeHandle.InProcess val metricRegistry = startReporter(shutdownManager, notary.node.services.monitoringService.metrics) notary.rpcClientToNode().use("A", "A") { connection -> println("ISSUING") - val doneFutures = (1..100).toList().parallelStream().map { + val doneFutures = (1..100).toList().map { connection.proxy.startFlow(::CashIssueFlow, 1.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue }.toList() doneFutures.transpose().get() println("STARTING PAYMENT") - startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 100L / TimeUnit.SECONDS) { + startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 5L / TimeUnit.SECONDS) { connection.proxy.startFlow(::CashPaymentFlow, 1.DOLLARS, defaultNotaryIdentity).returnValue.get() } } } } + + @Test + fun `single pay`() { + val user = User("A", "A", setOf(startFlow(), startFlow())) + driver( + notarySpecs = listOf(NotarySpec(DUMMY_NOTARY.name, rpcUsers = listOf(user))), + startNodesInProcess = true, + extraCordappPackagesToScan = listOf("net.corda.finance"), + portAllocation = PortAllocation.Incremental(20000) + ) { + val notary = defaultNotaryNode.getOrThrow() as NodeHandle.InProcess + val metricRegistry = startReporter(shutdownManager, notary.node.services.monitoringService.metrics) + notary.rpcClientToNode().use("A", "A") { connection -> + connection.proxy.startFlow(::CashIssueFlow, 1.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.getOrThrow() + connection.proxy.startFlow(::CashPaymentFlow, 1.DOLLARS, defaultNotaryIdentity).returnValue.getOrThrow() + } + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index a879af3b56..753e9ff330 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -294,9 +294,11 @@ class P2PMessagingClient(config: NodeConfiguration, // processing a message but if so, it'll be parked waiting for us to count down the latch, so // the session itself is still around and we can still ack messages as a result. override fun acknowledge() { - state.locked { - artemisMessage.individualAcknowledge() - artemis.started!!.session.commit() + messagingExecutor.fetchFrom { + state.locked { + artemisMessage.individualAcknowledge() + artemis.started!!.session.commit() + } } } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt index e36bfcc516..7c7ed6e209 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt @@ -6,6 +6,7 @@ import net.corda.core.utilities.contextLogger import net.corda.node.services.statemachine.* import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult +import java.time.Instant import java.util.concurrent.ConcurrentHashMap /** @@ -28,13 +29,16 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti actionExecutor: ActionExecutor ): Pair { val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor) - val transitionRecord = TransitionDiagnosticRecord(fiber.id, previousState, nextState, event, transition, continuation) + val transitionRecord = TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation) val record = records.compute(fiber.id) { _, record -> (record ?: ArrayList()).apply { add(transitionRecord) } } if (nextState.checkpoint.errorState is ErrorState.Errored) { log.warn("Flow ${fiber.id} dirtied, dumping all transitions:\n${record!!.joinToString("\n")}") + for (error in nextState.checkpoint.errorState.errors) { + log.warn("Flow ${fiber.id} error", error.exception) + } } if (transition.newState.isRemoved) { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt index 1f824aec70..a0ca6d6660 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt @@ -5,6 +5,7 @@ import net.corda.core.utilities.contextLogger import net.corda.node.services.statemachine.* import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult +import java.time.Instant /** * This interceptor simply prints all state machine transitions. Useful for debugging. @@ -23,7 +24,7 @@ class PrintingInterceptor(val delegate: TransitionExecutor) : TransitionExecutor actionExecutor: ActionExecutor ): Pair { val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor) - val transitionRecord = TransitionDiagnosticRecord(fiber.id, previousState, nextState, event, transition, continuation) + val transitionRecord = TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation) log.info("Transition for flow ${fiber.id} $transitionRecord") return Pair(continuation, nextState) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/TransitionDiagnosticRecord.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/TransitionDiagnosticRecord.kt index 96235b3eb2..0a2878bfd8 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/TransitionDiagnosticRecord.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/TransitionDiagnosticRecord.kt @@ -6,12 +6,14 @@ import net.corda.node.services.statemachine.Event import net.corda.node.services.statemachine.StateMachineState import net.corda.node.services.statemachine.transitions.TransitionResult import net.corda.node.utilities.ObjectDiffer +import java.time.Instant /** * This is a diagnostic record that stores information about a state machine transition and provides pretty printing * by diffing the two states. */ data class TransitionDiagnosticRecord( + val timestamp: Instant, val flowId: StateMachineRunId, val previousState: StateMachineState, val nextState: StateMachineState, @@ -26,6 +28,7 @@ data class TransitionDiagnosticRecord( listOf( "", " --- Transition of flow $flowId ---", + " Timestamp: $timestamp", " Event: $event", " Actions: ", " ${transition.actions.joinToString("\n ")}",