From 09518532077affe290f33daf879481fb46f421fe Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Thu, 11 May 2023 09:49:40 +0100 Subject: [PATCH] ENT-6515: Cherry-pick of ENT-6315 - Allow dumping of paused flows (#7008) (#7363) This is a cherry-pick of https://github.com/corda/corda/pull/7008, which also resolves ENT-6515 Co-authored-by: Dan Newton --- .../node/services/rpc/DumpCheckpointsTest.kt | 59 +++++++++++++++++-- .../node/services/rpc/CheckpointDumperImpl.kt | 14 ++++- .../statemachine/StateMachineState.kt | 44 +++++++++++--- 3 files changed, 101 insertions(+), 16 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt index 21ac40a96d..a4582f6740 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt @@ -14,8 +14,11 @@ import net.corda.core.internal.list import net.corda.core.internal.readFully import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.minutes +import net.corda.core.utilities.seconds import net.corda.node.internal.NodeStartup import net.corda.node.services.Permissions +import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.CountUpDownLatch import net.corda.testing.core.ALICE_NAME import net.corda.testing.driver.DriverParameters @@ -36,8 +39,8 @@ class DumpCheckpointsTest { private val flowProceedLatch = CountUpDownLatch(1) } - @Test(timeout=300_000) - fun `verify checkpoint dump via RPC`() { + @Test(timeout = 300_000) + fun `verify checkpoint dump via RPC`() { val user = User("mark", "dadada", setOf(Permissions.all())) driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) { @@ -55,20 +58,44 @@ class 
DumpCheckpointsTest { flowProceedLatch.countDown() assertEquals(1, checkPointCountFuture.get()) - checkDumpFile(logDirPath) + checkDumpFile(logDirPath, GetNumberOfCheckpointsFlow::class.java, Checkpoint.FlowStatus.RUNNABLE) } } } - private fun checkDumpFile(dir: Path) { + @Test(timeout = 300_000) + fun `paused flows included in checkpoint dump output`() { + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) { + + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + + it.proxy.startFlow(::EasyFlow) + + // Hack to get the flow to show as paused + it.proxy.startFlow(::SetAllFlowsToPausedFlow).returnValue.getOrThrow(10.seconds) + + val logDirPath = nodeAHandle.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME + logDirPath.createDirectories() + nodeAHandle.checkpointsRpc.use { checkpointRPCOps -> checkpointRPCOps.dumpCheckpoints() } + + checkDumpFile(logDirPath, EasyFlow::class.java, Checkpoint.FlowStatus.PAUSED) + } + } + } + + private fun checkDumpFile(dir: Path, containsClass: Class<out FlowLogic<*>>, flowStatus: Checkpoint.FlowStatus) { // The directory supposed to contain a single ZIP file val file = dir.list().single { it.isRegularFile() } ZipInputStream(file.inputStream()).use { zip -> val entry = zip.nextEntry assertThat(entry.name, containsSubstring("json")) - val content = zip.readFully() - assertThat(String(content), containsSubstring(GetNumberOfCheckpointsFlow::class.java.name)) + val content = String(zip.readFully()) + assertThat(content, containsSubstring(containsClass.name)) + assertThat(content, containsSubstring(flowStatus.name)) } } @@ -94,4 +121,24 @@ class DumpCheckpointsTest { flowProceedLatch.await() } } + + @StartableByRPC + class EasyFlow : FlowLogic<Int>() { + @Suspendable + override fun call(): Int { + 
sleep(2.minutes) + return 1 + } + } + + @StartableByRPC + class SetAllFlowsToPausedFlow : FlowLogic<Int>() { + @Suspendable + override fun call(): Int { + return serviceHub + .jdbcSession() + .prepareStatement("UPDATE node_checkpoints SET status = '${Checkpoint.FlowStatus.PAUSED.ordinal}'") + .use { ps -> ps.executeUpdate() } + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt index 0849fdf919..a9aceab60b 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt @@ -221,7 +221,10 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri instrumentCheckpointAgent(runId) val (bytes, fileName) = try { - val checkpoint = serialisedCheckpoint.deserialize(checkpointSerializationContext) + val checkpoint = serialisedCheckpoint.deserialize( + checkpointSerializationContext, + alwaysDeserializeFlowState = true + ) val json = checkpoint.toJson(runId.uuid, now) val jsonBytes = writer.writeValueAsBytes(json) jsonBytes to "${json.topLevelFlowClass.simpleName}-${runId.uuid}.json" @@ -259,7 +262,12 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri //Dump checkpoints in "fibers" folder for((runId, serializedCheckpoint) in stream) { - val flowState = serializedCheckpoint.deserialize(checkpointSerializationContext).flowState + val flowState = serializedCheckpoint.deserialize( + checkpointSerializationContext, + alwaysDeserializeFlowState = true + ).flowState + // This includes paused flows because we have forced the deserialization of the checkpoint's flow state + // which will show as started. 
if(flowState is FlowState.Started) writeFiber2Zip(zip, checkpointSerializationContext, runId, flowState) } @@ -354,6 +362,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri topLevelFlowLogic = flowLogic, flowCallStackSummary = flowCallStack.toSummary(), flowCallStack = flowCallStack, + status = status, suspendedOn = (flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn( timestamp, now @@ -436,6 +445,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri val topLevelFlowClass: Class>, val topLevelFlowLogic: FlowLogic<*>, val flowCallStackSummary: List, + val status: Checkpoint.FlowStatus, val suspendedOn: SuspendedOn?, val flowCallStack: List, val origin: Origin, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 5bd7cd8e9c..a2bc214675 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -215,17 +215,28 @@ data class Checkpoint( /** * Deserializes the serialized fields contained in [Checkpoint.Serialized]. * - * @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized] + * Depending on the [FlowStatus] of the [Checkpoint.Serialized], the deserialized [Checkpoint] may or may not have its [flowState] + * properly deserialized. This is to optimise the process's memory footprint by not holding the checkpoints of flows that are not + * running in-memory. + * + * The [flowState] will not be deserialized when the [FlowStatus] is: + * + * - [FlowStatus.PAUSED] + * - [FlowStatus.COMPLETED] + * - [FlowStatus.FAILED] + * + * Any other status returns a [FlowState.Unstarted] or [FlowState.Started] depending on the content of [serializedFlowState]. 
+ * + * @param checkpointSerializationContext The [CheckpointSerializationContext] to deserialize the checkpoint's serialized content with. + * @param alwaysDeserializeFlowState A flag to specify if [flowState] should be deserialized, disregarding the [FlowStatus] of the + * checkpoint and ignoring the memory optimisation. + * + * @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized]. */ - fun deserialize(checkpointSerializationContext: CheckpointSerializationContext): Checkpoint { - val flowState = when(status) { - FlowStatus.PAUSED -> FlowState.Paused - FlowStatus.COMPLETED, FlowStatus.FAILED -> FlowState.Finished - else -> serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext) - } + fun deserialize(checkpointSerializationContext: CheckpointSerializationContext, alwaysDeserializeFlowState: Boolean = false): Checkpoint { return Checkpoint( checkpointState = serializedCheckpointState.checkpointDeserialize(checkpointSerializationContext), - flowState = flowState, + flowState = getFlowState(checkpointSerializationContext, alwaysDeserializeFlowState), errorState = errorState, result = result?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT), status = status, @@ -234,6 +245,23 @@ data class Checkpoint( compatible = compatible ) } + + private fun getFlowState( + checkpointSerializationContext: CheckpointSerializationContext, + alwaysDeserializeFlowState: Boolean + ): FlowState { + return when { + alwaysDeserializeFlowState -> deserializeFlowState(checkpointSerializationContext) + status == FlowStatus.PAUSED -> FlowState.Paused + status == FlowStatus.COMPLETED -> FlowState.Finished + status == FlowStatus.FAILED -> FlowState.Finished + else -> deserializeFlowState(checkpointSerializationContext) + } + } + + private fun deserializeFlowState(checkpointSerializationContext: CheckpointSerializationContext): FlowState { + return serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext) + } } }