ENT-5264 Synchronise schema on the command line (#6353)

* Decouple DatabaseConfig and CordaPersistence etc.
* Add schema sync to schema migration + test
* Add command line parameters for synchronising schema
This commit is contained in:
Christian Sailer 2020-06-18 11:38:46 +01:00 committed by GitHub
parent 836dd559e8
commit 4091fdc8b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 198 additions and 18 deletions

View File

@ -0,0 +1,108 @@
package net.corda.nodeapitests.internal.persistence
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.PersistentStateRef
import net.corda.node.internal.DataSourceFactory
import net.corda.node.internal.startHikariPool
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseMigrationException
import net.corda.nodeapi.internal.persistence.HibernateSchemaChangeException
import net.corda.nodeapi.internal.persistence.SchemaMigration
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.node.MockServices
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Before
import org.junit.Test
import java.util.*
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Table
import javax.sql.DataSource
class MigrationSchemaSyncTest{
object TestSchemaFamily
object GoodSchema : MappedSchema(schemaFamily = TestSchemaFamily.javaClass, version = 1, mappedTypes = listOf(State::class.java)) {
@Entity
@Table(name = "State")
class State(
@Column
var id: String
) : PersistentState(PersistentStateRef(UniqueIdentifier().toString(), 0 ))
override val migrationResource: String? = "goodschema.testmigration"
}
lateinit var hikariProperties: Properties
lateinit var dataSource: DataSource
@Before
fun setUp() {
hikariProperties = MockServices.makeTestDataSourceProperties()
dataSource = DataSourceFactory.createDataSource(hikariProperties)
}
private fun schemaMigration() = SchemaMigration(dataSource, null, null,
TestIdentity(ALICE_NAME, 70).name)
@Test(timeout=300_000)
fun testSchemaScript(){
schemaMigration().runMigration(false, setOf(GoodSchema), true)
val persistence = CordaPersistence(
false,
setOf(GoodSchema),
hikariProperties.getProperty("dataSource.url"),
TestingNamedCacheFactory()
)
persistence.startHikariPool(hikariProperties){ _, _ -> Unit}
persistence.transaction {
this.entityManager.persist(GoodSchema.State("id"))
}
}
@Test(timeout=300_000)
fun checkThatSchemaSyncFixesLiquibaseException(){
// Schema is missing if no migration is run and hibernate not allowed to create
val persistenceBlank = CordaPersistence(
false,
setOf(GoodSchema),
hikariProperties.getProperty("dataSource.url"),
TestingNamedCacheFactory()
)
persistenceBlank.startHikariPool(hikariProperties){ _, _ -> Unit}
assertThatThrownBy{ persistenceBlank.transaction {this.entityManager.persist(GoodSchema.State("id"))}}
.isInstanceOf(HibernateSchemaChangeException::class.java)
.hasMessageContaining("Incompatible schema")
// create schema via hibernate - now schema gets created and we can write
val persistenceHibernate = CordaPersistence(
false,
setOf(GoodSchema),
hikariProperties.getProperty("dataSource.url"),
TestingNamedCacheFactory(),
allowHibernateToManageAppSchema = true
)
persistenceHibernate.startHikariPool(hikariProperties){ _, _ -> Unit}
persistenceHibernate.transaction { entityManager.persist(GoodSchema.State("id_hibernate")) }
// if we try to run schema migration now, the changelog and the schemas are out of sync
assertThatThrownBy { schemaMigration().runMigration(false, setOf(GoodSchema), true) }
.isInstanceOf(DatabaseMigrationException::class.java)
.hasMessageContaining("Table \"STATE\" already exists")
// update the change log with schemas we know exist
schemaMigration().synchroniseSchemas(setOf(GoodSchema), true)
// now run migration runs clean
schemaMigration().runMigration(false, setOf(GoodSchema), true)
}
}

View File

@ -0,0 +1,19 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="unittest-goodschema-v1">
<createTable tableName="State">
<column name="output_index" type="INT">
<constraints nullable="false"/>
</column>
<column name="transaction_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="id" type="NVARCHAR(255)"/>
</createTable>
</changeSet>
</databaseChangeLog>

View File

@ -88,7 +88,7 @@ fun <T> withoutDatabaseAccess(block: () -> T): T {
val contextDatabaseOrNull: CordaPersistence? get() = _contextDatabase.get()
class CordaPersistence(
databaseConfig: DatabaseConfig,
exportHibernateJMXStatistics: Boolean,
schemas: Set<MappedSchema>,
val jdbcUrl: String,
cacheFactory: NamedCacheFactory,
@ -106,7 +106,7 @@ class CordaPersistence(
val hibernateConfig: HibernateConfiguration by lazy {
transaction {
try {
HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl, cacheFactory, customClassLoader, allowHibernateToManageAppSchema)
HibernateConfiguration(schemas, exportHibernateJMXStatistics, attributeConverters, jdbcUrl, cacheFactory, customClassLoader, allowHibernateToManageAppSchema)
} catch (e: Exception) {
when (e) {
is SchemaManagementException -> throw HibernateSchemaChangeException("Incompatible schema change detected. Please run schema migration scripts (node with sub-command run-migration-scripts). Reason: ${e.message}", e)

View File

@ -19,7 +19,7 @@ import javax.persistence.AttributeConverter
class HibernateConfiguration(
schemas: Set<MappedSchema>,
private val databaseConfig: DatabaseConfig,
private val exportHibernateJMXStatistics: Boolean,
private val attributeConverters: Collection<AttributeConverter<*, *>>,
jdbcUrl: String,
cacheFactory: NamedCacheFactory,
@ -65,10 +65,10 @@ class HibernateConfiguration(
fun sessionFactoryForSchemas(key: Set<MappedSchema>): SessionFactory = sessionFactories.get(key, ::makeSessionFactoryForSchemas)!!
private fun makeSessionFactoryForSchemas(schemas: Set<MappedSchema>): SessionFactory {
val sessionFactory = sessionFactoryFactory.makeSessionFactoryForSchemas(databaseConfig, schemas, customClassLoader, attributeConverters, allowHibernateToManageAppSchema)
val sessionFactory = sessionFactoryFactory.makeSessionFactoryForSchemas(schemas, customClassLoader, attributeConverters, allowHibernateToManageAppSchema)
// export Hibernate JMX statistics
if (databaseConfig.exportHibernateJMXStatistics)
if (exportHibernateJMXStatistics)
initStatistics(sessionFactory)
return sessionFactory

View File

@ -93,6 +93,32 @@ open class SchemaMigration(
}
}
/**
* Synchronises the changelog table with the schema descriptions passed in without applying any of the changes to the database.
* This can be used when migrating a CorDapp that had its schema generated by hibernate to liquibase schema migration, or when
* updating from a version of Corda that does not use liquibase for CorDapps
* **Warning** - this will not check if the matching schema changes have been applied, it will just generate the changelog
* It must not be run on a newly installed CorDapp.
* @param schemas The set of schemas to add to the changelog
* @param forceThrowOnMissingMigration throw an exception if a mapped schema is missing its migration resource
*/
fun synchroniseSchemas(schemas: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean) {
val resourcesAndSourceInfo = prepareResources(schemas, forceThrowOnMissingMigration)
// current version of Liquibase appears to be non-threadsafe
// this is apparent when multiple in-process nodes are all running migrations simultaneously
mutex.withLock {
dataSource.connection.use { connection ->
val (runner, _, _) = prepareRunner(connection, resourcesAndSourceInfo)
try {
runner.changeLogSync(Contexts().toString())
} catch (exp: LiquibaseException) {
throw DatabaseMigrationException(exp.message, exp)
}
}
}
}
/** Create a resource accessor that aggregates the changelogs included in the schemas into one dynamic stream. */
protected class CustomResourceAccessor(val dynamicInclude: String, val changelogList: List<String?>, classLoader: ClassLoader) :
ClassLoaderResourceAccessor(classLoader) {

View File

@ -3,7 +3,6 @@ package net.corda.nodeapi.internal.persistence.factory
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.toHexString
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.nodeapi.internal.persistence.TransactionIsolationLevel
import org.hibernate.SessionFactory
@ -26,7 +25,7 @@ abstract class BaseSessionFactoryFactory : CordaSessionFactoryFactory {
private val logger = contextLogger()
}
open fun buildHibernateConfig(databaseConfig: DatabaseConfig, metadataSources: MetadataSources, allowHibernateToManageAppSchema: Boolean): Configuration {
open fun buildHibernateConfig(metadataSources: MetadataSources, allowHibernateToManageAppSchema: Boolean): Configuration {
val hbm2dll: String =
if (allowHibernateToManageAppSchema) {
"update"
@ -82,7 +81,6 @@ abstract class BaseSessionFactoryFactory : CordaSessionFactoryFactory {
}
final override fun makeSessionFactoryForSchemas(
databaseConfig: DatabaseConfig,
schemas: Set<MappedSchema>,
customClassLoader: ClassLoader?,
attributeConverters: Collection<AttributeConverter<*, *>>,
@ -91,7 +89,7 @@ abstract class BaseSessionFactoryFactory : CordaSessionFactoryFactory {
val serviceRegistry = BootstrapServiceRegistryBuilder().build()
val metadataSources = MetadataSources(serviceRegistry)
val config = buildHibernateConfig(databaseConfig, metadataSources, allowHibernateToMananageAppSchema)
val config = buildHibernateConfig(metadataSources, allowHibernateToMananageAppSchema)
schemas.forEach { schema ->
schema.mappedTypes.forEach { config.addAnnotatedClass(it) }
}

View File

@ -1,7 +1,6 @@
package net.corda.nodeapi.internal.persistence.factory
import net.corda.core.schemas.MappedSchema
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import org.hibernate.SessionFactory
import org.hibernate.boot.Metadata
import org.hibernate.boot.MetadataBuilder
@ -11,7 +10,6 @@ interface CordaSessionFactoryFactory {
val databaseType: String
fun canHandleDatabase(jdbcUrl: String): Boolean
fun makeSessionFactoryForSchemas(
databaseConfig: DatabaseConfig,
schemas: Set<MappedSchema>,
customClassLoader: ClassLoader?,
attributeConverters: Collection<AttributeConverter<*, *>>,

View File

@ -14,7 +14,7 @@ class HibernateConfigurationFactoryLoadingTest {
val cacheFactory = mock<NamedCacheFactory>()
HibernateConfiguration(
emptySet(),
DatabaseConfig(),
false,
emptyList(),
jdbcUrl,
cacheFactory)

View File

@ -535,6 +535,18 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
Node.printBasicNodeInfo("Database migration done.")
}
fun runSchemaSync() {
check(started == null) { "Node has already been started" }
Node.printBasicNodeInfo("Synchronising CorDapp schemas to the changelog ...")
val hikariProperties = configuration.dataSourceProperties
if (hikariProperties.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
val dataSource = DataSourceFactory.createDataSource(hikariProperties, metricRegistry = metricRegistry)
SchemaMigration(dataSource, cordappLoader, configuration.baseDirectory, configuration.myLegalName)
.synchroniseSchemas(schemaService.appSchemas, false)
Node.printBasicNodeInfo("CorDapp schemas synchronised")
}
open fun start(): S {
check(started == null) { "Node has already been started" }
@ -1414,7 +1426,7 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig,
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
return CordaPersistence(
databaseConfig,
databaseConfig.exportHibernateJMXStatistics,
schemaService.schemas,
jdbcUrl,
cacheFactory,

View File

@ -77,6 +77,7 @@ open class NodeStartupCli : CordaCliWrapper("corda", "Runs a Corda Node") {
private val initialRegistrationCli by lazy { InitialRegistrationCli(startup) }
private val validateConfigurationCli by lazy { ValidateConfigurationCli() }
private val runMigrationScriptsCli by lazy { RunMigrationScriptsCli(startup) }
private val synchroniseAppSchemasCli by lazy { SynchroniseSchemasCli(startup) }
override fun initLogging(): Boolean = this.initLogging(cmdLineOptions.baseDirectory)
@ -85,7 +86,8 @@ open class NodeStartupCli : CordaCliWrapper("corda", "Runs a Corda Node") {
justGenerateRpcSslCertsCli,
initialRegistrationCli,
validateConfigurationCli,
runMigrationScriptsCli)
runMigrationScriptsCli,
synchroniseAppSchemasCli)
override fun call(): Int {
if (!validateBaseDirectory()) {

View File

@ -0,0 +1,16 @@
package net.corda.node.internal.subcommands
import net.corda.node.internal.Node
import net.corda.node.internal.NodeCliCommand
import net.corda.node.internal.NodeStartup
import net.corda.node.internal.RunAfterNodeInitialisation
class SynchroniseSchemasCli(startup: NodeStartup) : NodeCliCommand("sync-app-schemas", "Create changelog entries for liquibase files found in CorDapps", startup) {
override fun runProgram(): Int {
return startup.initialiseAndRun(cmdLineOptions, object : RunAfterNodeInitialisation {
override fun run(node: Node) {
node.runSchemaSync()
}
})
}
}

View File

@ -10,9 +10,11 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.schemas.MappedSchema
import net.corda.node.SimpleClock
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.persistence.*
import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.PublicKeyToTextConverter
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.SchemaMigration.Companion.NODE_X500_NAME
import java.io.PrintWriter
import java.sql.Connection
@ -74,7 +76,6 @@ abstract class CordaMigration : CustomTaskChange {
cacheFactory: MigrationNamedCacheFactory,
identityService: PersistentIdentityService,
schema: Set<MappedSchema>): CordaPersistence {
val configDefaults = DatabaseConfig()
val attributeConverters = listOf(
PublicKeyToTextConverter(),
AbstractPartyToX500NameAsStringConverter(
@ -83,7 +84,7 @@ abstract class CordaMigration : CustomTaskChange {
)
// Liquibase handles closing the database connection when migrations are finished. If the connection is closed here, then further
// migrations may fail.
return CordaPersistence(configDefaults, schema, jdbcUrl, cacheFactory, attributeConverters, closeConnection = false)
return CordaPersistence(false, schema, jdbcUrl, cacheFactory, attributeConverters, closeConnection = false)
}
override fun validate(database: Database?): ValidationErrors? {