CORDA-1477 add check before db evolution that there are no checkpoints. (#967)
* CORDA-1477 add check before db evolution that there are no checkpoints.
* CORDA-1477 address code review comments
parent 5ee242de4f
commit 8c1c19fdf8
@@ -43,30 +43,30 @@ class SchemaMigration(
     * Main entry point to the schema migration.
     * Called during node startup.
     */
-    fun nodeStartup() {
+    fun nodeStartup(existingCheckpoints: Boolean) {
         when {
-            databaseConfig.runMigration -> runMigration()
+            databaseConfig.runMigration -> runMigration(existingCheckpoints)
             failOnMigrationMissing -> checkState()
         }
     }

     /**
-     * will run the liquibase migration on the actual database
+     * Will run the Liquibase migration on the actual database.
      */
-    fun runMigration() = doRunMigration(run = true, outputWriter = null, check = false)
+    fun runMigration(existingCheckpoints: Boolean) = doRunMigration(run = true, outputWriter = null, check = false, existingCheckpoints = existingCheckpoints)

     /**
-     * will write the migration to a Writer
+     * Will write the migration to a [Writer].
      */
     fun generateMigrationScript(writer: Writer) = doRunMigration(run = false, outputWriter = writer, check = false)

     /**
-     * ensures that the database is up to date with the latest migration changes
+     * Ensures that the database is up to date with the latest migration changes.
      */
     fun checkState() = doRunMigration(run = false, outputWriter = null, check = true)

     /**
-     * can be used from an external tool to release the lock in case something went terribly wrong
+     * Can be used from an external tool to release the lock in case something went terribly wrong.
      */
     fun forceReleaseMigrationLock() {
         dataSource.connection.use { connection ->
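For orientation, here is a minimal sketch of the calling convention this hunk introduces. The class below is a self-contained stand-in for net.corda.nodeapi.internal.persistence.SchemaMigration; only the nodeStartup/runMigration signatures mirror the diff, the bodies are illustrative.

// Stand-in for the real SchemaMigration; only the changed signatures are mirrored.
class SchemaMigrationSketch(private val runMigrationConfigured: Boolean) {

    fun nodeStartup(existingCheckpoints: Boolean) {
        // As in the diff: the checkpoint flag is only consulted when a real
        // migration is about to run, not for a plain state check.
        if (runMigrationConfigured) runMigration(existingCheckpoints) else checkState()
    }

    fun runMigration(existingCheckpoints: Boolean) {
        println("would migrate; existingCheckpoints=$existingCheckpoints")
    }

    fun checkState() {
        println("would verify that no changesets are outstanding")
    }
}

fun main() {
    // Callers must now determine, before startup, whether any flows are suspended.
    SchemaMigrationSketch(runMigrationConfigured = true).nodeStartup(existingCheckpoints = false)
}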
@@ -74,15 +74,15 @@ class SchemaMigration(
         }
     }

-    private fun doRunMigration(run: Boolean, outputWriter: Writer?, check: Boolean) {
+    private fun doRunMigration(run: Boolean, outputWriter: Writer?, check: Boolean, existingCheckpoints: Boolean? = null) {

-        // virtual file name of the changelog that includes all schemas
+        // Virtual file name of the changelog that includes all schemas.
         val dynamicInclude = "master.changelog.json"

         dataSource.connection.use { connection ->

-            // collect all changelog file referenced in the included schemas
-            // for backward compatibility reasons, when failOnMigrationMissing=false, we don't manage CorDapps via Liquibase but use the hibernate hbm2ddl=update
+            // Collect all changelog file referenced in the included schemas.
+            // For backward compatibility reasons, when failOnMigrationMissing=false, we don't manage CorDapps via Liquibase but use the hibernate hbm2ddl=update.
             val changelogList = schemas.map { mappedSchema ->
                 val resource = getMigrationResource(mappedSchema, classLoader)
                 when {
@@ -95,18 +95,18 @@ class SchemaMigration(
                 }
             }

-            //create a resourse accessor that aggregates the changelogs included in the schemas into one dynamic stream
+            // Create a resourse accessor that aggregates the changelogs included in the schemas into one dynamic stream.
             val customResourceAccessor = object : ClassLoaderResourceAccessor(classLoader) {
                 override fun getResourcesAsStream(path: String): Set<InputStream> {

                     if (path == dynamicInclude) {
-                        //create a map in liquibase format including all migration files
+                        // Create a map in Liquibase format including all migration files.
                         val includeAllFiles = mapOf("databaseChangeLog" to changelogList.filter { it != null }.map { file -> mapOf("include" to mapOf("file" to file)) })

-                        // transform it to json
+                        // Transform it to json.
                         val includeAllFilesJson = ObjectMapper().writeValueAsBytes(includeAllFiles)

-                        //return the json as a stream
+                        // Return the json as a stream.
                         return setOf(ByteArrayInputStream(includeAllFilesJson))
                     }
                     return super.getResourcesAsStream(path)?.take(1)?.toSet() ?: emptySet()
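The "dynamic include" trick above is worth seeing end to end: the node never writes master.changelog.json to disk, it synthesises it in memory in Liquibase's JSON changelog format. A self-contained sketch of just the JSON construction (requires jackson-databind on the classpath; the changelog file names are hypothetical):

import com.fasterxml.jackson.databind.ObjectMapper

fun main() {
    // Hypothetical changelog resources collected from the mapped schemas.
    val changelogList = listOf(
            "migration/vault-schema.changelog-master.xml",
            "migration/node-core.changelog-master.xml")

    // Same shape as in the diff: { "databaseChangeLog": [ { "include": { "file": ... } }, ... ] }
    val includeAllFiles = mapOf("databaseChangeLog" to
            changelogList.map { file -> mapOf("include" to mapOf("file" to file)) })

    // The node serialises with writeValueAsBytes and serves the bytes as a stream;
    // writeValueAsString is used here only to make the output printable.
    println(ObjectMapper().writeValueAsString(includeAllFiles))
}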
@@ -130,14 +130,13 @@ class SchemaMigration(
             logger.info("liquibaseSchemaName=${liquibase.database.liquibaseSchemaName}")
             logger.info("outputDefaultSchema=${liquibase.database.outputDefaultSchema}")

+            val unRunChanges = liquibase.listUnrunChangeSets(Contexts(), LabelExpression())
+
             when {
+                (run && !check) && (unRunChanges.isNotEmpty() && existingCheckpoints!!) -> throw CheckpointsException() // Do not allow database migration when there are checkpoints
                 run && !check -> liquibase.update(Contexts())
-                check && !run -> {
-                    val unRunChanges = liquibase.listUnrunChangeSets(Contexts(), LabelExpression())
-                    if (unRunChanges.isNotEmpty()) {
-                        throw OutstandingDatabaseChangesException(unRunChanges.size)
-                    }
-                }
+                check && !run && unRunChanges.isNotEmpty() -> throw OutstandingDatabaseChangesException(unRunChanges.size)
+                check && !run -> {} // Do nothing will be interpreted as "check succeeded"
                 (outputWriter != null) && !check && !run -> liquibase.update(Contexts(), outputWriter)
                 else -> throw IllegalStateException("Invalid usage.")
             }
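The reshaped when block is the heart of the change; read as a pure function it is easier to check. A sketch of the decision table (the outputWriter branch and the real exception types are simplified away):

sealed class MigrationDecision
object Apply : MigrationDecision()
object CheckPassed : MigrationDecision()
data class Refuse(val reason: String) : MigrationDecision()

// Clause order mirrors the diff: the checkpoint guard is listed before the
// plain "run" branch, so a migration with pending changesets and existing
// checkpoints is refused before liquibase.update is ever reached.
fun decide(run: Boolean, check: Boolean, pendingChanges: Boolean, existingCheckpoints: Boolean): MigrationDecision = when {
    run && !check && pendingChanges && existingCheckpoints -> Refuse("checkpoints exist; drain the node first")
    run && !check -> Apply
    check && !run && pendingChanges -> Refuse("outstanding database changes")
    check && !run -> CheckPassed // Doing nothing is interpreted as "check succeeded".
    else -> Refuse("invalid usage")
}

fun main() {
    check(decide(run = true, check = false, pendingChanges = true, existingCheckpoints = true) is Refuse)
    check(decide(run = true, check = false, pendingChanges = true, existingCheckpoints = false) == Apply)
    check(decide(run = false, check = true, pendingChanges = false, existingCheckpoints = false) == CheckPassed)
}

Note the existingCheckpoints!! in the diff: the parameter defaults to null and is only supplied (and only dereferenced) on the run path; generateMigrationScript and checkState never pass it.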
@@ -146,7 +145,7 @@ class SchemaMigration(

     private fun getLiquibaseDatabase(conn: JdbcConnection): Database {

-        // the standard MSSQLDatabase in liquibase does not support sequences for Ms Azure
+        // The standard MSSQLDatabase in Liquibase does not support sequences for Ms Azure.
         // this class just overrides that behaviour
         class AzureDatabase(conn: JdbcConnection) : MSSQLDatabase() {
             init {
@ -178,4 +177,9 @@ class OutstandingDatabaseChangesException(@Suppress("MemberVisibilityCanBePrivat
|
||||
internal companion object {
|
||||
fun errorMessageFor(count: Int): String = "There are $count outstanding database changes that need to be run. Please use the advanced migration tool. See: https://docs.corda.r3.com/database-migration.html"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class CheckpointsException : DatabaseMigrationException("Attempting to update the database while there are flows in flight. " +
|
||||
"This is dangerous because the node might not be able to restore the flows correctly and could consequently fail. " +
|
||||
"Updating the database would make reverting to the previous version more difficult. " +
|
||||
"Please drain your node first. See: https://docs.corda.net/upgrading-cordapps.html#flow-drains")
|
||||
|
@@ -1095,13 +1095,13 @@ fun configureDatabase(hikariProperties: Properties,
                 schemaService.schemaOptions.keys,
                 dataSource,
                 !isH2Database(jdbcUrl),
-                databaseConfig).nodeStartup()
+                databaseConfig).nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })
         return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters)
     } catch (ex: Exception) {
         when {
             ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.", ex)
             ex.cause is ClassNotFoundException -> throw CouldNotCreateDataSourceException("Could not find the database driver class. Please add it to the 'drivers' folder. See: https://docs.corda.net/corda-configuration-file.html")
-            else -> throw CouldNotCreateDataSourceException("Could not create the DataSource: ${ex.message}", ex)
+            else -> throw ex
         }
     }
 }
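The quieter change in this hunk is the final branch: else -> throw ex replaces wrapping everything in CouldNotCreateDataSourceException, presumably so that the new CheckpointsException can reach callers intact. A simplified sketch of that propagation pattern (class and function names here are illustrative, not the node's real wiring):

// Illustrative hierarchy; the real classes live in net.corda.nodeapi.internal.persistence.
open class DatabaseMigrationException(message: String) : IllegalArgumentException(message)
class CheckpointsException : DatabaseMigrationException("Please drain your node first.")

fun configureDatabaseSketch(existingCheckpoints: Boolean) {
    try {
        if (existingCheckpoints) throw CheckpointsException()
    } catch (ex: Exception) {
        when {
            // Rethrowing unwrapped lets a domain-specific exception cross this
            // boundary; previously it would have been hidden in a generic wrapper.
            ex is DatabaseMigrationException -> throw ex
            else -> throw IllegalStateException("Could not create the DataSource", ex)
        }
    }
}

fun main() {
    try {
        configureDatabaseSketch(existingCheckpoints = true)
    } catch (e: CheckpointsException) {
        println("error: ${e.message}") // The dbmigration tool below does errorAndExit(e.message).
    }
}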
@@ -10,10 +10,10 @@

 package net.corda.node.services.api

-import net.corda.core.cordapp.Cordapp
 import net.corda.core.flows.StateMachineRunId
 import net.corda.core.serialization.SerializedBytes
 import net.corda.node.services.statemachine.Checkpoint
+import java.sql.Connection
 import java.util.stream.Stream

 /**
@@ -42,4 +42,12 @@ interface CheckpointStorage {
     * underlying database connection is closed, so any processing should happen before it is closed.
     */
    fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, SerializedBytes<Checkpoint>>>
+
+    /**
+     * This needs to run before Hibernate is initialised.
+     *
+     * @param connection The SQL Connection.
+     * @return the number of checkpoints stored in the database.
+     */
+    fun getCheckpointCount(connection: Connection): Long
 }
@@ -21,6 +21,8 @@ import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 import java.io.Serializable
+import java.sql.Connection
+import java.sql.SQLException
 import java.util.*
 import java.util.stream.Stream
 import javax.persistence.Column
@@ -77,4 +79,18 @@ class DBCheckpointStorage : CheckpointStorage {
             StateMachineRunId(UUID.fromString(it.checkpointId)) to SerializedBytes<Checkpoint>(it.checkpoint)
         }
     }
+
+    override fun getCheckpointCount(connection: Connection): Long {
+        try {
+            return connection.prepareStatement("select count(*) from node_checkpoints").use { ps ->
+                ps.executeQuery().use { rs ->
+                    rs.next()
+                    rs.getLong(1)
+                }
+            }
+        } catch (e: SQLException) {
+            // Happens when the table was not created yet.
+            return 0L
+        }
+    }
 }
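Because this query has to work before Hibernate (and therefore any schema management) is initialised, the implementation treats SQLException as meaning the table does not exist yet, hence zero checkpoints. A runnable sketch of that behaviour against an in-memory H2 database (assumes the H2 driver is on the classpath; the one-column table is a simplification of the real node_checkpoints schema):

import java.sql.Connection
import java.sql.DriverManager
import java.sql.SQLException

// Same logic as the override above, reproduced so this sketch stands alone.
fun getCheckpointCount(connection: Connection): Long = try {
    connection.prepareStatement("select count(*) from node_checkpoints").use { ps ->
        ps.executeQuery().use { rs ->
            rs.next()
            rs.getLong(1)
        }
    }
} catch (e: SQLException) {
    0L // A fresh database has no node_checkpoints table yet, hence no checkpoints.
}

fun main() {
    DriverManager.getConnection("jdbc:h2:mem:demo").use { connection ->
        println(getCheckpointCount(connection)) // 0: table missing, SQLException swallowed.
        connection.createStatement().use { it.execute("create table node_checkpoints(checkpoint_id varchar(64) primary key)") }
        connection.createStatement().use { it.execute("insert into node_checkpoints values ('id-1')") }
        println(getCheckpointCount(connection)) // 1
    }
}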
@@ -61,7 +61,7 @@ class SchemaMigrationTest {

         //run the migration on the database
         val migration = SchemaMigration(schemaService.schemaOptions.keys, HikariDataSource(HikariConfig(dataSourceProps)), true, DatabaseConfig())
-        migration.runMigration()
+        migration.runMigration(false)

         //start the node with "runMigration = false" and check that it started correctly
         val db = configureDatabase(dataSourceProps, DatabaseConfig(runMigration = false), { null }, { null }, schemaService)
@@ -28,10 +28,12 @@ import net.corda.node.internal.cordapp.CordappLoader
 import net.corda.node.services.config.ConfigHelper
 import net.corda.node.services.config.configOf
 import net.corda.node.services.config.parseAsNodeConfiguration
+import net.corda.node.services.persistence.DBCheckpointStorage
 import net.corda.node.services.persistence.MigrationExporter
 import net.corda.node.services.schema.NodeSchemaService
 import net.corda.nodeapi.internal.config.UnknownConfigKeysPolicy
 import net.corda.nodeapi.internal.config.parseAs
+import net.corda.nodeapi.internal.persistence.CheckpointsException
 import net.corda.nodeapi.internal.persistence.DatabaseConfig
 import net.corda.nodeapi.internal.persistence.SchemaMigration
 import org.slf4j.LoggerFactory
@@ -163,8 +165,8 @@ private fun handleCommand(options: OptionSet, baseDirectory: Path, configFile: P
     }
     val config = parsedConfig.parseAs(Configuration::class, UnknownConfigKeysPolicy.IGNORE::handle)

-    fun runMigrationCommand(withMigration: (SchemaMigration) -> Unit): Unit = runWithDataSource(config, baseDirectory, classLoader) { dataSource ->
-        withMigration(SchemaMigration(schemas, dataSource, true, config.database, classLoader))
+    fun runMigrationCommand(withMigration: (SchemaMigration, DataSource) -> Unit): Unit = runWithDataSource(config, baseDirectory, classLoader) { dataSource ->
+        withMigration(SchemaMigration(schemas, dataSource, true, config.database, classLoader), dataSource)
     }

     when {
@@ -174,13 +176,13 @@ private fun handleCommand(options: OptionSet, baseDirectory: Path, configFile: P
         options.has(DRY_RUN) -> {
             val writer = getMigrationOutput(baseDirectory, options)
             migrationLogger.info("Exporting the current db migrations ...")
-            runMigrationCommand {
-                it.generateMigrationScript(writer)
+            runMigrationCommand { migration, dataSource ->
+                migration.generateMigrationScript(writer)
             }
         }
         options.has(RUN_MIGRATION) -> {
             migrationLogger.info("Running the database migration on $baseDirectory")
-            runMigrationCommand { it.runMigration() }
+            runMigrationCommand { migration, dataSource -> migration.runMigration(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L }) }
         }
         options.has(CREATE_MIGRATION_CORDAPP) && (mode == Mode.NODE) -> {
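The probe expression dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L } now appears verbatim both here and in configureDatabase above. A hypothetical factoring, not part of this commit, that keeps the short-lived-connection idiom in one place:

import java.sql.Connection
import javax.sql.DataSource

// Hypothetical helper: open a connection just long enough to compute the flag;
// `use` closes it even if the count query throws.
fun DataSource.hasExistingCheckpoints(countCheckpoints: (Connection) -> Long): Boolean =
        connection.use { countCheckpoints(it) != 0L }

// Usage would then read, at both call sites:
//     nodeStartup(dataSource.hasExistingCheckpoints { DBCheckpointStorage().getCheckpointCount(it) })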
@@ -244,6 +246,8 @@ private fun runWithDataSource(config: Configuration, baseDirectory: Path, classL

     return try {
         withDatasource(createDatasourceFromDriverJarFolders(config.dataSourceProperties, classLoader, driversFolder + jarDirs))
+    } catch (e: CheckpointsException) {
+        errorAndExit(e.message)
     } catch (e: Exception) {
         errorAndExit("""Failed to create datasource.
                 |Please check that the correct JDBC driver is installed in one of the following folders: