Cleaned up the flow stack snapshot API

This commit is contained in:
Shams Asari 2017-08-15 15:42:01 +01:00
parent 62b26bcd89
commit 3138e2b6de
9 changed files with 127 additions and 175 deletions

View File

@ -140,7 +140,7 @@ abstract class FlowLogic<out T> {
* network's event horizon time. * network's event horizon time.
*/ */
@Suspendable @Suspendable
open fun send(otherParty: Party, payload: Any): Unit = stateMachine.send(otherParty, payload, flowUsedForSessions) open fun send(otherParty: Party, payload: Any) = stateMachine.send(otherParty, payload, flowUsedForSessions)
/** /**
* Invokes the given subflow. This function returns once the subflow completes successfully with the result * Invokes the given subflow. This function returns once the subflow completes successfully with the result
@ -239,7 +239,7 @@ abstract class FlowLogic<out T> {
* Returns a shallow copy of the Quasar stack frames at the time of call to [flowStackSnapshot]. Use this to inspect * Returns a shallow copy of the Quasar stack frames at the time of call to [flowStackSnapshot]. Use this to inspect
* what objects would be serialised at the time of call to a suspending action (e.g. send/receive). * what objects would be serialised at the time of call to a suspending action (e.g. send/receive).
* Note: This logic is only available during tests and is not meant to be used during the production deployment. * Note: This logic is only available during tests and is not meant to be used during the production deployment.
* Therefore the default implementationdoes nothing. * Therefore the default implementation does nothing.
*/ */
@Suspendable @Suspendable
fun flowStackSnapshot(): FlowStackSnapshot? = stateMachine.flowStackSnapshot(this::class.java) fun flowStackSnapshot(): FlowStackSnapshot? = stateMachine.flowStackSnapshot(this::class.java)
@ -256,7 +256,7 @@ abstract class FlowLogic<out T> {
* Therefore the default implementation does nothing. * Therefore the default implementation does nothing.
*/ */
@Suspendable @Suspendable
fun persistFlowStackSnapshot(): Unit = stateMachine.persistFlowStackSnapshot(this::class.java) fun persistFlowStackSnapshot() = stateMachine.persistFlowStackSnapshot(this::class.java)
//////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -1,69 +1,21 @@
package net.corda.core.flows package net.corda.core.flows
import net.corda.core.utilities.loggerFor import java.time.Instant
import java.nio.file.Path
import java.util.*
interface FlowStackSnapshotFactory {
private object Holder {
val INSTANCE: FlowStackSnapshotFactory
init {
val serviceFactory = ServiceLoader.load(FlowStackSnapshotFactory::class.java).singleOrNull()
INSTANCE = serviceFactory ?: FlowStackSnapshotDefaultFactory()
}
}
companion object {
val instance: FlowStackSnapshotFactory by lazy { Holder.INSTANCE }
}
/**
* Returns flow stack data snapshot extracted from Quasar stack.
* It is designed to be used in the debug mode of the flow execution.
* Note. This logic is only available during tests and is not meant to be used during the production deployment.
* Therefore the default implementation does nothing.
*/
fun getFlowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot?
/** Stores flow stack snapshot as a json file. The stored shapshot is only partial and consists
* only data (i.e. stack traces and local variables values) relevant to the flow. It does not
* persist corda internal data (e.g. FlowStateMachine). Instead it uses [StackFrameDataToken] to indicate
* the class of the element on the stack.
* The flow stack snapshot is stored in a file located in
* {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/
* where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform.
* Note. This logic is only available during tests and is not meant to be used during the production deployment.
* Therefore the default implementation does nothing.
*/
fun persistAsJsonFile(flowClass: Class<*>, baseDir: Path, flowId: String): Unit
}
private class FlowStackSnapshotDefaultFactory : FlowStackSnapshotFactory {
val log = loggerFor<FlowStackSnapshotDefaultFactory>()
override fun getFlowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot? {
log.warn("Flow stack snapshot are not supposed to be used in a production deployment")
return null
}
override fun persistAsJsonFile(flowClass: Class<*>, baseDir: Path, flowId: String) {
log.warn("Flow stack snapshot are not supposed to be used in a production deployment")
}
}
/** /**
* Main data object representing snapshot of the flow stack, extracted from the Quasar stack. * Main data object representing snapshot of the flow stack, extracted from the Quasar stack.
*/ */
data class FlowStackSnapshot constructor( data class FlowStackSnapshot(
val timestamp: Long = System.currentTimeMillis(), val time: Instant,
val flowClass: Class<*>? = null, val flowClass: Class<out FlowLogic<*>>,
val stackFrames: List<Frame> = listOf() val stackFrames: List<Frame>
) { ) {
data class Frame( data class Frame(
val stackTraceElement: StackTraceElement? = null, // This should be the call that *pushed* the frame of [objects] val stackTraceElement: StackTraceElement, // This should be the call that *pushed* the frame of [objects]
val stackObjects: List<Any?> = listOf() val stackObjects: List<Any?>
) ) {
override fun toString(): String = stackTraceElement.toString()
}
} }
/** /**

View File

@ -3,11 +3,7 @@ package net.corda.core.internal
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowContext import net.corda.core.flows.*
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
@ -39,24 +35,11 @@ interface FlowStateMachine<R> {
fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>): Unit fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>): Unit
/**
* Returns a shallow copy of the Quasar stack frames at the time of call to [flowStackSnapshot]. Use this to inspect
* what objects would be serialised at the time of call to a suspending action (e.g. send/receive).
*/
@Suspendable @Suspendable
fun flowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot? fun flowStackSnapshot(flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot?
/**
* Persists a shallow copy of the Quasar stack frames at the time of call to [persistFlowStackSnapshot].
* Use this to track the monitor evolution of the quasar stack values during the flow execution.
* The flow stack snapshot is stored in a file located in {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/
* where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform.
*
* Note: With respect to the [flowStackSnapshot], the snapshot being persisted by this method is partial,
* meaning that only flow relevant traces and local variables are persisted.
*/
@Suspendable @Suspendable
fun persistFlowStackSnapshot(flowClass: Class<*>): Unit fun persistFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>): Unit
val serviceHub: ServiceHub val serviceHub: ServiceHub
val logger: Logger val logger: Logger

View File

@ -0,0 +1,40 @@
package net.corda.node.services.statemachine
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StateMachineRunId
import net.corda.core.utilities.loggerFor
import java.nio.file.Path
import java.util.*
interface FlowStackSnapshotFactory {
private object Holder {
val INSTANCE: FlowStackSnapshotFactory
init {
val serviceFactory = ServiceLoader.load(FlowStackSnapshotFactory::class.java).singleOrNull()
INSTANCE = serviceFactory ?: DefaultFlowStackSnapshotFactory
}
}
companion object {
val instance: FlowStackSnapshotFactory by lazy { Holder.INSTANCE }
}
fun getFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot?
fun persistAsJsonFile(flowClass: Class<out FlowLogic<*>>, baseDir: Path, flowId: StateMachineRunId)
private object DefaultFlowStackSnapshotFactory : FlowStackSnapshotFactory {
private val log = loggerFor<DefaultFlowStackSnapshotFactory>()
override fun getFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot? {
log.warn("Flow stack snapshot are not supposed to be used in a production deployment")
return null
}
override fun persistAsJsonFile(flowClass: Class<out FlowLogic<*>>, baseDir: Path, flowId: StateMachineRunId) {
log.warn("Flow stack snapshot are not supposed to be used in a production deployment")
}
}
}

View File

@ -255,14 +255,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
} }
@Suspendable @Suspendable
override fun flowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot? { override fun flowStackSnapshot(flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot? {
val factory = FlowStackSnapshotFactory.instance return FlowStackSnapshotFactory.instance.getFlowStackSnapshot(flowClass)
return factory.getFlowStackSnapshot(flowClass)
} }
override fun persistFlowStackSnapshot(flowClass: Class<*>): Unit { override fun persistFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>) {
val factory = FlowStackSnapshotFactory.instance FlowStackSnapshotFactory.instance.persistAsJsonFile(flowClass, serviceHub.configuration.baseDirectory, id)
factory.persistAsJsonFile(flowClass, serviceHub.configuration.baseDirectory, id.toString())
} }
/** /**

View File

@ -4,11 +4,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.Amount import net.corda.core.contracts.Amount
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowContext import net.corda.core.flows.*
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.FlowStateMachine
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
@ -93,21 +89,9 @@ class InteractiveShellTest {
override val id: StateMachineRunId get() = throw UnsupportedOperationException() override val id: StateMachineRunId get() = throw UnsupportedOperationException()
override val resultFuture: CordaFuture<Any?> get() = throw UnsupportedOperationException() override val resultFuture: CordaFuture<Any?> get() = throw UnsupportedOperationException()
override val flowInitiator: FlowInitiator get() = throw UnsupportedOperationException() override val flowInitiator: FlowInitiator get() = throw UnsupportedOperationException()
override fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>) = Unit
override fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>) { override fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>) = Unit
// Do nothing override fun flowStackSnapshot(flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot? = null
} override fun persistFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>) = Unit
override fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>) {
// Do nothing
}
override fun flowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot? {
return null
}
override fun persistFlowStackSnapshot(flowClass: Class<*>) {
// Do nothing
}
} }
} }

View File

@ -1,21 +1,23 @@
package net.corda.testing package net.corda.testing
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.fasterxml.jackson.databind.ObjectMapper
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStackSnapshot import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.div
import net.corda.core.internal.list
import net.corda.core.internal.read
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.jackson.JacksonSupport
import net.corda.node.services.startFlowPermission import net.corda.node.services.startFlowPermission
import net.corda.nodeapi.User import net.corda.nodeapi.User
import net.corda.testing.driver.driver import net.corda.testing.driver.driver
import org.junit.Ignore import org.junit.Ignore
import org.junit.Test import org.junit.Test
import java.io.File
import java.nio.file.Path import java.nio.file.Path
import java.time.LocalDateTime import java.time.LocalDate
import java.time.format.DateTimeFormatter
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertTrue import kotlin.test.assertTrue
@ -110,15 +112,15 @@ object Constants {
* No side effect flow that stores the partial snapshot into a file, path to which is passed in the flow constructor. * No side effect flow that stores the partial snapshot into a file, path to which is passed in the flow constructor.
*/ */
@StartableByRPC @StartableByRPC
class PersistingNoSideEffectFlow : FlowLogic<String>() { class PersistingNoSideEffectFlow : FlowLogic<StateMachineRunId>() {
@Suspendable @Suspendable
override fun call(): String { override fun call(): StateMachineRunId {
// Using the [Constants] object here is considered by Quasar as a side effect. Thus explicit initialization // Using the [Constants] object here is considered by Quasar as a side effect. Thus explicit initialization
@Suppress("UNUSED_VARIABLE") @Suppress("UNUSED_VARIABLE")
val unusedVar = "inCall" val unusedVar = "inCall"
persist() persist()
return stateMachine.id.toString() return stateMachine.id
} }
@Suspendable @Suspendable
@ -134,14 +136,14 @@ class PersistingNoSideEffectFlow : FlowLogic<String>() {
* Flow with side effects that stores the partial snapshot into a file, path to which is passed in the flow constructor. * Flow with side effects that stores the partial snapshot into a file, path to which is passed in the flow constructor.
*/ */
@StartableByRPC @StartableByRPC
class PersistingSideEffectFlow : FlowLogic<String>() { class PersistingSideEffectFlow : FlowLogic<StateMachineRunId>() {
@Suspendable @Suspendable
override fun call(): String { override fun call(): StateMachineRunId {
@Suppress("UNUSED_VARIABLE") @Suppress("UNUSED_VARIABLE")
val unusedVar = Constants.IN_CALL_VALUE val unusedVar = Constants.IN_CALL_VALUE
persist() persist()
return stateMachine.id.toString() return stateMachine.id
} }
@Suspendable @Suspendable
@ -156,16 +158,16 @@ class PersistingSideEffectFlow : FlowLogic<String>() {
* Similar to [PersistingSideEffectFlow] but aims to produce multiple snapshot files. * Similar to [PersistingSideEffectFlow] but aims to produce multiple snapshot files.
*/ */
@StartableByRPC @StartableByRPC
class MultiplePersistingSideEffectFlow(val persistCallCount: Int) : FlowLogic<String>() { class MultiplePersistingSideEffectFlow(val persistCallCount: Int) : FlowLogic<StateMachineRunId>() {
@Suspendable @Suspendable
override fun call(): String { override fun call(): StateMachineRunId {
@Suppress("UNUSED_VARIABLE") @Suppress("UNUSED_VARIABLE")
val unusedVar = Constants.IN_CALL_VALUE val unusedVar = Constants.IN_CALL_VALUE
for (i in 1..persistCallCount) { for (i in 1..persistCallCount) {
persist() persist()
} }
return stateMachine.id.toString() return stateMachine.id
} }
@Suspendable @Suspendable
@ -176,14 +178,19 @@ class MultiplePersistingSideEffectFlow(val persistCallCount: Int) : FlowLogic<St
} }
} }
fun readFlowStackSnapshotFromDir(baseDir: Path, flowId: String): FlowStackSnapshot { fun readFlowStackSnapshotFromDir(baseDir: Path, flowId: StateMachineRunId): FlowStackSnapshot {
val snapshotFile = File(baseDir.toFile(), "flowStackSnapshots/${LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE)}/$flowId/flowStackSnapshot.json") val snapshotFile = flowSnapshotDir(baseDir, flowId) / "flowStackSnapshot.json"
return ObjectMapper().readValue(snapshotFile.inputStream(), FlowStackSnapshot::class.java) return snapshotFile.read {
JacksonSupport.createNonRpcMapper().readValue(it, FlowStackSnapshot::class.java)
}
} }
fun countFilesInDir(baseDir: Path, flowId: String): Int { private fun flowSnapshotDir(baseDir: Path, flowId: StateMachineRunId): Path {
val flowDir = File(baseDir.toFile(), "flowStackSnapshots/${LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE)}/$flowId/") return baseDir / "flowStackSnapshots" / LocalDate.now().toString() / flowId.uuid.toString()
return flowDir.listFiles().size }
fun countFilesInDir(baseDir: Path, flowId: StateMachineRunId): Int {
return flowSnapshotDir(baseDir, flowId).list { it.count().toInt() }
} }
fun assertFrame(expectedMethod: String, expectedEmpty: Boolean, frame: StackSnapshotFrame) { fun assertFrame(expectedMethod: String, expectedEmpty: Boolean, frame: StackSnapshotFrame) {
@ -191,11 +198,11 @@ fun assertFrame(expectedMethod: String, expectedEmpty: Boolean, frame: StackSnap
assertEquals(expectedEmpty, frame.dataTypes.isEmpty()) assertEquals(expectedEmpty, frame.dataTypes.isEmpty())
} }
@Ignore("When running via gradle the Jacoco agent interferes with the quasar instrumentation process and violates tested" +
"criteria (specifically: extra objects are introduced to the quasar stack by th Jacoco agent). You can however " +
"run these tests via an IDE.")
class FlowStackSnapshotTest { class FlowStackSnapshotTest {
@Test @Test
@Ignore("This test is skipped due to Jacoco agent interference with the quasar instrumentation process. " +
"This violates tested criteria (specifically: extra objects are introduced to the quasar stack by th Jacoco agent)")
fun `flowStackSnapshot contains full frames when methods with side effects are called`() { fun `flowStackSnapshot contains full frames when methods with side effects are called`() {
driver(startNodesInProcess = true) { driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<SideEffectFlow>())))).get() val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<SideEffectFlow>())))).get()
@ -211,8 +218,6 @@ class FlowStackSnapshotTest {
} }
@Test @Test
@Ignore("This test is skipped due to Jacoco agent interference with the quasar instrumentation process. " +
"This violates tested criteria (specifically extra objects are introduced to the quasar stack by th Jacoco agent)")
fun `flowStackSnapshot contains empty frames when methods with no side effects are called`() { fun `flowStackSnapshot contains empty frames when methods with no side effects are called`() {
driver(startNodesInProcess = true) { driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<NoSideEffectFlow>())))).get() val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<NoSideEffectFlow>())))).get()
@ -228,8 +233,6 @@ class FlowStackSnapshotTest {
} }
@Test @Test
@Ignore("This test is skipped due to Jacoco agent interference with the quasar instrumentation process. " +
"This violates tested criteria (specifically extra objects are introduced to the quasar stack by th Jacoco agent)")
fun `persistFlowStackSnapshot persists empty frames to a file when methods with no side effects are called`() { fun `persistFlowStackSnapshot persists empty frames to a file when methods with no side effects are called`() {
driver(startNodesInProcess = true) { driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<PersistingNoSideEffectFlow>())))).get() val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<PersistingNoSideEffectFlow>())))).get()
@ -247,8 +250,6 @@ class FlowStackSnapshotTest {
} }
@Test @Test
@Ignore("This test is skipped due to Jacoco agent interference with the quasar instrumentation process. " +
"This violates tested criteria (specifically extra objects are introduced to the quasar stack by th Jacoco agent)")
fun `persistFlowStackSnapshot persists multiple snapshots in different files`() { fun `persistFlowStackSnapshot persists multiple snapshots in different files`() {
driver(startNodesInProcess = true) { driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<MultiplePersistingSideEffectFlow>())))).get() val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<MultiplePersistingSideEffectFlow>())))).get()
@ -263,8 +264,6 @@ class FlowStackSnapshotTest {
} }
@Test @Test
@Ignore("This test is skipped due to Jacoco agent interference with the quasar instrumentation process. " +
"This violates tested criteria (specifically extra objects are introduced to the quasar stack by th Jacoco agent)")
fun `persistFlowStackSnapshot stack traces are aligned with stack objects`() { fun `persistFlowStackSnapshot stack traces are aligned with stack objects`() {
driver(startNodesInProcess = true) { driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<PersistingSideEffectFlow>())))).get() val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<PersistingSideEffectFlow>())))).get()
@ -279,11 +278,11 @@ class FlowStackSnapshotTest {
it.stackObjects.forEach { it.stackObjects.forEach {
when (it) { when (it) {
Constants.IN_CALL_VALUE -> { Constants.IN_CALL_VALUE -> {
assertEquals(PersistingSideEffectFlow::call.name, trace!!.methodName) assertEquals(PersistingSideEffectFlow::call.name, trace.methodName)
inCallCount++ inCallCount++
} }
Constants.IN_PERSIST_VALUE -> { Constants.IN_PERSIST_VALUE -> {
assertEquals(PersistingSideEffectFlow::persist.name, trace!!.methodName) assertEquals(PersistingSideEffectFlow::persist.name, trace.methodName)
inPersistCount++ inPersistCount++
} }
} }

View File

@ -5,23 +5,25 @@ import co.paralleluniverse.fibers.Instrumented
import co.paralleluniverse.fibers.Stack import co.paralleluniverse.fibers.Stack
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.databind.SerializationFeature
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStackSnapshot import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.FlowStackSnapshot.Frame import net.corda.core.flows.FlowStackSnapshot.Frame
import net.corda.core.flows.FlowStackSnapshotFactory
import net.corda.core.flows.StackFrameDataToken import net.corda.core.flows.StackFrameDataToken
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.div
import net.corda.core.internal.write
import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializeAsToken
import java.io.File import net.corda.jackson.JacksonSupport
import net.corda.node.services.statemachine.FlowStackSnapshotFactory
import java.nio.file.Path import java.nio.file.Path
import java.time.LocalDateTime import java.time.Instant
import java.time.format.DateTimeFormatter import java.time.LocalDate
class FlowStackSnapshotFactoryImpl : FlowStackSnapshotFactory { class FlowStackSnapshotFactoryImpl : FlowStackSnapshotFactory {
@Suspendable @Suspendable
override fun getFlowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot? { override fun getFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot {
var snapshot: FlowStackSnapshot? = null var snapshot: FlowStackSnapshot? = null
val stackTrace = Fiber.currentFiber().stackTrace val stackTrace = Fiber.currentFiber().stackTrace
Fiber.parkAndSerialize { fiber, _ -> Fiber.parkAndSerialize { fiber, _ ->
@ -35,19 +37,19 @@ class FlowStackSnapshotFactoryImpl : FlowStackSnapshotFactory {
return temporarySnapshot!! return temporarySnapshot!!
} }
override fun persistAsJsonFile(flowClass: Class<*>, baseDir: Path, flowId: String) { override fun persistAsJsonFile(flowClass: Class<out FlowLogic<*>>, baseDir: Path, flowId: StateMachineRunId) {
val flowStackSnapshot = getFlowStackSnapshot(flowClass) val flowStackSnapshot = getFlowStackSnapshot(flowClass)
val mapper = ObjectMapper() val mapper = JacksonSupport.createNonRpcMapper().apply {
mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
mapper.enable(SerializationFeature.INDENT_OUTPUT) setSerializationInclusion(JsonInclude.Include.NON_NULL)
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) }
val file = createFile(baseDir, flowId) val file = createFile(baseDir, flowId)
file.bufferedWriter().use { out -> file.write(createDirs = true) {
mapper.writeValue(out, filterOutStackDump(flowStackSnapshot!!)) mapper.writeValue(it, filterOutStackDump(flowStackSnapshot))
} }
} }
private fun extractStackSnapshotFromFiber(fiber: Fiber<*>, stackTrace: List<StackTraceElement>, flowClass: Class<*>): FlowStackSnapshot { private fun extractStackSnapshotFromFiber(fiber: Fiber<*>, stackTrace: List<StackTraceElement>, flowClass: Class<out FlowLogic<*>>): FlowStackSnapshot {
val stack = getFiberStack(fiber) val stack = getFiberStack(fiber)
val objectStack = getObjectStack(stack).toList() val objectStack = getObjectStack(stack).toList()
val frameOffsets = getFrameOffsets(stack) val frameOffsets = getFrameOffsets(stack)
@ -58,24 +60,25 @@ class FlowStackSnapshotFactoryImpl : FlowStackSnapshotFactory {
val relevantStackTrace = removeConstructorStackTraceElements(stackTrace).drop(1) val relevantStackTrace = removeConstructorStackTraceElements(stackTrace).drop(1)
val stackTraceToAnnotation = relevantStackTrace.map { val stackTraceToAnnotation = relevantStackTrace.map {
val element = StackTraceElement(it.className, it.methodName, it.fileName, it.lineNumber) val element = StackTraceElement(it.className, it.methodName, it.fileName, it.lineNumber)
element to getInstrumentedAnnotation(element) element to element.instrumentedAnnotation
} }
val frameObjectsIterator = frameObjects.listIterator() val frameObjectsIterator = frameObjects.listIterator()
val frames = stackTraceToAnnotation.reversed().map { (element, annotation) -> val frames = stackTraceToAnnotation.reversed().map { (element, annotation) ->
// If annotation is null then the case indicates that this is an entry point - i.e. // If annotation is null then the case indicates that this is an entry point - i.e.
// the net.corda.node.services.statemachine.FlowStateMachineImpl.run method // the net.corda.node.services.statemachine.FlowStateMachineImpl.run method
if (frameObjectsIterator.hasNext() && (annotation == null || !annotation.methodOptimized)) { val stackObjects = if (frameObjectsIterator.hasNext() && (annotation == null || !annotation.methodOptimized)) {
Frame(element, frameObjectsIterator.next()) frameObjectsIterator.next()
} else { } else {
Frame(element, listOf()) emptyList()
} }
Frame(element, stackObjects)
} }
return FlowStackSnapshot(flowClass = flowClass, stackFrames = frames) return FlowStackSnapshot(Instant.now(), flowClass, frames)
} }
private fun getInstrumentedAnnotation(element: StackTraceElement): Instrumented? { private val StackTraceElement.instrumentedAnnotation: Instrumented? get() {
Class.forName(element.className).methods.forEach { Class.forName(className).methods.forEach {
if (it.name == element.methodName && it.isAnnotationPresent(Instrumented::class.java)) { if (it.name == methodName && it.isAnnotationPresent(Instrumented::class.java)) {
return it.getAnnotation(Instrumented::class.java) return it.getAnnotation(Instrumented::class.java)
} }
} }
@ -99,10 +102,10 @@ class FlowStackSnapshotFactoryImpl : FlowStackSnapshotFactory {
private fun filterOutStackDump(flowStackSnapshot: FlowStackSnapshot): FlowStackSnapshot { private fun filterOutStackDump(flowStackSnapshot: FlowStackSnapshot): FlowStackSnapshot {
val framesFilteredByStackTraceElement = flowStackSnapshot.stackFrames.filter { val framesFilteredByStackTraceElement = flowStackSnapshot.stackFrames.filter {
!FlowStateMachine::class.java.isAssignableFrom(Class.forName(it.stackTraceElement!!.className)) !FlowStateMachine::class.java.isAssignableFrom(Class.forName(it.stackTraceElement.className))
} }
val framesFilteredByObjects = framesFilteredByStackTraceElement.map { val framesFilteredByObjects = framesFilteredByStackTraceElement.map {
Frame(it.stackTraceElement, it.stackObjects.map { it.copy(stackObjects = it.stackObjects.map {
if (it != null && (it is FlowLogic<*> || it is FlowStateMachine<*> || it is Fiber<*> || it is SerializeAsToken)) { if (it != null && (it is FlowLogic<*> || it is FlowStateMachine<*> || it is Fiber<*> || it is SerializeAsToken)) {
StackFrameDataToken(it::class.java.name) StackFrameDataToken(it::class.java.name)
} else { } else {
@ -110,25 +113,18 @@ class FlowStackSnapshotFactoryImpl : FlowStackSnapshotFactory {
} }
}) })
} }
return FlowStackSnapshot(flowStackSnapshot.timestamp, flowStackSnapshot.flowClass, framesFilteredByObjects) return flowStackSnapshot.copy(stackFrames = framesFilteredByObjects)
} }
private fun createFile(baseDir: Path, flowId: String): File { private fun createFile(baseDir: Path, flowId: StateMachineRunId): Path {
val file: File val dir = baseDir / "flowStackSnapshots" / LocalDate.now().toString() / flowId.uuid.toString()
val dir = File(baseDir.toFile(), "flowStackSnapshots/${LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE)}/$flowId/")
val index = ThreadLocalIndex.currentIndex.get() val index = ThreadLocalIndex.currentIndex.get()
if (index == 0) { val file = if (index == 0) dir / "flowStackSnapshot.json" else dir / "flowStackSnapshot-$index.json"
dir.mkdirs()
file = File(dir, "flowStackSnapshot.json")
} else {
file = File(dir, "flowStackSnapshot-$index.json")
}
ThreadLocalIndex.currentIndex.set(index + 1) ThreadLocalIndex.currentIndex.set(index + 1)
return file return file
} }
private class ThreadLocalIndex private constructor() { private class ThreadLocalIndex private constructor() {
companion object { companion object {
val currentIndex = object : ThreadLocal<Int>() { val currentIndex = object : ThreadLocal<Int>() {
override fun initialValue() = 0 override fun initialValue() = 0