mirror of
https://github.com/corda/corda.git
synced 2025-06-14 13:18:18 +00:00
CORDA-1471 Database schema setup for internal tables via Liquibase (#3815)
Internal tables (the tables from node and finance modules) are now tracked /created by Liquibase script. Tables backing MappedSchemma in Cordapps are created by Hibernate (as before). The PR scope added Liquibase library, setup code SchemaMigration and XML scripts and from Enterprise. For existing database installation - the node will auto-upgrade to use Liquibase. Method migrateOlderDatabaseToUseLiquibase checks for any 3.X existing Corda database to upgrade database to use Liquibase. When the existing database without Liquibase integral tables is detected, the node (at startup) will create Liquibase tracking tables and fill them with all migration scripts (marked as done), this ensure the database will look as it would use Liquibase from the beginning. The database changes gradually introduced by the subsequent 3.X releases (3.1, 3.2) are conditionally run by Liquibase.
This commit is contained in:
@ -0,0 +1,32 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import com.google.common.base.CaseFormat
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
|
||||
object MigrationHelpers {
|
||||
private const val MIGRATION_PREFIX = "migration"
|
||||
private const val DEFAULT_MIGRATION_EXTENSION = "xml"
|
||||
private const val CHANGELOG_NAME = "changelog-master"
|
||||
private val possibleMigrationExtensions = listOf(".xml", ".sql", ".yml", ".json")
|
||||
|
||||
fun getMigrationResource(schema: MappedSchema, classLoader: ClassLoader): String? {
|
||||
val declaredMigration = schema.migrationResource
|
||||
|
||||
if (declaredMigration == null) {
|
||||
// try to apply the naming convention and find the migration file in the classpath
|
||||
val resource = migrationResourceNameForSchema(schema)
|
||||
return possibleMigrationExtensions.map { "$resource$it" }.firstOrNull {
|
||||
classLoader.getResource(it) != null
|
||||
}
|
||||
}
|
||||
|
||||
return "$MIGRATION_PREFIX/$declaredMigration.$DEFAULT_MIGRATION_EXTENSION"
|
||||
}
|
||||
|
||||
// SchemaName will be transformed from camel case to lower_hyphen then add ".changelog-master"
|
||||
private fun migrationResourceNameForSchema(schema: MappedSchema): String {
|
||||
val name: String = schema::class.simpleName!!
|
||||
val fileName = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_HYPHEN, name)
|
||||
return "$MIGRATION_PREFIX/$fileName.$CHANGELOG_NAME"
|
||||
}
|
||||
}
|
@ -83,8 +83,6 @@ class CordaPersistence(
|
||||
// Check not in read-only mode.
|
||||
transaction {
|
||||
check(!connection.metaData.isReadOnly) { "Database should not be readonly." }
|
||||
checkCorrectAttachmentsContractsTableName(connection)
|
||||
checkCorrectCheckpointTypeOnPostgres(connection)
|
||||
}
|
||||
}
|
||||
|
||||
@ -272,33 +270,3 @@ private fun Throwable.hasSQLExceptionCause(): Boolean =
|
||||
}
|
||||
|
||||
class CouldNotCreateDataSourceException(override val message: String?, override val cause: Throwable? = null) : Exception()
|
||||
|
||||
class DatabaseIncompatibleException(override val message: String?, override val cause: Throwable? = null) : Exception()
|
||||
|
||||
private fun checkCorrectAttachmentsContractsTableName(connection: Connection) {
|
||||
val correctName = "NODE_ATTACHMENTS_CONTRACTS"
|
||||
val incorrectV30Name = "NODE_ATTACHMENTS_CONTRACT_CLASS_NAME"
|
||||
val incorrectV31Name = "NODE_ATTCHMENTS_CONTRACTS"
|
||||
|
||||
fun warning(incorrectName: String, version: String) = "The database contains the older table name $incorrectName instead of $correctName, see upgrade notes to migrate from Corda database version $version https://docs.corda.net/head/upgrade-notes.html."
|
||||
|
||||
if (!connection.metaData.getTables(null, null, correctName, null).next()) {
|
||||
if (connection.metaData.getTables(null, null, incorrectV30Name, null).next()) { throw DatabaseIncompatibleException(warning(incorrectV30Name, "3.0")) }
|
||||
if (connection.metaData.getTables(null, null, incorrectV31Name, null).next()) { throw DatabaseIncompatibleException(warning(incorrectV31Name, "3.1")) }
|
||||
}
|
||||
}
|
||||
|
||||
private fun checkCorrectCheckpointTypeOnPostgres(connection: Connection) {
|
||||
val metaData = connection.metaData
|
||||
if (metaData.getDatabaseProductName() != "PostgreSQL") {
|
||||
return
|
||||
}
|
||||
|
||||
val result = metaData.getColumns(null, null, "node_checkpoints", "checkpoint_value")
|
||||
if (result.next()) {
|
||||
val type = result.getString("TYPE_NAME")
|
||||
if (type != "bytea") {
|
||||
throw DatabaseIncompatibleException("The type of the 'checkpoint_value' table must be 'bytea', but 'oid' was found. See upgrade notes to migrate from Corda database version 3.1 https://docs.corda.net/head/upgrade-notes.html.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,181 @@
|
||||
package net.corda.nodeapi.internal.persistence
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import liquibase.Contexts
|
||||
import liquibase.LabelExpression
|
||||
import liquibase.Liquibase
|
||||
import liquibase.database.Database
|
||||
import liquibase.database.DatabaseFactory
|
||||
import liquibase.database.jvm.JdbcConnection
|
||||
import liquibase.resource.ClassLoaderResourceAccessor
|
||||
import net.corda.nodeapi.internal.MigrationHelpers.getMigrationResource
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.InputStream
|
||||
import java.io.Writer
|
||||
import javax.sql.DataSource
|
||||
|
||||
class SchemaMigration(
|
||||
val schemas: Set<MappedSchema>,
|
||||
val dataSource: DataSource,
|
||||
private val databaseConfig: DatabaseConfig,
|
||||
private val classLoader: ClassLoader = Thread.currentThread().contextClassLoader) {
|
||||
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
/**
|
||||
* Main entry point to the schema migration.
|
||||
* Called during node startup.
|
||||
*/
|
||||
fun nodeStartup(existingCheckpoints: Boolean) {
|
||||
when {
|
||||
databaseConfig.initialiseSchema -> {
|
||||
migrateOlderDatabaseToUseLiquibase(existingCheckpoints)
|
||||
runMigration(existingCheckpoints)
|
||||
}
|
||||
else -> checkState()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will run the Liquibase migration on the actual database.
|
||||
*/
|
||||
private fun runMigration(existingCheckpoints: Boolean) = doRunMigration(run = true, check = false, existingCheckpoints = existingCheckpoints)
|
||||
|
||||
/**
|
||||
* Ensures that the database is up to date with the latest migration changes.
|
||||
*/
|
||||
private fun checkState() = doRunMigration(run = false, check = true)
|
||||
|
||||
/** Create a resourse accessor that aggregates the changelogs included in the schemas into one dynamic stream. */
|
||||
private class CustomResourceAccessor(val dynamicInclude: String, val changelogList: List<String?>, classLoader: ClassLoader) : ClassLoaderResourceAccessor(classLoader) {
|
||||
override fun getResourcesAsStream(path: String): Set<InputStream> {
|
||||
if (path == dynamicInclude) {
|
||||
// Create a map in Liquibase format including all migration files.
|
||||
val includeAllFiles = mapOf("databaseChangeLog" to changelogList.filter { it != null }.map { file -> mapOf("include" to mapOf("file" to file)) })
|
||||
|
||||
// Transform it to json.
|
||||
val includeAllFilesJson = ObjectMapper().writeValueAsBytes(includeAllFiles)
|
||||
|
||||
// Return the json as a stream.
|
||||
return setOf(ByteArrayInputStream(includeAllFilesJson))
|
||||
}
|
||||
return super.getResourcesAsStream(path)?.take(1)?.toSet() ?: emptySet()
|
||||
}
|
||||
}
|
||||
|
||||
private fun doRunMigration(run: Boolean, check: Boolean, existingCheckpoints: Boolean? = null) {
|
||||
|
||||
// Virtual file name of the changelog that includes all schemas.
|
||||
val dynamicInclude = "master.changelog.json"
|
||||
|
||||
dataSource.connection.use { connection ->
|
||||
|
||||
// Collect all changelog file referenced in the included schemas.
|
||||
// For backward compatibility reasons, when failOnMigrationMissing=false, we don't manage CorDapps via Liquibase but use the hibernate hbm2ddl=update.
|
||||
val changelogList = schemas.map { mappedSchema ->
|
||||
val resource = getMigrationResource(mappedSchema, classLoader)
|
||||
when {
|
||||
resource != null -> resource
|
||||
else -> throw MissingMigrationException(mappedSchema)
|
||||
}
|
||||
}
|
||||
|
||||
val customResourceAccessor = CustomResourceAccessor(dynamicInclude, changelogList, classLoader)
|
||||
|
||||
val liquibase = Liquibase(dynamicInclude, customResourceAccessor, getLiquibaseDatabase(JdbcConnection(connection)))
|
||||
|
||||
val unRunChanges = liquibase.listUnrunChangeSets(Contexts(), LabelExpression())
|
||||
|
||||
when {
|
||||
(run && !check) && (unRunChanges.isNotEmpty() && existingCheckpoints!!) -> throw CheckpointsException() // Do not allow database migration when there are checkpoints
|
||||
run && !check -> liquibase.update(Contexts())
|
||||
check && !run && unRunChanges.isNotEmpty() -> throw OutstandingDatabaseChangesException(unRunChanges.size)
|
||||
check && !run -> {} // Do nothing will be interpreted as "check succeeded"
|
||||
else -> throw IllegalStateException("Invalid usage.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun getLiquibaseDatabase(conn: JdbcConnection): Database {
|
||||
return DatabaseFactory.getInstance().findCorrectDatabaseImplementation(conn)
|
||||
}
|
||||
|
||||
/** For existing database created before verions 4.0 add Liquibase support - creates DATABASECHANGELOG and DATABASECHANGELOGLOCK tables and mark changesets are executed. */
|
||||
private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean): Boolean {
|
||||
val isExistingDBWithoutLiquibase = dataSource.connection.use {
|
||||
it.metaData.getTables(null, null, "NODE%", null).next() &&
|
||||
!it.metaData.getTables(null, null, "DATABASECHANGELOG", null).next() &&
|
||||
!it.metaData.getTables(null, null, "DATABASECHANGELOGLOCK", null).next()
|
||||
}
|
||||
when {
|
||||
isExistingDBWithoutLiquibase && existingCheckpoints -> throw CheckpointsException()
|
||||
isExistingDBWithoutLiquibase -> {
|
||||
// Virtual file name of the changelog that includes all schemas.
|
||||
val dynamicInclude = "master.changelog.json"
|
||||
|
||||
dataSource.connection.use { connection ->
|
||||
// Schema migrations pre release 4.0
|
||||
val preV4Baseline =
|
||||
listOf("migration/common.changelog-init.xml",
|
||||
"migration/node-info.changelog-init.xml",
|
||||
"migration/node-info.changelog-v1.xml",
|
||||
"migration/node-info.changelog-v2.xml",
|
||||
"migration/node-core.changelog-init.xml",
|
||||
"migration/node-core.changelog-v3.xml",
|
||||
"migration/node-core.changelog-v4.xml",
|
||||
"migration/node-core.changelog-v5.xml",
|
||||
"migration/node-core.changelog-pkey.xml",
|
||||
"migration/vault-schema.changelog-init.xml",
|
||||
"migration/vault-schema.changelog-v3.xml",
|
||||
"migration/vault-schema.changelog-v4.xml",
|
||||
"migration/vault-schema.changelog-pkey.xml",
|
||||
"migration/cash.changelog-init.xml",
|
||||
"migration/cash.changelog-v1.xml",
|
||||
"migration/commercial-paper.changelog-init.xml",
|
||||
"migration/commercial-paper.changelog-v1.xml") +
|
||||
if (schemas.any { schema -> schema.migrationResource == "node-notary.changelog-master" })
|
||||
listOf("migration/node-notary.changelog-init.xml",
|
||||
"migration/node-notary.changelog-v1.xml",
|
||||
"migration/vault-schema.changelog-pkey.xml")
|
||||
else emptyList()
|
||||
|
||||
val customResourceAccessor = CustomResourceAccessor(dynamicInclude, preV4Baseline, classLoader)
|
||||
val liquibase = Liquibase(dynamicInclude, customResourceAccessor, getLiquibaseDatabase(JdbcConnection(connection)))
|
||||
liquibase.changeLogSync(Contexts(), LabelExpression())
|
||||
}
|
||||
}
|
||||
}
|
||||
return isExistingDBWithoutLiquibase
|
||||
}
|
||||
}
|
||||
|
||||
open class DatabaseMigrationException(message: String) : IllegalArgumentException(message) {
|
||||
override val message: String = super.message!!
|
||||
}
|
||||
|
||||
class MissingMigrationException(@Suppress("MemberVisibilityCanBePrivate") val mappedSchema: MappedSchema) : DatabaseMigrationException(errorMessageFor(mappedSchema)) {
|
||||
internal companion object {
|
||||
fun errorMessageFor(mappedSchema: MappedSchema): String = "No migration defined for schema: ${mappedSchema.name} v${mappedSchema.version}"
|
||||
}
|
||||
}
|
||||
|
||||
class OutstandingDatabaseChangesException(@Suppress("MemberVisibilityCanBePrivate") private val count: Int) : DatabaseMigrationException(errorMessageFor(count)) {
|
||||
internal companion object {
|
||||
fun errorMessageFor(count: Int): String = "There are $count outstanding database changes that need to be run."
|
||||
}
|
||||
}
|
||||
|
||||
class CheckpointsException : DatabaseMigrationException("Attempting to update the database while there are flows in flight. " +
|
||||
"This is dangerous because the node might not be able to restore the flows correctly and could consequently fail. " +
|
||||
"Updating the database would make reverting to the previous version more difficult. " +
|
||||
"Please drain your node first. See: https://docs.corda.net/upgrading-cordapps.html#flow-drains")
|
||||
|
||||
class DatabaseIncompatibleException(@Suppress("MemberVisibilityCanBePrivate") private val reason: String) : DatabaseMigrationException(errorMessageFor(reason)) {
|
||||
internal companion object {
|
||||
fun errorMessageFor(reason: String): String = "Incompatible database schema version detected, please run the node with configuration option database.initialiseSchema=true. Reason: $reason"
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user