ENT-4493 schema migration refactor (#6313)

ENT-4493 Refactor SchemaMigration so it can be open harmonised with Enterprise and can be customised.
This commit is contained in:
Christian Sailer 2020-06-05 12:11:45 +01:00 committed by GitHub
parent 48cd263d48
commit 4a54ae5eb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 97 additions and 66 deletions

View File

@ -0,0 +1,8 @@
package net.corda.nodeapi.internal.persistence
import liquibase.database.Database
import liquibase.database.jvm.JdbcConnection
interface LiquibaseDatabaseFactory {
fun getLiquibaseDatabase(conn: JdbcConnection): Database
}

View File

@ -0,0 +1,11 @@
package net.corda.nodeapi.internal.persistence
import liquibase.database.Database
import liquibase.database.DatabaseFactory
import liquibase.database.jvm.JdbcConnection
class LiquibaseDatabaseFactoryImpl : LiquibaseDatabaseFactory {
override fun getLiquibaseDatabase(conn: JdbcConnection): Database {
return DatabaseFactory.getInstance().findCorrectDatabaseImplementation(conn)
}
}

View File

@ -4,25 +4,25 @@ import com.fasterxml.jackson.databind.ObjectMapper
import liquibase.Contexts import liquibase.Contexts
import liquibase.LabelExpression import liquibase.LabelExpression
import liquibase.Liquibase import liquibase.Liquibase
import liquibase.database.Database
import liquibase.database.DatabaseFactory
import liquibase.database.jvm.JdbcConnection import liquibase.database.jvm.JdbcConnection
import liquibase.exception.LiquibaseException
import liquibase.resource.ClassLoaderResourceAccessor import liquibase.resource.ClassLoaderResourceAccessor
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.nodeapi.internal.MigrationHelpers.getMigrationResource
import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.MigrationHelpers.getMigrationResource
import net.corda.nodeapi.internal.cordapp.CordappLoader import net.corda.nodeapi.internal.cordapp.CordappLoader
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.io.InputStream import java.io.InputStream
import java.nio.file.Path import java.nio.file.Path
import java.sql.Connection
import java.sql.Statement import java.sql.Statement
import javax.sql.DataSource
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import javax.sql.DataSource
import kotlin.concurrent.withLock import kotlin.concurrent.withLock
// Migrate the database to the current version, using liquibase. // Migrate the database to the current version, using liquibase.
class SchemaMigration( open class SchemaMigration(
val schemas: Set<MappedSchema>, val schemas: Set<MappedSchema>,
val dataSource: DataSource, val dataSource: DataSource,
cordappLoader: CordappLoader? = null, cordappLoader: CordappLoader? = null,
@ -33,14 +33,16 @@ class SchemaMigration(
private val ourName: CordaX500Name? = null, private val ourName: CordaX500Name? = null,
// This parameter forces an error to be thrown if there are missing migrations. When using H2, Hibernate will automatically create schemas where they are // This parameter forces an error to be thrown if there are missing migrations. When using H2, Hibernate will automatically create schemas where they are
// missing, so no need to throw unless you're specifically testing whether all the migrations are present. // missing, so no need to throw unless you're specifically testing whether all the migrations are present.
private val forceThrowOnMissingMigration: Boolean = false) { private val forceThrowOnMissingMigration: Boolean = false,
protected val databaseFactory: LiquibaseDatabaseFactory = LiquibaseDatabaseFactoryImpl()) {
companion object { companion object {
private val logger = contextLogger() private val logger = contextLogger()
const val NODE_BASE_DIR_KEY = "liquibase.nodeDaseDir" const val NODE_BASE_DIR_KEY = "liquibase.nodeDaseDir"
const val NODE_X500_NAME = "liquibase.nodeName" const val NODE_X500_NAME = "liquibase.nodeName"
val loader = ThreadLocal<CordappLoader>() val loader = ThreadLocal<CordappLoader>()
private val mutex = ReentrantLock() @JvmStatic
protected val mutex = ReentrantLock()
} }
init { init {
@ -54,20 +56,49 @@ class SchemaMigration(
*/ */
fun runMigration(existingCheckpoints: Boolean) { fun runMigration(existingCheckpoints: Boolean) {
migrateOlderDatabaseToUseLiquibase(existingCheckpoints) migrateOlderDatabaseToUseLiquibase(existingCheckpoints)
doRunMigration(run = true, check = false, existingCheckpoints = existingCheckpoints) val resourcesAndSourceInfo = prepareResources()
// current version of Liquibase appears to be non-threadsafe
// this is apparent when multiple in-process nodes are all running migrations simultaneously
mutex.withLock {
dataSource.connection.use { connection ->
val (runner, _, shouldBlockOnCheckpoints) = prepareRunner(connection, resourcesAndSourceInfo)
if (shouldBlockOnCheckpoints && existingCheckpoints)
throw CheckpointsException()
try {
runner.update(Contexts().toString())
} catch (exp: LiquibaseException) {
throw DatabaseMigrationException(exp.message, exp)
}
}
}
} }
/** /**
* Ensures that the database is up to date with the latest migration changes. * Ensures that the database is up to date with the latest migration changes.
*/ */
fun checkState() = doRunMigration(run = false, check = true) fun checkState() {
val resourcesAndSourceInfo = prepareResources()
/** Create a resourse accessor that aggregates the changelogs included in the schemas into one dynamic stream. */ // current version of Liquibase appears to be non-threadsafe
private class CustomResourceAccessor(val dynamicInclude: String, val changelogList: List<String?>, classLoader: ClassLoader) : ClassLoaderResourceAccessor(classLoader) { // this is apparent when multiple in-process nodes are all running migrations simultaneously
mutex.withLock {
dataSource.connection.use { connection ->
val (_, changeToRunCount, _) = prepareRunner(connection, resourcesAndSourceInfo)
if (changeToRunCount > 0)
throw OutstandingDatabaseChangesException(changeToRunCount)
}
}
}
/** Create a resource accessor that aggregates the changelogs included in the schemas into one dynamic stream. */
protected class CustomResourceAccessor(val dynamicInclude: String, val changelogList: List<String?>, classLoader: ClassLoader) :
ClassLoaderResourceAccessor(classLoader) {
override fun getResourcesAsStream(path: String): Set<InputStream> { override fun getResourcesAsStream(path: String): Set<InputStream> {
if (path == dynamicInclude) { if (path == dynamicInclude) {
// Create a map in Liquibase format including all migration files. // Create a map in Liquibase format including all migration files.
val includeAllFiles = mapOf("databaseChangeLog" to changelogList.filter { it != null }.map { file -> mapOf("include" to mapOf("file" to file)) }) val includeAllFiles = mapOf("databaseChangeLog"
to changelogList.filterNotNull().map { file -> mapOf("include" to mapOf("file" to file)) })
// Transform it to json. // Transform it to json.
val includeAllFilesJson = ObjectMapper().writeValueAsBytes(includeAllFiles) val includeAllFilesJson = ObjectMapper().writeValueAsBytes(includeAllFiles)
@ -87,17 +118,10 @@ class SchemaMigration(
null null
} }
private fun doRunMigration(
run: Boolean,
check: Boolean,
existingCheckpoints: Boolean? = null
) {
// Virtual file name of the changelog that includes all schemas. // Virtual file name of the changelog that includes all schemas.
val dynamicInclude = "master.changelog.json" val dynamicInclude = "master.changelog.json"
dataSource.connection.use { connection -> protected fun prepareResources(): List<Pair<CustomResourceAccessor, String>> {
// Collect all changelog files referenced in the included schemas. // Collect all changelog files referenced in the included schemas.
val changelogList = schemas.mapNotNull { mappedSchema -> val changelogList = schemas.mapNotNull { mappedSchema ->
val resource = getMigrationResource(mappedSchema, classLoader) val resource = getMigrationResource(mappedSchema, classLoader)
@ -118,28 +142,16 @@ class SchemaMigration(
} }
val customResourceAccessor = CustomResourceAccessor(dynamicInclude, changelogList, classLoader) val customResourceAccessor = CustomResourceAccessor(dynamicInclude, changelogList, classLoader)
checkResourcesInClassPath(changelogList) checkResourcesInClassPath(changelogList)
return listOf(Pair(customResourceAccessor, ""))
}
// current version of Liquibase appears to be non-threadsafe protected fun prepareRunner(connection: Connection,
// this is apparent when multiple in-process nodes are all running migrations simultaneously resourcesAndSourceInfo: List<Pair<CustomResourceAccessor, String>>): Triple<Liquibase, Int, Boolean> {
mutex.withLock { require(resourcesAndSourceInfo.size == 1)
val liquibase = Liquibase(dynamicInclude, customResourceAccessor, getLiquibaseDatabase(JdbcConnection(connection))) val liquibase = Liquibase(dynamicInclude, resourcesAndSourceInfo.single().first, databaseFactory.getLiquibaseDatabase(JdbcConnection(connection)))
val unRunChanges = liquibase.listUnrunChangeSets(Contexts(), LabelExpression()) val unRunChanges = liquibase.listUnrunChangeSets(Contexts(), LabelExpression())
return Triple(liquibase, unRunChanges.size, !unRunChanges.isEmpty())
when {
(run && !check) && (unRunChanges.isNotEmpty() && existingCheckpoints!!) -> throw CheckpointsException() // Do not allow database migration when there are checkpoints
run && !check -> liquibase.update(Contexts())
check && !run && unRunChanges.isNotEmpty() -> throw OutstandingDatabaseChangesException(unRunChanges.size)
check && !run -> {
} // Do nothing will be interpreted as "check succeeded"
else -> throw IllegalStateException("Invalid usage.")
}
}
}
}
private fun getLiquibaseDatabase(conn: JdbcConnection): Database {
return DatabaseFactory.getInstance().findCorrectDatabaseImplementation(conn)
} }
/** For existing database created before verions 4.0 add Liquibase support - creates DATABASECHANGELOG and DATABASECHANGELOGLOCK tables and marks changesets as executed. */ /** For existing database created before verions 4.0 add Liquibase support - creates DATABASECHANGELOG and DATABASECHANGELOGLOCK tables and marks changesets as executed. */
@ -219,7 +231,7 @@ class SchemaMigration(
checkResourcesInClassPath(preV4Baseline) checkResourcesInClassPath(preV4Baseline)
dataSource.connection.use { connection -> dataSource.connection.use { connection ->
val customResourceAccessor = CustomResourceAccessor(dynamicInclude, preV4Baseline, classLoader) val customResourceAccessor = CustomResourceAccessor(dynamicInclude, preV4Baseline, classLoader)
val liquibase = Liquibase(dynamicInclude, customResourceAccessor, getLiquibaseDatabase(JdbcConnection(connection))) val liquibase = Liquibase(dynamicInclude, customResourceAccessor, databaseFactory.getLiquibaseDatabase(JdbcConnection(connection)))
liquibase.changeLogSync(Contexts(), LabelExpression()) liquibase.changeLogSync(Contexts(), LabelExpression())
} }
} }
@ -235,7 +247,7 @@ class SchemaMigration(
} }
} }
open class DatabaseMigrationException(message: String) : IllegalArgumentException(message) { open class DatabaseMigrationException(message: String?, cause: Throwable? = null) : IllegalArgumentException(message, cause) {
override val message: String = super.message!! override val message: String = super.message!!
} }