From 4a54ae5eb9f4b41f5d36a425159874a5597adc89 Mon Sep 17 00:00:00 2001 From: Christian Sailer Date: Fri, 5 Jun 2020 12:11:45 +0100 Subject: [PATCH] ENT-4493 schema migration refactor (#6313) ENT-4493 Refactor SchemaMigration so it can be open harmonised with Enterprise and can be customised. --- .../persistence/LiquibaseDatabaseFactory.kt | 8 + .../LiquibaseDatabaseFactoryImpl.kt | 11 ++ .../internal/persistence/SchemaMigration.kt | 144 ++++++++++-------- 3 files changed, 97 insertions(+), 66 deletions(-) create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/LiquibaseDatabaseFactory.kt create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/LiquibaseDatabaseFactoryImpl.kt diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/LiquibaseDatabaseFactory.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/LiquibaseDatabaseFactory.kt new file mode 100644 index 0000000000..ba67c35946 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/LiquibaseDatabaseFactory.kt @@ -0,0 +1,8 @@ +package net.corda.nodeapi.internal.persistence + +import liquibase.database.Database +import liquibase.database.jvm.JdbcConnection + +interface LiquibaseDatabaseFactory { + fun getLiquibaseDatabase(conn: JdbcConnection): Database +} \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/LiquibaseDatabaseFactoryImpl.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/LiquibaseDatabaseFactoryImpl.kt new file mode 100644 index 0000000000..317f1e6edf --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/LiquibaseDatabaseFactoryImpl.kt @@ -0,0 +1,11 @@ +package net.corda.nodeapi.internal.persistence + +import liquibase.database.Database +import liquibase.database.DatabaseFactory +import liquibase.database.jvm.JdbcConnection + +class LiquibaseDatabaseFactoryImpl : LiquibaseDatabaseFactory { + override fun getLiquibaseDatabase(conn: JdbcConnection): Database { + return DatabaseFactory.getInstance().findCorrectDatabaseImplementation(conn) + } +} \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/SchemaMigration.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/SchemaMigration.kt index ce592d95da..733c9ecf94 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/SchemaMigration.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/SchemaMigration.kt @@ -4,25 +4,25 @@ import com.fasterxml.jackson.databind.ObjectMapper import liquibase.Contexts import liquibase.LabelExpression import liquibase.Liquibase -import liquibase.database.Database -import liquibase.database.DatabaseFactory import liquibase.database.jvm.JdbcConnection +import liquibase.exception.LiquibaseException import liquibase.resource.ClassLoaderResourceAccessor import net.corda.core.identity.CordaX500Name -import net.corda.nodeapi.internal.MigrationHelpers.getMigrationResource import net.corda.core.schemas.MappedSchema import net.corda.core.utilities.contextLogger +import net.corda.nodeapi.internal.MigrationHelpers.getMigrationResource import net.corda.nodeapi.internal.cordapp.CordappLoader import java.io.ByteArrayInputStream import java.io.InputStream import java.nio.file.Path +import java.sql.Connection import java.sql.Statement -import javax.sql.DataSource import java.util.concurrent.locks.ReentrantLock +import javax.sql.DataSource import kotlin.concurrent.withLock // Migrate the database to the current version, using liquibase. -class SchemaMigration( +open class SchemaMigration( val schemas: Set, val dataSource: DataSource, cordappLoader: CordappLoader? = null, @@ -33,14 +33,16 @@ class SchemaMigration( private val ourName: CordaX500Name? = null, // This parameter forces an error to be thrown if there are missing migrations. When using H2, Hibernate will automatically create schemas where they are // missing, so no need to throw unless you're specifically testing whether all the migrations are present. - private val forceThrowOnMissingMigration: Boolean = false) { + private val forceThrowOnMissingMigration: Boolean = false, + protected val databaseFactory: LiquibaseDatabaseFactory = LiquibaseDatabaseFactoryImpl()) { companion object { private val logger = contextLogger() const val NODE_BASE_DIR_KEY = "liquibase.nodeDaseDir" const val NODE_X500_NAME = "liquibase.nodeName" val loader = ThreadLocal() - private val mutex = ReentrantLock() + @JvmStatic + protected val mutex = ReentrantLock() } init { @@ -49,25 +51,54 @@ class SchemaMigration( private val classLoader = cordappLoader?.appClassLoader ?: Thread.currentThread().contextClassLoader - /** + /** * Will run the Liquibase migration on the actual database. */ - fun runMigration(existingCheckpoints: Boolean) { - migrateOlderDatabaseToUseLiquibase(existingCheckpoints) - doRunMigration(run = true, check = false, existingCheckpoints = existingCheckpoints) - } + fun runMigration(existingCheckpoints: Boolean) { + migrateOlderDatabaseToUseLiquibase(existingCheckpoints) + val resourcesAndSourceInfo = prepareResources() + + // current version of Liquibase appears to be non-threadsafe + // this is apparent when multiple in-process nodes are all running migrations simultaneously + mutex.withLock { + dataSource.connection.use { connection -> + val (runner, _, shouldBlockOnCheckpoints) = prepareRunner(connection, resourcesAndSourceInfo) + if (shouldBlockOnCheckpoints && existingCheckpoints) + throw CheckpointsException() + try { + runner.update(Contexts().toString()) + } catch (exp: LiquibaseException) { + throw DatabaseMigrationException(exp.message, exp) + } + } + } + } /** * Ensures that the database is up to date with the latest migration changes. */ - fun checkState() = doRunMigration(run = false, check = true) + fun checkState() { + val resourcesAndSourceInfo = prepareResources() - /** Create a resourse accessor that aggregates the changelogs included in the schemas into one dynamic stream. */ - private class CustomResourceAccessor(val dynamicInclude: String, val changelogList: List, classLoader: ClassLoader) : ClassLoaderResourceAccessor(classLoader) { + // current version of Liquibase appears to be non-threadsafe + // this is apparent when multiple in-process nodes are all running migrations simultaneously + mutex.withLock { + dataSource.connection.use { connection -> + val (_, changeToRunCount, _) = prepareRunner(connection, resourcesAndSourceInfo) + if (changeToRunCount > 0) + throw OutstandingDatabaseChangesException(changeToRunCount) + } + } + } + + /** Create a resource accessor that aggregates the changelogs included in the schemas into one dynamic stream. */ + protected class CustomResourceAccessor(val dynamicInclude: String, val changelogList: List, classLoader: ClassLoader) : + ClassLoaderResourceAccessor(classLoader) { override fun getResourcesAsStream(path: String): Set { if (path == dynamicInclude) { // Create a map in Liquibase format including all migration files. - val includeAllFiles = mapOf("databaseChangeLog" to changelogList.filter { it != null }.map { file -> mapOf("include" to mapOf("file" to file)) }) + val includeAllFiles = mapOf("databaseChangeLog" + to changelogList.filterNotNull().map { file -> mapOf("include" to mapOf("file" to file)) }) // Transform it to json. val includeAllFilesJson = ObjectMapper().writeValueAsBytes(includeAllFiles) @@ -87,59 +118,40 @@ class SchemaMigration( null } - private fun doRunMigration( - run: Boolean, - check: Boolean, - existingCheckpoints: Boolean? = null - ) { + // Virtual file name of the changelog that includes all schemas. + val dynamicInclude = "master.changelog.json" - // Virtual file name of the changelog that includes all schemas. - val dynamicInclude = "master.changelog.json" - - dataSource.connection.use { connection -> - - // Collect all changelog files referenced in the included schemas. - val changelogList = schemas.mapNotNull { mappedSchema -> - val resource = getMigrationResource(mappedSchema, classLoader) - when { - resource != null -> resource - // Corda OS FinanceApp in v3 has no Liquibase script, so no error is raised - (mappedSchema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" || mappedSchema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1") && mappedSchema.migrationResource == null -> null - else -> logOrThrowMigrationError(mappedSchema) - } - } - - val path = currentDirectory?.toString() - if (path != null) { - System.setProperty(NODE_BASE_DIR_KEY, path) // base dir for any custom change set which may need to load a file (currently AttachmentVersionNumberMigration) - } - if (ourName != null) { - System.setProperty(NODE_X500_NAME, ourName.toString()) - } - val customResourceAccessor = CustomResourceAccessor(dynamicInclude, changelogList, classLoader) - checkResourcesInClassPath(changelogList) - - // current version of Liquibase appears to be non-threadsafe - // this is apparent when multiple in-process nodes are all running migrations simultaneously - mutex.withLock { - val liquibase = Liquibase(dynamicInclude, customResourceAccessor, getLiquibaseDatabase(JdbcConnection(connection))) - - val unRunChanges = liquibase.listUnrunChangeSets(Contexts(), LabelExpression()) - - when { - (run && !check) && (unRunChanges.isNotEmpty() && existingCheckpoints!!) -> throw CheckpointsException() // Do not allow database migration when there are checkpoints - run && !check -> liquibase.update(Contexts()) - check && !run && unRunChanges.isNotEmpty() -> throw OutstandingDatabaseChangesException(unRunChanges.size) - check && !run -> { - } // Do nothing will be interpreted as "check succeeded" - else -> throw IllegalStateException("Invalid usage.") - } + protected fun prepareResources(): List> { + // Collect all changelog files referenced in the included schemas. + val changelogList = schemas.mapNotNull { mappedSchema -> + val resource = getMigrationResource(mappedSchema, classLoader) + when { + resource != null -> resource + // Corda OS FinanceApp in v3 has no Liquibase script, so no error is raised + (mappedSchema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" || mappedSchema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1") && mappedSchema.migrationResource == null -> null + else -> logOrThrowMigrationError(mappedSchema) } } + + val path = currentDirectory?.toString() + if (path != null) { + System.setProperty(NODE_BASE_DIR_KEY, path) // base dir for any custom change set which may need to load a file (currently AttachmentVersionNumberMigration) + } + if (ourName != null) { + System.setProperty(NODE_X500_NAME, ourName.toString()) + } + val customResourceAccessor = CustomResourceAccessor(dynamicInclude, changelogList, classLoader) + checkResourcesInClassPath(changelogList) + return listOf(Pair(customResourceAccessor, "")) } - private fun getLiquibaseDatabase(conn: JdbcConnection): Database { - return DatabaseFactory.getInstance().findCorrectDatabaseImplementation(conn) + protected fun prepareRunner(connection: Connection, + resourcesAndSourceInfo: List>): Triple { + require(resourcesAndSourceInfo.size == 1) + val liquibase = Liquibase(dynamicInclude, resourcesAndSourceInfo.single().first, databaseFactory.getLiquibaseDatabase(JdbcConnection(connection))) + + val unRunChanges = liquibase.listUnrunChangeSets(Contexts(), LabelExpression()) + return Triple(liquibase, unRunChanges.size, !unRunChanges.isEmpty()) } /** For existing database created before verions 4.0 add Liquibase support - creates DATABASECHANGELOG and DATABASECHANGELOGLOCK tables and marks changesets as executed. */ @@ -219,7 +231,7 @@ class SchemaMigration( checkResourcesInClassPath(preV4Baseline) dataSource.connection.use { connection -> val customResourceAccessor = CustomResourceAccessor(dynamicInclude, preV4Baseline, classLoader) - val liquibase = Liquibase(dynamicInclude, customResourceAccessor, getLiquibaseDatabase(JdbcConnection(connection))) + val liquibase = Liquibase(dynamicInclude, customResourceAccessor, databaseFactory.getLiquibaseDatabase(JdbcConnection(connection))) liquibase.changeLogSync(Contexts(), LabelExpression()) } } @@ -235,7 +247,7 @@ class SchemaMigration( } } -open class DatabaseMigrationException(message: String) : IllegalArgumentException(message) { +open class DatabaseMigrationException(message: String?, cause: Throwable? = null) : IllegalArgumentException(message, cause) { override val message: String = super.message!! }