From f2b3db9c7b0891de284bd35ee58c0e4ef04ee0df Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Tue, 11 Jan 2022 10:22:49 +0000 Subject: [PATCH] ENT-6315 Allow dumping of paused flows (#7008) Checkpoint dumping of paused flows was not working because the dumper expects a flow to have a `FlowState` of `Unstarted` or `Started`, however due to a memory optimisation paused flows have their `FlowState` set to `Paused`. This was causing causing an exception as well as a loss of potentially useful information. A flag `alwaysDeserializeCheckpoint` has been added to `Checkpoint.Serialized.deserialize` which skips the memory optimisation and forces the deserialization of the flow's `FlowState`. Paused flows are now included in the dumped output along with their real `FlowState` which is useful to users even if the flow is paused rather than waiting for something to complete. The status of the flow has also been added to the JSON output to assist users in debugging their flows. --- .../node/services/rpc/DumpCheckpointsTest.kt | 59 +++++++++++++++++-- .../node/services/rpc/CheckpointDumperImpl.kt | 14 ++++- .../statemachine/StateMachineState.kt | 44 +++++++++++--- 3 files changed, 101 insertions(+), 16 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt index 21ac40a96d..a4582f6740 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt @@ -14,8 +14,11 @@ import net.corda.core.internal.list import net.corda.core.internal.readFully import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.minutes +import net.corda.core.utilities.seconds import net.corda.node.internal.NodeStartup import net.corda.node.services.Permissions +import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.CountUpDownLatch import net.corda.testing.core.ALICE_NAME import net.corda.testing.driver.DriverParameters @@ -36,8 +39,8 @@ class DumpCheckpointsTest { private val flowProceedLatch = CountUpDownLatch(1) } - @Test(timeout=300_000) - fun `verify checkpoint dump via RPC`() { + @Test(timeout = 300_000) + fun `verify checkpoint dump via RPC`() { val user = User("mark", "dadada", setOf(Permissions.all())) driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) { @@ -55,20 +58,44 @@ class DumpCheckpointsTest { flowProceedLatch.countDown() assertEquals(1, checkPointCountFuture.get()) - checkDumpFile(logDirPath) + checkDumpFile(logDirPath, GetNumberOfCheckpointsFlow::class.java, Checkpoint.FlowStatus.RUNNABLE) } } } - private fun checkDumpFile(dir: Path) { + @Test(timeout = 300_000) + fun `paused flows included in checkpoint dump output`() { + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) { + + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + + it.proxy.startFlow(::EasyFlow) + + // Hack to get the flow to show as paused + it.proxy.startFlow(::SetAllFlowsToPausedFlow).returnValue.getOrThrow(10.seconds) + + val logDirPath = nodeAHandle.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME + logDirPath.createDirectories() + nodeAHandle.checkpointsRpc.use { checkpointRPCOps -> checkpointRPCOps.dumpCheckpoints() } + + checkDumpFile(logDirPath, EasyFlow::class.java, Checkpoint.FlowStatus.PAUSED) + } + } + } + + private fun checkDumpFile(dir: Path, containsClass: Class>, flowStatus: Checkpoint.FlowStatus) { // The directory supposed to contain a single ZIP file val file = dir.list().single { it.isRegularFile() } ZipInputStream(file.inputStream()).use { zip -> val entry = zip.nextEntry assertThat(entry.name, containsSubstring("json")) - val content = zip.readFully() - assertThat(String(content), containsSubstring(GetNumberOfCheckpointsFlow::class.java.name)) + val content = String(zip.readFully()) + assertThat(content, containsSubstring(containsClass.name)) + assertThat(content, containsSubstring(flowStatus.name)) } } @@ -94,4 +121,24 @@ class DumpCheckpointsTest { flowProceedLatch.await() } } + + @StartableByRPC + class EasyFlow : FlowLogic() { + @Suspendable + override fun call(): Int { + sleep(2.minutes) + return 1 + } + } + + @StartableByRPC + class SetAllFlowsToPausedFlow : FlowLogic() { + @Suspendable + override fun call(): Int { + return serviceHub + .jdbcSession() + .prepareStatement("UPDATE node_checkpoints SET status = '${Checkpoint.FlowStatus.PAUSED.ordinal}'") + .use { ps -> ps.executeUpdate() } + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt index eb9258d44e..4e2e2a3153 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt @@ -221,7 +221,10 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri instrumentCheckpointAgent(runId) val (bytes, fileName) = try { - val checkpoint = serialisedCheckpoint.deserialize(checkpointSerializationContext) + val checkpoint = serialisedCheckpoint.deserialize( + checkpointSerializationContext, + alwaysDeserializeFlowState = true + ) val json = checkpoint.toJson(runId.uuid, now) val jsonBytes = writer.writeValueAsBytes(json) jsonBytes to "${json.topLevelFlowClass.simpleName}-${runId.uuid}.json" @@ -259,7 +262,12 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri //Dump checkpoints in "fibers" folder for((runId, serializedCheckpoint) in stream) { - val flowState = serializedCheckpoint.deserialize(checkpointSerializationContext).flowState + val flowState = serializedCheckpoint.deserialize( + checkpointSerializationContext, + alwaysDeserializeFlowState = true + ).flowState + // This includes paused flows because we have forced the deserialization of the checkpoint's flow state + // which will show as started. if(flowState is FlowState.Started) writeFiber2Zip(zip, checkpointSerializationContext, runId, flowState) } @@ -354,6 +362,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri topLevelFlowLogic = flowLogic, flowCallStackSummary = flowCallStack.toSummary(), flowCallStack = flowCallStack, + status = status, suspendedOn = (flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn( timestamp, now @@ -436,6 +445,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri val topLevelFlowClass: Class>, val topLevelFlowLogic: FlowLogic<*>, val flowCallStackSummary: List, + val status: Checkpoint.FlowStatus, val suspendedOn: SuspendedOn?, val flowCallStack: List, val origin: Origin, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 5bd7cd8e9c..a2bc214675 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -215,17 +215,28 @@ data class Checkpoint( /** * Deserializes the serialized fields contained in [Checkpoint.Serialized]. * - * @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized] + * Depending on the [FlowStatus] of the [Checkpoint.Serialized], the deserialized [Checkpoint] may or may not have its [flowState] + * properly deserialized. This is to optimise the process's memory footprint by not holding the checkpoints of flows that are not + * running in-memory. + * + * The [flowState] will not be deserialized when the [FlowStatus] is: + * + * - [FlowStatus.PAUSED] + * - [FlowStatus.COMPLETED] + * - [FlowStatus.FAILED] + * + * Any other status returns a [FlowState.Unstarted] or [FlowState.Started] depending on the content of [serializedFlowState]. + * + * @param checkpointSerializationContext The [CheckpointSerializationContext] to deserialize the checkpoint's serialized content with. + * @param alwaysDeserializeFlowState A flag to specify if [flowState] should be deserialized, disregarding the [FlowStatus] of the + * checkpoint and ignoring the memory optimisation. + * + * @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized]. */ - fun deserialize(checkpointSerializationContext: CheckpointSerializationContext): Checkpoint { - val flowState = when(status) { - FlowStatus.PAUSED -> FlowState.Paused - FlowStatus.COMPLETED, FlowStatus.FAILED -> FlowState.Finished - else -> serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext) - } + fun deserialize(checkpointSerializationContext: CheckpointSerializationContext, alwaysDeserializeFlowState: Boolean = false): Checkpoint { return Checkpoint( checkpointState = serializedCheckpointState.checkpointDeserialize(checkpointSerializationContext), - flowState = flowState, + flowState = getFlowState(checkpointSerializationContext, alwaysDeserializeFlowState), errorState = errorState, result = result?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT), status = status, @@ -234,6 +245,23 @@ data class Checkpoint( compatible = compatible ) } + + private fun getFlowState( + checkpointSerializationContext: CheckpointSerializationContext, + alwaysDeserializeFlowState: Boolean + ): FlowState { + return when { + alwaysDeserializeFlowState -> deserializeFlowState(checkpointSerializationContext) + status == FlowStatus.PAUSED -> FlowState.Paused + status == FlowStatus.COMPLETED -> FlowState.Finished + status == FlowStatus.FAILED -> FlowState.Finished + else -> deserializeFlowState(checkpointSerializationContext) + } + } + + private fun deserializeFlowState(checkpointSerializationContext: CheckpointSerializationContext): FlowState { + return serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext) + } } }