diff --git a/node/build.gradle b/node/build.gradle index 70a435caf8..d9e10b1a9d 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -144,10 +144,6 @@ dependencies { // For H2 database support in persistence compile "com.h2database:h2:$h2_version" - // Exposed: Kotlin SQL library - under evaluation - // TODO: Upgrade to Exposed 0.7 (has API changes) - compile "org.jetbrains.exposed:exposed:0.5.0" - // SQL connection pooling library compile "com.zaxxer:HikariCP:2.5.1" diff --git a/node/src/integration-test/kotlin/net/corda/node/utilities/JDBCHashMapTestSuite.kt b/node/src/integration-test/kotlin/net/corda/node/utilities/JDBCHashMapTestSuite.kt deleted file mode 100644 index 479c400068..0000000000 --- a/node/src/integration-test/kotlin/net/corda/node/utilities/JDBCHashMapTestSuite.kt +++ /dev/null @@ -1,257 +0,0 @@ -package net.corda.node.utilities - -import com.google.common.collect.testing.MapTestSuiteBuilder -import com.google.common.collect.testing.SetTestSuiteBuilder -import com.google.common.collect.testing.TestStringMapGenerator -import com.google.common.collect.testing.TestStringSetGenerator -import com.google.common.collect.testing.features.CollectionFeature -import com.google.common.collect.testing.features.CollectionSize -import com.google.common.collect.testing.features.MapFeature -import com.google.common.collect.testing.features.SetFeature -import com.google.common.collect.testing.testers.* -import junit.framework.TestSuite -import net.corda.testing.* -import net.corda.testing.node.makeTestDataSourceProperties -import net.corda.testing.node.makeTestDatabaseProperties -import org.assertj.core.api.Assertions.assertThat -import org.junit.* -import org.junit.runner.RunWith -import org.junit.runners.Suite -import java.util.* - -@RunWith(Suite::class) -@Suite.SuiteClasses( - JDBCHashMapTestSuite.MapLoadOnInitFalse::class, - JDBCHashMapTestSuite.MapLoadOnInitTrue::class, - JDBCHashMapTestSuite.MapConstrained::class, - JDBCHashMapTestSuite.SetLoadOnInitFalse::class, - JDBCHashMapTestSuite.SetLoadOnInitTrue::class, - JDBCHashMapTestSuite.SetConstrained::class) -class JDBCHashMapTestSuite { - companion object { - lateinit var transaction: DatabaseTransaction - lateinit var database: CordaPersistence - lateinit var loadOnInitFalseMap: JDBCHashMap - lateinit var memoryConstrainedMap: JDBCHashMap - lateinit var loadOnInitTrueMap: JDBCHashMap - lateinit var loadOnInitFalseSet: JDBCHashSet - lateinit var memoryConstrainedSet: JDBCHashSet - lateinit var loadOnInitTrueSet: JDBCHashSet - - @JvmStatic - @BeforeClass - fun before() { - initialiseTestSerialization() - database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), createIdentityService = { throw UnsupportedOperationException("Identity Service should not be in use") }) - setUpDatabaseTx() - loadOnInitFalseMap = JDBCHashMap("test_map_false", loadOnInit = false) - memoryConstrainedMap = JDBCHashMap("test_map_constrained", loadOnInit = false, maxBuckets = 1) - loadOnInitTrueMap = JDBCHashMap("test_map_true", loadOnInit = true) - loadOnInitFalseSet = JDBCHashSet("test_set_false", loadOnInit = false) - memoryConstrainedSet = JDBCHashSet("test_set_constrained", loadOnInit = false, maxBuckets = 1) - loadOnInitTrueSet = JDBCHashSet("test_set_true", loadOnInit = true) - } - - @JvmStatic - @AfterClass - fun after() { - closeDatabaseTx() - database.close() - resetTestSerialization() - } - - @JvmStatic - fun createMapTestSuite(loadOnInit: Boolean, constrained: Boolean): TestSuite = MapTestSuiteBuilder - .using(JDBCHashMapTestGenerator(loadOnInit = loadOnInit, constrained = constrained)) - .named("test JDBCHashMap with loadOnInit=$loadOnInit") - .withFeatures( - CollectionSize.ANY, - MapFeature.ALLOWS_ANY_NULL_QUERIES, - MapFeature.GENERAL_PURPOSE, - CollectionFeature.SUPPORTS_ITERATOR_REMOVE, - CollectionFeature.KNOWN_ORDER - ) - // putAll(null) not supported by Kotlin MutableMap interface - .suppressing(MapPutAllTester::class.java.getMethod("testPutAll_nullCollectionReference")) - // We suppress the following because of NotReallyMutableEntry - .suppressing(MapReplaceAllTester::class.java.getMethod("testReplaceAllPreservesOrder")) - .suppressing(MapReplaceAllTester::class.java.getMethod("testReplaceAllRotate")) - .suppressing(MapEntrySetTester::class.java.getMethod("testSetValue")) - .createTestSuite() - - @JvmStatic - fun createSetTestSuite(loadOnInit: Boolean, constrained: Boolean): TestSuite = SetTestSuiteBuilder - .using(JDBCHashSetTestGenerator(loadOnInit = loadOnInit, constrained = constrained)) - .named("test JDBCHashSet with loadOnInit=$loadOnInit") - .withFeatures( - CollectionSize.ANY, - SetFeature.GENERAL_PURPOSE, - CollectionFeature.SUPPORTS_ITERATOR_REMOVE, - CollectionFeature.KNOWN_ORDER - ) - // add/remove/retainAll(null) not supported by Kotlin MutableSet interface - .suppressing(CollectionAddAllTester::class.java.getMethod("testAddAll_nullCollectionReference")) - .suppressing(CollectionAddAllTester::class.java.getMethod("testAddAll_nullUnsupported")) - .suppressing(CollectionAddTester::class.java.getMethod("testAdd_nullUnsupported")) - .suppressing(CollectionCreationTester::class.java.getMethod("testCreateWithNull_unsupported")) - .suppressing(CollectionRemoveAllTester::class.java.getMethod("testRemoveAll_nullCollectionReferenceNonEmptySubject")) - .suppressing(CollectionRemoveAllTester::class.java.getMethod("testRemoveAll_nullCollectionReferenceEmptySubject")) - .suppressing(CollectionRetainAllTester::class.java.getMethod("testRetainAll_nullCollectionReferenceNonEmptySubject")) - .suppressing(CollectionRetainAllTester::class.java.getMethod("testRetainAll_nullCollectionReferenceEmptySubject")) - .createTestSuite() - - private fun setUpDatabaseTx() { - transaction = DatabaseTransactionManager.currentOrNew() - } - - private fun closeDatabaseTx() { - transaction.commit() - transaction.close() - } - } - - /** - * Guava test suite generator for JDBCHashMap(loadOnInit=false, constrained = false). - */ - class MapLoadOnInitFalse { - companion object { - @JvmStatic - fun suite(): TestSuite = createMapTestSuite(false, false) - } - } - - /** - * Guava test suite generator for JDBCHashMap(loadOnInit=false, constrained = true). - */ - class MapConstrained { - companion object { - @JvmStatic - fun suite(): TestSuite = createMapTestSuite(false, true) - } - } - - /** - * Guava test suite generator for JDBCHashMap(loadOnInit=true, constrained = false). - */ - class MapLoadOnInitTrue { - companion object { - @JvmStatic - fun suite(): TestSuite = createMapTestSuite(true, false) - } - } - - /** - * Generator of map instances needed for testing. - */ - class JDBCHashMapTestGenerator(val loadOnInit: Boolean, val constrained: Boolean) : TestStringMapGenerator() { - override fun create(elements: Array>): Map { - val map = if (loadOnInit) loadOnInitTrueMap else if (constrained) memoryConstrainedMap else loadOnInitFalseMap - map.clear() - map.putAll(elements.associate { Pair(it.key, it.value) }) - return map - } - } - - /** - * Guava test suite generator for JDBCHashSet(loadOnInit=false, constrained = false). - */ - class SetLoadOnInitFalse { - companion object { - @JvmStatic - fun suite(): TestSuite = createSetTestSuite(false, false) - } - } - - /** - * Guava test suite generator for JDBCHashSet(loadOnInit=false, constrained = true). - */ - class SetConstrained { - companion object { - @JvmStatic - fun suite(): TestSuite = createSetTestSuite(false, true) - } - } - - /** - * Guava test suite generator for JDBCHashSet(loadOnInit=true, constrained = false). - */ - class SetLoadOnInitTrue { - companion object { - @JvmStatic - fun suite(): TestSuite = createSetTestSuite(true, false) - } - } - - /** - * Generator of set instances needed for testing. - */ - class JDBCHashSetTestGenerator(val loadOnInit: Boolean, val constrained: Boolean) : TestStringSetGenerator() { - override fun create(elements: Array): Set { - val set = if (loadOnInit) loadOnInitTrueSet else if (constrained) memoryConstrainedSet else loadOnInitFalseSet - set.clear() - set.addAll(elements) - return set - } - } - - /** - * Test that the contents of a map can be reloaded from the database. - * - * If the Map reloads, then so will the Set as it just delegates. - */ - class MapCanBeReloaded : TestDependencyInjectionBase() { - private val ops = listOf(Triple(AddOrRemove.ADD, "A", "1"), - Triple(AddOrRemove.ADD, "B", "2"), - Triple(AddOrRemove.ADD, "C", "3"), - Triple(AddOrRemove.ADD, "D", "4"), - Triple(AddOrRemove.ADD, "E", "5"), - Triple(AddOrRemove.REMOVE, "A", "6"), - Triple(AddOrRemove.ADD, "G", "7"), - Triple(AddOrRemove.ADD, "H", "8"), - Triple(AddOrRemove.REMOVE, "D", "9"), - Triple(AddOrRemove.ADD, "C", "10")) - - private fun applyOpsToMap(map: MutableMap): MutableMap { - for (op in ops) { - if (op.first == AddOrRemove.ADD) { - map[op.second] = op.third - } else { - map.remove(op.second) - } - } - return map - } - - private val transientMapForComparison = applyOpsToMap(LinkedHashMap()) - - lateinit var database: CordaPersistence - - @Before - fun before() { - database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), createIdentityService = { throw UnsupportedOperationException("Identity Service should not be in use") }) - } - - @After - fun after() { - database.close() - } - - @Test - fun `fill map and check content after reconstruction`() { - database.transaction { - val persistentMap = JDBCHashMap("the_table") - // Populate map the first time. - applyOpsToMap(persistentMap) - assertThat(persistentMap.entries).containsExactly(*transientMapForComparison.entries.toTypedArray()) - } - database.transaction { - val persistentMap = JDBCHashMap("the_table", loadOnInit = false) - assertThat(persistentMap.entries).containsExactly(*transientMapForComparison.entries.toTypedArray()) - } - database.transaction { - val persistentMap = JDBCHashMap("the_table", loadOnInit = true) - assertThat(persistentMap.entries).containsExactly(*transientMapForComparison.entries.toTypedArray()) - } - } - } -} diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 434ddba409..f75f050837 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -541,7 +541,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, this.database = configureDatabase(props, configuration.database, { _services.schemaService }, createIdentityService = { _services.identityService }) // Now log the vendor string as this will also cause a connection to be tested eagerly. database.transaction { - log.info("Connected to ${database.database.vendor} database.") + log.info("Connected to ${database.dataSource.connection.metaData.databaseProductName} database.") } this.database::close.let { dbCloser = it diff --git a/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt b/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt index fc168e80b2..58fbe944a1 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt @@ -9,8 +9,8 @@ import net.corda.core.schemas.PersistentStateRef import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import net.corda.node.services.database.HibernateConfiguration +import net.corda.node.utilities.DatabaseTransactionManager import org.hibernate.FlushMode -import org.jetbrains.exposed.sql.transactions.TransactionManager import rx.Observable /** @@ -40,7 +40,7 @@ class HibernateObserver(vaultUpdates: Observable>, v fun persistStateWithSchema(state: ContractState, stateRef: StateRef, schema: MappedSchema) { val sessionFactory = config.sessionFactoryForSchema(schema) val session = sessionFactory.withOptions(). - connection(TransactionManager.current().connection). + connection(DatabaseTransactionManager.current().connection). flushMode(FlushMode.MANUAL). openSession() session.use { diff --git a/node/src/main/kotlin/net/corda/node/services/vault/HibernateVaultQueryImpl.kt b/node/src/main/kotlin/net/corda/node/services/vault/HibernateVaultQueryImpl.kt index 55cd451b38..b6cfebe4cd 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/HibernateVaultQueryImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/HibernateVaultQueryImpl.kt @@ -21,8 +21,8 @@ import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace import net.corda.node.services.database.HibernateConfiguration +import net.corda.node.utilities.DatabaseTransactionManager import org.hibernate.Session -import org.jetbrains.exposed.sql.transactions.TransactionManager import rx.Observable import java.lang.Exception import java.util.* @@ -157,7 +157,7 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration, private fun getSession(): Session { return sessionFactory.withOptions(). - connection(TransactionManager.current().connection). + connection(DatabaseTransactionManager.current().connection). openSession() } diff --git a/node/src/main/kotlin/net/corda/node/utilities/CordaPersistence.kt b/node/src/main/kotlin/net/corda/node/utilities/CordaPersistence.kt index e4563ebaa0..b8add3a202 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/CordaPersistence.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/CordaPersistence.kt @@ -7,7 +7,6 @@ import net.corda.node.services.api.SchemaService import net.corda.node.services.database.HibernateConfiguration import net.corda.node.services.schema.NodeSchemaService import org.hibernate.SessionFactory -import org.jetbrains.exposed.sql.Database import rx.Observable import rx.Subscriber @@ -18,13 +17,14 @@ import java.sql.SQLException import java.util.* import java.util.concurrent.CopyOnWriteArrayList +/** + * Table prefix for all tables owned by the node module. + */ +const val NODE_DATABASE_PREFIX = "node_" //HikariDataSource implements Closeable which allows CordaPersistence to be Closeable class CordaPersistence(var dataSource: HikariDataSource, private var createSchemaService: () -> SchemaService, private val createIdentityService: ()-> IdentityService, databaseProperties: Properties): Closeable { - - /** Holds Exposed database, the field will be removed once Exposed library is removed */ - lateinit var database: Database var transactionIsolationLevel = parserTransactionIsolationLevel(databaseProperties.getProperty("transactionIsolationLevel")) val hibernateConfig: HibernateConfiguration by lazy { @@ -112,10 +112,6 @@ fun configureDatabase(dataSourceProperties: Properties, databaseProperties: Prop val dataSource = HikariDataSource(config) val persistence = CordaPersistence.connect(dataSource, createSchemaService, createIdentityService, databaseProperties ?: Properties()) - //org.jetbrains.exposed.sql.Database will be removed once Exposed library is removed - val database = Database.connect(dataSource) { _ -> ExposedTransactionManager() } - persistence.database = database - // Check not in read-only mode. persistence.transaction { persistence.dataSource.connection.use { diff --git a/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt b/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt deleted file mode 100644 index 028117015c..0000000000 --- a/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt +++ /dev/null @@ -1,209 +0,0 @@ -package net.corda.node.utilities - -import net.corda.core.crypto.SecureHash -import net.corda.core.crypto.parsePublicKeyBase58 -import net.corda.core.crypto.toBase58String -import org.bouncycastle.cert.X509CertificateHolder -import org.h2.jdbc.JdbcBlob -import org.jetbrains.exposed.sql.* -import java.io.ByteArrayInputStream -import java.security.PublicKey -import java.security.cert.CertPath -import java.security.cert.CertificateFactory -import java.time.Instant -import java.time.LocalDate -import java.time.LocalDateTime -import java.time.ZoneOffset -import java.util.* - -/** - * Table prefix for all tables owned by the node module. - */ -const val NODE_DATABASE_PREFIX = "node_" - -// Composite columns for use with below Exposed helpers. -data class PartyColumns(val name: Column, val owningKey: Column) -data class PartyAndCertificateColumns(val name: Column, val owningKey: Column, - val certificate: Column, val certPath: Column) -data class StateRefColumns(val txId: Column, val index: Column) -data class TxnNoteColumns(val txId: Column, val note: Column) - -/** - * [Table] column helpers for use with Exposed, as per [varchar] etc. - */ -fun Table.certificate(name: String) = this.registerColumn(name, X509CertificateColumnType) -fun Table.certificatePath(name: String) = this.registerColumn(name, CertPathColumnType) -fun Table.publicKey(name: String) = this.registerColumn(name, PublicKeyColumnType) -fun Table.secureHash(name: String) = this.registerColumn(name, SecureHashColumnType) -fun Table.party(nameColumnName: String, - keyColumnName: String) = PartyColumns(this.varchar(nameColumnName, length = 255), this.publicKey(keyColumnName)) -fun Table.partyAndCertificate(nameColumnName: String, - keyColumnName: String, - certificateColumnName: String, - pathColumnName: String) = PartyAndCertificateColumns(this.varchar(nameColumnName, length = 255), this.publicKey(keyColumnName), - this.certificate(certificateColumnName), this.certificatePath(pathColumnName)) -fun Table.uuidString(name: String) = this.registerColumn(name, UUIDStringColumnType) -fun Table.localDate(name: String) = this.registerColumn(name, LocalDateColumnType) -fun Table.localDateTime(name: String) = this.registerColumn(name, LocalDateTimeColumnType) -fun Table.instant(name: String) = this.registerColumn(name, InstantColumnType) -fun Table.stateRef(txIdColumnName: String, indexColumnName: String) = StateRefColumns(this.secureHash(txIdColumnName), this.integer(indexColumnName)) -fun Table.txnNote(txIdColumnName: String, txnNoteColumnName: String) = TxnNoteColumns(this.secureHash(txIdColumnName), this.text(txnNoteColumnName)) - -/** - * [ColumnType] for marshalling to/from database on behalf of [X509CertificateHolder]. - */ -object X509CertificateColumnType : ColumnType() { - override fun sqlType(): String = "BLOB" - - override fun valueFromDB(value: Any): Any { - val blob = value as JdbcBlob - return X509CertificateHolder(blob.getBytes(0, blob.length().toInt())) - } - - override fun notNullValueToDB(value: Any): Any = (value as X509CertificateHolder).encoded -} - -/** - * [ColumnType] for marshalling to/from database on behalf of [CertPath]. - */ -object CertPathColumnType : ColumnType() { - private val factory = CertificateFactory.getInstance("X.509") - override fun sqlType(): String = "BLOB" - - override fun valueFromDB(value: Any): Any { - val blob = value as JdbcBlob - return factory.generateCertPath(ByteArrayInputStream(blob.getBytes(0, blob.length().toInt()))) - } - - override fun notNullValueToDB(value: Any): Any = (value as CertPath).encoded -} - -/** - * [ColumnType] for marshalling to/from database on behalf of [PublicKey]. - */ -// TODO Rethink how we store CompositeKeys in db. Currently they are stored as Base58 strings and as we don't know the size -// of a CompositeKey they could be CLOB fields. Given the time to fetch these types and that they are unsuitable as table keys, -// having a shorter primary key (such as SHA256 hash or a UUID generated on demand) that references a common composite key table may make more sense. -object PublicKeyColumnType : ColumnType() { - override fun sqlType(): String = "VARCHAR" - - override fun valueFromDB(value: Any): Any = parsePublicKeyBase58(value.toString()) - - override fun notNullValueToDB(value: Any): Any = (value as? PublicKey)?.toBase58String() ?: value -} - -/** - * [ColumnType] for marshalling to/from database on behalf of [SecureHash]. - */ -object SecureHashColumnType : ColumnType() { - override fun sqlType(): String = "VARCHAR(64)" - - override fun valueFromDB(value: Any): Any = SecureHash.parse(value.toString()) - - override fun notNullValueToDB(value: Any): Any = (value as? SecureHash)?.toString() ?: value -} - -/** - * [ColumnType] for marshalling to/from database on behalf of [UUID], always using a string representation. - */ -object UUIDStringColumnType : ColumnType() { - override fun sqlType(): String = "VARCHAR(36)" - - override fun valueFromDB(value: Any): Any = UUID.fromString(value.toString()) - - override fun notNullValueToDB(value: Any): Any = (value as? UUID)?.toString() ?: value -} - -/** - * [ColumnType] for marshalling to/from database on behalf of [java.time.LocalDate]. - */ -object LocalDateColumnType : ColumnType() { - override fun sqlType(): String = "DATE" - - override fun nonNullValueToString(value: Any): String { - if (value is String) return value - - val localDate = when (value) { - is LocalDate -> value - is java.sql.Date -> value.toLocalDate() - is java.sql.Timestamp -> value.toLocalDateTime().toLocalDate() - else -> error("Unexpected value: $value") - } - return "'$localDate'" - } - - override fun valueFromDB(value: Any): Any = when (value) { - is java.sql.Date -> value.toLocalDate() - is java.sql.Timestamp -> value.toLocalDateTime().toLocalDate() - is Long -> LocalDate.from(Instant.ofEpochMilli(value)) - else -> value - } - - override fun notNullValueToDB(value: Any): Any = if (value is LocalDate) { - java.sql.Date(value.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli()) - } else value -} - - -/** - * [ColumnType] for marshalling to/from database on behalf of [java.time.LocalDateTime]. - */ -object LocalDateTimeColumnType : ColumnType() { - private val sqlType = DateColumnType(time = true).sqlType() - override fun sqlType(): String = sqlType - - override fun nonNullValueToString(value: Any): String { - if (value is String) return value - - val localDateTime = when (value) { - is LocalDateTime -> value - is java.sql.Date -> value.toLocalDate().atStartOfDay() - is java.sql.Timestamp -> value.toLocalDateTime() - else -> error("Unexpected value: $value") - } - return "'$localDateTime'" - } - - override fun valueFromDB(value: Any): Any = when (value) { - is java.sql.Date -> value.toLocalDate().atStartOfDay() - is java.sql.Timestamp -> value.toLocalDateTime() - is Long -> LocalDateTime.from(Instant.ofEpochMilli(value)) - else -> value - } - - override fun notNullValueToDB(value: Any): Any = if (value is LocalDateTime) { - java.sql.Timestamp(value.toInstant(ZoneOffset.UTC).toEpochMilli()) - } else value -} - - -/** - * [ColumnType] for marshalling to/from database on behalf of [java.time.Instant]. - */ -object InstantColumnType : ColumnType() { - private val sqlType = DateColumnType(time = true).sqlType() - override fun sqlType(): String = sqlType - - override fun nonNullValueToString(value: Any): String { - if (value is String) return value - - val localDateTime = when (value) { - is Instant -> value - is java.sql.Date -> value.toLocalDate().atStartOfDay().toInstant(ZoneOffset.UTC) - is java.sql.Timestamp -> value.toLocalDateTime().toInstant(ZoneOffset.UTC) - else -> error("Unexpected value: $value") - } - return "'$localDateTime'" - } - - override fun valueFromDB(value: Any): Any = when (value) { - is java.sql.Date -> value.toLocalDate().atStartOfDay().toInstant(ZoneOffset.UTC) - is java.sql.Timestamp -> value.toLocalDateTime().toInstant(ZoneOffset.UTC) - is Long -> LocalDateTime.from(Instant.ofEpochMilli(value)).toInstant(ZoneOffset.UTC) - else -> value - } - - override fun notNullValueToDB(value: Any): Any = if (value is Instant) { - java.sql.Timestamp(value.toEpochMilli()) - } else value -} diff --git a/node/src/main/kotlin/net/corda/node/utilities/ExposedTransactionManager.kt b/node/src/main/kotlin/net/corda/node/utilities/ExposedTransactionManager.kt deleted file mode 100644 index 1d90449c35..0000000000 --- a/node/src/main/kotlin/net/corda/node/utilities/ExposedTransactionManager.kt +++ /dev/null @@ -1,54 +0,0 @@ -package net.corda.node.utilities - -import org.jetbrains.exposed.sql.Database -import org.jetbrains.exposed.sql.Transaction -import org.jetbrains.exposed.sql.transactions.TransactionInterface -import org.jetbrains.exposed.sql.transactions.TransactionManager -import java.sql.Connection - -/** - * Wrapper of [DatabaseTransaction], because the class is effectively used for [ExposedTransaction.connection] method only not all methods are implemented. - * The class will obsolete when Exposed library is phased out. - */ -class ExposedTransaction(override val db: Database, val databaseTransaction: DatabaseTransaction): TransactionInterface { - - override val outerTransaction: Transaction? - get() = throw UnsupportedOperationException() - - override val connection: Connection by lazy(LazyThreadSafetyMode.NONE) { - databaseTransaction.connection - } - - override fun commit() { - databaseTransaction.commit() - } - - override fun rollback() { - databaseTransaction.rollback() - } - - override fun close() { - databaseTransaction.close() - } -} - -/** - * Delegates methods to [DatabaseTransactionManager]. - * The class will obsolete when Exposed library is phased out. - */ -class ExposedTransactionManager: TransactionManager { - companion object { - val database: Database - get() = DatabaseTransactionManager.dataSource.database - } - - override fun newTransaction(isolation: Int): Transaction { - var databaseTransaction = DatabaseTransactionManager.newTransaction(isolation) - return Transaction(ExposedTransaction(database, databaseTransaction)) - } - - override fun currentOrNull(): Transaction? { - val databaseTransaction = DatabaseTransactionManager.currentOrNull() - return if (databaseTransaction != null) Transaction(ExposedTransaction(database, databaseTransaction)) else null - } -} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/utilities/JDBCHashMap.kt b/node/src/main/kotlin/net/corda/node/utilities/JDBCHashMap.kt deleted file mode 100644 index 5b193ec2e8..0000000000 --- a/node/src/main/kotlin/net/corda/node/utilities/JDBCHashMap.kt +++ /dev/null @@ -1,508 +0,0 @@ -package net.corda.node.utilities - -import net.corda.core.serialization.SerializationDefaults.STORAGE_CONTEXT -import net.corda.core.serialization.SerializedBytes -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize -import net.corda.core.utilities.loggerFor -import net.corda.core.utilities.trace -import org.jetbrains.exposed.sql.* -import org.jetbrains.exposed.sql.statements.InsertStatement -import java.sql.Blob -import java.util.* -import kotlin.system.measureTimeMillis - -/** - * The classes in this file provide a convenient way to quickly implement persistence for map- and set-like - * collections of data. These might not be sufficient for the eventual implementations of persistence dependent on - * access patterns and performance requirements. - */ - -/** - * The default maximum size of the LRU cache. - * Current computation is linear to max heap size, ensuring a minimum of 256 buckets. - * - * TODO: make this value configurable - * TODO: tune this value, as it's currently mostly a guess - */ -val DEFAULT_MAX_BUCKETS = (256 * (1 + Math.max(0, (Runtime.getRuntime().maxMemory() / 1000000 - 128) / 64))).toInt() - -/** - * A convenient JDBC table backed hash map with iteration order based on insertion order. - * See [AbstractJDBCHashMap] for further implementation details. - * - * In this subclass, keys and values are represented by Blobs of Kryo serialized forms of the key and value objects. - * If you can extend [AbstractJDBCHashMap] and implement less Kryo dependent key and/or value mappings then that is - * likely preferrable. - */ -class JDBCHashMap(tableName: String, - loadOnInit: Boolean = false, - maxBuckets: Int = DEFAULT_MAX_BUCKETS) - : AbstractJDBCHashMap(BlobMapTable(tableName), loadOnInit, maxBuckets) { - - class BlobMapTable(tableName: String) : JDBCHashedTable(tableName) { - val key = blob("key") - val value = blob("value") - } - - override fun keyFromRow(row: ResultRow): K = deserializeFromBlob(row[table.key]) - override fun valueFromRow(row: ResultRow): V = deserializeFromBlob(row[table.value]) - - override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { - insert[table.key] = serializeToBlob(entry.key, finalizables) - } - - override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { - insert[table.value] = serializeToBlob(entry.value, finalizables) - } - -} - -fun bytesToBlob(value: SerializedBytes<*>, finalizables: MutableList<() -> Unit>): Blob { - val blob = DatabaseTransactionManager.current().connection.createBlob() - finalizables += { blob.free() } - blob.setBytes(1, value.bytes) - return blob -} - -fun serializeToBlob(value: Any, finalizables: MutableList<() -> Unit>): Blob = bytesToBlob(value.serialize(context = STORAGE_CONTEXT), finalizables) - -fun bytesFromBlob(blob: Blob): SerializedBytes { - try { - return SerializedBytes(blob.getBytes(0, blob.length().toInt())) - } finally { - blob.free() - } -} - -@Suppress("UNCHECKED_CAST") -fun deserializeFromBlob(blob: Blob): T = bytesFromBlob(blob).deserialize(context = STORAGE_CONTEXT) as T - -/** - * A convenient JDBC table backed hash set with iteration order based on insertion order. - * See [AbstractJDBCHashSet] and [AbstractJDBCHashMap] for further implementation details. - * - * In this subclass, elements are represented by Blobs of Kryo serialized forms of the element objects. - * If you can extend [AbstractJDBCHashSet] and implement less Kryo dependent element mappings then that is - * likely preferrable. - */ -class JDBCHashSet(tableName: String, - loadOnInit: Boolean = false, - maxBuckets: Int = DEFAULT_MAX_BUCKETS) - : AbstractJDBCHashSet(BlobSetTable(tableName), loadOnInit, maxBuckets) { - - class BlobSetTable(tableName: String) : JDBCHashedTable(tableName) { - val key = blob("key") - } - - override fun elementFromRow(row: ResultRow): K = deserializeFromBlob(row[table.key]) - - override fun addElementToInsert(insert: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>) { - insert[table.key] = serializeToBlob(entry, finalizables) - } -} - -/** - * Base class for JDBC backed hash set that delegates to a JDBC backed hash map where the values are all - * [Unit] and not actually persisted. Iteration order is order of insertion. Iterators can remove(). - * - * See [AbstractJDBCHashMap] for implementation details. - */ -abstract class AbstractJDBCHashSet(protected val table: T, - loadOnInit: Boolean = false, - maxBuckets: Int = DEFAULT_MAX_BUCKETS) : MutableSet, AbstractSet() { - protected val innerMap = object : AbstractJDBCHashMap(table, loadOnInit, maxBuckets) { - override fun keyFromRow(row: ResultRow): K = this@AbstractJDBCHashSet.elementFromRow(row) - - // Return constant. - override fun valueFromRow(row: ResultRow) = Unit - - override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) = - this@AbstractJDBCHashSet.addElementToInsert(insert, entry.key, finalizables) - - // No op as not actually persisted. - override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { - } - - } - - override fun add(element: K): Boolean { - if (innerMap.containsKey(element)) { - return false - } else { - innerMap.put(element, Unit) - return true - } - } - - override fun clear() { - innerMap.clear() - } - - override fun iterator(): MutableIterator = innerMap.keys.iterator() - - override fun remove(element: K): Boolean = (innerMap.remove(element) != null) - - override val size: Int - get() = innerMap.size - - override fun contains(element: K): Boolean = innerMap.containsKey(element) - - override fun isEmpty(): Boolean = innerMap.isEmpty() - - /** - * Implementation should return the element object marshalled from the database table row. - * - * See example implementations in [JDBCHashSet]. - */ - protected abstract fun elementFromRow(row: ResultRow): K - - /** - * Implementation should marshall the element to the insert statement. - * - * If some cleanup is required after the insert statement is executed, such as closing a Blob, then add a closure - * to the finalizables to do so. - * - * See example implementations in [JDBCHashSet]. - */ - protected abstract fun addElementToInsert(insert: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>) -} - -/** - * A base class for a JDBC table backed hash map that iterates in insertion order by using - * an ever increasing sequence number on entries. Iterators supports remove() but entries are not really mutable and - * do not support setValue() method from [MutableMap.MutableEntry]. - * - * You should only use keys that have overridden [Object.hashCode] and that have a good hash code distribution. Beware - * changing the hashCode() implementation once objects have been persisted. A process to re-hash the entries persisted - * would be necessary if you do this. - * - * Subclasses must provide their own mapping to and from keys/values and the database table columns, but there are - * inherited columns that all tables must provide to support iteration order and hashing. - * - * The map operates in one of two modes. - * 1. loadOnInit=true where the entire table is loaded into memory in the constructor and all entries remain in memory, - * with only writes needing to perform database access. - * 2. loadOnInit=false where all entries with the same key hash code are loaded from the database on demand when accessed - * via any method other than via keys/values/entries properties, and thus the whole map is not loaded into memory. The number - * of entries retained in memory is controlled indirectly by an LRU algorithm (courtesy of [LinkedHashMap]) and a maximum - * number of hash "buckets", where one bucket represents all entries with the same hash code. There is a default value - * for maximum buckets. - * - * All operations require a [transaction] to be started. - * - * The keys/values/entries collections are really designed just for iterating and other uses might turn out to be - * costly in terms of performance. Beware when loadOnInit=true, the iterator first sorts the entries which could be - * costly too. - * - * This class is *not* thread safe. - * - * TODO: consider caching size once calculated for the first time. - * TODO: buckets just use a list and so are vulnerable to poor hash code implementations with collisions. - * TODO: if iterators are used extensively when loadOnInit=true, consider maintaining a collection of keys in iteration order to avoid sorting each time. - * TODO: revisit whether we need the loadOnInit==true functionality and remove if not. - */ -abstract class AbstractJDBCHashMap(val table: T, - val loadOnInit: Boolean = false, - val maxBuckets: Int = DEFAULT_MAX_BUCKETS) : MutableMap, AbstractMap() { - - companion object { - protected val log = loggerFor>() - - private const val INITIAL_CAPACITY: Int = 16 - private const val LOAD_FACTOR: Float = 0.75f - } - - // Hash code -> entries mapping. - // When loadOnInit = false, size will be limited to maxBuckets entries (which are hash buckets) and map maintains access order rather than insertion order. - private val buckets = object : LinkedHashMap>>(INITIAL_CAPACITY, LOAD_FACTOR, !loadOnInit) { - override fun removeEldestEntry(eldest: MutableMap.MutableEntry>>?): Boolean { - return !loadOnInit && size > maxBuckets - } - } - - init { - check(maxBuckets > 0) { "The maximum number of buckets to retain in memory must be a positive integer." } - // TODO: Move this to schema version managment tool. - createTablesIfNecessary() - if (loadOnInit) { - log.trace { "Loading all entries on init for ${table.tableName}" } - val elapsedMillis = measureTimeMillis { - // load from database. - table.selectAll().map { - val entry = createEntry(it) - val bucket = getBucket(entry.key) - bucket.add(entry) - } - } - log.trace { "Loaded $size entries on init for ${table.tableName} in $elapsedMillis millis." } - } - } - - private fun createTablesIfNecessary() { - SchemaUtils.create(table) - } - - override fun isEmpty(): Boolean { - for (bucket in buckets.values) { - if (!bucket.isEmpty()) { - return false - } - } - return size == 0 - } - - override fun remove(key: K): V? { - val bucket = getBucket(key) - var removed: V? = null - buckets.computeIfPresent(key.hashCode()) { _, value -> - for (entry in value) { - if (entry.key == key) { - removed = entry.value - bucket.remove(entry) - deleteRecord(entry) - break - } - } - value - } - return removed - } - - override fun containsKey(key: K): Boolean = (get(key) != null) - - // We haven't implemented setValue. We could implement if necessary. - // Make sure to remove the relevant suppressed tests in JDBCHashMapTestSuite.createMapTestSuite if this is implemented. - private class NotReallyMutableEntry(key: K, value: V, val seqNo: Int) : AbstractMap.SimpleImmutableEntry(key, value), MutableMap.MutableEntry { - override fun setValue(newValue: V): V { - throw UnsupportedOperationException("Not really mutable. Implement if really required.") - } - } - - private inner class EntryIterator : MutableIterator> { - private val iterator = if (loadOnInit) { - buckets.values.flatten().sortedBy { it.seqNo }.iterator() - } else { - // This uses a Sequence to make the mapping lazy. - table.selectAll().orderBy(table.seqNo).asSequence().map { - val bucket = buckets[it[table.keyHash]] - if (bucket != null) { - val seqNo = it[table.seqNo] - for (entry in bucket) { - if (entry.seqNo == seqNo) { - return@map entry - } - } - } - return@map createEntry(it) - }.iterator() - } - private var current: MutableMap.MutableEntry? = null - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): MutableMap.MutableEntry { - val extractedNext = iterator.next() - current = extractedNext - return extractedNext - } - - override fun remove() { - val savedCurrent = current ?: throw IllegalStateException("Not called next() yet or already removed.") - current = null - remove(savedCurrent.key) - } - } - - override val keys: MutableSet get() { - return object : AbstractSet() { - override val size: Int get() = this@AbstractJDBCHashMap.size - override fun iterator(): MutableIterator { - return object : MutableIterator { - private val entryIterator = EntryIterator() - - override fun hasNext(): Boolean = entryIterator.hasNext() - override fun next(): K = entryIterator.next().key - override fun remove() { - entryIterator.remove() - } - } - } - } - } - - override val values: MutableCollection get() { - return object : AbstractCollection() { - override val size: Int get() = this@AbstractJDBCHashMap.size - override fun iterator(): MutableIterator { - return object : MutableIterator { - private val entryIterator = EntryIterator() - - override fun hasNext(): Boolean = entryIterator.hasNext() - override fun next(): V = entryIterator.next().value - override fun remove() { - entryIterator.remove() - } - } - } - } - } - - override val entries: MutableSet> get() { - return object : AbstractSet>() { - override val size: Int get() = this@AbstractJDBCHashMap.size - override fun iterator(): MutableIterator> { - return object : MutableIterator> { - private val entryIterator = EntryIterator() - - override fun hasNext(): Boolean = entryIterator.hasNext() - override fun next(): MutableMap.MutableEntry = entryIterator.next() - override fun remove() { - entryIterator.remove() - } - } - } - } - } - - override fun put(key: K, value: V): V? { - var oldValue: V? = null - var oldSeqNo: Int? = null - getBucket(key) - buckets.compute(key.hashCode()) { _, list -> - val newList = list ?: newBucket() - val iterator = newList.listIterator() - while (iterator.hasNext()) { - val entry = iterator.next() - if (entry.key == key) { - oldValue = entry.value - oldSeqNo = entry.seqNo - iterator.remove() - deleteRecord(entry) - break - } - } - val seqNo = addRecord(key, value, oldSeqNo) - val newEntry = NotReallyMutableEntry(key, value, seqNo) - newList.add(newEntry) - newList - } - return oldValue - } - - override fun containsValue(value: V): Boolean { - for (storedValue in values) { - if (storedValue == value) { - return true - } - } - return false - } - - override val size: Int get() { - return if (loadOnInit) { - buckets.values.map { it.size }.sum() - } else { - table.slice(table.seqNo).selectAll().count() - } - } - - override fun clear() { - if (!loadOnInit || !isEmpty()) { - table.deleteAll() - buckets.clear() - } - } - - override fun get(key: K): V? { - for ((entryKey, value) in getBucket(key)) { - if (entryKey == key) { - return value - } - } - return null - } - - private fun getBucket(key: Any): MutableList> { - return buckets.computeIfAbsent(key.hashCode()) { _ -> - if (!loadOnInit) { - loadBucket(key.hashCode()) - } else { - newBucket() - } - } - } - - private fun newBucket(): MutableList> = mutableListOf() - - private fun loadBucket(hashCode: Int): MutableList> { - return table.select { table.keyHash.eq(hashCode) }.map { - createEntry(it) - }.toMutableList>() - } - - /** - * Implementation should return the key object marshalled from the database table row. - * - * See example implementations in [JDBCHashMap]. - */ - protected abstract fun keyFromRow(row: ResultRow): K - - /** - * Implementation should return the value object marshalled from the database table row. - * - * See example implementations in [JDBCHashMap]. - */ - protected abstract fun valueFromRow(row: ResultRow): V - - /** - * Implementation should marshall the key to the insert statement. - * - * If some cleanup is required after the insert statement is executed, such as closing a Blob, then add a closure - * to the finalizables to do so. - * - * See example implementations in [JDBCHashMap]. - */ - protected abstract fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) - - /** - * Implementation should marshall the value to the insert statement. - * - * If some cleanup is required after the insert statement is executed, such as closing a Blob, then add a closure - * to the finalizables to do so. - * - * See example implementations in [JDBCHashMap]. - */ - protected abstract fun addValueToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) - - private fun createEntry(it: ResultRow) = NotReallyMutableEntry(keyFromRow(it), valueFromRow(it), it[table.seqNo]) - - private fun deleteRecord(entry: NotReallyMutableEntry) { - table.deleteWhere { - table.seqNo eq entry.seqNo - } - } - - private fun addRecord(key: K, value: V, oldSeqNo: Int?): Int { - val finalizables = mutableListOf<() -> Unit>() - try { - return table.insert { - it[keyHash] = key.hashCode() - val entry = SimpleEntry(key, value) - addKeyToInsert(it, entry, finalizables) - addValueToInsert(it, entry, finalizables) - if (oldSeqNo != null) { - it[seqNo] = oldSeqNo - it.generatedKey = oldSeqNo - } - } get table.seqNo - } finally { - finalizables.forEach { it() } - } - } -} - -open class JDBCHashedTable(tableName: String) : Table(tableName) { - val keyHash = integer("key_hash").index() - val seqNo = integer("seq_no").autoIncrement().index().primaryKey() -} diff --git a/node/src/test/kotlin/net/corda/node/services/schema/HibernateObserverTests.kt b/node/src/test/kotlin/net/corda/node/services/schema/HibernateObserverTests.kt index f061b1c4bf..66c3d5dad0 100644 --- a/node/src/test/kotlin/net/corda/node/services/schema/HibernateObserverTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/schema/HibernateObserverTests.kt @@ -9,6 +9,7 @@ import net.corda.core.schemas.PersistentState import net.corda.core.schemas.QueryableState import net.corda.testing.LogHelper import net.corda.node.services.api.SchemaService +import net.corda.node.utilities.DatabaseTransactionManager import net.corda.node.utilities.configureDatabase import net.corda.testing.MEGA_CORP import net.corda.testing.node.makeTestDataSourceProperties @@ -16,7 +17,6 @@ import net.corda.testing.node.makeTestDatabaseProperties import net.corda.testing.node.makeTestIdentityService import org.hibernate.annotations.Cascade import org.hibernate.annotations.CascadeType -import org.jetbrains.exposed.sql.transactions.TransactionManager import org.junit.After import org.junit.Before import org.junit.Test @@ -102,11 +102,11 @@ class HibernateObserverTests { val observer = HibernateObserver(rawUpdatesPublisher, database.hibernateConfig) database.transaction { rawUpdatesPublisher.onNext(Vault.Update(emptySet(), setOf(StateAndRef(TransactionState(TestState(), MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0))))) - val parentRowCountResult = TransactionManager.current().connection.prepareStatement("select count(*) from Parents").executeQuery() + val parentRowCountResult = DatabaseTransactionManager.current().connection.prepareStatement("select count(*) from Parents").executeQuery() parentRowCountResult.next() val parentRows = parentRowCountResult.getInt(1) parentRowCountResult.close() - val childrenRowCountResult = TransactionManager.current().connection.prepareStatement("select count(*) from Children").executeQuery() + val childrenRowCountResult = DatabaseTransactionManager.current().connection.prepareStatement("select count(*) from Children").executeQuery() childrenRowCountResult.next() val childrenRows = childrenRowCountResult.getInt(1) childrenRowCountResult.close() diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/DistributedImmutableMapTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/DistributedImmutableMapTests.kt index 52a2113d2b..f06897bae5 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/DistributedImmutableMapTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/DistributedImmutableMapTests.kt @@ -10,12 +10,12 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.getOrThrow import net.corda.node.services.network.NetworkMapService import net.corda.node.utilities.CordaPersistence +import net.corda.node.utilities.DatabaseTransaction import net.corda.node.utilities.configureDatabase import net.corda.testing.* import net.corda.testing.node.makeTestDataSourceProperties import net.corda.testing.node.makeTestDatabaseProperties import net.corda.testing.node.makeTestIdentityService -import org.jetbrains.exposed.sql.Transaction import org.junit.After import org.junit.Before import org.junit.Test @@ -27,7 +27,7 @@ class DistributedImmutableMapTests : TestDependencyInjectionBase() { data class Member(val client: CopycatClient, val server: CopycatServer) lateinit var cluster: List - lateinit var transaction: Transaction + lateinit var transaction: DatabaseTransaction private val databases: MutableList = mutableListOf() @Before