From 0174d996bd6ab86f0a1d06767e21d0bdf21e1e6c Mon Sep 17 00:00:00 2001 From: Kyriakos Tharrouniatis Date: Thu, 12 Mar 2020 17:57:35 +0000 Subject: [PATCH] CORDA-3598 Set Checkpoint.status to RUNNABLE (#6019) * Set/reset Checkpoint.status to RUNNABLE when suspending * Remove/move the comment, as it no longer makes sense there now that we always create a new Checkpoint object in SingleThreadedStateMachineManager.createFlowFromCheckpoint through tryDeserializeCheckpoint * Set the in-memory Checkpoint.status to RUNNABLE when a flow is retrying from a checkpoint --- .../SingleThreadedStateMachineManager.kt | 11 +- .../statemachine/StateMachineState.kt | 7 +- .../statemachine/FlowFrameworkTests.kt | 107 ++++++++++++++++++ 3 files changed, 116 insertions(+), 9 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index d722ad9fdb..c7f750fd31 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -604,7 +604,7 @@ class SingleThreadedStateMachineManager( // This is a brand new flow null } - val checkpoint = existingCheckpoint ?: Checkpoint.create( + val checkpoint = existingCheckpoint?.copy(status = Checkpoint.FlowStatus.RUNNABLE) ?: Checkpoint.create( invocationContext, flowStart, flowLogic.javaClass, @@ -774,7 +774,7 @@ class SingleThreadedStateMachineManager( isStartIdempotent: Boolean, initialDeduplicationHandler: DeduplicationHandler? ): Flow? 
{ - val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return null + val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id)?.copy(status = Checkpoint.FlowStatus.RUNNABLE) ?: return null val flowState = checkpoint.flowState val resultFuture = openFuture() val fiber = when (flowState) { @@ -800,12 +800,7 @@ class SingleThreadedStateMachineManager( is FlowState.Started -> { val fiber = tryCheckpointDeserialize(flowState.frozenFiber, id) ?: return null val state = StateMachineState( - // Do a trivial checkpoint copy below, to update the Checkpoint#timestamp value. - // The Checkpoint#timestamp is being used by FlowMonitor as the starting time point of a potential suspension. - // We need to refresh the Checkpoint#timestamp here, in case of an e.g. node start up after a long period. - // If not then, there is a time window (until the next checkpoint update) in which the FlowMonitor - // could log this flow as a waiting flow, from the last checkpoint update i.e. before the node's start up. - checkpoint = checkpoint.copy(), + checkpoint = checkpoint, pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), isFlowResumed = false, isTransactionTracked = false, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index d1f75dbcf3..4e4a1414a4 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -75,7 +75,12 @@ data class Checkpoint( PAUSED } - val timestamp: Instant = Instant.now() // This will get updated every time a Checkpoint object is created/ created by copy. + /** + * [timestamp] will get updated every time a [Checkpoint] object is created/ created by copy. 
+ * It will be updated, therefore, for example when a flow is being suspended or whenever a flow + * is being loaded from [Checkpoint] through [Serialized.deserialize]. + */ + val timestamp: Instant = Instant.now() companion object { diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 54e863d76f..01bd6e76e6 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -20,6 +20,7 @@ import net.corda.core.flows.ReceiveFinalityFlow import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.identity.Party import net.corda.core.internal.DeclaredField +import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.FlowIORequest import net.corda.core.internal.concurrent.flatMap import net.corda.core.messaging.MessageRecipients @@ -27,6 +28,7 @@ import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.queryBy import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize +import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.core.serialization.serialize import net.corda.core.toFuture import net.corda.core.transactions.SignedTransaction @@ -34,10 +36,14 @@ import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.ProgressTracker.Change import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds import net.corda.core.utilities.unwrap import net.corda.node.services.persistence.CheckpointPerformanceRecorder import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.checkpoints +import net.corda.nodeapi.internal.persistence.contextDatabase +import 
net.corda.nodeapi.internal.persistence.contextTransaction +import net.corda.nodeapi.internal.persistence.contextTransactionOrNull import net.corda.testing.contracts.DummyContract import net.corda.testing.contracts.DummyState import net.corda.testing.core.ALICE_NAME @@ -66,12 +72,16 @@ import org.junit.Before import org.junit.Test import rx.Notification import rx.Observable +import java.sql.SQLException import java.time.Duration import java.time.Instant import java.util.* import java.util.function.Predicate import kotlin.reflect.KClass +import kotlin.streams.toList +import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.test.assertTrue class FlowFrameworkTests { companion object { @@ -597,6 +607,88 @@ class FlowFrameworkTests { assertThat(result.getOrThrow()).isEqualTo("HelloHello") } + @Test(timeout=300_000) + fun `Checkpoint status changes to RUNNABLE when flow is loaded from checkpoint - FlowState Unstarted`() { + var firstExecution = true + var checkpointStatusInDBBeforeSuspension: Checkpoint.FlowStatus? = null + var checkpointStatusInDBAfterSuspension: Checkpoint.FlowStatus? = null + var checkpointStatusInMemoryBeforeSuspension: Checkpoint.FlowStatus? = null + + SuspendingFlow.hookBeforeCheckpoint = { + val flowFiber = this as? FlowStateMachineImpl<*> + assertTrue(flowFiber!!.transientState!!.value.checkpoint.flowState is FlowState.Unstarted) + + if (firstExecution) { + // the following manual persisting Checkpoint.status to FAILED should be removed when implementing CORDA-3604. 
+ manuallyFailCheckpointInDB(aliceNode) + + firstExecution = false + throw SQLException("deadlock") // will cause flow to retry + } else { + // The persisted Checkpoint should be still failed here -> it should change to RUNNABLE after suspension + checkpointStatusInDBBeforeSuspension = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status + checkpointStatusInMemoryBeforeSuspension = flowFiber.transientState!!.value.checkpoint.status + } + } + + SuspendingFlow.hookAfterCheckpoint = { + checkpointStatusInDBAfterSuspension = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status + } + + aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow() + + assertEquals(Checkpoint.FlowStatus.FAILED, checkpointStatusInDBBeforeSuspension) + assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInMemoryBeforeSuspension) + assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInDBAfterSuspension) + + SuspendingFlow.hookBeforeCheckpoint = {} + SuspendingFlow.hookAfterCheckpoint = {} + } + + @Test(timeout=300_000) + fun `Checkpoint status changes to RUNNABLE when flow is loaded from checkpoint - FlowState Started`() { + var firstExecution = true + var checkpointStatusInDB: Checkpoint.FlowStatus? = null + var checkpointStatusInMemory: Checkpoint.FlowStatus? = null + + SuspendingFlow.hookAfterCheckpoint = { + val flowFiber = this as? FlowStateMachineImpl<*> + assertTrue(flowFiber!!.transientState!!.value.checkpoint.flowState is FlowState.Started) + + if (firstExecution) { + // the following manual persisting Checkpoint.status to FAILED should be removed when implementing CORDA-3604. 
+ manuallyFailCheckpointInDB(aliceNode) + + firstExecution = false + throw SQLException("deadlock") // will cause flow to retry + } else { + checkpointStatusInDB = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status + checkpointStatusInMemory = flowFiber.transientState!!.value.checkpoint.status + } + } + + aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow() + + assertEquals(Checkpoint.FlowStatus.FAILED, checkpointStatusInDB) + assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInMemory) + + SuspendingFlow.hookAfterCheckpoint = {} + } + + // the following method should be removed when implementing CORDA-3604. + private fun manuallyFailCheckpointInDB(node: TestStartedNode) { + val idCheckpoint = node.internals.checkpointStorage.getAllCheckpoints().toList().single() + val checkpoint = idCheckpoint.second + val updatedCheckpoint = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED) + node.internals.checkpointStorage.updateCheckpoint(idCheckpoint.first, + updatedCheckpoint.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT), + updatedCheckpoint.serializedFlowState) + contextTransaction.commit() + contextTransaction.close() + contextTransactionOrNull = null + contextDatabase.newTransaction() + } + //region Helpers private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0) @@ -949,4 +1041,19 @@ internal class ExceptionFlow(val exception: () -> E) : FlowLogic< exceptionThrown = exception() throw exceptionThrown } +} + +internal class SuspendingFlow : FlowLogic() { + + companion object { + var hookBeforeCheckpoint: FlowStateMachine<*>.() -> Unit = {} + var hookAfterCheckpoint: FlowStateMachine<*>.() -> Unit = {} + } + + @Suspendable + override fun call() { + stateMachine.hookBeforeCheckpoint() + sleep(1.seconds) // flow checkpoints => checkpoint is in DB + stateMachine.hookAfterCheckpoint() + } } \ No newline at end of file