mirror of
https://github.com/corda/corda.git
synced 2025-01-18 10:46:38 +00:00
CORDA-4001 Verify Paused Checkpoints on Node Startup (#6655)
We should check that PAUSED Checkpoints can be deserialised on node startup as we do for RUNNABLE checkpoints. Otherwise a user might get into trouble if they update the CorDapp.
This commit is contained in:
parent
e243e9b315
commit
133e6fe39a
@ -10,11 +10,13 @@ import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.internal.CheckpointIncompatibleException
|
||||
import net.corda.node.services.statemachine.Checkpoint
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverDSL
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.NodeParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.internal.assertUncompletedCheckpoints
|
||||
@ -32,7 +34,7 @@ class FlowCheckpointVersionNodeStartupCheckTest {
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `restart node with mismatch between suspended flow and installed CorDapps`() {
|
||||
fun `restart node with mismatch between suspended flow and installed CorDapps`() {
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = false,
|
||||
inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
|
||||
@ -40,7 +42,44 @@ class FlowCheckpointVersionNodeStartupCheckTest {
|
||||
notarySpecs = emptyList(),
|
||||
allowHibernateToManageAppSchema = false
|
||||
)) {
|
||||
createSuspendedFlowInBob()
|
||||
val (bob, _) = createSuspendedFlowInBob()
|
||||
bob.stop()
|
||||
restartBobWithMismatchedCorDapp()
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `restart node with mismatch between suspended paused flow and installed CorDapps`() {
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = false,
|
||||
inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
|
||||
cordappsForAllNodes = emptyList(),
|
||||
notarySpecs = emptyList(),
|
||||
allowHibernateToManageAppSchema = false
|
||||
)) {
|
||||
val (bob, flowId) = createSuspendedFlowInBob()
|
||||
val flow = bob.rpc.startFlow(::UpdateStatusToPaused, flowId)
|
||||
flow.returnValue.getOrThrow()
|
||||
bob.stop()
|
||||
restartBobWithMismatchedCorDapp()
|
||||
}
|
||||
}
|
||||
|
||||
private fun DriverDSL.createSuspendedFlowInBob(): Pair<NodeHandle, StateMachineRunId> {
|
||||
val (alice, bob) = listOf(
|
||||
startNode(providedName = ALICE_NAME),
|
||||
startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp)))
|
||||
).map { it.getOrThrow() }
|
||||
|
||||
alice.stop() // Stop Alice so that Bob never receives the message
|
||||
|
||||
val flowId = bob.rpc.startFlow(FlowCheckpointVersionNodeStartupCheckTest::ReceiverFlow, alice.nodeInfo.singleIdentity()).id
|
||||
// Wait until Bob's flow has started
|
||||
bob.rpc.stateMachinesFeed().let { it.updates.map { it.id }.startWith(it.snapshot.map { it.id }) }.toBlocking().first()
|
||||
return Pair(bob, flowId)
|
||||
}
|
||||
|
||||
fun DriverDSL.restartBobWithMismatchedCorDapp() {
|
||||
val cordappsDir = baseDirectory(BOB_NAME) / "cordapps"
|
||||
|
||||
// Test the scenerio where the CorDapp no longer exists
|
||||
@ -58,21 +97,6 @@ class FlowCheckpointVersionNodeStartupCheckTest {
|
||||
// The part of the log message generated by CheckpointIncompatibleException.FlowVersionIncompatibleException
|
||||
"that is incompatible with the current installed version of"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private fun DriverDSL.createSuspendedFlowInBob() {
|
||||
val (alice, bob) = listOf(
|
||||
startNode(providedName = ALICE_NAME),
|
||||
startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp)))
|
||||
).map { it.getOrThrow() }
|
||||
|
||||
alice.stop() // Stop Alice so that Bob never receives the message
|
||||
|
||||
bob.rpc.startFlow(FlowCheckpointVersionNodeStartupCheckTest::ReceiverFlow, alice.nodeInfo.singleIdentity())
|
||||
// Wait until Bob's flow has started
|
||||
bob.rpc.stateMachinesFeed().let { it.updates.map { it.id }.startWith(it.snapshot.map { it.id }) }.toBlocking().first()
|
||||
bob.stop()
|
||||
}
|
||||
|
||||
private fun DriverDSL.assertBobFailsToStartWithLogMessage(logMessage: String) {
|
||||
@ -107,4 +131,13 @@ class FlowCheckpointVersionNodeStartupCheckTest {
|
||||
@Suspendable
|
||||
override fun call() = otherSide.send("Hello!")
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class UpdateStatusToPaused(private val id: StateMachineRunId): FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val statement = "Update node_checkpoints set status = ${Checkpoint.FlowStatus.PAUSED.ordinal} where flow_id = '${id.uuid}'"
|
||||
serviceHub.jdbcSession().prepareStatement(statement).execute()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.statemachine.Checkpoint
|
||||
import net.corda.node.services.statemachine.SubFlow
|
||||
import net.corda.node.services.statemachine.SubFlowVersion
|
||||
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
|
||||
@ -13,6 +14,8 @@ import net.corda.serialization.internal.withTokenContext
|
||||
|
||||
object CheckpointVerifier {
|
||||
|
||||
private val statusToVerify = setOf(Checkpoint.FlowStatus.RUNNABLE, Checkpoint.FlowStatus.HOSPITALIZED, Checkpoint.FlowStatus.PAUSED)
|
||||
|
||||
/**
|
||||
* Verifies that all Checkpoints stored in the db can be safely loaded with the currently installed version.
|
||||
* @throws CheckpointIncompatibleException if any offending checkpoint is found.
|
||||
@ -35,7 +38,7 @@ object CheckpointVerifier {
|
||||
|
||||
val cordappsByHash = currentCordapps.associateBy { it.jarHash }
|
||||
|
||||
checkpointStorage.getCheckpointsToRun().use {
|
||||
checkpointStorage.getCheckpoints(statusToVerify).use {
|
||||
it.forEach { (_, serializedCheckpoint) ->
|
||||
val checkpoint = try {
|
||||
serializedCheckpoint.deserialize(checkpointSerializationContext)
|
||||
|
Loading…
Reference in New Issue
Block a user