mirror of
https://github.com/corda/corda.git
synced 2025-01-31 00:24:59 +00:00
remove Exposed library and JDBCHashMap class
This commit is contained in:
parent
4995b7a1bd
commit
bd48bdfd28
@ -144,10 +144,6 @@ dependencies {
|
||||
// For H2 database support in persistence
|
||||
compile "com.h2database:h2:$h2_version"
|
||||
|
||||
// Exposed: Kotlin SQL library - under evaluation
|
||||
// TODO: Upgrade to Exposed 0.7 (has API changes)
|
||||
compile "org.jetbrains.exposed:exposed:0.5.0"
|
||||
|
||||
// SQL connection pooling library
|
||||
compile "com.zaxxer:HikariCP:2.5.1"
|
||||
|
||||
|
@ -1,257 +0,0 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import com.google.common.collect.testing.MapTestSuiteBuilder
|
||||
import com.google.common.collect.testing.SetTestSuiteBuilder
|
||||
import com.google.common.collect.testing.TestStringMapGenerator
|
||||
import com.google.common.collect.testing.TestStringSetGenerator
|
||||
import com.google.common.collect.testing.features.CollectionFeature
|
||||
import com.google.common.collect.testing.features.CollectionSize
|
||||
import com.google.common.collect.testing.features.MapFeature
|
||||
import com.google.common.collect.testing.features.SetFeature
|
||||
import com.google.common.collect.testing.testers.*
|
||||
import junit.framework.TestSuite
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.makeTestDatabaseProperties
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.*
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.Suite
|
||||
import java.util.*
|
||||
|
||||
@RunWith(Suite::class)
|
||||
@Suite.SuiteClasses(
|
||||
JDBCHashMapTestSuite.MapLoadOnInitFalse::class,
|
||||
JDBCHashMapTestSuite.MapLoadOnInitTrue::class,
|
||||
JDBCHashMapTestSuite.MapConstrained::class,
|
||||
JDBCHashMapTestSuite.SetLoadOnInitFalse::class,
|
||||
JDBCHashMapTestSuite.SetLoadOnInitTrue::class,
|
||||
JDBCHashMapTestSuite.SetConstrained::class)
|
||||
class JDBCHashMapTestSuite {
|
||||
companion object {
|
||||
lateinit var transaction: DatabaseTransaction
|
||||
lateinit var database: CordaPersistence
|
||||
lateinit var loadOnInitFalseMap: JDBCHashMap<String, String>
|
||||
lateinit var memoryConstrainedMap: JDBCHashMap<String, String>
|
||||
lateinit var loadOnInitTrueMap: JDBCHashMap<String, String>
|
||||
lateinit var loadOnInitFalseSet: JDBCHashSet<String>
|
||||
lateinit var memoryConstrainedSet: JDBCHashSet<String>
|
||||
lateinit var loadOnInitTrueSet: JDBCHashSet<String>
|
||||
|
||||
@JvmStatic
|
||||
@BeforeClass
|
||||
fun before() {
|
||||
initialiseTestSerialization()
|
||||
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), createIdentityService = { throw UnsupportedOperationException("Identity Service should not be in use") })
|
||||
setUpDatabaseTx()
|
||||
loadOnInitFalseMap = JDBCHashMap<String, String>("test_map_false", loadOnInit = false)
|
||||
memoryConstrainedMap = JDBCHashMap<String, String>("test_map_constrained", loadOnInit = false, maxBuckets = 1)
|
||||
loadOnInitTrueMap = JDBCHashMap<String, String>("test_map_true", loadOnInit = true)
|
||||
loadOnInitFalseSet = JDBCHashSet<String>("test_set_false", loadOnInit = false)
|
||||
memoryConstrainedSet = JDBCHashSet<String>("test_set_constrained", loadOnInit = false, maxBuckets = 1)
|
||||
loadOnInitTrueSet = JDBCHashSet<String>("test_set_true", loadOnInit = true)
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
@AfterClass
|
||||
fun after() {
|
||||
closeDatabaseTx()
|
||||
database.close()
|
||||
resetTestSerialization()
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun createMapTestSuite(loadOnInit: Boolean, constrained: Boolean): TestSuite = MapTestSuiteBuilder
|
||||
.using(JDBCHashMapTestGenerator(loadOnInit = loadOnInit, constrained = constrained))
|
||||
.named("test JDBCHashMap with loadOnInit=$loadOnInit")
|
||||
.withFeatures(
|
||||
CollectionSize.ANY,
|
||||
MapFeature.ALLOWS_ANY_NULL_QUERIES,
|
||||
MapFeature.GENERAL_PURPOSE,
|
||||
CollectionFeature.SUPPORTS_ITERATOR_REMOVE,
|
||||
CollectionFeature.KNOWN_ORDER
|
||||
)
|
||||
// putAll(null) not supported by Kotlin MutableMap interface
|
||||
.suppressing(MapPutAllTester::class.java.getMethod("testPutAll_nullCollectionReference"))
|
||||
// We suppress the following because of NotReallyMutableEntry
|
||||
.suppressing(MapReplaceAllTester::class.java.getMethod("testReplaceAllPreservesOrder"))
|
||||
.suppressing(MapReplaceAllTester::class.java.getMethod("testReplaceAllRotate"))
|
||||
.suppressing(MapEntrySetTester::class.java.getMethod("testSetValue"))
|
||||
.createTestSuite()
|
||||
|
||||
@JvmStatic
|
||||
fun createSetTestSuite(loadOnInit: Boolean, constrained: Boolean): TestSuite = SetTestSuiteBuilder
|
||||
.using(JDBCHashSetTestGenerator(loadOnInit = loadOnInit, constrained = constrained))
|
||||
.named("test JDBCHashSet with loadOnInit=$loadOnInit")
|
||||
.withFeatures(
|
||||
CollectionSize.ANY,
|
||||
SetFeature.GENERAL_PURPOSE,
|
||||
CollectionFeature.SUPPORTS_ITERATOR_REMOVE,
|
||||
CollectionFeature.KNOWN_ORDER
|
||||
)
|
||||
// add/remove/retainAll(null) not supported by Kotlin MutableSet interface
|
||||
.suppressing(CollectionAddAllTester::class.java.getMethod("testAddAll_nullCollectionReference"))
|
||||
.suppressing(CollectionAddAllTester::class.java.getMethod("testAddAll_nullUnsupported"))
|
||||
.suppressing(CollectionAddTester::class.java.getMethod("testAdd_nullUnsupported"))
|
||||
.suppressing(CollectionCreationTester::class.java.getMethod("testCreateWithNull_unsupported"))
|
||||
.suppressing(CollectionRemoveAllTester::class.java.getMethod("testRemoveAll_nullCollectionReferenceNonEmptySubject"))
|
||||
.suppressing(CollectionRemoveAllTester::class.java.getMethod("testRemoveAll_nullCollectionReferenceEmptySubject"))
|
||||
.suppressing(CollectionRetainAllTester::class.java.getMethod("testRetainAll_nullCollectionReferenceNonEmptySubject"))
|
||||
.suppressing(CollectionRetainAllTester::class.java.getMethod("testRetainAll_nullCollectionReferenceEmptySubject"))
|
||||
.createTestSuite()
|
||||
|
||||
private fun setUpDatabaseTx() {
|
||||
transaction = DatabaseTransactionManager.currentOrNew()
|
||||
}
|
||||
|
||||
private fun closeDatabaseTx() {
|
||||
transaction.commit()
|
||||
transaction.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Guava test suite generator for JDBCHashMap(loadOnInit=false, constrained = false).
|
||||
*/
|
||||
class MapLoadOnInitFalse {
|
||||
companion object {
|
||||
@JvmStatic
|
||||
fun suite(): TestSuite = createMapTestSuite(false, false)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Guava test suite generator for JDBCHashMap(loadOnInit=false, constrained = true).
|
||||
*/
|
||||
class MapConstrained {
|
||||
companion object {
|
||||
@JvmStatic
|
||||
fun suite(): TestSuite = createMapTestSuite(false, true)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Guava test suite generator for JDBCHashMap(loadOnInit=true, constrained = false).
|
||||
*/
|
||||
class MapLoadOnInitTrue {
|
||||
companion object {
|
||||
@JvmStatic
|
||||
fun suite(): TestSuite = createMapTestSuite(true, false)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generator of map instances needed for testing.
|
||||
*/
|
||||
class JDBCHashMapTestGenerator(val loadOnInit: Boolean, val constrained: Boolean) : TestStringMapGenerator() {
|
||||
override fun create(elements: Array<Map.Entry<String, String>>): Map<String, String> {
|
||||
val map = if (loadOnInit) loadOnInitTrueMap else if (constrained) memoryConstrainedMap else loadOnInitFalseMap
|
||||
map.clear()
|
||||
map.putAll(elements.associate { Pair(it.key, it.value) })
|
||||
return map
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Guava test suite generator for JDBCHashSet(loadOnInit=false, constrained = false).
|
||||
*/
|
||||
class SetLoadOnInitFalse {
|
||||
companion object {
|
||||
@JvmStatic
|
||||
fun suite(): TestSuite = createSetTestSuite(false, false)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Guava test suite generator for JDBCHashSet(loadOnInit=false, constrained = true).
|
||||
*/
|
||||
class SetConstrained {
|
||||
companion object {
|
||||
@JvmStatic
|
||||
fun suite(): TestSuite = createSetTestSuite(false, true)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Guava test suite generator for JDBCHashSet(loadOnInit=true, constrained = false).
|
||||
*/
|
||||
class SetLoadOnInitTrue {
|
||||
companion object {
|
||||
@JvmStatic
|
||||
fun suite(): TestSuite = createSetTestSuite(true, false)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generator of set instances needed for testing.
|
||||
*/
|
||||
class JDBCHashSetTestGenerator(val loadOnInit: Boolean, val constrained: Boolean) : TestStringSetGenerator() {
|
||||
override fun create(elements: Array<String>): Set<String> {
|
||||
val set = if (loadOnInit) loadOnInitTrueSet else if (constrained) memoryConstrainedSet else loadOnInitFalseSet
|
||||
set.clear()
|
||||
set.addAll(elements)
|
||||
return set
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the contents of a map can be reloaded from the database.
|
||||
*
|
||||
* If the Map reloads, then so will the Set as it just delegates.
|
||||
*/
|
||||
class MapCanBeReloaded : TestDependencyInjectionBase() {
|
||||
private val ops = listOf(Triple(AddOrRemove.ADD, "A", "1"),
|
||||
Triple(AddOrRemove.ADD, "B", "2"),
|
||||
Triple(AddOrRemove.ADD, "C", "3"),
|
||||
Triple(AddOrRemove.ADD, "D", "4"),
|
||||
Triple(AddOrRemove.ADD, "E", "5"),
|
||||
Triple(AddOrRemove.REMOVE, "A", "6"),
|
||||
Triple(AddOrRemove.ADD, "G", "7"),
|
||||
Triple(AddOrRemove.ADD, "H", "8"),
|
||||
Triple(AddOrRemove.REMOVE, "D", "9"),
|
||||
Triple(AddOrRemove.ADD, "C", "10"))
|
||||
|
||||
private fun applyOpsToMap(map: MutableMap<String, String>): MutableMap<String, String> {
|
||||
for (op in ops) {
|
||||
if (op.first == AddOrRemove.ADD) {
|
||||
map[op.second] = op.third
|
||||
} else {
|
||||
map.remove(op.second)
|
||||
}
|
||||
}
|
||||
return map
|
||||
}
|
||||
|
||||
private val transientMapForComparison = applyOpsToMap(LinkedHashMap())
|
||||
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun before() {
|
||||
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), createIdentityService = { throw UnsupportedOperationException("Identity Service should not be in use") })
|
||||
}
|
||||
|
||||
@After
|
||||
fun after() {
|
||||
database.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `fill map and check content after reconstruction`() {
|
||||
database.transaction {
|
||||
val persistentMap = JDBCHashMap<String, String>("the_table")
|
||||
// Populate map the first time.
|
||||
applyOpsToMap(persistentMap)
|
||||
assertThat(persistentMap.entries).containsExactly(*transientMapForComparison.entries.toTypedArray())
|
||||
}
|
||||
database.transaction {
|
||||
val persistentMap = JDBCHashMap<String, String>("the_table", loadOnInit = false)
|
||||
assertThat(persistentMap.entries).containsExactly(*transientMapForComparison.entries.toTypedArray())
|
||||
}
|
||||
database.transaction {
|
||||
val persistentMap = JDBCHashMap<String, String>("the_table", loadOnInit = true)
|
||||
assertThat(persistentMap.entries).containsExactly(*transientMapForComparison.entries.toTypedArray())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -541,7 +541,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
this.database = configureDatabase(props, configuration.database, { _services.schemaService }, createIdentityService = { _services.identityService })
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
database.transaction {
|
||||
log.info("Connected to ${database.database.vendor} database.")
|
||||
log.info("Connected to ${database.dataSource.connection.metaData.databaseProductName} database.")
|
||||
}
|
||||
this.database::close.let {
|
||||
dbCloser = it
|
||||
|
@ -9,8 +9,8 @@ import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import org.hibernate.FlushMode
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import rx.Observable
|
||||
|
||||
/**
|
||||
@ -40,7 +40,7 @@ class HibernateObserver(vaultUpdates: Observable<Vault.Update<ContractState>>, v
|
||||
fun persistStateWithSchema(state: ContractState, stateRef: StateRef, schema: MappedSchema) {
|
||||
val sessionFactory = config.sessionFactoryForSchema(schema)
|
||||
val session = sessionFactory.withOptions().
|
||||
connection(TransactionManager.current().connection).
|
||||
connection(DatabaseTransactionManager.current().connection).
|
||||
flushMode(FlushMode.MANUAL).
|
||||
openSession()
|
||||
session.use {
|
||||
|
@ -21,8 +21,8 @@ import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import org.hibernate.Session
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import rx.Observable
|
||||
import java.lang.Exception
|
||||
import java.util.*
|
||||
@ -157,7 +157,7 @@ class HibernateVaultQueryImpl(hibernateConfig: HibernateConfiguration,
|
||||
|
||||
private fun getSession(): Session {
|
||||
return sessionFactory.withOptions().
|
||||
connection(TransactionManager.current().connection).
|
||||
connection(DatabaseTransactionManager.current().connection).
|
||||
openSession()
|
||||
}
|
||||
|
||||
|
@ -7,7 +7,6 @@ import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import org.hibernate.SessionFactory
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
@ -18,13 +17,14 @@ import java.sql.SQLException
|
||||
import java.util.*
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
|
||||
/**
|
||||
* Table prefix for all tables owned by the node module.
|
||||
*/
|
||||
const val NODE_DATABASE_PREFIX = "node_"
|
||||
|
||||
//HikariDataSource implements Closeable which allows CordaPersistence to be Closeable
|
||||
class CordaPersistence(var dataSource: HikariDataSource, private var createSchemaService: () -> SchemaService,
|
||||
private val createIdentityService: ()-> IdentityService, databaseProperties: Properties): Closeable {
|
||||
|
||||
/** Holds Exposed database, the field will be removed once Exposed library is removed */
|
||||
lateinit var database: Database
|
||||
var transactionIsolationLevel = parserTransactionIsolationLevel(databaseProperties.getProperty("transactionIsolationLevel"))
|
||||
|
||||
val hibernateConfig: HibernateConfiguration by lazy {
|
||||
@ -112,10 +112,6 @@ fun configureDatabase(dataSourceProperties: Properties, databaseProperties: Prop
|
||||
val dataSource = HikariDataSource(config)
|
||||
val persistence = CordaPersistence.connect(dataSource, createSchemaService, createIdentityService, databaseProperties ?: Properties())
|
||||
|
||||
//org.jetbrains.exposed.sql.Database will be removed once Exposed library is removed
|
||||
val database = Database.connect(dataSource) { _ -> ExposedTransactionManager() }
|
||||
persistence.database = database
|
||||
|
||||
// Check not in read-only mode.
|
||||
persistence.transaction {
|
||||
persistence.dataSource.connection.use {
|
||||
|
@ -1,209 +0,0 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.parsePublicKeyBase58
|
||||
import net.corda.core.crypto.toBase58String
|
||||
import org.bouncycastle.cert.X509CertificateHolder
|
||||
import org.h2.jdbc.JdbcBlob
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.security.PublicKey
|
||||
import java.security.cert.CertPath
|
||||
import java.security.cert.CertificateFactory
|
||||
import java.time.Instant
|
||||
import java.time.LocalDate
|
||||
import java.time.LocalDateTime
|
||||
import java.time.ZoneOffset
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* Table prefix for all tables owned by the node module.
|
||||
*/
|
||||
const val NODE_DATABASE_PREFIX = "node_"
|
||||
|
||||
// Composite columns for use with below Exposed helpers.
|
||||
data class PartyColumns(val name: Column<String>, val owningKey: Column<PublicKey>)
|
||||
data class PartyAndCertificateColumns(val name: Column<String>, val owningKey: Column<PublicKey>,
|
||||
val certificate: Column<X509CertificateHolder>, val certPath: Column<CertPath>)
|
||||
data class StateRefColumns(val txId: Column<SecureHash>, val index: Column<Int>)
|
||||
data class TxnNoteColumns(val txId: Column<SecureHash>, val note: Column<String>)
|
||||
|
||||
/**
|
||||
* [Table] column helpers for use with Exposed, as per [varchar] etc.
|
||||
*/
|
||||
fun Table.certificate(name: String) = this.registerColumn<X509CertificateHolder>(name, X509CertificateColumnType)
|
||||
fun Table.certificatePath(name: String) = this.registerColumn<CertPath>(name, CertPathColumnType)
|
||||
fun Table.publicKey(name: String) = this.registerColumn<PublicKey>(name, PublicKeyColumnType)
|
||||
fun Table.secureHash(name: String) = this.registerColumn<SecureHash>(name, SecureHashColumnType)
|
||||
fun Table.party(nameColumnName: String,
|
||||
keyColumnName: String) = PartyColumns(this.varchar(nameColumnName, length = 255), this.publicKey(keyColumnName))
|
||||
fun Table.partyAndCertificate(nameColumnName: String,
|
||||
keyColumnName: String,
|
||||
certificateColumnName: String,
|
||||
pathColumnName: String) = PartyAndCertificateColumns(this.varchar(nameColumnName, length = 255), this.publicKey(keyColumnName),
|
||||
this.certificate(certificateColumnName), this.certificatePath(pathColumnName))
|
||||
fun Table.uuidString(name: String) = this.registerColumn<UUID>(name, UUIDStringColumnType)
|
||||
fun Table.localDate(name: String) = this.registerColumn<LocalDate>(name, LocalDateColumnType)
|
||||
fun Table.localDateTime(name: String) = this.registerColumn<LocalDateTime>(name, LocalDateTimeColumnType)
|
||||
fun Table.instant(name: String) = this.registerColumn<Instant>(name, InstantColumnType)
|
||||
fun Table.stateRef(txIdColumnName: String, indexColumnName: String) = StateRefColumns(this.secureHash(txIdColumnName), this.integer(indexColumnName))
|
||||
fun Table.txnNote(txIdColumnName: String, txnNoteColumnName: String) = TxnNoteColumns(this.secureHash(txIdColumnName), this.text(txnNoteColumnName))
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [X509CertificateHolder].
|
||||
*/
|
||||
object X509CertificateColumnType : ColumnType() {
|
||||
override fun sqlType(): String = "BLOB"
|
||||
|
||||
override fun valueFromDB(value: Any): Any {
|
||||
val blob = value as JdbcBlob
|
||||
return X509CertificateHolder(blob.getBytes(0, blob.length().toInt()))
|
||||
}
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = (value as X509CertificateHolder).encoded
|
||||
}
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [CertPath].
|
||||
*/
|
||||
object CertPathColumnType : ColumnType() {
|
||||
private val factory = CertificateFactory.getInstance("X.509")
|
||||
override fun sqlType(): String = "BLOB"
|
||||
|
||||
override fun valueFromDB(value: Any): Any {
|
||||
val blob = value as JdbcBlob
|
||||
return factory.generateCertPath(ByteArrayInputStream(blob.getBytes(0, blob.length().toInt())))
|
||||
}
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = (value as CertPath).encoded
|
||||
}
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [PublicKey].
|
||||
*/
|
||||
// TODO Rethink how we store CompositeKeys in db. Currently they are stored as Base58 strings and as we don't know the size
|
||||
// of a CompositeKey they could be CLOB fields. Given the time to fetch these types and that they are unsuitable as table keys,
|
||||
// having a shorter primary key (such as SHA256 hash or a UUID generated on demand) that references a common composite key table may make more sense.
|
||||
object PublicKeyColumnType : ColumnType() {
|
||||
override fun sqlType(): String = "VARCHAR"
|
||||
|
||||
override fun valueFromDB(value: Any): Any = parsePublicKeyBase58(value.toString())
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = (value as? PublicKey)?.toBase58String() ?: value
|
||||
}
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [SecureHash].
|
||||
*/
|
||||
object SecureHashColumnType : ColumnType() {
|
||||
override fun sqlType(): String = "VARCHAR(64)"
|
||||
|
||||
override fun valueFromDB(value: Any): Any = SecureHash.parse(value.toString())
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = (value as? SecureHash)?.toString() ?: value
|
||||
}
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [UUID], always using a string representation.
|
||||
*/
|
||||
object UUIDStringColumnType : ColumnType() {
|
||||
override fun sqlType(): String = "VARCHAR(36)"
|
||||
|
||||
override fun valueFromDB(value: Any): Any = UUID.fromString(value.toString())
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = (value as? UUID)?.toString() ?: value
|
||||
}
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [java.time.LocalDate].
|
||||
*/
|
||||
object LocalDateColumnType : ColumnType() {
|
||||
override fun sqlType(): String = "DATE"
|
||||
|
||||
override fun nonNullValueToString(value: Any): String {
|
||||
if (value is String) return value
|
||||
|
||||
val localDate = when (value) {
|
||||
is LocalDate -> value
|
||||
is java.sql.Date -> value.toLocalDate()
|
||||
is java.sql.Timestamp -> value.toLocalDateTime().toLocalDate()
|
||||
else -> error("Unexpected value: $value")
|
||||
}
|
||||
return "'$localDate'"
|
||||
}
|
||||
|
||||
override fun valueFromDB(value: Any): Any = when (value) {
|
||||
is java.sql.Date -> value.toLocalDate()
|
||||
is java.sql.Timestamp -> value.toLocalDateTime().toLocalDate()
|
||||
is Long -> LocalDate.from(Instant.ofEpochMilli(value))
|
||||
else -> value
|
||||
}
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = if (value is LocalDate) {
|
||||
java.sql.Date(value.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli())
|
||||
} else value
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [java.time.LocalDateTime].
|
||||
*/
|
||||
object LocalDateTimeColumnType : ColumnType() {
|
||||
private val sqlType = DateColumnType(time = true).sqlType()
|
||||
override fun sqlType(): String = sqlType
|
||||
|
||||
override fun nonNullValueToString(value: Any): String {
|
||||
if (value is String) return value
|
||||
|
||||
val localDateTime = when (value) {
|
||||
is LocalDateTime -> value
|
||||
is java.sql.Date -> value.toLocalDate().atStartOfDay()
|
||||
is java.sql.Timestamp -> value.toLocalDateTime()
|
||||
else -> error("Unexpected value: $value")
|
||||
}
|
||||
return "'$localDateTime'"
|
||||
}
|
||||
|
||||
override fun valueFromDB(value: Any): Any = when (value) {
|
||||
is java.sql.Date -> value.toLocalDate().atStartOfDay()
|
||||
is java.sql.Timestamp -> value.toLocalDateTime()
|
||||
is Long -> LocalDateTime.from(Instant.ofEpochMilli(value))
|
||||
else -> value
|
||||
}
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = if (value is LocalDateTime) {
|
||||
java.sql.Timestamp(value.toInstant(ZoneOffset.UTC).toEpochMilli())
|
||||
} else value
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [java.time.Instant].
|
||||
*/
|
||||
object InstantColumnType : ColumnType() {
|
||||
private val sqlType = DateColumnType(time = true).sqlType()
|
||||
override fun sqlType(): String = sqlType
|
||||
|
||||
override fun nonNullValueToString(value: Any): String {
|
||||
if (value is String) return value
|
||||
|
||||
val localDateTime = when (value) {
|
||||
is Instant -> value
|
||||
is java.sql.Date -> value.toLocalDate().atStartOfDay().toInstant(ZoneOffset.UTC)
|
||||
is java.sql.Timestamp -> value.toLocalDateTime().toInstant(ZoneOffset.UTC)
|
||||
else -> error("Unexpected value: $value")
|
||||
}
|
||||
return "'$localDateTime'"
|
||||
}
|
||||
|
||||
override fun valueFromDB(value: Any): Any = when (value) {
|
||||
is java.sql.Date -> value.toLocalDate().atStartOfDay().toInstant(ZoneOffset.UTC)
|
||||
is java.sql.Timestamp -> value.toLocalDateTime().toInstant(ZoneOffset.UTC)
|
||||
is Long -> LocalDateTime.from(Instant.ofEpochMilli(value)).toInstant(ZoneOffset.UTC)
|
||||
else -> value
|
||||
}
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = if (value is Instant) {
|
||||
java.sql.Timestamp(value.toEpochMilli())
|
||||
} else value
|
||||
}
|
@ -1,54 +0,0 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionInterface
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import java.sql.Connection
|
||||
|
||||
/**
|
||||
* Wrapper of [DatabaseTransaction], because the class is effectively used for [ExposedTransaction.connection] method only not all methods are implemented.
|
||||
* The class will obsolete when Exposed library is phased out.
|
||||
*/
|
||||
class ExposedTransaction(override val db: Database, val databaseTransaction: DatabaseTransaction): TransactionInterface {
|
||||
|
||||
override val outerTransaction: Transaction?
|
||||
get() = throw UnsupportedOperationException()
|
||||
|
||||
override val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
|
||||
databaseTransaction.connection
|
||||
}
|
||||
|
||||
override fun commit() {
|
||||
databaseTransaction.commit()
|
||||
}
|
||||
|
||||
override fun rollback() {
|
||||
databaseTransaction.rollback()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
databaseTransaction.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delegates methods to [DatabaseTransactionManager].
|
||||
* The class will obsolete when Exposed library is phased out.
|
||||
*/
|
||||
class ExposedTransactionManager: TransactionManager {
|
||||
companion object {
|
||||
val database: Database
|
||||
get() = DatabaseTransactionManager.dataSource.database
|
||||
}
|
||||
|
||||
override fun newTransaction(isolation: Int): Transaction {
|
||||
var databaseTransaction = DatabaseTransactionManager.newTransaction(isolation)
|
||||
return Transaction(ExposedTransaction(database, databaseTransaction))
|
||||
}
|
||||
|
||||
override fun currentOrNull(): Transaction? {
|
||||
val databaseTransaction = DatabaseTransactionManager.currentOrNull()
|
||||
return if (databaseTransaction != null) Transaction(ExposedTransaction(database, databaseTransaction)) else null
|
||||
}
|
||||
}
|
@ -1,508 +0,0 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import net.corda.core.serialization.SerializationDefaults.STORAGE_CONTEXT
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.trace
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import java.sql.Blob
|
||||
import java.util.*
|
||||
import kotlin.system.measureTimeMillis
|
||||
|
||||
/**
|
||||
* The classes in this file provide a convenient way to quickly implement persistence for map- and set-like
|
||||
* collections of data. These might not be sufficient for the eventual implementations of persistence dependent on
|
||||
* access patterns and performance requirements.
|
||||
*/
|
||||
|
||||
/**
|
||||
* The default maximum size of the LRU cache.
|
||||
* Current computation is linear to max heap size, ensuring a minimum of 256 buckets.
|
||||
*
|
||||
* TODO: make this value configurable
|
||||
* TODO: tune this value, as it's currently mostly a guess
|
||||
*/
|
||||
val DEFAULT_MAX_BUCKETS = (256 * (1 + Math.max(0, (Runtime.getRuntime().maxMemory() / 1000000 - 128) / 64))).toInt()
|
||||
|
||||
/**
|
||||
* A convenient JDBC table backed hash map with iteration order based on insertion order.
|
||||
* See [AbstractJDBCHashMap] for further implementation details.
|
||||
*
|
||||
* In this subclass, keys and values are represented by Blobs of Kryo serialized forms of the key and value objects.
|
||||
* If you can extend [AbstractJDBCHashMap] and implement less Kryo dependent key and/or value mappings then that is
|
||||
* likely preferrable.
|
||||
*/
|
||||
class JDBCHashMap<K : Any, V : Any>(tableName: String,
|
||||
loadOnInit: Boolean = false,
|
||||
maxBuckets: Int = DEFAULT_MAX_BUCKETS)
|
||||
: AbstractJDBCHashMap<K, V, JDBCHashMap.BlobMapTable>(BlobMapTable(tableName), loadOnInit, maxBuckets) {
|
||||
|
||||
class BlobMapTable(tableName: String) : JDBCHashedTable(tableName) {
|
||||
val key = blob("key")
|
||||
val value = blob("value")
|
||||
}
|
||||
|
||||
override fun keyFromRow(row: ResultRow): K = deserializeFromBlob(row[table.key])
|
||||
override fun valueFromRow(row: ResultRow): V = deserializeFromBlob(row[table.value])
|
||||
|
||||
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.key] = serializeToBlob(entry.key, finalizables)
|
||||
}
|
||||
|
||||
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.value] = serializeToBlob(entry.value, finalizables)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fun bytesToBlob(value: SerializedBytes<*>, finalizables: MutableList<() -> Unit>): Blob {
|
||||
val blob = DatabaseTransactionManager.current().connection.createBlob()
|
||||
finalizables += { blob.free() }
|
||||
blob.setBytes(1, value.bytes)
|
||||
return blob
|
||||
}
|
||||
|
||||
fun serializeToBlob(value: Any, finalizables: MutableList<() -> Unit>): Blob = bytesToBlob(value.serialize(context = STORAGE_CONTEXT), finalizables)
|
||||
|
||||
fun <T : Any> bytesFromBlob(blob: Blob): SerializedBytes<T> {
|
||||
try {
|
||||
return SerializedBytes(blob.getBytes(0, blob.length().toInt()))
|
||||
} finally {
|
||||
blob.free()
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
fun <T : Any> deserializeFromBlob(blob: Blob): T = bytesFromBlob<Any>(blob).deserialize(context = STORAGE_CONTEXT) as T
|
||||
|
||||
/**
|
||||
* A convenient JDBC table backed hash set with iteration order based on insertion order.
|
||||
* See [AbstractJDBCHashSet] and [AbstractJDBCHashMap] for further implementation details.
|
||||
*
|
||||
* In this subclass, elements are represented by Blobs of Kryo serialized forms of the element objects.
|
||||
* If you can extend [AbstractJDBCHashSet] and implement less Kryo dependent element mappings then that is
|
||||
* likely preferrable.
|
||||
*/
|
||||
class JDBCHashSet<K : Any>(tableName: String,
|
||||
loadOnInit: Boolean = false,
|
||||
maxBuckets: Int = DEFAULT_MAX_BUCKETS)
|
||||
: AbstractJDBCHashSet<K, JDBCHashSet.BlobSetTable>(BlobSetTable(tableName), loadOnInit, maxBuckets) {
|
||||
|
||||
class BlobSetTable(tableName: String) : JDBCHashedTable(tableName) {
|
||||
val key = blob("key")
|
||||
}
|
||||
|
||||
override fun elementFromRow(row: ResultRow): K = deserializeFromBlob(row[table.key])
|
||||
|
||||
override fun addElementToInsert(insert: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.key] = serializeToBlob(entry, finalizables)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Base class for JDBC backed hash set that delegates to a JDBC backed hash map where the values are all
|
||||
* [Unit] and not actually persisted. Iteration order is order of insertion. Iterators can remove().
|
||||
*
|
||||
* See [AbstractJDBCHashMap] for implementation details.
|
||||
*/
|
||||
abstract class AbstractJDBCHashSet<K : Any, out T : JDBCHashedTable>(protected val table: T,
|
||||
loadOnInit: Boolean = false,
|
||||
maxBuckets: Int = DEFAULT_MAX_BUCKETS) : MutableSet<K>, AbstractSet<K>() {
|
||||
protected val innerMap = object : AbstractJDBCHashMap<K, Unit, T>(table, loadOnInit, maxBuckets) {
|
||||
override fun keyFromRow(row: ResultRow): K = this@AbstractJDBCHashSet.elementFromRow(row)
|
||||
|
||||
// Return constant.
|
||||
override fun valueFromRow(row: ResultRow) = Unit
|
||||
|
||||
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<K, Unit>, finalizables: MutableList<() -> Unit>) =
|
||||
this@AbstractJDBCHashSet.addElementToInsert(insert, entry.key, finalizables)
|
||||
|
||||
// No op as not actually persisted.
|
||||
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<K, Unit>, finalizables: MutableList<() -> Unit>) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
override fun add(element: K): Boolean {
|
||||
if (innerMap.containsKey(element)) {
|
||||
return false
|
||||
} else {
|
||||
innerMap.put(element, Unit)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
override fun clear() {
|
||||
innerMap.clear()
|
||||
}
|
||||
|
||||
override fun iterator(): MutableIterator<K> = innerMap.keys.iterator()
|
||||
|
||||
override fun remove(element: K): Boolean = (innerMap.remove(element) != null)
|
||||
|
||||
override val size: Int
|
||||
get() = innerMap.size
|
||||
|
||||
override fun contains(element: K): Boolean = innerMap.containsKey(element)
|
||||
|
||||
override fun isEmpty(): Boolean = innerMap.isEmpty()
|
||||
|
||||
/**
|
||||
* Implementation should return the element object marshalled from the database table row.
|
||||
*
|
||||
* See example implementations in [JDBCHashSet].
|
||||
*/
|
||||
protected abstract fun elementFromRow(row: ResultRow): K
|
||||
|
||||
/**
|
||||
* Implementation should marshall the element to the insert statement.
|
||||
*
|
||||
* If some cleanup is required after the insert statement is executed, such as closing a Blob, then add a closure
|
||||
* to the finalizables to do so.
|
||||
*
|
||||
* See example implementations in [JDBCHashSet].
|
||||
*/
|
||||
protected abstract fun addElementToInsert(insert: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>)
|
||||
}
|
||||
|
||||
/**
|
||||
* A base class for a JDBC table backed hash map that iterates in insertion order by using
|
||||
* an ever increasing sequence number on entries. Iterators supports remove() but entries are not really mutable and
|
||||
* do not support setValue() method from [MutableMap.MutableEntry].
|
||||
*
|
||||
* You should only use keys that have overridden [Object.hashCode] and that have a good hash code distribution. Beware
|
||||
* changing the hashCode() implementation once objects have been persisted. A process to re-hash the entries persisted
|
||||
* would be necessary if you do this.
|
||||
*
|
||||
* Subclasses must provide their own mapping to and from keys/values and the database table columns, but there are
|
||||
* inherited columns that all tables must provide to support iteration order and hashing.
|
||||
*
|
||||
* The map operates in one of two modes.
|
||||
* 1. loadOnInit=true where the entire table is loaded into memory in the constructor and all entries remain in memory,
|
||||
* with only writes needing to perform database access.
|
||||
* 2. loadOnInit=false where all entries with the same key hash code are loaded from the database on demand when accessed
|
||||
* via any method other than via keys/values/entries properties, and thus the whole map is not loaded into memory. The number
|
||||
* of entries retained in memory is controlled indirectly by an LRU algorithm (courtesy of [LinkedHashMap]) and a maximum
|
||||
* number of hash "buckets", where one bucket represents all entries with the same hash code. There is a default value
|
||||
* for maximum buckets.
|
||||
*
|
||||
* All operations require a [transaction] to be started.
|
||||
*
|
||||
* The keys/values/entries collections are really designed just for iterating and other uses might turn out to be
|
||||
* costly in terms of performance. Beware when loadOnInit=true, the iterator first sorts the entries which could be
|
||||
* costly too.
|
||||
*
|
||||
* This class is *not* thread safe.
|
||||
*
|
||||
* TODO: consider caching size once calculated for the first time.
|
||||
* TODO: buckets just use a list and so are vulnerable to poor hash code implementations with collisions.
|
||||
* TODO: if iterators are used extensively when loadOnInit=true, consider maintaining a collection of keys in iteration order to avoid sorting each time.
|
||||
* TODO: revisit whether we need the loadOnInit==true functionality and remove if not.
|
||||
*/
|
||||
abstract class AbstractJDBCHashMap<K : Any, V : Any, out T : JDBCHashedTable>(val table: T,
|
||||
val loadOnInit: Boolean = false,
|
||||
val maxBuckets: Int = DEFAULT_MAX_BUCKETS) : MutableMap<K, V>, AbstractMap<K, V>() {
|
||||
|
||||
companion object {
|
||||
protected val log = loggerFor<AbstractJDBCHashMap<*, *, *>>()
|
||||
|
||||
private const val INITIAL_CAPACITY: Int = 16
|
||||
private const val LOAD_FACTOR: Float = 0.75f
|
||||
}
|
||||
|
||||
// Hash code -> entries mapping.
|
||||
// When loadOnInit = false, size will be limited to maxBuckets entries (which are hash buckets) and map maintains access order rather than insertion order.
|
||||
private val buckets = object : LinkedHashMap<Int, MutableList<NotReallyMutableEntry<K, V>>>(INITIAL_CAPACITY, LOAD_FACTOR, !loadOnInit) {
|
||||
override fun removeEldestEntry(eldest: MutableMap.MutableEntry<Int, MutableList<NotReallyMutableEntry<K, V>>>?): Boolean {
|
||||
return !loadOnInit && size > maxBuckets
|
||||
}
|
||||
}
|
||||
|
||||
init {
|
||||
check(maxBuckets > 0) { "The maximum number of buckets to retain in memory must be a positive integer." }
|
||||
// TODO: Move this to schema version managment tool.
|
||||
createTablesIfNecessary()
|
||||
if (loadOnInit) {
|
||||
log.trace { "Loading all entries on init for ${table.tableName}" }
|
||||
val elapsedMillis = measureTimeMillis {
|
||||
// load from database.
|
||||
table.selectAll().map {
|
||||
val entry = createEntry(it)
|
||||
val bucket = getBucket(entry.key)
|
||||
bucket.add(entry)
|
||||
}
|
||||
}
|
||||
log.trace { "Loaded $size entries on init for ${table.tableName} in $elapsedMillis millis." }
|
||||
}
|
||||
}
|
||||
|
||||
private fun createTablesIfNecessary() {
|
||||
SchemaUtils.create(table)
|
||||
}
|
||||
|
||||
override fun isEmpty(): Boolean {
|
||||
for (bucket in buckets.values) {
|
||||
if (!bucket.isEmpty()) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return size == 0
|
||||
}
|
||||
|
||||
override fun remove(key: K): V? {
|
||||
val bucket = getBucket(key)
|
||||
var removed: V? = null
|
||||
buckets.computeIfPresent(key.hashCode()) { _, value ->
|
||||
for (entry in value) {
|
||||
if (entry.key == key) {
|
||||
removed = entry.value
|
||||
bucket.remove(entry)
|
||||
deleteRecord(entry)
|
||||
break
|
||||
}
|
||||
}
|
||||
value
|
||||
}
|
||||
return removed
|
||||
}
|
||||
|
||||
override fun containsKey(key: K): Boolean = (get(key) != null)
|
||||
|
||||
// We haven't implemented setValue. We could implement if necessary.
|
||||
// Make sure to remove the relevant suppressed tests in JDBCHashMapTestSuite.createMapTestSuite if this is implemented.
|
||||
private class NotReallyMutableEntry<K, V>(key: K, value: V, val seqNo: Int) : AbstractMap.SimpleImmutableEntry<K, V>(key, value), MutableMap.MutableEntry<K, V> {
|
||||
override fun setValue(newValue: V): V {
|
||||
throw UnsupportedOperationException("Not really mutable. Implement if really required.")
|
||||
}
|
||||
}
|
||||
|
||||
private inner class EntryIterator : MutableIterator<MutableMap.MutableEntry<K, V>> {
|
||||
private val iterator = if (loadOnInit) {
|
||||
buckets.values.flatten().sortedBy { it.seqNo }.iterator()
|
||||
} else {
|
||||
// This uses a Sequence to make the mapping lazy.
|
||||
table.selectAll().orderBy(table.seqNo).asSequence().map {
|
||||
val bucket = buckets[it[table.keyHash]]
|
||||
if (bucket != null) {
|
||||
val seqNo = it[table.seqNo]
|
||||
for (entry in bucket) {
|
||||
if (entry.seqNo == seqNo) {
|
||||
return@map entry
|
||||
}
|
||||
}
|
||||
}
|
||||
return@map createEntry(it)
|
||||
}.iterator()
|
||||
}
|
||||
private var current: MutableMap.MutableEntry<K, V>? = null
|
||||
|
||||
override fun hasNext(): Boolean = iterator.hasNext()
|
||||
|
||||
override fun next(): MutableMap.MutableEntry<K, V> {
|
||||
val extractedNext = iterator.next()
|
||||
current = extractedNext
|
||||
return extractedNext
|
||||
}
|
||||
|
||||
override fun remove() {
|
||||
val savedCurrent = current ?: throw IllegalStateException("Not called next() yet or already removed.")
|
||||
current = null
|
||||
remove(savedCurrent.key)
|
||||
}
|
||||
}
|
||||
|
||||
override val keys: MutableSet<K> get() {
|
||||
return object : AbstractSet<K>() {
|
||||
override val size: Int get() = this@AbstractJDBCHashMap.size
|
||||
override fun iterator(): MutableIterator<K> {
|
||||
return object : MutableIterator<K> {
|
||||
private val entryIterator = EntryIterator()
|
||||
|
||||
override fun hasNext(): Boolean = entryIterator.hasNext()
|
||||
override fun next(): K = entryIterator.next().key
|
||||
override fun remove() {
|
||||
entryIterator.remove()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override val values: MutableCollection<V> get() {
|
||||
return object : AbstractCollection<V>() {
|
||||
override val size: Int get() = this@AbstractJDBCHashMap.size
|
||||
override fun iterator(): MutableIterator<V> {
|
||||
return object : MutableIterator<V> {
|
||||
private val entryIterator = EntryIterator()
|
||||
|
||||
override fun hasNext(): Boolean = entryIterator.hasNext()
|
||||
override fun next(): V = entryIterator.next().value
|
||||
override fun remove() {
|
||||
entryIterator.remove()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override val entries: MutableSet<MutableMap.MutableEntry<K, V>> get() {
|
||||
return object : AbstractSet<MutableMap.MutableEntry<K, V>>() {
|
||||
override val size: Int get() = this@AbstractJDBCHashMap.size
|
||||
override fun iterator(): MutableIterator<MutableMap.MutableEntry<K, V>> {
|
||||
return object : MutableIterator<MutableMap.MutableEntry<K, V>> {
|
||||
private val entryIterator = EntryIterator()
|
||||
|
||||
override fun hasNext(): Boolean = entryIterator.hasNext()
|
||||
override fun next(): MutableMap.MutableEntry<K, V> = entryIterator.next()
|
||||
override fun remove() {
|
||||
entryIterator.remove()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun put(key: K, value: V): V? {
|
||||
var oldValue: V? = null
|
||||
var oldSeqNo: Int? = null
|
||||
getBucket(key)
|
||||
buckets.compute(key.hashCode()) { _, list ->
|
||||
val newList = list ?: newBucket()
|
||||
val iterator = newList.listIterator()
|
||||
while (iterator.hasNext()) {
|
||||
val entry = iterator.next()
|
||||
if (entry.key == key) {
|
||||
oldValue = entry.value
|
||||
oldSeqNo = entry.seqNo
|
||||
iterator.remove()
|
||||
deleteRecord(entry)
|
||||
break
|
||||
}
|
||||
}
|
||||
val seqNo = addRecord(key, value, oldSeqNo)
|
||||
val newEntry = NotReallyMutableEntry<K, V>(key, value, seqNo)
|
||||
newList.add(newEntry)
|
||||
newList
|
||||
}
|
||||
return oldValue
|
||||
}
|
||||
|
||||
override fun containsValue(value: V): Boolean {
|
||||
for (storedValue in values) {
|
||||
if (storedValue == value) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
override val size: Int get() {
|
||||
return if (loadOnInit) {
|
||||
buckets.values.map { it.size }.sum()
|
||||
} else {
|
||||
table.slice(table.seqNo).selectAll().count()
|
||||
}
|
||||
}
|
||||
|
||||
override fun clear() {
|
||||
if (!loadOnInit || !isEmpty()) {
|
||||
table.deleteAll()
|
||||
buckets.clear()
|
||||
}
|
||||
}
|
||||
|
||||
override fun get(key: K): V? {
|
||||
for ((entryKey, value) in getBucket(key)) {
|
||||
if (entryKey == key) {
|
||||
return value
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
private fun getBucket(key: Any): MutableList<NotReallyMutableEntry<K, V>> {
|
||||
return buckets.computeIfAbsent(key.hashCode()) { _ ->
|
||||
if (!loadOnInit) {
|
||||
loadBucket(key.hashCode())
|
||||
} else {
|
||||
newBucket()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun newBucket(): MutableList<NotReallyMutableEntry<K, V>> = mutableListOf()
|
||||
|
||||
private fun loadBucket(hashCode: Int): MutableList<NotReallyMutableEntry<K, V>> {
|
||||
return table.select { table.keyHash.eq(hashCode) }.map {
|
||||
createEntry(it)
|
||||
}.toMutableList<NotReallyMutableEntry<K, V>>()
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation should return the key object marshalled from the database table row.
|
||||
*
|
||||
* See example implementations in [JDBCHashMap].
|
||||
*/
|
||||
protected abstract fun keyFromRow(row: ResultRow): K
|
||||
|
||||
/**
|
||||
* Implementation should return the value object marshalled from the database table row.
|
||||
*
|
||||
* See example implementations in [JDBCHashMap].
|
||||
*/
|
||||
protected abstract fun valueFromRow(row: ResultRow): V
|
||||
|
||||
/**
|
||||
* Implementation should marshall the key to the insert statement.
|
||||
*
|
||||
* If some cleanup is required after the insert statement is executed, such as closing a Blob, then add a closure
|
||||
* to the finalizables to do so.
|
||||
*
|
||||
* See example implementations in [JDBCHashMap].
|
||||
*/
|
||||
protected abstract fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>)
|
||||
|
||||
/**
|
||||
* Implementation should marshall the value to the insert statement.
|
||||
*
|
||||
* If some cleanup is required after the insert statement is executed, such as closing a Blob, then add a closure
|
||||
* to the finalizables to do so.
|
||||
*
|
||||
* See example implementations in [JDBCHashMap].
|
||||
*/
|
||||
protected abstract fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>)
|
||||
|
||||
private fun createEntry(it: ResultRow) = NotReallyMutableEntry<K, V>(keyFromRow(it), valueFromRow(it), it[table.seqNo])
|
||||
|
||||
private fun deleteRecord(entry: NotReallyMutableEntry<K, V>) {
|
||||
table.deleteWhere {
|
||||
table.seqNo eq entry.seqNo
|
||||
}
|
||||
}
|
||||
|
||||
private fun addRecord(key: K, value: V, oldSeqNo: Int?): Int {
|
||||
val finalizables = mutableListOf<() -> Unit>()
|
||||
try {
|
||||
return table.insert {
|
||||
it[keyHash] = key.hashCode()
|
||||
val entry = SimpleEntry<K, V>(key, value)
|
||||
addKeyToInsert(it, entry, finalizables)
|
||||
addValueToInsert(it, entry, finalizables)
|
||||
if (oldSeqNo != null) {
|
||||
it[seqNo] = oldSeqNo
|
||||
it.generatedKey = oldSeqNo
|
||||
}
|
||||
} get table.seqNo
|
||||
} finally {
|
||||
finalizables.forEach { it() }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
open class JDBCHashedTable(tableName: String) : Table(tableName) {
|
||||
val keyHash = integer("key_hash").index()
|
||||
val seqNo = integer("seq_no").autoIncrement().index().primaryKey()
|
||||
}
|
@ -9,6 +9,7 @@ import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.QueryableState
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.testing.MEGA_CORP
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
@ -16,7 +17,6 @@ import net.corda.testing.node.makeTestDatabaseProperties
|
||||
import net.corda.testing.node.makeTestIdentityService
|
||||
import org.hibernate.annotations.Cascade
|
||||
import org.hibernate.annotations.CascadeType
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@ -102,11 +102,11 @@ class HibernateObserverTests {
|
||||
val observer = HibernateObserver(rawUpdatesPublisher, database.hibernateConfig)
|
||||
database.transaction {
|
||||
rawUpdatesPublisher.onNext(Vault.Update(emptySet(), setOf(StateAndRef(TransactionState(TestState(), MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0)))))
|
||||
val parentRowCountResult = TransactionManager.current().connection.prepareStatement("select count(*) from Parents").executeQuery()
|
||||
val parentRowCountResult = DatabaseTransactionManager.current().connection.prepareStatement("select count(*) from Parents").executeQuery()
|
||||
parentRowCountResult.next()
|
||||
val parentRows = parentRowCountResult.getInt(1)
|
||||
parentRowCountResult.close()
|
||||
val childrenRowCountResult = TransactionManager.current().connection.prepareStatement("select count(*) from Children").executeQuery()
|
||||
val childrenRowCountResult = DatabaseTransactionManager.current().connection.prepareStatement("select count(*) from Children").executeQuery()
|
||||
childrenRowCountResult.next()
|
||||
val childrenRows = childrenRowCountResult.getInt(1)
|
||||
childrenRowCountResult.close()
|
||||
|
@ -10,12 +10,12 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.DatabaseTransaction
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.makeTestDatabaseProperties
|
||||
import net.corda.testing.node.makeTestIdentityService
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@ -27,7 +27,7 @@ class DistributedImmutableMapTests : TestDependencyInjectionBase() {
|
||||
data class Member(val client: CopycatClient, val server: CopycatServer)
|
||||
|
||||
lateinit var cluster: List<Member>
|
||||
lateinit var transaction: Transaction
|
||||
lateinit var transaction: DatabaseTransaction
|
||||
private val databases: MutableList<CordaPersistence> = mutableListOf()
|
||||
|
||||
@Before
|
||||
|
Loading…
x
Reference in New Issue
Block a user