mirror of
https://github.com/corda/corda.git
synced 2025-06-18 23:28:21 +00:00
Cherry pick 34f8719363
This commit is contained in:
committed by
Andras Slemmer
parent
19dad6da96
commit
57caf9af28
@ -58,7 +58,6 @@ import net.corda.node.services.statemachine.*
|
|||||||
import net.corda.node.services.transactions.*
|
import net.corda.node.services.transactions.*
|
||||||
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
|
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
|
||||||
import net.corda.node.services.vault.NodeVaultService
|
import net.corda.node.services.vault.NodeVaultService
|
||||||
import net.corda.node.services.vault.VaultSoftLockManager
|
|
||||||
import net.corda.node.utilities.AffinityExecutor
|
import net.corda.node.utilities.AffinityExecutor
|
||||||
import net.corda.node.utilities.JVMAgentRegistry
|
import net.corda.node.utilities.JVMAgentRegistry
|
||||||
import net.corda.node.utilities.NamedThreadFactory
|
import net.corda.node.utilities.NamedThreadFactory
|
||||||
@ -641,7 +640,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
|||||||
|
|
||||||
protected open fun makeTransactionStorage(database: CordaPersistence, transactionCacheSizeBytes: Long): WritableTransactionStorage = DBTransactionStorage(transactionCacheSizeBytes)
|
protected open fun makeTransactionStorage(database: CordaPersistence, transactionCacheSizeBytes: Long): WritableTransactionStorage = DBTransactionStorage(transactionCacheSizeBytes)
|
||||||
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, smm: StateMachineManager, schemaService: SchemaService, flowLogicRefFactory: FlowLogicRefFactory) {
|
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, smm: StateMachineManager, schemaService: SchemaService, flowLogicRefFactory: FlowLogicRefFactory) {
|
||||||
VaultSoftLockManager.install(services.vaultService, smm)
|
|
||||||
ScheduledActivityObserver.install(services.vaultService, schedulerService, flowLogicRefFactory)
|
ScheduledActivityObserver.install(services.vaultService, schedulerService, flowLogicRefFactory)
|
||||||
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig, schemaService)
|
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig, schemaService)
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@ import net.corda.core.identity.Party
|
|||||||
import net.corda.core.internal.FlowAsyncOperation
|
import net.corda.core.internal.FlowAsyncOperation
|
||||||
import net.corda.node.services.messaging.DeduplicationHandler
|
import net.corda.node.services.messaging.DeduplicationHandler
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
import java.util.*
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* [Action]s are reified IO actions to execute as part of state machine transitions.
|
* [Action]s are reified IO actions to execute as part of state machine transitions.
|
||||||
@ -117,6 +118,11 @@ sealed class Action {
|
|||||||
* Execute the specified [operation].
|
* Execute the specified [operation].
|
||||||
*/
|
*/
|
||||||
data class ExecuteAsyncOperation(val operation: FlowAsyncOperation<*>) : Action()
|
data class ExecuteAsyncOperation(val operation: FlowAsyncOperation<*>) : Action()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release soft locks associated with given ID (currently the flow ID).
|
||||||
|
*/
|
||||||
|
data class ReleaseSoftLocks(val uuid: UUID?) : Action()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -72,9 +72,14 @@ class ActionExecutorImpl(
|
|||||||
is Action.RollbackTransaction -> executeRollbackTransaction()
|
is Action.RollbackTransaction -> executeRollbackTransaction()
|
||||||
is Action.CommitTransaction -> executeCommitTransaction()
|
is Action.CommitTransaction -> executeCommitTransaction()
|
||||||
is Action.ExecuteAsyncOperation -> executeAsyncOperation(fiber, action)
|
is Action.ExecuteAsyncOperation -> executeAsyncOperation(fiber, action)
|
||||||
|
is Action.ReleaseSoftLocks -> executeReleaseSoftLocks(action)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun executeReleaseSoftLocks(action: Action.ReleaseSoftLocks) {
|
||||||
|
if (action.uuid != null) services.vaultService.softLockRelease(action.uuid)
|
||||||
|
}
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
private fun executeTrackTransaction(fiber: FlowFiber, action: Action.TrackTransaction) {
|
private fun executeTrackTransaction(fiber: FlowFiber, action: Action.TrackTransaction) {
|
||||||
services.validatedTransactions.trackTransaction(action.hash).thenMatch(
|
services.validatedTransactions.trackTransaction(action.hash).thenMatch(
|
||||||
|
@ -6,6 +6,7 @@ import net.corda.core.internal.FlowIORequest
|
|||||||
import net.corda.core.serialization.SerializedBytes
|
import net.corda.core.serialization.SerializedBytes
|
||||||
import net.corda.core.transactions.SignedTransaction
|
import net.corda.core.transactions.SignedTransaction
|
||||||
import net.corda.node.services.messaging.DeduplicationHandler
|
import net.corda.node.services.messaging.DeduplicationHandler
|
||||||
|
import java.util.*
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transitions in the flow state machine are triggered by [Event]s that may originate from the flow itself or from
|
* Transitions in the flow state machine are triggered by [Event]s that may originate from the flow itself or from
|
||||||
@ -112,8 +113,9 @@ sealed class Event {
|
|||||||
* Scheduled by the flow.
|
* Scheduled by the flow.
|
||||||
*
|
*
|
||||||
* @param returnValue the return value of the flow.
|
* @param returnValue the return value of the flow.
|
||||||
|
* @param softLocksId the flow ID of the flow if it is holding soft locks, else null.
|
||||||
*/
|
*/
|
||||||
data class FlowFinish(val returnValue: Any?) : Event()
|
data class FlowFinish(val returnValue: Any?, val softLocksId: UUID?) : Event()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Signals the completion of a [FlowAsyncOperation].
|
* Signals the completion of a [FlowAsyncOperation].
|
||||||
|
@ -182,9 +182,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
logger.warn("Flow threw exception", throwable)
|
logger.warn("Flow threw exception", throwable)
|
||||||
Try.Failure<R>(throwable)
|
Try.Failure<R>(throwable)
|
||||||
}
|
}
|
||||||
|
val softLocksId = if (hasSoftLockedStates) logic.runId.uuid else null
|
||||||
val finalEvent = when (resultOrError) {
|
val finalEvent = when (resultOrError) {
|
||||||
is Try.Success -> {
|
is Try.Success -> {
|
||||||
Event.FlowFinish(resultOrError.value)
|
Event.FlowFinish(resultOrError.value, softLocksId)
|
||||||
}
|
}
|
||||||
is Try.Failure -> {
|
is Try.Failure -> {
|
||||||
Event.Error(resultOrError.exception)
|
Event.Error(resultOrError.exception)
|
||||||
|
@ -62,6 +62,7 @@ class ErrorFlowTransition(
|
|||||||
}
|
}
|
||||||
actions.addAll(arrayOf(
|
actions.addAll(arrayOf(
|
||||||
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
|
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
|
||||||
|
Action.ReleaseSoftLocks(context.id.uuid),
|
||||||
Action.CommitTransaction,
|
Action.CommitTransaction,
|
||||||
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
|
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
|
||||||
Action.RemoveSessionBindings(currentState.checkpoint.sessions.keys)
|
Action.RemoveSessionBindings(currentState.checkpoint.sessions.keys)
|
||||||
|
@ -167,6 +167,7 @@ class TopLevelTransition(
|
|||||||
}
|
}
|
||||||
actions.addAll(arrayOf(
|
actions.addAll(arrayOf(
|
||||||
Action.PersistDeduplicationFacts(pendingDeduplicationHandlers),
|
Action.PersistDeduplicationFacts(pendingDeduplicationHandlers),
|
||||||
|
Action.ReleaseSoftLocks(event.softLocksId),
|
||||||
Action.CommitTransaction,
|
Action.CommitTransaction,
|
||||||
Action.AcknowledgeMessages(pendingDeduplicationHandlers),
|
Action.AcknowledgeMessages(pendingDeduplicationHandlers),
|
||||||
Action.RemoveSessionBindings(allSourceSessionIds),
|
Action.RemoveSessionBindings(allSourceSessionIds),
|
||||||
|
@ -197,9 +197,17 @@ class NodeVaultService(
|
|||||||
if (!netUpdate.isEmpty()) {
|
if (!netUpdate.isEmpty()) {
|
||||||
recordUpdate(netUpdate)
|
recordUpdate(netUpdate)
|
||||||
mutex.locked {
|
mutex.locked {
|
||||||
// flowId required by SoftLockManager to perform auto-registration of soft locks for new states
|
// flowId was required by SoftLockManager to perform auto-registration of soft locks for new states
|
||||||
val uuid = (Strand.currentStrand() as? FlowStateMachineImpl<*>)?.id?.uuid
|
val uuid = (Strand.currentStrand() as? FlowStateMachineImpl<*>)?.id?.uuid
|
||||||
val vaultUpdate = if (uuid != null) netUpdate.copy(flowId = uuid) else netUpdate
|
val vaultUpdate = if (uuid != null) netUpdate.copy(flowId = uuid) else netUpdate
|
||||||
|
if (uuid != null) {
|
||||||
|
val fungible = netUpdate.produced.filter { it.state.data is FungibleAsset<*> }
|
||||||
|
if (fungible.isNotEmpty()) {
|
||||||
|
val stateRefs = fungible.map { it.ref }.toNonEmptySet()
|
||||||
|
log.trace { "Reserving soft locks for flow id $uuid and states $stateRefs" }
|
||||||
|
softLockReserve(uuid, stateRefs)
|
||||||
|
}
|
||||||
|
}
|
||||||
updatesPublisher.onNext(vaultUpdate)
|
updatesPublisher.onNext(vaultUpdate)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,63 +0,0 @@
|
|||||||
package net.corda.node.services.vault
|
|
||||||
|
|
||||||
import net.corda.core.contracts.FungibleAsset
|
|
||||||
import net.corda.core.contracts.StateRef
|
|
||||||
import net.corda.core.flows.FlowLogic
|
|
||||||
import net.corda.core.node.services.VaultService
|
|
||||||
import net.corda.core.utilities.*
|
|
||||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
|
||||||
import net.corda.node.services.statemachine.StateMachineManager
|
|
||||||
import net.corda.nodeapi.internal.persistence.contextDatabase
|
|
||||||
import java.util.*
|
|
||||||
|
|
||||||
class VaultSoftLockManager private constructor(private val vault: VaultService) {
|
|
||||||
companion object {
|
|
||||||
private val log = contextLogger()
|
|
||||||
@JvmStatic
|
|
||||||
fun install(vault: VaultService, smm: StateMachineManager) {
|
|
||||||
val manager = VaultSoftLockManager(vault)
|
|
||||||
smm.changes.subscribe { change ->
|
|
||||||
if (change is StateMachineManager.Change.Removed) {
|
|
||||||
val logic = change.logic
|
|
||||||
// Don't run potentially expensive query if the flow didn't lock any states:
|
|
||||||
if ((logic.stateMachine as FlowStateMachineImpl<*>).hasSoftLockedStates) {
|
|
||||||
manager.unregisterSoftLocks(logic.runId.uuid, logic)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Discussion
|
|
||||||
//
|
|
||||||
// The intent of the following approach is to support what might be a common pattern in a flow:
|
|
||||||
// 1. Create state
|
|
||||||
// 2. Do something with state
|
|
||||||
// without possibility of another flow intercepting the state between 1 and 2,
|
|
||||||
// since we cannot lock the state before it exists. e.g. Issue and then Move some Cash.
|
|
||||||
//
|
|
||||||
// The downside is we could have a long running flow that holds a lock for a long period of time.
|
|
||||||
// However, the lock can be programmatically released, like any other soft lock,
|
|
||||||
// should we want a long running flow that creates a visible state mid way through.
|
|
||||||
vault.rawUpdates.subscribe { (_, produced, flowId) ->
|
|
||||||
if (flowId != null) {
|
|
||||||
val fungible = produced.filter { it.state.data is FungibleAsset<*> }
|
|
||||||
if (fungible.isNotEmpty()) {
|
|
||||||
manager.registerSoftLocks(flowId, fungible.map { it.ref }.toNonEmptySet())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun registerSoftLocks(flowId: UUID, stateRefs: NonEmptySet<StateRef>) {
|
|
||||||
log.trace { "Reserving soft locks for flow id $flowId and states $stateRefs" }
|
|
||||||
contextDatabase.transaction {
|
|
||||||
vault.softLockReserve(flowId, stateRefs)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun unregisterSoftLocks(flowId: UUID, logic: FlowLogic<*>) {
|
|
||||||
log.trace { "Releasing soft locks for flow ${logic.javaClass.simpleName} with flow id $flowId" }
|
|
||||||
contextDatabase.transaction {
|
|
||||||
vault.softLockRelease(flowId)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -84,9 +84,12 @@ class VaultSoftLockManagerTest {
|
|||||||
private val mockNet = InternalMockNetwork(cordappPackages = listOf(ContractImpl::class.packageName), defaultFactory = { args ->
|
private val mockNet = InternalMockNetwork(cordappPackages = listOf(ContractImpl::class.packageName), defaultFactory = { args ->
|
||||||
object : InternalMockNetwork.MockNode(args) {
|
object : InternalMockNetwork.MockNode(args) {
|
||||||
override fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, hibernateConfig: HibernateConfiguration): VaultServiceInternal {
|
override fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, hibernateConfig: HibernateConfiguration): VaultServiceInternal {
|
||||||
|
val node = this
|
||||||
val realVault = super.makeVaultService(keyManagementService, services, hibernateConfig)
|
val realVault = super.makeVaultService(keyManagementService, services, hibernateConfig)
|
||||||
return object : VaultServiceInternal by realVault {
|
return object : VaultServiceInternal by realVault {
|
||||||
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
|
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
|
||||||
|
// Should be called before flow is removed
|
||||||
|
assertEquals(1, node.smm.allStateMachines.size)
|
||||||
mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests.
|
mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user