Flow stack snapshot feature (#1094)

* flows: Add StackDump, debugStackDump, test (doesnt work)

* Polishing Quasar dump feature, extending it with persisting to a file, adding integration tests

* Addressing review comments

* Addressing 2nd round of review comments

* Refactoring implementation according to Shams suggestion

* Reverting changes and restoring the feature to be the part of the core API

* Switching to ServiceLoader
This commit is contained in:
mkit 2017-08-15 08:22:37 +01:00 committed by GitHub
parent 8f8a5ff774
commit 3ba42b4ccd
9 changed files with 646 additions and 21 deletions

View File

@ -140,7 +140,7 @@ abstract class FlowLogic<out T> {
* network's event horizon time.
*/
@Suspendable
open fun send(otherParty: Party, payload: Any) = stateMachine.send(otherParty, payload, flowUsedForSessions)
open fun send(otherParty: Party, payload: Any): Unit = stateMachine.send(otherParty, payload, flowUsedForSessions)
/**
* Invokes the given subflow. This function returns once the subflow completes successfully with the result
@ -235,6 +235,29 @@ abstract class FlowLogic<out T> {
@Suspendable
fun waitForLedgerCommit(hash: SecureHash): SignedTransaction = stateMachine.waitForLedgerCommit(hash, this)
/**
* Returns a shallow copy of the Quasar stack frames at the time of call to [flowStackSnapshot]. Use this to inspect
* what objects would be serialised at the time of call to a suspending action (e.g. send/receive).
* Note: This logic is only available during tests and is not meant to be used during the production deployment.
* Therefore the default implementationdoes nothing.
*/
@Suspendable
fun flowStackSnapshot(): FlowStackSnapshot? = stateMachine.flowStackSnapshot(this::class.java)
/**
* Persists a shallow copy of the Quasar stack frames at the time of call to [persistFlowStackSnapshot].
* Use this to track the monitor evolution of the quasar stack values during the flow execution.
* The flow stack snapshot is stored in a file located in {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/
* where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform.
*
* Note: With respect to the [flowStackSnapshot], the snapshot being persisted by this method is partial,
* meaning that only flow relevant traces and local variables are persisted.
* Also, this logic is only available during tests and is not meant to be used during the production deployment.
* Therefore the default implementation does nothing.
*/
@Suspendable
fun persistFlowStackSnapshot(): Unit = stateMachine.persistFlowStackSnapshot(this::class.java)
////////////////////////////////////////////////////////////////////////////////////////////////////////////
private var _stateMachine: FlowStateMachine<*>? = null

View File

@ -0,0 +1,73 @@
package net.corda.core.flows
import net.corda.core.utilities.loggerFor
import java.nio.file.Path
import java.util.*
interface FlowStackSnapshotFactory {
private object Holder {
val INSTANCE: FlowStackSnapshotFactory
init {
val serviceFactory = ServiceLoader.load(FlowStackSnapshotFactory::class.java).singleOrNull()
INSTANCE = serviceFactory ?: FlowStackSnapshotDefaultFactory()
}
}
companion object {
val instance: FlowStackSnapshotFactory by lazy { Holder.INSTANCE }
}
/**
* Returns flow stack data snapshot extracted from Quasar stack.
* It is designed to be used in the debug mode of the flow execution.
* Note. This logic is only available during tests and is not meant to be used during the production deployment.
* Therefore the default implementation does nothing.
*/
fun getFlowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot?
/** Stores flow stack snapshot as a json file. The stored shapshot is only partial and consists
* only data (i.e. stack traces and local variables values) relevant to the flow. It does not
* persist corda internal data (e.g. FlowStateMachine). Instead it uses [StackFrameDataToken] to indicate
* the class of the element on the stack.
* The flow stack snapshot is stored in a file located in
* {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/
* where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform.
* Note. This logic is only available during tests and is not meant to be used during the production deployment.
* Therefore the default implementation does nothing.
*/
fun persistAsJsonFile(flowClass: Class<*>, baseDir: Path, flowId: String): Unit
}
private class FlowStackSnapshotDefaultFactory : FlowStackSnapshotFactory {
val log = loggerFor<FlowStackSnapshotDefaultFactory>()
override fun getFlowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot? {
log.warn("Flow stack snapshot are not supposed to be used in a production deployment")
return null
}
override fun persistAsJsonFile(flowClass: Class<*>, baseDir: Path, flowId: String) {
log.warn("Flow stack snapshot are not supposed to be used in a production deployment")
}
}
/**
* Main data object representing snapshot of the flow stack, extracted from the Quasar stack.
*/
data class FlowStackSnapshot constructor(
val timestamp: Long = System.currentTimeMillis(),
val flowClass: Class<*>? = null,
val stackFrames: List<Frame> = listOf()
) {
data class Frame(
val stackTraceElement: StackTraceElement? = null, // This should be the call that *pushed* the frame of [objects]
val stackObjects: List<Any?> = listOf()
)
}
/**
* Token class, used to indicate stack presence of the corda internal data. Since this data is of no use for
* a CordApp developer, it is skipped from serialisation and its presence is only marked by this token.
*/
data class StackFrameDataToken(val className: String)

View File

@ -6,6 +6,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowContext
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
@ -29,14 +30,33 @@ interface FlowStateMachine<R> {
fun <T : Any> receive(receiveType: Class<T>, otherParty: Party, sessionFlow: FlowLogic<*>): UntrustworthyData<T>
@Suspendable
fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>)
fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>): Unit
@Suspendable
fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String,String>)
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>): Unit
fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String,String>)
fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>): Unit
/**
* Returns a shallow copy of the Quasar stack frames at the time of call to [flowStackSnapshot]. Use this to inspect
* what objects would be serialised at the time of call to a suspending action (e.g. send/receive).
*/
@Suspendable
fun flowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot?
/**
* Persists a shallow copy of the Quasar stack frames at the time of call to [persistFlowStackSnapshot].
* Use this to track the monitor evolution of the quasar stack values during the flow execution.
* The flow stack snapshot is stored in a file located in {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/
* where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform.
*
* Note: With respect to the [flowStackSnapshot], the snapshot being persisted by this method is partial,
* meaning that only flow relevant traces and local variables are persisted.
*/
@Suspendable
fun persistFlowStackSnapshot(flowClass: Class<*>): Unit
val serviceHub: ServiceHub
val logger: Logger

View File

@ -6,13 +6,13 @@ import kotlin.reflect.KProperty
* A write-once property to be used as delegate for Kotlin var properties. The expectation is that this is initialised
* prior to the spawning of any threads that may access it and so there's no need for it to be volatile.
*/
class WriteOnceProperty<T : Any>() {
private var v: T? = null
class WriteOnceProperty<T : Any>(private val defaultValue:T? = null) {
private var v: T? = defaultValue
operator fun getValue(thisRef: Any?, property: KProperty<*>) = v ?: throw IllegalStateException("Write-once property $property not set.")
operator fun setValue(thisRef: Any?, property: KProperty<*>, value: T) {
check(v == null) { "Cannot set write-once property $property more than once." }
check(v == defaultValue || v === value) { "Cannot set write-once property $property more than once." }
v = value
}
}

View File

@ -227,14 +227,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>) {
val permissionGranted = true // TODO define permission control service on ServiceHubInternal and actually check authorization.
val checkPermissionEvent = FlowPermissionAuditEvent(
serviceHub.clock.instant(),
flowInitiator,
"Flow Permission Required: $permissionName",
extraAuditData,
logic.javaClass,
id,
permissionName,
permissionGranted)
serviceHub.clock.instant(),
flowInitiator,
"Flow Permission Required: $permissionName",
extraAuditData,
logic.javaClass,
id,
permissionName,
permissionGranted)
serviceHub.auditService.recordAuditEvent(checkPermissionEvent)
if (!permissionGranted) {
throw FlowPermissionException("User $flowInitiator not permissioned for $permissionName on flow $id")
@ -242,18 +242,29 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
// TODO Dummy implementation of access to application specific audit logging
override fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String,String>) {
override fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>): Unit {
val flowAuditEvent = FlowAppAuditEvent(
serviceHub.clock.instant(),
flowInitiator,
comment,
extraAuditData,
logic.javaClass,
serviceHub.clock.instant(),
flowInitiator,
comment,
extraAuditData,
logic.javaClass,
id,
eventType)
serviceHub.auditService.recordAuditEvent(flowAuditEvent)
}
@Suspendable
override fun flowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot? {
val factory = FlowStackSnapshotFactory.instance
return factory.getFlowStackSnapshot(flowClass)
}
override fun persistFlowStackSnapshot(flowClass: Class<*>): Unit {
val factory = FlowStackSnapshotFactory.instance
factory.persistAsJsonFile(flowClass, serviceHub.configuration.baseDirectory, id.toString())
}
/**
* This method will suspend the state machine and wait for incoming session init response from other party.
*/

View File

@ -7,6 +7,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowContext
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
@ -32,6 +33,7 @@ class InteractiveShellTest {
constructor(amount: Amount<Currency>) : this(amount.toString())
constructor(pair: Pair<Amount<Currency>, SecureHash.SHA256>) : this(pair.toString())
constructor(party: Party) : this(party.name.toString())
override fun call() = a
}
@ -99,5 +101,13 @@ class InteractiveShellTest {
override fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>) {
// Do nothing
}
override fun flowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot? {
return null
}
override fun persistFlowStackSnapshot(flowClass: Class<*>) {
// Do nothing
}
}
}

View File

@ -0,0 +1,297 @@
package net.corda.testing
import co.paralleluniverse.fibers.Suspendable
import com.fasterxml.jackson.databind.ObjectMapper
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.serialization.CordaSerializable
import net.corda.node.services.startFlowPermission
import net.corda.nodeapi.User
import net.corda.testing.driver.driver
import org.junit.Ignore
import org.junit.Test
import java.io.File
import java.nio.file.Path
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@CordaSerializable
data class StackSnapshotFrame(val method: String, val clazz: String, val dataTypes: List<String?>, val flowId: String? = null)
/**
* Calculates the count of full and empty frames. We consider frame to be empty if there is no stack data
* associated with it (i.e. the stackObjects is an empty list). Otherwise (i.e. when the stackObjects is not
* an empty list the frame is considered to be full.
*/
fun convertToStackSnapshotFrames(snapshot: FlowStackSnapshot): List<StackSnapshotFrame> {
return snapshot.stackFrames.map {
val dataTypes = it.stackObjects.map {
if (it == null) null else it::class.qualifiedName
}
val stackTraceElement = it.stackTraceElement!!
StackSnapshotFrame(stackTraceElement.methodName, stackTraceElement.className, dataTypes)
}
}
/**
* Flow that during its execution performs calls with side effects in terms of Quasar. The presence of
* side effect calls drives Quasar decision on stack optimisation application. The stack optimisation method aims
* to reduce the amount of data stored on Quasar stack to minimum and is based on static code analyses performed during
* the code instrumentation phase, during which Quasar checks if a method performs side effect calls. If not,
* the method is annotated to be optimised, meaning that none of its local variables are stored on the stack and
* during the runtime the method can be replayed with a guarantee to be idempotent.
*/
@StartableByRPC
class SideEffectFlow : FlowLogic<List<StackSnapshotFrame>>() {
var sideEffectField = ""
@Suspendable
override fun call(): List<StackSnapshotFrame> {
sideEffectField = "sideEffectInCall"
// Expected to be on stack
@Suppress("UNUSED_VARIABLE")
val unusedVar = Constants.IN_CALL_VALUE
val numberOfFullFrames = retrieveStackSnapshot()
return numberOfFullFrames
}
@Suspendable
fun retrieveStackSnapshot(): List<StackSnapshotFrame> {
sideEffectField = "sideEffectInRetrieveStackSnapshot"
// Expected to be on stack
@Suppress("UNUSED_VARIABLE")
val unusedVar = Constants.IN_RETRIEVE_STACK_SNAPSHOT_VALUE
val snapshot = flowStackSnapshot()
return convertToStackSnapshotFrames(snapshot!!)
}
}
/**
* Flow that during its execution performs calls with no side effects in terms of Quasar.
* Thus empty frames are expected on in the stack snapshot as Quasar will optimise.
*/
@StartableByRPC
class NoSideEffectFlow : FlowLogic<List<StackSnapshotFrame>>() {
@Suspendable
override fun call(): List<StackSnapshotFrame> {
// Using the [Constants] object here is considered by Quasar as a side effect. Thus explicit initialization
@Suppress("UNUSED_VARIABLE")
val unusedVar = "inCall"
val numberOfFullFrames = retrieveStackSnapshot()
return numberOfFullFrames
}
@Suspendable
fun retrieveStackSnapshot(): List<StackSnapshotFrame> {
// Using the [Constants] object here is considered by Quasar as a side effect. Thus explicit initialization
@Suppress("UNUSED_VARIABLE")
val unusedVar = "inRetrieveStackSnapshot"
val snapshot = flowStackSnapshot()
return convertToStackSnapshotFrames(snapshot!!)
}
}
object Constants {
val IN_PERSIST_VALUE = "inPersist"
val IN_CALL_VALUE = "inCall"
val IN_RETRIEVE_STACK_SNAPSHOT_VALUE = "inRetrieveStackSnapshot"
val USER = "User"
val PASSWORD = "Password"
}
/**
* No side effect flow that stores the partial snapshot into a file, path to which is passed in the flow constructor.
*/
@StartableByRPC
class PersistingNoSideEffectFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
// Using the [Constants] object here is considered by Quasar as a side effect. Thus explicit initialization
@Suppress("UNUSED_VARIABLE")
val unusedVar = "inCall"
persist()
return stateMachine.id.toString()
}
@Suspendable
fun persist() {
// Using the [Constants] object here is considered by Quasar as a side effect. Thus explicit initialization
@Suppress("UNUSED_VARIABLE")
val unusedVar = "inPersist"
persistFlowStackSnapshot()
}
}
/**
* Flow with side effects that stores the partial snapshot into a file, path to which is passed in the flow constructor.
*/
@StartableByRPC
class PersistingSideEffectFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
@Suppress("UNUSED_VARIABLE")
val unusedVar = Constants.IN_CALL_VALUE
persist()
return stateMachine.id.toString()
}
@Suspendable
fun persist() {
@Suppress("UNUSED_VARIABLE")
val unusedVar = Constants.IN_PERSIST_VALUE
persistFlowStackSnapshot()
}
}
/**
* Similar to [PersistingSideEffectFlow] but aims to produce multiple snapshot files.
*/
@StartableByRPC
class MultiplePersistingSideEffectFlow(val persistCallCount: Int) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
@Suppress("UNUSED_VARIABLE")
val unusedVar = Constants.IN_CALL_VALUE
for (i in 1..persistCallCount) {
persist()
}
return stateMachine.id.toString()
}
@Suspendable
fun persist() {
@Suppress("UNUSED_VARIABLE")
val unusedVar = Constants.IN_PERSIST_VALUE
persistFlowStackSnapshot()
}
}
fun readFlowStackSnapshotFromDir(baseDir: Path, flowId: String): FlowStackSnapshot {
val snapshotFile = File(baseDir.toFile(), "flowStackSnapshots/${LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE)}/$flowId/flowStackSnapshot.json")
return ObjectMapper().readValue(snapshotFile.inputStream(), FlowStackSnapshot::class.java)
}
fun countFilesInDir(baseDir: Path, flowId: String): Int {
val flowDir = File(baseDir.toFile(), "flowStackSnapshots/${LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE)}/$flowId/")
return flowDir.listFiles().size
}
fun assertFrame(expectedMethod: String, expectedEmpty: Boolean, frame: StackSnapshotFrame) {
assertEquals(expectedMethod, frame.method)
assertEquals(expectedEmpty, frame.dataTypes.isEmpty())
}
class FlowStackSnapshotTest {
@Test
@Ignore("This test is skipped due to Jacoco agent interference with the quasar instrumentation process. " +
"This violates tested criteria (specifically: extra objects are introduced to the quasar stack by th Jacoco agent)")
fun `flowStackSnapshot contains full frames when methods with side effects are called`() {
driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<SideEffectFlow>())))).get()
a.rpcClientToNode().use(Constants.USER, Constants.PASSWORD) { connection ->
val stackSnapshotFrames = connection.proxy.startFlow(::SideEffectFlow).returnValue.get()
val iterator = stackSnapshotFrames.listIterator()
assertFrame("run", false, iterator.next())
assertFrame("call", false, iterator.next())
assertFrame("retrieveStackSnapshot", false, iterator.next())
assertFrame("flowStackSnapshot", false, iterator.next())
}
}
}
@Test
@Ignore("This test is skipped due to Jacoco agent interference with the quasar instrumentation process. " +
"This violates tested criteria (specifically extra objects are introduced to the quasar stack by th Jacoco agent)")
fun `flowStackSnapshot contains empty frames when methods with no side effects are called`() {
driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<NoSideEffectFlow>())))).get()
a.rpcClientToNode().use(Constants.USER, Constants.PASSWORD) { connection ->
val stackSnapshotFrames = connection.proxy.startFlow(::NoSideEffectFlow).returnValue.get()
val iterator = stackSnapshotFrames.listIterator()
assertFrame("run", false, iterator.next())
assertFrame("call", true, iterator.next())
assertFrame("retrieveStackSnapshot", true, iterator.next())
assertFrame("flowStackSnapshot", false, iterator.next())
}
}
}
@Test
@Ignore("This test is skipped due to Jacoco agent interference with the quasar instrumentation process. " +
"This violates tested criteria (specifically extra objects are introduced to the quasar stack by th Jacoco agent)")
fun `persistFlowStackSnapshot persists empty frames to a file when methods with no side effects are called`() {
driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<PersistingNoSideEffectFlow>())))).get()
a.rpcClientToNode().use(Constants.USER, Constants.PASSWORD) { connection ->
val flowId = connection.proxy.startFlow(::PersistingNoSideEffectFlow).returnValue.get()
val snapshotFromFile = readFlowStackSnapshotFromDir(a.configuration.baseDirectory, flowId)
val stackSnapshotFrames = convertToStackSnapshotFrames(snapshotFromFile)
val iterator = stackSnapshotFrames.listIterator()
assertFrame("call", true, iterator.next())
assertFrame("persist", true, iterator.next())
assertFrame("persistFlowStackSnapshot", false, iterator.next())
}
}
}
@Test
@Ignore("This test is skipped due to Jacoco agent interference with the quasar instrumentation process. " +
"This violates tested criteria (specifically extra objects are introduced to the quasar stack by th Jacoco agent)")
fun `persistFlowStackSnapshot persists multiple snapshots in different files`() {
driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<MultiplePersistingSideEffectFlow>())))).get()
a.rpcClientToNode().use(Constants.USER, Constants.PASSWORD) { connection ->
val numberOfFlowSnapshots = 5
val flowId = connection.proxy.startFlow(::MultiplePersistingSideEffectFlow, 5).returnValue.get()
val fileCount = countFilesInDir(a.configuration.baseDirectory, flowId)
assertEquals(numberOfFlowSnapshots, fileCount)
}
}
}
@Test
@Ignore("This test is skipped due to Jacoco agent interference with the quasar instrumentation process. " +
"This violates tested criteria (specifically extra objects are introduced to the quasar stack by th Jacoco agent)")
fun `persistFlowStackSnapshot stack traces are aligned with stack objects`() {
driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User(Constants.USER, Constants.PASSWORD, setOf(startFlowPermission<PersistingSideEffectFlow>())))).get()
a.rpcClientToNode().use(Constants.USER, Constants.PASSWORD) { connection ->
val flowId = connection.proxy.startFlow(::PersistingSideEffectFlow).returnValue.get()
val snapshotFromFile = readFlowStackSnapshotFromDir(a.configuration.baseDirectory, flowId)
var inCallCount = 0
var inPersistCount = 0
snapshotFromFile.stackFrames.forEach {
val trace = it.stackTraceElement
it.stackObjects.forEach {
when (it) {
Constants.IN_CALL_VALUE -> {
assertEquals(PersistingSideEffectFlow::call.name, trace!!.methodName)
inCallCount++
}
Constants.IN_PERSIST_VALUE -> {
assertEquals(PersistingSideEffectFlow::persist.name, trace!!.methodName)
inPersistCount++
}
}
}
}
assertTrue(inCallCount > 0)
assertTrue(inPersistCount > 0)
}
}
}
}

View File

@ -0,0 +1,190 @@
package net.corda.testing
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Instrumented
import co.paralleluniverse.fibers.Stack
import co.paralleluniverse.fibers.Suspendable
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.FlowStackSnapshot.Frame
import net.corda.core.flows.FlowStackSnapshotFactory
import net.corda.core.flows.StackFrameDataToken
import net.corda.core.internal.FlowStateMachine
import net.corda.core.serialization.SerializeAsToken
import java.io.File
import java.nio.file.Path
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
class FlowStackSnapshotFactoryImpl : FlowStackSnapshotFactory {
@Suspendable
override fun getFlowStackSnapshot(flowClass: Class<*>): FlowStackSnapshot? {
var snapshot: FlowStackSnapshot? = null
val stackTrace = Fiber.currentFiber().stackTrace
Fiber.parkAndSerialize { fiber, _ ->
snapshot = extractStackSnapshotFromFiber(fiber, stackTrace.toList(), flowClass)
Fiber.unparkDeserialized(fiber, fiber.scheduler)
}
// This is because the dump itself is on the stack, which means it creates a loop in the object graph, we set
// it to null to break the loop
val temporarySnapshot = snapshot
snapshot = null
return temporarySnapshot!!
}
override fun persistAsJsonFile(flowClass: Class<*>, baseDir: Path, flowId: String) {
val flowStackSnapshot = getFlowStackSnapshot(flowClass)
val mapper = ObjectMapper()
mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
mapper.enable(SerializationFeature.INDENT_OUTPUT)
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
val file = createFile(baseDir, flowId)
file.bufferedWriter().use { out ->
mapper.writeValue(out, filterOutStackDump(flowStackSnapshot!!))
}
}
private fun extractStackSnapshotFromFiber(fiber: Fiber<*>, stackTrace: List<StackTraceElement>, flowClass: Class<*>): FlowStackSnapshot {
val stack = getFiberStack(fiber)
val objectStack = getObjectStack(stack).toList()
val frameOffsets = getFrameOffsets(stack)
val frameObjects = frameOffsets.map { (frameOffset, frameSize) ->
objectStack.subList(frameOffset + 1, frameOffset + frameSize + 1)
}
// We drop the first element as it is corda internal call irrelevant from the perspective of a CordApp developer
val relevantStackTrace = removeConstructorStackTraceElements(stackTrace).drop(1)
val stackTraceToAnnotation = relevantStackTrace.map {
val element = StackTraceElement(it.className, it.methodName, it.fileName, it.lineNumber)
element to getInstrumentedAnnotation(element)
}
val frameObjectsIterator = frameObjects.listIterator()
val frames = stackTraceToAnnotation.reversed().map { (element, annotation) ->
// If annotation is null then the case indicates that this is an entry point - i.e.
// the net.corda.node.services.statemachine.FlowStateMachineImpl.run method
if (frameObjectsIterator.hasNext() && (annotation == null || !annotation.methodOptimized)) {
Frame(element, frameObjectsIterator.next())
} else {
Frame(element, listOf())
}
}
return FlowStackSnapshot(flowClass = flowClass, stackFrames = frames)
}
private fun getInstrumentedAnnotation(element: StackTraceElement): Instrumented? {
Class.forName(element.className).methods.forEach {
if (it.name == element.methodName && it.isAnnotationPresent(Instrumented::class.java)) {
return it.getAnnotation(Instrumented::class.java)
}
}
return null
}
private fun removeConstructorStackTraceElements(stackTrace: List<StackTraceElement>): List<StackTraceElement> {
val newStackTrace = ArrayList<StackTraceElement>()
var previousElement: StackTraceElement? = null
for (element in stackTrace) {
if (element.methodName == previousElement?.methodName &&
element.className == previousElement?.className &&
element.fileName == previousElement?.fileName) {
continue
}
newStackTrace.add(element)
previousElement = element
}
return newStackTrace
}
private fun filterOutStackDump(flowStackSnapshot: FlowStackSnapshot): FlowStackSnapshot {
val framesFilteredByStackTraceElement = flowStackSnapshot.stackFrames.filter {
!FlowStateMachine::class.java.isAssignableFrom(Class.forName(it.stackTraceElement!!.className))
}
val framesFilteredByObjects = framesFilteredByStackTraceElement.map {
Frame(it.stackTraceElement, it.stackObjects.map {
if (it != null && (it is FlowLogic<*> || it is FlowStateMachine<*> || it is Fiber<*> || it is SerializeAsToken)) {
StackFrameDataToken(it::class.java.name)
} else {
it
}
})
}
return FlowStackSnapshot(flowStackSnapshot.timestamp, flowStackSnapshot.flowClass, framesFilteredByObjects)
}
private fun createFile(baseDir: Path, flowId: String): File {
val file: File
val dir = File(baseDir.toFile(), "flowStackSnapshots/${LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE)}/$flowId/")
val index = ThreadLocalIndex.currentIndex.get()
if (index == 0) {
dir.mkdirs()
file = File(dir, "flowStackSnapshot.json")
} else {
file = File(dir, "flowStackSnapshot-$index.json")
}
ThreadLocalIndex.currentIndex.set(index + 1)
return file
}
private class ThreadLocalIndex private constructor() {
companion object {
val currentIndex = object : ThreadLocal<Int>() {
override fun initialValue() = 0
}
}
}
}
private inline fun <reified R, A> R.getField(name: String): A {
val field = R::class.java.getDeclaredField(name)
field.isAccessible = true
@Suppress("UNCHECKED_CAST")
return field.get(this) as A
}
private fun getFiberStack(fiber: Fiber<*>): Stack {
return fiber.getField("stack")
}
private fun getObjectStack(stack: Stack): Array<Any?> {
return stack.getField("dataObject")
}
private fun getPrimitiveStack(stack: Stack): LongArray {
return stack.getField("dataLong")
}
/*
* Returns pairs of (offset, size of frame)
*/
private fun getFrameOffsets(stack: Stack): List<Pair<Int, Int>> {
val primitiveStack = getPrimitiveStack(stack)
val offsets = ArrayList<Pair<Int, Int>>()
var offset = 0
while (true) {
val record = primitiveStack[offset]
val slots = getNumSlots(record)
if (slots > 0) {
offsets.add(offset to slots)
offset += slots + 1
} else {
break
}
}
return offsets
}
private val MASK_FULL: Long = -1L
private fun getNumSlots(record: Long): Int {
return getUnsignedBits(record, 14, 16).toInt()
}
private fun getUnsignedBits(word: Long, offset: Int, length: Int): Long {
val a = 64 - length
val b = a - offset
return word.ushr(b) and MASK_FULL.ushr(a)
}

View File

@ -0,0 +1 @@
net.corda.testing.FlowStackSnapshotFactoryImpl