CORDA-3655 Do not run COMPLETED, FAILED or KILLED flows on node startup (#6070)

* CheckpointStorage.getAllCheckpoints will not fetch COMPLETED, FAILED and KILLED flows by default

* Rename getAllCheckpoints to getAllRunnableCheckpoints for clarity

* Fix Detekt issue

* Rename getAllRunnableCheckpoints to getRunnableCheckpoints

* Minor kdoc update

* Bring back in CheckpointStorage.getAllCheckpoints to co-exist with getRunnableCheckpoints
This commit is contained in:
Kyriakos Tharrouniatis 2020-03-16 15:25:36 +00:00 committed by GitHub
parent 1025ee1dee
commit 121c789c59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 56 additions and 10 deletions

View File

@ -5,7 +5,6 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.statemachine.SubFlow
import net.corda.node.services.statemachine.SubFlowVersion
@ -36,7 +35,7 @@ object CheckpointVerifier {
val cordappsByHash = currentCordapps.associateBy { it.jarHash }
checkpointStorage.getAllCheckpoints().use {
checkpointStorage.getRunnableCheckpoints().use {
it.forEach { (_, serializedCheckpoint) ->
val checkpoint = try {
serializedCheckpoint.deserialize(checkpointSerializationContext)

View File

@ -1,7 +1,6 @@
package net.corda.node.services.api
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.FlowState
@ -42,4 +41,10 @@ interface CheckpointStorage {
* underlying database connection is closed, so any processing should happen before it is closed.
*/
fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
/**
* Stream runnable checkpoints from the store. If this is backed by a database the stream will be valid
* until the underlying database connection is closed, so any processing should happen before it is closed.
*/
fun getRunnableCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
}

View File

@ -2,7 +2,6 @@ package net.corda.node.services.persistence
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize
@ -45,6 +44,8 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
private const val MAX_PROGRESS_STEP_LENGTH = 256
private val NOT_RUNNABLE_CHECKPOINTS = listOf(FlowStatus.COMPLETED, FlowStatus.FAILED, FlowStatus.KILLED)
/**
* This needs to run before Hibernate is initialised.
*
@ -251,6 +252,18 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
}
}
override fun getRunnableCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>> {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(DBFlowCheckpoint::class.java)
val root = criteriaQuery.from(DBFlowCheckpoint::class.java)
criteriaQuery.select(root)
.where(criteriaBuilder.not(root.get<FlowStatus>(DBFlowCheckpoint::status.name).`in`(NOT_RUNNABLE_CHECKPOINTS)))
return session.createQuery(criteriaQuery).stream().map {
StateMachineRunId(UUID.fromString(it.id)) to it.toSerializedCheckpoint()
}
}
private fun getDBCheckpoint(id: StateMachineRunId): DBFlowCheckpoint? {
return currentDBSession().find(DBFlowCheckpoint::class.java, id.uuid.toString())
}

View File

@ -141,7 +141,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
try {
if (lock.getAndIncrement() == 0 && !file.exists()) {
database.transaction {
checkpointStorage.getAllCheckpoints().use { stream ->
checkpointStorage.getRunnableCheckpoints().use { stream ->
ZipOutputStream(file.outputStream()).use { zip ->
stream.forEach { (runId, serialisedCheckpoint) ->

View File

@ -329,7 +329,7 @@ class SingleThreadedStateMachineManager(
}
private fun restoreFlowsFromCheckpoints(): List<Flow> {
return checkpointStorage.getAllCheckpoints().use {
return checkpointStorage.getRunnableCheckpoints().use {
it.mapNotNull { (id, serializedCheckpoint) ->
// If a flow is added before start() then don't attempt to restore it
mutex.locked { if (id in flows) return@mapNotNull null }

View File

@ -63,7 +63,7 @@ import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
internal fun CheckpointStorage.getAllIncompleteCheckpoints(): List<Checkpoint.Serialized> {
return getAllCheckpoints().use {
return getRunnableCheckpoints().use {
it.map { it.second }.toList()
}.filter { it.status != Checkpoint.FlowStatus.COMPLETED }
}

View File

@ -44,7 +44,7 @@ import kotlin.test.assertEquals
import kotlin.test.assertTrue
internal fun CheckpointStorage.checkpoints(): List<Checkpoint.Serialized> {
return getAllCheckpoints().use {
return getRunnableCheckpoints().use {
it.map { it.second }.toList()
}
}
@ -493,6 +493,35 @@ class DBCheckpointStorageTests {
}
}
@Test(timeout = 300_000)
fun `fetch runnable checkpoints`() {
val (_, checkpoint) = newCheckpoint(1)
// runnables
val runnable = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)
val hospitalized = checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED)
// not runnables
val completed = checkpoint.copy(status = Checkpoint.FlowStatus.COMPLETED)
val failed = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
val killed = checkpoint.copy(status = Checkpoint.FlowStatus.KILLED)
// tentative
val paused = checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED) // is considered runnable
database.transaction {
val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), runnable, serializedFlowState)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), hospitalized, serializedFlowState)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), completed, serializedFlowState)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), failed, serializedFlowState)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), killed, serializedFlowState)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), paused, serializedFlowState)
}
database.transaction {
assertEquals(3, checkpointStorage.getRunnableCheckpoints().count())
}
}
private fun newCheckpointStorage() {
database.transaction {
checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {

View File

@ -646,7 +646,7 @@ class FlowFrameworkTests {
}
SuspendingFlow.hookAfterCheckpoint = {
checkpointStatusInDBAfterSuspension = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
checkpointStatusInDBAfterSuspension = aliceNode.internals.checkpointStorage.getRunnableCheckpoints().toList().single().second.status
}
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow()
@ -691,7 +691,7 @@ class FlowFrameworkTests {
// the following method should be removed when implementing CORDA-3604.
private fun manuallyFailCheckpointInDB(node: TestStartedNode) {
val idCheckpoint = node.internals.checkpointStorage.getAllCheckpoints().toList().single()
val idCheckpoint = node.internals.checkpointStorage.getRunnableCheckpoints().toList().single()
val checkpoint = idCheckpoint.second
val updatedCheckpoint = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
node.internals.checkpointStorage.updateCheckpoint(idCheckpoint.first,