mirror of
https://github.com/corda/corda.git
synced 2025-06-19 15:43:52 +00:00
Feature/corda 1813/change postgres column type (#3631)
* CORDA-1813 fix Postgres db bloat issue * CORDA-1813 merge fixes * CORDA-1813 change column type and size to a standard corda type * CORDA-1813 docs * CORDA-1813 create custom hibernate type for the checkpoint blob and align with enterprise * CORDA-1813 Remove max col size * CORDA-1813 Remove max col size * CORDA-1813 Fix merge * CORDA-1813 Remove buggy :serverNameTablePrefix: configuration
This commit is contained in:
@ -22,7 +22,6 @@ const val NODE_DATABASE_PREFIX = "node_"
|
||||
// This class forms part of the node config and so any changes to it must be handled with care
|
||||
data class DatabaseConfig(
|
||||
val initialiseSchema: Boolean = true,
|
||||
val serverNameTablePrefix: String = "",
|
||||
val transactionIsolationLevel: TransactionIsolationLevel = TransactionIsolationLevel.REPEATABLE_READ,
|
||||
val exportHibernateJMXStatistics: Boolean = false,
|
||||
val mappedSchemaCacheSize: Long = 100
|
||||
@ -51,6 +50,7 @@ val contextDatabaseOrNull: CordaPersistence? get() = _contextDatabase.get()
|
||||
class CordaPersistence(
|
||||
databaseConfig: DatabaseConfig,
|
||||
schemas: Set<MappedSchema>,
|
||||
val jdbcUrl: String,
|
||||
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet()
|
||||
) : Closeable {
|
||||
companion object {
|
||||
@ -60,7 +60,7 @@ class CordaPersistence(
|
||||
private val defaultIsolationLevel = databaseConfig.transactionIsolationLevel
|
||||
val hibernateConfig: HibernateConfiguration by lazy {
|
||||
transaction {
|
||||
HibernateConfiguration(schemas, databaseConfig, attributeConverters)
|
||||
HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl)
|
||||
}
|
||||
}
|
||||
val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas
|
||||
@ -84,6 +84,7 @@ class CordaPersistence(
|
||||
transaction {
|
||||
check(!connection.metaData.isReadOnly) { "Database should not be readonly." }
|
||||
checkCorrectAttachmentsContractsTableName(connection)
|
||||
checkCorrectCheckpointTypeOnPostgres(connection)
|
||||
}
|
||||
}
|
||||
|
||||
@ -272,7 +273,7 @@ private fun Throwable.hasSQLExceptionCause(): Boolean =
|
||||
|
||||
class CouldNotCreateDataSourceException(override val message: String?, override val cause: Throwable? = null) : Exception()
|
||||
|
||||
class IncompatibleAttachmentsContractsTableName(override val message: String?, override val cause: Throwable? = null) : Exception()
|
||||
class DatabaseIncompatibleException(override val message: String?, override val cause: Throwable? = null) : Exception()
|
||||
|
||||
private fun checkCorrectAttachmentsContractsTableName(connection: Connection) {
|
||||
val correctName = "NODE_ATTACHMENTS_CONTRACTS"
|
||||
@ -282,7 +283,22 @@ private fun checkCorrectAttachmentsContractsTableName(connection: Connection) {
|
||||
fun warning(incorrectName: String, version: String) = "The database contains the older table name $incorrectName instead of $correctName, see upgrade notes to migrate from Corda database version $version https://docs.corda.net/head/upgrade-notes.html."
|
||||
|
||||
if (!connection.metaData.getTables(null, null, correctName, null).next()) {
|
||||
if (connection.metaData.getTables(null, null, incorrectV30Name, null).next()) { throw IncompatibleAttachmentsContractsTableName(warning(incorrectV30Name, "3.0")) }
|
||||
if (connection.metaData.getTables(null, null, incorrectV31Name, null).next()) { throw IncompatibleAttachmentsContractsTableName(warning(incorrectV31Name, "3.1")) }
|
||||
if (connection.metaData.getTables(null, null, incorrectV30Name, null).next()) { throw DatabaseIncompatibleException(warning(incorrectV30Name, "3.0")) }
|
||||
if (connection.metaData.getTables(null, null, incorrectV31Name, null).next()) { throw DatabaseIncompatibleException(warning(incorrectV31Name, "3.1")) }
|
||||
}
|
||||
}
|
||||
|
||||
private fun checkCorrectCheckpointTypeOnPostgres(connection: Connection) {
|
||||
val metaData = connection.metaData
|
||||
if (metaData.getDatabaseProductName() != "PostgreSQL") {
|
||||
return
|
||||
}
|
||||
|
||||
val result = metaData.getColumns(null, null, "node_checkpoints", "checkpoint_value")
|
||||
if (result.next()) {
|
||||
val type = result.getString("TYPE_NAME")
|
||||
if (type != "bytea") {
|
||||
throw DatabaseIncompatibleException("The type of the 'checkpoint_value' table must be 'bytea', but 'oid' was found. See upgrade notes to migrate from Corda database version 3.1 https://docs.corda.net/head/upgrade-notes.html.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6,17 +6,17 @@ import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.toHexString
|
||||
import org.hibernate.SessionFactory
|
||||
import org.hibernate.boot.Metadata
|
||||
import org.hibernate.boot.MetadataBuilder
|
||||
import org.hibernate.boot.MetadataSources
|
||||
import org.hibernate.boot.model.naming.Identifier
|
||||
import org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
|
||||
import org.hibernate.boot.registry.BootstrapServiceRegistryBuilder
|
||||
import org.hibernate.boot.registry.classloading.internal.ClassLoaderServiceImpl
|
||||
import org.hibernate.boot.registry.classloading.spi.ClassLoaderService
|
||||
import org.hibernate.cfg.Configuration
|
||||
import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider
|
||||
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment
|
||||
import org.hibernate.service.UnknownUnwrapTypeException
|
||||
import org.hibernate.type.AbstractSingleColumnStandardBasicType
|
||||
import org.hibernate.type.MaterializedBlobType
|
||||
import org.hibernate.type.descriptor.java.PrimitiveByteArrayTypeDescriptor
|
||||
import org.hibernate.type.descriptor.sql.BlobTypeDescriptor
|
||||
import org.hibernate.type.descriptor.sql.VarbinaryTypeDescriptor
|
||||
@ -29,10 +29,32 @@ class HibernateConfiguration(
|
||||
schemas: Set<MappedSchema>,
|
||||
private val databaseConfig: DatabaseConfig,
|
||||
private val attributeConverters: Collection<AttributeConverter<*, *>>,
|
||||
private val jdbcUrl: String,
|
||||
val cordappClassLoader: ClassLoader? = null
|
||||
) {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
|
||||
// register custom converters
|
||||
fun buildHibernateMetadata(metadataBuilder: MetadataBuilder, jdbcUrl:String, attributeConverters: Collection<AttributeConverter<*, *>>): Metadata {
|
||||
metadataBuilder.run {
|
||||
attributeConverters.forEach { applyAttributeConverter(it) }
|
||||
// Register a tweaked version of `org.hibernate.type.MaterializedBlobType` that truncates logged messages.
|
||||
// to avoid OOM when large blobs might get logged.
|
||||
applyBasicType(CordaMaterializedBlobType, CordaMaterializedBlobType.name)
|
||||
applyBasicType(CordaWrapperBinaryType, CordaWrapperBinaryType.name)
|
||||
|
||||
// Create a custom type that will map a blob to byteA in postgres and as a normal blob for all other dbms.
|
||||
// This is required for the Checkpoints as a workaround for the issue that postgres has on azure.
|
||||
if (jdbcUrl.contains(":postgresql:", ignoreCase = true)) {
|
||||
applyBasicType(MapBlobToPostgresByteA, MapBlobToPostgresByteA.name)
|
||||
} else {
|
||||
applyBasicType(MapBlobToNormalBlob, MapBlobToNormalBlob.name)
|
||||
}
|
||||
|
||||
return build()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val sessionFactories = Caffeine.newBuilder().maximumSize(databaseConfig.mappedSchemaCacheSize).build<Set<MappedSchema>, SessionFactory>()
|
||||
@ -62,7 +84,7 @@ class HibernateConfiguration(
|
||||
schema.mappedTypes.forEach { config.addAnnotatedClass(it) }
|
||||
}
|
||||
|
||||
val sessionFactory = buildSessionFactory(config, metadataSources, databaseConfig.serverNameTablePrefix, cordappClassLoader)
|
||||
val sessionFactory = buildSessionFactory(config, metadataSources, cordappClassLoader)
|
||||
logger.info("Created session factory for schemas: $schemas")
|
||||
|
||||
// export Hibernate JMX statistics
|
||||
@ -83,13 +105,12 @@ class HibernateConfiguration(
|
||||
|
||||
try {
|
||||
mbeanServer.registerMBean(statisticsMBean, statsName)
|
||||
}
|
||||
catch (e: Exception) {
|
||||
} catch (e: Exception) {
|
||||
logger.warn(e.message)
|
||||
}
|
||||
}
|
||||
|
||||
private fun buildSessionFactory(config: Configuration, metadataSources: MetadataSources, tablePrefix: String, cordappClassLoader: ClassLoader?): SessionFactory {
|
||||
private fun buildSessionFactory(config: Configuration, metadataSources: MetadataSources, cordappClassLoader: ClassLoader?): SessionFactory {
|
||||
config.standardServiceRegistryBuilder.applySettings(config.properties)
|
||||
|
||||
if (cordappClassLoader != null) {
|
||||
@ -98,22 +119,8 @@ class HibernateConfiguration(
|
||||
ClassLoaderServiceImpl(cordappClassLoader))
|
||||
}
|
||||
|
||||
val metadata = metadataSources.getMetadataBuilder(config.standardServiceRegistryBuilder.build()).run {
|
||||
applyPhysicalNamingStrategy(object : PhysicalNamingStrategyStandardImpl() {
|
||||
override fun toPhysicalTableName(name: Identifier?, context: JdbcEnvironment?): Identifier {
|
||||
val default = super.toPhysicalTableName(name, context)
|
||||
return Identifier.toIdentifier(tablePrefix + default.text, default.isQuoted)
|
||||
}
|
||||
})
|
||||
// register custom converters
|
||||
attributeConverters.forEach { applyAttributeConverter(it) }
|
||||
// Register a tweaked version of `org.hibernate.type.MaterializedBlobType` that truncates logged messages.
|
||||
// to avoid OOM when large blobs might get logged.
|
||||
applyBasicType(CordaMaterializedBlobType, CordaMaterializedBlobType.name)
|
||||
applyBasicType(CordaWrapperBinaryType, CordaWrapperBinaryType.name)
|
||||
build()
|
||||
}
|
||||
|
||||
val metadataBuilder = metadataSources.getMetadataBuilder(config.standardServiceRegistryBuilder.build())
|
||||
val metadata = buildHibernateMetadata(metadataBuilder, jdbcUrl, attributeConverters)
|
||||
return metadata.sessionFactoryBuilder.run {
|
||||
allowOutOfTransactionUpdateOperations(true)
|
||||
applySecondLevelCacheSupport(false)
|
||||
@ -148,7 +155,7 @@ class HibernateConfiguration(
|
||||
}
|
||||
|
||||
// A tweaked version of `org.hibernate.type.MaterializedBlobType` that truncates logged messages. Also logs in hex.
|
||||
private object CordaMaterializedBlobType : AbstractSingleColumnStandardBasicType<ByteArray>(BlobTypeDescriptor.DEFAULT, CordaPrimitiveByteArrayTypeDescriptor) {
|
||||
object CordaMaterializedBlobType : AbstractSingleColumnStandardBasicType<ByteArray>(BlobTypeDescriptor.DEFAULT, CordaPrimitiveByteArrayTypeDescriptor) {
|
||||
override fun getName(): String {
|
||||
return "materialized_blob"
|
||||
}
|
||||
@ -172,7 +179,7 @@ class HibernateConfiguration(
|
||||
}
|
||||
|
||||
// A tweaked version of `org.hibernate.type.WrapperBinaryType` that deals with ByteArray (java primitive byte[] type).
|
||||
private object CordaWrapperBinaryType : AbstractSingleColumnStandardBasicType<ByteArray>(VarbinaryTypeDescriptor.INSTANCE, PrimitiveByteArrayTypeDescriptor.INSTANCE) {
|
||||
object CordaWrapperBinaryType : AbstractSingleColumnStandardBasicType<ByteArray>(VarbinaryTypeDescriptor.INSTANCE, PrimitiveByteArrayTypeDescriptor.INSTANCE) {
|
||||
override fun getRegistrationKeys(): Array<String> {
|
||||
return arrayOf(name, "ByteArray", ByteArray::class.java.name)
|
||||
}
|
||||
@ -181,4 +188,21 @@ class HibernateConfiguration(
|
||||
return "corda-wrapper-binary"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Maps to a byte array on postgres.
|
||||
object MapBlobToPostgresByteA : AbstractSingleColumnStandardBasicType<ByteArray>(VarbinaryTypeDescriptor.INSTANCE, PrimitiveByteArrayTypeDescriptor.INSTANCE) {
|
||||
override fun getRegistrationKeys(): Array<String> {
|
||||
return arrayOf(name, "ByteArray", ByteArray::class.java.name)
|
||||
}
|
||||
|
||||
override fun getName(): String {
|
||||
return "corda-blob"
|
||||
}
|
||||
}
|
||||
|
||||
object MapBlobToNormalBlob : MaterializedBlobType() {
|
||||
override fun getName(): String {
|
||||
return "corda-blob"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user