From 133e6fe39a7c309e8e611ca889fdd72a0129d526 Mon Sep 17 00:00:00 2001 From: William Vigor <58432369+williamvigorr3@users.noreply.github.com> Date: Mon, 24 Aug 2020 12:59:51 +0100 Subject: [PATCH] CORDA-4001 Verify Paused Checkpoints on Node Startup (#6655) We should check that PAUSED Checkpoints can be deserialised on node startup as we do for RUNNABLE checkpoints. Otherwise a user might get into trouble if they update the CorDapp. --- ...owCheckpointVersionNodeStartupCheckTest.kt | 67 ++++++++++++++----- .../corda/node/internal/CheckpointVerifier.kt | 5 +- 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt index 0cb4c7fd77..87175bb06d 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt @@ -10,11 +10,13 @@ import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.internal.CheckpointIncompatibleException +import net.corda.node.services.statemachine.Checkpoint import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverDSL import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.NodeParameters import net.corda.testing.driver.driver import net.corda.testing.node.internal.assertUncompletedCheckpoints @@ -32,7 +34,7 @@ class FlowCheckpointVersionNodeStartupCheckTest { } @Test(timeout=300_000) - fun `restart node with mismatch between suspended flow and installed CorDapps`() { + fun `restart node with mismatch between suspended flow and installed CorDapps`() { driver(DriverParameters( startNodesInProcess = false, inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows @@ -40,7 +42,44 @@ class FlowCheckpointVersionNodeStartupCheckTest { notarySpecs = emptyList(), allowHibernateToManageAppSchema = false )) { - createSuspendedFlowInBob() + val (bob, _) = createSuspendedFlowInBob() + bob.stop() + restartBobWithMismatchedCorDapp() + } + } + + @Test(timeout=300_000) + fun `restart node with mismatch between suspended paused flow and installed CorDapps`() { + driver(DriverParameters( + startNodesInProcess = false, + inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows + cordappsForAllNodes = emptyList(), + notarySpecs = emptyList(), + allowHibernateToManageAppSchema = false + )) { + val (bob, flowId) = createSuspendedFlowInBob() + val flow = bob.rpc.startFlow(::UpdateStatusToPaused, flowId) + flow.returnValue.getOrThrow() + bob.stop() + restartBobWithMismatchedCorDapp() + } + } + + private fun DriverDSL.createSuspendedFlowInBob(): Pair { + val (alice, bob) = listOf( + startNode(providedName = ALICE_NAME), + startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp))) + ).map { it.getOrThrow() } + + alice.stop() // Stop Alice so that Bob never receives the message + + val flowId = bob.rpc.startFlow(FlowCheckpointVersionNodeStartupCheckTest::ReceiverFlow, alice.nodeInfo.singleIdentity()).id + // Wait until Bob's flow has started + bob.rpc.stateMachinesFeed().let { it.updates.map { it.id }.startWith(it.snapshot.map { it.id }) }.toBlocking().first() + return Pair(bob, flowId) + } + + fun DriverDSL.restartBobWithMismatchedCorDapp() { val cordappsDir = baseDirectory(BOB_NAME) / "cordapps" // Test the scenerio where the CorDapp no longer exists @@ -58,21 +97,6 @@ class FlowCheckpointVersionNodeStartupCheckTest { // The part of the log message generated by CheckpointIncompatibleException.FlowVersionIncompatibleException "that is incompatible with the current installed version of" ) - } - } - - private fun DriverDSL.createSuspendedFlowInBob() { - val (alice, bob) = listOf( - startNode(providedName = ALICE_NAME), - startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp))) - ).map { it.getOrThrow() } - - alice.stop() // Stop Alice so that Bob never receives the message - - bob.rpc.startFlow(FlowCheckpointVersionNodeStartupCheckTest::ReceiverFlow, alice.nodeInfo.singleIdentity()) - // Wait until Bob's flow has started - bob.rpc.stateMachinesFeed().let { it.updates.map { it.id }.startWith(it.snapshot.map { it.id }) }.toBlocking().first() - bob.stop() } private fun DriverDSL.assertBobFailsToStartWithLogMessage(logMessage: String) { @@ -107,4 +131,13 @@ class FlowCheckpointVersionNodeStartupCheckTest { @Suspendable override fun call() = otherSide.send("Hello!") } + + @StartableByRPC + class UpdateStatusToPaused(private val id: StateMachineRunId): FlowLogic() { + @Suspendable + override fun call() { + val statement = "Update node_checkpoints set status = ${Checkpoint.FlowStatus.PAUSED.ordinal} where flow_id = '${id.uuid}'" + serviceHub.jdbcSession().prepareStatement(statement).execute() + } + } } diff --git a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt index 5042e2e9ff..052de0ab3c 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt @@ -6,6 +6,7 @@ import net.corda.core.flows.FlowLogic import net.corda.core.node.ServiceHub import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.node.services.api.CheckpointStorage +import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.SubFlow import net.corda.node.services.statemachine.SubFlowVersion import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl @@ -13,6 +14,8 @@ import net.corda.serialization.internal.withTokenContext object CheckpointVerifier { + private val statusToVerify = setOf(Checkpoint.FlowStatus.RUNNABLE, Checkpoint.FlowStatus.HOSPITALIZED, Checkpoint.FlowStatus.PAUSED) + /** * Verifies that all Checkpoints stored in the db can be safely loaded with the currently installed version. * @throws CheckpointIncompatibleException if any offending checkpoint is found. @@ -35,7 +38,7 @@ object CheckpointVerifier { val cordappsByHash = currentCordapps.associateBy { it.jarHash } - checkpointStorage.getCheckpointsToRun().use { + checkpointStorage.getCheckpoints(statusToVerify).use { it.forEach { (_, serializedCheckpoint) -> val checkpoint = try { serializedCheckpoint.deserialize(checkpointSerializationContext)