diff --git a/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FlowHookContainer.kt b/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FlowHookContainer.kt index 686e3d46a4..abb55cca2c 100644 --- a/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FlowHookContainer.kt +++ b/experimental/flow-hook/src/main/kotlin/net/corda/flowhook/FlowHookContainer.kt @@ -2,7 +2,7 @@ package net.corda.flowhook import co.paralleluniverse.fibers.Fiber import net.corda.node.services.statemachine.Event -import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager +import net.corda.nodeapi.internal.persistence.contextTransactionOrNull import java.sql.Connection @Suppress("UNUSED") @@ -156,7 +156,7 @@ object FlowHookContainer { private fun currentTransactionOrThread(): Any { return try { - DatabaseTransactionManager.currentOrNull() + contextTransactionOrNull } catch (exception: IllegalStateException) { null } ?: Thread.currentThread() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt index 3e818b4f76..a1c6210a5e 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt @@ -44,7 +44,9 @@ enum class TransactionIsolationLevel { } private val _contextDatabase = ThreadLocal() -val contextDatabase get() = _contextDatabase.get() ?: error("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}") +var contextDatabase: CordaPersistence + get() = _contextDatabase.get() ?: error("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}") + set(database) = _contextDatabase.set(database) class CordaPersistence( val dataSource: DataSource, diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt index a1c16fa9eb..ffcdd55c24 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt @@ -15,7 +15,7 @@ val contextTransaction get() = contextTransactionOrNull ?: error("Was expecting class DatabaseTransaction( isolation: Int, - private val outerTransaction: DatabaseTransaction?, + val outerTransaction: DatabaseTransaction?, val database: CordaPersistence ) { val id: UUID = UUID.randomUUID() diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 91b64b81e6..6fed52b7d8 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -5,7 +5,6 @@ import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.debug import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.statemachine.Checkpoint -import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.currentDBSession import org.slf4j.LoggerFactory @@ -43,7 +42,7 @@ class DBCheckpointStorage : CheckpointStorage { } override fun removeCheckpoint(id: StateMachineRunId): Boolean { - val session = DatabaseTransactionManager.current().session + val session = currentDBSession() val criteriaBuilder = session.criteriaBuilder val delete = criteriaBuilder.createCriteriaDelete(DBCheckpoint::class.java) val root = delete.from(DBCheckpoint::class.java) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 6449a8b090..6efdc09f7b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -10,7 +10,9 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.trace import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal -import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager +import net.corda.nodeapi.internal.persistence.contextDatabase +import net.corda.nodeapi.internal.persistence.contextTransaction +import net.corda.nodeapi.internal.persistence.contextTransactionOrNull import java.time.Duration import java.time.Instant import java.util.concurrent.TimeUnit @@ -163,24 +165,24 @@ class ActionExecutorImpl( @Suspendable private fun executeCreateTransaction() { - if (DatabaseTransactionManager.currentOrNull() != null) { + if (contextTransactionOrNull != null) { throw IllegalStateException("Refusing to create a second transaction") } - DatabaseTransactionManager.newTransaction() + contextDatabase.newTransaction() } @Suspendable private fun executeRollbackTransaction() { - DatabaseTransactionManager.currentOrNull()?.close() + contextTransactionOrNull?.close() } @Suspendable private fun executeCommitTransaction() { try { - DatabaseTransactionManager.current().commit() + contextTransaction.commit() } finally { - DatabaseTransactionManager.current().close() - DatabaseTransactionManager.setThreadLocalTx(null) + contextTransaction.close() + contextTransactionOrNull = null } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt index b895a40863..2cf328a450 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt @@ -5,7 +5,8 @@ import net.corda.core.utilities.contextLogger import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager +import net.corda.nodeapi.internal.persistence.contextDatabase +import net.corda.nodeapi.internal.persistence.contextTransactionOrNull import java.security.SecureRandom /** @@ -31,12 +32,12 @@ class TransitionExecutorImpl( transition: TransitionResult, actionExecutor: ActionExecutor ): Pair { - DatabaseTransactionManager.dataSource = database + contextDatabase = database for (action in transition.actions) { try { actionExecutor.executeAction(fiber, action) } catch (exception: Throwable) { - DatabaseTransactionManager.currentOrNull()?.close() + contextTransactionOrNull?.close() if (transition.newState.checkpoint.errorState is ErrorState.Errored) { // If we errored while transitioning to an error state then we cannot record the additional // error as that may result in an infinite loop, e.g. error propagation fails -> record error -> propagate fails again. diff --git a/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt b/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt index 7e13cfab85..33cd73e74b 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt @@ -10,7 +10,7 @@ import net.corda.core.utilities.toNonEmptySet import net.corda.core.utilities.trace import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.StateMachineManager -import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager +import net.corda.nodeapi.internal.persistence.contextDatabase import java.util.* class VaultSoftLockManager private constructor(private val vault: VaultService) { @@ -52,14 +52,14 @@ class VaultSoftLockManager private constructor(private val vault: VaultService) private fun registerSoftLocks(flowId: UUID, stateRefs: NonEmptySet) { log.trace { "Reserving soft locks for flow id $flowId and states $stateRefs" } - DatabaseTransactionManager.dataSource.transaction { + contextDatabase.transaction { vault.softLockReserve(flowId, stateRefs) } } private fun unregisterSoftLocks(flowId: UUID, logic: FlowLogic<*>) { log.trace { "Releasing soft locks for flow ${logic.javaClass.simpleName} with flow id $flowId" } - DatabaseTransactionManager.dataSource.transaction { + contextDatabase.transaction { vault.softLockRelease(flowId) } }