ENT-6315 Allow dumping of paused flows (#7008)

Checkpoint dumping of paused flows was not working because the dumper
expects a flow to have a `FlowState` of `Unstarted` or `Started`,
however due to a memory optimisation paused flows have their `FlowState`
set to `Paused`. This was causing causing an exception as well as a loss
of potentially useful information.

A flag `alwaysDeserializeCheckpoint` has been added to
`Checkpoint.Serialized.deserialize` which skips the memory optimisation
and forces the deserialization of the flow's `FlowState`.

Paused flows are now included in the dumped output along with their real
`FlowState` which is useful to users even if the flow is paused rather
than waiting for something to complete.

The status of the flow has also been added to the JSON output to assist
users in debugging their flows.
This commit is contained in:
Dan Newton 2022-01-11 10:22:49 +00:00 committed by GitHub
parent 044202550d
commit f2b3db9c7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 16 deletions

View File

@ -14,8 +14,11 @@ import net.corda.core.internal.list
import net.corda.core.internal.readFully
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.node.internal.NodeStartup
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.CountUpDownLatch
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
@ -36,8 +39,8 @@ class DumpCheckpointsTest {
private val flowProceedLatch = CountUpDownLatch(1)
}
@Test(timeout=300_000)
fun `verify checkpoint dump via RPC`() {
@Test(timeout = 300_000)
fun `verify checkpoint dump via RPC`() {
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
@ -55,20 +58,44 @@ class DumpCheckpointsTest {
flowProceedLatch.countDown()
assertEquals(1, checkPointCountFuture.get())
checkDumpFile(logDirPath)
checkDumpFile(logDirPath, GetNumberOfCheckpointsFlow::class.java, Checkpoint.FlowStatus.RUNNABLE)
}
}
}
private fun checkDumpFile(dir: Path) {
@Test(timeout = 300_000)
fun `paused flows included in checkpoint dump output`() {
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(::EasyFlow)
// Hack to get the flow to show as paused
it.proxy.startFlow(::SetAllFlowsToPausedFlow).returnValue.getOrThrow(10.seconds)
val logDirPath = nodeAHandle.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME
logDirPath.createDirectories()
nodeAHandle.checkpointsRpc.use { checkpointRPCOps -> checkpointRPCOps.dumpCheckpoints() }
checkDumpFile(logDirPath, EasyFlow::class.java, Checkpoint.FlowStatus.PAUSED)
}
}
}
private fun checkDumpFile(dir: Path, containsClass: Class<out FlowLogic<*>>, flowStatus: Checkpoint.FlowStatus) {
// The directory supposed to contain a single ZIP file
val file = dir.list().single { it.isRegularFile() }
ZipInputStream(file.inputStream()).use { zip ->
val entry = zip.nextEntry
assertThat(entry.name, containsSubstring("json"))
val content = zip.readFully()
assertThat(String(content), containsSubstring(GetNumberOfCheckpointsFlow::class.java.name))
val content = String(zip.readFully())
assertThat(content, containsSubstring(containsClass.name))
assertThat(content, containsSubstring(flowStatus.name))
}
}
@ -94,4 +121,24 @@ class DumpCheckpointsTest {
flowProceedLatch.await()
}
}
@StartableByRPC
class EasyFlow : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
sleep(2.minutes)
return 1
}
}
@StartableByRPC
class SetAllFlowsToPausedFlow : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
return serviceHub
.jdbcSession()
.prepareStatement("UPDATE node_checkpoints SET status = '${Checkpoint.FlowStatus.PAUSED.ordinal}'")
.use { ps -> ps.executeUpdate() }
}
}
}

View File

@ -221,7 +221,10 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
instrumentCheckpointAgent(runId)
val (bytes, fileName) = try {
val checkpoint = serialisedCheckpoint.deserialize(checkpointSerializationContext)
val checkpoint = serialisedCheckpoint.deserialize(
checkpointSerializationContext,
alwaysDeserializeFlowState = true
)
val json = checkpoint.toJson(runId.uuid, now)
val jsonBytes = writer.writeValueAsBytes(json)
jsonBytes to "${json.topLevelFlowClass.simpleName}-${runId.uuid}.json"
@ -259,7 +262,12 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
//Dump checkpoints in "fibers" folder
for((runId, serializedCheckpoint) in stream) {
val flowState = serializedCheckpoint.deserialize(checkpointSerializationContext).flowState
val flowState = serializedCheckpoint.deserialize(
checkpointSerializationContext,
alwaysDeserializeFlowState = true
).flowState
// This includes paused flows because we have forced the deserialization of the checkpoint's flow state
// which will show as started.
if(flowState is FlowState.Started) writeFiber2Zip(zip, checkpointSerializationContext, runId, flowState)
}
@ -354,6 +362,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
topLevelFlowLogic = flowLogic,
flowCallStackSummary = flowCallStack.toSummary(),
flowCallStack = flowCallStack,
status = status,
suspendedOn = (flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(
timestamp,
now
@ -436,6 +445,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
val topLevelFlowClass: Class<FlowLogic<*>>,
val topLevelFlowLogic: FlowLogic<*>,
val flowCallStackSummary: List<FlowCallSummary>,
val status: Checkpoint.FlowStatus,
val suspendedOn: SuspendedOn?,
val flowCallStack: List<FlowCall>,
val origin: Origin,

View File

@ -215,17 +215,28 @@ data class Checkpoint(
/**
* Deserializes the serialized fields contained in [Checkpoint.Serialized].
*
* @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized]
* Depending on the [FlowStatus] of the [Checkpoint.Serialized], the deserialized [Checkpoint] may or may not have its [flowState]
* properly deserialized. This is to optimise the process's memory footprint by not holding the checkpoints of flows that are not
* running in-memory.
*
* The [flowState] will not be deserialized when the [FlowStatus] is:
*
* - [FlowStatus.PAUSED]
* - [FlowStatus.COMPLETED]
* - [FlowStatus.FAILED]
*
* Any other status returns a [FlowState.Unstarted] or [FlowState.Started] depending on the content of [serializedFlowState].
*
* @param checkpointSerializationContext The [CheckpointSerializationContext] to deserialize the checkpoint's serialized content with.
* @param alwaysDeserializeFlowState A flag to specify if [flowState] should be deserialized, disregarding the [FlowStatus] of the
* checkpoint and ignoring the memory optimisation.
*
* @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized].
*/
fun deserialize(checkpointSerializationContext: CheckpointSerializationContext): Checkpoint {
val flowState = when(status) {
FlowStatus.PAUSED -> FlowState.Paused
FlowStatus.COMPLETED, FlowStatus.FAILED -> FlowState.Finished
else -> serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext)
}
fun deserialize(checkpointSerializationContext: CheckpointSerializationContext, alwaysDeserializeFlowState: Boolean = false): Checkpoint {
return Checkpoint(
checkpointState = serializedCheckpointState.checkpointDeserialize(checkpointSerializationContext),
flowState = flowState,
flowState = getFlowState(checkpointSerializationContext, alwaysDeserializeFlowState),
errorState = errorState,
result = result?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT),
status = status,
@ -234,6 +245,23 @@ data class Checkpoint(
compatible = compatible
)
}
private fun getFlowState(
checkpointSerializationContext: CheckpointSerializationContext,
alwaysDeserializeFlowState: Boolean
): FlowState {
return when {
alwaysDeserializeFlowState -> deserializeFlowState(checkpointSerializationContext)
status == FlowStatus.PAUSED -> FlowState.Paused
status == FlowStatus.COMPLETED -> FlowState.Finished
status == FlowStatus.FAILED -> FlowState.Finished
else -> deserializeFlowState(checkpointSerializationContext)
}
}
private fun deserializeFlowState(checkpointSerializationContext: CheckpointSerializationContext): FlowState {
return serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext)
}
}
}