Database.isolatedTransaction removal

This commit is contained in:
szymonsztuka 2017-08-09 09:31:48 +01:00 committed by GitHub
parent 1d965b1785
commit a3ffd92544
5 changed files with 13 additions and 27 deletions

View File

@ -772,7 +772,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with // the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
// the identity key. But the infrastructure to make that easy isn't here yet. // the identity key. But the infrastructure to make that easy isn't here yet.
override val keyManagementService by lazy { makeKeyManagementService(identityService) } override val keyManagementService by lazy { makeKeyManagementService(identityService) }
override val schedulerService by lazy { NodeSchedulerService(this, unfinishedSchedules = busyNodeLatch) } override val schedulerService by lazy { NodeSchedulerService(this, unfinishedSchedules = busyNodeLatch, serverThread = serverThread) }
override val identityService by lazy { override val identityService by lazy {
val trustStore = KeyStoreWrapper(configuration.trustStoreFile, configuration.trustStorePassword) val trustStore = KeyStoreWrapper(configuration.trustStoreFile, configuration.trustStorePassword)
val caKeyStore = KeyStoreWrapper(configuration.nodeKeystore, configuration.keyStorePassword) val caKeyStore = KeyStoreWrapper(configuration.nodeKeystore, configuration.keyStorePassword)

View File

@ -42,7 +42,8 @@ import javax.annotation.concurrent.ThreadSafe
@ThreadSafe @ThreadSafe
class NodeSchedulerService(private val services: ServiceHubInternal, class NodeSchedulerService(private val services: ServiceHubInternal,
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(), private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(),
private val unfinishedSchedules: ReusableLatch = ReusableLatch()) private val unfinishedSchedules: ReusableLatch = ReusableLatch(),
private val serverThread: AffinityExecutor)
: SchedulerService, SingletonSerializeAsToken() { : SchedulerService, SingletonSerializeAsToken() {
companion object { companion object {
@ -156,13 +157,10 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
} }
private fun onTimeReached(scheduledState: ScheduledStateRef) { private fun onTimeReached(scheduledState: ScheduledStateRef) {
serverThread.fetchFrom {
services.database.transaction { services.database.transaction {
val scheduledFlow = getScheduledFlow(scheduledState) val scheduledFlow = getScheduledFlow(scheduledState)
if (scheduledFlow != null) { if (scheduledFlow != null) {
// TODO Because the flow is executed asynchronously, there is a small window between this tx we're in
// committing and the flow's first checkpoint when it starts in which we can lose the flow if the node
// goes down.
// See discussion in https://github.com/corda/corda/pull/639#discussion_r115257437
val future = services.startFlow(scheduledFlow, FlowInitiator.Scheduled(scheduledState)).resultFuture val future = services.startFlow(scheduledFlow, FlowInitiator.Scheduled(scheduledState)).resultFuture
future.then { future.then {
unfinishedSchedules.countDown() unfinishedSchedules.countDown()
@ -170,6 +168,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
} }
} }
} }
}
private fun getScheduledFlow(scheduledState: ScheduledStateRef): FlowLogic<*>? { private fun getScheduledFlow(scheduledState: ScheduledStateRef): FlowLogic<*>? {
val scheduledActivity = getScheduledActivity(scheduledState) val scheduledActivity = getScheduledActivity(scheduledState)

View File

@ -468,11 +468,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
fun <T> add(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T> { fun <T> add(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T> {
// TODO: Check that logic has @Suspendable on its call method. // TODO: Check that logic has @Suspendable on its call method.
executor.checkOnThread() executor.checkOnThread()
// We swap out the parent transaction context as using this frequently leads to a deadlock as we wait val fiber = database.transaction {
// on the flow completion future inside that context. The problem is that any progress checkpoints are
// unable to acquire the table lock and move forward till the calling transaction finishes.
// Committing in line here on a fresh context ensure we can progress.
val fiber = database.isolatedTransaction {
val fiber = createFiber(logic, flowInitiator) val fiber = createFiber(logic, flowInitiator)
updateCheckpoint(fiber) updateCheckpoint(fiber)
fiber fiber

View File

@ -42,15 +42,6 @@ class CordaPersistence(var dataSource: HikariDataSource, databaseProperties: Pro
return ctx?.connection ?: throw IllegalStateException("Was expecting to find database transaction: must wrap calling code within a transaction.") return ctx?.connection ?: throw IllegalStateException("Was expecting to find database transaction: must wrap calling code within a transaction.")
} }
fun <T> isolatedTransaction(block: DatabaseTransaction.() -> T): T {
val context = DatabaseTransactionManager.setThreadLocalTx(null)
return try {
transaction(block)
} finally {
DatabaseTransactionManager.restoreThreadLocalTx(context)
}
}
fun <T> transaction(statement: DatabaseTransaction.() -> T): T { fun <T> transaction(statement: DatabaseTransaction.() -> T): T {
DatabaseTransactionManager.dataSource = this DatabaseTransactionManager.dataSource = this
return transaction(transactionIsolationLevel, 3, statement) return transaction(transactionIsolationLevel, 3, statement)

View File

@ -97,8 +97,8 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps, makeTestDatabaseProperties()) override val vaultService: VaultService = NodeVaultService(this, dataSourceProps, makeTestDatabaseProperties())
override val testReference = this@NodeSchedulerServiceTest override val testReference = this@NodeSchedulerServiceTest
} }
scheduler = NodeSchedulerService(services, schedulerGatedExecutor)
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1) smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
scheduler = NodeSchedulerService(services, schedulerGatedExecutor, serverThread = smmExecutor)
val mockSMM = StateMachineManager(services, DBCheckpointStorage(), smmExecutor, database) val mockSMM = StateMachineManager(services, DBCheckpointStorage(), smmExecutor, database)
mockSMM.changes.subscribe { change -> mockSMM.changes.subscribe { change ->
if (change is StateMachineManager.Change.Removed && mockSMM.allStateMachines.isEmpty()) { if (change is StateMachineManager.Change.Removed && mockSMM.allStateMachines.isEmpty()) {