mirror of
https://github.com/corda/corda.git
synced 2025-01-15 01:10:33 +00:00
Merge commit 'bbcafca959b2d468a6f4f9bb847a1bbb7bfc9fdc' into andr3ej-os-merges
# Conflicts: # node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt # node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt # node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
This commit is contained in:
commit
bd342a690c
@ -1,13 +1,16 @@
|
|||||||
package net.corda.nodeapi.internal.persistence
|
package net.corda.nodeapi.internal.persistence
|
||||||
|
|
||||||
|
import co.paralleluniverse.strands.Strand
|
||||||
import net.corda.core.schemas.MappedSchema
|
import net.corda.core.schemas.MappedSchema
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.Subscriber
|
import rx.Subscriber
|
||||||
|
import rx.subjects.PublishSubject
|
||||||
import rx.subjects.UnicastSubject
|
import rx.subjects.UnicastSubject
|
||||||
import java.io.Closeable
|
import java.io.Closeable
|
||||||
import java.sql.Connection
|
import java.sql.Connection
|
||||||
import java.sql.SQLException
|
import java.sql.SQLException
|
||||||
|
import java.util.*
|
||||||
import java.util.concurrent.CopyOnWriteArrayList
|
import java.util.concurrent.CopyOnWriteArrayList
|
||||||
import javax.persistence.AttributeConverter
|
import javax.persistence.AttributeConverter
|
||||||
import javax.sql.DataSource
|
import javax.sql.DataSource
|
||||||
@ -40,6 +43,9 @@ enum class TransactionIsolationLevel {
|
|||||||
val jdbcValue: Int = java.sql.Connection::class.java.getField(jdbcString).get(null) as Int
|
val jdbcValue: Int = java.sql.Connection::class.java.getField(jdbcString).get(null) as Int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private val _contextDatabase = ThreadLocal<CordaPersistence>()
|
||||||
|
val contextDatabase get() = _contextDatabase.get() ?: error("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}")
|
||||||
|
|
||||||
class CordaPersistence(
|
class CordaPersistence(
|
||||||
val dataSource: DataSource,
|
val dataSource: DataSource,
|
||||||
databaseConfig: DatabaseConfig,
|
databaseConfig: DatabaseConfig,
|
||||||
@ -51,7 +57,7 @@ class CordaPersistence(
|
|||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
}
|
}
|
||||||
|
|
||||||
val defaultIsolationLevel = databaseConfig.transactionIsolationLevel
|
private val defaultIsolationLevel = databaseConfig.transactionIsolationLevel
|
||||||
val hibernateConfig: HibernateConfiguration by lazy {
|
val hibernateConfig: HibernateConfiguration by lazy {
|
||||||
|
|
||||||
transaction {
|
transaction {
|
||||||
@ -60,8 +66,19 @@ class CordaPersistence(
|
|||||||
}
|
}
|
||||||
val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas
|
val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas
|
||||||
|
|
||||||
|
data class Boundary(val txId: UUID)
|
||||||
|
|
||||||
|
internal val transactionBoundaries = PublishSubject.create<Boundary>().toSerialized()
|
||||||
|
|
||||||
init {
|
init {
|
||||||
DatabaseTransactionManager(this)
|
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
|
||||||
|
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
|
||||||
|
// database transaction open. The [transaction] helper above handles this in a finally clause for you
|
||||||
|
// but any manual database transaction management is liable to have this problem.
|
||||||
|
contextTransactionOrNull?.let {
|
||||||
|
error("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, $it")
|
||||||
|
}
|
||||||
|
_contextDatabase.set(this)
|
||||||
// Check not in read-only mode.
|
// Check not in read-only mode.
|
||||||
transaction {
|
transaction {
|
||||||
check(!connection.metaData.isReadOnly) { "Database should not be readonly." }
|
check(!connection.metaData.isReadOnly) { "Database should not be readonly." }
|
||||||
@ -72,25 +89,29 @@ class CordaPersistence(
|
|||||||
const val DATA_SOURCE_URL = "dataSource.url"
|
const val DATA_SOURCE_URL = "dataSource.url"
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
fun currentOrNew(isolation: TransactionIsolationLevel = defaultIsolationLevel): DatabaseTransaction {
|
||||||
* Creates an instance of [DatabaseTransaction], with the given transaction isolation level.
|
return contextTransactionOrNull ?: newTransaction(isolation)
|
||||||
*/
|
}
|
||||||
fun createTransaction(isolationLevel: TransactionIsolationLevel): DatabaseTransaction {
|
|
||||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
fun newTransaction(isolation: TransactionIsolationLevel = defaultIsolationLevel): DatabaseTransaction {
|
||||||
DatabaseTransactionManager.dataSource = this
|
return DatabaseTransaction(isolation.jdbcValue, contextTransactionOrNull, this).also {
|
||||||
return DatabaseTransactionManager.currentOrNew(isolationLevel)
|
contextTransactionOrNull = it
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an instance of [DatabaseTransaction], with the default transaction isolation level.
|
* Creates an instance of [DatabaseTransaction], with the given transaction isolation level.
|
||||||
*/
|
*/
|
||||||
fun createTransaction(): DatabaseTransaction = createTransaction(defaultIsolationLevel)
|
fun createTransaction(isolationLevel: TransactionIsolationLevel = defaultIsolationLevel): DatabaseTransaction {
|
||||||
|
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||||
|
_contextDatabase.set(this)
|
||||||
|
return currentOrNew(isolationLevel)
|
||||||
|
}
|
||||||
|
|
||||||
fun createSession(): Connection {
|
fun createSession(): Connection {
|
||||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||||
DatabaseTransactionManager.dataSource = this
|
_contextDatabase.set(this)
|
||||||
val ctx = DatabaseTransactionManager.currentOrNull()
|
return contextTransaction.connection
|
||||||
return ctx?.connection ?: throw IllegalStateException("Was expecting to find database transaction: must wrap calling code within a transaction.")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -99,7 +120,7 @@ class CordaPersistence(
|
|||||||
* @param statement to be executed in the scope of this transaction.
|
* @param statement to be executed in the scope of this transaction.
|
||||||
*/
|
*/
|
||||||
fun <T> transaction(isolationLevel: TransactionIsolationLevel, statement: DatabaseTransaction.() -> T): T {
|
fun <T> transaction(isolationLevel: TransactionIsolationLevel, statement: DatabaseTransaction.() -> T): T {
|
||||||
DatabaseTransactionManager.dataSource = this
|
_contextDatabase.set(this)
|
||||||
return transaction(isolationLevel, 2, statement)
|
return transaction(isolationLevel, 2, statement)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -110,7 +131,7 @@ class CordaPersistence(
|
|||||||
fun <T> transaction(statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, statement)
|
fun <T> transaction(statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, statement)
|
||||||
|
|
||||||
private fun <T> transaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T {
|
private fun <T> transaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T {
|
||||||
val outer = DatabaseTransactionManager.currentOrNull()
|
val outer = contextTransactionOrNull
|
||||||
return if (outer != null) {
|
return if (outer != null) {
|
||||||
outer.statement()
|
outer.statement()
|
||||||
} else {
|
} else {
|
||||||
@ -126,7 +147,7 @@ class CordaPersistence(
|
|||||||
log.warn("Cleanup task failed:", t)
|
log.warn("Cleanup task failed:", t)
|
||||||
}
|
}
|
||||||
while (true) {
|
while (true) {
|
||||||
val transaction = DatabaseTransactionManager.currentOrNew(isolationLevel)
|
val transaction = contextDatabase.currentOrNew(isolationLevel) // XXX: Does this code really support statement changing the contextDatabase?
|
||||||
try {
|
try {
|
||||||
val answer = transaction.statement()
|
val answer = transaction.statement()
|
||||||
transaction.commit()
|
transaction.commit()
|
||||||
@ -160,8 +181,8 @@ class CordaPersistence(
|
|||||||
* For examples, see the call hierarchy of this function.
|
* For examples, see the call hierarchy of this function.
|
||||||
*/
|
*/
|
||||||
fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
|
fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
|
||||||
val currentTxId = DatabaseTransactionManager.transactionId
|
val currentTxId = contextTransaction.id
|
||||||
val databaseTxBoundary: Observable<DatabaseTransactionManager.Boundary> = DatabaseTransactionManager.transactionBoundaries.first { it.txId == currentTxId }
|
val databaseTxBoundary: Observable<CordaPersistence.Boundary> = contextDatabase.transactionBoundaries.first { it.txId == currentTxId }
|
||||||
val subject = UnicastSubject.create<T>()
|
val subject = UnicastSubject.create<T>()
|
||||||
subject.delaySubscription(databaseTxBoundary).subscribe(this)
|
subject.delaySubscription(databaseTxBoundary).subscribe(this)
|
||||||
databaseTxBoundary.doOnCompleted { subject.onCompleted() }
|
databaseTxBoundary.doOnCompleted { subject.onCompleted() }
|
||||||
@ -169,12 +190,12 @@ fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// A subscriber that delegates to multiple others, wrapping a database transaction around the combination.
|
// A subscriber that delegates to multiple others, wrapping a database transaction around the combination.
|
||||||
private class DatabaseTransactionWrappingSubscriber<U>(val db: CordaPersistence?) : Subscriber<U>() {
|
private class DatabaseTransactionWrappingSubscriber<U>(private val db: CordaPersistence?) : Subscriber<U>() {
|
||||||
// Some unsubscribes happen inside onNext() so need something that supports concurrent modification.
|
// Some unsubscribes happen inside onNext() so need something that supports concurrent modification.
|
||||||
val delegates = CopyOnWriteArrayList<Subscriber<in U>>()
|
val delegates = CopyOnWriteArrayList<Subscriber<in U>>()
|
||||||
|
|
||||||
fun forEachSubscriberWithDbTx(block: Subscriber<in U>.() -> Unit) {
|
fun forEachSubscriberWithDbTx(block: Subscriber<in U>.() -> Unit) {
|
||||||
(db ?: DatabaseTransactionManager.dataSource).transaction {
|
(db ?: contextDatabase).transaction {
|
||||||
delegates.filter { !it.isUnsubscribed }.forEach {
|
delegates.filter { !it.isUnsubscribed }.forEach {
|
||||||
it.block()
|
it.block()
|
||||||
}
|
}
|
||||||
|
@ -1,23 +1,29 @@
|
|||||||
package net.corda.nodeapi.internal.persistence
|
package net.corda.nodeapi.internal.persistence
|
||||||
|
|
||||||
|
import co.paralleluniverse.strands.Strand
|
||||||
import org.hibernate.Session
|
import org.hibernate.Session
|
||||||
import org.hibernate.Transaction
|
import org.hibernate.Transaction
|
||||||
import rx.subjects.Subject
|
|
||||||
import java.sql.Connection
|
import java.sql.Connection
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
|
fun currentDBSession(): Session = contextTransaction.session
|
||||||
|
private val _contextTransaction = ThreadLocal<DatabaseTransaction>()
|
||||||
|
var contextTransactionOrNull: DatabaseTransaction?
|
||||||
|
get() = _contextTransaction.get()
|
||||||
|
set(transaction) = _contextTransaction.set(transaction)
|
||||||
|
val contextTransaction get() = contextTransactionOrNull ?: error("Was expecting to find transaction set on current strand: ${Strand.currentStrand()}")
|
||||||
|
|
||||||
class DatabaseTransaction(
|
class DatabaseTransaction(
|
||||||
isolation: Int,
|
isolation: Int,
|
||||||
private val threadLocal: ThreadLocal<DatabaseTransaction>,
|
private val outerTransaction: DatabaseTransaction?,
|
||||||
private val transactionBoundaries: Subject<DatabaseTransactionManager.Boundary, DatabaseTransactionManager.Boundary>,
|
val database: CordaPersistence
|
||||||
val cordaPersistence: CordaPersistence
|
|
||||||
) {
|
) {
|
||||||
val id: UUID = UUID.randomUUID()
|
val id: UUID = UUID.randomUUID()
|
||||||
|
|
||||||
private var _connectionCreated = false
|
private var _connectionCreated = false
|
||||||
val connectionCreated get() = _connectionCreated
|
val connectionCreated get() = _connectionCreated
|
||||||
val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
|
val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
|
||||||
cordaPersistence.dataSource.connection
|
database.dataSource.connection
|
||||||
.apply {
|
.apply {
|
||||||
_connectionCreated = true
|
_connectionCreated = true
|
||||||
// only set the transaction isolation level if it's actually changed - setting isn't free.
|
// only set the transaction isolation level if it's actually changed - setting isn't free.
|
||||||
@ -28,16 +34,13 @@ class DatabaseTransaction(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private val sessionDelegate = lazy {
|
private val sessionDelegate = lazy {
|
||||||
val session = cordaPersistence.entityManagerFactory.withOptions().connection(connection).openSession()
|
val session = database.entityManagerFactory.withOptions().connection(connection).openSession()
|
||||||
hibernateTransaction = session.beginTransaction()
|
hibernateTransaction = session.beginTransaction()
|
||||||
session
|
session
|
||||||
}
|
}
|
||||||
|
|
||||||
val session: Session by sessionDelegate
|
val session: Session by sessionDelegate
|
||||||
private lateinit var hibernateTransaction: Transaction
|
private lateinit var hibernateTransaction: Transaction
|
||||||
|
|
||||||
val outerTransaction: DatabaseTransaction? = threadLocal.get()
|
|
||||||
|
|
||||||
fun commit() {
|
fun commit() {
|
||||||
if (sessionDelegate.isInitialized()) {
|
if (sessionDelegate.isInitialized()) {
|
||||||
hibernateTransaction.commit()
|
hibernateTransaction.commit()
|
||||||
@ -63,9 +66,9 @@ class DatabaseTransaction(
|
|||||||
if (_connectionCreated) {
|
if (_connectionCreated) {
|
||||||
connection.close()
|
connection.close()
|
||||||
}
|
}
|
||||||
threadLocal.set(outerTransaction)
|
contextTransactionOrNull = outerTransaction
|
||||||
if (outerTransaction == null) {
|
if (outerTransaction == null) {
|
||||||
transactionBoundaries.onNext(DatabaseTransactionManager.Boundary(id))
|
database.transactionBoundaries.onNext(CordaPersistence.Boundary(id))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,77 +0,0 @@
|
|||||||
package net.corda.nodeapi.internal.persistence
|
|
||||||
|
|
||||||
import co.paralleluniverse.strands.Strand
|
|
||||||
import org.hibernate.Session
|
|
||||||
import rx.subjects.PublishSubject
|
|
||||||
import rx.subjects.Subject
|
|
||||||
import java.util.*
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
|
||||||
|
|
||||||
fun currentDBSession(): Session = DatabaseTransactionManager.current().session
|
|
||||||
|
|
||||||
class DatabaseTransactionManager(initDataSource: CordaPersistence) {
|
|
||||||
companion object {
|
|
||||||
private val threadLocalDb = ThreadLocal<CordaPersistence>()
|
|
||||||
private val threadLocalTx = ThreadLocal<DatabaseTransaction>()
|
|
||||||
private val databaseToInstance = ConcurrentHashMap<CordaPersistence, DatabaseTransactionManager>()
|
|
||||||
|
|
||||||
fun setThreadLocalTx(tx: DatabaseTransaction?): DatabaseTransaction? {
|
|
||||||
val oldTx = threadLocalTx.get()
|
|
||||||
threadLocalTx.set(tx)
|
|
||||||
return oldTx
|
|
||||||
}
|
|
||||||
|
|
||||||
fun restoreThreadLocalTx(context: DatabaseTransaction?) {
|
|
||||||
if (context != null) {
|
|
||||||
threadLocalDb.set(context.cordaPersistence)
|
|
||||||
}
|
|
||||||
threadLocalTx.set(context)
|
|
||||||
}
|
|
||||||
|
|
||||||
var dataSource: CordaPersistence
|
|
||||||
get() = threadLocalDb.get() ?: throw IllegalStateException("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}")
|
|
||||||
set(value) = threadLocalDb.set(value)
|
|
||||||
|
|
||||||
val transactionId: UUID
|
|
||||||
get() = threadLocalTx.get()?.id ?: throw IllegalStateException("Was expecting to find transaction set on current strand: ${Strand.currentStrand()}")
|
|
||||||
|
|
||||||
val manager: DatabaseTransactionManager get() = databaseToInstance[dataSource]!!
|
|
||||||
|
|
||||||
val transactionBoundaries: Subject<Boundary, Boundary> get() = manager._transactionBoundaries
|
|
||||||
|
|
||||||
fun currentOrNull(): DatabaseTransaction? = manager.currentOrNull()
|
|
||||||
|
|
||||||
fun currentOrNew(isolation: TransactionIsolationLevel = dataSource.defaultIsolationLevel): DatabaseTransaction {
|
|
||||||
return currentOrNull() ?: manager.newTransaction(isolation.jdbcValue)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun current(): DatabaseTransaction = currentOrNull() ?: error("No transaction in context.")
|
|
||||||
|
|
||||||
fun newTransaction(isolation: TransactionIsolationLevel = dataSource.defaultIsolationLevel): DatabaseTransaction {
|
|
||||||
return manager.newTransaction(isolation.jdbcValue)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
data class Boundary(val txId: UUID)
|
|
||||||
|
|
||||||
private val _transactionBoundaries = PublishSubject.create<Boundary>().toSerialized()
|
|
||||||
|
|
||||||
init {
|
|
||||||
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
|
|
||||||
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
|
|
||||||
// database transaction open. The [transaction] helper above handles this in a finally clause for you
|
|
||||||
// but any manual database transaction management is liable to have this problem.
|
|
||||||
if (threadLocalTx.get() != null) {
|
|
||||||
throw IllegalStateException("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, ${threadLocalTx.get()}")
|
|
||||||
}
|
|
||||||
dataSource = initDataSource
|
|
||||||
databaseToInstance[dataSource] = this
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun newTransaction(isolation: Int) =
|
|
||||||
DatabaseTransaction(isolation, threadLocalTx, transactionBoundaries, dataSource).apply {
|
|
||||||
threadLocalTx.set(this)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun currentOrNull(): DatabaseTransaction? = threadLocalTx.get()
|
|
||||||
}
|
|
@ -128,15 +128,16 @@ class HibernateConfiguration(
|
|||||||
class NodeDatabaseConnectionProvider : ConnectionProvider {
|
class NodeDatabaseConnectionProvider : ConnectionProvider {
|
||||||
override fun closeConnection(conn: Connection) {
|
override fun closeConnection(conn: Connection) {
|
||||||
conn.autoCommit = false
|
conn.autoCommit = false
|
||||||
val tx = DatabaseTransactionManager.current()
|
contextTransaction.run {
|
||||||
tx.commit()
|
commit()
|
||||||
tx.close()
|
close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun supportsAggressiveRelease(): Boolean = true
|
override fun supportsAggressiveRelease(): Boolean = true
|
||||||
|
|
||||||
override fun getConnection(): Connection {
|
override fun getConnection(): Connection {
|
||||||
return DatabaseTransactionManager.newTransaction().connection
|
return contextDatabase.newTransaction().connection
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun <T : Any?> unwrap(unwrapType: Class<T>): T {
|
override fun <T : Any?> unwrap(unwrapType: Class<T>): T {
|
||||||
|
@ -17,7 +17,6 @@ import net.corda.core.node.services.vault.AttachmentSort
|
|||||||
import net.corda.core.serialization.*
|
import net.corda.core.serialization.*
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
|
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
|
|
||||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||||
import java.io.*
|
import java.io.*
|
||||||
@ -242,8 +241,7 @@ class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, Single
|
|||||||
|
|
||||||
override fun queryAttachments(criteria: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
|
override fun queryAttachments(criteria: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
|
||||||
log.info("Attachment query criteria: $criteria, sorting: $sorting")
|
log.info("Attachment query criteria: $criteria, sorting: $sorting")
|
||||||
|
val session = currentDBSession()
|
||||||
val session = DatabaseTransactionManager.current().session
|
|
||||||
val criteriaBuilder = session.criteriaBuilder
|
val criteriaBuilder = session.criteriaBuilder
|
||||||
|
|
||||||
val criteriaQuery = criteriaBuilder.createQuery(DBAttachment::class.java)
|
val criteriaQuery = criteriaBuilder.createQuery(DBAttachment::class.java)
|
||||||
|
@ -10,8 +10,8 @@ import net.corda.core.schemas.PersistentStateRef
|
|||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
import net.corda.node.services.api.SchemaService
|
import net.corda.node.services.api.SchemaService
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
|
|
||||||
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
|
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
|
||||||
|
import net.corda.nodeapi.internal.persistence.contextTransaction
|
||||||
import org.hibernate.FlushMode
|
import org.hibernate.FlushMode
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
|
|
||||||
@ -54,7 +54,7 @@ class HibernateObserver private constructor(private val config: HibernateConfigu
|
|||||||
internal fun persistStatesWithSchema(statesAndRefs: List<ContractStateAndRef>, schema: MappedSchema) {
|
internal fun persistStatesWithSchema(statesAndRefs: List<ContractStateAndRef>, schema: MappedSchema) {
|
||||||
val sessionFactory = config.sessionFactoryForSchemas(setOf(schema))
|
val sessionFactory = config.sessionFactoryForSchemas(setOf(schema))
|
||||||
val session = sessionFactory.withOptions().
|
val session = sessionFactory.withOptions().
|
||||||
connection(DatabaseTransactionManager.current().connection).
|
connection(contextTransaction.connection).
|
||||||
flushMode(FlushMode.MANUAL).
|
flushMode(FlushMode.MANUAL).
|
||||||
openSession()
|
openSession()
|
||||||
session.use { thisSession ->
|
session.use { thisSession ->
|
||||||
|
@ -27,7 +27,8 @@ import net.corda.node.services.statemachine.transitions.FlowContinuation
|
|||||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
|
import net.corda.nodeapi.internal.persistence.contextTransaction
|
||||||
|
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
@ -58,8 +59,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun extractThreadLocalTransaction(): TransientReference<DatabaseTransaction> {
|
private fun extractThreadLocalTransaction(): TransientReference<DatabaseTransaction> {
|
||||||
val transaction = DatabaseTransactionManager.current()
|
val transaction = contextTransaction
|
||||||
DatabaseTransactionManager.setThreadLocalTx(null)
|
contextTransactionOrNull = null
|
||||||
return TransientReference(transaction)
|
return TransientReference(transaction)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -234,7 +235,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
parkAndSerialize { _, _ ->
|
parkAndSerialize { _, _ ->
|
||||||
logger.trace { "Suspended on $ioRequest" }
|
logger.trace { "Suspended on $ioRequest" }
|
||||||
|
|
||||||
DatabaseTransactionManager.setThreadLocalTx(transaction.value)
|
contextTransactionOrNull = transaction.value
|
||||||
val event = try {
|
val event = try {
|
||||||
Event.Suspend(
|
Event.Suspend(
|
||||||
ioRequest = ioRequest,
|
ioRequest = ioRequest,
|
||||||
|
@ -17,12 +17,8 @@ import net.corda.core.transactions.NotaryChangeWireTransaction
|
|||||||
import net.corda.core.transactions.WireTransaction
|
import net.corda.core.transactions.WireTransaction
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.*
|
||||||
import net.corda.node.services.api.VaultServiceInternal
|
import net.corda.node.services.api.VaultServiceInternal
|
||||||
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
|
|
||||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
|
import net.corda.nodeapi.internal.persistence.*
|
||||||
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
|
||||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
|
||||||
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
|
||||||
import org.hibernate.Session
|
import org.hibernate.Session
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
@ -479,8 +475,7 @@ class NodeVaultService(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun getSession() = DatabaseTransactionManager.currentOrNew().session
|
private fun getSession() = contextDatabase.currentOrNew().session
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Derive list from existing vault states and then incrementally update using vault observables
|
* Derive list from existing vault states and then incrementally update using vault observables
|
||||||
*/
|
*/
|
||||||
|
@ -15,7 +15,6 @@ import net.corda.core.schemas.QueryableState
|
|||||||
import net.corda.node.services.api.SchemaService
|
import net.corda.node.services.api.SchemaService
|
||||||
import net.corda.node.internal.configureDatabase
|
import net.corda.node.internal.configureDatabase
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
|
|
||||||
import net.corda.testing.internal.LogHelper
|
import net.corda.testing.internal.LogHelper
|
||||||
import net.corda.testing.TestIdentity
|
import net.corda.testing.TestIdentity
|
||||||
import net.corda.testing.contracts.DummyContract
|
import net.corda.testing.contracts.DummyContract
|
||||||
@ -74,11 +73,11 @@ class HibernateObserverTests {
|
|||||||
database.transaction {
|
database.transaction {
|
||||||
val MEGA_CORP = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party
|
val MEGA_CORP = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party
|
||||||
rawUpdatesPublisher.onNext(Vault.Update(emptySet(), setOf(StateAndRef(TransactionState(TestState(), DummyContract.PROGRAM_ID, MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0)))))
|
rawUpdatesPublisher.onNext(Vault.Update(emptySet(), setOf(StateAndRef(TransactionState(TestState(), DummyContract.PROGRAM_ID, MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0)))))
|
||||||
val parentRowCountResult = DatabaseTransactionManager.current().connection.prepareStatement("select count(*) from Parents").executeQuery()
|
val parentRowCountResult = connection.prepareStatement("select count(*) from Parents").executeQuery()
|
||||||
parentRowCountResult.next()
|
parentRowCountResult.next()
|
||||||
val parentRows = parentRowCountResult.getInt(1)
|
val parentRows = parentRowCountResult.getInt(1)
|
||||||
parentRowCountResult.close()
|
parentRowCountResult.close()
|
||||||
val childrenRowCountResult = DatabaseTransactionManager.current().connection.prepareStatement("select count(*) from Children").executeQuery()
|
val childrenRowCountResult = connection.prepareStatement("select count(*) from Children").executeQuery()
|
||||||
childrenRowCountResult.next()
|
childrenRowCountResult.next()
|
||||||
val childrenRows = childrenRowCountResult.getInt(1)
|
val childrenRows = childrenRowCountResult.getInt(1)
|
||||||
childrenRowCountResult.close()
|
childrenRowCountResult.close()
|
||||||
|
@ -16,9 +16,7 @@ import java.io.Closeable
|
|||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
class ObservablesTests {
|
class ObservablesTests {
|
||||||
|
private fun isInDatabaseTransaction() = contextTransactionOrNull != null
|
||||||
private fun isInDatabaseTransaction(): Boolean = (DatabaseTransactionManager.currentOrNull() != null)
|
|
||||||
|
|
||||||
private val toBeClosed = mutableListOf<Closeable>()
|
private val toBeClosed = mutableListOf<Closeable>()
|
||||||
|
|
||||||
private fun createDatabase(): CordaPersistence {
|
private fun createDatabase(): CordaPersistence {
|
||||||
@ -168,7 +166,7 @@ class ObservablesTests {
|
|||||||
observableWithDbTx.first().subscribe { undelayedEvent.set(it to isInDatabaseTransaction()) }
|
observableWithDbTx.first().subscribe { undelayedEvent.set(it to isInDatabaseTransaction()) }
|
||||||
|
|
||||||
fun observeSecondEvent(event: Int, future: SettableFuture<Pair<Int, UUID?>>) {
|
fun observeSecondEvent(event: Int, future: SettableFuture<Pair<Int, UUID?>>) {
|
||||||
future.set(event to if (isInDatabaseTransaction()) DatabaseTransactionManager.transactionId else null)
|
future.set(event to if (isInDatabaseTransaction()) contextTransaction.id else null)
|
||||||
}
|
}
|
||||||
|
|
||||||
observableWithDbTx.skip(1).first().subscribe { observeSecondEvent(it, delayedEventFromSecondObserver) }
|
observableWithDbTx.skip(1).first().subscribe { observeSecondEvent(it, delayedEventFromSecondObserver) }
|
||||||
|
Loading…
Reference in New Issue
Block a user