mirror of
https://github.com/corda/corda.git
synced 2025-01-18 02:39:51 +00:00
CORDA-2521: Checkpoint verifier no longer cares about the CorDapp jar name (#4669)
The check on the CorDapp hash is sufficient.
This commit is contained in:
parent
262a7ad1b7
commit
88e4b85537
@ -2,6 +2,7 @@ package net.corda.node.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.messaging.startFlow
|
||||
@ -15,14 +16,12 @@ import net.corda.testing.driver.DriverDSL
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.internal.CustomCordapp
|
||||
import net.corda.testing.node.internal.ListenProcessDeathException
|
||||
import net.corda.testing.node.internal.assertCheckpoints
|
||||
import net.corda.testing.node.internal.enclosedCordapp
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Test
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
// TraderDemoTest already has a test which checks the node can resume a flow from a checkpoint
|
||||
@ -32,27 +31,30 @@ class FlowCheckpointVersionNodeStartupCheckTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `restart node with incompatible version of suspended flow due to different jar name`() {
|
||||
driver(parametersForRestartingNodes()) {
|
||||
val defaultCordappJar = createSuspendedFlowInBob()
|
||||
defaultCordappJar.renameTo("renamed-${defaultCordappJar.fileName}")
|
||||
fun `restart node with mismatch between suspended flow and installed CorDapps`() {
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = false,
|
||||
inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
|
||||
cordappsForAllNodes = emptyList(),
|
||||
notarySpecs = emptyList()
|
||||
)) {
|
||||
createSuspendedFlowInBob()
|
||||
val cordappsDir = baseDirectory(BOB_NAME) / "cordapps"
|
||||
|
||||
// Test the scenerio where the CorDapp no longer exists
|
||||
cordappsDir.deleteRecursively()
|
||||
cordappsDir.createDirectories()
|
||||
assertBobFailsToStartWithLogMessage(
|
||||
CheckpointIncompatibleException.FlowNotInstalledException(ReceiverFlow::class.java).message
|
||||
CheckpointIncompatibleException.CordappNotInstalledException(ReceiverFlow::class.java.name).message
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `restart node with incompatible version of suspended flow due to different jar hash`() {
|
||||
driver(parametersForRestartingNodes()) {
|
||||
val defaultCordappJar = createSuspendedFlowInBob()
|
||||
// Clean-up
|
||||
stdOutLogFile(BOB_NAME).let { it.renameTo("${it.fileName}-no-cordapp") }
|
||||
|
||||
// The name is part of the MANIFEST so changing it is sufficient to change the jar hash
|
||||
// Now test the scenerio where the CorDapp's hash is different but the flow exists within the jar
|
||||
val modifiedCordapp = defaultCordapp.copy(name = "${defaultCordapp.name}-modified")
|
||||
val modifiedCordappJar = CustomCordapp.getJarFile(modifiedCordapp)
|
||||
modifiedCordappJar.moveTo(defaultCordappJar, REPLACE_EXISTING)
|
||||
|
||||
assertThat(defaultCordapp.jarFile.hash).isNotEqualTo(modifiedCordapp.jarFile.hash) // Just double-check the hashes are different
|
||||
modifiedCordapp.jarFile.copyToDirectory(cordappsDir)
|
||||
assertBobFailsToStartWithLogMessage(
|
||||
// The part of the log message generated by CheckpointIncompatibleException.FlowVersionIncompatibleException
|
||||
"that is incompatible with the current installed version of"
|
||||
@ -60,7 +62,7 @@ class FlowCheckpointVersionNodeStartupCheckTest {
|
||||
}
|
||||
}
|
||||
|
||||
private fun DriverDSL.createSuspendedFlowInBob(): Path {
|
||||
private fun DriverDSL.createSuspendedFlowInBob() {
|
||||
val (alice, bob) = listOf(
|
||||
startNode(providedName = ALICE_NAME),
|
||||
startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp)))
|
||||
@ -72,12 +74,11 @@ class FlowCheckpointVersionNodeStartupCheckTest {
|
||||
// Wait until Bob's flow has started
|
||||
bob.rpc.stateMachinesFeed().let { it.updates.map { it.id }.startWith(it.snapshot.map { it.id }) }.toBlocking().first()
|
||||
bob.stop()
|
||||
assertCheckpoints(BOB_NAME, 1)
|
||||
|
||||
return (bob.baseDirectory / "cordapps").list().single { it.toString().endsWith(".jar") }
|
||||
}
|
||||
|
||||
private fun DriverDSL.assertBobFailsToStartWithLogMessage(logMessage: String) {
|
||||
assertCheckpoints(BOB_NAME, 1)
|
||||
|
||||
assertFailsWith(ListenProcessDeathException::class) {
|
||||
startNode(NodeParameters(
|
||||
providedName = BOB_NAME,
|
||||
@ -85,19 +86,11 @@ class FlowCheckpointVersionNodeStartupCheckTest {
|
||||
)).getOrThrow()
|
||||
}
|
||||
|
||||
val logDir = baseDirectory(BOB_NAME)
|
||||
val logFile = logDir.list { it.filter { it.fileName.toString().endsWith("out.log") }.findAny().get() }
|
||||
val matchingLineCount = logFile.readLines { it.filter { line -> logMessage in line }.count() }
|
||||
assertEquals(1, matchingLineCount)
|
||||
assertThat(stdOutLogFile(BOB_NAME).readText()).contains(logMessage)
|
||||
}
|
||||
|
||||
private fun parametersForRestartingNodes(): DriverParameters {
|
||||
return DriverParameters(
|
||||
startNodesInProcess = false, // Start nodes in separate processes to ensure CordappLoader is not shared between restarts
|
||||
inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
|
||||
cordappsForAllNodes = emptyList(),
|
||||
notarySpecs = emptyList()
|
||||
)
|
||||
private fun DriverDSL.stdOutLogFile(name: CordaX500Name): Path {
|
||||
return baseDirectory(name).list { it.filter { it.toString().endsWith("stdout.log") }.findAny().get() }
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
|
@ -18,42 +18,59 @@ object CheckpointVerifier {
|
||||
* Verifies that all Checkpoints stored in the db can be safely loaded with the currently installed version.
|
||||
* @throws CheckpointIncompatibleException if any offending checkpoint is found.
|
||||
*/
|
||||
fun verifyCheckpointsCompatible(checkpointStorage: CheckpointStorage, currentCordapps: List<Cordapp>, platformVersion: Int, serviceHub: ServiceHub, tokenizableServices: List<Any>) {
|
||||
fun verifyCheckpointsCompatible(
|
||||
checkpointStorage: CheckpointStorage,
|
||||
currentCordapps: List<Cordapp>,
|
||||
platformVersion: Int,
|
||||
serviceHub: ServiceHub,
|
||||
tokenizableServices: List<Any>
|
||||
) {
|
||||
val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
|
||||
CheckpointSerializeAsTokenContextImpl(tokenizableServices, CheckpointSerializationDefaults.CHECKPOINT_SERIALIZER, CheckpointSerializationDefaults.CHECKPOINT_CONTEXT, serviceHub)
|
||||
CheckpointSerializeAsTokenContextImpl(
|
||||
tokenizableServices,
|
||||
CheckpointSerializationDefaults.CHECKPOINT_SERIALIZER,
|
||||
CheckpointSerializationDefaults.CHECKPOINT_CONTEXT,
|
||||
serviceHub
|
||||
)
|
||||
)
|
||||
checkpointStorage.getAllCheckpoints().forEach { (_, serializedCheckpoint) ->
|
||||
|
||||
val cordappsByHash = currentCordapps.associateBy { it.jarHash }
|
||||
|
||||
checkpointStorage.getAllCheckpoints().forEach { (_, serializedCheckpoint) ->
|
||||
val checkpoint = try {
|
||||
serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext)
|
||||
} catch (e: ClassNotFoundException) {
|
||||
val message = e.message
|
||||
if (message != null) {
|
||||
throw CheckpointIncompatibleException.CordappNotInstalledException(message)
|
||||
} else {
|
||||
throw CheckpointIncompatibleException.CannotBeDeserialisedException(e)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
throw CheckpointIncompatibleException.CannotBeDeserialisedException(e)
|
||||
}
|
||||
|
||||
// For each Subflow, compare the checkpointed version to the current version.
|
||||
checkpoint.subFlowStack.forEach { checkFlowCompatible(it, currentCordapps, platformVersion) }
|
||||
checkpoint.subFlowStack.forEach { checkFlowCompatible(it, cordappsByHash, platformVersion) }
|
||||
}
|
||||
}
|
||||
|
||||
// Throws exception when the flow is incompatible
|
||||
private fun checkFlowCompatible(subFlow: SubFlow, currentCordapps: List<Cordapp>, platformVersion: Int) {
|
||||
val corDappInfo = subFlow.subFlowVersion
|
||||
private fun checkFlowCompatible(subFlow: SubFlow, currentCordappsByHash: Map<SecureHash.SHA256, Cordapp>, platformVersion: Int) {
|
||||
val subFlowVersion = subFlow.subFlowVersion
|
||||
|
||||
if (corDappInfo.platformVersion != platformVersion) {
|
||||
throw CheckpointIncompatibleException.SubFlowCoreVersionIncompatibleException(subFlow.flowClass, corDappInfo.platformVersion)
|
||||
if (subFlowVersion.platformVersion != platformVersion) {
|
||||
throw CheckpointIncompatibleException.SubFlowCoreVersionIncompatibleException(subFlow.flowClass, subFlowVersion.platformVersion)
|
||||
}
|
||||
|
||||
if (corDappInfo is SubFlowVersion.CorDappFlow) {
|
||||
val installedCordapps = currentCordapps.filter { it.name == corDappInfo.corDappName }
|
||||
when (installedCordapps.size) {
|
||||
0 -> throw CheckpointIncompatibleException.FlowNotInstalledException(subFlow.flowClass)
|
||||
1 -> {
|
||||
val currenCordapp = installedCordapps.first()
|
||||
if (corDappInfo.corDappHash != currenCordapp.jarHash) {
|
||||
throw CheckpointIncompatibleException.FlowVersionIncompatibleException(subFlow.flowClass, currenCordapp, corDappInfo.corDappHash)
|
||||
}
|
||||
}
|
||||
else -> throw IllegalStateException("Multiple Cordapps with name ${corDappInfo.corDappName} installed.") // This should not happen
|
||||
// If the sub-flow is from a CorDapp then make sure we have that exact CorDapp jar loaded
|
||||
if (subFlowVersion is SubFlowVersion.CorDappFlow && subFlowVersion.corDappHash !in currentCordappsByHash) {
|
||||
// If we don't then see if the flow exists in any of the CorDapps so that we can give the user a more useful error message
|
||||
val matchingCordapp = currentCordappsByHash.values.find { subFlow.flowClass in it.allFlows }
|
||||
if (matchingCordapp != null) {
|
||||
throw CheckpointIncompatibleException.FlowVersionIncompatibleException(subFlow.flowClass, matchingCordapp, subFlowVersion.corDappHash)
|
||||
} else {
|
||||
throw CheckpointIncompatibleException.CordappNotInstalledException(subFlow.flowClass.name)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -64,15 +81,20 @@ object CheckpointVerifier {
|
||||
*/
|
||||
sealed class CheckpointIncompatibleException(override val message: String) : Exception() {
|
||||
class CannotBeDeserialisedException(val e: Exception) : CheckpointIncompatibleException(
|
||||
"Found checkpoint that cannot be deserialised using the current Corda version. Please revert to the previous version of Corda, drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again. Cause: ${e.message}")
|
||||
"Found checkpoint that cannot be deserialised using the current Corda version. Please revert to the previous version of Corda, " +
|
||||
"drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again. Cause: ${e.message}")
|
||||
|
||||
class SubFlowCoreVersionIncompatibleException(val flowClass: Class<out FlowLogic<*>>, oldVersion: Int) : CheckpointIncompatibleException(
|
||||
"Found checkpoint for flow: ${flowClass} that is incompatible with the current Corda platform. Please revert to the previous version of Corda (version ${oldVersion}), drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.")
|
||||
"Found checkpoint for flow: $flowClass that is incompatible with the current Corda platform. Please revert to the previous " +
|
||||
"version of Corda (version $oldVersion), drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.")
|
||||
|
||||
class FlowVersionIncompatibleException(val flowClass: Class<out FlowLogic<*>>, val cordapp: Cordapp, oldHash: SecureHash) : CheckpointIncompatibleException(
|
||||
"Found checkpoint for flow: ${flowClass} that is incompatible with the current installed version of ${cordapp.name}. Please reinstall the previous version of the CorDapp (with hash: ${oldHash}), drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.")
|
||||
"Found checkpoint for flow: $flowClass that is incompatible with the current installed version of ${cordapp.name}. " +
|
||||
"Please reinstall the previous version of the CorDapp (with hash: $oldHash), drain your node " +
|
||||
"(see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.")
|
||||
|
||||
class FlowNotInstalledException(val flowClass: Class<out FlowLogic<*>>) : CheckpointIncompatibleException(
|
||||
"Found checkpoint for flow: ${flowClass} that is no longer installed. Please install the missing CorDapp, drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.")
|
||||
class CordappNotInstalledException(classNotFound: String) : CheckpointIncompatibleException(
|
||||
"Found checkpoint for CorDapp that is no longer installed. Specifically, could not find class $classNotFound. Please install the " +
|
||||
"missing CorDapp, drain your node (see https://docs.corda.net/upgrading-cordapps.html#flow-drains), and try again.")
|
||||
}
|
||||
|
||||
|
@ -22,8 +22,8 @@ import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.configureDatabase
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
@ -171,7 +171,7 @@ class DBCheckpointStorageTests {
|
||||
checkpointStorage.addCheckpoint(id1, checkpoint1)
|
||||
}
|
||||
|
||||
Assertions.assertThatThrownBy {
|
||||
assertThatThrownBy {
|
||||
database.transaction {
|
||||
CheckpointVerifier.verifyCheckpointsCompatible(checkpointStorage, emptyList(), 1, mockServices, emptyList())
|
||||
}
|
||||
|
@ -6,7 +6,6 @@ import com.typesafe.config.ConfigFactory
|
||||
import com.typesafe.config.ConfigRenderOptions
|
||||
import com.typesafe.config.ConfigValueFactory
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||
import net.corda.cliutils.CommonCliConstants.BASE_DIR
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.concurrent.firstOf
|
||||
@ -579,7 +578,7 @@ class DriverDSLImpl(
|
||||
extraCustomCordapps + (cordappsForAllNodes ?: emptySet())
|
||||
)
|
||||
|
||||
if (parameters.startInSameProcess ?: startNodesInProcess) {
|
||||
val nodeFuture = if (parameters.startInSameProcess ?: startNodesInProcess) {
|
||||
val nodeAndThreadFuture = startInProcessNode(executorService, config)
|
||||
shutdownManager.registerShutdown(
|
||||
nodeAndThreadFuture.map { (node, thread) ->
|
||||
@ -603,7 +602,7 @@ class DriverDSLImpl(
|
||||
}
|
||||
}
|
||||
}
|
||||
return nodeFuture
|
||||
nodeFuture
|
||||
} else {
|
||||
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
|
||||
val process = startOutOfProcessNode(config, quasarJarPath, debugPort, systemProperties, parameters.maximumHeapSize)
|
||||
@ -624,7 +623,7 @@ class DriverDSLImpl(
|
||||
}
|
||||
val effectiveP2PAddress = config.corda.messagingServerAddress ?: config.corda.p2pAddress
|
||||
val p2pReadyFuture = addressMustBeBoundFuture(executorService, effectiveP2PAddress, process)
|
||||
return p2pReadyFuture.flatMap {
|
||||
p2pReadyFuture.flatMap {
|
||||
val processDeathFuture = poll(executorService, "process death while waiting for RPC (${config.corda.myLegalName})") {
|
||||
if (process.isAlive) null else process
|
||||
}
|
||||
@ -644,6 +643,8 @@ class DriverDSLImpl(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nodeFuture.doOnError { onNodeExit() }
|
||||
}
|
||||
|
||||
override fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): CordaFuture<A> {
|
||||
|
Loading…
Reference in New Issue
Block a user