diff --git a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt index 340226492e..bebb451d74 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt @@ -5,7 +5,6 @@ import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.node.ServiceHub import net.corda.core.serialization.internal.CheckpointSerializationDefaults -import net.corda.core.serialization.internal.checkpointDeserialize import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.statemachine.SubFlow import net.corda.node.services.statemachine.SubFlowVersion @@ -36,7 +35,7 @@ object CheckpointVerifier { val cordappsByHash = currentCordapps.associateBy { it.jarHash } - checkpointStorage.getAllCheckpoints().use { + checkpointStorage.getRunnableCheckpoints().use { it.forEach { (_, serializedCheckpoint) -> val checkpoint = try { serializedCheckpoint.deserialize(checkpointSerializationContext) diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt index 953c2fc562..c9d3ee0eb4 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt @@ -1,7 +1,6 @@ package net.corda.node.services.api import net.corda.core.flows.StateMachineRunId -import net.corda.core.internal.FlowIORequest import net.corda.core.serialization.SerializedBytes import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.FlowState @@ -42,4 +41,10 @@ interface CheckpointStorage { * underlying database connection is closed, so any processing should happen before it is closed. */ fun getAllCheckpoints(): Stream> + + /** + * Stream runnable checkpoints from the store. If this is backed by a database the stream will be valid + * until the underlying database connection is closed, so any processing should happen before it is closed. + */ + fun getRunnableCheckpoints(): Stream> } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 7f554edbf6..cd0a465706 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -2,7 +2,6 @@ package net.corda.node.services.persistence import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.PLATFORM_VERSION -import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.serialize @@ -45,6 +44,8 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP private const val MAX_PROGRESS_STEP_LENGTH = 256 + private val NOT_RUNNABLE_CHECKPOINTS = listOf(FlowStatus.COMPLETED, FlowStatus.FAILED, FlowStatus.KILLED) + /** * This needs to run before Hibernate is initialised. * @@ -251,6 +252,18 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP } } + override fun getRunnableCheckpoints(): Stream> { + val session = currentDBSession() + val criteriaBuilder = session.criteriaBuilder + val criteriaQuery = criteriaBuilder.createQuery(DBFlowCheckpoint::class.java) + val root = criteriaQuery.from(DBFlowCheckpoint::class.java) + criteriaQuery.select(root) + .where(criteriaBuilder.not(root.get(DBFlowCheckpoint::status.name).`in`(NOT_RUNNABLE_CHECKPOINTS))) + return session.createQuery(criteriaQuery).stream().map { + StateMachineRunId(UUID.fromString(it.id)) to it.toSerializedCheckpoint() + } + } + private fun getDBCheckpoint(id: StateMachineRunId): DBFlowCheckpoint? { return currentDBSession().find(DBFlowCheckpoint::class.java, id.uuid.toString()) } diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt index c3979612ab..200947538b 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt @@ -141,7 +141,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri try { if (lock.getAndIncrement() == 0 && !file.exists()) { database.transaction { - checkpointStorage.getAllCheckpoints().use { stream -> + checkpointStorage.getRunnableCheckpoints().use { stream -> ZipOutputStream(file.outputStream()).use { zip -> stream.forEach { (runId, serialisedCheckpoint) -> diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index c7f750fd31..910815a9d3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -329,7 +329,7 @@ class SingleThreadedStateMachineManager( } private fun restoreFlowsFromCheckpoints(): List { - return checkpointStorage.getAllCheckpoints().use { + return checkpointStorage.getRunnableCheckpoints().use { it.mapNotNull { (id, serializedCheckpoint) -> // If a flow is added before start() then don't attempt to restore it mutex.locked { if (id in flows) return@mapNotNull null } diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index a8016ed17a..789909f3ab 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -63,7 +63,7 @@ import kotlin.test.assertFailsWith import kotlin.test.assertTrue internal fun CheckpointStorage.getAllIncompleteCheckpoints(): List { - return getAllCheckpoints().use { + return getRunnableCheckpoints().use { it.map { it.second }.toList() }.filter { it.status != Checkpoint.FlowStatus.COMPLETED } } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index d1da5967b0..726541775f 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -44,7 +44,7 @@ import kotlin.test.assertEquals import kotlin.test.assertTrue internal fun CheckpointStorage.checkpoints(): List { - return getAllCheckpoints().use { + return getRunnableCheckpoints().use { it.map { it.second }.toList() } } @@ -493,6 +493,35 @@ class DBCheckpointStorageTests { } } + @Test(timeout = 300_000) + fun `fetch runnable checkpoints`() { + val (_, checkpoint) = newCheckpoint(1) + // runnables + val runnable = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE) + val hospitalized = checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED) + // not runnables + val completed = checkpoint.copy(status = Checkpoint.FlowStatus.COMPLETED) + val failed = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED) + val killed = checkpoint.copy(status = Checkpoint.FlowStatus.KILLED) + // tentative + val paused = checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED) // is considered runnable + + database.transaction { + val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + + checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), runnable, serializedFlowState) + checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), hospitalized, serializedFlowState) + checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), completed, serializedFlowState) + checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), failed, serializedFlowState) + checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), killed, serializedFlowState) + checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), paused, serializedFlowState) + } + + database.transaction { + assertEquals(3, checkpointStorage.getRunnableCheckpoints().count()) + } + } + private fun newCheckpointStorage() { database.transaction { checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder { diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 242eaa0dba..8bb06ae51d 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -646,7 +646,7 @@ class FlowFrameworkTests { } SuspendingFlow.hookAfterCheckpoint = { - checkpointStatusInDBAfterSuspension = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status + checkpointStatusInDBAfterSuspension = aliceNode.internals.checkpointStorage.getRunnableCheckpoints().toList().single().second.status } aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow() @@ -691,7 +691,7 @@ class FlowFrameworkTests { // the following method should be removed when implementing CORDA-3604. private fun manuallyFailCheckpointInDB(node: TestStartedNode) { - val idCheckpoint = node.internals.checkpointStorage.getAllCheckpoints().toList().single() + val idCheckpoint = node.internals.checkpointStorage.getRunnableCheckpoints().toList().single() val checkpoint = idCheckpoint.second val updatedCheckpoint = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED) node.internals.checkpointStorage.updateCheckpoint(idCheckpoint.first,