CORDA-3598 Set Checkpoint.status to RUNNABLE (#6019)

* Set/ Reset Checkpoint.status to RUNNABLE after when suspending

* Removing/ Moving comment as it makes no longer sense to be there since, we now always create a new Checkpoint object in SingleThreadedStateMachineManager.createFlowFromCheckpoint through tryDeserializeCheckpoint

* Set -in memory- Checkpoint.status to RUNNABLE when a flow is retrying from Checkpoint
This commit is contained in:
Kyriakos Tharrouniatis 2020-03-12 17:57:35 +00:00 committed by GitHub
parent 499b6cf17e
commit 0174d996bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 116 additions and 9 deletions

View File

@ -604,7 +604,7 @@ class SingleThreadedStateMachineManager(
// This is a brand new flow
null
}
val checkpoint = existingCheckpoint ?: Checkpoint.create(
val checkpoint = existingCheckpoint?.copy(status = Checkpoint.FlowStatus.RUNNABLE) ?: Checkpoint.create(
invocationContext,
flowStart,
flowLogic.javaClass,
@ -774,7 +774,7 @@ class SingleThreadedStateMachineManager(
isStartIdempotent: Boolean,
initialDeduplicationHandler: DeduplicationHandler?
): Flow? {
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return null
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id)?.copy(status = Checkpoint.FlowStatus.RUNNABLE) ?: return null
val flowState = checkpoint.flowState
val resultFuture = openFuture<Any?>()
val fiber = when (flowState) {
@ -800,12 +800,7 @@ class SingleThreadedStateMachineManager(
is FlowState.Started -> {
val fiber = tryCheckpointDeserialize(flowState.frozenFiber, id) ?: return null
val state = StateMachineState(
// Do a trivial checkpoint copy below, to update the Checkpoint#timestamp value.
// The Checkpoint#timestamp is being used by FlowMonitor as the starting time point of a potential suspension.
// We need to refresh the Checkpoint#timestamp here, in case of an e.g. node start up after a long period.
// If not then, there is a time window (until the next checkpoint update) in which the FlowMonitor
// could log this flow as a waiting flow, from the last checkpoint update i.e. before the node's start up.
checkpoint = checkpoint.copy(),
checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isTransactionTracked = false,

View File

@ -75,7 +75,12 @@ data class Checkpoint(
PAUSED
}
val timestamp: Instant = Instant.now() // This will get updated every time a Checkpoint object is created/ created by copy.
/**
* [timestamp] will get updated every time a [Checkpoint] object is created/ created by copy.
* It will be updated, therefore, for example when a flow is being suspended or whenever a flow
* is being loaded from [Checkpoint] through [Serialized.deserialize].
*/
val timestamp: Instant = Instant.now()
companion object {

View File

@ -20,6 +20,7 @@ import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.messaging.MessageRecipients
@ -27,6 +28,7 @@ import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.queryBy
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.serialize
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
@ -34,10 +36,14 @@ import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.ProgressTracker.Change
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.node.services.persistence.CheckpointPerformanceRecorder
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.checkpoints
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyState
import net.corda.testing.core.ALICE_NAME
@ -66,12 +72,16 @@ import org.junit.Before
import org.junit.Test
import rx.Notification
import rx.Observable
import java.sql.SQLException
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.function.Predicate
import kotlin.reflect.KClass
import kotlin.streams.toList
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class FlowFrameworkTests {
companion object {
@ -597,6 +607,88 @@ class FlowFrameworkTests {
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
}
@Test(timeout=300_000)
fun `Checkpoint status changes to RUNNABLE when flow is loaded from checkpoint - FlowState Unstarted`() {
var firstExecution = true
var checkpointStatusInDBBeforeSuspension: Checkpoint.FlowStatus? = null
var checkpointStatusInDBAfterSuspension: Checkpoint.FlowStatus? = null
var checkpointStatusInMemoryBeforeSuspension: Checkpoint.FlowStatus? = null
SuspendingFlow.hookBeforeCheckpoint = {
val flowFiber = this as? FlowStateMachineImpl<*>
assertTrue(flowFiber!!.transientState!!.value.checkpoint.flowState is FlowState.Unstarted)
if (firstExecution) {
// the following manual persisting Checkpoint.status to FAILED should be removed when implementing CORDA-3604.
manuallyFailCheckpointInDB(aliceNode)
firstExecution = false
throw SQLException("deadlock") // will cause flow to retry
} else {
// The persisted Checkpoint should be still failed here -> it should change to RUNNABLE after suspension
checkpointStatusInDBBeforeSuspension = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
checkpointStatusInMemoryBeforeSuspension = flowFiber.transientState!!.value.checkpoint.status
}
}
SuspendingFlow.hookAfterCheckpoint = {
checkpointStatusInDBAfterSuspension = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
}
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow()
assertEquals(Checkpoint.FlowStatus.FAILED, checkpointStatusInDBBeforeSuspension)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInMemoryBeforeSuspension)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInDBAfterSuspension)
SuspendingFlow.hookBeforeCheckpoint = {}
SuspendingFlow.hookAfterCheckpoint = {}
}
@Test(timeout=300_000)
fun `Checkpoint status changes to RUNNABLE when flow is loaded from checkpoint - FlowState Started`() {
var firstExecution = true
var checkpointStatusInDB: Checkpoint.FlowStatus? = null
var checkpointStatusInMemory: Checkpoint.FlowStatus? = null
SuspendingFlow.hookAfterCheckpoint = {
val flowFiber = this as? FlowStateMachineImpl<*>
assertTrue(flowFiber!!.transientState!!.value.checkpoint.flowState is FlowState.Started)
if (firstExecution) {
// the following manual persisting Checkpoint.status to FAILED should be removed when implementing CORDA-3604.
manuallyFailCheckpointInDB(aliceNode)
firstExecution = false
throw SQLException("deadlock") // will cause flow to retry
} else {
checkpointStatusInDB = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
checkpointStatusInMemory = flowFiber.transientState!!.value.checkpoint.status
}
}
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow()
assertEquals(Checkpoint.FlowStatus.FAILED, checkpointStatusInDB)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInMemory)
SuspendingFlow.hookAfterCheckpoint = {}
}
// the following method should be removed when implementing CORDA-3604.
private fun manuallyFailCheckpointInDB(node: TestStartedNode) {
val idCheckpoint = node.internals.checkpointStorage.getAllCheckpoints().toList().single()
val checkpoint = idCheckpoint.second
val updatedCheckpoint = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
node.internals.checkpointStorage.updateCheckpoint(idCheckpoint.first,
updatedCheckpoint.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT),
updatedCheckpoint.serializedFlowState)
contextTransaction.commit()
contextTransaction.close()
contextTransactionOrNull = null
contextDatabase.newTransaction()
}
//region Helpers
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
@ -949,4 +1041,19 @@ internal class ExceptionFlow<E : Exception>(val exception: () -> E) : FlowLogic<
exceptionThrown = exception()
throw exceptionThrown
}
}
internal class SuspendingFlow : FlowLogic<Unit>() {
companion object {
var hookBeforeCheckpoint: FlowStateMachine<*>.() -> Unit = {}
var hookAfterCheckpoint: FlowStateMachine<*>.() -> Unit = {}
}
@Suspendable
override fun call() {
stateMachine.hookBeforeCheckpoint()
sleep(1.seconds) // flow checkpoints => checkpoint is in DB
stateMachine.hookAfterCheckpoint()
}
}