mirror of
https://github.com/corda/corda.git
synced 2025-06-13 04:38:19 +00:00
Retire DatabaseTransactionManager. (#2385)
This commit is contained in:
@ -1,13 +1,16 @@
|
||||
package net.corda.nodeapi.internal.persistence
|
||||
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
import rx.subjects.PublishSubject
|
||||
import rx.subjects.UnicastSubject
|
||||
import java.io.Closeable
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLException
|
||||
import java.util.*
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import javax.persistence.AttributeConverter
|
||||
import javax.sql.DataSource
|
||||
@ -39,6 +42,9 @@ enum class TransactionIsolationLevel {
|
||||
val jdbcValue: Int = java.sql.Connection::class.java.getField("TRANSACTION_$name").get(null) as Int
|
||||
}
|
||||
|
||||
private val _contextDatabase = ThreadLocal<CordaPersistence>()
|
||||
val contextDatabase get() = _contextDatabase.get() ?: error("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}")
|
||||
|
||||
class CordaPersistence(
|
||||
val dataSource: DataSource,
|
||||
databaseConfig: DatabaseConfig,
|
||||
@ -49,7 +55,7 @@ class CordaPersistence(
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
val defaultIsolationLevel = databaseConfig.transactionIsolationLevel
|
||||
private val defaultIsolationLevel = databaseConfig.transactionIsolationLevel
|
||||
val hibernateConfig: HibernateConfiguration by lazy {
|
||||
transaction {
|
||||
HibernateConfiguration(schemas, databaseConfig, attributeConverters)
|
||||
@ -57,33 +63,48 @@ class CordaPersistence(
|
||||
}
|
||||
val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas
|
||||
|
||||
data class Boundary(val txId: UUID)
|
||||
|
||||
internal val transactionBoundaries = PublishSubject.create<Boundary>().toSerialized()
|
||||
|
||||
init {
|
||||
DatabaseTransactionManager(this)
|
||||
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
|
||||
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
|
||||
// database transaction open. The [transaction] helper above handles this in a finally clause for you
|
||||
// but any manual database transaction management is liable to have this problem.
|
||||
contextTransactionOrNull?.let {
|
||||
error("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, $it")
|
||||
}
|
||||
_contextDatabase.set(this)
|
||||
// Check not in read-only mode.
|
||||
transaction {
|
||||
check(!connection.metaData.isReadOnly) { "Database should not be readonly." }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an instance of [DatabaseTransaction], with the given transaction isolation level.
|
||||
*/
|
||||
fun createTransaction(isolationLevel: TransactionIsolationLevel): DatabaseTransaction {
|
||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||
DatabaseTransactionManager.dataSource = this
|
||||
return DatabaseTransactionManager.currentOrNew(isolationLevel)
|
||||
fun currentOrNew(isolation: TransactionIsolationLevel = defaultIsolationLevel): DatabaseTransaction {
|
||||
return contextTransactionOrNull ?: newTransaction(isolation)
|
||||
}
|
||||
|
||||
fun newTransaction(isolation: TransactionIsolationLevel = defaultIsolationLevel): DatabaseTransaction {
|
||||
return DatabaseTransaction(isolation.jdbcValue, contextTransactionOrNull, this).also {
|
||||
contextTransactionOrNull = it
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an instance of [DatabaseTransaction], with the default transaction isolation level.
|
||||
* Creates an instance of [DatabaseTransaction], with the given transaction isolation level.
|
||||
*/
|
||||
fun createTransaction(): DatabaseTransaction = createTransaction(defaultIsolationLevel)
|
||||
fun createTransaction(isolationLevel: TransactionIsolationLevel = defaultIsolationLevel): DatabaseTransaction {
|
||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||
_contextDatabase.set(this)
|
||||
return currentOrNew(isolationLevel)
|
||||
}
|
||||
|
||||
fun createSession(): Connection {
|
||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||
DatabaseTransactionManager.dataSource = this
|
||||
val ctx = DatabaseTransactionManager.currentOrNull()
|
||||
return ctx?.connection ?: throw IllegalStateException("Was expecting to find database transaction: must wrap calling code within a transaction.")
|
||||
_contextDatabase.set(this)
|
||||
return contextTransaction.connection
|
||||
}
|
||||
|
||||
/**
|
||||
@ -92,7 +113,7 @@ class CordaPersistence(
|
||||
* @param statement to be executed in the scope of this transaction.
|
||||
*/
|
||||
fun <T> transaction(isolationLevel: TransactionIsolationLevel, statement: DatabaseTransaction.() -> T): T {
|
||||
DatabaseTransactionManager.dataSource = this
|
||||
_contextDatabase.set(this)
|
||||
return transaction(isolationLevel, 2, statement)
|
||||
}
|
||||
|
||||
@ -103,7 +124,7 @@ class CordaPersistence(
|
||||
fun <T> transaction(statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, statement)
|
||||
|
||||
private fun <T> transaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T {
|
||||
val outer = DatabaseTransactionManager.currentOrNull()
|
||||
val outer = contextTransactionOrNull
|
||||
return if (outer != null) {
|
||||
outer.statement()
|
||||
} else {
|
||||
@ -119,7 +140,7 @@ class CordaPersistence(
|
||||
log.warn("Cleanup task failed:", t)
|
||||
}
|
||||
while (true) {
|
||||
val transaction = DatabaseTransactionManager.currentOrNew(isolationLevel)
|
||||
val transaction = contextDatabase.currentOrNew(isolationLevel) // XXX: Does this code really support statement changing the contextDatabase?
|
||||
try {
|
||||
val answer = transaction.statement()
|
||||
transaction.commit()
|
||||
@ -153,8 +174,8 @@ class CordaPersistence(
|
||||
* For examples, see the call hierarchy of this function.
|
||||
*/
|
||||
fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
|
||||
val currentTxId = DatabaseTransactionManager.transactionId
|
||||
val databaseTxBoundary: Observable<DatabaseTransactionManager.Boundary> = DatabaseTransactionManager.transactionBoundaries.first { it.txId == currentTxId }
|
||||
val currentTxId = contextTransaction.id
|
||||
val databaseTxBoundary: Observable<CordaPersistence.Boundary> = contextDatabase.transactionBoundaries.first { it.txId == currentTxId }
|
||||
val subject = UnicastSubject.create<T>()
|
||||
subject.delaySubscription(databaseTxBoundary).subscribe(this)
|
||||
databaseTxBoundary.doOnCompleted { subject.onCompleted() }
|
||||
@ -162,12 +183,12 @@ fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
|
||||
}
|
||||
|
||||
// A subscriber that delegates to multiple others, wrapping a database transaction around the combination.
|
||||
private class DatabaseTransactionWrappingSubscriber<U>(val db: CordaPersistence?) : Subscriber<U>() {
|
||||
private class DatabaseTransactionWrappingSubscriber<U>(private val db: CordaPersistence?) : Subscriber<U>() {
|
||||
// Some unsubscribes happen inside onNext() so need something that supports concurrent modification.
|
||||
val delegates = CopyOnWriteArrayList<Subscriber<in U>>()
|
||||
|
||||
fun forEachSubscriberWithDbTx(block: Subscriber<in U>.() -> Unit) {
|
||||
(db ?: DatabaseTransactionManager.dataSource).transaction {
|
||||
(db ?: contextDatabase).transaction {
|
||||
delegates.filter { !it.isUnsubscribed }.forEach {
|
||||
it.block()
|
||||
}
|
||||
|
@ -1,37 +1,40 @@
|
||||
package net.corda.nodeapi.internal.persistence
|
||||
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import org.hibernate.Session
|
||||
import org.hibernate.Transaction
|
||||
import rx.subjects.Subject
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
|
||||
fun currentDBSession(): Session = contextTransaction.session
|
||||
private val _contextTransaction = ThreadLocal<DatabaseTransaction>()
|
||||
var contextTransactionOrNull: DatabaseTransaction?
|
||||
get() = _contextTransaction.get()
|
||||
set(transaction) = _contextTransaction.set(transaction)
|
||||
val contextTransaction get() = contextTransactionOrNull ?: error("Was expecting to find transaction set on current strand: ${Strand.currentStrand()}")
|
||||
|
||||
class DatabaseTransaction(
|
||||
isolation: Int,
|
||||
private val threadLocal: ThreadLocal<DatabaseTransaction>,
|
||||
private val transactionBoundaries: Subject<DatabaseTransactionManager.Boundary, DatabaseTransactionManager.Boundary>,
|
||||
val cordaPersistence: CordaPersistence
|
||||
private val outerTransaction: DatabaseTransaction?,
|
||||
val database: CordaPersistence
|
||||
) {
|
||||
val id: UUID = UUID.randomUUID()
|
||||
|
||||
val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
|
||||
cordaPersistence.dataSource.connection.apply {
|
||||
database.dataSource.connection.apply {
|
||||
autoCommit = false
|
||||
transactionIsolation = isolation
|
||||
}
|
||||
}
|
||||
|
||||
private val sessionDelegate = lazy {
|
||||
val session = cordaPersistence.entityManagerFactory.withOptions().connection(connection).openSession()
|
||||
val session = database.entityManagerFactory.withOptions().connection(connection).openSession()
|
||||
hibernateTransaction = session.beginTransaction()
|
||||
session
|
||||
}
|
||||
|
||||
val session: Session by sessionDelegate
|
||||
private lateinit var hibernateTransaction: Transaction
|
||||
|
||||
private val outerTransaction: DatabaseTransaction? = threadLocal.get()
|
||||
|
||||
fun commit() {
|
||||
if (sessionDelegate.isInitialized()) {
|
||||
hibernateTransaction.commit()
|
||||
@ -53,9 +56,9 @@ class DatabaseTransaction(
|
||||
session.close()
|
||||
}
|
||||
connection.close()
|
||||
threadLocal.set(outerTransaction)
|
||||
contextTransactionOrNull = outerTransaction
|
||||
if (outerTransaction == null) {
|
||||
transactionBoundaries.onNext(DatabaseTransactionManager.Boundary(id))
|
||||
database.transactionBoundaries.onNext(CordaPersistence.Boundary(id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,77 +0,0 @@
|
||||
package net.corda.nodeapi.internal.persistence
|
||||
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import org.hibernate.Session
|
||||
import rx.subjects.PublishSubject
|
||||
import rx.subjects.Subject
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
fun currentDBSession(): Session = DatabaseTransactionManager.current().session
|
||||
|
||||
class DatabaseTransactionManager(initDataSource: CordaPersistence) {
|
||||
companion object {
|
||||
private val threadLocalDb = ThreadLocal<CordaPersistence>()
|
||||
private val threadLocalTx = ThreadLocal<DatabaseTransaction>()
|
||||
private val databaseToInstance = ConcurrentHashMap<CordaPersistence, DatabaseTransactionManager>()
|
||||
|
||||
fun setThreadLocalTx(tx: DatabaseTransaction?): DatabaseTransaction? {
|
||||
val oldTx = threadLocalTx.get()
|
||||
threadLocalTx.set(tx)
|
||||
return oldTx
|
||||
}
|
||||
|
||||
fun restoreThreadLocalTx(context: DatabaseTransaction?) {
|
||||
if (context != null) {
|
||||
threadLocalDb.set(context.cordaPersistence)
|
||||
}
|
||||
threadLocalTx.set(context)
|
||||
}
|
||||
|
||||
var dataSource: CordaPersistence
|
||||
get() = threadLocalDb.get() ?: throw IllegalStateException("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}")
|
||||
set(value) = threadLocalDb.set(value)
|
||||
|
||||
val transactionId: UUID
|
||||
get() = threadLocalTx.get()?.id ?: throw IllegalStateException("Was expecting to find transaction set on current strand: ${Strand.currentStrand()}")
|
||||
|
||||
val manager: DatabaseTransactionManager get() = databaseToInstance[dataSource]!!
|
||||
|
||||
val transactionBoundaries: Subject<Boundary, Boundary> get() = manager._transactionBoundaries
|
||||
|
||||
fun currentOrNull(): DatabaseTransaction? = manager.currentOrNull()
|
||||
|
||||
fun currentOrNew(isolation: TransactionIsolationLevel = dataSource.defaultIsolationLevel): DatabaseTransaction {
|
||||
return currentOrNull() ?: manager.newTransaction(isolation.jdbcValue)
|
||||
}
|
||||
|
||||
fun current(): DatabaseTransaction = currentOrNull() ?: error("No transaction in context.")
|
||||
|
||||
fun newTransaction(isolation: TransactionIsolationLevel = dataSource.defaultIsolationLevel): DatabaseTransaction {
|
||||
return manager.newTransaction(isolation.jdbcValue)
|
||||
}
|
||||
}
|
||||
|
||||
data class Boundary(val txId: UUID)
|
||||
|
||||
private val _transactionBoundaries = PublishSubject.create<Boundary>().toSerialized()
|
||||
|
||||
init {
|
||||
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
|
||||
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
|
||||
// database transaction open. The [transaction] helper above handles this in a finally clause for you
|
||||
// but any manual database transaction management is liable to have this problem.
|
||||
if (threadLocalTx.get() != null) {
|
||||
throw IllegalStateException("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, ${threadLocalTx.get()}")
|
||||
}
|
||||
dataSource = initDataSource
|
||||
databaseToInstance[dataSource] = this
|
||||
}
|
||||
|
||||
private fun newTransaction(isolation: Int) =
|
||||
DatabaseTransaction(isolation, threadLocalTx, transactionBoundaries, dataSource).apply {
|
||||
threadLocalTx.set(this)
|
||||
}
|
||||
|
||||
private fun currentOrNull(): DatabaseTransaction? = threadLocalTx.get()
|
||||
}
|
@ -119,15 +119,16 @@ class HibernateConfiguration(
|
||||
class NodeDatabaseConnectionProvider : ConnectionProvider {
|
||||
override fun closeConnection(conn: Connection) {
|
||||
conn.autoCommit = false
|
||||
val tx = DatabaseTransactionManager.current()
|
||||
tx.commit()
|
||||
tx.close()
|
||||
contextTransaction.run {
|
||||
commit()
|
||||
close()
|
||||
}
|
||||
}
|
||||
|
||||
override fun supportsAggressiveRelease(): Boolean = true
|
||||
|
||||
override fun getConnection(): Connection {
|
||||
return DatabaseTransactionManager.newTransaction().connection
|
||||
return contextDatabase.newTransaction().connection
|
||||
}
|
||||
|
||||
override fun <T : Any?> unwrap(unwrapType: Class<T>): T {
|
||||
|
Reference in New Issue
Block a user