mirror of
https://github.com/corda/corda.git
synced 2025-01-31 00:24:59 +00:00
This is a cherry-pick of https://github.com/corda/corda/pull/7008, which also resolves ENT-6515 Co-authored-by: Dan Newton <dan.newton@r3.com>
This commit is contained in:
parent
50ef6b7e9a
commit
0951853207
@ -14,8 +14,11 @@ import net.corda.core.internal.list
|
||||
import net.corda.core.internal.readFully
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.node.services.statemachine.Checkpoint
|
||||
import net.corda.node.services.statemachine.CountUpDownLatch
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
@ -36,8 +39,8 @@ class DumpCheckpointsTest {
|
||||
private val flowProceedLatch = CountUpDownLatch(1)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `verify checkpoint dump via RPC`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `verify checkpoint dump via RPC`() {
|
||||
val user = User("mark", "dadada", setOf(Permissions.all()))
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
|
||||
|
||||
@ -55,20 +58,44 @@ class DumpCheckpointsTest {
|
||||
|
||||
flowProceedLatch.countDown()
|
||||
assertEquals(1, checkPointCountFuture.get())
|
||||
checkDumpFile(logDirPath)
|
||||
checkDumpFile(logDirPath, GetNumberOfCheckpointsFlow::class.java, Checkpoint.FlowStatus.RUNNABLE)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun checkDumpFile(dir: Path) {
|
||||
@Test(timeout = 300_000)
|
||||
fun `paused flows included in checkpoint dump output`() {
|
||||
val user = User("mark", "dadada", setOf(Permissions.all()))
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
|
||||
|
||||
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
|
||||
it.proxy.startFlow(::EasyFlow)
|
||||
|
||||
// Hack to get the flow to show as paused
|
||||
it.proxy.startFlow(::SetAllFlowsToPausedFlow).returnValue.getOrThrow(10.seconds)
|
||||
|
||||
val logDirPath = nodeAHandle.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME
|
||||
logDirPath.createDirectories()
|
||||
nodeAHandle.checkpointsRpc.use { checkpointRPCOps -> checkpointRPCOps.dumpCheckpoints() }
|
||||
|
||||
checkDumpFile(logDirPath, EasyFlow::class.java, Checkpoint.FlowStatus.PAUSED)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun checkDumpFile(dir: Path, containsClass: Class<out FlowLogic<*>>, flowStatus: Checkpoint.FlowStatus) {
|
||||
// The directory supposed to contain a single ZIP file
|
||||
val file = dir.list().single { it.isRegularFile() }
|
||||
|
||||
ZipInputStream(file.inputStream()).use { zip ->
|
||||
val entry = zip.nextEntry
|
||||
assertThat(entry.name, containsSubstring("json"))
|
||||
val content = zip.readFully()
|
||||
assertThat(String(content), containsSubstring(GetNumberOfCheckpointsFlow::class.java.name))
|
||||
val content = String(zip.readFully())
|
||||
assertThat(content, containsSubstring(containsClass.name))
|
||||
assertThat(content, containsSubstring(flowStatus.name))
|
||||
}
|
||||
}
|
||||
|
||||
@ -94,4 +121,24 @@ class DumpCheckpointsTest {
|
||||
flowProceedLatch.await()
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class EasyFlow : FlowLogic<Int>() {
|
||||
@Suspendable
|
||||
override fun call(): Int {
|
||||
sleep(2.minutes)
|
||||
return 1
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class SetAllFlowsToPausedFlow : FlowLogic<Int>() {
|
||||
@Suspendable
|
||||
override fun call(): Int {
|
||||
return serviceHub
|
||||
.jdbcSession()
|
||||
.prepareStatement("UPDATE node_checkpoints SET status = '${Checkpoint.FlowStatus.PAUSED.ordinal}'")
|
||||
.use { ps -> ps.executeUpdate() }
|
||||
}
|
||||
}
|
||||
}
|
@ -221,7 +221,10 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
instrumentCheckpointAgent(runId)
|
||||
|
||||
val (bytes, fileName) = try {
|
||||
val checkpoint = serialisedCheckpoint.deserialize(checkpointSerializationContext)
|
||||
val checkpoint = serialisedCheckpoint.deserialize(
|
||||
checkpointSerializationContext,
|
||||
alwaysDeserializeFlowState = true
|
||||
)
|
||||
val json = checkpoint.toJson(runId.uuid, now)
|
||||
val jsonBytes = writer.writeValueAsBytes(json)
|
||||
jsonBytes to "${json.topLevelFlowClass.simpleName}-${runId.uuid}.json"
|
||||
@ -259,7 +262,12 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
|
||||
//Dump checkpoints in "fibers" folder
|
||||
for((runId, serializedCheckpoint) in stream) {
|
||||
val flowState = serializedCheckpoint.deserialize(checkpointSerializationContext).flowState
|
||||
val flowState = serializedCheckpoint.deserialize(
|
||||
checkpointSerializationContext,
|
||||
alwaysDeserializeFlowState = true
|
||||
).flowState
|
||||
// This includes paused flows because we have forced the deserialization of the checkpoint's flow state
|
||||
// which will show as started.
|
||||
if(flowState is FlowState.Started) writeFiber2Zip(zip, checkpointSerializationContext, runId, flowState)
|
||||
}
|
||||
|
||||
@ -354,6 +362,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
topLevelFlowLogic = flowLogic,
|
||||
flowCallStackSummary = flowCallStack.toSummary(),
|
||||
flowCallStack = flowCallStack,
|
||||
status = status,
|
||||
suspendedOn = (flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(
|
||||
timestamp,
|
||||
now
|
||||
@ -436,6 +445,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
val topLevelFlowClass: Class<FlowLogic<*>>,
|
||||
val topLevelFlowLogic: FlowLogic<*>,
|
||||
val flowCallStackSummary: List<FlowCallSummary>,
|
||||
val status: Checkpoint.FlowStatus,
|
||||
val suspendedOn: SuspendedOn?,
|
||||
val flowCallStack: List<FlowCall>,
|
||||
val origin: Origin,
|
||||
|
@ -215,17 +215,28 @@ data class Checkpoint(
|
||||
/**
|
||||
* Deserializes the serialized fields contained in [Checkpoint.Serialized].
|
||||
*
|
||||
* @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized]
|
||||
* Depending on the [FlowStatus] of the [Checkpoint.Serialized], the deserialized [Checkpoint] may or may not have its [flowState]
|
||||
* properly deserialized. This is to optimise the process's memory footprint by not holding the checkpoints of flows that are not
|
||||
* running in-memory.
|
||||
*
|
||||
* The [flowState] will not be deserialized when the [FlowStatus] is:
|
||||
*
|
||||
* - [FlowStatus.PAUSED]
|
||||
* - [FlowStatus.COMPLETED]
|
||||
* - [FlowStatus.FAILED]
|
||||
*
|
||||
* Any other status returns a [FlowState.Unstarted] or [FlowState.Started] depending on the content of [serializedFlowState].
|
||||
*
|
||||
* @param checkpointSerializationContext The [CheckpointSerializationContext] to deserialize the checkpoint's serialized content with.
|
||||
* @param alwaysDeserializeFlowState A flag to specify if [flowState] should be deserialized, disregarding the [FlowStatus] of the
|
||||
* checkpoint and ignoring the memory optimisation.
|
||||
*
|
||||
* @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized].
|
||||
*/
|
||||
fun deserialize(checkpointSerializationContext: CheckpointSerializationContext): Checkpoint {
|
||||
val flowState = when(status) {
|
||||
FlowStatus.PAUSED -> FlowState.Paused
|
||||
FlowStatus.COMPLETED, FlowStatus.FAILED -> FlowState.Finished
|
||||
else -> serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext)
|
||||
}
|
||||
fun deserialize(checkpointSerializationContext: CheckpointSerializationContext, alwaysDeserializeFlowState: Boolean = false): Checkpoint {
|
||||
return Checkpoint(
|
||||
checkpointState = serializedCheckpointState.checkpointDeserialize(checkpointSerializationContext),
|
||||
flowState = flowState,
|
||||
flowState = getFlowState(checkpointSerializationContext, alwaysDeserializeFlowState),
|
||||
errorState = errorState,
|
||||
result = result?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT),
|
||||
status = status,
|
||||
@ -234,6 +245,23 @@ data class Checkpoint(
|
||||
compatible = compatible
|
||||
)
|
||||
}
|
||||
|
||||
private fun getFlowState(
|
||||
checkpointSerializationContext: CheckpointSerializationContext,
|
||||
alwaysDeserializeFlowState: Boolean
|
||||
): FlowState {
|
||||
return when {
|
||||
alwaysDeserializeFlowState -> deserializeFlowState(checkpointSerializationContext)
|
||||
status == FlowStatus.PAUSED -> FlowState.Paused
|
||||
status == FlowStatus.COMPLETED -> FlowState.Finished
|
||||
status == FlowStatus.FAILED -> FlowState.Finished
|
||||
else -> deserializeFlowState(checkpointSerializationContext)
|
||||
}
|
||||
}
|
||||
|
||||
private fun deserializeFlowState(checkpointSerializationContext: CheckpointSerializationContext): FlowState {
|
||||
return serializedFlowState!!.checkpointDeserialize(checkpointSerializationContext)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user