mirror of
https://github.com/corda/corda.git
synced 2024-12-19 21:17:58 +00:00
ENT-5316 split schema migration
* ENT-5273 Split schema migration into separate core and app schema migration, with separate command line flags
This commit is contained in:
parent
1108ef2a24
commit
836dd559e8
@ -4,7 +4,7 @@
|
||||
|
||||
cordaVersion=4.6
|
||||
versionSuffix=SNAPSHOT
|
||||
gradlePluginsVersion=5.0.9
|
||||
gradlePluginsVersion=5.0.10
|
||||
kotlinVersion=1.2.71
|
||||
java8MinUpdateVersion=171
|
||||
# ***************************************************************#
|
||||
|
@ -56,7 +56,9 @@ class ReceiveFinalityFlowTest {
|
||||
bob.assertFlowSentForObservationDueToConstraintError(paymentReceiverId)
|
||||
|
||||
// Restart Bob with the contracts CorDapp so that it can recover from the error
|
||||
bob = mockNet.restartNode(bob, parameters = InternalMockNodeParameters(additionalCordapps = listOf(FINANCE_CONTRACTS_CORDAPP)))
|
||||
bob = mockNet.restartNode(bob,
|
||||
parameters = InternalMockNodeParameters(additionalCordapps = listOf(FINANCE_CONTRACTS_CORDAPP)),
|
||||
nodeFactory = { args -> InternalMockNetwork.MockNode(args, allowAppSchemaUpgradeWithCheckpoints = true) })
|
||||
mockNet.runNetwork()
|
||||
assertThat(bob.services.getCashBalance(GBP)).isEqualTo(100.POUNDS)
|
||||
}
|
||||
|
@ -39,24 +39,21 @@ class MissingSchemaMigrationTest {
|
||||
dataSource = DataSourceFactory.createDataSource(hikariProperties)
|
||||
}
|
||||
|
||||
private fun createSchemaMigration(schemasToMigrate: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean): SchemaMigration {
|
||||
return SchemaMigration(schemasToMigrate, dataSource, null, null,
|
||||
TestIdentity(ALICE_NAME, 70).name, forceThrowOnMissingMigration)
|
||||
}
|
||||
private fun schemaMigration() = SchemaMigration(dataSource, null, null,
|
||||
TestIdentity(ALICE_NAME, 70).name)
|
||||
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `test that an error is thrown when forceThrowOnMissingMigration is set and a mapped schema is missing a migration`() {
|
||||
assertThatThrownBy {
|
||||
createSchemaMigration(setOf(GoodSchema), true)
|
||||
.runMigration(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
|
||||
schemaMigration().runMigration(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }, setOf(GoodSchema), true)
|
||||
}.isInstanceOf(MissingMigrationException::class.java)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `test that an error is not thrown when forceThrowOnMissingMigration is not set and a mapped schema is missing a migration`() {
|
||||
assertDoesNotThrow {
|
||||
createSchemaMigration(setOf(GoodSchema), false)
|
||||
.runMigration(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
|
||||
schemaMigration().runMigration(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }, setOf(GoodSchema), false)
|
||||
}
|
||||
}
|
||||
|
||||
@ -64,8 +61,7 @@ class MissingSchemaMigrationTest {
|
||||
fun `test that there are no missing migrations for the node`() {
|
||||
assertDoesNotThrow("This test failure indicates " +
|
||||
"a new table has been added to the node without the appropriate migration scripts being present") {
|
||||
createSchemaMigration(NodeSchemaService().internalSchemas(), false)
|
||||
.runMigration(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
|
||||
schemaMigration().runMigration(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }, NodeSchemaService().internalSchemas, true)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -79,7 +79,8 @@ constructor(private val initSerEnv: Boolean,
|
||||
Paths.get(System.getProperty("java.home"), "bin", "java").toString(),
|
||||
"-jar",
|
||||
"corda.jar",
|
||||
"run-migration-scripts"
|
||||
"run-migration-scripts",
|
||||
"--core-schemas"
|
||||
)
|
||||
|
||||
private const val LOGS_DIR_NAME = "logs"
|
||||
|
@ -16,14 +16,12 @@ import java.io.ByteArrayInputStream
|
||||
import java.io.InputStream
|
||||
import java.nio.file.Path
|
||||
import java.sql.Connection
|
||||
import java.sql.Statement
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import javax.sql.DataSource
|
||||
import kotlin.concurrent.withLock
|
||||
|
||||
// Migrate the database to the current version, using liquibase.
|
||||
open class SchemaMigration(
|
||||
val schemas: Set<MappedSchema>,
|
||||
val dataSource: DataSource,
|
||||
cordappLoader: CordappLoader? = null,
|
||||
private val currentDirectory: Path?,
|
||||
@ -31,9 +29,6 @@ open class SchemaMigration(
|
||||
// its copy of the identity service. It is passed through using a system property. When multiple identity support is added, this will need
|
||||
// reworking so that multiple identities can be passed to the migration.
|
||||
private val ourName: CordaX500Name? = null,
|
||||
// This parameter forces an error to be thrown if there are missing migrations. When using H2, Hibernate will automatically create schemas where they are
|
||||
// missing, so no need to throw unless you're specifically testing whether all the migrations are present.
|
||||
private val forceThrowOnMissingMigration: Boolean = false,
|
||||
protected val databaseFactory: LiquibaseDatabaseFactory = LiquibaseDatabaseFactoryImpl()) {
|
||||
|
||||
companion object {
|
||||
@ -53,10 +48,14 @@ open class SchemaMigration(
|
||||
|
||||
/**
|
||||
* Will run the Liquibase migration on the actual database.
|
||||
* @param existingCheckpoints Whether checkpoints exist that would prohibit running a migration
|
||||
* @param schemas The set of MappedSchemas to check
|
||||
* @param forceThrowOnMissingMigration throws an exception if a mapped schema is missing the migration resource. Can be set to false
|
||||
* when allowing hibernate to create missing schemas in dev or tests.
|
||||
*/
|
||||
fun runMigration(existingCheckpoints: Boolean) {
|
||||
migrateOlderDatabaseToUseLiquibase(existingCheckpoints)
|
||||
val resourcesAndSourceInfo = prepareResources()
|
||||
fun runMigration(existingCheckpoints: Boolean, schemas: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean) {
|
||||
migrateOlderDatabaseToUseLiquibase(existingCheckpoints, schemas)
|
||||
val resourcesAndSourceInfo = prepareResources(schemas, forceThrowOnMissingMigration)
|
||||
|
||||
// current version of Liquibase appears to be non-threadsafe
|
||||
// this is apparent when multiple in-process nodes are all running migrations simultaneously
|
||||
@ -76,9 +75,12 @@ open class SchemaMigration(
|
||||
|
||||
/**
|
||||
* Ensures that the database is up to date with the latest migration changes.
|
||||
* @param schemas The set of MappedSchemas to check
|
||||
* @param forceThrowOnMissingMigration throws an exception if a mapped schema is missing the migration resource. Can be set to false
|
||||
* when allowing hibernate to create missing schemas in dev or tests.
|
||||
*/
|
||||
fun checkState() {
|
||||
val resourcesAndSourceInfo = prepareResources()
|
||||
fun checkState(schemas: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean) {
|
||||
val resourcesAndSourceInfo = prepareResources(schemas, forceThrowOnMissingMigration)
|
||||
|
||||
// current version of Liquibase appears to be non-threadsafe
|
||||
// this is apparent when multiple in-process nodes are all running migrations simultaneously
|
||||
@ -110,7 +112,7 @@ open class SchemaMigration(
|
||||
}
|
||||
}
|
||||
|
||||
private fun logOrThrowMigrationError(mappedSchema: MappedSchema): String? =
|
||||
private fun logOrThrowMigrationError(mappedSchema: MappedSchema, forceThrowOnMissingMigration: Boolean): String? =
|
||||
if (forceThrowOnMissingMigration) {
|
||||
throw MissingMigrationException(mappedSchema)
|
||||
} else {
|
||||
@ -121,15 +123,13 @@ open class SchemaMigration(
|
||||
// Virtual file name of the changelog that includes all schemas.
|
||||
val dynamicInclude = "master.changelog.json"
|
||||
|
||||
protected fun prepareResources(): List<Pair<CustomResourceAccessor, String>> {
|
||||
protected fun prepareResources(schemas: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean): List<Pair<CustomResourceAccessor, String>> {
|
||||
// Collect all changelog files referenced in the included schemas.
|
||||
val changelogList = schemas.mapNotNull { mappedSchema ->
|
||||
val resource = getMigrationResource(mappedSchema, classLoader)
|
||||
when {
|
||||
resource != null -> resource
|
||||
// Corda OS FinanceApp in v3 has no Liquibase script, so no error is raised
|
||||
(mappedSchema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" || mappedSchema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1") && mappedSchema.migrationResource == null -> null
|
||||
else -> logOrThrowMigrationError(mappedSchema)
|
||||
else -> logOrThrowMigrationError(mappedSchema, forceThrowOnMissingMigration)
|
||||
}
|
||||
}
|
||||
|
||||
@ -155,21 +155,8 @@ open class SchemaMigration(
|
||||
}
|
||||
|
||||
/** For existing database created before verions 4.0 add Liquibase support - creates DATABASECHANGELOG and DATABASECHANGELOGLOCK tables and marks changesets as executed. */
|
||||
private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean): Boolean {
|
||||
val isFinanceAppWithLiquibase = schemas.any { schema ->
|
||||
(schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1"
|
||||
|| schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1")
|
||||
&& schema.migrationResource != null
|
||||
}
|
||||
val noLiquibaseEntryLogForFinanceApp: (Statement) -> Boolean = {
|
||||
it.execute("SELECT COUNT(*) FROM DATABASECHANGELOG WHERE FILENAME IN ('migration/cash.changelog-init.xml','migration/commercial-paper.changelog-init.xml')")
|
||||
if (it.resultSet.next())
|
||||
it.resultSet.getInt(1) == 0
|
||||
else
|
||||
true
|
||||
}
|
||||
|
||||
val (isExistingDBWithoutLiquibase, isFinanceAppWithLiquibaseNotMigrated) = dataSource.connection.use {
|
||||
private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean, schemas: Set<MappedSchema>): Boolean {
|
||||
val isExistingDBWithoutLiquibase = dataSource.connection.use {
|
||||
|
||||
val existingDatabase = it.metaData.getTables(null, null, "NODE%", null).next()
|
||||
// Lower case names for PostgreSQL
|
||||
@ -179,12 +166,7 @@ open class SchemaMigration(
|
||||
// Lower case names for PostgreSQL
|
||||
|| it.metaData.getTables(null, null, "databasechangelog%", null).next()
|
||||
|
||||
val isFinanceAppWithLiquibaseNotMigrated = isFinanceAppWithLiquibase // If Finance App is pre v4.0 then no need to migrate it so no need to check.
|
||||
&& existingDatabase
|
||||
&& (!hasLiquibase // Migrate as other tables.
|
||||
|| (hasLiquibase && it.createStatement().use { noLiquibaseEntryLogForFinanceApp(it) })) // If Liquibase is already in the database check if Finance App schema log is missing.
|
||||
|
||||
Pair(existingDatabase && !hasLiquibase, isFinanceAppWithLiquibaseNotMigrated)
|
||||
existingDatabase && !hasLiquibase
|
||||
}
|
||||
|
||||
if (isExistingDBWithoutLiquibase && existingCheckpoints)
|
||||
@ -219,12 +201,6 @@ open class SchemaMigration(
|
||||
preV4Baseline.addAll(listOf("migration/notary-bft-smart.changelog-init.xml",
|
||||
"migration/notary-bft-smart.changelog-v1.xml"))
|
||||
}
|
||||
if (isFinanceAppWithLiquibaseNotMigrated) {
|
||||
preV4Baseline.addAll(listOf("migration/cash.changelog-init.xml",
|
||||
"migration/cash.changelog-v1.xml",
|
||||
"migration/commercial-paper.changelog-init.xml",
|
||||
"migration/commercial-paper.changelog-v1.xml"))
|
||||
}
|
||||
|
||||
if (preV4Baseline.isNotEmpty()) {
|
||||
val dynamicInclude = "master.changelog.json" // Virtual file name of the changelog that includes all schemas.
|
||||
@ -235,7 +211,7 @@ open class SchemaMigration(
|
||||
liquibase.changeLogSync(Contexts(), LabelExpression())
|
||||
}
|
||||
}
|
||||
return isExistingDBWithoutLiquibase || isFinanceAppWithLiquibaseNotMigrated
|
||||
return isExistingDBWithoutLiquibase
|
||||
}
|
||||
|
||||
private fun checkResourcesInClassPath(resources: List<String?>) {
|
||||
|
@ -37,7 +37,8 @@ class FlowCheckpointVersionNodeStartupCheckTest {
|
||||
startNodesInProcess = false,
|
||||
inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
|
||||
cordappsForAllNodes = emptyList(),
|
||||
notarySpecs = emptyList()
|
||||
notarySpecs = emptyList(),
|
||||
allowHibernateToManageAppSchema = false
|
||||
)) {
|
||||
createSuspendedFlowInBob()
|
||||
val cordappsDir = baseDirectory(BOB_NAME) / "cordapps"
|
||||
|
@ -86,7 +86,7 @@ class NodeStatePersistenceTests {
|
||||
nodeName
|
||||
}()
|
||||
|
||||
val nodeHandle = startNode(providedName = nodeName, rpcUsers = listOf(user), customOverrides = mapOf("devMode" to "false")).getOrThrow()
|
||||
val nodeHandle = startNode(providedName = nodeName, rpcUsers = listOf(user)).getOrThrow()
|
||||
val result = CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use {
|
||||
val page = it.proxy.vaultQuery(MessageState::class.java)
|
||||
page.states.singleOrNull()
|
||||
|
@ -85,10 +85,10 @@ abstract class StatemachineErrorHandlingTest {
|
||||
|
||||
internal fun getBytemanOutput(nodeHandle: NodeHandle): List<String> {
|
||||
return nodeHandle.baseDirectory
|
||||
.list()
|
||||
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
|
||||
.readAllLines()
|
||||
}
|
||||
.list()
|
||||
.filter { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
|
||||
.flatMap { it.readAllLines() }
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
|
@ -15,6 +15,7 @@ import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.enclosedCordapp
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Test
|
||||
|
||||
@ -23,7 +24,7 @@ class CordappScanningDriverTest {
|
||||
fun `sub-classed initiated flow pointing to the same initiating flow as its super-class`() {
|
||||
val user = User("u", "p", setOf(startFlow<ReceiveFlow>()))
|
||||
// The driver will automatically pick up the annotated flows below
|
||||
driver(DriverParameters(notarySpecs = emptyList())) {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()))) {
|
||||
val (alice, bob) = listOf(
|
||||
startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)),
|
||||
startNode(providedName = BOB_NAME)).transpose().getOrThrow()
|
||||
|
@ -21,7 +21,8 @@ class NodeConfigParsingTests {
|
||||
driver(DriverParameters(
|
||||
environmentVariables = mapOf("corda_sshd_port" to sshPort.toString()),
|
||||
startNodesInProcess = false,
|
||||
portAllocation = portAllocator)) {
|
||||
portAllocation = portAllocator,
|
||||
cordappsForAllNodes = emptyList())) {
|
||||
val hasSsh = startNode().get()
|
||||
.logFile()
|
||||
.readLines()
|
||||
@ -39,7 +40,8 @@ class NodeConfigParsingTests {
|
||||
driver(DriverParameters(
|
||||
environmentVariables = mapOf("CORDA_sshd_port" to sshPort.toString()),
|
||||
startNodesInProcess = false,
|
||||
portAllocation = portAllocator)) {
|
||||
portAllocation = portAllocator,
|
||||
cordappsForAllNodes = emptyList())) {
|
||||
val hasSsh = startNode().get()
|
||||
.logFile()
|
||||
.readLines()
|
||||
@ -58,7 +60,8 @@ class NodeConfigParsingTests {
|
||||
environmentVariables = mapOf("CORDA.sshd.port" to sshPort.toString(),
|
||||
"corda.devMode" to true.toString()),
|
||||
startNodesInProcess = false,
|
||||
portAllocation = portAllocator)) {
|
||||
portAllocation = portAllocator,
|
||||
cordappsForAllNodes = emptyList())) {
|
||||
val hasSsh = startNode(NodeParameters()).get()
|
||||
.logFile()
|
||||
.readLines()
|
||||
@ -95,7 +98,8 @@ class NodeConfigParsingTests {
|
||||
"corda_bad_key" to "2077"),
|
||||
startNodesInProcess = false,
|
||||
portAllocation = portAllocator,
|
||||
notarySpecs = emptyList())) {
|
||||
notarySpecs = emptyList(),
|
||||
cordappsForAllNodes = emptyList())) {
|
||||
|
||||
val hasWarning = startNode()
|
||||
.getOrThrow()
|
||||
|
@ -172,10 +172,10 @@ import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.jolokia.jvmagent.JolokiaServer
|
||||
import org.jolokia.jvmagent.JolokiaServerConfig
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Scheduler
|
||||
import java.io.IOException
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyPair
|
||||
import java.security.KeyStoreException
|
||||
import java.security.cert.X509Certificate
|
||||
@ -184,7 +184,7 @@ import java.sql.Savepoint
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.time.format.DateTimeParseException
|
||||
import java.util.Properties
|
||||
import java.util.*
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
@ -194,6 +194,32 @@ import java.util.concurrent.TimeUnit.MINUTES
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
import java.util.function.Consumer
|
||||
import javax.persistence.EntityManager
|
||||
import javax.sql.DataSource
|
||||
import kotlin.collections.ArrayList
|
||||
import kotlin.collections.List
|
||||
import kotlin.collections.MutableList
|
||||
import kotlin.collections.MutableSet
|
||||
import kotlin.collections.Set
|
||||
import kotlin.collections.drop
|
||||
import kotlin.collections.emptyList
|
||||
import kotlin.collections.filterNotNull
|
||||
import kotlin.collections.first
|
||||
import kotlin.collections.flatMap
|
||||
import kotlin.collections.fold
|
||||
import kotlin.collections.forEach
|
||||
import kotlin.collections.groupBy
|
||||
import kotlin.collections.last
|
||||
import kotlin.collections.listOf
|
||||
import kotlin.collections.map
|
||||
import kotlin.collections.mapOf
|
||||
import kotlin.collections.mutableListOf
|
||||
import kotlin.collections.mutableSetOf
|
||||
import kotlin.collections.plus
|
||||
import kotlin.collections.plusAssign
|
||||
import kotlin.collections.reversed
|
||||
import kotlin.collections.setOf
|
||||
import kotlin.collections.single
|
||||
import kotlin.collections.toSet
|
||||
|
||||
/**
|
||||
* A base node implementation that can be customised either for production (with real implementations that do real
|
||||
@ -212,9 +238,11 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
val busyNodeLatch: ReusableLatch = ReusableLatch(),
|
||||
djvmBootstrapSource: ApiSource = EmptyApi,
|
||||
djvmCordaSource: UserSource? = null,
|
||||
protected val allowHibernateToManageAppSchema: Boolean = false) : SingletonSerializeAsToken() {
|
||||
protected val allowHibernateToManageAppSchema: Boolean = false,
|
||||
private val allowAppSchemaUpgradeWithCheckpoints: Boolean = false) : SingletonSerializeAsToken() {
|
||||
|
||||
protected abstract val log: Logger
|
||||
|
||||
@Suppress("LeakingThis")
|
||||
private var tokenizableServices: MutableList<SerializeAsToken>? = mutableListOf(platformClock, this)
|
||||
|
||||
@ -472,12 +500,20 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
open fun runDatabaseMigrationScripts() {
|
||||
open fun runDatabaseMigrationScripts(
|
||||
updateCoreSchemas: Boolean,
|
||||
updateAppSchemas: Boolean,
|
||||
updateAppSchemasWithCheckpoints: Boolean
|
||||
) {
|
||||
check(started == null) { "Node has already been started" }
|
||||
Node.printBasicNodeInfo("Running database schema migration scripts ...")
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
|
||||
database.startHikariPool(props, schemaService.internalSchemas(), metricRegistry, this.cordappLoader, configuration.baseDirectory, configuration.myLegalName, runMigrationScripts = true)
|
||||
database.startHikariPool(props, metricRegistry) { dataSource, haveCheckpoints ->
|
||||
SchemaMigration(dataSource, cordappLoader, configuration.baseDirectory, configuration.myLegalName)
|
||||
.checkOrUpdate(schemaService.internalSchemas, updateCoreSchemas, haveCheckpoints, true)
|
||||
.checkOrUpdate(schemaService.appSchemas, updateAppSchemas, !updateAppSchemasWithCheckpoints && haveCheckpoints, false)
|
||||
}
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
logVendorString(database, log)
|
||||
if (allowHibernateToManageAppSchema) {
|
||||
@ -987,7 +1023,12 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
protected open fun startDatabase() {
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
|
||||
database.startHikariPool(props, schemaService.internalSchemas(), metricRegistry, this.cordappLoader, configuration.baseDirectory, configuration.myLegalName, runMigrationScripts = runMigrationScripts)
|
||||
database.startHikariPool(props, metricRegistry) { dataSource, haveCheckpoints ->
|
||||
SchemaMigration(dataSource, cordappLoader, configuration.baseDirectory, configuration.myLegalName)
|
||||
.checkOrUpdate(schemaService.internalSchemas, runMigrationScripts, haveCheckpoints, true)
|
||||
.checkOrUpdate(schemaService.appSchemas, runMigrationScripts, haveCheckpoints && !allowAppSchemaUpgradeWithCheckpoints, false)
|
||||
}
|
||||
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
logVendorString(database, log)
|
||||
}
|
||||
@ -1388,23 +1429,16 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig,
|
||||
allowHibernateToManageAppSchema = allowHibernateToManageAppSchema)
|
||||
}
|
||||
|
||||
@Suppress("LongParameterList", "ComplexMethod", "ThrowsCount")
|
||||
@Suppress("ThrowsCount")
|
||||
fun CordaPersistence.startHikariPool(
|
||||
hikariProperties: Properties,
|
||||
schemas: Set<MappedSchema>,
|
||||
metricRegistry: MetricRegistry? = null,
|
||||
cordappLoader: CordappLoader? = null,
|
||||
currentDir: Path? = null,
|
||||
ourName: CordaX500Name,
|
||||
runMigrationScripts: Boolean = false) {
|
||||
schemaMigration: (DataSource, Boolean) -> Unit) {
|
||||
try {
|
||||
val dataSource = DataSourceFactory.createDataSource(hikariProperties, metricRegistry = metricRegistry)
|
||||
val schemaMigration = SchemaMigration(schemas, dataSource, cordappLoader, currentDir, ourName)
|
||||
if (runMigrationScripts) {
|
||||
schemaMigration.runMigration(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
|
||||
} else {
|
||||
schemaMigration.checkState()
|
||||
}
|
||||
val haveCheckpoints = dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }
|
||||
|
||||
schemaMigration(dataSource, haveCheckpoints)
|
||||
start(dataSource)
|
||||
} catch (ex: Exception) {
|
||||
when {
|
||||
@ -1416,15 +1450,25 @@ fun CordaPersistence.startHikariPool(
|
||||
"Could not find the database driver class. Please add it to the 'drivers' folder.",
|
||||
NodeDatabaseErrors.MISSING_DRIVER)
|
||||
ex is OutstandingDatabaseChangesException -> throw (DatabaseIncompatibleException(ex.message))
|
||||
else ->
|
||||
else -> {
|
||||
LoggerFactory.getLogger("CordaPersistence extension").error("Could not create the DataSource", ex)
|
||||
throw CouldNotCreateDataSourceException(
|
||||
"Could not create the DataSource: ${ex.message}",
|
||||
NodeDatabaseErrors.FAILED_STARTUP,
|
||||
cause = ex)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun SchemaMigration.checkOrUpdate(schemas: Set<MappedSchema>, update: Boolean, haveCheckpoints: Boolean, forceThrowOnMissingMigration: Boolean): SchemaMigration {
|
||||
if (update)
|
||||
this.runMigration(haveCheckpoints, schemas, forceThrowOnMissingMigration)
|
||||
else
|
||||
this.checkState(schemas, forceThrowOnMissingMigration)
|
||||
return this
|
||||
}
|
||||
|
||||
fun clientSslOptionsCompatibleWith(nodeRpcOptions: NodeRpcOptions): ClientRpcSslOptions? {
|
||||
|
||||
if (!nodeRpcOptions.useSsl || nodeRpcOptions.sslConfig == null) {
|
||||
|
@ -561,11 +561,14 @@ open class Node(configuration: NodeConfiguration,
|
||||
return super.generateAndSaveNodeInfo()
|
||||
}
|
||||
|
||||
override fun runDatabaseMigrationScripts() {
|
||||
override fun runDatabaseMigrationScripts(
|
||||
updateCoreSchemas: Boolean,
|
||||
updateAppSchemas: Boolean,
|
||||
updateAppSchemasWithCheckpoints: Boolean) {
|
||||
if (allowHibernateToManageAppSchema) {
|
||||
initialiseSerialization()
|
||||
}
|
||||
super.runDatabaseMigrationScripts()
|
||||
super.runDatabaseMigrationScripts(updateCoreSchemas, updateAppSchemas, updateAppSchemasWithCheckpoints)
|
||||
}
|
||||
|
||||
override fun start(): NodeInfo {
|
||||
|
@ -4,12 +4,25 @@ import net.corda.node.internal.Node
|
||||
import net.corda.node.internal.NodeCliCommand
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.internal.RunAfterNodeInitialisation
|
||||
import picocli.CommandLine
|
||||
|
||||
class RunMigrationScriptsCli(startup: NodeStartup) : NodeCliCommand("run-migration-scripts", "Run the database migration scripts and create or update schemas", startup) {
|
||||
@CommandLine.Option(names = ["--core-schemas"], description = ["Manage the core/node schemas"])
|
||||
var updateCoreSchemas: Boolean = false
|
||||
|
||||
@CommandLine.Option(names = ["--app-schemas"], description = ["Manage the CorDapp schemas"])
|
||||
var updateAppSchemas: Boolean = false
|
||||
|
||||
@CommandLine.Option(names = ["--update-app-schema-with-checkpoints"], description = ["Allow updating app schema even if there are suspended flows"])
|
||||
var updateAppSchemaWithCheckpoints: Boolean = false
|
||||
|
||||
|
||||
|
||||
override fun runProgram(): Int {
|
||||
require(updateAppSchemas || updateCoreSchemas) { "Nothing to do: at least one of --core-schemas or --app-schemas must be set" }
|
||||
return startup.initialiseAndRun(cmdLineOptions, object : RunAfterNodeInitialisation {
|
||||
override fun run(node: Node) {
|
||||
node.runDatabaseMigrationScripts()
|
||||
node.runDatabaseMigrationScripts(updateCoreSchemas, updateAppSchemas, updateAppSchemaWithCheckpoints)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -62,14 +62,13 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet()
|
||||
NodeInfoSchemaV1,
|
||||
NodeCoreV1)
|
||||
|
||||
fun internalSchemas() = requiredSchemas + extraSchemas.filter { schema ->
|
||||
// when mapped schemas from the finance module are present, they are considered as internal ones
|
||||
schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" ||
|
||||
schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" ||
|
||||
val internalSchemas = requiredSchemas + extraSchemas.filter { schema ->
|
||||
schema::class.qualifiedName == "net.corda.node.services.transactions.NodeNotarySchemaV1" ||
|
||||
schema::class.qualifiedName?.startsWith("net.corda.notary.") ?: false
|
||||
}
|
||||
|
||||
val appSchemas = extraSchemas - internalSchemas
|
||||
|
||||
override val schemas: Set<MappedSchema> = requiredSchemas + extraSchemas
|
||||
|
||||
// Currently returns all schemas supported by the state, with no filtering or enrichment.
|
||||
|
@ -2,11 +2,13 @@ package net.corda.node.services.persistence
|
||||
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.internal.checkOrUpdate
|
||||
import net.corda.node.internal.createCordaPersistence
|
||||
import net.corda.node.internal.startHikariPool
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.SchemaMigration
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
@ -93,7 +95,10 @@ class DbMapDeadlockTest {
|
||||
val dbConfig = DatabaseConfig()
|
||||
val schemaService = NodeSchemaService(extraSchemas = setOf(LockDbSchemaV2))
|
||||
createCordaPersistence(dbConfig, { null }, { null }, schemaService, hikariProperties, cacheFactory, null).apply {
|
||||
startHikariPool(hikariProperties, schemaService.schemas, ourName = TestIdentity(ALICE_NAME, 70).name, runMigrationScripts = true)
|
||||
startHikariPool(hikariProperties) { dataSource, haveCheckpoints ->
|
||||
SchemaMigration(dataSource, null, null, TestIdentity(ALICE_NAME, 70).name)
|
||||
.checkOrUpdate(schemaService.schemas, true, haveCheckpoints, false)
|
||||
}
|
||||
}.use { persistence ->
|
||||
|
||||
// First clean up any remains from previous test runs
|
||||
|
@ -90,6 +90,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask,
|
||||
}
|
||||
cordapp project(':samples:attachment-demo:contracts')
|
||||
cordapp project(':samples:attachment-demo:workflows')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Service,L=Zurich,C=CH"
|
||||
|
@ -48,6 +48,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask,
|
||||
nodeDefaults {
|
||||
cordapp project(':finance:workflows')
|
||||
cordapp project(':finance:contracts')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Service,L=Zurich,C=CH"
|
||||
|
@ -25,6 +25,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask,
|
||||
}
|
||||
rpcUsers = [['username': "default", 'password': "default", 'permissions': [ 'ALL' ]]]
|
||||
cordapp project(':samples:cordapp-configuration:workflows')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Service,L=Zurich,C=CH"
|
||||
|
@ -60,6 +60,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask])
|
||||
}
|
||||
cordapp project(':samples:irs-demo:cordapp:contracts-irs')
|
||||
cordapp project(':samples:irs-demo:cordapp:workflows-irs')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Service,L=Zurich,C=CH"
|
||||
|
@ -36,6 +36,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask])
|
||||
}
|
||||
cordapp project(':samples:network-verifier:contracts')
|
||||
cordapp project(':samples:network-verifier:workflows')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Service,L=Zurich,C=CH"
|
||||
|
@ -44,6 +44,7 @@ task deployNodesSingle(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
|
||||
extraConfig = [h2Settings: [address: "localhost:0"]]
|
||||
cordapp project(':samples:notary-demo:contracts')
|
||||
cordapp project(':samples:notary-demo:workflows')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Alice Corp,L=Madrid,C=ES"
|
||||
|
@ -91,6 +91,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask,
|
||||
cordapp project(':samples:simm-valuation-demo:contracts-states')
|
||||
cordapp project(':samples:simm-valuation-demo:flows')
|
||||
rpcUsers = [['username': "default", 'password': "default", 'permissions': [ 'ALL' ]]]
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Service,L=Zurich,C=CH"
|
||||
|
@ -81,6 +81,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask])
|
||||
cordapp project(':finance:workflows')
|
||||
cordapp project(':finance:contracts')
|
||||
cordapp project(':samples:trader-demo:workflows-trader')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Service,L=Zurich,C=CH"
|
||||
|
@ -127,7 +127,7 @@ open class MockServices private constructor(
|
||||
val cordappLoader = cordappLoaderForPackages(cordappPackages)
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
|
||||
val database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService, schemaService.internalSchemas())
|
||||
val database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService, schemaService.internalSchemas)
|
||||
val keyManagementService = MockKeyManagementService(
|
||||
identityService,
|
||||
*arrayOf(initialIdentity.keyPair) + moreKeys
|
||||
@ -170,7 +170,7 @@ open class MockServices private constructor(
|
||||
wellKnownPartyFromX500Name = identityService::wellKnownPartyFromX500Name,
|
||||
wellKnownPartyFromAnonymous = identityService::wellKnownPartyFromAnonymous,
|
||||
schemaService = schemaService,
|
||||
internalSchemas = schemaService.internalSchemas()
|
||||
internalSchemas = schemaService.internalSchemas
|
||||
)
|
||||
|
||||
val pkToIdCache = PublicKeyToOwningIdentityCacheImpl(persistence, cacheFactory)
|
||||
|
@ -337,6 +337,8 @@ class DriverDSLImpl(
|
||||
return startOutOfProcessMiniNode(config,
|
||||
listOfNotNull(
|
||||
"run-migration-scripts",
|
||||
"--core-schemas",
|
||||
"--app-schemas",
|
||||
if (hibernateForAppSchema) "--allow-hibernate-to-manage-app-schema" else null
|
||||
).toTypedArray()).map { config }
|
||||
}
|
||||
|
@ -279,7 +279,10 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
|
||||
}
|
||||
}
|
||||
|
||||
open class MockNode(args: MockNodeArgs, private val mockFlowManager: MockNodeFlowManager = args.flowManager) : AbstractNode<TestStartedNode>(
|
||||
open class MockNode(
|
||||
args: MockNodeArgs,
|
||||
private val mockFlowManager: MockNodeFlowManager = args.flowManager,
|
||||
allowAppSchemaUpgradeWithCheckpoints: Boolean = false) : AbstractNode<TestStartedNode>(
|
||||
args.config,
|
||||
TestClock(Clock.systemUTC()),
|
||||
DefaultNamedCacheFactory(),
|
||||
@ -287,7 +290,8 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
|
||||
mockFlowManager,
|
||||
args.network.getServerThread(args.id),
|
||||
args.network.busyLatch,
|
||||
allowHibernateToManageAppSchema = true
|
||||
allowHibernateToManageAppSchema = true,
|
||||
allowAppSchemaUpgradeWithCheckpoints = allowAppSchemaUpgradeWithCheckpoints
|
||||
) {
|
||||
companion object {
|
||||
private val staticLog = contextLogger()
|
||||
|
@ -1,7 +1,11 @@
|
||||
package net.corda.testing.internal
|
||||
|
||||
import net.corda.core.context.AuthServiceId
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.contracts.Command
|
||||
import net.corda.core.contracts.PrivacySalt
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.Crypto.generateKeyPair
|
||||
import net.corda.core.crypto.SecureHash
|
||||
@ -19,6 +23,8 @@ import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.coretesting.internal.asTestContextEnv
|
||||
import net.corda.coretesting.internal.createTestSerializationEnv
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.node.internal.checkOrUpdate
|
||||
import net.corda.node.internal.createCordaPersistence
|
||||
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||
import net.corda.node.internal.startHikariPool
|
||||
@ -32,19 +38,17 @@ import net.corda.nodeapi.internal.createDevNodeCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
import net.corda.nodeapi.internal.crypto.CertificateType
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.loadDevCaTrustStore
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.SchemaMigration
|
||||
import net.corda.nodeapi.internal.registerDevP2pCertificates
|
||||
import net.corda.serialization.internal.amqp.AMQP_ENABLED
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.IOException
|
||||
import java.net.ServerSocket
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyPair
|
||||
import java.util.*
|
||||
@ -169,7 +173,7 @@ fun configureDatabase(hikariProperties: Properties,
|
||||
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
|
||||
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
|
||||
schemaService: SchemaService = NodeSchemaService(),
|
||||
internalSchemas: Set<MappedSchema> = NodeSchemaService().internalSchemas(),
|
||||
internalSchemas: Set<MappedSchema> = NodeSchemaService().internalSchemas,
|
||||
cacheFactory: NamedCacheFactory = TestingNamedCacheFactory(),
|
||||
ourName: CordaX500Name = TestIdentity(ALICE_NAME, 70).name,
|
||||
runMigrationScripts: Boolean = true,
|
||||
@ -183,7 +187,10 @@ fun configureDatabase(hikariProperties: Properties,
|
||||
cacheFactory,
|
||||
null,
|
||||
allowHibernateToManageAppSchema)
|
||||
persistence.startHikariPool(hikariProperties, internalSchemas, ourName = ourName, runMigrationScripts = runMigrationScripts)
|
||||
persistence.startHikariPool(hikariProperties) { dataSource, haveCheckpoints ->
|
||||
SchemaMigration(dataSource, null, null, ourName)
|
||||
.checkOrUpdate(internalSchemas, runMigrationScripts, haveCheckpoints, false)
|
||||
}
|
||||
return persistence
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user