ENT-1766 Release soft locks inside the database transaction (#737)

This commit is contained in:
Rick Parker 2018-04-16 13:51:50 +01:00 committed by GitHub
parent 408582cd05
commit 34f8719363
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 30 additions and 81 deletions

View File

@ -67,7 +67,6 @@ import net.corda.node.services.statemachine.*
import net.corda.node.services.transactions.*
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.JVMAgentRegistry
import net.corda.node.utilities.NamedThreadFactory
@ -646,7 +645,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected open fun makeTransactionStorage(database: CordaPersistence, transactionCacheSizeBytes: Long): WritableTransactionStorage = DBTransactionStorage(transactionCacheSizeBytes)
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, smm: StateMachineManager, schemaService: SchemaService, flowLogicRefFactory: FlowLogicRefFactory) {
VaultSoftLockManager.install(services.vaultService, smm)
ScheduledActivityObserver.install(services.vaultService, schedulerService, flowLogicRefFactory)
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig, schemaService)
}

View File

@ -16,6 +16,7 @@ import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
import net.corda.node.services.messaging.DeduplicationHandler
import java.time.Instant
import java.util.*
/**
* [Action]s are reified IO actions to execute as part of state machine transitions.
@ -127,6 +128,11 @@ sealed class Action {
* Execute the specified [operation].
*/
data class ExecuteAsyncOperation(val operation: FlowAsyncOperation<*>) : Action()
/**
* Release soft locks associated with given ID (currently the flow ID).
*/
data class ReleaseSoftLocks(val uuid: UUID?) : Action()
}
/**

View File

@ -82,9 +82,14 @@ class ActionExecutorImpl(
is Action.RollbackTransaction -> executeRollbackTransaction()
is Action.CommitTransaction -> executeCommitTransaction()
is Action.ExecuteAsyncOperation -> executeAsyncOperation(fiber, action)
is Action.ReleaseSoftLocks -> executeReleaseSoftLocks(action)
}
}
private fun executeReleaseSoftLocks(action: Action.ReleaseSoftLocks) {
if (action.uuid != null) services.vaultService.softLockRelease(action.uuid)
}
@Suspendable
private fun executeTrackTransaction(fiber: FlowFiber, action: Action.TrackTransaction) {
services.validatedTransactions.trackTransaction(action.hash).thenMatch(

View File

@ -16,6 +16,7 @@ import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.messaging.DeduplicationHandler
import java.util.*
/**
* Transitions in the flow state machine are triggered by [Event]s that may originate from the flow itself or from
@ -122,8 +123,9 @@ sealed class Event {
* Scheduled by the flow.
*
* @param returnValue the return value of the flow.
* @param softLocksId the flow ID of the flow if it is holding soft locks, else null.
*/
data class FlowFinish(val returnValue: Any?) : Event()
data class FlowFinish(val returnValue: Any?, val softLocksId: UUID?) : Event()
/**
* Signals the completion of a [FlowAsyncOperation].

View File

@ -150,9 +150,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
logger.warn("Flow threw exception", throwable)
Try.Failure<R>(throwable)
}
val softLocksId = if (hasSoftLockedStates) logic.runId.uuid else null
val finalEvent = when (resultOrError) {
is Try.Success -> {
Event.FlowFinish(resultOrError.value)
Event.FlowFinish(resultOrError.value, softLocksId)
}
is Try.Failure -> {
Event.Error(resultOrError.exception)

View File

@ -72,6 +72,7 @@ class ErrorFlowTransition(
}
actions.addAll(arrayOf(
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(context.id.uuid),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
Action.RemoveSessionBindings(currentState.checkpoint.sessions.keys)

View File

@ -188,6 +188,7 @@ class TopLevelTransition(
}
actions.addAll(arrayOf(
Action.PersistDeduplicationFacts(pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(event.softLocksId),
Action.CommitTransaction,
Action.AcknowledgeMessages(pendingDeduplicationHandlers),
Action.RemoveSessionBindings(allSourceSessionIds),

View File

@ -207,9 +207,17 @@ class NodeVaultService(
if (!netUpdate.isEmpty()) {
recordUpdate(netUpdate)
concurrentBox.concurrent {
// flowId required by SoftLockManager to perform auto-registration of soft locks for new states
// flowId was required by SoftLockManager to perform auto-registration of soft locks for new states
val uuid = (Strand.currentStrand() as? FlowStateMachineImpl<*>)?.id?.uuid
val vaultUpdate = if (uuid != null) netUpdate.copy(flowId = uuid) else netUpdate
if (uuid != null) {
val fungible = netUpdate.produced.filter { it.state.data is FungibleAsset<*> }
if (fungible.isNotEmpty()) {
val stateRefs = fungible.map { it.ref }.toNonEmptySet()
log.trace { "Reserving soft locks for flow id $uuid and states $stateRefs" }
softLockReserve(uuid, stateRefs)
}
}
updatesPublisher.onNext(vaultUpdate)
}
}

View File

@ -1,76 +0,0 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.node.services.vault
import net.corda.core.contracts.FungibleAsset
import net.corda.core.contracts.StateRef
import net.corda.core.flows.FlowLogic
import net.corda.core.node.services.VaultService
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.toNonEmptySet
import net.corda.core.utilities.trace
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.internal.persistence.contextDatabase
import java.util.*
class VaultSoftLockManager private constructor(private val vault: VaultService) {
companion object {
private val log = contextLogger()
@JvmStatic
fun install(vault: VaultService, smm: StateMachineManager) {
val manager = VaultSoftLockManager(vault)
smm.changes.subscribe { change ->
if (change is StateMachineManager.Change.Removed) {
val logic = change.logic
// Don't run potentially expensive query if the flow didn't lock any states:
if ((logic.stateMachine as FlowStateMachineImpl<*>).hasSoftLockedStates) {
manager.unregisterSoftLocks(logic.runId.uuid, logic)
}
}
}
// Discussion
//
// The intent of the following approach is to support what might be a common pattern in a flow:
// 1. Create state
// 2. Do something with state
// without possibility of another flow intercepting the state between 1 and 2,
// since we cannot lock the state before it exists. e.g. Issue and then Move some Cash.
//
// The downside is we could have a long running flow that holds a lock for a long period of time.
// However, the lock can be programmatically released, like any other soft lock,
// should we want a long running flow that creates a visible state mid way through.
vault.rawUpdates.subscribe { (_, produced, flowId) ->
if (flowId != null) {
val fungible = produced.filter { it.state.data is FungibleAsset<*> }
if (fungible.isNotEmpty()) {
manager.registerSoftLocks(flowId, fungible.map { it.ref }.toNonEmptySet())
}
}
}
}
}
private fun registerSoftLocks(flowId: UUID, stateRefs: NonEmptySet<StateRef>) {
log.trace { "Reserving soft locks for flow id $flowId and states $stateRefs" }
contextDatabase.transaction {
vault.softLockReserve(flowId, stateRefs)
}
}
private fun unregisterSoftLocks(flowId: UUID, logic: FlowLogic<*>) {
log.trace { "Releasing soft locks for flow ${logic.javaClass.simpleName} with flow id $flowId" }
contextDatabase.transaction {
vault.softLockRelease(flowId)
}
}
}

View File

@ -94,9 +94,12 @@ class VaultSoftLockManagerTest {
private val mockNet = InternalMockNetwork(cordappPackages = listOf(ContractImpl::class.packageName), defaultFactory = { args ->
object : InternalMockNetwork.MockNode(args) {
override fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, hibernateConfig: HibernateConfiguration): VaultServiceInternal {
val node = this
val realVault = super.makeVaultService(keyManagementService, services, hibernateConfig)
return object : VaultServiceInternal by realVault {
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
// Should be called before flow is removed
assertEquals(1, node.smm.allStateMachines.size)
mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests.
}
}