mirror of
https://github.com/corda/corda.git
synced 2025-04-19 08:36:39 +00:00
Merge pull request #6534 from corda/feature/ENT-5273-unified-database-management
ENT-5273 unified database management
This commit is contained in:
commit
6f2ca8978d
@ -56,7 +56,9 @@ class ReceiveFinalityFlowTest {
|
||||
bob.assertFlowSentForObservationDueToUntrustedAttachmentsException(paymentReceiverId)
|
||||
|
||||
// Restart Bob with the contracts CorDapp so that it can recover from the error
|
||||
bob = mockNet.restartNode(bob, parameters = InternalMockNodeParameters(additionalCordapps = listOf(FINANCE_CONTRACTS_CORDAPP)))
|
||||
bob = mockNet.restartNode(bob,
|
||||
parameters = InternalMockNodeParameters(additionalCordapps = listOf(FINANCE_CONTRACTS_CORDAPP)),
|
||||
nodeFactory = { args -> InternalMockNetwork.MockNode(args, allowAppSchemaUpgradeWithCheckpoints = true) })
|
||||
mockNet.runNetwork()
|
||||
assertThat(bob.services.getCashBalance(GBP)).isEqualTo(100.POUNDS)
|
||||
}
|
||||
|
@ -0,0 +1,108 @@
|
||||
package net.corda.nodeapitests.internal.persistence
|
||||
|
||||
import net.corda.core.contracts.UniqueIdentifier
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.node.internal.DataSourceFactory
|
||||
import net.corda.node.internal.startHikariPool
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseMigrationException
|
||||
import net.corda.nodeapi.internal.persistence.HibernateSchemaChangeException
|
||||
import net.corda.nodeapi.internal.persistence.SchemaMigration
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.testing.node.MockServices
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Table
|
||||
import javax.sql.DataSource
|
||||
|
||||
class MigrationSchemaSyncTest{
|
||||
object TestSchemaFamily
|
||||
|
||||
object GoodSchema : MappedSchema(schemaFamily = TestSchemaFamily.javaClass, version = 1, mappedTypes = listOf(State::class.java)) {
|
||||
@Entity
|
||||
@Table(name = "State")
|
||||
class State(
|
||||
@Column
|
||||
var id: String
|
||||
) : PersistentState(PersistentStateRef(UniqueIdentifier().toString(), 0 ))
|
||||
|
||||
override val migrationResource: String? = "goodschema.testmigration"
|
||||
}
|
||||
|
||||
lateinit var hikariProperties: Properties
|
||||
lateinit var dataSource: DataSource
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
hikariProperties = MockServices.makeTestDataSourceProperties()
|
||||
dataSource = DataSourceFactory.createDataSource(hikariProperties)
|
||||
}
|
||||
|
||||
private fun schemaMigration() = SchemaMigration(dataSource, null, null,
|
||||
TestIdentity(ALICE_NAME, 70).name)
|
||||
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun testSchemaScript(){
|
||||
schemaMigration().runMigration(false, setOf(GoodSchema), true)
|
||||
val persistence = CordaPersistence(
|
||||
false,
|
||||
setOf(GoodSchema),
|
||||
hikariProperties.getProperty("dataSource.url"),
|
||||
TestingNamedCacheFactory()
|
||||
)
|
||||
persistence.startHikariPool(hikariProperties){ _, _ -> Unit}
|
||||
|
||||
persistence.transaction {
|
||||
this.entityManager.persist(GoodSchema.State("id"))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun checkThatSchemaSyncFixesLiquibaseException(){
|
||||
// Schema is missing if no migration is run and hibernate not allowed to create
|
||||
val persistenceBlank = CordaPersistence(
|
||||
false,
|
||||
setOf(GoodSchema),
|
||||
hikariProperties.getProperty("dataSource.url"),
|
||||
TestingNamedCacheFactory()
|
||||
)
|
||||
persistenceBlank.startHikariPool(hikariProperties){ _, _ -> Unit}
|
||||
assertThatThrownBy{ persistenceBlank.transaction {this.entityManager.persist(GoodSchema.State("id"))}}
|
||||
.isInstanceOf(HibernateSchemaChangeException::class.java)
|
||||
.hasMessageContaining("Incompatible schema")
|
||||
|
||||
// create schema via hibernate - now schema gets created and we can write
|
||||
val persistenceHibernate = CordaPersistence(
|
||||
false,
|
||||
setOf(GoodSchema),
|
||||
hikariProperties.getProperty("dataSource.url"),
|
||||
TestingNamedCacheFactory(),
|
||||
allowHibernateToManageAppSchema = true
|
||||
)
|
||||
persistenceHibernate.startHikariPool(hikariProperties){ _, _ -> Unit}
|
||||
persistenceHibernate.transaction { entityManager.persist(GoodSchema.State("id_hibernate")) }
|
||||
|
||||
// if we try to run schema migration now, the changelog and the schemas are out of sync
|
||||
assertThatThrownBy { schemaMigration().runMigration(false, setOf(GoodSchema), true) }
|
||||
.isInstanceOf(DatabaseMigrationException::class.java)
|
||||
.hasMessageContaining("Table \"STATE\" already exists")
|
||||
|
||||
// update the change log with schemas we know exist
|
||||
schemaMigration().synchroniseSchemas(setOf(GoodSchema), true)
|
||||
|
||||
// now run migration runs clean
|
||||
schemaMigration().runMigration(false, setOf(GoodSchema), true)
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -2,12 +2,11 @@ package net.corda.nodeapitests.internal.persistence
|
||||
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.MissingMigrationException
|
||||
import net.corda.nodeapi.internal.persistence.SchemaMigration
|
||||
import net.corda.node.internal.DataSourceFactory
|
||||
import net.corda.node.services.persistence.DBCheckpointStorage
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.nodeapi.internal.persistence.MissingMigrationException
|
||||
import net.corda.nodeapi.internal.persistence.SchemaMigration
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.node.MockServices
|
||||
@ -40,25 +39,21 @@ class MissingSchemaMigrationTest {
|
||||
dataSource = DataSourceFactory.createDataSource(hikariProperties)
|
||||
}
|
||||
|
||||
private fun createSchemaMigration(schemasToMigrate: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean): SchemaMigration {
|
||||
val databaseConfig = DatabaseConfig()
|
||||
return SchemaMigration(schemasToMigrate, dataSource, databaseConfig, null, null,
|
||||
TestIdentity(ALICE_NAME, 70).name, forceThrowOnMissingMigration)
|
||||
}
|
||||
private fun schemaMigration() = SchemaMigration(dataSource, null, null,
|
||||
TestIdentity(ALICE_NAME, 70).name)
|
||||
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `test that an error is thrown when forceThrowOnMissingMigration is set and a mapped schema is missing a migration`() {
|
||||
assertThatThrownBy {
|
||||
createSchemaMigration(setOf(GoodSchema), true)
|
||||
.nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
|
||||
schemaMigration().runMigration(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }, setOf(GoodSchema), true)
|
||||
}.isInstanceOf(MissingMigrationException::class.java)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `test that an error is not thrown when forceThrowOnMissingMigration is not set and a mapped schema is missing a migration`() {
|
||||
assertDoesNotThrow {
|
||||
createSchemaMigration(setOf(GoodSchema), false)
|
||||
.nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
|
||||
schemaMigration().runMigration(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }, setOf(GoodSchema), false)
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,8 +61,7 @@ class MissingSchemaMigrationTest {
|
||||
fun `test that there are no missing migrations for the node`() {
|
||||
assertDoesNotThrow("This test failure indicates " +
|
||||
"a new table has been added to the node without the appropriate migration scripts being present") {
|
||||
createSchemaMigration(NodeSchemaService().internalSchemas(), false)
|
||||
.nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
|
||||
schemaMigration().runMigration(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }, NodeSchemaService().internalSchemas, true)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,19 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
|
||||
logicalFilePath="migration/node-services.changelog-init.xml">
|
||||
|
||||
<changeSet author="R3.Corda" id="unittest-goodschema-v1">
|
||||
<createTable tableName="State">
|
||||
<column name="output_index" type="INT">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="transaction_id" type="NVARCHAR(64)">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
<column name="id" type="NVARCHAR(255)"/>
|
||||
</createTable>
|
||||
</changeSet>
|
||||
</databaseChangeLog>
|
@ -75,6 +75,15 @@ constructor(private val initSerEnv: Boolean,
|
||||
"generate-node-info"
|
||||
)
|
||||
|
||||
private val createSchemasCmd = listOf(
|
||||
Paths.get(System.getProperty("java.home"), "bin", "java").toString(),
|
||||
"-jar",
|
||||
"corda.jar",
|
||||
"run-migration-scripts",
|
||||
"--core-schemas",
|
||||
"--app-schemas"
|
||||
)
|
||||
|
||||
private const val LOGS_DIR_NAME = "logs"
|
||||
|
||||
private val jarsThatArentCordapps = setOf("corda.jar", "runnodes.jar")
|
||||
@ -92,7 +101,9 @@ constructor(private val initSerEnv: Boolean,
|
||||
}
|
||||
val executor = Executors.newFixedThreadPool(numParallelProcesses)
|
||||
return try {
|
||||
nodeDirs.map { executor.fork { generateNodeInfo(it) } }.transpose().getOrThrow()
|
||||
nodeDirs.map { executor.fork {
|
||||
createDbSchemas(it)
|
||||
generateNodeInfo(it) } }.transpose().getOrThrow()
|
||||
} finally {
|
||||
warningTimer.cancel()
|
||||
executor.shutdownNow()
|
||||
@ -100,23 +111,31 @@ constructor(private val initSerEnv: Boolean,
|
||||
}
|
||||
|
||||
private fun generateNodeInfo(nodeDir: Path): Path {
|
||||
runNodeJob(nodeInfoGenCmd, nodeDir, "node-info-gen.log")
|
||||
return nodeDir.list { paths ->
|
||||
paths.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get()
|
||||
}
|
||||
}
|
||||
|
||||
private fun createDbSchemas(nodeDir: Path) {
|
||||
runNodeJob(createSchemasCmd, nodeDir, "node-run-migration.log")
|
||||
}
|
||||
|
||||
private fun runNodeJob(command: List<String>, nodeDir: Path, logfileName: String) {
|
||||
val logsDir = (nodeDir / LOGS_DIR_NAME).createDirectories()
|
||||
val nodeInfoGenFile = (logsDir / "node-info-gen.log").toFile()
|
||||
val process = ProcessBuilder(nodeInfoGenCmd)
|
||||
val nodeRedirectFile = (logsDir / logfileName).toFile()
|
||||
val process = ProcessBuilder(command)
|
||||
.directory(nodeDir.toFile())
|
||||
.redirectErrorStream(true)
|
||||
.redirectOutput(nodeInfoGenFile)
|
||||
.redirectOutput(nodeRedirectFile)
|
||||
.apply { environment()["CAPSULE_CACHE_DIR"] = "../.cache" }
|
||||
.start()
|
||||
try {
|
||||
if (!process.waitFor(3, TimeUnit.MINUTES)) {
|
||||
process.destroyForcibly()
|
||||
printNodeInfoGenLogToConsole(nodeInfoGenFile)
|
||||
}
|
||||
printNodeInfoGenLogToConsole(nodeInfoGenFile) { process.exitValue() == 0 }
|
||||
return nodeDir.list { paths ->
|
||||
paths.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get()
|
||||
printNodeOutputToConsoleAndThrow(nodeRedirectFile)
|
||||
}
|
||||
if (process.exitValue() != 0) printNodeOutputToConsoleAndThrow(nodeRedirectFile)
|
||||
} catch (e: InterruptedException) {
|
||||
// Don't leave this process dangling if the thread is interrupted.
|
||||
process.destroyForcibly()
|
||||
@ -124,18 +143,16 @@ constructor(private val initSerEnv: Boolean,
|
||||
}
|
||||
}
|
||||
|
||||
private fun printNodeInfoGenLogToConsole(nodeInfoGenFile: File, check: (() -> Boolean) = { true }) {
|
||||
if (!check.invoke()) {
|
||||
val nodeDir = nodeInfoGenFile.parent
|
||||
val nodeIdentifier = try {
|
||||
ConfigFactory.parseFile((nodeDir / "node.conf").toFile()).getString("myLegalName")
|
||||
} catch (e: ConfigException) {
|
||||
nodeDir
|
||||
}
|
||||
System.err.println("#### Error while generating node info file $nodeIdentifier ####")
|
||||
nodeInfoGenFile.inputStream().copyTo(System.err)
|
||||
throw IllegalStateException("Error while generating node info file. Please check the logs in $nodeDir.")
|
||||
private fun printNodeOutputToConsoleAndThrow(stdoutFile: File) {
|
||||
val nodeDir = stdoutFile.parent
|
||||
val nodeIdentifier = try {
|
||||
ConfigFactory.parseFile((nodeDir / "node.conf").toFile()).getString("myLegalName")
|
||||
} catch (e: ConfigException) {
|
||||
nodeDir
|
||||
}
|
||||
System.err.println("#### Error while generating node info file $nodeIdentifier ####")
|
||||
stdoutFile.inputStream().copyTo(System.err)
|
||||
throw IllegalStateException("Error while generating node info file. Please check the logs in $nodeDir.")
|
||||
}
|
||||
|
||||
const val DEFAULT_MAX_MESSAGE_SIZE: Int = 10485760
|
||||
|
@ -31,24 +31,12 @@ import javax.sql.DataSource
|
||||
*/
|
||||
const val NODE_DATABASE_PREFIX = "node_"
|
||||
|
||||
enum class SchemaInitializationType{
|
||||
NONE,
|
||||
VALIDATE,
|
||||
UPDATE
|
||||
}
|
||||
|
||||
// This class forms part of the node config and so any changes to it must be handled with care
|
||||
data class DatabaseConfig(
|
||||
val initialiseSchema: Boolean = Defaults.initialiseSchema,
|
||||
val initialiseAppSchema: SchemaInitializationType = Defaults.initialiseAppSchema,
|
||||
val transactionIsolationLevel: TransactionIsolationLevel = Defaults.transactionIsolationLevel,
|
||||
val exportHibernateJMXStatistics: Boolean = Defaults.exportHibernateJMXStatistics,
|
||||
val mappedSchemaCacheSize: Long = Defaults.mappedSchemaCacheSize
|
||||
) {
|
||||
object Defaults {
|
||||
val initialiseSchema = true
|
||||
val initialiseAppSchema = SchemaInitializationType.UPDATE
|
||||
val transactionIsolationLevel = TransactionIsolationLevel.REPEATABLE_READ
|
||||
val exportHibernateJMXStatistics = false
|
||||
val mappedSchemaCacheSize = 100L
|
||||
}
|
||||
@ -67,6 +55,10 @@ enum class TransactionIsolationLevel {
|
||||
*/
|
||||
val jdbcString = "TRANSACTION_$name"
|
||||
val jdbcValue: Int = java.sql.Connection::class.java.getField(jdbcString).get(null) as Int
|
||||
|
||||
companion object{
|
||||
val default = READ_COMMITTED
|
||||
}
|
||||
}
|
||||
|
||||
internal val _prohibitDatabaseAccess = ThreadLocal.withInitial { false }
|
||||
@ -96,27 +88,28 @@ fun <T> withoutDatabaseAccess(block: () -> T): T {
|
||||
val contextDatabaseOrNull: CordaPersistence? get() = _contextDatabase.get()
|
||||
|
||||
class CordaPersistence(
|
||||
databaseConfig: DatabaseConfig,
|
||||
exportHibernateJMXStatistics: Boolean,
|
||||
schemas: Set<MappedSchema>,
|
||||
val jdbcUrl: String,
|
||||
cacheFactory: NamedCacheFactory,
|
||||
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet(),
|
||||
customClassLoader: ClassLoader? = null,
|
||||
val closeConnection: Boolean = true,
|
||||
val errorHandler: DatabaseTransaction.(e: Exception) -> Unit = {}
|
||||
val errorHandler: DatabaseTransaction.(e: Exception) -> Unit = {},
|
||||
allowHibernateToManageAppSchema: Boolean = false
|
||||
) : Closeable {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private val defaultIsolationLevel = databaseConfig.transactionIsolationLevel
|
||||
private val defaultIsolationLevel = TransactionIsolationLevel.default
|
||||
val hibernateConfig: HibernateConfiguration by lazy {
|
||||
transaction {
|
||||
try {
|
||||
HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl, cacheFactory, customClassLoader)
|
||||
HibernateConfiguration(schemas, exportHibernateJMXStatistics, attributeConverters, jdbcUrl, cacheFactory, customClassLoader, allowHibernateToManageAppSchema)
|
||||
} catch (e: Exception) {
|
||||
when (e) {
|
||||
is SchemaManagementException -> throw HibernateSchemaChangeException("Incompatible schema change detected. Please run the node with database.initialiseSchema=true. Reason: ${e.message}", e)
|
||||
is SchemaManagementException -> throw HibernateSchemaChangeException("Incompatible schema change detected. Please run schema migration scripts (node with sub-command run-migration-scripts). Reason: ${e.message}", e)
|
||||
else -> throw HibernateConfigException("Could not create Hibernate configuration: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
|
@ -19,11 +19,12 @@ import javax.persistence.AttributeConverter
|
||||
|
||||
class HibernateConfiguration(
|
||||
schemas: Set<MappedSchema>,
|
||||
private val databaseConfig: DatabaseConfig,
|
||||
private val exportHibernateJMXStatistics: Boolean,
|
||||
private val attributeConverters: Collection<AttributeConverter<*, *>>,
|
||||
jdbcUrl: String,
|
||||
cacheFactory: NamedCacheFactory,
|
||||
val customClassLoader: ClassLoader? = null
|
||||
val customClassLoader: ClassLoader? = null,
|
||||
val allowHibernateToManageAppSchema: Boolean = false
|
||||
) {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
@ -64,10 +65,10 @@ class HibernateConfiguration(
|
||||
fun sessionFactoryForSchemas(key: Set<MappedSchema>): SessionFactory = sessionFactories.get(key, ::makeSessionFactoryForSchemas)!!
|
||||
|
||||
private fun makeSessionFactoryForSchemas(schemas: Set<MappedSchema>): SessionFactory {
|
||||
val sessionFactory = sessionFactoryFactory.makeSessionFactoryForSchemas(databaseConfig, schemas, customClassLoader, attributeConverters)
|
||||
val sessionFactory = sessionFactoryFactory.makeSessionFactoryForSchemas(schemas, customClassLoader, attributeConverters, allowHibernateToManageAppSchema)
|
||||
|
||||
// export Hibernate JMX statistics
|
||||
if (databaseConfig.exportHibernateJMXStatistics)
|
||||
if (exportHibernateJMXStatistics)
|
||||
initStatistics(sessionFactory)
|
||||
|
||||
return sessionFactory
|
||||
@ -75,7 +76,7 @@ class HibernateConfiguration(
|
||||
|
||||
// NOTE: workaround suggested to overcome deprecation of StatisticsService (since Hibernate v4.0)
|
||||
// https://stackoverflow.com/questions/23606092/hibernate-upgrade-statisticsservice
|
||||
fun initStatistics(sessionFactory: SessionFactory) {
|
||||
private fun initStatistics(sessionFactory: SessionFactory) {
|
||||
val statsName = ObjectName("org.hibernate:type=statistics")
|
||||
val mbeanServer = ManagementFactory.getPlatformMBeanServer()
|
||||
|
||||
|
@ -0,0 +1,8 @@
|
||||
package net.corda.nodeapi.internal.persistence
|
||||
|
||||
import liquibase.database.Database
|
||||
import liquibase.database.jvm.JdbcConnection
|
||||
|
||||
interface LiquibaseDatabaseFactory {
|
||||
fun getLiquibaseDatabase(conn: JdbcConnection): Database
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
package net.corda.nodeapi.internal.persistence
|
||||
|
||||
import liquibase.database.Database
|
||||
import liquibase.database.DatabaseFactory
|
||||
import liquibase.database.jvm.JdbcConnection
|
||||
|
||||
class LiquibaseDatabaseFactoryImpl : LiquibaseDatabaseFactory {
|
||||
override fun getLiquibaseDatabase(conn: JdbcConnection): Database {
|
||||
return DatabaseFactory.getInstance().findCorrectDatabaseImplementation(conn)
|
||||
}
|
||||
}
|
@ -4,44 +4,40 @@ import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import liquibase.Contexts
|
||||
import liquibase.LabelExpression
|
||||
import liquibase.Liquibase
|
||||
import liquibase.database.Database
|
||||
import liquibase.database.DatabaseFactory
|
||||
import liquibase.database.jvm.JdbcConnection
|
||||
import liquibase.exception.LiquibaseException
|
||||
import liquibase.resource.ClassLoaderResourceAccessor
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.nodeapi.internal.MigrationHelpers.getMigrationResource
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.MigrationHelpers.getMigrationResource
|
||||
import net.corda.nodeapi.internal.cordapp.CordappLoader
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.InputStream
|
||||
import java.nio.file.Path
|
||||
import java.sql.Statement
|
||||
import javax.sql.DataSource
|
||||
import java.sql.Connection
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import javax.sql.DataSource
|
||||
import kotlin.concurrent.withLock
|
||||
|
||||
// Migrate the database to the current version, using liquibase.
|
||||
class SchemaMigration(
|
||||
val schemas: Set<MappedSchema>,
|
||||
open class SchemaMigration(
|
||||
val dataSource: DataSource,
|
||||
private val databaseConfig: DatabaseConfig,
|
||||
cordappLoader: CordappLoader? = null,
|
||||
private val currentDirectory: Path?,
|
||||
// This parameter is used by the vault state migration to establish what the node's legal identity is when setting up
|
||||
// its copy of the identity service. It is passed through using a system property. When multiple identity support is added, this will need
|
||||
// reworking so that multiple identities can be passed to the migration.
|
||||
private val ourName: CordaX500Name? = null,
|
||||
// This parameter forces an error to be thrown if there are missing migrations. When using H2, Hibernate will automatically create schemas where they are
|
||||
// missing, so no need to throw unless you're specifically testing whether all the migrations are present.
|
||||
private val forceThrowOnMissingMigration: Boolean = false) {
|
||||
protected val databaseFactory: LiquibaseDatabaseFactory = LiquibaseDatabaseFactoryImpl()) {
|
||||
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
const val NODE_BASE_DIR_KEY = "liquibase.nodeDaseDir"
|
||||
const val NODE_X500_NAME = "liquibase.nodeName"
|
||||
val loader = ThreadLocal<CordappLoader>()
|
||||
private val mutex = ReentrantLock()
|
||||
@JvmStatic
|
||||
protected val mutex = ReentrantLock()
|
||||
}
|
||||
|
||||
init {
|
||||
@ -50,36 +46,86 @@ class SchemaMigration(
|
||||
|
||||
private val classLoader = cordappLoader?.appClassLoader ?: Thread.currentThread().contextClassLoader
|
||||
|
||||
/**
|
||||
* Main entry point to the schema migration.
|
||||
* Called during node startup.
|
||||
/**
|
||||
* Will run the Liquibase migration on the actual database.
|
||||
* @param existingCheckpoints Whether checkpoints exist that would prohibit running a migration
|
||||
* @param schemas The set of MappedSchemas to check
|
||||
* @param forceThrowOnMissingMigration throws an exception if a mapped schema is missing the migration resource. Can be set to false
|
||||
* when allowing hibernate to create missing schemas in dev or tests.
|
||||
*/
|
||||
fun nodeStartup(existingCheckpoints: Boolean) {
|
||||
when {
|
||||
databaseConfig.initialiseSchema -> {
|
||||
migrateOlderDatabaseToUseLiquibase(existingCheckpoints)
|
||||
runMigration(existingCheckpoints)
|
||||
fun runMigration(existingCheckpoints: Boolean, schemas: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean) {
|
||||
val resourcesAndSourceInfo = prepareResources(schemas, forceThrowOnMissingMigration)
|
||||
|
||||
// current version of Liquibase appears to be non-threadsafe
|
||||
// this is apparent when multiple in-process nodes are all running migrations simultaneously
|
||||
mutex.withLock {
|
||||
dataSource.connection.use { connection ->
|
||||
val (runner, _, shouldBlockOnCheckpoints) = prepareRunner(connection, resourcesAndSourceInfo)
|
||||
if (shouldBlockOnCheckpoints && existingCheckpoints)
|
||||
throw CheckpointsException()
|
||||
try {
|
||||
runner.update(Contexts().toString())
|
||||
} catch (exp: LiquibaseException) {
|
||||
throw DatabaseMigrationException(exp.message, exp)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures that the database is up to date with the latest migration changes.
|
||||
* @param schemas The set of MappedSchemas to check
|
||||
* @param forceThrowOnMissingMigration throws an exception if a mapped schema is missing the migration resource. Can be set to false
|
||||
* when allowing hibernate to create missing schemas in dev or tests.
|
||||
*/
|
||||
fun checkState(schemas: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean) {
|
||||
val resourcesAndSourceInfo = prepareResources(schemas, forceThrowOnMissingMigration)
|
||||
|
||||
// current version of Liquibase appears to be non-threadsafe
|
||||
// this is apparent when multiple in-process nodes are all running migrations simultaneously
|
||||
mutex.withLock {
|
||||
dataSource.connection.use { connection ->
|
||||
val (_, changeToRunCount, _) = prepareRunner(connection, resourcesAndSourceInfo)
|
||||
if (changeToRunCount > 0)
|
||||
throw OutstandingDatabaseChangesException(changeToRunCount)
|
||||
}
|
||||
else -> checkState()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will run the Liquibase migration on the actual database.
|
||||
* Synchronises the changelog table with the schema descriptions passed in without applying any of the changes to the database.
|
||||
* This can be used when migrating a CorDapp that had its schema generated by hibernate to liquibase schema migration, or when
|
||||
* updating from a version of Corda that does not use liquibase for CorDapps
|
||||
* **Warning** - this will not check if the matching schema changes have been applied, it will just generate the changelog
|
||||
* It must not be run on a newly installed CorDapp.
|
||||
* @param schemas The set of schemas to add to the changelog
|
||||
* @param forceThrowOnMissingMigration throw an exception if a mapped schema is missing its migration resource
|
||||
*/
|
||||
private fun runMigration(existingCheckpoints: Boolean) = doRunMigration(run = true, check = false, existingCheckpoints = existingCheckpoints)
|
||||
fun synchroniseSchemas(schemas: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean) {
|
||||
val resourcesAndSourceInfo = prepareResources(schemas, forceThrowOnMissingMigration)
|
||||
|
||||
/**
|
||||
* Ensures that the database is up to date with the latest migration changes.
|
||||
*/
|
||||
private fun checkState() = doRunMigration(run = false, check = true)
|
||||
// current version of Liquibase appears to be non-threadsafe
|
||||
// this is apparent when multiple in-process nodes are all running migrations simultaneously
|
||||
mutex.withLock {
|
||||
dataSource.connection.use { connection ->
|
||||
val (runner, _, _) = prepareRunner(connection, resourcesAndSourceInfo)
|
||||
try {
|
||||
runner.changeLogSync(Contexts().toString())
|
||||
} catch (exp: LiquibaseException) {
|
||||
throw DatabaseMigrationException(exp.message, exp)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Create a resourse accessor that aggregates the changelogs included in the schemas into one dynamic stream. */
|
||||
private class CustomResourceAccessor(val dynamicInclude: String, val changelogList: List<String?>, classLoader: ClassLoader) : ClassLoaderResourceAccessor(classLoader) {
|
||||
/** Create a resource accessor that aggregates the changelogs included in the schemas into one dynamic stream. */
|
||||
protected class CustomResourceAccessor(val dynamicInclude: String, val changelogList: List<String?>, classLoader: ClassLoader) :
|
||||
ClassLoaderResourceAccessor(classLoader) {
|
||||
override fun getResourcesAsStream(path: String): Set<InputStream> {
|
||||
if (path == dynamicInclude) {
|
||||
// Create a map in Liquibase format including all migration files.
|
||||
val includeAllFiles = mapOf("databaseChangeLog" to changelogList.filter { it != null }.map { file -> mapOf("include" to mapOf("file" to file)) })
|
||||
val includeAllFiles = mapOf("databaseChangeLog"
|
||||
to changelogList.filterNotNull().map { file -> mapOf("include" to mapOf("file" to file)) })
|
||||
|
||||
// Transform it to json.
|
||||
val includeAllFilesJson = ObjectMapper().writeValueAsBytes(includeAllFiles)
|
||||
@ -91,7 +137,7 @@ class SchemaMigration(
|
||||
}
|
||||
}
|
||||
|
||||
private fun logOrThrowMigrationError(mappedSchema: MappedSchema): String? =
|
||||
private fun logOrThrowMigrationError(mappedSchema: MappedSchema, forceThrowOnMissingMigration: Boolean): String? =
|
||||
if (forceThrowOnMissingMigration) {
|
||||
throw MissingMigrationException(mappedSchema)
|
||||
} else {
|
||||
@ -99,143 +145,38 @@ class SchemaMigration(
|
||||
null
|
||||
}
|
||||
|
||||
private fun doRunMigration(
|
||||
run: Boolean,
|
||||
check: Boolean,
|
||||
existingCheckpoints: Boolean? = null
|
||||
) {
|
||||
// Virtual file name of the changelog that includes all schemas.
|
||||
val dynamicInclude = "master.changelog.json"
|
||||
|
||||
// Virtual file name of the changelog that includes all schemas.
|
||||
val dynamicInclude = "master.changelog.json"
|
||||
|
||||
dataSource.connection.use { connection ->
|
||||
|
||||
// Collect all changelog files referenced in the included schemas.
|
||||
val changelogList = schemas.mapNotNull { mappedSchema ->
|
||||
val resource = getMigrationResource(mappedSchema, classLoader)
|
||||
when {
|
||||
resource != null -> resource
|
||||
// Corda OS FinanceApp in v3 has no Liquibase script, so no error is raised
|
||||
(mappedSchema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" || mappedSchema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1") && mappedSchema.migrationResource == null -> null
|
||||
else -> logOrThrowMigrationError(mappedSchema)
|
||||
}
|
||||
}
|
||||
|
||||
val path = currentDirectory?.toString()
|
||||
if (path != null) {
|
||||
System.setProperty(NODE_BASE_DIR_KEY, path) // base dir for any custom change set which may need to load a file (currently AttachmentVersionNumberMigration)
|
||||
}
|
||||
if (ourName != null) {
|
||||
System.setProperty(NODE_X500_NAME, ourName.toString())
|
||||
}
|
||||
val customResourceAccessor = CustomResourceAccessor(dynamicInclude, changelogList, classLoader)
|
||||
checkResourcesInClassPath(changelogList)
|
||||
|
||||
// current version of Liquibase appears to be non-threadsafe
|
||||
// this is apparent when multiple in-process nodes are all running migrations simultaneously
|
||||
mutex.withLock {
|
||||
val liquibase = Liquibase(dynamicInclude, customResourceAccessor, getLiquibaseDatabase(JdbcConnection(connection)))
|
||||
|
||||
val unRunChanges = liquibase.listUnrunChangeSets(Contexts(), LabelExpression())
|
||||
|
||||
when {
|
||||
(run && !check) && (unRunChanges.isNotEmpty() && existingCheckpoints!!) -> throw CheckpointsException() // Do not allow database migration when there are checkpoints
|
||||
run && !check -> liquibase.update(Contexts())
|
||||
check && !run && unRunChanges.isNotEmpty() -> throw OutstandingDatabaseChangesException(unRunChanges.size)
|
||||
check && !run -> {
|
||||
} // Do nothing will be interpreted as "check succeeded"
|
||||
else -> throw IllegalStateException("Invalid usage.")
|
||||
}
|
||||
protected fun prepareResources(schemas: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean): List<Pair<CustomResourceAccessor, String>> {
|
||||
// Collect all changelog files referenced in the included schemas.
|
||||
val changelogList = schemas.mapNotNull { mappedSchema ->
|
||||
val resource = getMigrationResource(mappedSchema, classLoader)
|
||||
when {
|
||||
resource != null -> resource
|
||||
else -> logOrThrowMigrationError(mappedSchema, forceThrowOnMissingMigration)
|
||||
}
|
||||
}
|
||||
|
||||
val path = currentDirectory?.toString()
|
||||
if (path != null) {
|
||||
System.setProperty(NODE_BASE_DIR_KEY, path) // base dir for any custom change set which may need to load a file (currently AttachmentVersionNumberMigration)
|
||||
}
|
||||
if (ourName != null) {
|
||||
System.setProperty(NODE_X500_NAME, ourName.toString())
|
||||
}
|
||||
val customResourceAccessor = CustomResourceAccessor(dynamicInclude, changelogList, classLoader)
|
||||
checkResourcesInClassPath(changelogList)
|
||||
return listOf(Pair(customResourceAccessor, ""))
|
||||
}
|
||||
|
||||
private fun getLiquibaseDatabase(conn: JdbcConnection): Database {
|
||||
return DatabaseFactory.getInstance().findCorrectDatabaseImplementation(conn)
|
||||
}
|
||||
protected fun prepareRunner(connection: Connection,
|
||||
resourcesAndSourceInfo: List<Pair<CustomResourceAccessor, String>>): Triple<Liquibase, Int, Boolean> {
|
||||
require(resourcesAndSourceInfo.size == 1)
|
||||
val liquibase = Liquibase(dynamicInclude, resourcesAndSourceInfo.single().first, databaseFactory.getLiquibaseDatabase(JdbcConnection(connection)))
|
||||
|
||||
/** For existing database created before verions 4.0 add Liquibase support - creates DATABASECHANGELOG and DATABASECHANGELOGLOCK tables and marks changesets as executed. */
|
||||
private fun migrateOlderDatabaseToUseLiquibase(existingCheckpoints: Boolean): Boolean {
|
||||
val isFinanceAppWithLiquibase = schemas.any { schema ->
|
||||
(schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1"
|
||||
|| schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1")
|
||||
&& schema.migrationResource != null
|
||||
}
|
||||
val noLiquibaseEntryLogForFinanceApp: (Statement) -> Boolean = {
|
||||
it.execute("SELECT COUNT(*) FROM DATABASECHANGELOG WHERE FILENAME IN ('migration/cash.changelog-init.xml','migration/commercial-paper.changelog-init.xml')")
|
||||
if (it.resultSet.next())
|
||||
it.resultSet.getInt(1) == 0
|
||||
else
|
||||
true
|
||||
}
|
||||
|
||||
val (isExistingDBWithoutLiquibase, isFinanceAppWithLiquibaseNotMigrated) = dataSource.connection.use {
|
||||
|
||||
val existingDatabase = it.metaData.getTables(null, null, "NODE%", null).next()
|
||||
// Lower case names for PostgreSQL
|
||||
|| it.metaData.getTables(null, null, "node%", null).next()
|
||||
|
||||
val hasLiquibase = it.metaData.getTables(null, null, "DATABASECHANGELOG%", null).next()
|
||||
// Lower case names for PostgreSQL
|
||||
|| it.metaData.getTables(null, null, "databasechangelog%", null).next()
|
||||
|
||||
val isFinanceAppWithLiquibaseNotMigrated = isFinanceAppWithLiquibase // If Finance App is pre v4.0 then no need to migrate it so no need to check.
|
||||
&& existingDatabase
|
||||
&& (!hasLiquibase // Migrate as other tables.
|
||||
|| (hasLiquibase && it.createStatement().use { noLiquibaseEntryLogForFinanceApp(it) })) // If Liquibase is already in the database check if Finance App schema log is missing.
|
||||
|
||||
Pair(existingDatabase && !hasLiquibase, isFinanceAppWithLiquibaseNotMigrated)
|
||||
}
|
||||
|
||||
if (isExistingDBWithoutLiquibase && existingCheckpoints)
|
||||
throw CheckpointsException()
|
||||
|
||||
// Schema migrations pre release 4.0
|
||||
val preV4Baseline = mutableListOf<String>()
|
||||
if (isExistingDBWithoutLiquibase) {
|
||||
preV4Baseline.addAll(listOf("migration/common.changelog-init.xml",
|
||||
"migration/node-info.changelog-init.xml",
|
||||
"migration/node-info.changelog-v1.xml",
|
||||
"migration/node-info.changelog-v2.xml",
|
||||
"migration/node-core.changelog-init.xml",
|
||||
"migration/node-core.changelog-v3.xml",
|
||||
"migration/node-core.changelog-v4.xml",
|
||||
"migration/node-core.changelog-v5.xml",
|
||||
"migration/node-core.changelog-pkey.xml",
|
||||
"migration/vault-schema.changelog-init.xml",
|
||||
"migration/vault-schema.changelog-v3.xml",
|
||||
"migration/vault-schema.changelog-v4.xml",
|
||||
"migration/vault-schema.changelog-pkey.xml"))
|
||||
|
||||
if (schemas.any { schema -> schema.migrationResource == "node-notary.changelog-master" })
|
||||
preV4Baseline.addAll(listOf("migration/node-notary.changelog-init.xml",
|
||||
"migration/node-notary.changelog-v1.xml"))
|
||||
|
||||
if (schemas.any { schema -> schema.migrationResource == "notary-raft.changelog-master" })
|
||||
preV4Baseline.addAll(listOf("migration/notary-raft.changelog-init.xml",
|
||||
"migration/notary-raft.changelog-v1.xml"))
|
||||
|
||||
if (schemas.any { schema -> schema.migrationResource == "notary-bft-smart.changelog-master" })
|
||||
preV4Baseline.addAll(listOf("migration/notary-bft-smart.changelog-init.xml",
|
||||
"migration/notary-bft-smart.changelog-v1.xml"))
|
||||
}
|
||||
if (isFinanceAppWithLiquibaseNotMigrated) {
|
||||
preV4Baseline.addAll(listOf("migration/cash.changelog-init.xml",
|
||||
"migration/cash.changelog-v1.xml",
|
||||
"migration/commercial-paper.changelog-init.xml",
|
||||
"migration/commercial-paper.changelog-v1.xml"))
|
||||
}
|
||||
|
||||
if (preV4Baseline.isNotEmpty()) {
|
||||
val dynamicInclude = "master.changelog.json" // Virtual file name of the changelog that includes all schemas.
|
||||
checkResourcesInClassPath(preV4Baseline)
|
||||
dataSource.connection.use { connection ->
|
||||
val customResourceAccessor = CustomResourceAccessor(dynamicInclude, preV4Baseline, classLoader)
|
||||
val liquibase = Liquibase(dynamicInclude, customResourceAccessor, getLiquibaseDatabase(JdbcConnection(connection)))
|
||||
liquibase.changeLogSync(Contexts(), LabelExpression())
|
||||
}
|
||||
}
|
||||
return isExistingDBWithoutLiquibase || isFinanceAppWithLiquibaseNotMigrated
|
||||
val unRunChanges = liquibase.listUnrunChangeSets(Contexts(), LabelExpression())
|
||||
return Triple(liquibase, unRunChanges.size, !unRunChanges.isEmpty())
|
||||
}
|
||||
|
||||
private fun checkResourcesInClassPath(resources: List<String?>) {
|
||||
@ -247,7 +188,7 @@ class SchemaMigration(
|
||||
}
|
||||
}
|
||||
|
||||
open class DatabaseMigrationException(message: String) : IllegalArgumentException(message) {
|
||||
open class DatabaseMigrationException(message: String?, cause: Throwable? = null) : IllegalArgumentException(message, cause) {
|
||||
override val message: String = super.message!!
|
||||
}
|
||||
|
||||
@ -269,6 +210,6 @@ class CheckpointsException : DatabaseMigrationException("Attempting to update th
|
||||
|
||||
class DatabaseIncompatibleException(@Suppress("MemberVisibilityCanBePrivate") private val reason: String) : DatabaseMigrationException(errorMessageFor(reason)) {
|
||||
internal companion object {
|
||||
fun errorMessageFor(reason: String): String = "Incompatible database schema version detected, please run the node with configuration option database.initialiseSchema=true. Reason: $reason"
|
||||
fun errorMessageFor(reason: String): String = "Incompatible database schema version detected, please run schema migration scripts (node with sub-command run-migration-scripts). Reason: $reason"
|
||||
}
|
||||
}
|
@ -3,9 +3,8 @@ package net.corda.nodeapi.internal.persistence.factory
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.toHexString
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
|
||||
import net.corda.nodeapi.internal.persistence.SchemaInitializationType
|
||||
import net.corda.nodeapi.internal.persistence.TransactionIsolationLevel
|
||||
import org.hibernate.SessionFactory
|
||||
import org.hibernate.boot.Metadata
|
||||
import org.hibernate.boot.MetadataBuilder
|
||||
@ -26,22 +25,19 @@ abstract class BaseSessionFactoryFactory : CordaSessionFactoryFactory {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
open fun buildHibernateConfig(databaseConfig: DatabaseConfig, metadataSources: MetadataSources): Configuration {
|
||||
open fun buildHibernateConfig(metadataSources: MetadataSources, allowHibernateToManageAppSchema: Boolean): Configuration {
|
||||
val hbm2dll: String =
|
||||
if (databaseConfig.initialiseSchema && databaseConfig.initialiseAppSchema == SchemaInitializationType.UPDATE) {
|
||||
if (allowHibernateToManageAppSchema) {
|
||||
"update"
|
||||
} else if ((!databaseConfig.initialiseSchema && databaseConfig.initialiseAppSchema == SchemaInitializationType.UPDATE)
|
||||
|| databaseConfig.initialiseAppSchema == SchemaInitializationType.VALIDATE) {
|
||||
} else {
|
||||
"validate"
|
||||
} else {
|
||||
"none"
|
||||
}
|
||||
// We set a connection provider as the auto schema generation requires it. The auto schema generation will not
|
||||
// necessarily remain and would likely be replaced by something like Liquibase. For now it is very convenient though.
|
||||
return Configuration(metadataSources).setProperty("hibernate.connection.provider_class", HibernateConfiguration.NodeDatabaseConnectionProvider::class.java.name)
|
||||
.setProperty("hibernate.format_sql", "true")
|
||||
.setProperty("javax.persistence.validation.mode", "none")
|
||||
.setProperty("hibernate.connection.isolation", databaseConfig.transactionIsolationLevel.jdbcValue.toString())
|
||||
.setProperty("hibernate.connection.isolation", TransactionIsolationLevel.default.jdbcValue.toString())
|
||||
.setProperty("hibernate.hbm2ddl.auto", hbm2dll)
|
||||
.setProperty("hibernate.jdbc.time_zone", "UTC")
|
||||
}
|
||||
@ -85,15 +81,15 @@ abstract class BaseSessionFactoryFactory : CordaSessionFactoryFactory {
|
||||
}
|
||||
|
||||
final override fun makeSessionFactoryForSchemas(
|
||||
databaseConfig: DatabaseConfig,
|
||||
schemas: Set<MappedSchema>,
|
||||
customClassLoader: ClassLoader?,
|
||||
attributeConverters: Collection<AttributeConverter<*, *>>): SessionFactory {
|
||||
attributeConverters: Collection<AttributeConverter<*, *>>,
|
||||
allowHibernateToMananageAppSchema: Boolean): SessionFactory {
|
||||
logger.info("Creating session factory for schemas: $schemas")
|
||||
val serviceRegistry = BootstrapServiceRegistryBuilder().build()
|
||||
val metadataSources = MetadataSources(serviceRegistry)
|
||||
|
||||
val config = buildHibernateConfig(databaseConfig, metadataSources)
|
||||
val config = buildHibernateConfig(metadataSources, allowHibernateToMananageAppSchema)
|
||||
schemas.forEach { schema ->
|
||||
schema.mappedTypes.forEach { config.addAnnotatedClass(it) }
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
package net.corda.nodeapi.internal.persistence.factory
|
||||
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import org.hibernate.SessionFactory
|
||||
import org.hibernate.boot.Metadata
|
||||
import org.hibernate.boot.MetadataBuilder
|
||||
@ -11,10 +10,10 @@ interface CordaSessionFactoryFactory {
|
||||
val databaseType: String
|
||||
fun canHandleDatabase(jdbcUrl: String): Boolean
|
||||
fun makeSessionFactoryForSchemas(
|
||||
databaseConfig: DatabaseConfig,
|
||||
schemas: Set<MappedSchema>,
|
||||
customClassLoader: ClassLoader?,
|
||||
attributeConverters: Collection<AttributeConverter<*, *>>): SessionFactory
|
||||
attributeConverters: Collection<AttributeConverter<*, *>>,
|
||||
allowHibernateToMananageAppSchema: Boolean): SessionFactory
|
||||
fun getExtraConfiguration(key: String): Any?
|
||||
fun buildHibernateMetadata(metadataBuilder: MetadataBuilder, attributeConverters: Collection<AttributeConverter<*, *>>): Metadata
|
||||
}
|
@ -14,7 +14,7 @@ class HibernateConfigurationFactoryLoadingTest {
|
||||
val cacheFactory = mock<NamedCacheFactory>()
|
||||
HibernateConfiguration(
|
||||
emptySet(),
|
||||
DatabaseConfig(),
|
||||
false,
|
||||
emptyList(),
|
||||
jdbcUrl,
|
||||
cacheFactory)
|
||||
|
@ -37,7 +37,8 @@ class FlowCheckpointVersionNodeStartupCheckTest {
|
||||
startNodesInProcess = false,
|
||||
inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
|
||||
cordappsForAllNodes = emptyList(),
|
||||
notarySpecs = emptyList()
|
||||
notarySpecs = emptyList(),
|
||||
allowHibernateToManageAppSchema = false
|
||||
)) {
|
||||
createSuspendedFlowInBob()
|
||||
val cordappsDir = baseDirectory(BOB_NAME) / "cordapps"
|
||||
|
@ -86,7 +86,7 @@ class NodeStatePersistenceTests {
|
||||
nodeName
|
||||
}()
|
||||
|
||||
val nodeHandle = startNode(providedName = nodeName, rpcUsers = listOf(user), customOverrides = mapOf("devMode" to "false")).getOrThrow()
|
||||
val nodeHandle = startNode(providedName = nodeName, rpcUsers = listOf(user)).getOrThrow()
|
||||
val result = CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use {
|
||||
val page = it.proxy.vaultQuery(MessageState::class.java)
|
||||
page.states.singleOrNull()
|
||||
|
@ -84,7 +84,8 @@ class NodeRegistrationTest {
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
notarySpecs = listOf(NotarySpec(notaryName)),
|
||||
notaryCustomOverrides = mapOf("devMode" to false)
|
||||
notaryCustomOverrides = mapOf("devMode" to false),
|
||||
allowHibernateToManageAppSchema = false
|
||||
) {
|
||||
startNode(providedName = aliceName, customOverrides = mapOf("devMode" to false)).getOrThrow()
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.serialization.reproduction;
|
||||
|
||||
import com.google.common.io.LineProcessor;
|
||||
import net.corda.client.rpc.CordaRPCClient;
|
||||
import net.corda.core.concurrent.CordaFuture;
|
||||
import net.corda.node.services.Permissions;
|
||||
|
@ -44,7 +44,7 @@ class BootTests {
|
||||
rpc.startFlow(::ObjectInputStreamFlow).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
driver(DriverParameters(cordappsForAllNodes = listOf(enclosedCordapp()))) {
|
||||
driver(DriverParameters(cordappsForAllNodes = listOf(enclosedCordapp()), allowHibernateToManageAppSchema = false)) {
|
||||
val devModeNode = startNode(devParams).getOrThrow()
|
||||
val node = startNode(ALICE_NAME, devMode = false, parameters = params).getOrThrow()
|
||||
|
||||
|
@ -15,6 +15,7 @@ import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.enclosedCordapp
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Test
|
||||
|
||||
@ -23,7 +24,7 @@ class CordappScanningDriverTest {
|
||||
fun `sub-classed initiated flow pointing to the same initiating flow as its super-class`() {
|
||||
val user = User("u", "p", setOf(startFlow<ReceiveFlow>()))
|
||||
// The driver will automatically pick up the annotated flows below
|
||||
driver(DriverParameters(notarySpecs = emptyList())) {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()))) {
|
||||
val (alice, bob) = listOf(
|
||||
startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)),
|
||||
startNode(providedName = BOB_NAME)).transpose().getOrThrow()
|
||||
|
@ -17,7 +17,7 @@ import javax.security.auth.x500.X500Principal
|
||||
class NodeKeystoreCheckTest {
|
||||
@Test(timeout=300_000)
|
||||
fun `starting node in non-dev mode with no key store`() {
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = emptyList())) {
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = emptyList(), allowHibernateToManageAppSchema = false)) {
|
||||
assertThatThrownBy {
|
||||
startNode(customOverrides = mapOf("devMode" to false)).getOrThrow()
|
||||
}.hasMessageContaining("One or more keyStores (identity or TLS) or trustStore not found.")
|
||||
@ -26,7 +26,7 @@ class NodeKeystoreCheckTest {
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `node should throw exception if cert path does not chain to the trust root`() {
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = emptyList())) {
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = emptyList(), allowHibernateToManageAppSchema = false)) {
|
||||
// Create keystores.
|
||||
val keystorePassword = "password"
|
||||
val certificatesDirectory = baseDirectory(ALICE_NAME) / "certificates"
|
||||
|
@ -2,32 +2,21 @@ package net.corda.node.persistence
|
||||
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.flows.isQuasarAgentSpecified
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseIncompatibleException
|
||||
import net.corda.node.internal.ConfigurationException
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertNotNull
|
||||
|
||||
class DbSchemaInitialisationTest {
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `database is initialised`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `database initialisation not allowed in config`() {
|
||||
driver(DriverParameters(startNodesInProcess = isQuasarAgentSpecified(), cordappsForAllNodes = emptyList())) {
|
||||
val nodeHandle = {
|
||||
startNode(NodeParameters(customOverrides = mapOf("database.initialiseSchema" to "true"))).getOrThrow()
|
||||
}()
|
||||
assertNotNull(nodeHandle)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `database is not initialised`() {
|
||||
driver(DriverParameters(startNodesInProcess = isQuasarAgentSpecified(), cordappsForAllNodes = emptyList())) {
|
||||
assertFailsWith(DatabaseIncompatibleException::class) {
|
||||
assertFailsWith(ConfigurationException::class) {
|
||||
startNode(NodeParameters(customOverrides = mapOf("database.initialiseSchema" to "false"))).getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -94,7 +94,8 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
notarySpecs = emptyList()
|
||||
notarySpecs = emptyList(),
|
||||
allowHibernateToManageAppSchema = false
|
||||
) {
|
||||
val alice = startNode(providedName = ALICE_NAME, devMode = false).getOrThrow() as NodeHandleInternal
|
||||
val nextParams = networkMapServer.networkParameters.copy(
|
||||
@ -148,7 +149,8 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
notarySpecs = emptyList()
|
||||
notarySpecs = emptyList(),
|
||||
allowHibernateToManageAppSchema = false
|
||||
) {
|
||||
|
||||
val notary: Party = TestIdentity.fresh("test notary").party
|
||||
@ -169,7 +171,8 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
notarySpecs = emptyList()
|
||||
notarySpecs = emptyList(),
|
||||
allowHibernateToManageAppSchema = false
|
||||
) {
|
||||
|
||||
val notary: Party = TestIdentity.fresh("test notary").party
|
||||
@ -193,7 +196,8 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
notarySpecs = emptyList()
|
||||
notarySpecs = emptyList(),
|
||||
allowHibernateToManageAppSchema = false
|
||||
) {
|
||||
|
||||
val oldParams = networkMapServer.networkParameters
|
||||
@ -212,7 +216,8 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
notarySpecs = emptyList()
|
||||
notarySpecs = emptyList(),
|
||||
allowHibernateToManageAppSchema = false
|
||||
) {
|
||||
|
||||
val oldParams = networkMapServer.networkParameters
|
||||
@ -247,7 +252,8 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
notarySpecs = emptyList()
|
||||
notarySpecs = emptyList(),
|
||||
allowHibernateToManageAppSchema = false
|
||||
) {
|
||||
startNode(providedName = ALICE_NAME, devMode = false).getOrThrow().use { aliceNode ->
|
||||
assertDownloadedNetworkParameters(aliceNode)
|
||||
@ -277,7 +283,8 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
notarySpecs = emptyList(),
|
||||
systemProperties = mapOf("net.corda.node.internal.nodeinfo.publish.interval" to 1.seconds.toString())
|
||||
systemProperties = mapOf("net.corda.node.internal.nodeinfo.publish.interval" to 1.seconds.toString()),
|
||||
allowHibernateToManageAppSchema = false
|
||||
) {
|
||||
startNode(providedName = ALICE_NAME, devMode = false).getOrThrow().use { aliceNode ->
|
||||
val aliceNodeInfo = aliceNode.nodeInfo.serialize().hash
|
||||
|
@ -58,7 +58,7 @@ class RpcExceptionHandlingTest {
|
||||
}
|
||||
}
|
||||
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()))) {
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()), allowHibernateToManageAppSchema = false)) {
|
||||
val (devModeNode, node) = listOf(startNode(params, BOB_NAME),
|
||||
startNode(ALICE_NAME, devMode = false, parameters = params))
|
||||
.transpose()
|
||||
@ -79,7 +79,7 @@ class RpcExceptionHandlingTest {
|
||||
rpc.startFlow(::FlowExceptionFlow, expectedMessage, expectedErrorId).returnValue.getOrThrow()
|
||||
}
|
||||
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()))) {
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()), allowHibernateToManageAppSchema = false)) {
|
||||
val (devModeNode, node) = listOf(startNode(params, BOB_NAME),
|
||||
startNode(ALICE_NAME, devMode = false, parameters = params))
|
||||
.transpose()
|
||||
@ -115,7 +115,7 @@ class RpcExceptionHandlingTest {
|
||||
nodeA.rpc.startFlow(::InitFlow, nodeB.nodeInfo.singleIdentity()).returnValue.getOrThrow()
|
||||
}
|
||||
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()))) {
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()), allowHibernateToManageAppSchema = false)) {
|
||||
|
||||
assertThatThrownBy { scenario(ALICE_NAME, BOB_NAME,true) }.isInstanceOfSatisfying(CordaRuntimeException::class.java) { exception ->
|
||||
|
||||
|
@ -445,11 +445,11 @@ class VaultObserverExceptionTest {
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),inMemoryDB = false)
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
), inMemoryDB = false)
|
||||
) {
|
||||
val (aliceNode, bobNode) = listOf(ALICE_NAME, BOB_NAME)
|
||||
.map { startNode(providedName = it,
|
||||
@ -537,12 +537,12 @@ class VaultObserverExceptionTest {
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
val (aliceNode, bobNode) = listOf(ALICE_NAME, BOB_NAME)
|
||||
.map { startNode(providedName = it,
|
||||
@ -622,12 +622,12 @@ class VaultObserverExceptionTest {
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
val (aliceNode, bobNode) = listOf(ALICE_NAME, BOB_NAME)
|
||||
.map { startNode(providedName = it,
|
||||
@ -702,12 +702,12 @@ class VaultObserverExceptionTest {
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
val (aliceNode, bobNode) = listOf(ALICE_NAME, BOB_NAME)
|
||||
.map { startNode(providedName = it,
|
||||
@ -762,12 +762,12 @@ class VaultObserverExceptionTest {
|
||||
fun `Accessing NodeVaultService rawUpdates from a flow is not allowed` () {
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
@ -792,12 +792,12 @@ class VaultObserverExceptionTest {
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.transactionfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")),
|
||||
inMemoryDB = false)
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.transactionfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
@ -823,12 +823,12 @@ class VaultObserverExceptionTest {
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.transactionfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")),
|
||||
inMemoryDB = false)
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.transactionfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
// Subscribing with custom SafeSubscriber; the custom SafeSubscriber will not get replaced by a ResilientSubscriber
|
||||
// meaning that it will behave as a SafeSubscriber; it will get unsubscribed upon throwing an error.
|
||||
|
@ -48,6 +48,14 @@ open class SharedNodeCmdLineOptions {
|
||||
)
|
||||
var devMode: Boolean? = null
|
||||
|
||||
@Option(
|
||||
names = ["--allow-hibernate-to-manage-app-schema"],
|
||||
description = ["Allows hibernate to create/modify app schema for CorDapps based on their mapped schema.",
|
||||
"Use this for rapid app development or for compatibility with pre-4.6 CorDapps.",
|
||||
"Only available in dev mode."]
|
||||
)
|
||||
var allowHibernateToManageAppSchema: Boolean = false
|
||||
|
||||
open fun parseConfiguration(configuration: Config): Valid<NodeConfiguration> {
|
||||
val option = Configuration.Options(strict = unknownConfigKeysPolicy == UnknownConfigKeysPolicy.FAIL)
|
||||
return configuration.parseAsNodeConfiguration(option)
|
||||
|
@ -178,7 +178,6 @@ import org.slf4j.Logger
|
||||
import rx.Scheduler
|
||||
import java.io.IOException
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyPair
|
||||
import java.security.KeyStoreException
|
||||
import java.security.cert.X509Certificate
|
||||
@ -187,7 +186,7 @@ import java.sql.Savepoint
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.time.format.DateTimeParseException
|
||||
import java.util.Properties
|
||||
import java.util.*
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
@ -197,6 +196,8 @@ import java.util.concurrent.TimeUnit.MINUTES
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
import java.util.function.Consumer
|
||||
import javax.persistence.EntityManager
|
||||
import javax.sql.DataSource
|
||||
import kotlin.collections.ArrayList
|
||||
|
||||
/**
|
||||
* A base node implementation that can be customised either for production (with real implementations that do real
|
||||
@ -214,9 +215,12 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
val serverThread: AffinityExecutor.ServiceAffinityExecutor,
|
||||
val busyNodeLatch: ReusableLatch = ReusableLatch(),
|
||||
djvmBootstrapSource: ApiSource = EmptyApi,
|
||||
djvmCordaSource: UserSource? = null) : SingletonSerializeAsToken() {
|
||||
djvmCordaSource: UserSource? = null,
|
||||
protected val allowHibernateToManageAppSchema: Boolean = false,
|
||||
private val allowAppSchemaUpgradeWithCheckpoints: Boolean = false) : SingletonSerializeAsToken() {
|
||||
|
||||
protected abstract val log: Logger
|
||||
|
||||
@Suppress("LeakingThis")
|
||||
private var tokenizableServices: MutableList<SerializeAsToken>? = mutableListOf(platformClock, this)
|
||||
|
||||
@ -226,6 +230,11 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
|
||||
protected val runOnStop = ArrayList<() -> Any?>()
|
||||
|
||||
protected open val runMigrationScripts: Boolean = configuredDbIsInMemory()
|
||||
|
||||
// if the configured DB is in memory, we will need to run db migrations, as the db does not persist between runs.
|
||||
private fun configuredDbIsInMemory() = configuration.dataSourceProperties.getProperty("dataSource.url").startsWith("jdbc:h2:mem:")
|
||||
|
||||
init {
|
||||
(serverThread as? ExecutorService)?.let {
|
||||
runOnStop += {
|
||||
@ -237,6 +246,12 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
quasarExcludePackages(configuration)
|
||||
|
||||
if (allowHibernateToManageAppSchema && !configuration.devMode) {
|
||||
throw ConfigurationException("Hibernate can only be used to manage app schema in development while using dev mode. " +
|
||||
"Please remove the --allow-hibernate-to-manage-app-schema command line flag and provide schema migration scripts for your CorDapps."
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private val notaryLoader = configuration.notary?.let {
|
||||
@ -252,7 +267,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
schemaService,
|
||||
configuration.dataSourceProperties,
|
||||
cacheFactory,
|
||||
cordappLoader.appClassLoader)
|
||||
cordappLoader.appClassLoader,
|
||||
allowHibernateToManageAppSchema)
|
||||
|
||||
private val transactionSupport = CordaTransactionSupportImpl(database)
|
||||
|
||||
@ -463,6 +479,53 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
open fun runDatabaseMigrationScripts(
|
||||
updateCoreSchemas: Boolean,
|
||||
updateAppSchemas: Boolean,
|
||||
updateAppSchemasWithCheckpoints: Boolean
|
||||
) {
|
||||
check(started == null) { "Node has already been started" }
|
||||
Node.printBasicNodeInfo("Running database schema migration scripts ...")
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
|
||||
database.startHikariPool(props, metricRegistry) { dataSource, haveCheckpoints ->
|
||||
SchemaMigration(dataSource, cordappLoader, configuration.baseDirectory, configuration.myLegalName)
|
||||
.checkOrUpdate(schemaService.internalSchemas, updateCoreSchemas, haveCheckpoints, true)
|
||||
.checkOrUpdate(schemaService.appSchemas, updateAppSchemas, !updateAppSchemasWithCheckpoints && haveCheckpoints, false)
|
||||
}
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
logVendorString(database, log)
|
||||
if (allowHibernateToManageAppSchema) {
|
||||
Node.printBasicNodeInfo("Initialising CorDapps to get schemas created by hibernate")
|
||||
val trustRoot = initKeyStores()
|
||||
networkMapClient?.start(trustRoot)
|
||||
val (netParams, signedNetParams) = NetworkParametersReader(trustRoot, networkMapClient, configuration.baseDirectory).read()
|
||||
log.info("Loaded network parameters: $netParams")
|
||||
check(netParams.minimumPlatformVersion <= versionInfo.platformVersion) {
|
||||
"Node's platform version is lower than network's required minimumPlatformVersion"
|
||||
}
|
||||
networkMapCache.start(netParams.notaries)
|
||||
|
||||
database.transaction {
|
||||
networkParametersStorage.setCurrentParameters(signedNetParams, trustRoot)
|
||||
cordappProvider.start()
|
||||
}
|
||||
}
|
||||
Node.printBasicNodeInfo("Database migration done.")
|
||||
}
|
||||
|
||||
fun runSchemaSync() {
|
||||
check(started == null) { "Node has already been started" }
|
||||
Node.printBasicNodeInfo("Synchronising CorDapp schemas to the changelog ...")
|
||||
val hikariProperties = configuration.dataSourceProperties
|
||||
if (hikariProperties.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
|
||||
|
||||
val dataSource = DataSourceFactory.createDataSource(hikariProperties, metricRegistry = metricRegistry)
|
||||
SchemaMigration(dataSource, cordappLoader, configuration.baseDirectory, configuration.myLegalName)
|
||||
.synchroniseSchemas(schemaService.appSchemas, false)
|
||||
Node.printBasicNodeInfo("CorDapp schemas synchronised")
|
||||
}
|
||||
|
||||
@Suppress("ComplexMethod")
|
||||
open fun start(): S {
|
||||
check(started == null) { "Node has already been started" }
|
||||
@ -974,7 +1037,12 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
protected open fun startDatabase() {
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
|
||||
database.startHikariPool(props, configuration.database, schemaService.internalSchemas(), metricRegistry, this.cordappLoader, configuration.baseDirectory, configuration.myLegalName)
|
||||
database.startHikariPool(props, metricRegistry) { dataSource, haveCheckpoints ->
|
||||
SchemaMigration(dataSource, cordappLoader, configuration.baseDirectory, configuration.myLegalName)
|
||||
.checkOrUpdate(schemaService.internalSchemas, runMigrationScripts, haveCheckpoints, true)
|
||||
.checkOrUpdate(schemaService.appSchemas, runMigrationScripts, haveCheckpoints && !allowAppSchemaUpgradeWithCheckpoints, false)
|
||||
}
|
||||
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
logVendorString(database, log)
|
||||
}
|
||||
@ -1352,13 +1420,15 @@ class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogi
|
||||
|
||||
class ConfigurationException(message: String) : CordaException(message)
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
fun createCordaPersistence(databaseConfig: DatabaseConfig,
|
||||
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
|
||||
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
|
||||
schemaService: SchemaService,
|
||||
hikariProperties: Properties,
|
||||
cacheFactory: NamedCacheFactory,
|
||||
customClassLoader: ClassLoader?): CordaPersistence {
|
||||
customClassLoader: ClassLoader?,
|
||||
allowHibernateToManageAppSchema: Boolean = false): CordaPersistence {
|
||||
// Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately
|
||||
// Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default
|
||||
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
|
||||
@ -1369,25 +1439,31 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig,
|
||||
|
||||
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
|
||||
return CordaPersistence(
|
||||
databaseConfig,
|
||||
schemaService.schemas,
|
||||
jdbcUrl,
|
||||
cacheFactory,
|
||||
attributeConverters, customClassLoader,
|
||||
errorHandler = { e ->
|
||||
// "corrupting" a DatabaseTransaction only inside a flow state machine execution
|
||||
FlowStateMachineImpl.currentStateMachine()?.let {
|
||||
// register only the very first exception thrown throughout a chain of logical transactions
|
||||
setException(e)
|
||||
}
|
||||
})
|
||||
databaseConfig.exportHibernateJMXStatistics,
|
||||
schemaService.schemas,
|
||||
jdbcUrl,
|
||||
cacheFactory,
|
||||
attributeConverters, customClassLoader,
|
||||
errorHandler = { e ->
|
||||
// "corrupting" a DatabaseTransaction only inside a flow state machine execution
|
||||
FlowStateMachineImpl.currentStateMachine()?.let {
|
||||
// register only the very first exception thrown throughout a chain of logical transactions
|
||||
setException(e)
|
||||
}
|
||||
},
|
||||
allowHibernateToManageAppSchema = allowHibernateToManageAppSchema)
|
||||
}
|
||||
|
||||
fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>, metricRegistry: MetricRegistry? = null, cordappLoader: CordappLoader? = null, currentDir: Path? = null, ourName: CordaX500Name) {
|
||||
@Suppress("ThrowsCount")
|
||||
fun CordaPersistence.startHikariPool(
|
||||
hikariProperties: Properties,
|
||||
metricRegistry: MetricRegistry? = null,
|
||||
schemaMigration: (DataSource, Boolean) -> Unit) {
|
||||
try {
|
||||
val dataSource = DataSourceFactory.createDataSource(hikariProperties, metricRegistry = metricRegistry)
|
||||
val schemaMigration = SchemaMigration(schemas, dataSource, databaseConfig, cordappLoader, currentDir, ourName)
|
||||
schemaMigration.nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
|
||||
val haveCheckpoints = dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }
|
||||
|
||||
schemaMigration(dataSource, haveCheckpoints)
|
||||
start(dataSource)
|
||||
} catch (ex: Exception) {
|
||||
when {
|
||||
@ -1411,6 +1487,14 @@ fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfi
|
||||
}
|
||||
}
|
||||
|
||||
fun SchemaMigration.checkOrUpdate(schemas: Set<MappedSchema>, update: Boolean, haveCheckpoints: Boolean, forceThrowOnMissingMigration: Boolean): SchemaMigration {
|
||||
if (update)
|
||||
this.runMigration(haveCheckpoints, schemas, forceThrowOnMissingMigration)
|
||||
else
|
||||
this.checkState(schemas, forceThrowOnMissingMigration)
|
||||
return this
|
||||
}
|
||||
|
||||
fun clientSslOptionsCompatibleWith(nodeRpcOptions: NodeRpcOptions): ClientRpcSslOptions? {
|
||||
|
||||
if (!nodeRpcOptions.useSsl || nodeRpcOptions.sslConfig == null) {
|
||||
|
@ -125,7 +125,8 @@ open class Node(configuration: NodeConfiguration,
|
||||
flowManager: FlowManager = NodeFlowManager(configuration.flowOverrides),
|
||||
cacheFactoryPrototype: BindableNamedCacheFactory = DefaultNamedCacheFactory(),
|
||||
djvmBootstrapSource: ApiSource = createBootstrapSource(configuration),
|
||||
djvmCordaSource: UserSource? = createCordaSource(configuration)
|
||||
djvmCordaSource: UserSource? = createCordaSource(configuration),
|
||||
allowHibernateToManageAppSchema: Boolean = false
|
||||
) : AbstractNode<NodeInfo>(
|
||||
configuration,
|
||||
createClock(configuration),
|
||||
@ -135,7 +136,8 @@ open class Node(configuration: NodeConfiguration,
|
||||
// Under normal (non-test execution) it will always be "1"
|
||||
AffinityExecutor.ServiceAffinityExecutor("Node thread-${sameVmNodeCounter.incrementAndGet()}", 1),
|
||||
djvmBootstrapSource = djvmBootstrapSource,
|
||||
djvmCordaSource = djvmCordaSource
|
||||
djvmCordaSource = djvmCordaSource,
|
||||
allowHibernateToManageAppSchema = allowHibernateToManageAppSchema
|
||||
) {
|
||||
|
||||
override fun createStartedNode(nodeInfo: NodeInfo, rpcOps: CordaRPCOps, notaryService: NotaryService?): NodeInfo =
|
||||
@ -559,6 +561,16 @@ open class Node(configuration: NodeConfiguration,
|
||||
return super.generateAndSaveNodeInfo()
|
||||
}
|
||||
|
||||
override fun runDatabaseMigrationScripts(
|
||||
updateCoreSchemas: Boolean,
|
||||
updateAppSchemas: Boolean,
|
||||
updateAppSchemasWithCheckpoints: Boolean) {
|
||||
if (allowHibernateToManageAppSchema) {
|
||||
initialiseSerialization()
|
||||
}
|
||||
super.runDatabaseMigrationScripts(updateCoreSchemas, updateAppSchemas, updateAppSchemasWithCheckpoints)
|
||||
}
|
||||
|
||||
override fun start(): NodeInfo {
|
||||
registerDefaultExceptionHandler()
|
||||
initialiseSerialization()
|
||||
|
@ -76,10 +76,18 @@ open class NodeStartupCli : CordaCliWrapper("corda", "Runs a Corda Node") {
|
||||
private val justGenerateRpcSslCertsCli by lazy { GenerateRpcSslCertsCli(startup) }
|
||||
private val initialRegistrationCli by lazy { InitialRegistrationCli(startup) }
|
||||
private val validateConfigurationCli by lazy { ValidateConfigurationCli() }
|
||||
private val runMigrationScriptsCli by lazy { RunMigrationScriptsCli(startup) }
|
||||
private val synchroniseAppSchemasCli by lazy { SynchroniseSchemasCli(startup) }
|
||||
|
||||
override fun initLogging(): Boolean = this.initLogging(cmdLineOptions.baseDirectory)
|
||||
|
||||
override fun additionalSubCommands() = setOf(networkCacheCli, justGenerateNodeInfoCli, justGenerateRpcSslCertsCli, initialRegistrationCli, validateConfigurationCli)
|
||||
override fun additionalSubCommands() = setOf(networkCacheCli,
|
||||
justGenerateNodeInfoCli,
|
||||
justGenerateRpcSslCertsCli,
|
||||
initialRegistrationCli,
|
||||
validateConfigurationCli,
|
||||
runMigrationScriptsCli,
|
||||
synchroniseAppSchemasCli)
|
||||
|
||||
override fun call(): Int {
|
||||
if (!validateBaseDirectory()) {
|
||||
@ -201,7 +209,7 @@ open class NodeStartup : NodeStartupLogging {
|
||||
|
||||
protected open fun preNetworkRegistration(conf: NodeConfiguration) = Unit
|
||||
|
||||
open fun createNode(conf: NodeConfiguration, versionInfo: VersionInfo): Node = Node(conf, versionInfo)
|
||||
open fun createNode(conf: NodeConfiguration, versionInfo: VersionInfo): Node = Node(conf, versionInfo, allowHibernateToManageAppSchema = cmdLineOptions.allowHibernateToManageAppSchema)
|
||||
|
||||
fun startNode(node: Node, startTime: Long) {
|
||||
if (node.configuration.devMode) {
|
||||
|
@ -0,0 +1,29 @@
|
||||
package net.corda.node.internal.subcommands
|
||||
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.internal.NodeCliCommand
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.internal.RunAfterNodeInitialisation
|
||||
import picocli.CommandLine
|
||||
|
||||
class RunMigrationScriptsCli(startup: NodeStartup) : NodeCliCommand("run-migration-scripts", "Run the database migration scripts and create or update schemas", startup) {
|
||||
@CommandLine.Option(names = ["--core-schemas"], description = ["Manage the core/node schemas"])
|
||||
var updateCoreSchemas: Boolean = false
|
||||
|
||||
@CommandLine.Option(names = ["--app-schemas"], description = ["Manage the CorDapp schemas"])
|
||||
var updateAppSchemas: Boolean = false
|
||||
|
||||
@CommandLine.Option(names = ["--update-app-schema-with-checkpoints"], description = ["Allow updating app schema even if there are suspended flows"])
|
||||
var updateAppSchemaWithCheckpoints: Boolean = false
|
||||
|
||||
|
||||
|
||||
override fun runProgram(): Int {
|
||||
require(updateAppSchemas || updateCoreSchemas) { "Nothing to do: at least one of --core-schemas or --app-schemas must be set" }
|
||||
return startup.initialiseAndRun(cmdLineOptions, object : RunAfterNodeInitialisation {
|
||||
override fun run(node: Node) {
|
||||
node.runDatabaseMigrationScripts(updateCoreSchemas, updateAppSchemas, updateAppSchemaWithCheckpoints)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -0,0 +1,16 @@
|
||||
package net.corda.node.internal.subcommands
|
||||
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.internal.NodeCliCommand
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.internal.RunAfterNodeInitialisation
|
||||
|
||||
class SynchroniseSchemasCli(startup: NodeStartup) : NodeCliCommand("sync-app-schemas", "Create changelog entries for liquibase files found in CorDapps", startup) {
|
||||
override fun runProgram(): Int {
|
||||
return startup.initialiseAndRun(cmdLineOptions, object : RunAfterNodeInitialisation {
|
||||
override fun run(node: Node) {
|
||||
node.runSchemaSync()
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -10,9 +10,11 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.node.SimpleClock
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.persistence.*
|
||||
import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.persistence.PublicKeyToTextConverter
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.SchemaMigration.Companion.NODE_X500_NAME
|
||||
import java.io.PrintWriter
|
||||
import java.sql.Connection
|
||||
@ -74,7 +76,6 @@ abstract class CordaMigration : CustomTaskChange {
|
||||
cacheFactory: MigrationNamedCacheFactory,
|
||||
identityService: PersistentIdentityService,
|
||||
schema: Set<MappedSchema>): CordaPersistence {
|
||||
val configDefaults = DatabaseConfig()
|
||||
val attributeConverters = listOf(
|
||||
PublicKeyToTextConverter(),
|
||||
AbstractPartyToX500NameAsStringConverter(
|
||||
@ -83,7 +84,7 @@ abstract class CordaMigration : CustomTaskChange {
|
||||
)
|
||||
// Liquibase handles closing the database connection when migrations are finished. If the connection is closed here, then further
|
||||
// migrations may fail.
|
||||
return CordaPersistence(configDefaults, schema, jdbcUrl, cacheFactory, attributeConverters, closeConnection = false)
|
||||
return CordaPersistence(false, schema, jdbcUrl, cacheFactory, attributeConverters, closeConnection = false)
|
||||
}
|
||||
|
||||
override fun validate(database: Database?): ValidationErrors? {
|
||||
|
@ -15,7 +15,6 @@ import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.config.SslConfiguration
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.SchemaInitializationType
|
||||
import net.corda.tools.shell.SSHDConfiguration
|
||||
import java.net.URL
|
||||
import java.nio.file.Path
|
||||
@ -132,8 +131,6 @@ data class NodeConfigurationImpl(
|
||||
fun messagingServerExternal(messagingServerAddress: NetworkHostAndPort?) = messagingServerAddress != null
|
||||
|
||||
fun database(devMode: Boolean) = DatabaseConfig(
|
||||
initialiseSchema = devMode,
|
||||
initialiseAppSchema = if(devMode) SchemaInitializationType.UPDATE else SchemaInitializationType.VALIDATE,
|
||||
exportHibernateJMXStatistics = devMode
|
||||
)
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import net.corda.common.validation.internal.Validated.Companion.invalid
|
||||
import net.corda.common.validation.internal.Validated.Companion.valid
|
||||
import net.corda.core.context.AuthServiceId
|
||||
import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.node.internal.ConfigurationException
|
||||
import net.corda.node.services.config.AuthDataSourceType
|
||||
import net.corda.node.services.config.CertChainPolicyConfig
|
||||
import net.corda.node.services.config.CertChainPolicyType
|
||||
@ -44,7 +45,6 @@ import net.corda.nodeapi.BrokerRpcSslOptions
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.TransactionIsolationLevel
|
||||
import net.corda.nodeapi.internal.persistence.SchemaInitializationType
|
||||
import net.corda.notary.experimental.bftsmart.BFTSmartConfig
|
||||
import net.corda.notary.experimental.raft.RaftConfig
|
||||
import net.corda.tools.shell.SSHDConfiguration
|
||||
@ -267,16 +267,32 @@ internal object SSHDConfigurationSpec : Configuration.Specification<SSHDConfigur
|
||||
override fun parseValid(configuration: Config, options: Configuration.Options): Valid<SSHDConfiguration> = attempt<SSHDConfiguration, IllegalArgumentException> { SSHDConfiguration(configuration.withOptions(options)[port]) }
|
||||
}
|
||||
|
||||
enum class SchemaInitializationType{
|
||||
NONE,
|
||||
VALIDATE,
|
||||
UPDATE
|
||||
}
|
||||
|
||||
internal object DatabaseConfigSpec : Configuration.Specification<DatabaseConfig>("DatabaseConfig") {
|
||||
private val initialiseSchema by boolean().optional().withDefaultValue(DatabaseConfig.Defaults.initialiseSchema)
|
||||
private val initialiseAppSchema by enum(SchemaInitializationType::class).optional().withDefaultValue(DatabaseConfig.Defaults.initialiseAppSchema)
|
||||
private val transactionIsolationLevel by enum(TransactionIsolationLevel::class).optional().withDefaultValue(DatabaseConfig.Defaults.transactionIsolationLevel)
|
||||
private val initialiseSchema by boolean().optional()
|
||||
private val initialiseAppSchema by enum(SchemaInitializationType::class).optional()
|
||||
private val transactionIsolationLevel by enum(TransactionIsolationLevel::class).optional()
|
||||
private val exportHibernateJMXStatistics by boolean().optional().withDefaultValue(DatabaseConfig.Defaults.exportHibernateJMXStatistics)
|
||||
private val mappedSchemaCacheSize by long().optional().withDefaultValue(DatabaseConfig.Defaults.mappedSchemaCacheSize)
|
||||
|
||||
override fun parseValid(configuration: Config, options: Configuration.Options): Valid<DatabaseConfig> {
|
||||
if (initialiseSchema.isSpecifiedBy(configuration)){
|
||||
throw ConfigurationException("Unsupported configuration database/initialiseSchema - this option has been removed, please use the run-migration-scripts sub-command or the database management tool to modify schemas")
|
||||
}
|
||||
if (initialiseAppSchema.isSpecifiedBy(configuration)){
|
||||
throw ConfigurationException("Unsupported configuration database/initialiseAppSchema - this option has been removed, please use the run-migration-scripts sub-command or the database management tool to modify schemas")
|
||||
}
|
||||
if (transactionIsolationLevel.isSpecifiedBy(configuration)){
|
||||
throw ConfigurationException("Unsupported configuration database/transactionIsolationLevel - this option has been removed and cannot be changed")
|
||||
}
|
||||
val config = configuration.withOptions(options)
|
||||
return valid(DatabaseConfig(config[initialiseSchema], config[initialiseAppSchema], config[transactionIsolationLevel], config[exportHibernateJMXStatistics], config[mappedSchemaCacheSize]))
|
||||
|
||||
return valid(DatabaseConfig(config[exportHibernateJMXStatistics], config[mappedSchemaCacheSize]))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -62,13 +62,12 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet()
|
||||
NodeInfoSchemaV1,
|
||||
NodeCoreV1)
|
||||
|
||||
fun internalSchemas() = requiredSchemas + extraSchemas.filter { schema ->
|
||||
// when mapped schemas from the finance module are present, they are considered as internal ones
|
||||
schema::class.qualifiedName == "net.corda.finance.schemas.CashSchemaV1" ||
|
||||
schema::class.qualifiedName == "net.corda.finance.schemas.CommercialPaperSchemaV1" ||
|
||||
val internalSchemas = requiredSchemas + extraSchemas.filter { schema ->
|
||||
schema::class.qualifiedName?.startsWith("net.corda.notary.") ?: false
|
||||
}
|
||||
|
||||
val appSchemas = extraSchemas - internalSchemas
|
||||
|
||||
override val schemas: Set<MappedSchema> = requiredSchemas + extraSchemas
|
||||
|
||||
// Currently returns all schemas supported by the state, with no filtering or enrichment.
|
||||
|
@ -1,7 +1,6 @@
|
||||
additionalP2PAddresses = []
|
||||
crlCheckSoftFail = true
|
||||
database = {
|
||||
transactionIsolationLevel = "REPEATABLE_READ"
|
||||
exportHibernateJMXStatistics = "false"
|
||||
}
|
||||
dataSourceProperties = {
|
||||
|
@ -2,12 +2,13 @@ package net.corda.node.services.persistence
|
||||
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.internal.checkOrUpdate
|
||||
import net.corda.node.internal.createCordaPersistence
|
||||
import net.corda.node.internal.startHikariPool
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.TransactionIsolationLevel
|
||||
import net.corda.nodeapi.internal.persistence.SchemaMigration
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
@ -91,10 +92,13 @@ class DbMapDeadlockTest {
|
||||
|
||||
fun recreateDeadlock(hikariProperties: Properties) {
|
||||
val cacheFactory = TestingNamedCacheFactory()
|
||||
val dbConfig = DatabaseConfig(initialiseSchema = true, transactionIsolationLevel = TransactionIsolationLevel.READ_COMMITTED)
|
||||
val dbConfig = DatabaseConfig()
|
||||
val schemaService = NodeSchemaService(extraSchemas = setOf(LockDbSchemaV2))
|
||||
createCordaPersistence(dbConfig, { null }, { null }, schemaService, hikariProperties, cacheFactory, null).apply {
|
||||
startHikariPool(hikariProperties, dbConfig, schemaService.schemas, ourName = TestIdentity(ALICE_NAME, 70).name)
|
||||
startHikariPool(hikariProperties) { dataSource, haveCheckpoints ->
|
||||
SchemaMigration(dataSource, null, null, TestIdentity(ALICE_NAME, 70).name)
|
||||
.checkOrUpdate(schemaService.schemas, true, haveCheckpoints, false)
|
||||
}
|
||||
}.use { persistence ->
|
||||
|
||||
// First clean up any remains from previous test runs
|
||||
|
@ -48,6 +48,7 @@ import net.corda.testing.internal.vault.VaultFiller
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.assertj.core.api.Assertions.`in`
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.hibernate.SessionFactory
|
||||
@ -976,7 +977,7 @@ class HibernateConfigurationTest {
|
||||
doReturn(it.party).whenever(mock).wellKnownPartyFromX500Name(it.name)
|
||||
}
|
||||
}
|
||||
database = configureDatabase(dataSourceProps, DatabaseConfig(initialiseSchema = initialiseSchema), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService)
|
||||
database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService, runMigrationScripts = initialiseSchema, allowHibernateToManageAppSchema = initialiseSchema)
|
||||
return database
|
||||
}
|
||||
|
||||
|
@ -12,7 +12,6 @@ dataSourceProperties = {
|
||||
dataSource.password = ""
|
||||
}
|
||||
database = {
|
||||
transactionIsolationLevel = "REPEATABLE_READ"
|
||||
exportHibernateJMXStatistics = "false"
|
||||
}
|
||||
p2pAddress = "localhost:2233"
|
||||
|
@ -11,7 +11,6 @@ dataSourceProperties = {
|
||||
dataSource.password = ""
|
||||
}
|
||||
database = {
|
||||
transactionIsolationLevel = "REPEATABLE_READ"
|
||||
exportHibernateJMXStatistics = "false"
|
||||
}
|
||||
p2pAddress = "localhost:2233"
|
||||
|
@ -12,7 +12,6 @@ dataSourceProperties = {
|
||||
dataSource.password = ""
|
||||
}
|
||||
database = {
|
||||
transactionIsolationLevel = "REPEATABLE_READ"
|
||||
exportHibernateJMXStatistics = "false"
|
||||
}
|
||||
p2pAddress = "localhost:2233"
|
||||
|
@ -90,6 +90,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask,
|
||||
}
|
||||
cordapp project(':samples:attachment-demo:contracts')
|
||||
cordapp project(':samples:attachment-demo:workflows')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Node,L=Zurich,C=CH"
|
||||
|
@ -48,6 +48,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask,
|
||||
nodeDefaults {
|
||||
cordapp project(':finance:workflows')
|
||||
cordapp project(':finance:contracts')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Node,L=Zurich,C=CH"
|
||||
|
@ -25,6 +25,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask,
|
||||
}
|
||||
rpcUsers = [['username': "default", 'password': "default", 'permissions': [ 'ALL' ]]]
|
||||
cordapp project(':samples:cordapp-configuration:workflows')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Node,L=Zurich,C=CH"
|
||||
|
@ -60,6 +60,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask])
|
||||
}
|
||||
cordapp project(':samples:irs-demo:cordapp:contracts-irs')
|
||||
cordapp project(':samples:irs-demo:cordapp:workflows-irs')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Node,L=Zurich,C=CH"
|
||||
|
@ -36,6 +36,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask])
|
||||
}
|
||||
cordapp project(':samples:network-verifier:contracts')
|
||||
cordapp project(':samples:network-verifier:workflows')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Node,L=Zurich,C=CH"
|
||||
|
@ -44,6 +44,7 @@ task deployNodesSingle(type: Cordform, dependsOn: ['jar', nodeTask, webTask]) {
|
||||
extraConfig = [h2Settings: [address: "localhost:0"]]
|
||||
cordapp project(':samples:notary-demo:contracts')
|
||||
cordapp project(':samples:notary-demo:workflows')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Alice Corp,L=Madrid,C=ES"
|
||||
|
@ -91,6 +91,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask,
|
||||
cordapp project(':samples:simm-valuation-demo:contracts-states')
|
||||
cordapp project(':samples:simm-valuation-demo:flows')
|
||||
rpcUsers = [['username': "default", 'password': "default", 'permissions': [ 'ALL' ]]]
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Node,L=Zurich,C=CH"
|
||||
|
@ -81,6 +81,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask])
|
||||
cordapp project(':finance:workflows')
|
||||
cordapp project(':finance:contracts')
|
||||
cordapp project(':samples:trader-demo:workflows-trader')
|
||||
runSchemaMigration = true
|
||||
}
|
||||
node {
|
||||
name "O=Notary Node,L=Zurich,C=CH"
|
||||
|
@ -205,7 +205,8 @@ fun <A> driver(defaultParameters: DriverParameters = DriverParameters(), dsl: Dr
|
||||
cordappsForAllNodes = uncheckedCast(defaultParameters.cordappsForAllNodes),
|
||||
djvmBootstrapSource = defaultParameters.djvmBootstrapSource,
|
||||
djvmCordaSource = defaultParameters.djvmCordaSource,
|
||||
environmentVariables = defaultParameters.environmentVariables
|
||||
environmentVariables = defaultParameters.environmentVariables,
|
||||
allowHibernateToManageAppSchema = defaultParameters.allowHibernateToManageAppSchema
|
||||
),
|
||||
coerce = { it },
|
||||
dsl = dsl
|
||||
@ -266,7 +267,8 @@ data class DriverParameters(
|
||||
val cordappsForAllNodes: Collection<TestCordapp>? = null,
|
||||
val djvmBootstrapSource: Path? = null,
|
||||
val djvmCordaSource: List<Path> = emptyList(),
|
||||
val environmentVariables : Map<String, String> = emptyMap()
|
||||
val environmentVariables : Map<String, String> = emptyMap(),
|
||||
val allowHibernateToManageAppSchema: Boolean = true
|
||||
) {
|
||||
constructor(cordappsForAllNodes: Collection<TestCordapp>) : this(isDebug = false, cordappsForAllNodes = cordappsForAllNodes)
|
||||
|
||||
@ -427,6 +429,7 @@ data class DriverParameters(
|
||||
fun withDjvmBootstrapSource(djvmBootstrapSource: Path?): DriverParameters = copy(djvmBootstrapSource = djvmBootstrapSource)
|
||||
fun withDjvmCordaSource(djvmCordaSource: List<Path>): DriverParameters = copy(djvmCordaSource = djvmCordaSource)
|
||||
fun withEnvironmentVariables(variables : Map<String, String>): DriverParameters = copy(environmentVariables = variables)
|
||||
fun withAllowHibernateToManageAppSchema(value: Boolean): DriverParameters = copy(allowHibernateToManageAppSchema = value)
|
||||
|
||||
fun copy(
|
||||
isDebug: Boolean,
|
||||
|
@ -127,7 +127,7 @@ open class MockServices private constructor(
|
||||
val cordappLoader = cordappLoaderForPackages(cordappPackages)
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
|
||||
val database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService, schemaService.internalSchemas())
|
||||
val database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService, schemaService.internalSchemas)
|
||||
val keyManagementService = MockKeyManagementService(
|
||||
identityService,
|
||||
*arrayOf(initialIdentity.keyPair) + moreKeys
|
||||
@ -170,7 +170,7 @@ open class MockServices private constructor(
|
||||
wellKnownPartyFromX500Name = identityService::wellKnownPartyFromX500Name,
|
||||
wellKnownPartyFromAnonymous = identityService::wellKnownPartyFromAnonymous,
|
||||
schemaService = schemaService,
|
||||
internalSchemas = schemaService.internalSchemas()
|
||||
internalSchemas = schemaService.internalSchemas
|
||||
)
|
||||
|
||||
val pkToIdCache = PublicKeyToOwningIdentityCacheImpl(persistence, cacheFactory)
|
||||
|
@ -1,4 +1,4 @@
|
||||
@file:Suppress("TooManyFunctions")
|
||||
@file:Suppress("TooManyFunctions", "Deprecation")
|
||||
package net.corda.testing.node.internal
|
||||
|
||||
import co.paralleluniverse.fibers.instrument.JavaAgent
|
||||
@ -23,12 +23,12 @@ import net.corda.core.internal.concurrent.fork
|
||||
import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_CONTRACT_NAME
|
||||
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_CONTRACT_LICENCE
|
||||
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_CONTRACT_NAME
|
||||
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_CONTRACT_VENDOR
|
||||
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_CONTRACT_VERSION
|
||||
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_WORKFLOW_NAME
|
||||
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_WORKFLOW_LICENCE
|
||||
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_WORKFLOW_NAME
|
||||
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_WORKFLOW_VENDOR
|
||||
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_WORKFLOW_VERSION
|
||||
import net.corda.core.internal.cordapp.CordappImpl.Companion.MIN_PLATFORM_VERSION
|
||||
@ -52,6 +52,7 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.node.NodeRegistrationOption
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.Node
|
||||
@ -83,11 +84,17 @@ import net.corda.notary.experimental.raft.RaftConfig
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.driver.*
|
||||
import net.corda.testing.driver.DriverDSL
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.JmxPolicy
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.NodeParameters
|
||||
import net.corda.testing.driver.NotaryHandle
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.driver.WebserverHandle
|
||||
import net.corda.testing.driver.internal.InProcessImpl
|
||||
import net.corda.testing.driver.internal.NodeHandleInternal
|
||||
import net.corda.testing.driver.internal.OutOfProcessImpl
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.testing.node.ClusterSpec
|
||||
import net.corda.testing.node.NotarySpec
|
||||
import okhttp3.OkHttpClient
|
||||
@ -140,7 +147,8 @@ class DriverDSLImpl(
|
||||
val cordappsForAllNodes: Collection<TestCordappInternal>?,
|
||||
val djvmBootstrapSource: Path?,
|
||||
val djvmCordaSource: List<Path>,
|
||||
val environmentVariables : Map<String, String>
|
||||
val environmentVariables : Map<String, String>,
|
||||
val allowHibernateToManageAppSchema: Boolean = true
|
||||
) : InternalDriverDSL {
|
||||
|
||||
private var _executorService: ScheduledExecutorService? = null
|
||||
@ -257,30 +265,40 @@ class DriverDSLImpl(
|
||||
// TODO: Derive name from the full picked name, don't just wrap the common name
|
||||
val name = parameters.providedName ?: CordaX500Name("${oneOf(names).organisation}-${p2pAddress.port}", "London", "GB")
|
||||
|
||||
val config = createConfig(name, parameters, p2pAddress)
|
||||
val registrationFuture = if (compatibilityZone?.rootCert != null) {
|
||||
// We don't need the network map to be available to be able to register the node
|
||||
startNodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.config(), parameters.customOverrides)
|
||||
createSchema(config, false).flatMap { startNodeRegistration(it, compatibilityZone.rootCert, compatibilityZone.config()) }
|
||||
} else {
|
||||
doneFuture(Unit)
|
||||
doneFuture(config)
|
||||
}
|
||||
|
||||
return registrationFuture.flatMap {
|
||||
networkMapAvailability.flatMap {
|
||||
return registrationFuture.flatMap { conf ->
|
||||
networkMapAvailability.flatMap {networkMap ->
|
||||
// But starting the node proper does require the network map
|
||||
startRegisteredNode(name, it, parameters, p2pAddress, bytemanPort)
|
||||
startRegisteredNode(conf, networkMap, parameters, bytemanPort)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("ComplexMethod")
|
||||
private fun startRegisteredNode(name: CordaX500Name,
|
||||
private fun startRegisteredNode(config: NodeConfig,
|
||||
localNetworkMap: LocalNetworkMap?,
|
||||
parameters: NodeParameters,
|
||||
p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort(),
|
||||
bytemanPort: Int? = null): CordaFuture<NodeHandle> {
|
||||
val webAddress = portAllocation.nextHostAndPort()
|
||||
return startNodeInternal(config, webAddress, localNetworkMap, parameters, bytemanPort)
|
||||
}
|
||||
|
||||
@Suppress("ComplexMethod")
|
||||
private fun createConfig(
|
||||
providedName: CordaX500Name,
|
||||
parameters: NodeParameters,
|
||||
p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort()
|
||||
): NodeConfig {
|
||||
val baseDirectory = baseDirectory(providedName).createDirectories()
|
||||
val rpcAddress = portAllocation.nextHostAndPort()
|
||||
val rpcAdminAddress = portAllocation.nextHostAndPort()
|
||||
val webAddress = portAllocation.nextHostAndPort()
|
||||
val users = parameters.rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) }
|
||||
val czUrlConfig = when (compatibilityZone) {
|
||||
null -> emptyMap()
|
||||
@ -301,50 +319,43 @@ class DriverDSLImpl(
|
||||
val flowOverrideConfig = FlowOverrideConfig(parameters.flowOverrides.map { FlowOverride(it.key.canonicalName, it.value.canonicalName) })
|
||||
|
||||
val overrides = configOf(
|
||||
NodeConfiguration::myLegalName.name to name.toString(),
|
||||
NodeConfiguration::myLegalName.name to providedName.toString(),
|
||||
NodeConfiguration::p2pAddress.name to p2pAddress.toString(),
|
||||
"rpcSettings.address" to rpcAddress.toString(),
|
||||
"rpcSettings.adminAddress" to rpcAdminAddress.toString(),
|
||||
NodeConfiguration::useTestClock.name to useTestClock,
|
||||
NodeConfiguration::rpcUsers.name to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
|
||||
NodeConfiguration::rpcUsers.name to if (users.isEmpty()) defaultRpcUserList else users.map {
|
||||
it.toConfig().root().unwrapped()
|
||||
},
|
||||
NodeConfiguration::verifierType.name to parameters.verifierType.name,
|
||||
NodeConfiguration::flowOverrides.name to flowOverrideConfig.toConfig().root().unwrapped(),
|
||||
NodeConfiguration::additionalNodeInfoPollingFrequencyMsec.name to 1000
|
||||
) + czUrlConfig + jmxConfig + parameters.customOverrides
|
||||
val config = NodeConfig(
|
||||
return NodeConfig(
|
||||
ConfigHelper.loadConfig(
|
||||
baseDirectory = baseDirectory(name),
|
||||
baseDirectory = baseDirectory,
|
||||
allowMissingConfig = true,
|
||||
configOverrides = if (overrides.hasPath("devMode")) overrides else overrides + mapOf("devMode" to true)
|
||||
).withDJVMConfig(djvmBootstrapSource, djvmCordaSource)
|
||||
).checkAndOverrideForInMemoryDB()
|
||||
return startNodeInternal(config, webAddress, localNetworkMap, parameters, bytemanPort)
|
||||
}
|
||||
|
||||
private fun createSchema(config: NodeConfig, hibernateForAppSchema: Boolean): CordaFuture<NodeConfig> {
|
||||
if (startNodesInProcess || inMemoryDB) return doneFuture(config)
|
||||
return startOutOfProcessMiniNode(config,
|
||||
listOfNotNull(
|
||||
"run-migration-scripts",
|
||||
"--core-schemas",
|
||||
"--app-schemas",
|
||||
if (hibernateForAppSchema) "--allow-hibernate-to-manage-app-schema" else null
|
||||
).toTypedArray()).map { config }
|
||||
}
|
||||
|
||||
private fun startNodeRegistration(
|
||||
providedName: CordaX500Name,
|
||||
config: NodeConfig,
|
||||
rootCert: X509Certificate,
|
||||
networkServicesConfig: NetworkServicesConfig,
|
||||
customOverrides: Map<String, Any?> = mapOf()
|
||||
networkServicesConfig: NetworkServicesConfig
|
||||
): CordaFuture<NodeConfig> {
|
||||
val baseDirectory = baseDirectory(providedName).createDirectories()
|
||||
val overrides = configOf(
|
||||
"p2pAddress" to portAllocation.nextHostAndPort().toString(),
|
||||
"compatibilityZoneURL" to networkServicesConfig.doormanURL.toString(),
|
||||
"myLegalName" to providedName.toString(),
|
||||
"rpcSettings" to mapOf(
|
||||
"address" to portAllocation.nextHostAndPort().toString(),
|
||||
"adminAddress" to portAllocation.nextHostAndPort().toString()
|
||||
),
|
||||
"additionalNodeInfoPollingFrequencyMsec" to 1000,
|
||||
"devMode" to false) + customOverrides
|
||||
val config = NodeConfig(
|
||||
ConfigHelper.loadConfig(
|
||||
baseDirectory = baseDirectory,
|
||||
allowMissingConfig = true,
|
||||
configOverrides = overrides
|
||||
).withDJVMConfig(djvmBootstrapSource, djvmCordaSource)
|
||||
).checkAndOverrideForInMemoryDB()
|
||||
|
||||
val versionInfo = VersionInfo(PLATFORM_VERSION, "1", "1", "1")
|
||||
config.corda.certificatesDirectory.createDirectories()
|
||||
@ -419,7 +430,7 @@ class DriverDSLImpl(
|
||||
val notaryInfosFuture = if (compatibilityZone == null) {
|
||||
// If no CZ is specified then the driver does the generation of the network parameters and the copying of the
|
||||
// node info files.
|
||||
startNotaryIdentityGeneration().map { notaryInfos -> Pair(notaryInfos, LocalNetworkMap(notaryInfos)) }
|
||||
startNotaryIdentityGeneration().map { notaryInfos -> Pair(notaryInfos, LocalNetworkMap(notaryInfos.map{it.second})) }
|
||||
} else {
|
||||
// Otherwise it's the CZ's job to distribute thse via the HTTP network map, as that is what the nodes will be expecting.
|
||||
val notaryInfosFuture = if (compatibilityZone.rootCert == null) {
|
||||
@ -430,7 +441,7 @@ class DriverDSLImpl(
|
||||
startAllNotaryRegistrations(compatibilityZone.rootCert, compatibilityZone)
|
||||
}
|
||||
notaryInfosFuture.map { notaryInfos ->
|
||||
compatibilityZone.publishNotaries(notaryInfos)
|
||||
compatibilityZone.publishNotaries(notaryInfos.map{it.second})
|
||||
Pair(notaryInfos, null)
|
||||
}
|
||||
}
|
||||
@ -438,9 +449,9 @@ class DriverDSLImpl(
|
||||
networkMapAvailability = notaryInfosFuture.map { it.second }
|
||||
|
||||
_notaries = notaryInfosFuture.map { (notaryInfos, localNetworkMap) ->
|
||||
val listOfFutureNodeHandles = startNotaries(localNetworkMap, notaryCustomOverrides)
|
||||
notaryInfos.zip(listOfFutureNodeHandles) { (identity, validating), nodeHandlesFuture ->
|
||||
NotaryHandle(identity, validating, nodeHandlesFuture)
|
||||
val listOfFutureNodeHandles = startNotaries(notaryInfos.map{it.first}, localNetworkMap, notaryCustomOverrides)
|
||||
notaryInfos.zip(listOfFutureNodeHandles) { (_, notaryInfo), nodeHandlesFuture ->
|
||||
NotaryHandle(notaryInfo.identity, notaryInfo.validating, nodeHandlesFuture)
|
||||
}
|
||||
}
|
||||
try {
|
||||
@ -480,9 +491,12 @@ class DriverDSLImpl(
|
||||
}
|
||||
}
|
||||
|
||||
private fun startNotaryIdentityGeneration(): CordaFuture<List<NotaryInfo>> {
|
||||
private fun startNotaryIdentityGeneration(): CordaFuture<List<Pair<NodeConfig,NotaryInfo>>> {
|
||||
return executorService.fork {
|
||||
notarySpecs.map { spec ->
|
||||
val notaryConfig = mapOf("notary" to mapOf("validating" to spec.validating))
|
||||
val parameters = NodeParameters(rpcUsers = spec.rpcUsers, verifierType = spec.verifierType, customOverrides = notaryConfig + notaryCustomOverrides, maximumHeapSize = spec.maximumHeapSize)
|
||||
val config = createConfig(spec.name, parameters)
|
||||
val identity = when (spec.cluster) {
|
||||
null -> {
|
||||
DevIdentityGenerator.installKeyStoreWithNodeIdentity(baseDirectory(spec.name), spec.name)
|
||||
@ -508,14 +522,14 @@ class DriverDSLImpl(
|
||||
}
|
||||
else -> throw UnsupportedOperationException("Cluster spec ${spec.cluster} not supported by Driver")
|
||||
}
|
||||
NotaryInfo(identity, spec.validating)
|
||||
Pair(config, NotaryInfo(identity, spec.validating))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun startAllNotaryRegistrations(
|
||||
rootCert: X509Certificate,
|
||||
compatibilityZone: CompatibilityZoneParams): CordaFuture<List<NotaryInfo>> {
|
||||
compatibilityZone: CompatibilityZoneParams): CordaFuture<List<Pair<NodeConfig, NotaryInfo>>> {
|
||||
// Start the registration process for all the notaries together then wait for their responses.
|
||||
return notarySpecs.map { spec ->
|
||||
require(spec.cluster == null) { "Registering distributed notaries not supported" }
|
||||
@ -527,51 +541,55 @@ class DriverDSLImpl(
|
||||
spec: NotarySpec,
|
||||
rootCert: X509Certificate,
|
||||
compatibilityZone: CompatibilityZoneParams
|
||||
): CordaFuture<NotaryInfo> {
|
||||
return startNodeRegistration(spec.name, rootCert, compatibilityZone.config()).flatMap { config ->
|
||||
// Node registration only gives us the node CA cert, not the identity cert. That is only created on first
|
||||
// startup or when the node is told to just generate its node info file. We do that here.
|
||||
if (startNodesInProcess) {
|
||||
executorService.fork {
|
||||
val nodeInfo = Node(config.corda, MOCK_VERSION_INFO, initialiseSerialization = false).generateAndSaveNodeInfo()
|
||||
NotaryInfo(nodeInfo.legalIdentities[0], spec.validating)
|
||||
}
|
||||
} else {
|
||||
// TODO The config we use here is uses a hardocded p2p port which changes when the node is run proper
|
||||
// This causes two node info files to be generated.
|
||||
startOutOfProcessMiniNode(config, arrayOf("generate-node-info")).map {
|
||||
// Once done we have to read the signed node info file that's been generated
|
||||
val nodeInfoFile = config.corda.baseDirectory.list { paths ->
|
||||
paths.filter { it.fileName.toString().startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get()
|
||||
): CordaFuture<Pair<NodeConfig,NotaryInfo>> {
|
||||
val parameters = NodeParameters(rpcUsers = spec.rpcUsers, verifierType = spec.verifierType, customOverrides = notaryCustomOverrides, maximumHeapSize = spec.maximumHeapSize)
|
||||
return createSchema(createConfig(spec.name, parameters), false).flatMap { config ->
|
||||
startNodeRegistration(config, rootCert, compatibilityZone.config())}.flatMap { config ->
|
||||
// Node registration only gives us the node CA cert, not the identity cert. That is only created on first
|
||||
// startup or when the node is told to just generate its node info file. We do that here.
|
||||
if (startNodesInProcess) {
|
||||
executorService.fork {
|
||||
val nodeInfo = Node(config.corda, MOCK_VERSION_INFO, initialiseSerialization = false).generateAndSaveNodeInfo()
|
||||
Pair(config.withNotaryDefinition(spec.validating), NotaryInfo(nodeInfo.legalIdentities[0], spec.validating))
|
||||
}
|
||||
} else {
|
||||
// TODO The config we use here is uses a hardocded p2p port which changes when the node is run proper
|
||||
// This causes two node info files to be generated.
|
||||
startOutOfProcessMiniNode(config, arrayOf("generate-node-info")).map {
|
||||
// Once done we have to read the signed node info file that's been generated
|
||||
val nodeInfoFile = config.corda.baseDirectory.list { paths ->
|
||||
paths.filter { it.fileName.toString().startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.findFirst()
|
||||
.get()
|
||||
}
|
||||
val nodeInfo = nodeInfoFile.readObject<SignedNodeInfo>().verified()
|
||||
Pair(config.withNotaryDefinition(spec.validating), NotaryInfo(nodeInfo.legalIdentities[0], spec.validating))
|
||||
}
|
||||
val nodeInfo = nodeInfoFile.readObject<SignedNodeInfo>().verified()
|
||||
NotaryInfo(nodeInfo.legalIdentities[0], spec.validating)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private fun generateNodeNames(spec: NotarySpec): List<CordaX500Name> {
|
||||
return (0 until spec.cluster!!.clusterSize).map { spec.name.copy(organisation = "${spec.name.organisation}-$it") }
|
||||
}
|
||||
|
||||
private fun startNotaries(localNetworkMap: LocalNetworkMap?, customOverrides: Map<String, Any?>): List<CordaFuture<List<NodeHandle>>> {
|
||||
return notarySpecs.map {
|
||||
when (it.cluster) {
|
||||
null -> startSingleNotary(it, localNetworkMap, customOverrides)
|
||||
private fun startNotaries(configs: List<NodeConfig>, localNetworkMap: LocalNetworkMap?, customOverrides: Map<String, Any?>): List<CordaFuture<List<NodeHandle>>> {
|
||||
return notarySpecs.zip(configs).map { (spec, config) ->
|
||||
when (spec.cluster) {
|
||||
null -> startSingleNotary(config, spec, localNetworkMap, customOverrides)
|
||||
is ClusterSpec.Raft,
|
||||
// DummyCluster is used for testing the notary communication path, and it does not matter
|
||||
// which underlying consensus algorithm is used, so we just stick to Raft
|
||||
is DummyClusterSpec -> startRaftNotaryCluster(it, localNetworkMap)
|
||||
is DummyClusterSpec -> startRaftNotaryCluster(spec, localNetworkMap)
|
||||
else -> throw IllegalArgumentException("BFT-SMaRt not supported")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun startSingleNotary(spec: NotarySpec, localNetworkMap: LocalNetworkMap?, customOverrides: Map<String, Any?>): CordaFuture<List<NodeHandle>> {
|
||||
private fun startSingleNotary(config: NodeConfig, spec: NotarySpec, localNetworkMap: LocalNetworkMap?, customOverrides: Map<String, Any?>): CordaFuture<List<NodeHandle>> {
|
||||
val notaryConfig = mapOf("notary" to mapOf("validating" to spec.validating))
|
||||
return startRegisteredNode(
|
||||
spec.name,
|
||||
config,
|
||||
localNetworkMap,
|
||||
NodeParameters(rpcUsers = spec.rpcUsers,
|
||||
verifierType = spec.verifierType,
|
||||
@ -598,20 +616,26 @@ class DriverDSLImpl(
|
||||
val nodeNames = generateNodeNames(spec)
|
||||
val clusterAddress = portAllocation.nextHostAndPort()
|
||||
|
||||
val firstParams = NodeParameters(rpcUsers = spec.rpcUsers, verifierType = spec.verifierType, customOverrides = notaryConfig(clusterAddress))
|
||||
val firstConfig = createSchema(createConfig(nodeNames[0], firstParams), allowHibernateToManageAppSchema)
|
||||
|
||||
// Start the first node that will bootstrap the cluster
|
||||
val firstNodeFuture = startRegisteredNode(
|
||||
nodeNames[0],
|
||||
firstConfig.getOrThrow(),
|
||||
localNetworkMap,
|
||||
NodeParameters(rpcUsers = spec.rpcUsers, verifierType = spec.verifierType, customOverrides = notaryConfig(clusterAddress))
|
||||
firstParams
|
||||
)
|
||||
|
||||
// All other nodes will join the cluster
|
||||
val restNodeFutures = nodeNames.drop(1).map {
|
||||
val nodeAddress = portAllocation.nextHostAndPort()
|
||||
val params = NodeParameters(rpcUsers = spec.rpcUsers, verifierType = spec.verifierType, customOverrides = notaryConfig(nodeAddress, clusterAddress))
|
||||
val config = createSchema(createConfig(it, params), allowHibernateToManageAppSchema)
|
||||
startRegisteredNode(
|
||||
it,
|
||||
config.getOrThrow(),
|
||||
localNetworkMap,
|
||||
NodeParameters(rpcUsers = spec.rpcUsers, verifierType = spec.verifierType, customOverrides = notaryConfig(nodeAddress, clusterAddress))
|
||||
params
|
||||
|
||||
)
|
||||
}
|
||||
|
||||
@ -676,7 +700,7 @@ class DriverDSLImpl(
|
||||
)
|
||||
|
||||
val nodeFuture = if (parameters.startInSameProcess ?: startNodesInProcess) {
|
||||
val nodeAndThreadFuture = startInProcessNode(executorService, config)
|
||||
val nodeAndThreadFuture = startInProcessNode(executorService, config, allowHibernateToManageAppSchema)
|
||||
shutdownManager.registerShutdown(
|
||||
nodeAndThreadFuture.map { (node, thread) ->
|
||||
{
|
||||
@ -703,6 +727,9 @@ class DriverDSLImpl(
|
||||
nodeFuture
|
||||
} else {
|
||||
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
|
||||
log.info("StartNodeInternal for ${config.corda.myLegalName.organisation} - calling create schema")
|
||||
createSchema(config, allowHibernateToManageAppSchema).getOrThrow()
|
||||
log.info("StartNodeInternal for ${config.corda.myLegalName.organisation} - create schema done")
|
||||
val process = startOutOfProcessNode(
|
||||
config,
|
||||
quasarJarPath,
|
||||
@ -713,7 +740,10 @@ class DriverDSLImpl(
|
||||
parameters.maximumHeapSize,
|
||||
parameters.logLevelOverride,
|
||||
identifier,
|
||||
environmentVariables
|
||||
environmentVariables,
|
||||
extraCmdLineFlag = listOfNotNull(
|
||||
if (allowHibernateToManageAppSchema) "--allow-hibernate-to-manage-app-schema" else null
|
||||
).toTypedArray()
|
||||
)
|
||||
|
||||
// Destroy the child process when the parent exits.This is needed even when `waitForAllNodesToFinish` is
|
||||
@ -792,6 +822,10 @@ class DriverDSLImpl(
|
||||
val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration().value()
|
||||
}
|
||||
|
||||
private fun NodeConfig.withNotaryDefinition(validating: Boolean): NodeConfig {
|
||||
return NodeConfig(this.typesafe.plus(mapOf("notary" to mapOf("validating" to validating))))
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val RPC_CONNECT_POLL_INTERVAL: Duration = 100.millis
|
||||
internal val log = contextLogger()
|
||||
@ -868,7 +902,8 @@ class DriverDSLImpl(
|
||||
|
||||
private fun startInProcessNode(
|
||||
executorService: ScheduledExecutorService,
|
||||
config: NodeConfig
|
||||
config: NodeConfig,
|
||||
allowHibernateToManageAppSchema: Boolean
|
||||
): CordaFuture<Pair<NodeWithInfo, Thread>> {
|
||||
val effectiveP2PAddress = config.corda.messagingServerAddress ?: config.corda.p2pAddress
|
||||
return executorService.fork {
|
||||
@ -879,7 +914,7 @@ class DriverDSLImpl(
|
||||
// Write node.conf
|
||||
writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe.toNodeOnly())
|
||||
// TODO pass the version in?
|
||||
val node = InProcessNode(config.corda, MOCK_VERSION_INFO)
|
||||
val node = InProcessNode(config.corda, MOCK_VERSION_INFO, allowHibernateToManageAppSchema = allowHibernateToManageAppSchema)
|
||||
val nodeInfo = node.start()
|
||||
val nodeWithInfo = NodeWithInfo(node, nodeInfo)
|
||||
val nodeThread = thread(name = config.corda.myLegalName.organisation) {
|
||||
@ -1256,7 +1291,8 @@ fun <DI : DriverDSL, D : InternalDriverDSL, A> genericDriver(
|
||||
cordappsForAllNodes = uncheckedCast(defaultParameters.cordappsForAllNodes),
|
||||
djvmBootstrapSource = defaultParameters.djvmBootstrapSource,
|
||||
djvmCordaSource = defaultParameters.djvmCordaSource,
|
||||
environmentVariables = defaultParameters.environmentVariables
|
||||
environmentVariables = defaultParameters.environmentVariables,
|
||||
allowHibernateToManageAppSchema = defaultParameters.allowHibernateToManageAppSchema
|
||||
)
|
||||
)
|
||||
val shutdownHook = addShutdownHook(driverDsl::shutdown)
|
||||
@ -1354,29 +1390,31 @@ fun <A> internalDriver(
|
||||
djvmBootstrapSource: Path? = null,
|
||||
djvmCordaSource: List<Path> = emptyList(),
|
||||
environmentVariables: Map<String, String> = emptyMap(),
|
||||
allowHibernateToManageAppSchema: Boolean = true,
|
||||
dsl: DriverDSLImpl.() -> A
|
||||
): A {
|
||||
return genericDriver(
|
||||
driverDsl = DriverDSLImpl(
|
||||
portAllocation = portAllocation,
|
||||
debugPortAllocation = debugPortAllocation,
|
||||
systemProperties = systemProperties,
|
||||
driverDirectory = driverDirectory.toAbsolutePath(),
|
||||
useTestClock = useTestClock,
|
||||
isDebug = isDebug,
|
||||
startNodesInProcess = startNodesInProcess,
|
||||
waitForAllNodesToFinish = waitForAllNodesToFinish,
|
||||
extraCordappPackagesToScan = extraCordappPackagesToScan,
|
||||
notarySpecs = notarySpecs,
|
||||
jmxPolicy = jmxPolicy,
|
||||
compatibilityZone = compatibilityZone,
|
||||
networkParameters = networkParameters,
|
||||
notaryCustomOverrides = notaryCustomOverrides,
|
||||
inMemoryDB = inMemoryDB,
|
||||
cordappsForAllNodes = cordappsForAllNodes,
|
||||
djvmBootstrapSource = djvmBootstrapSource,
|
||||
djvmCordaSource = djvmCordaSource,
|
||||
environmentVariables = environmentVariables
|
||||
portAllocation = portAllocation,
|
||||
debugPortAllocation = debugPortAllocation,
|
||||
systemProperties = systemProperties,
|
||||
driverDirectory = driverDirectory.toAbsolutePath(),
|
||||
useTestClock = useTestClock,
|
||||
isDebug = isDebug,
|
||||
startNodesInProcess = startNodesInProcess,
|
||||
waitForAllNodesToFinish = waitForAllNodesToFinish,
|
||||
extraCordappPackagesToScan = extraCordappPackagesToScan,
|
||||
notarySpecs = notarySpecs,
|
||||
jmxPolicy = jmxPolicy,
|
||||
compatibilityZone = compatibilityZone,
|
||||
networkParameters = networkParameters,
|
||||
notaryCustomOverrides = notaryCustomOverrides,
|
||||
inMemoryDB = inMemoryDB,
|
||||
cordappsForAllNodes = cordappsForAllNodes,
|
||||
djvmBootstrapSource = djvmBootstrapSource,
|
||||
djvmCordaSource = djvmCordaSource,
|
||||
environmentVariables = environmentVariables,
|
||||
allowHibernateToManageAppSchema = allowHibernateToManageAppSchema
|
||||
),
|
||||
coerce = { it },
|
||||
dsl = dsl
|
||||
|
@ -279,14 +279,19 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
|
||||
}
|
||||
}
|
||||
|
||||
open class MockNode(args: MockNodeArgs, private val mockFlowManager: MockNodeFlowManager = args.flowManager) : AbstractNode<TestStartedNode>(
|
||||
open class MockNode(
|
||||
args: MockNodeArgs,
|
||||
private val mockFlowManager: MockNodeFlowManager = args.flowManager,
|
||||
allowAppSchemaUpgradeWithCheckpoints: Boolean = false) : AbstractNode<TestStartedNode>(
|
||||
args.config,
|
||||
TestClock(Clock.systemUTC()),
|
||||
DefaultNamedCacheFactory(),
|
||||
args.version,
|
||||
mockFlowManager,
|
||||
args.network.getServerThread(args.id),
|
||||
args.network.busyLatch
|
||||
args.network.busyLatch,
|
||||
allowHibernateToManageAppSchema = true,
|
||||
allowAppSchemaUpgradeWithCheckpoints = allowAppSchemaUpgradeWithCheckpoints
|
||||
) {
|
||||
companion object {
|
||||
private val staticLog = contextLogger()
|
||||
@ -317,6 +322,8 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
|
||||
}
|
||||
}
|
||||
|
||||
override val runMigrationScripts: Boolean = true
|
||||
|
||||
val mockNet = args.network
|
||||
val id = args.id
|
||||
|
||||
|
@ -157,7 +157,12 @@ abstract class NodeBasedTest @JvmOverloads constructor(
|
||||
}
|
||||
}
|
||||
|
||||
class InProcessNode(configuration: NodeConfiguration, versionInfo: VersionInfo, flowManager: FlowManager = NodeFlowManager(configuration.flowOverrides)) : Node(configuration, versionInfo, false, flowManager = flowManager) {
|
||||
class InProcessNode(
|
||||
configuration: NodeConfiguration,
|
||||
versionInfo: VersionInfo,
|
||||
flowManager: FlowManager = NodeFlowManager(configuration.flowOverrides),
|
||||
allowHibernateToManageAppSchema: Boolean = true) : Node(configuration, versionInfo, false, flowManager = flowManager, allowHibernateToManageAppSchema = allowHibernateToManageAppSchema) {
|
||||
override val runMigrationScripts: Boolean = true
|
||||
override fun start(): NodeInfo {
|
||||
assertFalse(isInvalidJavaVersion(), "You are using a version of Java that is not supported (${SystemUtils.JAVA_VERSION}). Please upgrade to the latest version of Java 8.")
|
||||
return super.start()
|
||||
|
@ -2,19 +2,22 @@ package net.corda.smoketesting
|
||||
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCConnection
|
||||
import net.corda.nodeapi.internal.rpc.client.AMQPClientSerializationScheme
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.deleteRecursively
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.toPath
|
||||
import net.corda.core.internal.writeText
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
import net.corda.nodeapi.internal.rpc.client.AMQPClientSerializationScheme
|
||||
import net.corda.testing.common.internal.asContextEnv
|
||||
import net.corda.testing.common.internal.checkNotOnClasspath
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import java.lang.IllegalStateException
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.time.Instant
|
||||
@ -32,6 +35,7 @@ class NodeProcess(
|
||||
companion object {
|
||||
const val CORDAPPS_DIR_NAME = "cordapps"
|
||||
private val log = contextLogger()
|
||||
private const val schemaCreationTimeOutSeconds: Long = 180
|
||||
}
|
||||
|
||||
fun connect(user: User): CordaRPCConnection {
|
||||
@ -103,6 +107,7 @@ class NodeProcess(
|
||||
(nodeDir / "node.conf").writeText(config.toText())
|
||||
createNetworkParameters(NotaryInfo(notaryParty!!, true), nodeDir)
|
||||
|
||||
createSchema(nodeDir)
|
||||
val process = startNode(nodeDir)
|
||||
val client = CordaRPCClient(NetworkHostAndPort("localhost", config.rpcPort))
|
||||
waitForNode(process, config, client)
|
||||
@ -138,9 +143,25 @@ class NodeProcess(
|
||||
}
|
||||
}
|
||||
|
||||
private fun startNode(nodeDir: Path): Process {
|
||||
class SchemaCreationTimedOutError(nodeDir: Path) : Exception("Creating node schema timed out for $nodeDir")
|
||||
class SchemaCreationFailedError(nodeDir: Path) : Exception("Creating node schema failed for $nodeDir")
|
||||
|
||||
|
||||
private fun createSchema(nodeDir: Path){
|
||||
val process = startNode(nodeDir, arrayOf("run-migration-scripts", "--core-schemas", "--app-schemas"))
|
||||
if (!process.waitFor(schemaCreationTimeOutSeconds, SECONDS)) {
|
||||
process.destroy()
|
||||
throw SchemaCreationTimedOutError(nodeDir)
|
||||
}
|
||||
if (process.exitValue() != 0){
|
||||
throw SchemaCreationFailedError(nodeDir)
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("SpreadOperator")
|
||||
private fun startNode(nodeDir: Path, extraArgs: Array<String> = emptyArray()): Process {
|
||||
val builder = ProcessBuilder()
|
||||
.command(javaPath.toString(), "-Dcapsule.log=verbose", "-jar", cordaJar.toString())
|
||||
.command(javaPath.toString(), "-Dcapsule.log=verbose", "-jar", cordaJar.toString(), *extraArgs)
|
||||
.directory(nodeDir.toFile())
|
||||
.redirectError(ProcessBuilder.Redirect.INHERIT)
|
||||
.redirectOutput(ProcessBuilder.Redirect.INHERIT)
|
||||
|
@ -1,7 +1,11 @@
|
||||
package net.corda.testing.internal
|
||||
|
||||
import net.corda.core.context.AuthServiceId
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.contracts.Command
|
||||
import net.corda.core.contracts.PrivacySalt
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TimeWindow
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.Crypto.generateKeyPair
|
||||
import net.corda.core.crypto.SecureHash
|
||||
@ -19,6 +23,8 @@ import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.coretesting.internal.asTestContextEnv
|
||||
import net.corda.coretesting.internal.createTestSerializationEnv
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.node.internal.checkOrUpdate
|
||||
import net.corda.node.internal.createCordaPersistence
|
||||
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||
import net.corda.node.internal.startHikariPool
|
||||
@ -32,19 +38,17 @@ import net.corda.nodeapi.internal.createDevNodeCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
import net.corda.nodeapi.internal.crypto.CertificateType
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.loadDevCaTrustStore
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.SchemaMigration
|
||||
import net.corda.nodeapi.internal.registerDevP2pCertificates
|
||||
import net.corda.serialization.internal.amqp.AMQP_ENABLED
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.IOException
|
||||
import java.net.ServerSocket
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyPair
|
||||
import java.util.*
|
||||
@ -163,16 +167,30 @@ fun RPCSecurityManagerImpl.Companion.fromUserList(id: AuthServiceId, users: List
|
||||
/**
|
||||
* Convenience method for configuring a database for some tests.
|
||||
*/
|
||||
@Suppress("LongParameterList")
|
||||
fun configureDatabase(hikariProperties: Properties,
|
||||
databaseConfig: DatabaseConfig,
|
||||
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
|
||||
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
|
||||
schemaService: SchemaService = NodeSchemaService(),
|
||||
internalSchemas: Set<MappedSchema> = NodeSchemaService().internalSchemas(),
|
||||
internalSchemas: Set<MappedSchema> = NodeSchemaService().internalSchemas,
|
||||
cacheFactory: NamedCacheFactory = TestingNamedCacheFactory(),
|
||||
ourName: CordaX500Name = TestIdentity(ALICE_NAME, 70).name): CordaPersistence {
|
||||
val persistence = createCordaPersistence(databaseConfig, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous, schemaService, hikariProperties, cacheFactory, null)
|
||||
persistence.startHikariPool(hikariProperties, databaseConfig, internalSchemas, ourName = ourName)
|
||||
ourName: CordaX500Name = TestIdentity(ALICE_NAME, 70).name,
|
||||
runMigrationScripts: Boolean = true,
|
||||
allowHibernateToManageAppSchema: Boolean = true): CordaPersistence {
|
||||
val persistence = createCordaPersistence(
|
||||
databaseConfig,
|
||||
wellKnownPartyFromX500Name,
|
||||
wellKnownPartyFromAnonymous,
|
||||
schemaService,
|
||||
hikariProperties,
|
||||
cacheFactory,
|
||||
null,
|
||||
allowHibernateToManageAppSchema)
|
||||
persistence.startHikariPool(hikariProperties) { dataSource, haveCheckpoints ->
|
||||
SchemaMigration(dataSource, null, null, ourName)
|
||||
.checkOrUpdate(internalSchemas, runMigrationScripts, haveCheckpoints, false)
|
||||
}
|
||||
return persistence
|
||||
}
|
||||
|
||||
|
@ -1,11 +1,17 @@
|
||||
package net.corda.demobench.model
|
||||
|
||||
import javafx.application.Application.Parameters
|
||||
import javafx.application.Platform
|
||||
import javafx.beans.binding.IntegerExpression
|
||||
import javafx.beans.property.SimpleBooleanProperty
|
||||
import javafx.scene.control.Alert
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.copyToDirectory
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.noneOrSingle
|
||||
import net.corda.core.internal.writeText
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
@ -52,6 +58,7 @@ class NodeController(
|
||||
}
|
||||
|
||||
val djvmEnabled = SimpleBooleanProperty(djvmEnabled)
|
||||
val allowHibernateToManageAppSchema = SimpleBooleanProperty(false)
|
||||
|
||||
private val jvm by inject<JVMConfig>()
|
||||
private val cordappController by inject<CordappController>()
|
||||
@ -61,6 +68,8 @@ class NodeController(
|
||||
private val cordaPath: Path = jvm.applicationDir.resolve("corda").resolve("corda.jar")
|
||||
private val command = jvm.commandFor(cordaPath).toTypedArray()
|
||||
|
||||
private val schemaSetupArgs = arrayOf("run-migration-scripts", "--core-schemas", "--app-schemas")
|
||||
|
||||
private val nodes = LinkedHashMap<String, NodeConfigWrapper>()
|
||||
private var notaryIdentity: Party? = null
|
||||
private var networkParametersCopier: NetworkParametersCopier? = null
|
||||
@ -155,6 +164,23 @@ class NodeController(
|
||||
jvm.setCapsuleCacheDir(this)
|
||||
}
|
||||
(networkParametersCopier ?: makeNetworkParametersCopier(config)).install(config.nodeDir)
|
||||
@Suppress("SpreadOperator")
|
||||
val schemaSetupCommand = jvm.commandFor(cordaPath, *schemaSetupArgs).let {
|
||||
if (allowHibernateToManageAppSchema.value) {
|
||||
it + "--allow-hibernate-to-manage-app-schema"
|
||||
} else {
|
||||
it
|
||||
}
|
||||
}.toTypedArray()
|
||||
if (pty.runSetupProcess(schemaSetupCommand, cordaEnv, config.nodeDir.toString()) != 0) {
|
||||
Platform.runLater {
|
||||
Alert(
|
||||
Alert.AlertType.ERROR,
|
||||
"Failed to set up database schema for node [${config.nodeConfig.myLegalName}]\n" +
|
||||
"Please check logfiles!").showAndWait()
|
||||
}
|
||||
return false
|
||||
}
|
||||
pty.run(command, cordaEnv, config.nodeDir.toString())
|
||||
log.info("Launched node: ${config.nodeConfig.myLegalName}")
|
||||
return true
|
||||
|
@ -63,6 +63,11 @@ class R3Pty(val name: CordaX500Name, settings: SettingsProvider, dimension: Dime
|
||||
terminal.createTerminalSession(connector).apply { start() }
|
||||
}
|
||||
|
||||
fun runSetupProcess(args: Array<String>, envs: Map<String, String>, workingDir: String?): Int {
|
||||
val process = PtyProcess.exec(args, envs, workingDir)
|
||||
return process.waitFor()
|
||||
}
|
||||
|
||||
@Suppress("unused")
|
||||
@Throws(InterruptedException::class)
|
||||
fun waitFor(): Int = terminal.ttyConnector?.waitFor() ?: -1
|
||||
|
@ -155,6 +155,9 @@ class NodeTabView : Fragment() {
|
||||
checkbox("Deterministic Contract Verification", nodeController.djvmEnabled).apply {
|
||||
styleClass += "djvm"
|
||||
}
|
||||
checkbox("Allow Hibernate to manage app schema", nodeController.allowHibernateToManageAppSchema).apply {
|
||||
styleClass += "hibernate"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user