mirror of
https://github.com/corda/corda.git
synced 2025-04-15 15:07:03 +00:00
Merge remote-tracking branch 'remotes/open/master' into szymonsztuka/os-merge-20180824
# Conflicts: # build.gradle # core/src/main/kotlin/net/corda/core/schemas/PersistentTypes.kt # finance/src/main/kotlin/net/corda/finance/schemas/CashSchemaV1.kt # finance/src/main/kotlin/net/corda/finance/schemas/CommercialPaperSchemaV1.kt # finance/src/main/resources/migration/cash.changelog-init.xml # finance/src/main/resources/migration/cash.changelog-master.xml # finance/src/main/resources/migration/commercial-paper.changelog-init.xml # finance/src/main/resources/migration/commercial-paper.changelog-master.xml # finance/src/main/resources/migration/commercial-paper.changelog-v1.xml # node-api/src/main/kotlin/net/corda/nodeapi/internal/MigrationHelpers.kt # node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/SchemaMigration.kt # node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt # node/src/main/kotlin/net/corda/node/internal/schemas/NodeInfoSchema.kt # node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt # node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt # node/src/main/resources/migration/common.changelog-init.xml # node/src/main/resources/migration/common.changelog-master.xml # node/src/main/resources/migration/node-core.changelog-init.xml # node/src/main/resources/migration/node-core.changelog-master.xml # node/src/main/resources/migration/node-core.changelog-pkey.xml # node/src/main/resources/migration/node-core.changelog-postgres-blob.xml # node/src/main/resources/migration/node-core.changelog-tx-mapping.xml # node/src/main/resources/migration/node-core.changelog-v3.xml # node/src/main/resources/migration/node-core.changelog-v4.xml # node/src/main/resources/migration/node-info.changelog-init.xml # node/src/main/resources/migration/node-info.changelog-master.xml # node/src/main/resources/migration/node-info.changelog-v1.xml # node/src/main/resources/migration/node-info.changelog-v2.xml # node/src/main/resources/migration/node-notary.changelog-init.xml # node/src/main/resources/migration/node-notary.changelog-master.xml # node/src/main/resources/migration/node-notary.changelog-pkey.xml # node/src/main/resources/migration/node-notary.changelog-v1.xml # node/src/main/resources/migration/vault-schema.changelog-init.xml # node/src/main/resources/migration/vault-schema.changelog-master.xml # node/src/main/resources/migration/vault-schema.changelog-pkey.xml # testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt
This commit is contained in:
commit
598e3a327a
@ -70,8 +70,8 @@ buildscript {
|
||||
ext.shiro_version = '1.4.0'
|
||||
ext.shadow_version = '2.0.4'
|
||||
ext.artifactory_plugin_version = constants.getProperty('artifactoryPluginVersion')
|
||||
ext.liquibase_version = '3.5.3'
|
||||
ext.hikari_version = '2.5.1'
|
||||
ext.liquibase_version = '3.6.2'
|
||||
ext.artifactory_contextUrl = 'https://ci-artifactory.corda.r3cev.com/artifactory'
|
||||
ext.snake_yaml_version = constants.getProperty('snakeYamlVersion')
|
||||
ext.docker_compose_rule_version = '0.33.0'
|
||||
|
@ -59,9 +59,7 @@ open class MappedSchema(schemaFamily: Class<*>,
|
||||
/**
|
||||
* Points to a classpath resource containing the database changes for the [mappedTypes]
|
||||
*/
|
||||
protected open val migrationResource: String? = null
|
||||
|
||||
internal fun internalGetMigrationResource(): String? = migrationResource
|
||||
open val migrationResource: String? = null
|
||||
|
||||
override fun toString(): String = "${this.javaClass.simpleName}(name=$name, version=$version)"
|
||||
|
||||
|
@ -63,6 +63,11 @@ dependencies {
|
||||
// For caches rather than guava
|
||||
compile "com.github.ben-manes.caffeine:caffeine:$caffeine_version"
|
||||
|
||||
// For db migration
|
||||
compile "org.liquibase:liquibase-core:$liquibase_version"
|
||||
compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
|
||||
runtime 'com.mattbertolini:liquibase-slf4j:2.0.0'
|
||||
|
||||
// Unit testing helpers.
|
||||
testCompile "junit:junit:$junit_version"
|
||||
testCompile "org.assertj:assertj-core:$assertj_version"
|
||||
|
@ -11,7 +11,6 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import com.google.common.base.CaseFormat
|
||||
import net.corda.core.internal.getMigrationResource
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
|
||||
object MigrationHelpers {
|
||||
@ -21,7 +20,7 @@ object MigrationHelpers {
|
||||
private val possibleMigrationExtensions = listOf(".xml", ".sql", ".yml", ".json")
|
||||
|
||||
fun getMigrationResource(schema: MappedSchema, classLoader: ClassLoader): String? {
|
||||
val declaredMigration = schema.getMigrationResource()
|
||||
val declaredMigration = schema.migrationResource
|
||||
|
||||
if (declaredMigration == null) {
|
||||
// try to apply the naming convention and find the migration file in the classpath
|
||||
@ -34,9 +33,8 @@ object MigrationHelpers {
|
||||
return "$MIGRATION_PREFIX/$declaredMigration.$DEFAULT_MIGRATION_EXTENSION"
|
||||
}
|
||||
|
||||
// SchemaName will be transformed from camel case to lower_hyphen
|
||||
// then add ".changelog-master"
|
||||
fun migrationResourceNameForSchema(schema: MappedSchema): String {
|
||||
// SchemaName will be transformed from camel case to lower_hyphen then add ".changelog-master"
|
||||
private fun migrationResourceNameForSchema(schema: MappedSchema): String {
|
||||
val name: String = schema::class.simpleName!!
|
||||
val fileName = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_HYPHEN, name)
|
||||
return "$MIGRATION_PREFIX/$fileName.$CHANGELOG_NAME"
|
||||
|
@ -100,8 +100,6 @@ class CordaPersistence(
|
||||
// Check not in read-only mode.
|
||||
transaction {
|
||||
check(!connection.metaData.isReadOnly) { "Database should not be readonly." }
|
||||
checkCorrectAttachmentsContractsTableName(connection)
|
||||
checkCorrectCheckpointTypeOnPostgres(connection)
|
||||
}
|
||||
}
|
||||
object DataSourceConfigTag {
|
||||
@ -306,33 +304,3 @@ private fun Throwable.hasSQLExceptionCause(): Boolean =
|
||||
}
|
||||
|
||||
class CouldNotCreateDataSourceException(override val message: String?, override val cause: Throwable? = null) : Exception()
|
||||
|
||||
class DatabaseIncompatibleException(override val message: String?, override val cause: Throwable? = null) : Exception()
|
||||
|
||||
private fun checkCorrectAttachmentsContractsTableName(connection: Connection) {
|
||||
val correctName = "NODE_ATTACHMENTS_CONTRACTS"
|
||||
val incorrectV30Name = "NODE_ATTACHMENTS_CONTRACT_CLASS_NAME"
|
||||
val incorrectV31Name = "NODE_ATTCHMENTS_CONTRACTS"
|
||||
|
||||
fun warning(incorrectName: String, version: String) = "The database contains the older table name $incorrectName instead of $correctName, see upgrade notes to migrate from Corda database version $version https://docs.corda.net/head/upgrade-notes.html."
|
||||
|
||||
if (!connection.metaData.getTables(null, null, correctName, null).next()) {
|
||||
if (connection.metaData.getTables(null, null, incorrectV30Name, null).next()) { throw DatabaseIncompatibleException(warning(incorrectV30Name, "3.0")) }
|
||||
if (connection.metaData.getTables(null, null, incorrectV31Name, null).next()) { throw DatabaseIncompatibleException(warning(incorrectV31Name, "3.1")) }
|
||||
}
|
||||
}
|
||||
|
||||
private fun checkCorrectCheckpointTypeOnPostgres(connection: Connection) {
|
||||
val metaData = connection.metaData
|
||||
if (metaData.getDatabaseProductName() != "PostgreSQL") {
|
||||
return
|
||||
}
|
||||
|
||||
val result = metaData.getColumns(null, null, "node_checkpoints", "checkpoint_value")
|
||||
if (result.next()) {
|
||||
val type = result.getString("TYPE_NAME")
|
||||
if (type != "bytea") {
|
||||
throw DatabaseIncompatibleException("The type of the 'checkpoint_value' table must be 'bytea', but 'oid' was found. See upgrade notes to migrate from Corda database version 3.1 https://docs.corda.net/head/upgrade-notes.html.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,7 +31,6 @@ import javax.sql.DataSource
|
||||
class SchemaMigration(
|
||||
val schemas: Set<MappedSchema>,
|
||||
val dataSource: DataSource,
|
||||
val failOnMigrationMissing: Boolean,
|
||||
private val databaseConfig: DatabaseConfig,
|
||||
private val classLoader: ClassLoader = Thread.currentThread().contextClassLoader) {
|
||||
|
||||
@ -45,8 +44,12 @@ class SchemaMigration(
|
||||
*/
|
||||
fun nodeStartup(existingCheckpoints: Boolean) {
|
||||
when {
|
||||
databaseConfig.runMigration -> runMigration(existingCheckpoints)
|
||||
failOnMigrationMissing -> checkState()
|
||||
databaseConfig.initialiseSchema -> {
|
||||
//TODO if it's h2 only
|
||||
migrateOlderDatabaseToUseLiquibase(existingCheckpoints)
|
||||
runMigration(existingCheckpoints)
|
||||
}
|
||||
else -> checkState()
|
||||
}
|
||||
}
|
||||
|
||||
@ -63,7 +66,7 @@ class SchemaMigration(
|
||||
/**
|
||||
* Ensures that the database is up to date with the latest migration changes.
|
||||
*/
|
||||
fun checkState() = doRunMigration(run = false, outputWriter = null, check = true)
|
||||
private fun checkState() = doRunMigration(run = false, outputWriter = null, check = true)
|
||||
|
||||
/**
|
||||
* Can be used from an external tool to release the lock in case something went terribly wrong.
|
||||
@ -74,6 +77,23 @@ class SchemaMigration(
|
||||
}
|
||||
}
|
||||
|
||||
/** Create a resourse accessor that aggregates the changelogs included in the schemas into one dynamic stream. */
|
||||
private class CustomResourceAccessor(val dynamicInclude: String, val changelogList: List<String?>, classLoader: ClassLoader) : ClassLoaderResourceAccessor(classLoader) {
|
||||
override fun getResourcesAsStream(path: String): Set<InputStream> {
|
||||
if (path == dynamicInclude) {
|
||||
// Create a map in Liquibase format including all migration files.
|
||||
val includeAllFiles = mapOf("databaseChangeLog" to changelogList.filter { it != null }.map { file -> mapOf("include" to mapOf("file" to file)) })
|
||||
|
||||
// Transform it to json.
|
||||
val includeAllFilesJson = ObjectMapper().writeValueAsBytes(includeAllFiles)
|
||||
|
||||
// Return the json as a stream.
|
||||
return setOf(ByteArrayInputStream(includeAllFilesJson))
|
||||
}
|
||||
return super.getResourcesAsStream(path)?.take(1)?.toSet() ?: emptySet()
|
||||
}
|
||||
}
|
||||
|
||||
private fun doRunMigration(run: Boolean, outputWriter: Writer?, check: Boolean, existingCheckpoints: Boolean? = null) {
|
||||
|
||||
// Virtual file name of the changelog that includes all schemas.
|
||||
@ -87,31 +107,11 @@ class SchemaMigration(
|
||||
val resource = getMigrationResource(mappedSchema, classLoader)
|
||||
when {
|
||||
resource != null -> resource
|
||||
failOnMigrationMissing -> throw MissingMigrationException(mappedSchema)
|
||||
else -> {
|
||||
logger.warn(MissingMigrationException.errorMessageFor(mappedSchema))
|
||||
null
|
||||
}
|
||||
else -> throw MissingMigrationException(mappedSchema)
|
||||
}
|
||||
}
|
||||
|
||||
// Create a resourse accessor that aggregates the changelogs included in the schemas into one dynamic stream.
|
||||
val customResourceAccessor = object : ClassLoaderResourceAccessor(classLoader) {
|
||||
override fun getResourcesAsStream(path: String): Set<InputStream> {
|
||||
|
||||
if (path == dynamicInclude) {
|
||||
// Create a map in Liquibase format including all migration files.
|
||||
val includeAllFiles = mapOf("databaseChangeLog" to changelogList.filter { it != null }.map { file -> mapOf("include" to mapOf("file" to file)) })
|
||||
|
||||
// Transform it to json.
|
||||
val includeAllFilesJson = ObjectMapper().writeValueAsBytes(includeAllFiles)
|
||||
|
||||
// Return the json as a stream.
|
||||
return setOf(ByteArrayInputStream(includeAllFilesJson))
|
||||
}
|
||||
return super.getResourcesAsStream(path)?.take(1)?.toSet() ?: emptySet()
|
||||
}
|
||||
}
|
||||
val customResourceAccessor = CustomResourceAccessor(dynamicInclude, changelogList, classLoader)
|
||||
|
||||
val liquibase = Liquibase(dynamicInclude, customResourceAccessor, getLiquibaseDatabase(JdbcConnection(connection)))
|
||||
|
||||
@ -138,6 +138,7 @@ class SchemaMigration(
|
||||
check && !run && unRunChanges.isNotEmpty() -> throw OutstandingDatabaseChangesException(unRunChanges.size)
|
||||
check && !run -> {} // Do nothing will be interpreted as "check succeeded"
|
||||
(outputWriter != null) && !check && !run -> liquibase.update(Contexts(), outputWriter)
|
||||
(outputWriter != null) && !check && !run -> liquibase.update(Contexts(), outputWriter)
|
||||
else -> throw IllegalStateException("Invalid usage.")
|
||||
}
|
||||
}
|
||||
@ -161,6 +162,54 @@ class SchemaMigration(
|
||||
|
||||
return if (liquibaseDbImplementation is MSSQLDatabase) AzureDatabase(conn) else liquibaseDbImplementation
|
||||
}
|
||||
|
||||
/** For existing database created before verions 4.0 add Liquibase support - creates DATABASECHANGELOG and DATABASECHANGELOGLOCK tables and mark changesets are executed. */
|
||||
private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean): Boolean {
|
||||
val isExistingDBWithoutLiquibase = dataSource.connection.use {
|
||||
it.metaData.getTables(null, null, "NODE%", null).next() &&
|
||||
!it.metaData.getTables(null, null, "DATABASECHANGELOG", null).next() &&
|
||||
!it.metaData.getTables(null, null, "DATABASECHANGELOGLOCK", null).next()
|
||||
}
|
||||
when {
|
||||
isExistingDBWithoutLiquibase && existingCheckpoints -> throw CheckpointsException()
|
||||
isExistingDBWithoutLiquibase -> {
|
||||
// Virtual file name of the changelog that includes all schemas.
|
||||
val dynamicInclude = "master.changelog.json"
|
||||
|
||||
dataSource.connection.use { connection ->
|
||||
// Schema migrations pre release 4.0
|
||||
val preV4Baseline =
|
||||
listOf("migration/common.changelog-init.xml",
|
||||
"migration/node-info.changelog-init.xml",
|
||||
"migration/node-info.changelog-v1.xml",
|
||||
"migration/node-info.changelog-v2.xml",
|
||||
"migration/node-core.changelog-init.xml",
|
||||
"migration/node-core.changelog-v3.xml",
|
||||
"migration/node-core.changelog-v4.xml",
|
||||
"migration/node-core.changelog-v5.xml",
|
||||
"migration/node-core.changelog-pkey.xml",
|
||||
"migration/vault-schema.changelog-init.xml",
|
||||
"migration/vault-schema.changelog-v3.xml",
|
||||
"migration/vault-schema.changelog-v4.xml",
|
||||
"migration/vault-schema.changelog-pkey.xml",
|
||||
"migration/cash.changelog-init.xml",
|
||||
"migration/cash.changelog-v1.xml",
|
||||
"migration/commercial-paper.changelog-init.xml",
|
||||
"migration/commercial-paper.changelog-v1.xml") +
|
||||
if (schemas.any { schema -> schema.migrationResource == "node-notary.changelog-master" })
|
||||
listOf("migration/node-notary.changelog-init.xml",
|
||||
"migration/node-notary.changelog-v1.xml",
|
||||
"migration/vault-schema.changelog-pkey.xml")
|
||||
else emptyList()
|
||||
|
||||
val customResourceAccessor = CustomResourceAccessor(dynamicInclude, preV4Baseline, classLoader)
|
||||
val liquibase = Liquibase(dynamicInclude, customResourceAccessor, getLiquibaseDatabase(JdbcConnection(connection)))
|
||||
liquibase.changeLogSync(Contexts(), LabelExpression())
|
||||
}
|
||||
}
|
||||
}
|
||||
return isExistingDBWithoutLiquibase
|
||||
}
|
||||
}
|
||||
|
||||
open class DatabaseMigrationException(message: String) : IllegalArgumentException(message) {
|
||||
@ -183,3 +232,9 @@ class CheckpointsException : DatabaseMigrationException("Attempting to update th
|
||||
"This is dangerous because the node might not be able to restore the flows correctly and could consequently fail. " +
|
||||
"Updating the database would make reverting to the previous version more difficult. " +
|
||||
"Please drain your node first. See: https://docs.corda.net/upgrading-cordapps.html#flow-drains")
|
||||
|
||||
class DatabaseIncompatibleException(@Suppress("MemberVisibilityCanBePrivate") private val reason: String) : DatabaseMigrationException(errorMessageFor(reason)) {
|
||||
internal companion object {
|
||||
fun errorMessageFor(reason: String): String = "Incompatible database schema version detected, please run the node with configuration option database.initialiseSchema=true. Reason: $reason"
|
||||
}
|
||||
}
|
@ -1,70 +0,0 @@
|
||||
package net.corda.node.persistence
|
||||
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.internal.packageName
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.testMessage.Message
|
||||
import net.corda.testMessage.MessageState
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.User
|
||||
import org.junit.Test
|
||||
import java.nio.file.Path
|
||||
import java.sql.DriverManager
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertFalse
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class FailNodeOnNotMigratedAttachmentContractsTableNameTests {
|
||||
@Test
|
||||
fun `node fails when detecting table name not migrated from version 3 dot 0`() {
|
||||
`node fails when not detecting compatible table name`("NODE_ATTACHMENTS_CONTRACTS", "NODE_ATTACHMENTS_CONTRACT_CLASS_NAME")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `node fails when detecting table name not migrated from version 3 dot 1`() {
|
||||
`node fails when not detecting compatible table name`("NODE_ATTACHMENTS_CONTRACTS", "NODE_ATTCHMENTS_CONTRACTS")
|
||||
}
|
||||
|
||||
private fun `node fails when not detecting compatible table name`(tableNameFromMapping: String, tableNameInDB: String) {
|
||||
val user = User("mark", "dadada", setOf(Permissions.startFlow<SendMessageFlow>(), Permissions.invokeRpc("vaultQuery")))
|
||||
val message = Message("Hello world!")
|
||||
val baseDir: Path = driver(DriverParameters(
|
||||
inMemoryDB = false,
|
||||
startNodesInProcess = isQuasarAgentSpecified(),
|
||||
extraCordappPackagesToScan = listOf(MessageState::class.packageName)
|
||||
)) {
|
||||
val (nodeName, baseDir) = {
|
||||
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
|
||||
val nodeName = nodeHandle.nodeInfo.singleIdentity().name
|
||||
CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use {
|
||||
it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow()
|
||||
}
|
||||
nodeHandle.stop()
|
||||
Pair(nodeName, nodeHandle.baseDirectory)
|
||||
}()
|
||||
|
||||
// replace the correct table name with one from the former release
|
||||
DriverManager.getConnection("jdbc:h2:file://$baseDir/persistence", "sa", "").use {
|
||||
it.createStatement().execute("ALTER TABLE $tableNameFromMapping RENAME TO $tableNameInDB")
|
||||
it.commit()
|
||||
}
|
||||
assertFailsWith(net.corda.nodeapi.internal.persistence.DatabaseIncompatibleException::class) {
|
||||
val nodeHandle = startNode(providedName = nodeName, rpcUsers = listOf(user)).getOrThrow()
|
||||
nodeHandle.stop()
|
||||
}
|
||||
baseDir
|
||||
}
|
||||
|
||||
// check that the node didn't recreated the correct table matching it's entity mapping
|
||||
val (hasTableFromMapping, hasTableFromDB) = DriverManager.getConnection("jdbc:h2:file://$baseDir/persistence", "sa", "").use {
|
||||
Pair(it.metaData.getTables(null, null, tableNameFromMapping, null).next(),
|
||||
it.metaData.getTables(null, null, tableNameInDB, null).next())
|
||||
}
|
||||
assertFalse(hasTableFromMapping)
|
||||
assertTrue(hasTableFromDB)
|
||||
}
|
||||
}
|
@ -36,6 +36,7 @@ import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.node.*
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.serialization.SerializationWhitelist
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -148,7 +149,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
configuration.database,
|
||||
identityService::wellKnownPartyFromX500Name,
|
||||
identityService::wellKnownPartyFromAnonymous,
|
||||
schemaService
|
||||
schemaService,
|
||||
configuration.dataSourceProperties
|
||||
)
|
||||
init {
|
||||
// TODO Break cyclic dependency
|
||||
@ -774,7 +776,9 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
}
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
|
||||
database.startHikariPool(props, configuration.database, schemaService)
|
||||
val isH2Database = isH2Database(props.getProperty("dataSource.url", ""))
|
||||
val schemas = if (isH2Database) schemaService.internalSchemas() else schemaService.schemaOptions.keys
|
||||
database.startHikariPool(props, configuration.database, schemas)
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
logVendorString(database, log)
|
||||
}
|
||||
@ -1045,39 +1049,42 @@ fun configureDatabase(hikariProperties: Properties,
|
||||
databaseConfig: DatabaseConfig,
|
||||
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
|
||||
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
|
||||
schemaService: SchemaService = NodeSchemaService()): CordaPersistence =
|
||||
schemaService: NodeSchemaService = NodeSchemaService()): CordaPersistence {
|
||||
|
||||
val isH2Database = isH2Database(hikariProperties.getProperty("dataSource.url", ""))
|
||||
val schemas = if (isH2Database) schemaService.internalSchemas() else schemaService.schemaOptions.keys
|
||||
createCordaPersistence(databaseConfig, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous, schemaService)
|
||||
.apply { startHikariPool(hikariProperties, databaseConfig, schemaService) }
|
||||
.apply { startHikariPool(hikariProperties, databaseConfig, schemas) }
|
||||
|
||||
}
|
||||
|
||||
fun createCordaPersistence(databaseConfig: DatabaseConfig,
|
||||
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
|
||||
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
|
||||
schemaService: SchemaService): CordaPersistence {
|
||||
schemaService: SchemaService,
|
||||
hikariProperties: Properties): CordaPersistence {
|
||||
// Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately
|
||||
// Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default
|
||||
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
|
||||
// either Hibernate can be convinced to stop warning, use the descriptor by default, or something else.
|
||||
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
|
||||
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
|
||||
return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, attributeConverters)
|
||||
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
|
||||
return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters)
|
||||
}
|
||||
|
||||
fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemaService: SchemaService) {
|
||||
fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>) {
|
||||
try {
|
||||
val dataSource = DataSourceFactory.createDataSource(hikariProperties)
|
||||
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
|
||||
val schemaMigration = SchemaMigration(
|
||||
schemaService.schemaOptions.keys,
|
||||
dataSource,
|
||||
!isH2Database(jdbcUrl),
|
||||
databaseConfig
|
||||
)
|
||||
val schemaMigration = SchemaMigration(schemas, dataSource, databaseConfig)
|
||||
schemaMigration.nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })
|
||||
start(dataSource, jdbcUrl)
|
||||
} catch (ex: Exception) {
|
||||
when {
|
||||
ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.", ex)
|
||||
ex.cause is ClassNotFoundException -> throw CouldNotCreateDataSourceException("Could not find the database driver class. Please add it to the 'drivers' folder. See: https://docs.corda.net/corda-configuration-file.html")
|
||||
ex is OutstandingDatabaseChangesException -> throw (DatabaseIncompatibleException(ex.message))
|
||||
ex is DatabaseIncompatibleException -> throw ex
|
||||
else -> throw CouldNotCreateDataSourceException("Could not create the DataSource: ${ex.message}", ex)
|
||||
}
|
||||
|
@ -29,7 +29,6 @@ object NodeInfoSchemaV1 : MappedSchema(
|
||||
version = 1,
|
||||
mappedTypes = listOf(PersistentNodeInfo::class.java, DBPartyAndCertificate::class.java, DBHostAndPort::class.java, NodePropertiesPersistentStore.DBNodeProperty::class.java)
|
||||
) {
|
||||
|
||||
override val migrationResource = "node-info.changelog-master"
|
||||
|
||||
@Entity
|
||||
@ -83,6 +82,7 @@ object NodeInfoSchemaV1 : MappedSchema(
|
||||
@GeneratedValue
|
||||
@Column(name = "hosts_id", nullable = false)
|
||||
var id: Int,
|
||||
@Column(name = "host_name")
|
||||
val host: String? = null,
|
||||
val port: Int? = null
|
||||
) {
|
||||
|
@ -30,7 +30,6 @@ interface CheckpointStorage {
|
||||
*/
|
||||
fun updateCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>)
|
||||
|
||||
|
||||
/**
|
||||
* Remove existing checkpoint from the store.
|
||||
* @return whether the id matched a checkpoint that was removed.
|
||||
|
@ -18,16 +18,16 @@ import net.corda.node.services.statemachine.Checkpoint
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import org.hibernate.annotations.Type
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLException
|
||||
import java.util.*
|
||||
import java.util.stream.Stream
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
import org.hibernate.annotations.Type
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLException
|
||||
|
||||
/**
|
||||
* Simple checkpoint key value storage in DB.
|
||||
|
@ -40,7 +40,7 @@ import net.corda.node.services.vault.VaultSchemaV1
|
||||
* TODO: support plugins for schema version upgrading or custom mapping not supported by original [QueryableState].
|
||||
* TODO: create whitelisted tables when a CorDapp is first installed
|
||||
*/
|
||||
class NodeSchemaService(extraSchemas: Set<MappedSchema> = emptySet(), includeNotarySchemas: Boolean = false) : SchemaService, SingletonSerializeAsToken() {
|
||||
class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet(), includeNotarySchemas: Boolean = false) : SchemaService, SingletonSerializeAsToken() {
|
||||
// Core Entities used by a Node
|
||||
object NodeCore
|
||||
|
||||
@ -77,10 +77,10 @@ class NodeSchemaService(extraSchemas: Set<MappedSchema> = emptySet(), includeNot
|
||||
mapOf(Pair(CommonSchemaV1, SchemaOptions()),
|
||||
Pair(VaultSchemaV1, SchemaOptions()),
|
||||
Pair(NodeInfoSchemaV1, SchemaOptions()),
|
||||
Pair(NodeCoreV1, SchemaOptions()))
|
||||
private val notarySchemas = if (includeNotarySchemas) mapOf(Pair(NodeNotaryV1, SchemaOptions())) else emptyMap<MappedSchema, SchemaService.SchemaOptions>()
|
||||
Pair(NodeCoreV1, SchemaOptions())) +
|
||||
if (includeNotarySchemas) mapOf(Pair(NodeNotaryV1, SchemaOptions())) else emptyMap()
|
||||
|
||||
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas + notarySchemas + extraSchemas.associateBy({ it }, { SchemaOptions() })
|
||||
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas + extraSchemas.associateBy({ it }, { SchemaOptions() })
|
||||
|
||||
// Currently returns all schemas supported by the state, with no filtering or enrichment.
|
||||
override fun selectSchemas(state: ContractState): Iterable<MappedSchema> {
|
||||
|
@ -19,6 +19,7 @@
|
||||
<include file="migration/node-core.changelog-v5.xml"/>
|
||||
<include file="migration/node-core.changelog-pkey.xml"/>
|
||||
<include file="migration/node-core.changelog-postgres-blob.xml"/>
|
||||
<include file="migration/node-core.changelog-v8.xml"/>
|
||||
<include file="migration/node-core.changelog-tx-mapping.xml"/>
|
||||
|
||||
</databaseChangeLog>
|
||||
|
17
node/src/main/resources/migration/node-core.changelog-v8.xml
Normal file
17
node/src/main/resources/migration/node-core.changelog-v8.xml
Normal file
@ -0,0 +1,17 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||
|
||||
<changeSet author="R3.Corda" id="conditional_attchments_rename">
|
||||
<preConditions onFail="MARK_RAN"><tableExists tableName="NODE_ATTCHMENTS_CONTRACTS"/></preConditions>
|
||||
<renameTable oldTableName="NODE_ATTCHMENTS_CONTRACTS" newTableName="NODE_ATTACHMENTS_CONTRACTS" />
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="R3.Corda" id="conditional_attchments_contracts">
|
||||
<preConditions onFail="MARK_RAN"><tableExists tableName="NODE_ATTACHMENTS_CONTRACT_CLASS_NAME"/></preConditions>
|
||||
<renameTable oldTableName="NODE_ATTACHMENTS_CONTRACT_CLASS_NAME" newTableName="NODE_ATTACHMENTS_CONTRACTS" />
|
||||
</changeSet>
|
||||
|
||||
</databaseChangeLog>
|
@ -17,5 +17,6 @@
|
||||
<include file="migration/node-info.changelog-init.xml"/>
|
||||
<include file="migration/node-info.changelog-v1.xml"/>
|
||||
<include file="migration/node-info.changelog-v2.xml"/>
|
||||
<include file="migration/node-info.changelog-v3.xml"/>
|
||||
|
||||
</databaseChangeLog>
|
||||
|
@ -0,0 +1,9 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
|
||||
|
||||
<changeSet author="R3.Corda" id="column_host_name">
|
||||
<renameColumn newColumnName="host_name" oldColumnName="host" tableName="node_info_hosts"/>
|
||||
</changeSet>
|
||||
</databaseChangeLog>
|
@ -21,7 +21,6 @@ import net.corda.core.identity.Party;
|
||||
import net.corda.core.messaging.DataFeed;
|
||||
import net.corda.core.node.services.IdentityService;
|
||||
import net.corda.core.node.services.Vault;
|
||||
import net.corda.core.node.services.VaultQueryException;
|
||||
import net.corda.core.node.services.VaultService;
|
||||
import net.corda.core.node.services.vault.*;
|
||||
import net.corda.core.node.services.vault.QueryCriteria.LinearStateQueryCriteria;
|
||||
@ -44,7 +43,6 @@ import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import rx.Observable;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -110,9 +110,11 @@ open class MockServices private constructor(
|
||||
initialIdentity: TestIdentity,
|
||||
networkParameters: NetworkParameters = testNetworkParameters(),
|
||||
vararg moreKeys: KeyPair): Pair<CordaPersistence, MockServices> {
|
||||
|
||||
val cordappLoader = cordappLoaderForPackages(cordappPackages)
|
||||
val dataSourceProps = makeInternalTestDataSourceProperties(initialIdentity.name.organisation, SecureHash.randomSHA256().toString())
|
||||
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
|
||||
//TODO different schemas based on h2 or not
|
||||
val database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(initialIdentity.name.organisation), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService)
|
||||
val mockService = database.transaction {
|
||||
object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user